Skip to content

Commit

Permalink
[SPARK-50381][CORE] Support spark.master.rest.maxThreads
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to support `spark.master.rest.maxThreads`.

### Why are the changes needed?

To give users a way to control the maximum number of threads used by the REST API server. Previously, Apache Spark used the default `QueuedThreadPool` constructor, whose maximum thread count is always fixed at `200`.

https://github.com/apache/spark/blob/2e1c3dc8004b4f003cde8dfae6857f5bef4bb170/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala#L94

https://github.com/jetty/jetty.project/blob/5dfc59a691b748796f922208956bd1f2794bcd16/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java#L118-L121

### Does this PR introduce _any_ user-facing change?

No, the default value of the new configuration is identical to the previously used Jetty default value.

### How was this patch tested?

Pass the CIs with a newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48921 from dongjoon-hyun/SPARK-50381.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Nov 21, 2024
1 parent 2e1c3dc commit 2d09ef2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.MASTER_REST_SERVER_FILTERS
import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS, MASTER_REST_SERVER_MAX_THREADS}
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -63,7 +63,8 @@ private[spark] abstract class RestSubmissionServer(
protected val clearRequestServlet: ClearRequestServlet
protected val readyzRequestServlet: ReadyzRequestServlet

private var _server: Option[Server] = None
// Visible for testing
private[rest] var _server: Option[Server] = None

// A mapping from URL prefixes to servlets that serve them. Exposed for testing.
protected val baseContext = s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
Expand Down Expand Up @@ -91,7 +92,7 @@ private[spark] abstract class RestSubmissionServer(
* Return a 2-tuple of the started server and the bound port.
*/
private def doStart(startPort: Int): (Server, Int) = {
val threadPool = new QueuedThreadPool
val threadPool = new QueuedThreadPool(masterConf.get(MASTER_REST_SERVER_MAX_THREADS))
threadPool.setDaemon(true)
val server = new Server(threadPool)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1987,6 +1987,12 @@ package object config {
.intConf
.createWithDefault(6066)

// Upper bound on the size of the thread pool that backs the Spark Master REST API server.
// The default of 200 equals the maximum that Jetty's no-arg QueuedThreadPool constructor uses,
// so leaving this unset keeps the previous behavior. Non-positive values are rejected when the
// configuration is read, instead of being handed to the thread pool unchecked.
private[spark] val MASTER_REST_SERVER_MAX_THREADS = ConfigBuilder("spark.master.rest.maxThreads")
  .doc("Maximum number of threads to use in the Spark Master REST API Server.")
  .version("4.0.0")
  .intConf
  .checkValue(_ > 0, "The maximum number of REST API server threads must be positive.")
  .createWithDefault(200)

private[spark] val MASTER_REST_SERVER_FILTERS = ConfigBuilder("spark.master.rest.filters")
.doc("Comma separated list of filter class names to apply to the Spark Master REST API.")
.version("4.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.Base64
import scala.collection.mutable

import jakarta.servlet.http.HttpServletResponse
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

Expand All @@ -33,7 +34,7 @@ import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.DriverState._
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.internal.config.MASTER_REST_SERVER_FILTERS
import org.apache.spark.internal.config.{MASTER_REST_SERVER_FILTERS, MASTER_REST_SERVER_MAX_THREADS}
import org.apache.spark.rpc._
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -545,6 +546,19 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
}
}

test("SPARK-50381: Support spark.master.rest.maxThreads") {
  // Deliberately different from the configuration's default, so a match below can only
  // come from the explicitly configured value.
  val expectedMaxThreads = 2000

  val sparkConf = new SparkConf()
  val host = Utils.localHostName()
  val securityMgr = new SecurityManager(sparkConf)

  // Register the RpcEnv in the suite-level field before using it further.
  val env = RpcEnv.create("rest-with-maxThreads", host, 0, sparkConf, securityMgr)
  rpcEnv = Some(env)
  val masterRef = env.setupEndpoint("fake-master", new DummyMaster(env))

  sparkConf.set(MASTER_REST_SERVER_MAX_THREADS, expectedMaxThreads)

  // Likewise, register the REST server in the suite-level field before starting it.
  val restServer = new StandaloneRestServer(host, 0, sparkConf, masterRef, "spark://fake:7077")
  server = Some(restServer)
  restServer.start()

  // Inspect the thread pool of the underlying server that was created by start().
  val sizedPool = restServer._server.get.getThreadPool.asInstanceOf[SizedThreadPool]
  assert(sizedPool.getMaxThreads === expectedMaxThreads)
}

/* --------------------- *
| Helper methods |
* --------------------- */
Expand Down

0 comments on commit 2d09ef2

Please sign in to comment.