Skip to content

Commit

Permalink
fix: RDD 串接流的处理数据量计算使用bitset防止个别分区先计算的问题导致计算错误
Browse files Browse the repository at this point in the history
  • Loading branch information
xliuqq committed Nov 27, 2024
1 parent 0f0e0e2 commit 95d43e5
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions docs/posts/spark_accumulator.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ SparkContext 中注册累加器,不暴露 AccumulatorV2 的`countFailedValues`

## Transform 中的累加器重算

> 实验代码见:[spark/module-progress](https://gitee.com/oscsc/bigdatatech/tree/master/spark/module-progress)
由上一节可知:即使一个分区中调用多次累加器的`add`函数,也只会在 Task 完成后由 Driver更新。

- 因此一个分区只要触发一次`add`即可。
Expand All @@ -69,4 +71,41 @@ Driver调用累加器的`merge`函数时,如何知道重算(是个已经算



但是,<font color='red'>**如果 RDD 先执行部分分区的计算,再执行整体的计算,则无法得到正确的值**</font>。

- 如下面所示的代码,假设对于`newRdd `先执行了部分分区的操作,再执行所有分区数据操作;
- 依据分区总数,就会计算前面的分区两次,导致结果出错;
- 因此,使用scala原生的`BitSet`,使用位图索引,避免分区计算多次,同时节省内存。

```scala
// val rdd: RDD[(K, V)]

val newRdd: RDD[(Int, Array[Int])] = rdd.mapPartitions(iter => {
val id = TaskContext.getPartitionId()
new Iterator[(Int, Array[Int])] {
private var count = 0

override def hasNext: Boolean = {
val flag = iter.hasNext
if (!flag) {
accum.add(id, count) // 每个分区只在完成时发送,因此只需 add 一次
}
flag
}
override def next(): (Int, Array[Int]) = {
count += 1
iter.next()
}
}
})

newRDD.take(10)

newRDD.foreach()
```





具体的示例,可以见[示例:Spark串接流统计输出数据行数](https://gitee.com/oscsc/bigdatatech/blob/master/spark/module-progress/README.md)

0 comments on commit 95d43e5

Please sign in to comment.