minor: use defaults instead of hard-coding values #1060

Merged · 2 commits · Nov 7, 2024
77 changes: 37 additions & 40 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -73,7 +73,7 @@ object CometConf extends ShimCometConf {
       "Whether to enable native scans. When this is turned on, Spark will use Comet to " +
         "read supported data sources (currently only Parquet is supported natively). Note " +
         "that to enable native vectorized execution, both this config and " +
-        "'spark.comet.exec.enabled' need to be enabled. By default, this config is true.")
+        "'spark.comet.exec.enabled' need to be enabled.")
     .booleanConf
     .createWithDefault(true)
 
@@ -82,7 +82,7 @@ object CometConf extends ShimCometConf {
     .doc(
       "Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads " +
         "ranges of consecutive data in a file in parallel. It is faster for large files and " +
-        "row groups but uses more resources. The parallel reader is enabled by default.")
+        "row groups but uses more resources.")
     .booleanConf
     .createWithDefault(true)
 
@@ -98,7 +98,7 @@ object CometConf extends ShimCometConf {
     .doc(
       "When enabled the parallel reader will try to merge ranges of data that are separated " +
         "by less than 'comet.parquet.read.io.mergeRanges.delta' bytes. Longer continuous reads " +
-        "are faster on cloud storage. The default behavior is to merge consecutive ranges.")
+        "are faster on cloud storage.")
     .booleanConf
     .createWithDefault(true)
 
@@ -115,7 +115,7 @@ object CometConf extends ShimCometConf {
     .doc("In the parallel reader, if the read ranges submitted are skewed in sizes, this " +
       "option will cause the reader to break up larger read ranges into smaller ranges to " +
       "reduce the skew. This will result in a slightly larger number of connections opened to " +
-      "the file system but may give improved performance. The option is off by default.")
+      "the file system but may give improved performance.")
     .booleanConf
     .createWithDefault(false)
 
@@ -153,7 +153,7 @@ object CometConf extends ShimCometConf {
         "native space. Note: each operator is associated with a separate config in the " +
         "format of 'spark.comet.exec.<operator_name>.enabled' at the moment, and both the " +
         "config and this need to be turned on, in order for the operator to be executed in " +
-        "native. By default, this config is true.")
+        "native.")
     .booleanConf
     .createWithDefault(true)
 
@@ -215,7 +215,7 @@ object CometConf extends ShimCometConf {
     "spark.comet.memory.overhead.factor")
     .doc(
       "Fraction of executor memory to be allocated as additional non-heap memory per executor " +
-        "process for Comet. Default value is 0.2.")
+        "process for Comet.")
     .doubleConf
     .checkValue(
       factor => factor > 0,
@@ -247,8 +247,7 @@ object CometConf extends ShimCometConf {
       "is enabled. Available modes are 'native', 'jvm', and 'auto'. " +
         "'native' is for native shuffle which has best performance in general. " +
         "'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
-        "'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
-        "By default, this config is 'auto'.")
+        "'auto' is for Comet to choose the best shuffle mode based on the query plan.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
@@ -258,8 +257,8 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
-        "Whether to force enabling broadcasting for Comet native operators. By default, " +
-          "this config is false. Comet broadcast feature will be enabled automatically by " +
+        "Whether to force enabling broadcasting for Comet native operators. " +
+          "Comet broadcast feature will be enabled automatically by " +
           "Comet extension. But for unit tests, we need this feature to force enabling it " +
           "for invalid cases. So this config is only used for unit test.")
       .internal()
@@ -280,27 +279,26 @@ object CometConf extends ShimCometConf {
     .stringConf
     .createWithDefault("zstd")
 
-  val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.columnar.shuffle.async.enabled")
-    .doc(
-      "Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config " +
-        "is false.")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.columnar.shuffle.async.enabled")
+      .doc("Whether to enable asynchronous shuffle for Arrow-based shuffle.")
+      .booleanConf
+      .createWithDefault(false)
 
   val COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.columnar.shuffle.async.thread.num")
-      .doc("Number of threads used for Comet async columnar shuffle per shuffle task. " +
-        "By default, this config is 3. Note that more threads means more memory requirement to " +
-        "buffer shuffle data before flushing to disk. Also, more threads may not always " +
-        "improve performance, and should be set based on the number of cores available.")
+      .doc(
+        "Number of threads used for Comet async columnar shuffle per shuffle task. " +
+          "Note that more threads means more memory requirement to " +
+          "buffer shuffle data before flushing to disk. Also, more threads may not always " +
+          "improve performance, and should be set based on the number of cores available.")
       .intConf
       .createWithDefault(3)
 
   val COMET_COLUMNAR_SHUFFLE_ASYNC_MAX_THREAD_NUM: ConfigEntry[Int] = {
     conf("spark.comet.columnar.shuffle.async.max.thread.num")
       .doc("Maximum number of threads on an executor used for Comet async columnar shuffle. " +
-        "By default, this config is 100. This is the upper bound of total number of shuffle " +
+        "This is the upper bound of total number of shuffle " +
         "threads per executor. In other words, if the number of cores * the number of shuffle " +
         "threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than " +
         "this config. Comet will use this config as the number of shuffle threads per " +
@@ -317,8 +315,7 @@ object CometConf extends ShimCometConf {
         "Higher value means more memory requirement to buffer shuffle data before " +
         "flushing to disk. As Comet uses columnar shuffle which is columnar format, " +
         "higher value usually helps to improve shuffle data compression ratio. This is " +
-        "internal config for testing purpose or advanced tuning. By default, " +
-        "this config is Int.Max.")
+        "internal config for testing purpose or advanced tuning.")
     .internal()
     .intConf
     .createWithDefault(Int.MaxValue)
@@ -341,8 +338,7 @@ object CometConf extends ShimCometConf {
     .doc(
       "Fraction of Comet memory to be allocated per executor process for Comet shuffle. " +
         "Comet memory size is specified by `spark.comet.memoryOverhead` or " +
-        "calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`. " +
-        "By default, this config is 1.0.")
+        "calculated by `spark.comet.memory.overhead.factor` * `spark.executor.memory`.")
     .doubleConf
     .checkValue(
       factor => factor > 0,
@@ -360,11 +356,12 @@ object CometConf extends ShimCometConf {
 
   val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
     "spark.comet.shuffle.preferDictionary.ratio")
-    .doc("The ratio of total values to distinct values in a string column to decide whether to " +
-      "prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
-      "this config, dictionary encoding will be used on shuffling string column. This config " +
-      "is effective if it is higher than 1.0. By default, this config is 10.0. Note that this " +
-      "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
+    .doc(
+      "The ratio of total values to distinct values in a string column to decide whether to " +
+        "prefer dictionary encoding when shuffling the column. If the ratio is higher than " +
+        "this config, dictionary encoding will be used on shuffling string column. This config " +
+        "is effective if it is higher than 1.0. Note that this " +
+        "config is only used when `spark.comet.exec.shuffle.mode` is `jvm`.")
     .doubleConf
     .createWithDefault(10.0)
 
@@ -377,7 +374,7 @@ object CometConf extends ShimCometConf {
   val COMET_DEBUG_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.debug.enabled")
       .doc(
-        "Whether to enable debug mode for Comet. By default, this config is false. " +
+        "Whether to enable debug mode for Comet. " +
           "When enabled, Comet will do additional checks for debugging purpose. For example, " +
           "validating array when importing arrays from JVM at native side. Note that these " +
           "checks may be expensive in performance and should only be enabled for debugging " +
@@ -437,27 +434,27 @@ object CometConf extends ShimCometConf {
       "The fraction of memory from Comet memory overhead that the native memory " +
         "manager can use for execution. The purpose of this config is to set aside memory for " +
         "untracked data structures, as well as imprecise size estimation during memory " +
-        "acquisition. Default value is 0.7.")
+        "acquisition.")
     .doubleConf
     .createWithDefault(0.7)
 
-  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] = conf(
-    "spark.comet.parquet.enable.directBuffer")
-    .doc("Whether to use Java direct byte buffer when reading Parquet. By default, this is false")
-    .booleanConf
-    .createWithDefault(false)
+  val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.enable.directBuffer")
+      .doc("Whether to use Java direct byte buffer when reading Parquet.")
+      .booleanConf
+      .createWithDefault(false)
 
   val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.scan.preFetch.enabled")
-      .doc("Whether to enable pre-fetching feature of CometScan. By default is disabled.")
+      .doc("Whether to enable pre-fetching feature of CometScan.")
       .booleanConf
      .createWithDefault(false)
 
   val COMET_SCAN_PREFETCH_THREAD_NUM: ConfigEntry[Int] =
     conf("spark.comet.scan.preFetch.threadNum")
       .doc(
         "The number of threads running pre-fetching for CometScan. Effective if " +
-          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. By default it is 2. Note that more " +
+          s"${COMET_SCAN_PREFETCH_ENABLED.key} is enabled. Note that more " +
          "pre-fetching threads means more memory requirement to store pre-fetched row groups.")
       .intConf
       .createWithDefault(2)
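Throughout this file, the PR deletes the "By default, this config is X" sentences because each default already lives in `createWithDefault(...)`, and stating it twice invites drift. A minimal sketch of why the entry is the better single source of truth; the builder classes and `ConfigDocs.render` below are simplified stand-ins, not Comet's actual `ConfigBuilder`/`ConfigEntry`.

```scala
// Simplified stand-ins for the config machinery (illustrative only).
final case class ConfigEntry[T](key: String, doc: String, defaultValue: Option[T]) {
  def defaultValueString: String = defaultValue.map(_.toString).getOrElse("<undefined>")
}

final class ConfigBuilder(key: String, docText: String = "") {
  def doc(d: String): ConfigBuilder = new ConfigBuilder(key, d)
  def booleanConf: TypedBuilder[Boolean] = new TypedBuilder[Boolean](key, docText)
}

final class TypedBuilder[T](key: String, doc: String) {
  def createWithDefault(default: T): ConfigEntry[T] = ConfigEntry(key, doc, Some(default))
}

object ConfigDocs {
  // Because the default lives on the entry, generated documentation can always
  // show it, and the doc-string prose can never fall out of sync with the code.
  def render(e: ConfigEntry[_]): String =
    s"${e.key} (default: ${e.defaultValueString}): ${e.doc}"

  def main(args: Array[String]): Unit = {
    val prefetch = new ConfigBuilder("spark.comet.scan.preFetch.enabled")
      .doc("Whether to enable pre-fetching feature of CometScan.")
      .booleanConf
      .createWithDefault(false)
    println(render(prefetch))
    // spark.comet.scan.preFetch.enabled (default: false): Whether to enable ...
  }
}
```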
CometSparkSessionExtensions.scala
@@ -1275,8 +1275,14 @@ object CometSparkSessionExtensions extends Logging {
       .byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), ByteUnit.MiB)
 
     val minimum = ConfigHelpers
-      .byteFromString(sparkConf.get(COMET_MEMORY_OVERHEAD_MIN_MIB.key, "384"), ByteUnit.MiB)
-    val overheadFactor = sparkConf.getDouble(COMET_MEMORY_OVERHEAD_FACTOR.key, 0.2)

(Member Author, commenting on lines -1278 to -1279: "This is the main change, to remove these hard-coded defaults.")

+      .byteFromString(
+        sparkConf.get(
+          COMET_MEMORY_OVERHEAD_MIN_MIB.key,
+          COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString),
+        ByteUnit.MiB)
+    val overheadFactor = sparkConf.getDouble(
+      COMET_MEMORY_OVERHEAD_FACTOR.key,
+      COMET_MEMORY_OVERHEAD_FACTOR.defaultValue.get)
 
     val overHeadMemFromConf = sparkConf
       .getOption(COMET_MEMORY_OVERHEAD.key)
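The review comment above marks the core of the change: call sites now ask the `ConfigEntry` for its default rather than repeating the literal (`"384"`, `0.2`). A self-contained sketch of the before/after pattern; `DoubleEntry` and `OverheadFactor` are illustrative, and a plain `Map` stands in for `SparkConf`.

```scala
// Sketch of the pattern this PR adopts (illustrative, not Comet's real helpers).
final case class DoubleEntry(key: String, defaultValue: Option[Double])

object OverheadFactor {
  val COMET_MEMORY_OVERHEAD_FACTOR =
    DoubleEntry("spark.comet.memory.overhead.factor", Some(0.2))

  // Before: sparkConf.getDouble(key, 0.2) repeated the literal 0.2, which would
  // silently diverge if the entry's default ever changed. After: the call site
  // reads the default from the entry itself, so there is one place to update.
  def overheadFactor(conf: Map[String, String]): Double =
    conf.get(COMET_MEMORY_OVERHEAD_FACTOR.key) match {
      case Some(v) => v.toDouble
      case None => COMET_MEMORY_OVERHEAD_FACTOR.defaultValue.get
    }

  def main(args: Array[String]): Unit = {
    println(overheadFactor(Map.empty)) // 0.2, taken from the entry's default
    println(overheadFactor(Map("spark.comet.memory.overhead.factor" -> "0.4"))) // 0.4
  }
}
```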