How can I serialize the elastic4s ElasticSearch client to run with Spark RDDs?

I am currently running Spark MLlib's ALS on millions of users and products. Because of heavy disk shuffling, the step that collects the data takes far longer than the step that recommends products to users. So if I could somehow drop the collect step and feed data to Elasticsearch directly from the executors, it would save a lot of time and compute.

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.common.settings.ImmutableSettings

val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MYCLUSTER").build()
val client = ElasticClient.remote(settings, "11.11.11.11", 9300)
var ESMap = Map[String, List[String]]()
val topKReco = bestModel.get
  // the following step takes 3 hours
  .recommendProductsForUsers(30)
  // the following step takes 6 hours
  .collect()
  .foreach { r =>
    var i = 1
    var curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    client.execute {
      index into "recommendations1" / "items" id curr_user fields ESMap
    }.await
  }

So when I run the code without the collect step, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
    at CatalogALS2$.main(CatalogALS2.scala:157)
    at CatalogALS2.main(CatalogALS2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.sksamuel.elastic4s.ElasticClient
Serialization stack:
    - object not serializable (class: com.sksamuel.elastic4s.ElasticClient, value: com.sksamuel.elastic4s.ElasticClient@e4c4af)
    - field (class: CatalogALS2$$anonfun$2, name: client$1, type: class com.sksamuel.elastic4s.ElasticClient)
    - object (class CatalogALS2$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

From this I understand that if I could somehow serialize the com.sksamuel.elastic4s.ElasticClient class, I could run this task in parallel without collecting the data to the driver. To generalize the problem: how do I serialize any class or function in Scala so that it can operate on an RDD?
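The root cause can be reproduced outside Spark: plain Java serialization of a closure that captures a non-serializable instance fails the same way. A minimal sketch, where `FakeClient` is a hypothetical stand-in for `ElasticClient`:

```scala
import java.io._

// Hypothetical stand-in for a non-serializable client such as ElasticClient.
class FakeClient {
  def index(doc: String): Unit = ()
}

object Repro {
  // Returns true if the closure serialized, false on NotSerializableException.
  def closureSerializes(): Boolean = {
    val client = new FakeClient
    // The closure captures `client`, just as the foreach body captures ElasticClient.
    val task: Int => Unit = n => client.index(n.toString)
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(task)
      true
    } catch {
      case _: NotSerializableException => false
    }
  }
}
```

`Repro.closureSerializes()` returns `false`: when Spark's ClosureCleaner tries to serialize the `foreach` closure, it walks the captured `client$1` field and hits the same `NotSerializableException`.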


Answer:

Found the answer by using serialization, as follows:

object ESConnection extends Serializable {
  // Elasticsearch client initialization
  val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MyCluster").build()
  lazy val client = ElasticClient.remote(settings, "11.11.11.11", 9300)
}

Then you can use it on the RDD without actually collecting the data to the driver, like this:

val topKReco = bestModel.get
  .recommendProductsForUsers(30)
  // no collect needed now
  .foreach { r =>
    var i = 1
    var curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    ESConnection.client.execute {
      index into "recommendation1" / "items" id curr_user fields ESMap
    }.await
  }
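Why this works: what gets shipped to the executors is the `ESConnection` singleton reference, not the client itself. Because `client` is a `lazy val` that the driver never touches, its backing field is still null when the closure is serialized, and each executor JVM constructs its own client on first access. A minimal pure-JVM sketch of this behavior, again with a hypothetical `FakeClient` standing in for `ElasticClient`:

```scala
import java.io._

// Hypothetical stand-in for ElasticClient: deliberately not Serializable.
class FakeClient {
  def ping(): Boolean = true
}

// The pattern from the answer: the wrapper object is Serializable,
// but the client is a lazy val, created only on first access.
object Connection extends Serializable {
  lazy val client = new FakeClient
}

object Demo {
  // Serialize the wrapper before the lazy val is touched, then read it back
  // and use the client — mimicking a closure shipped to an executor.
  def roundTripAndUse(): Boolean = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(Connection) // succeeds: the client has not been materialized yet
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    in.readObject() // Scala objects resolve back to the singleton on deserialization
    in.close()
    Connection.client.ping() // the client is created here, locally
  }
}
```

One caveat: if the driver ever materializes the client before the action runs, serializing the object would fail again, which is why this pattern is often written as `@transient lazy val client = ...` to be safe.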
