Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Create separate instance of CometShuffleMemoryAllocator per plan #1061

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,20 @@ object CometConf extends ShimCometConf {

val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
conf("spark.comet.columnar.shuffle.memorySize")
.doc(
"The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
"Note that this config is only used when `spark.comet.exec.shuffle.mode` is " +
"`jvm`. Once allocated memory size reaches this config, the current batch will be " +
"flushed to disk immediately. If this is not configured, Comet will use " +
"`spark.comet.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " +
"shuffle memory size. If final calculated value is larger than Comet memory " +
"overhead, Comet will use Comet memory overhead as shuffle memory size.")
.doc("The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
"Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm` " +
"or `auto`. Once allocated memory size reaches this config, the current batch will be " +
"flushed to disk immediately. If this is not configured, Comet will use " +
"`spark.comet.columnar.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " +
"shuffle memory size. If final calculated value is larger than Comet memory " +
"overhead, Comet will use Comet memory overhead as shuffle memory size.")
.bytesConf(ByteUnit.MiB)
.createOptional

val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.doc(
"Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
"Fraction of Comet memory to be allocated per shuffle in the executor process. " +
"Comet memory size is specified by `spark.comet.memoryOverhead` or " +
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
"By default, this config is 1.0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,8 @@ public final class CometShuffleMemoryAllocator extends MemoryConsumer {
private static final int OFFSET_BITS = 51;
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;

private static CometShuffleMemoryAllocator INSTANCE;

public static synchronized CometShuffleMemoryAllocator getInstance(
public CometShuffleMemoryAllocator(
SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
if (INSTANCE == null) {
INSTANCE = new CometShuffleMemoryAllocator(conf, taskMemoryManager, pageSize);
}

return INSTANCE;
}

CometShuffleMemoryAllocator(SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
this.pageSize = pageSize;
this.totalMemory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public CometShuffleExternalSorter(
ShuffleWriteMetricsReporter writeMetrics,
StructType schema) {
this.allocator =
CometShuffleMemoryAllocator.getInstance(
new CometShuffleMemoryAllocator(
conf,
memoryManager,
Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public final class CometDiskBlockWriter {
int asyncThreadNum,
ExecutorService threadPool) {
this.allocator =
CometShuffleMemoryAllocator.getInstance(
new CometShuffleMemoryAllocator(
conf,
taskMemoryManager,
Math.min(MAXIMUM_PAGE_SIZE_BYTES, taskMemoryManager.pageSizeBytes()));
Expand Down
Loading