I tried to count the number of negative samples as follows:
val numNegatives = dataSet.filter(col("label") < 0.5).count
But I got a "Size exceeds Integer.MAX_VALUE" error:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:636)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Some suggested solutions involve increasing the number of partitions, so I updated the code above as follows:
val data = dataSet.repartition(5000).cache()
val numNegatives = data.filter(col("label") < 0.5).count
But it reports the same error! This has confused me for days. Can anyone help? Thanks.
Answer:
The problem here is that a ShuffleRDD block exceeds 2GB once it is materialized, and Spark has a 2GB limit on individual blocks. You need to change the spark.sql.shuffle.partitions parameter, which defaults to 200.
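For example, here is a minimal sketch of raising that setting through the SparkSession config API before running the count (assuming a Spark 2.x+ session named spark; the input path and app name are hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumed session setup; adjust to however you build yours.
val spark = SparkSession.builder().appName("count-negatives").getOrCreate()

// Raise the shuffle partition count before any wide transformation runs,
// so each individual shuffle block stays well below the 2GB limit.
spark.conf.set("spark.sql.shuffle.partitions", "10000")

// Hypothetical input path; replace with your real source.
val dataSet = spark.read.parquet("/path/to/input")
val numNegatives = dataSet.filter(col("label") < 0.5).count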
You probably also need to increase the number of partitions of your dataset. Repartition and save it first, then read the new dataset back and run the action on it.
spark.sql("SET spark.sql.shuffle.partitions = 10000")dataset.repartition(10000).write.parquet("/path/to/hdfs")val newDataset = spark.read.parquet("/path/to/hdfs") newDatase.filter(...).count
Or, if you want to use a Hive table:
spark.sql("SET spark.sql.shuffle.partitions = 10000")dataset.repartition(10000).asveAsTable("newTableName")val newDataset = spark.table("newTableName") newDatase.filter(...).count