-
2021-05-06 11:43:26
累加器
全局累加器
Accumulators(累加器) 是一个只支持 added(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.
原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(counter.add(_)) // 运行结果: 15 println(counter.value)
注意点:
-
Accumulator 是支持并发并行的, 在任何地方都可以通过
add
来修改数值, 无论是 Driver 还是 Executor -
只能在 Driver 中才能调用
value
来获取数值
累计器件还有两个小特性,第一, 累加器能保证在 Spark 任务出现问题被重启的时候不会出现重复计算. 第二, 累加器只有在 Action 执行的时候才会被触发.
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .map(counter.add(_)) // 这个地方不是 Action, 而是一个 Transformation // 运行结果是 0 println(counter.value)
更多相关内容 -
-
Spark自定义累加器的使用实例详解
2021-01-10 09:30:29累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。 累加器简单使用 Spark... -
累加器_FPGAverilog_code_
2021-10-02 06:47:41计数器是由基本的计数单元和一些控制门所组成,计数单元则由一系列具有存储信息功能的各类触发器构成,这些触发器有RS触发器、T触发器、D触发器及JK触发器等。本代码在fpga中实现累加器的功能 -
accumulator:累加器
2021-03-11 11:56:27累加器 累加器 -
simulink计数器、累加器.zip
2019-09-16 16:39:00simulink做累加计算时用到的小模块,可用于时钟、数据的累加或计数等 -
Accumulator_manufacturingo9h_累加器_累加器;32位_
2021-10-01 17:25:08实现32位累加器功能,外部输入的信号位宽为32位 -
寄存器、累加器、暂存器有什么区别?
2021-01-20 01:05:33什么是寄存器 寄存器,是集成电路中非常重要的一种存储单元,通常由触发器组成。在集成电路设计中,寄存器可分为电路内部使用的寄存器和充当内外部接口的寄存器... 什么是累加器 在中央处理器中,累加器 (accumula -
基于动态累加器的去中心化加密搜索方案
2021-01-20 04:57:08近年来区块链技术取得广泛关注,涌现出众多基于区块链技术的新型应用,其中以 StorJ、Filecoin为代表的去中心化存储...新方法将动态累加器算法引入加密搜索过程中,保障用户存储内容隐私并提供了更好的加密搜索性能。 -
C#中累加器函数Aggregate用法实例
2020-09-03 13:31:27主要介绍了C#中累加器函数Aggregate用法,实例分析了C#中累加器的实现与使用技巧,具有一定参考借鉴价值,需要的朋友可以参考下 -
累加器labview
2017-11-25 19:45:26labview的累加器,有程序框图,但是我也不知道有啥用,怎么用,谨慎使用积分观看! -
单片机累加器A与片外RAM数据传递指令
2020-07-18 09:45:52单片机的累加器A与片外RAM之间的数据传递类指令 MOVX A,@Ri MOVX @Ri,A MOVX A,@DPTR MOVX @DPTR,A 说明: 1)在51系列单片机中,与外部存储器RAM打交道的只能是A累加器。所有需要传送入外部RAM的数据必需要通过A送... -
密码累加器关于区块链身份验证方面
2021-06-21 15:34:57密码累加器 关于 区块链 身份验证 方面的资料 -
无符号乘法器累加器
2018-05-27 16:51:09Verilog HDL中的一个8比特无符号乘法器累加器设计,它具有寄存I/O端口,支持同步装入。综合工具能够探测HDL代码中的乘法器累加器设计,自动推断出altmult_accum宏功能,提供最优结果。 -
基于动态累加器的异构传感网认证组密钥管理方案
2021-01-15 00:15:38利用动态累加器的证人能够证明特定累加项是否参与累加的特性,实现了组成员身份认证,提出了一种新的支持节点动态增加和撤销的组密钥管理方案DAAG。在需要建立组密钥时,所有成员节点提供自己持有的累加项,参与累加... -
spark中用scala编写累加器小程序统计文章中空白行
2017-03-06 14:35:06spark中用scala编写累加器小程序统计指定文章中的空白行,然后通过split函数通过空格切分文章,输出到指定的目录中。 -
对51单片机累加器A的逻辑操作指令及使用举例
2021-01-19 18:25:28对单片机的累加器A的逻辑操作: CLR A ;将A中的值清0,单周期单字节指令,与MOV A,#00H效果相同。 CPL A ;将A中的值按位取反 RL A ;将A中的值逻辑左移 RLC A ;将A中的值加上进位位进行逻辑左移 ... -
accumulator:Rust中的密码累加器
2021-05-06 14:46:57累加器 Rust中的密码累加器,是通过通用组接口实现的。 包括电池(RSA和类组实现)! 安装 # Cargo.toml [ dependencies ] accumulator = { git = " https://github.com/cambrian/accumulator.git " , tag = " v... -
传感技术中的14位模数ADC+20位突发累加器的ADI手势识别传感器方案
2020-10-15 23:47:16ADI公司的ADUX1020是集成了14位模数转换器(ADC)和20位突发累加器的高效率光度传感器,采用单点检测改善了应用的可靠性,同时所需元件数更低,因而为系统开发人员降低了设计复杂性和成本;同时还具有高环境光抑制... -
相位累加器SUM99的VHDL源程序
2021-01-19 22:58:17欢迎转载,信息维库电子市场网(www.dzsc.com) : window._bd_share_config = { "common": { "bdSnsKey": {}, "bdText": "", "b -
PySpark 累加器使用及自定义累加器
2021-06-17 19:59:05累加器(accumulator) 功能 实现在Driver端和Executor端共享变量 写的功能 实现机制 Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本; 在每个Task对自己内部的变量副本值更新完成后,传回给...累加器(accumulator)
功能
- 实现在Driver端和Executor端共享变量 写的功能
实现机制
- Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本;
在每个Task对自己内部的变量副本值更新完成后,传回给Driver端,然后将每个变量副本的值进行累计操作;
触发/生效 时机
- 受惰性求值机制的影响,只有在行动算子执行时 累加器才起作用;
使用地方
- 最好只在行动算子中使用,不要在转换算子中使用,因为转换算子可能出现失败时会重试,这时对应的累加器的值也会重试,这样累加器的值就是脏写;
使用场景
- 需要 求和 或 计数时;
注意事项
- 在对同一个rdd执行多次执行 行动算子时可能会导致 累加器 多次重复计算,导致累加器的结果错误;可以 通过在转换算子后面添加cache()解决;
一般思维下定义的无效示例
# -*- coding: utf-8 -*- """ (C) rgc All rights reserved create time '2021/5/30 20:06' Usage: 此处累加器失效的原因是 Driver端定义了累加器,将Driver端的累加器序列化到Executor端,这时是对Executor端的累加器进行写操作; 结果没有同步到Driver端,所以Driver端累加器的值仍然是0 """ # 构建spark from pyspark.conf import SparkConf from pyspark.context import SparkContext conf = SparkConf() # 使用本地模式;且 executor设置为1个方便debug conf.setMaster('local[1]').setAppName('rgc') sc = SparkContext(conf=conf) # 偶数累加器 even_num_acc = 0 # 奇数累加器 uneven_num_acc = 0 rdd = sc.parallelize([2, 1, 3, 4, 4], 1) def map_func(x: int) -> tuple: """ 将每个元素转为元祖 :param x: rdd中每个元素 :return: """ global even_num_acc global uneven_num_acc # 偶数 if x % 2 == 0: even_num_acc += 1 else: uneven_num_acc += 1 return (x, 1) # map操作 map_rdd = rdd.map(map_func) print(map_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] print('偶数累加器', even_num_acc) # 0 print('奇数累加器', uneven_num_acc) # 0
累加器用法
# -*- coding: utf-8 -*- """ (C) rgc All rights reserved create time '2021/5/30 20:06' Usage: 累加器:实现在Driver端和Executor端共享变量 写的功能 实现机制:Driver端定义的变量,在Executor端的每个Task都会得到这个变量的副本,在每个Task对自己内部的变量副本值更新完成后,传回给Driver端,然后将每个变量副本的值进行累计操作; 触发时机:只有在行动算子执行时 累加器才起作用; 使用地方:最好只在行动算子中使用,不用在转换算子中使用,因为转换算子可能出现失败时会重试,这时对应的累加器的值也会重试,这样累加器的值就是脏写; 使用场景: 1.需要 求和 或 计数时; 注意事项: 1.在对同一个rdd执行多次行动算子时可能会导致在 转换算子中的 累加器 多次重复计算,导致累加器的结果错误;可以 通过在转换算子后面添加cache()解决; """ # 构建spark from pyspark.conf import SparkConf from pyspark.context import SparkContext conf = SparkConf() # 使用本地模式;且 executor设置为1个方便debug conf.setMaster('local[1]').setAppName('rgc') sc = SparkContext(conf=conf) # 偶数累加器 even_num_acc = sc.accumulator(0) # 奇数累加器 uneven_num_acc = sc.accumulator(0) rdd = sc.parallelize([2, 1, 3, 4, 4], 1) rdd1 = sc.parallelize([2, 1, 3, 4, 4], 1) def map_func(x: int) -> tuple: """ 将每个元素转为元祖 :param x: rdd中每个元素 :return: """ global even_num_acc global uneven_num_acc # 偶数 if x % 2 == 0: even_num_acc += 1 else: uneven_num_acc += 1 return (x, 1) # 操作算子 添加cache的map操作 map_rdd = rdd.map(map_func).cache() print(map_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] print('操作算子 添加cache的map操作 偶数累加器', even_num_acc) # 3 print('操作算子 添加cache的map操作 奇数累加器', uneven_num_acc) # 2 print(map_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] print('操作算子 添加cache的map操作 偶数累加器', even_num_acc) # 3 print('操作算子 添加cache的map操作 奇数累加器', uneven_num_acc) # 2 print('') # 将累加器的值 置零 even_num_acc.value = 0 uneven_num_acc.value = 0 # 操作算子 未添加cache的map操作 map_rdd = rdd1.map(map_func) print(map_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] print('操作算子 未添加cache的map操作 偶数累加器', even_num_acc) # 3 print('操作算子 未添加cache的map操作 奇数累加器', uneven_num_acc) # 2 print(map_rdd.collect()) # [(2, 1), (1, 1), (3, 1), (4, 1), (4, 1)] print('操作算子 未添加cache的map操作 偶数累加器', even_num_acc) # 6 print('操作算子 未添加cache的map操作 奇数累加器', uneven_num_acc) # 4
结果:
自定义累加器
实现计算list中每个值出现的次数,用dict表示出来
# -*- coding: utf-8 -*- """ (C) rgc All rights reserved create time '2021/5/30 20:06' Usage: """ # 构建spark from pyspark import AccumulatorParam from pyspark.conf import SparkConf from pyspark.context import SparkContext class MyAccum(AccumulatorParam): def zero(self, value): """ task内部累加操作时的 初始化 默认值 :param value: :return: """ return {} @classmethod def dict_add(cls, a: dict, b: dict) -> dict: """ 用户自定义方法 2个dict的value相加 :param a: :param b: :return: Usage: >>> a = {'c': 2, 'e': 1, 'd': 1} >>> b = {'e': 2, 'f': 2} >>> dict_add(a, b) # {'c': 2, 'e': 3, 'd': 1, 'f': 2} """ b_key_list = list(b.keys()) for k in b_key_list: if k in a: a[k] += b[k] else: a[k] = b[k] return a def addInPlace(self, value1, value2: str or dict) -> dict: """ 实现父类的方法 此方法需要实现 分区间第一次的操作;分区间非第一次的操作;分区内每个task内部的操作 3个部分 才能保证不报错 :param value1: 上一次 累加器的值 :param value2: 这次新增的数据 :return: """ # 此处主要在 Driver端 分区之间第一次进行操作时,这时value1默认为空,所以新的值直接为value2 if value1 == "": print('Driver端第一次操作', f'value1:{value1},value2:{value2}') return value2 # 此处主要在 Driver端 分区之间 非第一次 进行 操作;只有在 分区个数>=2时才执行到此处 # value1是dict类型,如 {'a':1,'b':2} 表示 之前 分区之间进行累加器操作的结果dict # value2也是dict类型,如 {'a':1,'b':2} 表示 最新 分区的累加器的结果dict if isinstance(value2, dict): # rdd 可能会被分割成多份并行计算,所以这里处理当 value2 为某部分 rdd 计算得到的值 value = self.dict_add(value1, value2) print('Driver端非第一次操作', value1, value2, value) return value else: # 此处主要在 Execturo端 每个task内部 进行操作 # value1是dict类型,如 {'a':1,'b':2} # value2是str类型,也就是rdd中每个元素的值;如 'a' 或 'b' 或 'c' # 如果 rdd中的元素在 累加器的 dict类型的值中,则加一;不在 则设置为1 print('Executor', value1, value2) if value1.get(value2) is not None: value1[value2] += 1 else: value1[value2] = 1 # 返回最新的 累加器的值 return value1 conf = SparkConf() # 使用本地模式;且 executor设置为1个方便debug conf.setMaster('local[1]').setAppName('rgc') sc = SparkContext(conf=conf) accum = sc.accumulator("", accum_param=MyAccum()) rdd = sc.parallelize(["a", "b", "a", "c", "e", "d", "c"], 2) # accum.add()操作实际调用的就是 addInPlace(value,x)方法 rdd = rdd.map(lambda x: accum.add(x)) rdd.count() print(accum.value, 'result') assert accum.value == {'a': 2, 'b': 1, 'c': 2, 'e': 1, 'd': 1}
结果:
自定义累加器实现注意点
- 理解 累加器实现机制
- 继承自 AccumulatorParam类,实现其 zero,addInPlace 2个方法
- zero方法 用来设置 Executor端 task内部累加操作时的 初始化 默认值(不是Driver端分区间操作的默认值)
- addInPlace方法 需要实现 分区间第一次的操作;分区间非第一次的操作;分区内每个task内部的操作 3个部分 才能保证不报错
相关链接
- https://blog.csdn.net/qq_41489540/article/details/110003165
- https://blog.csdn.net/zlbingo/article/details/112635574
- https://waterandair.github.io/2018-04-03-pyspark-custom-accumulator
-
14位模数ADC+20位突发累加器的ADI手势识别传感器方案
2020-07-12 23:58:17ADI公司的ADUX1020是集成了14位模数转换器(ADC)和20位突发累加器的高效率光度传感器,采用单点检测改善了应用的可靠性,同时所需元件数更低,因而为系统开发人员降低了设计复杂性和成本; -
16进制累加器-小杉杉作品
2021-10-13 17:01:59方便快捷的工具 -
EDA/PLD中的相位累加器SUM99的VHDL源程序
2020-11-16 09:45:24EDA/PLD中的相位累加器SUM99的VHDL源程序 -
Spark2中使用累加器、注意点以及实现自定义累加器.docx
2019-08-21 12:27:44累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。 -
Spark代码笔记03——自定义排序、自定义分区、累加器、广播变量
2021-01-07 06:01:12一、自定义排序 自定义排序 Spark对简单的数据类型可以直接排序,但是对于一些复杂的条件就需要用自定义排序来实现了 1.第一种定义方法: 用到了隐式转换 package scalaBase.day11 import org.apache.spark.rdd... -
Spark的广播变量和累加器使用方法代码示例
2020-09-30 05:56:44主要介绍了Spark的广播变量和累加器使用方法代码示例,文中介绍了广播变量和累加器的含义,然后通过实例演示了其用法,需要的朋友可以参考下。 -
Counter.zipFPGA累加器
2019-06-01 18:07:38FPGA累加器。 用D触发器(或74LS74)构成的4位二进制计数器(分频器) 1)建立波形文件,对所设计电路进行波形仿真。并记录Q0、Q1、Q2、Q3的状态。 2)对所设计电路进行器件编程。将CLK引脚连接到实验系统的单脉冲输出... -
3.11累加器实验.zip
2020-11-17 16:29:42这个压缩文件包括了单片机实验相关的文档代码等详细资料,主要基于UNO系列和Arduino平台,适合初学者参考学习