feat: Require offHeap memory to be enabled (always use unified memory) #1062

Merged 5 commits on Nov 14, 2024
32 changes: 2 additions & 30 deletions docs/source/user-guide/tuning.md
@@ -23,40 +23,12 @@ Comet provides some tuning options to help you get the best performance from you

## Memory Tuning

-Comet provides two options for memory management:
-
-- **Unified Memory Management** shares an off-heap memory pool between Spark and Comet. This is the recommended option.
-- **Native Memory Management** leverages DataFusion's memory management for the native plans and allocates memory independently of Spark.
-
-### Unified Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=true`.
+Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
+If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.

Each executor will have a single memory pool which will be shared by all native plans being executed within that
process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
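
As a rough illustration of the shared-pool idea (this is not Comet's actual `CometMemoryPool`), the sketch below models one fixed-capacity pool that several plans reserve from. `SharedPool`, `try_grow`, and `shrink` are hypothetical names, and the capacity merely stands in for `spark.memory.offHeap.size`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for an executor-wide off-heap pool; not a Comet type.
struct SharedPool {
    /// Total bytes available; plays the role of `spark.memory.offHeap.size`.
    capacity: usize,
    /// Bytes currently reserved by all plans together.
    used: AtomicUsize,
}

impl SharedPool {
    fn new(capacity: usize) -> Self {
        Self { capacity, used: AtomicUsize::new(0) }
    }

    /// Try to reserve `bytes`. Nothing is reserved if the pool would overflow.
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let mut current = self.used.load(Ordering::Relaxed);
        loop {
            let next = current
                .checked_add(bytes)
                .filter(|n| *n <= self.capacity)
                .ok_or_else(|| {
                    format!(
                        "cannot reserve {bytes} bytes: {current} of {} already in use",
                        self.capacity
                    )
                })?;
            match self.used.compare_exchange_weak(
                current,
                next,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Ok(()),
                // Another plan changed the counter; retry with the fresh value.
                Err(actual) => current = actual,
            }
        }
    }

    /// Release `bytes`, never dropping below zero.
    fn shrink(&self, bytes: usize) {
        let _ = self.used.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |u| {
            Some(u.saturating_sub(bytes))
        });
    }

    fn used(&self) -> usize {
        self.used.load(Ordering::Acquire)
    }
}

fn main() {
    // Two "plans" hold handles to the same pool, as they would within one executor.
    let pool = Arc::new(SharedPool::new(1024));
    let plan_a = Arc::clone(&pool);
    let plan_b = Arc::clone(&pool);

    assert!(plan_a.try_grow(600).is_ok());
    // The second plan is refused because both draw from the same budget.
    assert!(plan_b.try_grow(600).is_err());
    plan_a.shrink(600);
    assert!(plan_b.try_grow(600).is_ok());
    println!("bytes in use: {}", pool.used());
}
```

The point of the sketch is only that one plan's reservation reduces what every other plan in the same process can obtain, which is why the pool size should be chosen with concurrent tasks in mind.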

-### Native Memory Management
-
-This option is automatically enabled when `spark.memory.offHeap.enabled=false`.
-
-Each native plan has a dedicated memory pool.
-
-By default, the size of each pool is `spark.comet.memory.overhead.factor * spark.executor.memory`. The default value
-for `spark.comet.memory.overhead.factor` is `0.2`.
-
-It is important to take executor concurrency into account. The maximum number of concurrent plans in an executor can
-be calculated with `spark.executor.cores / spark.task.cpus`.
-
-For example, if the executor can execute 4 plans concurrently, then the total amount of memory allocated will be
-`4 * spark.comet.memory.overhead.factor * spark.executor.memory`.
-
-It is also possible to set `spark.comet.memoryOverhead` to the desired size for each pool, rather than calculating
-it based on `spark.comet.memory.overhead.factor`.
-
-If both `spark.comet.memoryOverhead` and `spark.comet.memory.overhead.factor` are set, the former will be used.
-
-Comet will allocate at least `spark.comet.memory.overhead.min` memory per pool.
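
The sizing rules in this removed section can be worked through numerically. The sketch below is a hypothetical calculator, not Comet code: the function names are invented, sizes are in MiB, the `384` minimum is an illustrative value, and it assumes the `spark.comet.memory.overhead.min` floor applies only to the factor-based value.

```rust
/// Per-pool size in MiB under the removed native memory management mode.
/// An explicit `spark.comet.memoryOverhead` wins; otherwise the factor-based
/// value is used, raised to the minimum if needed (assumed semantics).
fn per_pool_mib(
    executor_memory_mib: u64,
    overhead_factor: f64,
    explicit_overhead_mib: Option<u64>,
    min_overhead_mib: u64,
) -> u64 {
    explicit_overhead_mib.unwrap_or_else(|| {
        let from_factor = (executor_memory_mib as f64 * overhead_factor) as u64;
        from_factor.max(min_overhead_mib)
    })
}

/// `spark.executor.cores / spark.task.cpus`, guarding against a zero divisor.
fn max_concurrent_plans(executor_cores: u32, task_cpus: u32) -> u32 {
    executor_cores / task_cpus.max(1)
}

fn main() {
    // 10240 MiB executor memory, factor 0.2, no explicit overhead.
    let per_pool = per_pool_mib(10_240, 0.2, None, 384);
    // 4 executor cores with 1 CPU per task.
    let plans = max_concurrent_plans(4, 1);
    let total = per_pool * u64::from(plans);
    println!("{plans} pools x {per_pool} MiB = {total} MiB");
}
```

With these inputs each of the 4 concurrent plans gets a 2048 MiB pool, so the executor may allocate 8192 MiB in total on top of `spark.executor.memory`.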

### Determining How Much Memory to Allocate

Generally, increasing memory overhead will improve query performance, especially for queries containing joins and
24 changes: 3 additions & 21 deletions native/core/src/execution/jni_api.rs
@@ -202,27 +202,9 @@ fn prepare_datafusion_session_context(

     let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);

-    // Check if we are using unified memory manager integrated with Spark. Default to false if not
-    // set.
-    let use_unified_memory_manager = parse_bool(conf, "use_unified_memory_manager")?;
-
-    if use_unified_memory_manager {
-        // Set Comet memory pool for native
-        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
-    } else {
-        // Use the memory pool from DF
-        if conf.contains_key("memory_limit") {
-            let memory_limit = conf.get("memory_limit").unwrap().parse::<usize>()?;
-            let memory_fraction = conf
-                .get("memory_fraction")
-                .ok_or(CometError::Internal(
-                    "Config 'memory_fraction' is not specified from Comet JVM side".to_string(),
-                ))?
-                .parse::<f64>()?;
-            rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
-        }
-    }
+    // Set Comet memory pool for native
+    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));

     // Get Datafusion configuration from Spark Execution context
     // can be configured in Comet Spark JVM using Spark --conf parameters
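
For context on what the removed `else` branch computed, here is a minimal, dependency-free sketch. It assumes DataFusion's `with_memory_limit(limit, fraction)` sizes its pool at `limit * fraction` bytes; `legacy_pool_size` is a hypothetical helper written for illustration, and errors are plain strings rather than `CometError`.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not part of Comet): the pool size in bytes that the
/// removed branch would have requested, or `None` when `memory_limit` is absent.
fn legacy_pool_size(conf: &HashMap<String, String>) -> Result<Option<usize>, String> {
    // Without `memory_limit` the removed code left the runtime config untouched.
    let Some(raw_limit) = conf.get("memory_limit") else {
        return Ok(None);
    };
    let limit = raw_limit
        .parse::<usize>()
        .map_err(|e| format!("invalid memory_limit: {e}"))?;

    // `memory_fraction` was mandatory once `memory_limit` was present.
    let fraction = conf
        .get("memory_fraction")
        .ok_or_else(|| "Config 'memory_fraction' is not specified".to_string())?
        .parse::<f64>()
        .map_err(|e| format!("invalid memory_fraction: {e}"))?;

    // Assumed sizing rule: limit scaled by the fraction, truncated to whole bytes.
    Ok(Some((limit as f64 * fraction) as usize))
}

fn main() {
    let mut conf = HashMap::new();
    conf.insert("memory_limit".to_string(), "1000".to_string());
    conf.insert("memory_fraction".to_string(), "0.5".to_string());
    println!("{:?}", legacy_pool_size(&conf));
}
```

After this change none of these keys are consulted on the native side: the pool is always the Comet memory pool.
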
11 changes: 1 addition & 10 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._

-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
 import org.apache.comet.vector.NativeUtil

 /**
@@ -75,15 +75,6 @@ class CometExecIterator(
     val result = new java.util.HashMap[String, String]()
     val conf = SparkEnv.get.conf

-    val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
-    // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
-    // and `memory_fraction` below.
-    result.put(
-      "use_unified_memory_manager",
-      String.valueOf(conf.get("spark.memory.offHeap.enabled", "false")))
-    result.put("memory_limit", String.valueOf(maxMemory))
-    result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
     result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
     result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
     result.put("explain_native", String.valueOf(COMET_EXPLAIN_NATIVE_ENABLED.get()))
@@ -915,6 +915,13 @@ class CometSparkSessionExtensions
     }

     override def apply(plan: SparkPlan): SparkPlan = {
+
+      // Comet requires off-heap memory to be enabled
+      if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
+        logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
+        return plan
+      }
+
       // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
       // enabled.
       if (isANSIEnabled(conf)) {
@@ -26,6 +26,7 @@ import scala.collection.mutable

 import org.apache.commons.io.FileUtils
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.sql.TPCDSBase
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.util.resourceToString
@@ -286,6 +287,8 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
     conf.set(
       "spark.shuffle.manager",
       "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+    conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
+    conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")