在Spark的map函数中运行机器学习算法

我已经尝试了几天在Spark的map函数中运行机器学习算法。我发布了一个更具体的问题,但引用Spark的机器学习算法时,我得到了以下错误:

AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?

显然,我无法在apply_classifier函数内部引用SparkContext。我的代码与我之前提问时建议的类似,但仍然没有找到我想要的解决方案:

def apply_classifier(clf):    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)    if clf == 0:        clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=3)    elif clf == 1:        clf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=5)classifiers = [0, 1]sc.parallelize(classifiers).map(lambda x: apply_classifier(x)).collect() 

我尝试使用flatMap替代map,但得到了NoneType object is not iterable的错误。

我还希望将广播的数据集(这是一个DataFrame)作为参数传递到apply_classifier函数中。最后,我尝试做的事情是否可能?有什么替代方案吗?


回答:

我尝试做的事情是否可能?

不可能。Apache Spark不支持任何形式的嵌套,分布式操作只能由驱动程序初始化。这包括访问分布式数据结构,如Spark的DataFrame

有什么替代方案吗?

这取决于许多因素,如数据大小、可用资源量和算法选择。一般来说,你有三种选择:

  • 仅将Spark用作任务管理工具来训练本地、非分布式模型。看起来你已经在某种程度上探索了这条路径。对于这种方法的更高级实现,你可以查看spark-sklearn

    一般来说,当数据相对较小时,这种方法特别有用。它的优势在于多个作业之间没有竞争。

  • 使用标准多线程工具从单一上下文提交多个独立作业。例如,你可以使用threadingjoblib

    虽然这种方法是可能的,但我实际上不推荐这样做。并非所有Spark组件都是线程安全的,你必须非常小心以避免意外的行为。它也几乎不提供对资源分配的控制。

  • 参数化你的Spark应用程序,并使用外部管道管理器(如Apache AirflowLuigiToil)来提交你的作业。

    虽然这种方法有一些缺点(它将需要将数据保存到持久存储中),但它也是最通用和最robust的,并且对资源分配提供了很多控制。

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注