我已经尝试了几天在Spark的map函数中运行机器学习算法。我发布了一个更具体的问题,但引用Spark的机器学习算法时,我得到了以下错误:
AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?
显然,我无法在apply_classifier
函数内部引用SparkContext
。我的代码与我之前提问时建议的类似,但仍然没有找到我想要的解决方案:
def apply_classifier(clf): dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3) if clf == 0: clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3) elif clf == 1: clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5)classifiers = [0, 1]sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect()
我尝试使用flatMap
替代map
,但得到了NoneType object is not iterable
的错误。
我还希望将广播的数据集(这是一个DataFrame)作为参数传递到apply_classifier
函数中。最后,我尝试做的事情是否可能?有什么替代方案吗?
回答:
我尝试做的事情是否可能?
不可能。Apache Spark不支持任何形式的嵌套,分布式操作只能由驱动程序初始化。这包括访问分布式数据结构,如Spark的DataFrame
。
有什么替代方案吗?
这取决于许多因素,如数据大小、可用资源量和算法选择。一般来说,你有三种选择:
-
仅将Spark用作任务管理工具来训练本地、非分布式模型。看起来你已经在某种程度上探索了这条路径。对于这种方法的更高级实现,你可以查看
spark-sklearn
。一般来说,当数据相对较小时,这种方法特别有用。它的优势在于多个作业之间没有竞争。
-
使用标准多线程工具从单一上下文提交多个独立作业。例如,你可以使用
threading
或joblib
。虽然这种方法是可能的,但我实际上不推荐这样做。并非所有Spark组件都是线程安全的,你必须非常小心以避免意外的行为。它也几乎不提供对资源分配的控制。
-
参数化你的Spark应用程序,并使用外部管道管理器(如Apache Airflow、Luigi、Toil)来提交你的作业。
虽然这种方法有一些缺点(它将需要将数据保存到持久存储中),但它也是最通用和最robust的,并且对资源分配提供了很多控制。