现象描述
1 | val acc = sc.accumulator(0, “Error Accumulator”) |
上述现象,会造成acc.value的最终值变为10
原因分析
Spark中的一系列transform操作都会构造成一长串的任务链,此时就需要通过一个action操作来触发(lazy的特性),accumulator也是如此。
- 因此在一个action操作之后,调用value方法查看,是没有任何变化
- 第一次action操作之后,调用value方法查看,变成了5
- 第二次action操作之后,调用value方法查看,变成了10
原因就在于第二次action操作的时候,又执行了一次累加器的操作,同个累加器,在原有的基础上又加了5,从而变成了10
解决方案
通过上述的现象描述,我们可以很快知道解决的方法:只进行一次action操作。基于此,我们只要切断任务之间的依赖关系就可以了,即使用cache、persist。这样操作之后,那么后续的累加器操作就不会受前面的transform操作影响了
案例地址
相关的工程案例地址在Github上:https://github.com/lemonahit/spark-train/tree/master/01-Accumulator
1 | import org.apache.spark.{SparkConf, SparkContext} |