由于最近在项目中需要用到Spark的累加器,同时需要自己去自定义实现Spark的累加器,从而满足生产上的需求。对此,对Spark的累加器实现机制进行了追踪学习。
本系列文章,将从以下几个方面入手,对Spark累加器进行剖析:
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,只能累加,不能减少累加器只能在Driver端构建,并只能从Driver端读取结果,在Task端只能进行累加。
至于这里为什么只能在Task累加呢?下面的内容将会进行详细的介绍,先简单介绍下:
1 | 在Task节点,准确的就是说在executor上; |
累加器不会改变Spark lazy计算的特点,只会在Job触发的时候进行相关的累加操作
现有累加器类型:
累加器的重点类介绍
class Accumulator extends Accumulable源码(源码中已经对这个类的作用做了十分详细的解释):
1 | /** |
主要实现了累加器的初始化及封装了相关的累加器操作方法
同时在类对象构建的时候向Accumulators注册累加器
累加器的add操作的返回值类型和传入进去的值类型可以不一样
所以一定要定义好两步操作(即add方法):累加操作/合并操作
object Accumulators该方法在Driver端管理着累加器,也包含了累加器的聚合操作
trait AccumulatorParam[T] extends AccumulableParam[T, T]源码:
1 | /** |
AccumulatorParam的addAccumulator操作的泛型封装
具体的实现还是需要在具体实现类里面实现addInPlace方法
自定义实现累加器的关键
object AccumulatorParam源码:
1 | object AccumulatorParam { |
从源码中大量的implicit关键词,可以发现该类主要进行隐式类型转换的操作
TaskContextImpl在Executor端管理着我们的累加器,累加器是通过该类进行返回的
累加器的源码解析
Driver端accumulator方法
以下列这段代码中的accumulator方法为入口点,进入到相应的源码中去
val acc = new Accumulator(initialValue, param, Some(name))
源码:
1 | class Accumulator[T] private[spark] ( |
继承的Accumulable[T, T]
源码:
1 | class Accumulable[R, T] private[spark] ( |
Accumulators.register()
源码:
1 | // 传入参数,注册累加器 |
Executor端的反序列化是一个得到我们的对象的过程
初始化是在反序列化的时候就完成的,同时反序列化的时候还完成了Accumulator向TaskContextImpl的注册
TaskRunner中的run方法
1 | // 在计算的过程中,会将RDD和function经过序列化之后传给Executor端 |
Task中的collectAccumulators()方法
1 | private[spark] abstract class Task[T]( |
ResultTask中的runTask方法
1 | override def runTask(context: TaskContext): U = { |
Accumulable中的readObject方法
1 | // 在反序列化的过程中会调用Accumulable.readObject方法 |
Executor.scala
1 | // 在executor端拿到accumuUpdates值之后,会去构造一个DirectTaskResult |
CoarseGrainedExecutorBackend中的statusUpdate方法
1 | // 通过ExecutorBackend的一个实现类:CoarseGrainedExecutorBackend 中的statusUpdate方法 |
CoarseGrainedSchedulerBackend中的receive方法
1 | // Driver端在接收到消息之后,会调用CoarseGrainedSchedulerBackend中的receive方法 |
TaskSchedulerImpl的statusUpdate方法
1 | def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { |
TaskResultGetter的enqueueSuccessfulTask方法
1 | def enqueueSuccessfulTask(taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { |
TaskSchedulerImpl的handleSuccessfulTask方法
1 | def handleSuccessfulTask( |
DAGScheduler的taskEnded方法
1 | def taskEnded( |
DAGScheduler的handleTaskCompletion方法
1 | // 与上述CoarseGrainedSchedulerBackend中的receive方法章节对应 |
DAGScheduler的updateAccumulators方法
1 | private def updateAccumulators(event: CompletionEvent): Unit = { |
Accumulators的add方法
1 | def add(values: Map[Long, Any]): Unit = synchronized { |
Accumulators的++=方法
1 | def ++= (term: R) { value_ = param.addInPlace(value_, term)} |
Accumulators的value方法
1 | def value: R = { |