Skip to content

Commit

Permalink
Remove CometShuffleMemoryAllocator singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Nov 6, 2024
1 parent 562a877 commit a099acb
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 22 deletions.
17 changes: 8 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,20 @@ object CometConf extends ShimCometConf {

// Optional hard cap (in MiB) on JVM-side Comet columnar shuffle memory.
// When unset, the effective size is derived from
// `spark.comet.columnar.shuffle.memory.factor` * `spark.comet.memoryOverhead`,
// clamped to the Comet memory overhead.
// NOTE(review): the scraped diff fused the removed and added `.doc(...)` calls
// into a chained double call; only the post-commit doc text is kept here.
val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
  conf("spark.comet.columnar.shuffle.memorySize")
    .doc(
      "The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
        "Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm` " +
        "or `auto`. Once allocated memory size reaches this config, the current batch will be " +
        "flushed to disk immediately. If this is not configured, Comet will use " +
        "`spark.comet.columnar.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " +
        "shuffle memory size. If final calculated value is larger than Comet memory " +
        "overhead, Comet will use Comet memory overhead as shuffle memory size.")
    .bytesConf(ByteUnit.MiB)
    .createOptional

val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.doc(
"Fraction of Comet memory to be allocated per shuffle in the executor process. " +
"Comet memory size is specified by `spark.comet.memoryOverhead` or " +
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
"By default, this config is 1.0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,8 @@ public final class CometShuffleMemoryAllocator extends MemoryConsumer {
private static final int OFFSET_BITS = 51;
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

// Post-commit state: the private INSTANCE field and synchronized getInstance()
// factory were removed; the constructor is now public so each sorter/writer
// creates its own per-task allocator.
public CometShuffleMemoryAllocator(
    SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
this.pageSize = pageSize;
this.totalMemory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public CometShuffleExternalSorter(
ShuffleWriteMetricsReporter writeMetrics,
StructType schema) {
this.allocator =
new CometShuffleMemoryAllocator(
conf,
memoryManager,
Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public final class CometDiskBlockWriter {
int asyncThreadNum,
ExecutorService threadPool) {
this.allocator =
new CometShuffleMemoryAllocator(
conf,
taskMemoryManager,
Math.min(MAXIMUM_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes()));
Expand Down

0 comments on commit a099acb

Please sign in to comment.