Merge pull request JerryLead#26 from zhouxw-lang/review_5-Architecture_english_translation

Refine English translation of 5-Architecture, mainly fixing grammatic…
JerryLead committed Jul 14, 2015
2 parents 13258d8 + d008a23 commit 85bae95
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions markdown/english/5-Architecture.md

We talked about Spark jobs in chapter 3. In this chapter, we will talk about the architecture and how the master, workers, driver and executors are coordinated to finish a job.

> Feel free to skip the code if you prefer the diagrams.

## Deployment diagram
Next, we will discuss some of its details.

## Job submission

The diagram below illustrates how the driver program (on the master node) produces a job and then submits it to the worker nodes.

![JobSubmission](../PNGfigures/JobSubmission.png)
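
The driver program itself is elided in this hunk of the diff; a minimal sketch of such a program (a toy word-count pipeline, purely illustrative, with `action()` standing for any real action such as `count()` or `collect()`) might look like this:

```scala
// A toy driver program (sketch): build an RDD chain, then trigger a job with an action
val sc = new SparkContext(sparkConf)
val data = sc.textFile("hdfs://...")                                  // illustrative input path
val finalRDD = data.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
finalRDD.action()                                                     // e.g. count(), collect(), saveAsTextFile(...)
```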


Explanation:

When the following code is evaluated, the program launches a lot of driver-side components for communication, e.g. the job's executors, threads, actors, etc.

```scala
val sc = new SparkContext(sparkConf)
// ...
```

Each `action()` triggers a job:
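
The call chain behind this is elided in the diff; roughly, in Spark 1.x it looks like the sketch below (written in the same pseudocode style as the other listings; names are approximate):

```scala
rdd.action()
=> sc.runJob(rdd, func)
=> dagScheduler.runJob(rdd, func, partitions)
=> dagScheduler.submitJob()                        // returns a JobWaiter to the caller
=> eventProcessActor ! JobSubmitted
=> dagScheduler.handleJobSubmitted()               // build stages backwards from the final RDD
=> dagScheduler.submitStage(finalStage)            // parent stages are submitted first
=> dagScheduler.submitMissingTasks(stage)          // create ShuffleMapTasks / ResultTasks
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> sparkDeploySchedulerBackend.reviveOffers()      // hand the tasks to the scheduler backend
```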

### Task distribution

After `sparkDeploySchedulerBackend` gets the `TaskSet`, the `Driver Actor` sends the serialized tasks to the `CoarseGrainedExecutorBackend Actor` on the worker nodes.
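
A rough sketch of that hand-off (modeled on `CoarseGrainedSchedulerBackend` in Spark 1.x; field and message names are approximate):

```scala
// Inside the Driver Actor (sketch): serialize each task and ship it to its executor
for (task <- tasks.flatten) {
  val serializedTask = ser.serialize(task)
  // executorActor maps an executorId to the CoarseGrainedExecutorBackend actor on that worker
  executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
}
```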

## Job reception

```scala
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
```
**The executor packages each task into a `TaskRunner`, and picks a free thread to run the task. A `CoarseGrainedExecutorBackend` process has exactly one executor.**
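
A minimal sketch of that step inside the executor (signature simplified, not the exact source):

```scala
// Executor.launchTask (sketch): wrap the task in a TaskRunner and hand it to the thread pool
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer): Unit = {
  val taskRunner = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, taskRunner)   // the executor keeps track of its running tasks
  threadPool.execute(taskRunner)         // a free thread from the pool runs the task
}
```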

## Task execution

The diagram below shows how a task received by a worker node is executed and how the driver processes its result.

![TaskExecution](../PNGfigures/taskexecution.png)

After receiving a serialized task, the executor deserializes it into a normal task and then runs it to get a `directResult`, which will be sent back to the driver. It is noteworthy that the data packages sent from an `Actor` cannot be too big:

- If the result is too big (e.g. that of `groupByKey`), it will be persisted to "memory + hard disk" and managed by `blockManager`. The driver will only get an `indirectResult` containing the storage location. When the result is needed, the driver will fetch it via HTTP.
- If the result is not too big (less than `spark.akka.frameSize = 10MB`), it will be sent directly to the driver, as sketched below.
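
A sketch of that decision inside `TaskRunner.run()` (simplified; constant and method names approximate Spark 1.x):

```scala
// Sketch: choose between sending the result directly and storing it via the blockManager
val directResult = new DirectTaskResult(valueBytes)
val serializedDirectResult = ser.serialize(directResult)
val resultToSend =
  if (serializedDirectResult.limit >= akkaFrameSize) {
    // too large for an Actor message: persist it, send back only the storage location
    blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult(blockId))
  } else {
    serializedDirectResult               // small enough: ship it through the Actor directly
  }
execBackend.statusUpdate(taskId, TaskState.FINISHED, resultToSend)
```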

**Some more details about `blockManager`:**

```scala
In TaskRunner.run()
// ...
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
```
The results produced by `ShuffleMapTask` and `ResultTask` are different.

- `ShuffleMapTask` produces a `MapStatus` containing 2 parts (see the sketch after this list):
- the `BlockManagerId` of the task's `BlockManager`: (executorId + host, port, nettyPort)
- the size of each output `FileSegment` of a task

- `ResultTask` produces the execution result of the specified `function` on one partition
e.g., the `function` of `count()` simply counts the number of records in a partition. Since `ShuffleMapTask` needs to write `FileSegment`s to disk, `OutputStream` writers are needed. These writers are produced and managed by the `blockManager` of `shuffleBlockManager`.
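
As a rough picture of those 2 parts (field names here are illustrative, not the exact Spark API):

```scala
// Sketch of what a ShuffleMapTask reports back to the driver
case class MapStatus(
  location: BlockManagerId,        // executorId + host, port, nettyPort of the task's BlockManager
  compressedSizes: Array[Byte]     // one (compressed) size per output FileSegment, indexed by reducer
)
```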

```scala
In task.run(taskId)
// ...
=> return func(context, rdd.iterator(split, context))
```

A series of operations will be executed after the driver gets a task's result:

`TaskScheduler` will be notified that the task is finished, and its result will be processed:
- If it is an `indirectResult`, `BlockManager.getRemoteBytes()` will be invoked to fetch the actual result.
- If it is a `ResultTask` result, `ResultHandler()` invokes the driver-side computation on it (e.g. `count()` takes a `sum` over the results of all `ResultTask`s).
- If it is the `MapStatus` of a `ShuffleMapTask`, then the `MapStatus` will be added to the `mapStatuses` of `mapOutputTrackerMaster`, which makes it easier to query during shuffle read.
- If the received result is from the last task in the stage, then the next stage will be submitted. If the stage was already the last one, `dagScheduler` will be informed that the job is finished.

```scala
After driver receives StatusUpdate(result)
// ...
=> jobWaiter.taskSucceeded(index, result)
=> resultHandler(index, result)

// If the finished task is ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
// ...
```

## Shuffle read

In the preceding paragraphs, we talked about task execution and result processing; now we will talk about how a reducer (a task that needs shuffled data) gets its input data. The shuffle read part of the last chapter has already covered how a reducer processes its input data.

**How does a reducer know where to fetch its data?**

![readMapStatus](../PNGfigures/readMapStatus.png)

The reducer needs to know on which nodes the `FileSegment`s produced by the `ShuffleMapTask`s of the parent stage are located. **This kind of information is sent to the driver's `mapOutputTrackerMaster` when a `ShuffleMapTask` finishes, and it is stored in `mapStatuses: HashMap[stageId, Array[MapStatus]]`**. Given a `stageId`, we can get an `Array[MapStatus]` which contains information about the `FileSegment`s produced by the `ShuffleMapTask`s. `Array(taskId)` contains the location (`blockManagerId`) and the size of each `FileSegment`.

When a reducer needs to fetch its input data, it first invokes `blockStoreShuffleFetcher` to get the locations of the input data (`FileSegment`s). `blockStoreShuffleFetcher` calls the local `MapOutputTrackerWorker` to do the work. `MapOutputTrackerWorker` uses `mapOutputTrackerMasterActorRef` to communicate with `mapOutputTrackerMasterActor` in order to get the `MapStatus`es. `blockStoreShuffleFetcher` processes the `MapStatus`es, finds out where the reducer should fetch each `FileSegment`, and stores this information in `blocksByAddress`. `blockStoreShuffleFetcher` then tells `basicBlockFetcherIterator` to fetch the `FileSegment` data, as sketched below.
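
A sketch of that call chain, in the same pseudocode style as the other listings (the intermediate calls are elided in this diff, so the names are approximate):

```scala
rdd.iterator(split, context)
=> rdd.compute(split, context)                          // a ShuffledRDD hits its ShuffleDependency here
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId)
=> mapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
   // asks mapOutputTrackerMasterActor for the MapStatuses of the parent stage if not cached locally
=> blocksByAddress = group the (blockManagerId, blockId, size) triples by node
=> basicBlockFetcherIterator.initialize()               // starts fetching the FileSegments
```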


![blocksByAddress](../PNGfigures/blocksByAddress.png)

After `basicBlockFetcherIterator` has received its data retrieval task, it produces several `fetchRequest`s. **Each of them contains the tasks to fetch `FileSegment`s from several nodes.** According to the diagram above, we know that `reducer-2` needs to fetch `FileSegment`s (FS, in white) from 3 worker nodes. The global data fetching task can be represented by `blocksByAddress`: 4 blocks from node 1, 3 blocks from node 2, and 4 blocks from node 3.

In order to speed up the data fetching process, it makes sense to divide the global task into sub-tasks (`fetchRequest`s), and each sub-task gets a thread to fetch data. Spark launches 5 parallel threads for each reducer (the same as Hadoop). Since the fetched data will be buffered in memory, one fetch cannot take too much data (no more than `spark.reducer.maxMbInFlight = 48MB`). **Note that `48MB` is shared by the 5 fetch threads,** so each sub-task should take no more than `48MB / 5 = 9.6MB`. In the diagram, on node 1, we have `size(FS0-2) + size(FS1-2) < 9.6MB`, but `size(FS0-2) + size(FS1-2) + size(FS2-2) > 9.6MB`, so we should break between `t1-r2` and `t2-r2`. As a result, we can see 2 `fetchRequest`s fetching data from node 1. **Will there be a `fetchRequest` larger than 9.6MB?** The answer is yes. If one `FileSegment` is too large, it still has to be fetched in one shot. Besides, if the reducer needs some `FileSegment`s that already exist locally, it will do a local read. At the end of shuffle read, it deserializes the fetched `FileSegment`s and offers record iterators to `RDD.compute()`.
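
A sketch of this splitting logic (modeled on `splitLocalRemoteBlocks()`; variable names are illustrative):

```scala
// Split the blocks needed from one node into fetchRequests of at most maxBytesInFlight / 5 each
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)    // 48MB / 5 = 9.6MB by default
var curBlocks = new ArrayBuffer[(BlockId, Long)]              // (blockId, size) pairs for the current request
var curRequestSize = 0L
for ((blockId, size) <- blockInfos) {                         // all blocks this reducer needs from the node
  curBlocks += ((blockId, size))
  curRequestSize += size
  if (curRequestSize >= targetRequestSize) {                  // close this request and start a new one
    remoteRequests += new FetchRequest(address, curBlocks)
    curBlocks = new ArrayBuffer[(BlockId, Long)]
    curRequestSize = 0
  }
}
if (curBlocks.nonEmpty) remoteRequests += new FetchRequest(address, curBlocks)   // the remainder
```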

```scala
In basicBlockFetcherIterator:
// ...
```

Some more details:

![fetchrequest](../PNGfigures/fetchrequest.png)

When `RDD.iterator()` meets a `ShuffleDependency`, `BasicBlockFetcherIterator` will be called to fetch the `FileSegment`s. `BasicBlockFetcherIterator` uses the `connectionManager` of the `blockManager` to send `fetchRequest`s to the `connectionManager`s on the other nodes. NIO is used for communication between `connectionManager`s. On the other nodes, for example, after the `connectionManager` on worker node 2 receives a message, it forwards the message to the `blockManager`. The latter uses `diskStore` to read the `FileSegment`s requested by the `fetchRequest` locally, and they are then sent back by the `connectionManager`. If `FileConsolidation` is activated, `diskStore` needs the location of the `blockId` given by `shuffleBlockManager`. If a `FileSegment` is no larger than `spark.storage.memoryMapThreshold = 8KB`, then `diskStore` reads the `FileSegment` directly into memory; otherwise, the memory mapping method of the `FileChannel` of `RandomAccessFile` will be used, so that large `FileSegment`s can be loaded into memory.
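
A sketch of `diskStore`'s read path described above (structure simplified; the threshold name is an assumption):

```scala
// Sketch: read a FileSegment either into a plain buffer or via memory mapping
val channel = new RandomAccessFile(file, "r").getChannel
val bytes =
  if (segment.length <= memoryMapThreshold) {          // spark.storage.memoryMapThreshold = 8KB
    val buf = ByteBuffer.allocate(segment.length.toInt)
    channel.read(buf, segment.offset)                  // small segment: copy it straight into memory
    buf.flip()
    buf
  } else {
    channel.map(MapMode.READ_ONLY, segment.offset, segment.length)   // large segment: memory-map it
  }
```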

When `BasicBlockFetcherIterator` receives serialized `FileSegment`s from the other nodes, it deserializes them and puts them into `fetchResults.Queue`. You may notice that **`fetchResults.Queue` is similar to the `softBuffer` in the `Shuffle details` chapter.** If the `FileSegment`s needed by `BasicBlockFetcherIterator` are local, they will be read locally by `diskStore` and put into `fetchResults`. Finally, the reducer reads the records from the `FileSegment`s and processes them.

```scala
After the blockManager receives the fetch request
// ...
channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
```

Every reducer has a `BasicBlockFetcherIterator`, and one `BasicBlockFetcherIterator` could, in theory, hold 48MB of `fetchResults`. As soon as one `FileSegment` in `fetchResults` is read off, more `FileSegment`s will be fetched to refill the 48MB.

```scala
BasicBlockFetcherIterator.next()
// ...
```

## Discussion

In terms of architecture design, the functionalities and modules are fairly independent. `BlockManager` is well designed, but it seems to manage too many things (data blocks, memory, disk, and network communication).

This chapter discussed how the modules of the Spark system are coordinated to finish a job (production, submission, execution, result collection, result computation and shuffle). A lot of code is pasted and many diagrams are drawn. More details can be found in the source code, if you are interested.

If you want to know more about `blockManager`, please refer to Jerry Shao's [blog](http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/) (in Chinese).
