小知识:Spark自定义累加器的使用实例详解

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

累加器简单使用

Spark内置的提供了Long和Double类型的累加器。下面是一个简单的使用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和。

?
1
2
3
4
5
6
7
8
9
10
val sparkConf = new SparkConf().setAppName(“Test”).setMaster(“local[2]”)
val sc = new SparkContext(sparkConf)
val accum = sc.longAccumulator(“longAccum”) //统计奇数的个数
val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
if(n%2!=0) accum.add(1L) 
n%2==0
}).reduce(_+_)
println(“sum: “+sum)
println(“accum: “+accum.value)
sc.stop()

结果为:

sum: 20

accum: 5

这是结果正常的情况,但是在使用累加器的过程中如果对于spark的执行过程理解的不够深入就会遇到两类典型的错误:少加(或者没加)、多加。

自定义累加器

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以集合的形式收集spark应用执行过程中的一些信息。例如,我们可以用这个类收集Spark处理数据时的一些细节,当然,由于累加器的值最终要汇聚到driver端,为了避免 driver端的outofmemory问题,需要对收集的信息的规模要加以控制,不宜过大。

继承AccumulatorV2类,并复写它的所有方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package spark
import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString
open class SessionAccmulator : AccumulatorV2<String, String>() {
private var result = Constant.SESSION_COUNT + “=0|”+
Constant.TIME_PERIOD_1s_3s + “=0|”+
Constant.TIME_PERIOD_4s_6s + “=0|”+
Constant.TIME_PERIOD_7s_9s + “=0|”+
Constant.TIME_PERIOD_10s_30s + “=0|”+
Constant.TIME_PERIOD_30s_60s + “=0|”+
Constant.TIME_PERIOD_1m_3m + “=0|”+
Constant.TIME_PERIOD_3m_10m + “=0|”+
Constant.TIME_PERIOD_10m_30m + “=0|”+
Constant.TIME_PERIOD_30m + “=0|”+
Constant.STEP_PERIOD_1_3 + “=0|”+
Constant.STEP_PERIOD_4_6 + “=0|”+
Constant.STEP_PERIOD_7_9 + “=0|”+
Constant.STEP_PERIOD_10_30 + “=0|”+
Constant.STEP_PERIOD_30_60 + “=0|”+
Constant.STEP_PERIOD_60 + “=0”
override fun value(): String {
return this.result
}
/**
* 合并数据
*/
override fun merge(other: AccumulatorV2<String, String>?) {
if (other == null) return else {
if (other is SessionAccmulator) {
var newResult = “”
val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)
resultArray.forEach {
val oldValue = other.result.getFieldFromConcatString(“|”, it)
if (oldValue.isNotEmpty()) {
val newValue = oldValue.toInt() + 1
//找到原因,一直在循环赋予值,debug30分钟 很烦
if (newResult.isEmpty()){
newResult = result.setFieldFromConcatString(“|”, it, newValue.toString())
}
//问题就在于这里,自定义没有写错,合并错了
newResult = newResult.setFieldFromConcatString(“|”, it, newValue.toString())
}
}
result = newResult
}
}
}
override fun copy(): AccumulatorV2<String, String> {
val sessionAccmulator = SessionAccmulator()
sessionAccmulator.result = this.result
return sessionAccmulator
}
override fun add(p0: String?) {
val v1 = this.result
val v2 = p0
if (v2.isNullOrEmpty()){
return
}else{
var newResult = “”
val oldValue = v1.getFieldFromConcatString(“|”, v2!!)
if (oldValue.isNotEmpty()){
val newValue = oldValue.toInt() + 1
newResult = result.setFieldFromConcatString(“|”, v2, newValue.toString())
}
result = newResult
}
}
override fun reset() {
val newResult = Constant.SESSION_COUNT + “=0|”+
Constant.TIME_PERIOD_1s_3s + “=0|”+
Constant.TIME_PERIOD_4s_6s + “=0|”+
Constant.TIME_PERIOD_7s_9s + “=0|”+
Constant.TIME_PERIOD_10s_30s + “=0|”+
Constant.TIME_PERIOD_30s_60s + “=0|”+
Constant.TIME_PERIOD_1m_3m + “=0|”+
Constant.TIME_PERIOD_3m_10m + “=0|”+
Constant.TIME_PERIOD_10m_30m + “=0|”+
Constant.TIME_PERIOD_30m + “=0|”+
Constant.STEP_PERIOD_1_3 + “=0|”+
Constant.STEP_PERIOD_4_6 + “=0|”+
Constant.STEP_PERIOD_7_9 + “=0|”+
Constant.STEP_PERIOD_10_30 + “=0|”+
Constant.STEP_PERIOD_30_60 + “=0|”+
Constant.STEP_PERIOD_60 + “=0”
result = newResult
}
override fun isZero(): Boolean {
val newResult = Constant.SESSION_COUNT + “=0|”+
Constant.TIME_PERIOD_1s_3s + “=0|”+
Constant.TIME_PERIOD_4s_6s + “=0|”+
Constant.TIME_PERIOD_7s_9s + “=0|”+
Constant.TIME_PERIOD_10s_30s + “=0|”+
Constant.TIME_PERIOD_30s_60s + “=0|”+
Constant.TIME_PERIOD_1m_3m + “=0|”+
Constant.TIME_PERIOD_3m_10m + “=0|”+
Constant.TIME_PERIOD_10m_30m + “=0|”+
Constant.TIME_PERIOD_30m + “=0|”+
Constant.STEP_PERIOD_1_3 + “=0|”+
Constant.STEP_PERIOD_4_6 + “=0|”+
Constant.STEP_PERIOD_7_9 + “=0|”+
Constant.STEP_PERIOD_10_30 + “=0|”+
Constant.STEP_PERIOD_30_60 + “=0|”+
Constant.STEP_PERIOD_60 + “=0”
return this.result == newResult
}
}

方法介绍

value方法:获取累加器中的值

       merge方法:该方法特别重要,一定要写对,这个方法是各个task的累加器进行合并的方法(下面介绍执行流程中将要用到)

        iszero方法:判断是否为初始值

        reset方法:重置累加器中的值

        copy方法:拷贝累加器

spark中累加器的执行流程:

          首先有几个task,spark engine就调用copy方法拷贝几个累加器(不注册的),然后在各个task中进行累加(注意在此过程中,被最初注册的累加器的值是不变的),执行最后将调用merge方法和各个task的结果累计器进行合并(此时被注册的累加器是初始值)

总结

以上就是本文关于Spark自定义累加器的使用实例详解的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家的。

原文链接:http://www.cnblogs.com/zhangweilun/p/6684776.html

声明: 猿站网有关资源均来自网络搜集与网友提供,任何涉及商业盈利目的的均不得使用,否则产生的一切后果将由您自己承担! 本平台资源仅供个人学习交流、测试使用 所有内容请在下载后24小时内删除,制止非法恶意传播,不对任何下载或转载者造成的危害负任何法律责任!也请大家支持、购置正版! 。本站一律禁止以任何方式发布或转载任何违法的相关信息访客发现请向站长举报,会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。本网站的资源部分来源于网络,如有侵权烦请发送邮件至:2697268773@qq.com进行处理。
建站知识

小知识:Spark Graphx计算指定节点的N度关系节点源码

2023-4-27 12:00:54

建站知识

小知识:Spark的广播变量和累加器使用方法代码示例

2023-4-27 12:15:21

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索