spark error:java.lang.IllegalArgumentException: 大小超过了Integer.MAX_VALUE

我尝试计算负样本的数量,如下所示:

val numNegatives = dataSet.filter(col("label") < 0.5).count

但是我得到了一个大小超过了Integer.MAX_VALUE的错误:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)    at org.apache.spark.scheduler.Task.run(Task.scala:89)    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)    at java.lang.Thread.run(Thread.java:745)

一些解决方案建议增加分区数量,所以我更新了上面的代码如下:

val data = dataSet.repartition(5000).cache()val numNegatives = data.filter(col("label") < 0.5).count

但是它报告了同样的错误!这让我困惑了好几天。谁能帮帮我?谢谢。


回答:

这里的问题是,在它具体化后,ShuffleRDD块的大小超过了2GB。Spark有这个限制。你需要更改spark.sql.shuffle.partitions参数,默认设置为200。

你可能还需要增加数据集的分区数量。先重新分区并保存它,然后读取新的数据集并执行操作。

spark.sql("SET spark.sql.shuffle.partitions = 10000")dataset.repartition(10000).write.parquet("/path/to/hdfs")val newDataset = spark.read.parquet("/path/to/hdfs")  newDatase.filter(...).count

或者,如果你想使用Hive表

spark.sql("SET spark.sql.shuffle.partitions = 10000")dataset.repartition(10000).asveAsTable("newTableName")val newDataset = spark.table("newTableName")  newDatase.filter(...).count              

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注