I am currently running Spark MLlib's ALS over several million users and products. Because of heavy disk shuffles, the step that collects the results back to the driver takes longer than the step that actually computes the product recommendations for the users. So if I could somehow drop the collect step and feed the data to Elasticsearch directly from the executors, it would save a lot of time and compute resources.
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.common.settings.ImmutableSettings

val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MYCLUSTER").build()
val client = ElasticClient.remote(settings, "11.11.11.11", 9300)

var ESMap = Map[String, List[String]]()
val topKReco = bestModel.get
  // the following step takes 3 hours
  .recommendProductsForUsers(30)
  // the following step takes 6 hours
  .collect()
  .foreach { r =>
    var i = 1
    var curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    client.execute {
      index into "recommendations1" / "items" id curr_user fields ESMap
    }.await
  }
So when I run the same code without the collect step, I get the following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
    at CatalogALS2$.main(CatalogALS2.scala:157)
    at CatalogALS2.main(CatalogALS2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.sksamuel.elastic4s.ElasticClient
Serialization stack:
    - object not serializable (class: com.sksamuel.elastic4s.ElasticClient, value: com.sksamuel.elastic4s.ElasticClient@e4c4af)
    - field (class: CatalogALS2$$anonfun$2, name: client$1, type: class com.sksamuel.elastic4s.ElasticClient)
    - object (class CatalogALS2$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
From this I understand that if I could somehow serialize the com.sksamuel.elastic4s.ElasticClient class, I could run this task in parallel without collecting the data to the driver. To generalize the problem: how do I serialize any class or function in Scala so it can be used inside operations on an RDD?
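For reference, here is a minimal reproduction of the same failure outside my job (a sketch, names are illustrative). Spark's ClosureCleaner Java-serializes everything a closure captures before shipping the task, so any captured reference to a non-serializable instance fails in exactly this way:

class NotSerializableResource  // does not extend Serializable

val resource = new NotSerializableResource
// The closure below captures `resource` as a field, so Spark must serialize it to
// ship the task to executors, and it fails with java.io.NotSerializableException.
sc.parallelize(1 to 10).foreach { i =>
  println(resource.toString + i)
}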
Answer:
I found the answer to this myself: wrap the client initialization in an object that extends Serializable, so the client is created on the executors instead of being shipped from the driver, as shown below:
import com.sksamuel.elastic4s.ElasticClient
import org.elasticsearch.common.settings.ImmutableSettings

object ESConnection extends Serializable {
  // Elasticsearch client initialization.
  // Only the reference to this singleton is captured by the closure; the lazy val
  // is initialized independently on each executor JVM instead of being serialized
  // from the driver.
  val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MyCluster").build()
  lazy val client = ElasticClient.remote(settings, "11.11.11.11", 9300)
}
Then you can use it on the RDD without actually collecting the data to the driver, like this:
val topKReco = bestModel.get
  .recommendProductsForUsers(30)
  // no collect needed now
  .foreach { r =>
    // build the per-user fields map locally on the executor
    var ESMap = Map[String, List[String]]()
    var i = 1
    var curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    ESConnection.client.execute {
      index into "recommendation1" / "items" id curr_user fields ESMap
    }.await
  }
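A possible refinement, sketched under stated assumptions: it uses the same elastic4s 1.x DSL and ESConnection object as above, assumes item_ids is available and serializable on the executors (for example via a broadcast variable or a plain Map), and assumes this elastic4s version provides the bulk(...) helper. Grouping requests inside foreachPartition cuts the number of round trips compared to one execute().await per user:

bestModel.get
  .recommendProductsForUsers(30)
  .foreachPartition { users =>
    users.grouped(500).foreach { batch =>   // batch size is an arbitrary example value
      val requests = batch.map { case (userId, recs) =>
        // build the per-user fields map immutably instead of mutating a shared var
        val fieldsMap = recs.zipWithIndex.map { case (rec, idx) =>
          (idx + 1).toString -> List(rec.product.toString, item_ids(rec.product))
        }.toMap
        index into "recommendation1" / "items" id userId fields fieldsMap
      }
      // one bulk request per batch instead of one request per user
      ESConnection.client.execute {
        bulk(requests: _*)
      }.await
    }
  }

Whether batching pays off depends on your cluster and document sizes; the per-user execute shown above already works once the client is no longer captured by the closure.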