Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Create separate instance of CometShuffleMemoryAllocator per plan #1054

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ object CometConf extends ShimCometConf {
"spark.comet.memory.overhead.factor")
.doc(
"Fraction of executor memory to be allocated as additional non-heap memory per executor " +
"process for Comet. Default value is 0.2.")
"process for Comet.")
.doubleConf
.checkValue(
factor => factor > 0,
Expand Down Expand Up @@ -323,26 +323,12 @@ object CometConf extends ShimCometConf {
.intConf
.createWithDefault(Int.MaxValue)

val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
conf("spark.comet.columnar.shuffle.memorySize")
.doc(
"The optional maximum size of the memory used for Comet columnar shuffle, in MiB. " +
"Note that this config is only used when `spark.comet.exec.shuffle.mode` is " +
"`jvm`. Once allocated memory size reaches this config, the current batch will be " +
"flushed to disk immediately. If this is not configured, Comet will use " +
"`spark.comet.shuffle.memory.factor` * `spark.comet.memoryOverhead` as " +
"shuffle memory size. If final calculated value is larger than Comet memory " +
"overhead, Comet will use Comet memory overhead as shuffle memory size.")
.bytesConf(ByteUnit.MiB)
.createOptional

val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.doc(
"Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
"Comet memory size is specified by `spark.comet.memoryOverhead` or " +
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
"By default, this config is 1.0.")
"calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`.")
.doubleConf
.checkValue(
factor => factor > 0,
Expand Down
9 changes: 9 additions & 0 deletions docs/source/user-guide/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` ar

Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.

## Shuffle Memory Management

When Comet performs a columnar JVM shuffle (rather than a native shuffle), memory is allocated independently of the
Unified or Native Memory Management approach.

By default, the amount of executor memory allocated for JVM shuffle is
`spark.comet.columnar.shuffle.memory.factor * spark.executor.memory`. The default value for
`spark.comet.columnar.shuffle.memory.factor` is `1.0`.

### Determining How Much Memory to Allocate

Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ public static synchronized CometShuffleMemoryAllocator getInstance(
return INSTANCE;
}

CometShuffleMemoryAllocator(SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
public CometShuffleMemoryAllocator(
SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
super(taskMemoryManager, pageSize, MemoryMode.OFF_HEAP);
this.pageSize = pageSize;
this.totalMemory =
CometSparkSessionExtensions$.MODULE$.getCometShuffleMemorySize(conf, SQLConf.get());
CometSparkSessionExtensions$.MODULE$.getCometPerShuffleMemorySize(conf, SQLConf.get());
}

public synchronized long acquireMemory(long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public CometShuffleExternalSorter(
ShuffleWriteMetricsReporter writeMetrics,
StructType schema) {
this.allocator =
CometShuffleMemoryAllocator.getInstance(
new CometShuffleMemoryAllocator(
conf,
memoryManager,
Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1295,10 +1295,8 @@ object CometSparkSessionExtensions extends Logging {
val cometMemoryOverhead = getCometMemoryOverheadInMiB(sparkConf)

val overheadFactor = COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.get(conf)
val cometShuffleMemoryFromConf = COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.get(conf)

val shuffleMemorySize =
cometShuffleMemoryFromConf.getOrElse((overheadFactor * cometMemoryOverhead).toLong)
val shuffleMemorySize = (overheadFactor * cometMemoryOverhead).toLong
if (shuffleMemorySize > cometMemoryOverhead) {
logWarning(
s"Configured shuffle memory size $shuffleMemorySize is larger than Comet memory overhead " +
Expand All @@ -1309,6 +1307,27 @@ object CometSparkSessionExtensions extends Logging {
}
}

/** Calculates required shuffle memory size in bytes per shuffle operator for Comet. */
def getCometPerShuffleMemorySize(sparkConf: SparkConf, conf: SQLConf): Long = {
(getCometShuffleMemorySize(sparkConf, conf).toFloat /
numDriverOrExecutorCores(sparkConf)).toLong
}

def numDriverOrExecutorCores(conf: SparkConf): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
val master = conf.get("spark.master")
master match {
case "local" => 1
case LOCAL_N_REGEX(threads) => convertToInt(threads)
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => conf.get("spark.executor.cores", "1").toInt
}
}

/**
* Attaches explain information to a TreeNode, rolling up the corresponding information tags
* from any child nodes. For now, we are using this to attach the reasons why certain Spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
val conf = new SparkConf()
val sqlConf = new SQLConf
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "512m")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key, "0.5")

assert(
CometSparkSessionExtensions
Expand All @@ -138,7 +138,7 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
val conf = new SparkConf()
val sqlConf = new SQLConf
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "10g")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key, "10.0")
assert(
CometSparkSessionExtensions
.getCometShuffleMemorySize(conf, sqlConf) == getBytesFromMib(1024))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
CometConf.COMET_MEMORY_OVERHEAD.key -> "1536m",
CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key -> "1.0") {
testFun
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
sparkSession.conf.set(CometConf.COMET_ENABLED.key, "false")
sparkSession.conf.set(CometConf.COMET_EXEC_ENABLED.key, "false")
sparkSession.conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "10g")
sparkSession.conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "10g")
sparkSession.conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key, "1.0")
// TODO: support dictionary encoding in vectorized execution
sparkSession.conf.set("parquet.enable.dictionary", "false")
sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
sparkSession.conf.set(CometConf.COMET_ENABLED.key, "false")
sparkSession.conf.set(CometConf.COMET_EXEC_ENABLED.key, "false")
sparkSession.conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "10g")
sparkSession.conf.set(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key, "1.0")
// TODO: support dictionary encoding in vectorized execution
sparkSession.conf.set("parquet.enable.dictionary", "false")

Expand Down
Loading