diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 6833aefff..441e931d1 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -325,21 +325,20 @@ object CometConf extends ShimCometConf { val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] = conf("spark.comet.columnar.shuffle.memorySize") - .doc( - "The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " + - "Note that this config is only used when `spark.comet.exec.shuffle.mode` is " + - "`jvm`. Once allocated memory size reaches this config, the current batch will be " + - "flushed to disk immediately. If this is not configured, Comet will use " + - "`spark.comet.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " + - "shuffle memory size. If final calculated value is larger than Comet memory " + - "overhead, Comet will use Comet memory overhead as shuffle memory size.") + .doc("The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " + + "Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm` " + + "or `auto`. Once allocated memory size reaches this config, the current batch will be " + + "flushed to disk immediately. If this is not configured, Comet will use " + + "`spark.comet.columnar.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " + + "shuffle memory size. If final calculated value is larger than Comet memory " + + "overhead, Comet will use Comet memory overhead as shuffle memory size.") .bytesConf(ByteUnit.MiB) .createOptional val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] = conf("spark.comet.columnar.shuffle.memory.factor") .doc( - "Fraction of Comet memory to be allocated per executor process for Comet shuffle. " + + "Fraction of Comet memory to be allocated per shuffle in the executor process. " + "Comet memory size is specified by `spark.comet.memoryOverhead` or " + "calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " + "By default, this config is 1.0.") diff --git a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java index 2837fa369..d8656dc3c 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java +++ b/spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java @@ -62,18 +62,8 @@ public final class CometShuffleMemoryAllocator extends MemoryConsumer { private static final int OFFSET_BITS = 51; private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL; - private static CometShuffleMemoryAllocator INSTANCE; - - public static synchronized CometShuffleMemoryAllocator getInstance( + public CometShuffleMemoryAllocator( SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) { - if (INSTANCE == null) { - INSTANCE = new CometShuffleMemoryAllocator(conf, taskMemoryManager, pageSize); - } - - return INSTANCE; - } - - CometShuffleMemoryAllocator(SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) { super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP); this.pageSize = pageSize; this.totalMemory = diff --git a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java index ed3e2be66..b2a595b44 100644 --- a/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java +++ b/spark/src/main/java/org/apache/spark/shuffle/sort/CometShuffleExternalSorter.java @@ -137,7 +137,7 @@ public CometShuffleExternalSorter( ShuffleWriteMetricsReporter writeMetrics, StructType schema) { this.allocator = - CometShuffleMemoryAllocator.getInstance( + new CometShuffleMemoryAllocator( conf, memoryManager, Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes())); diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java index f793874d7..d62284543 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java @@ -136,7 +136,7 @@ public final class CometDiskBlockWriter { int asyncThreadNum, ExecutorService threadPool) { this.allocator = - CometShuffleMemoryAllocator.getInstance( + new CometShuffleMemoryAllocator( conf, taskMemoryManager, Math.min(MAXIMUM_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes()));