feat: Record Arrow FFI metrics #1128

Open · wants to merge 2 commits into base: main
Changes from all commits
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator
@@ -148,20 +149,28 @@ class NativeUtil {
*/
def getNextBatch(
numOutputCols: Int,
arrowFfiMetric: Option[SQLMetric],
func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = {

val start = System.nanoTime()
val (arrays, schemas) = allocateArrowStructs(numOutputCols)

val arrayAddrs = arrays.map(_.memoryAddress())
val schemaAddrs = schemas.map(_.memoryAddress())

var arrowFfiTime = System.nanoTime() - start

val result = func(arrayAddrs, schemaAddrs)

result match {
case -1 =>
// EOF
None
case numRows =>
val start = System.nanoTime()
val cometVectors = importVector(arrays, schemas)
arrowFfiTime += System.nanoTime() - start
arrowFfiMetric.foreach(_.add(arrowFfiTime))
Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
case flag =>
throw new IllegalStateException(s"Invalid native flag: $flag")
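The JVM-side timing above covers the two halves of the FFI exchange that run in the JVM (allocating the C Data Interface structs and importing the exported vectors) but not the native execution inside `func`. A minimal caller sketch, assuming values such as `nativeUtil`, `nativeLib`, `plan`, `stageId`, `partitionIndex`, `output`, and `sparkContext` are in scope as they are in `CometExecIterator`:

```scala
// Sketch only: the metric accumulates struct allocation + vector import time;
// the native plan execution inside the callback is not counted.
val ffiMetric: SQLMetric =
  SQLMetrics.createNanoTimingMetric(sparkContext, "Arrow FFI time")

val batch: Option[ColumnarBatch] = nativeUtil.getNextBatch(
  output.length,
  Some(ffiMetric),
  (arrayAddrs, schemaAddrs) =>
    nativeLib.executePlan(stageId, partitionIndex, plan, arrayAddrs, schemaAddrs))
```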
14 changes: 10 additions & 4 deletions docs/source/user-guide/tuning.md
@@ -113,8 +113,14 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases:

Comet also adds some custom metrics:

### ShuffleWriterExec
### CometScanExec

| Metric | Description |
| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |
| Metric | Description |
| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- |
| `scanTime` | Total time to read Parquet batches into the JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. |

### Common to all Comet Operators

| Metric | Description |
| ---------------- | ------------------------------------------------------------------------------------------------ |
| `arrow_ffi_time` | Measures the time it takes to transfer Arrow batches between the JVM and native code using Arrow FFI. |
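As an illustration of how the new metric can be consumed (not part of this change), `arrow_ffi_time` is a regular `SQLMetric`, so it appears in the Spark UI and can also be read back from the executed plan. A sketch, assuming a Comet-enabled `SparkSession` named `spark` and an arbitrary Parquet input path:

```scala
// Sum the arrow_ffi_time reported by every operator in the executed plan.
// Nano timing metrics store their values in nanoseconds.
val df = spark.read.parquet("/tmp/input").filter("id > 0")
df.collect()

val ffiNanos = df.queryExecution.executedPlan
  .collect { case p if p.metrics.contains("arrow_ffi_time") => p.metrics("arrow_ffi_time").value }
  .sum
println(s"Arrow FFI time: ${ffiNanos / 1e6} ms")
```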
26 changes: 6 additions & 20 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -139,7 +139,6 @@
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, Arc::clone(&context))?;
let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
@@ -152,7 +151,6 @@
self.partitioning.clone(),
metrics,
context,
jvm_fetch_time,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -1085,7 +1083,6 @@ impl Debug for ShuffleRepartitioner {
}
}

#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
mut input: SendableRecordBatchStream,
partition_id: usize,
@@ -1094,7 +1091,6 @@
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
jvm_fetch_time: Time,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -1108,23 +1104,13 @@
context.session_config().batch_size(),
);

loop {
let mut timer = jvm_fetch_time.timer();
let b = input.next().await;
timer.stop();

match b {
Some(batch_result) => {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pull the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch_result?))?;
}
_ => break,
}
while let Some(batch) = input.next().await {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pulling the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch?))?;
}

repartitioner.shuffle_write().await
}

17 changes: 15 additions & 2 deletions native/core/src/execution/operators/scan.rs
@@ -77,6 +77,8 @@ pub struct ScanExec {
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
/// Time spent transferring Arrow batches between the JVM and native code over FFI
arrow_ffi_time: Time,
}

impl ScanExec {
@@ -88,6 +90,7 @@
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);

// Scan's schema is determined by the input batch, so we need to set it before execution.
// Note that we determine if arrays are dictionary-encoded based on the
@@ -97,8 +100,12 @@
// Dictionary-encoded primitive arrays are always unpacked.
let first_batch = if let Some(input_source) = input_source.as_ref() {
let mut timer = baseline_metrics.elapsed_compute().timer();
let batch =
ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
let batch = ScanExec::get_next(
exec_context_id,
input_source.as_obj(),
data_types.len(),
&arrow_ffi_time,
)?;
timer.stop();
batch
} else {
@@ -124,6 +131,7 @@
cache,
metrics: metrics_set,
baseline_metrics,
arrow_ffi_time,
schema,
})
}
@@ -171,6 +179,7 @@
self.exec_context_id,
self.input_source.as_ref().unwrap().as_obj(),
self.data_types.len(),
&self.arrow_ffi_time,
)?;
*current_batch = Some(next_batch);
}
@@ -185,6 +194,7 @@
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
arrow_ffi_time: &Time,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
// This is a unit test. We don't need to call JNI.
@@ -197,6 +207,7 @@
exec_context_id
))));
}
let mut timer = arrow_ffi_time.timer();

let mut env = JVMClasses::get_env()?;

@@ -255,6 +266,8 @@ impl ScanExec {
}
}

timer.stop();

Ok(InputBatch::new(inputs, Some(num_rows as usize)))
}
}
3 changes: 3 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -21,6 +21,7 @@ package org.apache.comet

import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
@@ -49,6 +50,7 @@
inputs: Seq[Iterator[ColumnarBatch]],
numOutputCols: Int,
protobufQueryPlan: Array[Byte],
arrowFfiMetric: Option[SQLMetric],
nativeMetrics: CometMetricNode,
numParts: Int,
partitionIndex: Int)
@@ -102,6 +104,7 @@

nativeUtil.getNextBatch(
numOutputCols,
arrowFfiMetric,
(arrayAddrs, schemaAddrs) => {
val ctx = TaskContext.get()
nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs)
@@ -54,6 +54,9 @@ case class CometCollectLimitExec(
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(
sparkContext,
CometMetricNode.ARROW_FFI_TIME_DESCRIPTION),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
@@ -69,6 +69,9 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM

object CometMetricNode {

val ARROW_FFI_TIME_KEY = "arrow_ffi_time"
val ARROW_FFI_TIME_DESCRIPTION = "Arrow FFI time"

/**
* The baseline SQL metrics for DataFusion `BaselineMetrics`.
*/
@@ -77,7 +80,8 @@
"output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"),
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(
sc,
"total time (in ms) spent in this operator"))
"total time (in ms) spent in this operator"),
ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sc, ARROW_FFI_TIME_DESCRIPTION))
}

/**
@@ -93,41 +97,41 @@
* SQL Metrics for DataFusion HashJoin
*/
def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Map(
"build_time" ->
SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
"build_input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
"build_input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
"build_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
baselineMetrics(sc) ++
Map(
"build_time" ->
SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
"build_input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
"build_input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
"build_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
}

/**
* SQL Metrics for DataFusion SortMergeJoin
*/
def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Map(
"peak_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"),
"spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"),
"spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"),
"spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows"))
baselineMetrics(sc) ++
Map(
"peak_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"),
"spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"),
"spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"),
"spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows"))
}

/**
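Because the join metric maps are now layered on top of `baselineMetrics`, operators no longer need to re-declare the shared counters. A small illustrative sketch (assuming a `SparkContext` named `sc` is in scope):

```scala
// baselineMetrics contributes the shared keys; operator-specific entries are
// added on top of it.
val joinMetrics: Map[String, SQLMetric] =
  CometMetricNode.baselineMetrics(sc) ++
    Map("join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))

// joinMetrics now contains "output_rows", "elapsed_compute" and
// "arrow_ffi_time" alongside "join_time".
```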
@@ -54,6 +54,9 @@ case class CometTakeOrderedAndProjectExec(
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(
sparkContext,
CometMetricNode.ARROW_FFI_TIME_DESCRIPTION),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
@@ -77,9 +77,9 @@ case class CometShuffleExchangeExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(
CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time fetching batches from JVM"),
CometMetricNode.ARROW_FFI_TIME_DESCRIPTION),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
@@ -482,26 +482,20 @@

// Maps native metrics to SQL metrics
val nativeSQLMetrics = Map(
CometMetricNode.ARROW_FFI_TIME_KEY -> metrics(CometMetricNode.ARROW_FFI_TIME_KEY),
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))

val nativeMetrics = if (metrics.contains("jvm_fetch_time")) {
CometMetricNode(
nativeSQLMetrics ++ Map("jvm_fetch_time" ->
metrics("jvm_fetch_time")))
} else {
CometMetricNode(nativeSQLMetrics)
}

// Getting rid of the fake partitionId
val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)

val cometIter = CometExec.getCometIterator(
Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]),
outputAttributes.length,
nativePlan,
nativeMetrics,
metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY),
CometMetricNode(nativeSQLMetrics),
numParts,
context.partitionId())

@@ -127,6 +127,7 @@ object CometExec {
inputs,
numOutputCols,
nativePlan,
None,
CometMetricNode(Map.empty),
numParts,
partitionIdx)
@@ -136,6 +137,7 @@
inputs: Seq[Iterator[ColumnarBatch]],
numOutputCols: Int,
nativePlan: Operator,
arrowFfiMetric: Option[SQLMetric],
nativeMetrics: CometMetricNode,
numParts: Int,
partitionIdx: Int): CometExecIterator = {
@@ -148,6 +150,7 @@
inputs,
numOutputCols,
bytes,
arrowFfiMetric,
nativeMetrics,
numParts,
partitionIdx)
@@ -240,6 +243,7 @@ abstract class CometNativeExec extends CometExec {
inputs,
output.length,
serializedPlanCopy,
metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY),
nativeMetrics,
numParts,
partitionIndex)
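Taken together: an operator declares the metric under `CometMetricNode.ARROW_FFI_TIME_KEY`, `CometNativeExec` looks it up and threads it through as an `Option[SQLMetric]` so that callers without it (such as the overload above that passes `None`) are unaffected, `NativeUtil.getNextBatch` adds the JVM-side FFI time, and the native `ScanExec` reports its own `arrow_ffi_time` back through `CometMetricNode`. A condensed sketch of the JVM call chain, with surrounding names assumed:

```scala
// Operator -> iterator -> NativeUtil: the same Option[SQLMetric] is threaded
// through each layer and incremented only where batches actually cross FFI.
val arrowFfiMetric: Option[SQLMetric] =
  metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY)

val cometIter = CometExec.getCometIterator(
  inputs,
  output.length,
  nativePlan,
  arrowFfiMetric,
  nativeMetrics,
  numParts,
  partitionIndex)
```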