package spark
import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString
open class SessionAccmulator : AccumulatorV2<String, String>() {
private var result = Constant.SESSION_COUNT + “=0|”+
Constant.TIME_PERIOD_1s_3s + “=0|”+
Constant.TIME_PERIOD_4s_6s + “=0|”+
Constant.TIME_PERIOD_7s_9s + “=0|”+
Constant.TIME_PERIOD_10s_30s + “=0|”+
Constant.TIME_PERIOD_30s_60s + “=0|”+
Constant.TIME_PERIOD_1m_3m + “=0|”+
Constant.TIME_PERIOD_3m_10m + “=0|”+
Constant.TIME_PERIOD_10m_30m + “=0|”+
Constant.TIME_PERIOD_30m + “=0|”+
Constant.STEP_PERIOD_1_3 + “=0|”+
Constant.STEP_PERIOD_4_6 + “=0|”+
Constant.STEP_PERIOD_7_9 + “=0|”+
Constant.STEP_PERIOD_10_30 + “=0|”+
Constant.STEP_PERIOD_30_60 + “=0|”+
Constant.STEP_PERIOD_60 + “=0”
override fun value(): String {
return this.result
}
/**
* 合并数据
*/
override fun merge(other: AccumulatorV2<String, String>?) {
if (other == null) return else {
if (other is SessionAccmulator) {
var newResult = “”
val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)
resultArray.forEach {
val oldValue = other.result.getFieldFromConcatString(“|”, it)
if (oldValue.isNotEmpty()) {
val newValue = oldValue.toInt() + 1
//找到原因,一直在循环赋予值,debug30分钟 很烦
if (newResult.isEmpty()){
newResult = result.setFieldFromConcatString(“|”, it, newValue.toString())
}
//问题就在于这里,自定义没有写错,合并错了
newResult = newResult.setFieldFromConcatString(“|”, it, newValue.toString())
}
}
result = newResult
}
}
}
override fun copy(): AccumulatorV2<String, String> {
val sessionAccmulator = SessionAccmulator()
sessionAccmulator.result = this.result
return sessionAccmulator
}
override fun add(p0: String?) {
val v1 = this.result
val v2 = p0
if (v2.isNullOrEmpty()){
return
}else{
var newResult = “”
val oldValue = v1.getFieldFromConcatString(“|”, v2!!)
if (oldValue.isNotEmpty()){
val newValue = oldValue.toInt() + 1
newResult = result.setFieldFromConcatString(“|”, v2, newValue.toString())
}
result = newResult
}
}
override fun reset() {
val newResult = Constant.SESSION_COUNT + “=0|”+
Constant.TIME_PERIOD_1s_3s + “=0|”+
Constant.TIME_PERIOD_4s_6s + “=0|”+
Constant.TIME_PERIOD_7s_9s + “=0|”+
Constant.TIME_PERIOD_10s_30s + “=0|”+
Constant.TIME_PERIOD_30s_60s + “=0|”+
Constant.TIME_PERIOD_1m_3m + “=0|”+
Constant.TIME_PERIOD_3m_10m + “=0|”+
Constant.TIME_PERIOD_10m_30m + “=0|”+
Constant.TIME_PERIOD_30m + “=0|”+
Constant.STEP_PERIOD_1_3 + “=0|”+
Constant.STEP_PERIOD_4_6 + “=0|”+
Constant.STEP_PERIOD_7_9 + “=0|”+
Constant.STEP_PERIOD_10_30 + “=0|”+
Constant.STEP_PERIOD_30_60 + “=0|”+
Constant.STEP_PERIOD_60 + “=0”
result = newResult
}
override fun isZero(): Boolean {
val newResult = Constant.SESSION_COUNT + “=0|”+
Constant.TIME_PERIOD_1s_3s + “=0|”+
Constant.TIME_PERIOD_4s_6s + “=0|”+
Constant.TIME_PERIOD_7s_9s + “=0|”+
Constant.TIME_PERIOD_10s_30s + “=0|”+
Constant.TIME_PERIOD_30s_60s + “=0|”+
Constant.TIME_PERIOD_1m_3m + “=0|”+
Constant.TIME_PERIOD_3m_10m + “=0|”+
Constant.TIME_PERIOD_10m_30m + “=0|”+
Constant.TIME_PERIOD_30m + “=0|”+
Constant.STEP_PERIOD_1_3 + “=0|”+
Constant.STEP_PERIOD_4_6 + “=0|”+
Constant.STEP_PERIOD_7_9 + “=0|”+
Constant.STEP_PERIOD_10_30 + “=0|”+
Constant.STEP_PERIOD_30_60 + “=0|”+
Constant.STEP_PERIOD_60 + “=0”
return this.result == newResult
}
}