
chore: Simplify CometShuffleMemoryAllocator to use Spark unified memory allocator #1063

Merged
merged 7 commits into from
Nov 14, 2024

Conversation

viirya
Member

@viirya viirya commented Nov 7, 2024

Which issue does this PR close?

Closes #1064
Closes #886

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@viirya viirya changed the title chore: Simplify CometShuffleMemoryAllocator to use Spark memory allocator chore: Simplify CometShuffleMemoryAllocator to use Spark unified memory allocator Nov 7, 2024
@andygrove
Member

test failure:

 [info] - Spark vectorized reader - with partition data column - select a single complex field from a map entry and its parent map entry *** FAILED *** (653 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 215.0 failed 1 times, most recent failure: Lost task 0.0 in stage 215.0 (TID 370) (4bf8ef4698e6 executor driver): java.lang.IllegalArgumentException: CometShuffleMemoryAllocator should be used with off-heap memory mode, but got ON_HEAP
[info] 	at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.getInstance(CometShuffleMemoryAllocator.java:44)
[info] 	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.<init>(CometDiskBlockWriter.java:139)
[info] 	at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:181)

I think we need to specify spark.memory.offHeap.enabled=true when running Spark tests? I need to do the same in https://github.com/apache/datafusion-comet/pulls

For this PR, should we also fall back to Spark shuffle if spark.memory.offHeap.enabled=false?

@viirya
Member Author

viirya commented Nov 7, 2024

Basically, Spark tests run with the on-heap config, except for tests that are specifically for off-heap mode.

I'm not sure whether enabling off-heap for all Spark tests will let them all pass. If it works, let's do it.

If not, I plan to keep the current CometShuffleMemoryAllocator and rename it to a test-only class, CometTestShuffleMemoryAllocator. When running Spark tests, Comet can use CometTestShuffleMemoryAllocator instead.

@viirya
Member Author

viirya commented Nov 8, 2024

@andygrove All Spark tests pass now.

@andygrove
Member

I tried testing with TPC-H but see a memory issue:

24/11/08 02:31:44 INFO core/src/lib.rs: Comet native library version 0.4.0 initialized
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007399e4661564, pid=11, tid=132
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.24+8 (11.0.24+8) (build 11.0.24+8)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.24+8 (11.0.24+8, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
corrupted double-linked list

@andygrove
Member

One other issue. I tested with spark.memory.offHeap.enabled=false and the shuffle did not fall back to Spark but failed at runtime.

@viirya
Member Author

viirya commented Nov 8, 2024

I tried testing with TPC-H but see a memory issue:

I will test it locally too.

@viirya
Member Author

viirya commented Nov 8, 2024

One other issue. I tested with spark.memory.offHeap.enabled=false and the shuffle did not fall back to Spark but failed at runtime.

Yes. If using the on-heap config, CometShuffleMemoryAllocator will throw a runtime error, i.e., you need to use the off-heap config in Spark.

The test-only CometTestShuffleMemoryAllocator is used only in Spark tests (as they mostly run on-heap).

@viirya
Member Author

viirya commented Nov 8, 2024

I tried testing with TPC-H but see a memory issue:

Hmm, I just ran TPC-H with this PR on Spark 3.4 using datafusion-comet script without any error.

@andygrove
Member

Yes. If using the on-heap config, CometShuffleMemoryAllocator will throw a runtime error, i.e., you need to use the off-heap config in Spark.

Right, so if the user is using on-heap, we should not use Comet shuffle and should fall back to Spark. We probably just need to update isCometShuffleEnabled to check if off-heap is being used.

@viirya
Member Author

viirya commented Nov 8, 2024

Yes. If using the on-heap config, CometShuffleMemoryAllocator will throw a runtime error, i.e., you need to use the off-heap config in Spark.

Right, so if the user is using on-heap, we should not use Comet shuffle and should fall back to Spark. We probably just need to update isCometShuffleEnabled to check if off-heap is being used.

Oh, I see. That sounds good. I will update it.
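A minimal sketch of the suggested gate, assuming a simple key/value view of the Spark conf. The class name `ShuffleGate` and the `Map`-based signature are illustrative stand-ins; the real check would live in `isCometShuffleEnabled` in `CometSparkSessionExtensions`.

```java
import java.util.Map;

// Hypothetical sketch: only enable Comet shuffle when Spark's
// off-heap memory mode is on; otherwise fall back to Spark shuffle.
public class ShuffleGate {
    static boolean isCometShuffleEnabled(Map<String, String> conf) {
        boolean cometShuffle = Boolean.parseBoolean(
            conf.getOrDefault("spark.comet.exec.shuffle.enabled", "false"));
        boolean offHeap = Boolean.parseBoolean(
            conf.getOrDefault("spark.memory.offHeap.enabled", "false"));
        // CometShuffleMemoryAllocator requires off-heap memory, so the
        // shuffle is only considered enabled when both flags are set.
        return cometShuffle && offHeap;
    }
}
```

With this shape, a user running on-heap gets Spark's shuffle silently instead of a runtime `IllegalArgumentException` from the allocator.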

@andygrove
Member

Hmm, I just ran TPC-H with this PR on Spark 3.4 using datafusion-comet script without any error.

These are the settings that I am using. I am running in k8s.

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.eventLog.enabled=false \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.driver.memory=8G \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=12g \
    --conf spark.executor.instances=4 \
    --conf spark.executor.memory=30719m \
    --conf spark.executor.cores=6 \
    --conf spark.comet.memory.overhead.factor=0.04 \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=jvm \

@viirya
Member Author

viirya commented Nov 8, 2024

Hmm, I just ran TPC-H with this PR on Spark 3.4 using datafusion-comet script without any error.

These are the settings that I am using. I am running in k8s.

$SPARK_HOME/bin/spark-submit \
    --master $SPARK_MASTER \
    --conf spark.eventLog.enabled=false \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.driver.memory=8G \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=12g \
    --conf spark.executor.instances=4 \
    --conf spark.executor.memory=30719m \
    --conf spark.executor.cores=6 \
    --conf spark.comet.memory.overhead.factor=0.04 \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=jvm \

This is what I used to run:

$SPARK_HOME/bin/spark-submit \
    --master "local[*]" \
    --jars $COMET_JAR \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.driver.memory=8G \
    --conf spark.executor.memory=10G \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=16G \
    --conf spark.comet.enabled=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.cast.allowIncompatible=true \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=jvm \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --benchmark tpch
    ...

@viirya
Member Author

viirya commented Nov 8, 2024

I don't set spark.comet.memory.overhead.factor. Do you need it?

@andygrove
Member

I don't set spark.comet.memory.overhead.factor. Do you need it?

This is the value from #886, which I think this PR is intended to close.

I ran a clean build this morning and did not see the segfault, so it is possible that I picked up an old docker image ... I will continue testing this morning.

@andygrove
Member

Actually, this PR won't close #886 because this is still using a singleton, so let's ignore that for now.

This PR LGTM and I will approve after some more testing.

@viirya
Member Author

viirya commented Nov 8, 2024

It falls back to Spark shuffle now if off-heap is not enabled.

@viirya viirya force-pushed the jvm_shuffle_allocator branch 2 times, most recently from 67fab58 to e7e7847 on November 8, 2024 18:49
Member

@andygrove andygrove left a comment

Thanks @viirya

@codecov-commenter

Codecov Report

Attention: Patch coverage is 59.09091% with 36 lines in your changes missing coverage. Please review.

Project coverage is 34.19%. Comparing base (845b654) to head (e7e7847).
Report is 13 commits behind head on main.

Files with missing lines Patch % Lines
...shuffle/comet/CometTestShuffleMemoryAllocator.java 68.96% 9 Missing and 9 partials ⚠️
...ark/shuffle/comet/CometShuffleMemoryAllocator.java 30.76% 8 Missing and 1 partial ⚠️
.../comet/execution/shuffle/CometDiskBlockWriter.java 0.00% 4 Missing ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala 20.00% 0 Missing and 4 partials ⚠️
...spark/sql/comet/execution/shuffle/SpillWriter.java 50.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1063      +/-   ##
============================================
- Coverage     34.46%   34.19%   -0.28%     
+ Complexity      888      884       -4     
============================================
  Files           113      115       +2     
  Lines         43580    42765     -815     
  Branches       9658     9346     -312     
============================================
- Hits          15021    14622     -399     
+ Misses        25507    25279     -228     
+ Partials       3052     2864     -188     


@viirya
Member Author

viirya commented Nov 8, 2024

Actually, this PR won't close #886 because this is still using a singleton, so let's ignore that for now.

Since the allocator now uses all available memory on the executor (we don't specify a memory size on the allocator), it should no longer be an issue for #886. @andygrove Do you want to re-check whether #886 can be fixed by this PR too? Thanks.

And, similar to TaskMemoryManager, I think it makes more sense to have a singleton memory allocator for the shuffle writers in the same executor.

@andygrove
Member

Can we make COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE and COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR internal configs, since they are now only used in tests?

@viirya
Member Author

viirya commented Nov 9, 2024

Can we make COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE and COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR internal configs, since they are now only used in tests?

Yes, they should be internal configs now. Let me update them.

@andygrove
Member

Since the allocator now uses all available memory on the executor (we don't specify a memory size on the allocator), it should no longer be an issue for #886. @andygrove Do you want to re-check whether #886 can be fixed by this PR too? Thanks.

I will test this again today.

@andygrove
Member

andygrove commented Nov 11, 2024

I'm running into SIGSEGV issues again.

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000072e2c93b6bc8, pid=11, tid=127
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.24+8 (11.0.24+8) (build 11.0.24+8)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.24+8 (11.0.24+8, mixed mode, sharing, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
# C  [libcomet-14210005976568904946.so+0x736bc8]  comet::execution::shuffle::row::append_columns::h9b53b563e484a30e+0x1318
#

I will try running the same benchmark on main.

edit: I cannot reproduce on main because it fails there with

Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 67108864 bytes of memory, got 39485440 bytes. Available: 39485440

@andygrove
Member

I increased the off-heap pool size, and now I can run TPC-H q5 @ sf=1TB on the main branch, but get SIGSEGV with this PR.

@viirya
Member Author

viirya commented Nov 11, 2024

Let me see if I can reproduce it.

@viirya
Member Author

viirya commented Nov 13, 2024

Ah, I figured out what was wrong there. I updated the PR with the fix.

I ran the benchmarks locally and didn't see the error.

Please also run the benchmarks to verify that it fixes the error. Thanks, @andygrove.

Comment on lines +70 to +72
// CometShuffleMemoryAllocator stores pages in TaskMemoryManager which is not singleton,
// but one instance per task. So we need to create a new instance for each task.
return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
Member

I think this should also address my concerns about #886. I am testing now.

* created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` which is a test-only
* allocator that should not be used in production.
*/
public static synchronized CometShuffleMemoryAllocatorTrait getInstance(
Member

Just a nit, but we could stop making the method synchronized and add a synchronized block around the INSTANCE creation when in test mode?

Member Author

Okay, sounds good. I will update this.
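A hedged sketch of the reviewer's nit, assuming a simplified factory: drop `synchronized` from `getInstance` itself and use double-checked locking (with a `volatile` field) only around creation of the shared test-mode singleton, since the production path creates a per-task allocator and needs no lock. `AllocatorFactory`, `MemoryAllocatorTrait`, and the nested classes are illustrative stand-ins for the real Comet classes.

```java
// Illustrative sketch; names are simplified stand-ins for
// CometShuffleMemoryAllocatorTrait and friends.
public class AllocatorFactory {
    interface MemoryAllocatorTrait {}
    static class ShuffleAllocator implements MemoryAllocatorTrait {}
    static class TestShuffleAllocator implements MemoryAllocatorTrait {}

    // volatile makes the double-checked locking below safe under the JMM
    private static volatile MemoryAllocatorTrait testInstance;

    public static MemoryAllocatorTrait getInstance(boolean isSparkTestMode) {
        if (isSparkTestMode) {
            // Only the shared test singleton needs synchronization.
            if (testInstance == null) {
                synchronized (AllocatorFactory.class) {
                    if (testInstance == null) {
                        testInstance = new TestShuffleAllocator();
                    }
                }
            }
            return testInstance;
        }
        // Production path: one allocator per task (per TaskMemoryManager),
        // so no locking is needed here.
        return new ShuffleAllocator();
    }
}
```

This keeps the common production call lock-free while preserving the single shared instance that Spark tests rely on.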

Member

@andygrove andygrove left a comment

I no longer see memory errors with the recent changes.

@viirya
Member Author

viirya commented Nov 14, 2024

Cool. Thanks @andygrove for verifying it.

@andygrove andygrove merged commit c32bf0c into apache:main Nov 14, 2024
74 checks passed
@viirya viirya deleted the jvm_shuffle_allocator branch November 14, 2024 06:20