spark error:java.lang.IllegalArgumentException: 大小超过了Integer.MAX_VALUE

我尝试计算负样本的数量,如下所示:

val numNegatives = dataSet.filter(col("label") < 0.5).count

但是我得到了一个大小超过了Integer.MAX_VALUE的错误:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)    at org.apache.spark.scheduler.Task.run(Task.scala:89)    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)    at java.lang.Thread.run(Thread.java:745)

一些解决方案建议增加分区数量,所以我更新了上面的代码如下:

val data = dataSet.repartition(5000).cache()val numNegatives = data.filter(col("label") < 0.5).count

但是它报告了同样的错误!这让我困惑了好几天。谁能帮帮我?谢谢。


回答:

这里的问题是,在它具体化后,ShuffleRDD块的大小超过了2GB。Spark有这个限制。你需要更改spark.sql.shuffle.partitions参数,默认设置为200。

你可能还需要增加数据集的分区数量。先重新分区并保存它,然后读取新的数据集并执行操作。

spark.sql("SET spark.sql.shuffle.partitions = 10000")dataset.repartition(10000).write.parquet("/path/to/hdfs")val newDataset = spark.read.parquet("/path/to/hdfs")  newDatase.filter(...).count

或者,如果你想使用Hive表

spark.sql("SET spark.sql.shuffle.partitions = 10000")dataset.repartition(10000).asveAsTable("newTableName")val newDataset = spark.table("newTableName")  newDatase.filter(...).count              

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注