Investigate native query planning overhead #1098

Open
andygrove opened this issue Nov 19, 2024 · 5 comments
Assignees
Labels
enhancement New feature or request

Comments

@andygrove
Member

What is the problem the feature request solves?

For each query stage, the serialized query plan is sent to the executor with each task. Each task deserializes the protobuf and then creates a PhysicalPlanner and builds a native query plan. The query plan for each partition in a stage is essentially identical, except for the scan input JNI references, so we are duplicating this query planning work across each partition.

In some cases, planning is very expensive; TPC-H q3 stage 18 seems to take around 90 seconds. Here is partial debug output. Note that each partition seems to create the query plan twice, which needs further investigation.

executePlan() stage 18 partition 6 of 29: planning took 1.482816587s
executePlan() stage 18 partition 6 of 29: planning took 1.748504654s
executePlan() stage 18 partition 7 of 29: planning took 1.552462415s
executePlan() stage 18 partition 7 of 29: planning took 1.822570717s
executePlan() stage 18 partition 8 of 29: planning took 1.498230863s
executePlan() stage 18 partition 8 of 29: planning took 1.771406765s
executePlan() stage 18 partition 9 of 29: planning took 1.457221672s
executePlan() stage 18 partition 9 of 29: planning took 1.771535457s
...

Here is another example where planning is relatively cheap but is repeated many times, resulting in about 1.76 seconds of total planning time.

executePlan() stage 10 partition 171 of 176: planning took 10.97809ms
executePlan() stage 10 partition 171 of 176: planning took 11.395246ms
executePlan() stage 10 partition 172 of 176: planning took 10.283634ms
executePlan() stage 10 partition 172 of 176: planning took 10.669009ms
executePlan() stage 10 partition 173 of 176: planning took 9.233809ms
executePlan() stage 10 partition 173 of 176: planning took 9.651204ms
executePlan() stage 10 partition 174 of 176: planning took 9.536889ms
executePlan() stage 10 partition 174 of 176: planning took 9.927454ms
...

Questions:

  1. Are there any general optimizations we can make?
  2. Can we cache query plans in each executor and copy them to each task rather than duplicate the planning work? (A rough sketch of this idea follows below.)
  3. Why do we create the plan twice per partition, or is there an error in how I am logging this?
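
Regarding question 2, one possible shape for such a cache is sketched below. This is purely illustrative: PlanKey, NativePlan, and PlanCache are hypothetical names, not existing Comet types. The idea is to key on the stage plus a hash of the serialized plan bytes and only run the planner on a miss; the per-partition scan input JNI references would still need to be attached to the cached plan for each task.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical cache key: plans within a stage are identical apart from the
// scan input JNI references, so the stage id plus a hash of the serialized
// plan bytes should be enough to identify a reusable plan.
#[derive(Clone, PartialEq, Eq, Hash)]
struct PlanKey {
    stage_id: i32,
    plan_hash: u64,
}

// Stand-in for whatever the planner produces (in Comet this would be the
// Arc<dyn ExecutionPlan> built by PhysicalPlanner).
struct NativePlan;

#[derive(Default)]
struct PlanCache {
    plans: Mutex<HashMap<PlanKey, Arc<NativePlan>>>,
}

impl PlanCache {
    // Return the cached plan for this key, building it once on a miss.
    fn get_or_build<F>(&self, key: PlanKey, build: F) -> Arc<NativePlan>
    where
        F: FnOnce() -> NativePlan,
    {
        let mut plans = self.plans.lock().unwrap();
        plans.entry(key).or_insert_with(|| Arc::new(build())).clone()
    }
}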

I used the following code to pass the partition numbers to the native code:

        nativeLib.executePlan(
          plan,
          arrayAddrs,
          schemaAddrs,
          taskContext.stageId(),
          taskContext.partitionId(),
          taskContext.numPartitions())

Describe the potential solution

No response

Additional context

No response

@andygrove andygrove added the enhancement New feature or request label Nov 19, 2024
@andygrove
Member Author

The reason that each plan appears to be planned twice is that we split the ShuffleWriterExec from the rest of the plan (as noted in #977).

executePlan() stage 18 partition 28 of 29: planning took 666.045344ms
Comet native query plan:
 SortExec: TopK(fetch=10), expr=[col_1@1 DESC NULLS LAST, col_2@2 ASC], preserve_partitioning=[false]
  CopyExec [UnpackOrDeepCopy]
    ScanExec: source=[], schema=[col_0: Int64, col_1: Decimal128(34, 4), col_2: Date32, col_3: Int32]

executePlan() stage 18 partition 28 of 29: planning took 689.940558ms
Comet native query plan:
 ShuffleWriterExec: partitioning=UnknownPartitioning(1)
  ScanExec: source=[], schema=[col_0: Int64, col_1: Decimal128(34, 4), col_2: Date32, col_3: Int32]

@andygrove
Member Author

For comparison, physical planning time in Ballista for the same query (TPC-H q3) never takes more than 1ms, and overall execution time is ~6 seconds compared to ~20 seconds in Comet.

@andygrove
Member Author

This is getting pretty interesting. I improved the native explain feature in #1099 and we now see the planning time. The following trivial plan takes more than a second to plan.

24/11/19 11:25:26 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.304405241s):
 ShuffleWriterExec: partitioning=UnknownPartitioning(1)
  ScanExec: source=[], schema=[col_0: Int64, col_1: Decimal128(34, 4), col_2: Date32, col_3: Int32]

I also added a criterion benchmark in the same PR that plans this query, and there planning takes only around 23 microseconds.
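
The benchmark is roughly of the following shape (illustrative only; deserialize_and_plan stands in for the protobuf decode plus PhysicalPlanner call and is not the exact code from #1099):

use criterion::{criterion_group, criterion_main, Criterion};

// Placeholder for decoding the serialized plan and building the native plan.
fn deserialize_and_plan(_plan_bytes: &[u8]) {
    // decode the protobuf, then run the planner
}

fn bench_planning(c: &mut Criterion) {
    // In the real benchmark this would be the serialized protobuf for the
    // trivial ShuffleWriterExec/ScanExec plan shown above.
    let plan_bytes: Vec<u8> = vec![];
    c.bench_function("plan_trivial_shuffle_stage", |b| {
        b.iter(|| deserialize_and_plan(&plan_bytes))
    });
}

criterion_group!(benches, bench_planning);
criterion_main!(benches);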

Another observation is that the planning time increases with each successive invocation:

24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.377855944s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.403194796s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.424167789s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.487451934s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.492228413s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.517392898s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.534502296s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.556645236s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.622664488s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.622664287s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.730488921s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.730601884s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.730662237s):
24/11/19 11:25:24 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 1.73067477s):

Something strange is happening here. Does this indicate some sort of contention or locking that is resulting in these long times?
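
One way to narrow this down (a sketch with illustrative phase boundaries, not existing code) would be to time the protobuf decode and the plan construction separately inside executePlan, so a slow decode, a slow planner, and time spent waiting on a shared lock can be told apart:

use std::time::Instant;

// Illustrative instrumentation: measure each phase on its own so that a
// growing planning time can be attributed to decoding, plan construction,
// or waiting on some shared resource.
fn timed_execute_plan(_plan_bytes: &[u8]) {
    let start = Instant::now();
    // let decoded = decode_protobuf(_plan_bytes);  // phase 1: protobuf decode
    let decode_time = start.elapsed();

    let start = Instant::now();
    // let plan = create_native_plan(&decoded);     // phase 2: planner
    let plan_time = start.elapsed();

    println!("decode took {decode_time:?}, planning took {plan_time:?}");
}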

@andygrove
Member Author

There are other instances where the planning is much faster:

24/11/19 11:44:20 INFO core/src/execution/jni_api.rs: Comet native query plan (planning took 15.748199ms):
 ShuffleWriterExec: partitioning=Hash([Column { name: "col_1", index: 1 }], 200)
  ScanExec: source=[], schema=[col_0: Int64, col_1: Int64, col_2: Date32, col_3: Int32]

@andygrove
Member Author

In some cases, plan creation can take longer than actually executing the plan:

24/11/19 12:47:47 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (plan creation time: 1361ms):
ShuffleWriterExec: partitioning=UnknownPartitioning(1), metrics=[output_rows=10, elapsed_compute=28.875µs, spill_count=0, spilled_bytes=0, data_size=704]
  ScanExec: source=[], schema=[col_0: Int64, col_1: Decimal128(34, 4), col_2: Date32, col_3: Int32], metrics=[output_rows=10, elapsed_compute=532ns, cast_time=1ns]
24/11/19 12:47:31 INFO core/src/execution/jni_api.rs: Comet native query plan with metrics (plan creation time: 18ms):
FilterExec: col_2@2 IS NOT NULL AND col_2@2 < 1995-03-15 AND col_1@1 IS NOT NULL AND col_0@0 IS NOT NULL, metrics=[output_rows=1529587, elapsed_compute=9.038835ms]
  ScanExec: source=[CometScan parquet  (unknown)], schema=[col_0: Int64, col_1: Int64, col_2: Date32, col_3: Int32], metrics=[output_rows=3145728, elapsed_compute=4.589971ms, cast_time=3.56982ms]
