4. Concurrency and Parallelism

4.1. SHOULD avoid concurrency like the plague it is

Avoid having to deal with concurrency as much as possible. People good at concurrency avoid it like the plague it is.

WARNING: concurrency issues arise not only with shared memory and threads, but also between processes, whenever there is contention on any kind of resource (like a database).

Example - a job scheduled through cron.d on Linux to run every minute fetches and updates items from a queue persisted in MySQL. If a run takes longer than 1 minute, you can end up with 2 or 3 processes executing at the same time and contending on the same MySQL table.
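One way to keep such runs from overlapping is to have each run take an exclusive lock before touching the queue. The following is only a minimal sketch, assuming a lock file on the local filesystem; the names SingleInstance and runExclusive are made up for illustration, and it does nothing for jobs spread across several machines.

```scala
import java.nio.channels.{FileChannel, OverlappingFileLockException}
import java.nio.file.{Path, StandardOpenOption}

object SingleInstance {
  /** Runs `job` only if no other run holds the lock file; returns None when skipped. */
  def runExclusive[T](lockFile: Path)(job: => T): Option[T] = {
    val channel = FileChannel.open(
      lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
    try {
      // tryLock returns null when another process holds the lock and
      // throws when this same JVM already holds it.
      val lock =
        try channel.tryLock()
        catch { case _: OverlappingFileLockException => null }

      if (lock == null) None
      else try Some(job) finally lock.release()
    } finally channel.close()
  }
}
```

A run that finds the lock taken simply skips its turn instead of piling onto the same table.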

4.2. SHOULD use appropriate abstractions only where suitable - Future, Actors, Rx

Learn about the abstractions available and choose between them depending on the task at hand. There is no silver bullet that can be generally applied. The more high-level the abstraction, the narrower the range of problems it can solve. But the less scope and power it has, the simpler and more composable the model is. For example, many developers in the Scala community overuse Akka Actors, which are great, but not when misapplied. Don't use an Akka Actor when a Future would do.

"Power tends to corrupt, and absolute power corrupts absolutely" -- Lord Acton

Scala's Futures and Promises are good because:

  • they are inherently parallelizable by eliminating concurrency concerns
  • fairly efficient, because when submitting a task to the implicit ExecutionContext, this execution context efficiently multiplexes between few threads by default (the number of threads in the thread-pool is often directly proportional to the number of CPU cores you have)
  • alternative implementations can be even simpler and more efficient than Scala's standard Future, because the standard Future was designed to be general purpose
  • the model is inherently simple and easy to use

Futures and Promises are bad because they signal only one value from the producer to the consumer and that's it - if you need a stream or bi-directional communications, a Future might not be the best abstraction.
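To make the "only one value" limitation concrete, here is a small sketch using nothing but the standard library. The OneShot object is hypothetical; it only shows that a Promise can be completed once and that later attempts are rejected.

```scala
import scala.concurrent.{Future, Promise}

object OneShot {
  // The producer side holds the Promise; consumers only see the Future.
  private val promise = Promise[Int]()
  val result: Future[Int] = promise.future

  // The first completion wins; trySuccess returns false for later attempts.
  def complete(value: Int): Boolean = promise.trySuccess(value)
}
```

Anything that needs a second value has to be modeled with something else, such as a stream or an actor.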

Akka Actors are good because:

  • they make bidirectional communications over asynchronous boundaries easy - for example WebSocket is a prime candidate for actors
  • with Akka's Actors you can easily model state machines (see context.become)
  • the processing of messages has a strong guarantee of non-concurrency - messages are processed one by one, so there's no need to worry about concurrency issues while in the context of an actor
  • instead of implementing a half-assed in-memory queue for processing of things, you could just use an actor, since queuing of messages and acting on those messages is what they do

Akka's Actors are bad because:

  • they are fairly low-level for many tasks
  • it's extremely easy to model actors that keep a lot of state, ending up with a system that can't be horizontally scaled
  • because of the bidirectional communications capability, it's extremely easy to end up with data flows that are so complex as to be unmanageable
  • the model in general is actor A sending a message to actor B - but if you need to model a stream of events, this tight coupling between A and B is not acceptable
  • actors, as used in practice with Akka, tend to induce uncontrolled side effects, which is error-prone, the opposite of functional programming and not idiomatic Scala

Streaming abstractions such as Play's Iteratees / Akka Streams / RxJava / Reactive Streams / FS2 / Monix are good because:

  • they model unidirectional communications between producers and consumers
  • events flow in one direction, so you can transform and compose these streams much more easily
  • depending on implementation, they address back-pressure concerns by default
  • just as in the case of Future, because of the limitations, the model is simple to use and much easier to reason about and compose than actors, with the exposed operators being awesome

Streams are bad because:

  • they are only about unidirectional communications, it gets complicated if you want bidirectional communications, with actors being better at having dialogs
  • because of the strong contract they come with (i.e. no concurrent notifications for example), implementing new operators and data sources can be problematic, but usage on the consumer side is kept simple because of this

Watch this presentation by Runar Bjarnason on this subject because it's awesome: Constraints Liberate, Liberties Constrain

4.3. SHOULD NOT wrap purely CPU-bound operations in Scala's standard Futures

This is in general an anti-pattern:

def add(x: Int, y: Int) = Future { x + y }

If you don't see any kind of I/O in there, then that's a red flag. Shoving stuff in Futures without thinking is not going to solve your performance problems. Especially in the case of a web server, in which the requests are already parallelized and the above would get executed in response to requests, shoving purely CPU-bound work in that Future constructor will make your logic slower to execute, not faster.

Also, in case you want to initialize a Future[T] with a constant, always use Future.successful().
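As a sketch of the alternative, keep the computation a plain function and only wrap the finished value when some API insists on a Future. The names PureValues and addAsync are made up for this example.

```scala
import scala.concurrent.Future

object PureValues {
  // Plain function: the caller decides whether any asynchrony is needed.
  def add(x: Int, y: Int): Int = x + y

  // When a signature must return a Future, wrap the already-computed value.
  // Future.successful creates a completed Future without needing an ExecutionContext.
  def addAsync(x: Int, y: Int): Future[Int] = Future.successful(add(x, y))
}
```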

4.4. MUST use Scala's BlockContext on blocking I/O

This includes all blocking I/O, including SQL queries. Real sample:

Future {
  DB.withConnection { implicit connection =>
    val query = SQL("select * from bar")
    query()
  }
}

Blocking calls are error-prone because one has to be aware of exactly what thread-pool gets affected and given the default configuration of the backend app, this can lead to non-deterministic dead-locks. It's a bug waiting to happen in production.

Here's a simplified example demonstrating the issue for didactic purposes:

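import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

implicit val ec: ExecutionContext = ExecutionContext
  .fromExecutor(Executors.newFixedThreadPool(1))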

def addOne(x: Int) = Future(x + 1)

def multiply(x: Int, y: Int) = Future {
  val a = addOne(x)
  val b = addOne(y)
  val result = for (r1 <- a; r2 <- b) yield r1 * r2

  // this will dead-lock
  Await.result(result, Duration.Inf)
}

This sample is simplified to make the effect deterministic, but all thread-pools configured with upper bounds will sooner or later be affected by this.
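For contrast, here is a sketch of the same computation on the same kind of single-threaded pool, but returning the composed Future instead of awaiting inside the pool. The NonBlockingMultiply object and the daemon thread factory are additions for this example, not part of the sample above.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future}

object NonBlockingMultiply {
  // Daemon threads, so this demo pool never keeps the JVM alive.
  private val daemons = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val th = new Thread(r)
      th.setDaemon(true)
      th
    }
  }

  // A pool with a single thread, as in the dead-locking sample.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1, daemons))

  def addOne(x: Int): Future[Int] = Future(x + 1)

  // No thread of the pool ever waits on another task of the same pool.
  def multiply(x: Int, y: Int): Future[Int] = {
    val a = addOne(x)
    val b = addOne(y)
    for (r1 <- a; r2 <- b) yield r1 * r2
  }
}
```

Because the only thread in the pool is never parked waiting for work that is queued behind it, this version completes.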

Blocking calls have to be wrapped in a blocking { ... } block, which signals to the BlockContext that a blocking operation is about to happen. It's a very neat mechanism in Scala that lets the ExecutionContext know about the blocking operation, so that it can decide what to do about it, such as adding more threads to the thread-pool (which is what Scala's ForkJoin thread-pool does).

All the code has to be reviewed and whenever a blocking call happens, this is the fix:

import scala.concurrent.blocking
// ...
blocking {
  someBlockingCallHere()
}

NOTE: the blocking call also serves as documentation, even if the underlying thread-pool doesn't support BlockContext, as things that block are totally non-obvious.
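Put together with a Future, the pattern looks roughly like this. It's a sketch in which slowLookup is a hypothetical stand-in for a real blocking call such as a JDBC query, and the execution context is taken as a parameter rather than assumed.

```scala
import scala.concurrent.{ExecutionContext, Future, blocking}

object BlockingDemo {
  // Hypothetical stand-in for a blocking call.
  def slowLookup(id: Int): Int = {
    Thread.sleep(50)
    id * 2
  }

  // Each lookup is marked as blocking, so an ExecutionContext that
  // supports BlockContext may compensate with extra threads.
  def lookupAll(ids: Seq[Int])(implicit ec: ExecutionContext): Future[Seq[Int]] =
    Future.traverse(ids) { id =>
      Future {
        blocking { slowLookup(id) }
      }
    }
}
```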

4.5. SHOULD NOT block

Sometimes you have to block the thread underneath - unfortunately JDBC doesn't have a non-blocking API. However, when you have a choice, never, ever block. For example, don't do this:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

def fetchSomething: Future[String] = ???

// later ...
val result = Await.result(fetchSomething, 3.seconds)
result.toUpperCase

Prefer keeping the context of that Future all the way:

def fetchSomething: Future[String] = ???

fetchSomething.map(_.toUpperCase)

Also check out Scala-Async to make this easier.
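When one asynchronous result feeds into another, a for-comprehension keeps everything inside the Future. A minimal sketch with the standard library only; fetchUser and fetchScore are hypothetical stubs.

```scala
import scala.concurrent.{ExecutionContext, Future}

object Composition {
  // Hypothetical asynchronous sources, stubbed for illustration.
  def fetchUser(id: Int)(implicit ec: ExecutionContext): Future[String] =
    Future(s"user-$id")

  def fetchScore(name: String)(implicit ec: ExecutionContext): Future[Int] =
    Future(name.length)

  // The second call depends on the first; nothing here blocks a thread.
  def describe(id: Int)(implicit ec: ExecutionContext): Future[String] =
    for {
      name  <- fetchUser(id)
      score <- fetchScore(name)
    } yield s"$name scored $score"
}
```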

4.6. SHOULD use a separate thread-pool for blocking I/O

Related to Rule 4.4, if you're doing a lot of blocking I/O (e.g. a lot of calls to JDBC), it's better to create a second thread-pool / execution context and execute all blocking calls on that, leaving the application's thread-pool to deal with CPU-bound stuff.

So you could initialize this second execution context like:

import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong

// ...
private val ioThreadPool = Executors.newCachedThreadPool(
  new ThreadFactory {
    private val counter = new AtomicLong(0L)

    def newThread(r: Runnable) = {
      val th = new Thread(r)
      th.setName("eon-io-thread-" +
        counter.getAndIncrement.toString)
      th.setDaemon(true)
      th
    }
  })

Note that here I prefer to use an unbounded "cached thread-pool", so it doesn't have a limit. When doing blocking I/O the idea is that you've got to have enough threads that you can block. But if unbounded is too much, depending on use-case, you can later fine-tune it, the idea with this sample being that you get the ball rolling.
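If you do end up needing an upper bound, one possible shape is a ThreadPoolExecutor that behaves like the cached pool up to a cap. This is a sketch, not a recommendation for specific numbers: BoundedIO, newPool, the thread name prefix and the 60 second keep-alive are all assumptions, and CallerRunsPolicy is just one of the available rejection policies.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

object BoundedIO {
  private val factory = new ThreadFactory {
    private val counter = new AtomicLong(0L)

    def newThread(r: Runnable): Thread = {
      val th = new Thread(r)
      th.setName("io-thread-" + counter.getAndIncrement())
      th.setDaemon(true)
      th
    }
  }

  // Threads are created on demand, idle ones expire after 60 seconds,
  // and at most `maxThreads` exist at any time.
  def newPool(maxThreads: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      0, maxThreads,
      60L, TimeUnit.SECONDS,
      new SynchronousQueue[Runnable](),
      factory,
      // When all threads are busy, the submitting thread runs the task itself.
      new ThreadPoolExecutor.CallerRunsPolicy())
}
```

With CallerRunsPolicy a saturated pool slows down the submitter instead of rejecting work, which acts as a crude form of back-pressure.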

And then you could provide a helper, like:

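import scala.concurrent.{Future, Promise, blocking}
import scala.util.control.NonFatal

// `logger` is assumed to be provided by your logging library
def executeBlockingIO[T](cb: => T): Future[T] = {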
  val p = Promise[T]()

  ioThreadPool.execute(new Runnable {
    def run() = try {
      p.success(blocking(cb))
    }
    catch {
      case NonFatal(ex) =>
        logger.error("Uncaught I/O exception", ex)
        p.failure(ex)
    }
  })

  p.future
}

Also, don't expose this I/O thread-pool as an execution context that can be imported as an implicit in scope, because then people would start using it for CPU-bound stuff by mistake, so it's better to hide it and provide this helper.

4.7. All public APIs SHOULD BE thread-safe

As a general rule of software engineering on top of the JVM, absolutely all public APIs from inside your process will end up being used in a context in which multiple threads are using these APIs at the same time. It's best practice for all public APIs (all components in your Cake for example) to be designed as thread-safe from the get go, because you do not want to chase concurrency bugs.

And if a public API is not thread-safe for some reason (like the usual compromises made in software development), then state this fact in BOLD CAPITAL LETTERS.

The reason is simple - if an API is not thread-safe, then there's no way a user of that API can know about the way to synchronize on it. Example:

val list = mutable.ListBuffer.empty[String]

Let's say this mutable list is exposed. On what lock is a user supposed to synchronize? On the list itself? That's not enough, being a pretty useless lock to have in a larger context. Remember, locks are not composable and are very error-prone. Never leave the responsibility of synchronizing for contention on your users.
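One way out is to never expose the mutable structure at all. The following is a minimal sketch with a hypothetical NameRegistry class: the buffer and its lock stay private, and callers only ever receive immutable snapshots.

```scala
import scala.collection.mutable

final class NameRegistry {
  // Neither the lock nor the buffer ever escapes this class.
  private val lock = new AnyRef
  private val names = mutable.ListBuffer.empty[String]

  def add(name: String): Unit = lock.synchronized {
    names += name
    ()
  }

  // Callers get an immutable copy, safe to use from any thread.
  def snapshot: List[String] = lock.synchronized { names.toList }
}
```

The synchronization scheme is now an implementation detail that can change without users noticing.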

4.8. SHOULD avoid contention on shared reads

Meet Amdahl's Law. Synchronizing with locks drastically limits the parallelization possible and thus vertical scalability. Reads are embarrassingly parallelizable, so avoid at all times doing this:

def fetch = synchronized { someValue }

Come up with better synchronization schemes that do not involve synchronizing reads, like atomic references or STM. If you aren't able to do that, then avoid this altogether by using proper abstractions.
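With an atomic reference holding an immutable value, the read path needs no lock at all. A sketch, assuming a hypothetical Settings class backed by an immutable Map:

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

final class Settings(initial: Map[String, String]) {
  private val ref = new AtomicReference[Map[String, String]](initial)

  // Reads take no lock: they return the current immutable snapshot.
  def fetch: Map[String, String] = ref.get()

  // Writes retry with compare-and-set until no other writer interfered.
  @tailrec
  def set(key: String, value: String): Unit = {
    val current = ref.get()
    if (!ref.compareAndSet(current, current.updated(key, value)))
      set(key, value)
  }
}
```

Readers never wait on writers here; only concurrent writers compete with each other, by retrying.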

4.9. MUST provide a clearly defined and documented protocol for each component or actor that communicates over async boundaries

A function signature is not enough for documenting the protocol of problematic components. Especially when talking about communications over asynchronous boundaries (between threads, between processes on the network, etc...), the protocol needs a lot of detail on what you can and cannot rely on.

As a guideline, don't shy away from writing comments and document:

  • concurrency and latency concerns
  • the proper ordering of calls
  • everything that can go wrong

4.10. SHOULD always prefer single-producer scenarios

Shared writes are not parallelizable, whereas shared reads are embarrassingly parallelizable. As a metaphor, 100,000 people can watch the same soccer game in the same stadium at the same time (reads), but 100,000 people cannot all use the same bathroom (writes). In a multi-threading scenario, prefer single producer / multi consumer scenarios. This has the effect of avoiding contention and performance problems. An app does not scale vertically with multiple producers pounding on the same resource, because of Amdahl's Law.

Check out the LMAX Disruptor.
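As a much simpler illustration of the single-writer idea (this is not how the Disruptor itself works), all writes can be funneled through one dedicated thread while readers just observe the latest value. SingleWriterCounter is a hypothetical class written for this sketch.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory, TimeUnit}

final class SingleWriterCounter {
  // One dedicated writer thread: updates are serialized without locks.
  private val writer: ExecutorService =
    Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val th = new Thread(r, "counter-writer")
        th.setDaemon(true)
        th
      }
    })

  // Only the writer thread mutates this; volatile makes it visible to readers.
  @volatile private var total: Long = 0L

  // Any thread may request an update; it is applied on the writer thread.
  def increment(by: Long): Unit =
    writer.execute(new Runnable { def run(): Unit = total += by })

  // Any number of threads can read without contention.
  def current: Long = total

  // Stops accepting updates and waits for the pending ones to be applied.
  def close(): Boolean = {
    writer.shutdown()
    writer.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

The executor's queue still sees multiple submitters, but the shared state itself has exactly one writer.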

4.11. MUST NOT hardcode the thread-pool / execution context

This is a general design issue related to the project as a whole, but don't do this:

import scala.concurrent.ExecutionContext.Implicits.global

def doSomething: Future[String] = ???

That import tightly couples your logic to one particular execution context, which is not good, especially since in the context of a Play Framework application you need to use a different thread-pool.

Just pass the ExecutionContext around as an implicit parameter. It's idiomatic and acceptable this way. Also, implicit parameters should be passed in the second group of parameters to avoid confusing implicit resolution. When passed in the first group, it allows for method calls like doSomething() that won't compile but most IDEs will show them as valid.

def doSomething()(implicit ec: ExecutionContext): Future[String] = ???
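In practice this means the choice of thread-pool is made once, at the edge of the application, and flows inward implicitly. A sketch with hypothetical Greeter and Wiring objects; ExecutionContext.global is used here only as an example of what the edge might pick.

```scala
import scala.concurrent.{ExecutionContext, Future}

object Greeter {
  // Library code never picks a thread-pool itself.
  def greet(name: String)(implicit ec: ExecutionContext): Future[String] =
    Future(s"Hello, $name")
}

object Wiring {
  // The application decides which execution context to use, in one place.
  implicit val ec: ExecutionContext = ExecutionContext.global

  def run(): Future[String] = Greeter.greet("world")
}
```

Swapping the pool, for tests or for a Play application, then only touches Wiring.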