How do I use PySpark's FP-growth with an RDD?

I want to use FP-growth to find out whether any interesting association rules exist in the RDD below. Following the documentation, I tried this:

sqlContext = SQLContext(sc)
spark_df = sqlContext.createDataFrame(pandas_df[['Category', 'Descript', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address']])
spark_df.show(2)

+--------------+--------------------+---------+----------+--------------+------------------+
|      Category|            Descript|DayOfWeek|PdDistrict|    Resolution|           Address|
+--------------+--------------------+---------+----------+--------------+------------------+
|      WARRANTS|      WARRANT ARREST|Wednesday|  NORTHERN|ARREST, BOOKED|OAK ST / LAGUNA ST|
|OTHER OFFENSES|TRAFFIC VIOLATION...|Wednesday|  NORTHERN|ARREST, BOOKED|OAK ST / LAGUNA ST|
+--------------+--------------------+---------+----------+--------------+------------------+
only showing top 2 rows

from pyspark.mllib.fpm import FPGrowth

model = FPGrowth.train(spark_df.rdd, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)

However, I got the following exception:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-7-fa62e885b01c> in <module>()
      4 #transactions = spark_df.map(lambda line: line.strip().split(' '))
      5
----> 6 model = FPGrowth.train(spark_df.rdd, minSupport=0.2, numPartitions=10)
      7
      8 result = model.freqItemsets().collect()

/Users/user/spark-1.6.2-bin-hadoop2.6/python/pyspark/mllib/fpm.py in train(cls, data, minSupport, numPartitions)
     75             parallel FP-growth (default: same as input data).
     76         """
---> 77         model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions))
     78         return FPGrowthModel(model)
     79

/Users/user/spark-1.6.2-bin-hadoop2.6/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    128     sc = SparkContext.getOrCreate()
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131
    132

/Users/user/spark-1.6.2-bin-hadoop2.6/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124
    125

/Users/user/spark-1.6.2-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

/Users/user/spark-1.6.2-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/Users/user/spark-1.6.2-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling o76.trainFPGrowthModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
    at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$2.apply(PythonMLLibAPI.scala:1507)
    ...
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1631)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157)
    ...
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    ...
    at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
    at org.apache.spark.mllib.fpm.FPGrowth.run(FPGrowth.scala:114)
    at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainFPGrowthModel(PythonMLLibAPI.scala:565)
    ...
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    ... 1 more

So what is the correct way to use this FP-Growth implementation?


Answer:

This line is wrong: transactions = spark_df.map(lambda line: line.strip().split(' ')). spark_df is a DataFrame, so its records are Row objects rather than strings, while FPGrowth.train expects an RDD whose records are plain lists of unique items; passing Row objects straight through is what triggers the PickleException above. Remove that line and try:

>>> FPGrowth.train(
...     spark_df.rdd.map(lambda x: list(set(x))),
...     minSupport=0.2, numPartitions=10)

That should give you a working solution.
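For completeness, here is a minimal end-to-end sketch using the same pyspark.mllib API as above. The column-name prefix (e.g. "Category=WARRANTS") is my own assumption, not something from the original answer; it keeps identical strings coming from different columns from collapsing into a single item when the row is turned into a set.

from pyspark.mllib.fpm import FPGrowth

# Turn each Row into a plain list of distinct items.
# Prefixing values with their column name is an assumption made here
# so that the same string in two different columns stays distinguishable.
transactions = spark_df.rdd.map(
    lambda row: list({"{0}={1}".format(col, val)
                      for col, val in row.asDict().items()})
)

model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)

for fi in model.freqItemsets().collect():
    print(fi.items, fi.freq)

On a dataset like this, minSupport=0.2 may return only a handful of itemsets, so lowering the threshold is a reasonable first experiment if the output looks too sparse.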
