Record Arrow FFI metrics
andygrove committed Nov 29, 2024
1 parent 5400fd7 commit 1715b5f
Showing 9 changed files with 86 additions and 47 deletions.
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator
@@ -148,20 +149,28 @@ class NativeUtil {
*/
def getNextBatch(
numOutputCols: Int,
arrowFfiMetric: Option[SQLMetric],
func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = {

val start = System.nanoTime()
val (arrays, schemas) = allocateArrowStructs(numOutputCols)

val arrayAddrs = arrays.map(_.memoryAddress())
val schemaAddrs = schemas.map(_.memoryAddress())

var arrowFfiTime = System.nanoTime() - start

val result = func(arrayAddrs, schemaAddrs)

result match {
case -1 =>
// EOF
None
case numRows =>
val start = System.nanoTime()
val cometVectors = importVector(arrays, schemas)
arrowFfiTime += System.nanoTime() - start
arrowFfiMetric.foreach(_.add(arrowFfiTime))
Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
case flag =>
throw new IllegalStateException(s"Invalid native flag: $flag")
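The change above threads an optional `SQLMetric` through `getNextBatch` and accumulates elapsed wall-clock time around the two FFI-related steps (allocating the Arrow C Data Interface structs and importing the returned vectors), deliberately excluding the native `func` call itself. A minimal sketch of that accumulation style, using a hypothetical `timed` helper that is not part of the Comet API:

```scala
import org.apache.spark.sql.execution.metric.SQLMetric

// Hypothetical helper (illustration only): run `block`, add the elapsed
// nanoseconds to the optional metric, and return the block's result.
// When the caller passes None, the foreach is a no-op, which is why
// getNextBatch can accept Option[SQLMetric] without any conditionals.
def timed[T](metric: Option[SQLMetric])(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  metric.foreach(_.add(System.nanoTime() - start))
  result
}
```

Because the real method times two separate segments rather than one contiguous block, it accumulates into a local `arrowFfiTime` variable and adds the total once, instead of wrapping everything in a single helper like this.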
14 changes: 10 additions & 4 deletions docs/source/user-guide/tuning.md
@@ -113,8 +113,14 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases:

Comet also adds some custom metrics:

### ShuffleWriterExec
### CometScanExec

| Metric | Description |
| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |
| Metric | Description |
| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------- |
| `scanTime` | Total time to read Parquet batches into the JVM. This does not include the Arrow FFI cost of exporting these batches to native code for processing. |

### Common to all Comet Operators

| Metric | Description |
| ---------------- | ------------------------------------------------------------------------------------------------ |
| `arrow_ffi_time` | Measure the time it takes to transfer Arrow batches between JVM and native code using Arrow FFI. |
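Both metrics appear in the Spark UI alongside the standard SQL metrics. As a rough illustration (not taken from the Comet documentation), they can also be read programmatically from the executed plan after an action completes; the Parquet path below is a placeholder and `spark` is assumed to be an active `SparkSession` with Comet enabled:

```scala
// Hedged sketch: print the arrow_ffi_time metric of every operator that has one.
val df = spark.read.parquet("/tmp/example.parquet").filter("id > 10")
df.collect() // run the query so the metrics are populated

df.queryExecution.executedPlan.foreach { node =>
  node.metrics.get("arrow_ffi_time").foreach { metric =>
    println(s"${node.nodeName}: arrow_ffi_time = ${metric.value} ns")
  }
}
```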
17 changes: 15 additions & 2 deletions native/core/src/execution/operators/scan.rs
@@ -77,6 +77,8 @@ pub struct ScanExec {
metrics: ExecutionPlanMetricsSet,
/// Baseline metrics
baseline_metrics: BaselineMetrics,
/// Timer
arrow_ffi_time: Time,
}

impl ScanExec {
@@ -88,6 +90,7 @@
) -> Result<Self, CometError> {
let metrics_set = ExecutionPlanMetricsSet::default();
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);

// Scan's schema is determined by the input batch, so we need to set it before execution.
// Note that we determine if arrays are dictionary-encoded based on the
@@ -97,8 +100,12 @@
// Dictionary-encoded primitive arrays are always unpacked.
let first_batch = if let Some(input_source) = input_source.as_ref() {
let mut timer = baseline_metrics.elapsed_compute().timer();
let batch =
ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
let batch = ScanExec::get_next(
exec_context_id,
input_source.as_obj(),
data_types.len(),
&arrow_ffi_time,
)?;
timer.stop();
batch
} else {
@@ -124,6 +131,7 @@
cache,
metrics: metrics_set,
baseline_metrics,
arrow_ffi_time,
schema,
})
}
@@ -171,6 +179,7 @@ impl ScanExec {
self.exec_context_id,
self.input_source.as_ref().unwrap().as_obj(),
self.data_types.len(),
&self.arrow_ffi_time,
)?;
*current_batch = Some(next_batch);
}
@@ -185,6 +194,7 @@
exec_context_id: i64,
iter: &JObject,
num_cols: usize,
arrow_ffi_time: &Time,
) -> Result<InputBatch, CometError> {
if exec_context_id == TEST_EXEC_CONTEXT_ID {
// This is a unit test. We don't need to call JNI.
@@ -197,6 +207,7 @@
exec_context_id
))));
}
let mut timer = arrow_ffi_time.timer();

let mut env = JVMClasses::get_env()?;

@@ -255,6 +266,8 @@ impl ScanExec {
}
}

timer.stop();

Ok(InputBatch::new(inputs, Some(num_rows as usize)))
}
}
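On the native side the commit relies on DataFusion's metrics API: `MetricBuilder::subset_time` registers a named `Time`, and `Time::timer()` returns a scoped guard that adds the elapsed nanoseconds to the metric when stopped (or dropped). A minimal standalone sketch of that pattern, assuming a `datafusion` dependency and reusing the metric name from the change above:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    // Register a named subset time for partition 0, as the ScanExec change does.
    let metrics = ExecutionPlanMetricsSet::new();
    let arrow_ffi_time = MetricBuilder::new(&metrics).subset_time("arrow_ffi_time", 0);

    // The guard accumulates elapsed time into the metric when stopped
    // (or when it goes out of scope).
    let mut timer = arrow_ffi_time.timer();
    std::thread::sleep(std::time::Duration::from_millis(1)); // stand-in for the FFI work
    timer.stop();

    println!("arrow_ffi_time = {} ns", arrow_ffi_time.value());
}
```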
3 changes: 3 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -21,6 +21,7 @@ package org.apache.comet

import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
@@ -49,6 +50,7 @@ class CometExecIterator(
inputs: Seq[Iterator[ColumnarBatch]],
numOutputCols: Int,
protobufQueryPlan: Array[Byte],
arrowFfiMetric: Option[SQLMetric],
nativeMetrics: CometMetricNode,
numParts: Int,
partitionIndex: Int)
@@ -102,6 +104,7 @@

nativeUtil.getNextBatch(
numOutputCols,
arrowFfiMetric,
(arrayAddrs, schemaAddrs) => {
val ctx = TaskContext.get()
nativeLib.executePlan(ctx.stageId(), partitionIndex, plan, arrayAddrs, schemaAddrs)
@@ -54,6 +54,9 @@ case class CometCollectLimitExec(
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(
sparkContext,
CometMetricNode.ARROW_FFI_TIME_DESCRIPTION),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
@@ -69,6 +69,9 @@ case class CometMetricNode(metrics: Map[String, SQLMetric], children: Seq[CometM

object CometMetricNode {

val ARROW_FFI_TIME_KEY = "arrow_ffi_time"
val ARROW_FFI_TIME_DESCRIPTION = "Arrow FFI time"

/**
* The baseline SQL metrics for DataFusion `BaselineMetrics`.
*/
@@ -77,7 +80,8 @@ object CometMetricNode {
"output_rows" -> SQLMetrics.createMetric(sc, "number of output rows"),
"elapsed_compute" -> SQLMetrics.createNanoTimingMetric(
sc,
"total time (in ms) spent in this operator"))
"total time (in ms) spent in this operator"),
ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(sc, ARROW_FFI_TIME_DESCRIPTION))
}

/**
@@ -93,41 +97,41 @@ object CometMetricNode {
* SQL Metrics for DataFusion HashJoin
*/
def hashJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Map(
"build_time" ->
SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
"build_input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
"build_input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
"build_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
baselineMetrics(sc) ++
Map(
"build_time" ->
SQLMetrics.createNanoTimingMetric(sc, "Total time for collecting build-side of join"),
"build_input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by build-side"),
"build_input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by build-side"),
"build_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"))
}

/**
* SQL Metrics for DataFusion SortMergeJoin
*/
def sortMergeJoinMetrics(sc: SparkContext): Map[String, SQLMetric] = {
Map(
"peak_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"output_rows" -> SQLMetrics.createMetric(sc, "Number of rows produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"),
"spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"),
"spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"),
"spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows"))
baselineMetrics(sc) ++
Map(
"peak_mem_used" ->
SQLMetrics.createSizeMetric(sc, "Memory used by build-side"),
"input_batches" ->
SQLMetrics.createMetric(sc, "Number of batches consumed by probe-side"),
"input_rows" ->
SQLMetrics.createMetric(sc, "Number of rows consumed by probe-side"),
"output_batches" -> SQLMetrics.createMetric(sc, "Number of batches produced"),
"join_time" -> SQLMetrics.createNanoTimingMetric(sc, "Total time for joining"),
"spill_count" -> SQLMetrics.createMetric(sc, "Count of spills"),
"spilled_bytes" -> SQLMetrics.createSizeMetric(sc, "Total spilled bytes"),
"spilled_rows" -> SQLMetrics.createMetric(sc, "Total spilled rows"))
}

/**
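The join-metric helpers now start from `baselineMetrics(sc)` and layer the operator-specific entries on top, which is why duplicate keys such as `output_rows` were dropped from the explicit maps: the baseline already provides them (along with `arrow_ffi_time`). A standalone reminder of the Scala `Map ++` semantics this relies on, namely that the right-hand operand wins on duplicate keys:

```scala
// Plain Scala, no Spark required: the right-hand map overrides shared keys.
val baseline = Map("output_rows" -> "from baseline", "elapsed_compute" -> "from baseline")
val joinSpecific = Map("join_time" -> "join specific", "output_rows" -> "join specific")

println(baseline ++ joinSpecific)
// Map(output_rows -> join specific, elapsed_compute -> from baseline, join_time -> join specific)
```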
@@ -54,6 +54,9 @@ case class CometTakeOrderedAndProjectExec(
private lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(
sparkContext,
CometMetricNode.ARROW_FFI_TIME_DESCRIPTION),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
@@ -77,9 +77,9 @@ case class CometShuffleExchangeExec(
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric(
CometMetricNode.ARROW_FFI_TIME_KEY -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time fetching batches from JVM"),
CometMetricNode.ARROW_FFI_TIME_DESCRIPTION),
"numPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of partitions")) ++ readMetrics ++ writeMetrics
@@ -482,26 +482,20 @@ class CometShuffleWriteProcessor(

// Maps native metrics to SQL metrics
val nativeSQLMetrics = Map(
CometMetricNode.ARROW_FFI_TIME_KEY -> metrics(CometMetricNode.ARROW_FFI_TIME_KEY),
"output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
"data_size" -> metrics("dataSize"),
"elapsed_compute" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME))

val nativeMetrics = if (metrics.contains("jvm_fetch_time")) {
CometMetricNode(
nativeSQLMetrics ++ Map("jvm_fetch_time" ->
metrics("jvm_fetch_time")))
} else {
CometMetricNode(nativeSQLMetrics)
}

// Getting rid of the fake partitionId
val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2)

val cometIter = CometExec.getCometIterator(
Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]),
outputAttributes.length,
nativePlan,
nativeMetrics,
metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY),
CometMetricNode(nativeSQLMetrics),
numParts,
context.partitionId())

@@ -127,6 +127,7 @@ object CometExec {
inputs,
numOutputCols,
nativePlan,
None,
CometMetricNode(Map.empty),
numParts,
partitionIdx)
@@ -136,6 +137,7 @@
inputs: Seq[Iterator[ColumnarBatch]],
numOutputCols: Int,
nativePlan: Operator,
arrowFfiMetric: Option[SQLMetric],
nativeMetrics: CometMetricNode,
numParts: Int,
partitionIdx: Int): CometExecIterator = {
@@ -148,6 +150,7 @@
inputs,
numOutputCols,
bytes,
arrowFfiMetric,
nativeMetrics,
numParts,
partitionIdx)
@@ -240,6 +243,7 @@ abstract class CometNativeExec extends CometExec {
inputs,
output.length,
serializedPlanCopy,
metrics.get(CometMetricNode.ARROW_FFI_TIME_KEY),
nativeMetrics,
numParts,
partitionIndex)