您当前的位置:首页 > 电脑百科 > 站长技术 > 服务器

在Apache Spark中执行聚合的五种方法

时间:2020-07-24 10:31:48  来源:  作者:
在Apache Spark中执行聚合的五种方法

 

聚合是数据分析任务中广泛使用的运算符,Spark为此提供了坚实的框架。 以下是使用Spark可以针对大数据进行聚合的五种不同方式。

RDD上的GroupByKey或ReduceByKey转换:RDD是Spark中分布式数据收集的最早表示,其中数据通过" T"类型的任意JAVA对象表示。 RDD上的聚合与map-reduce框架中的reduce概念相似,在reduce中,reduce函数(作用于两个输入记录以生成聚合记录)是聚合的关键。 使用RDD时,聚合可以通过GroupByKey或ReduceByKey转换来执行,但是,这些转换仅限于Pair RDD(元组对象的集合,每个元组都由类型为" K"的键对象和类型为" V"的值对象组成) 。

在通过GroupByKey进行聚合的情况下,转换会导致元组对象具有键对象和针对该键对象的所有值对象的集合。 因此,之后需要应用一个映射器(通过map,maptoPair或mAppartitions进行映射转换),以便将每个Tuple对象的值对象的集合减少为一个聚合的值对象。

在Apache Spark中执行聚合的五种方法

> Aggregation on a Pair RDD (with 2 partitions) via GroupByKey followed via either of map, maptopair

 

映射程序(例如map,maptoPair和mappartitions转换)包含聚合函数,以将类型为" V"的值对象的集合减少为类型为" U"的聚合对象。 聚合函数可以是任意函数,不需要遵循关联性或交换性状。 GroupByKey转换具有三种风格,它们因应用GroupByKey转换而在RDD的分区规范上有所不同。 GroupByKey可以总结为:

GroupByKey (PairRDD<K,V>) => PairRDD<K,Iterator<V>> Map (PairRDD<K,Iterator<V>>) => PairRDD<K,U>

如果通过ReduceByKey进行聚合,则转换将直接导致具有键对象和针对该键对象的聚合对象的元组对象。 与GroupByKey一样,在ReduceByKey之后不需要映射器。 ReduceByKey转换采用关联和可交换的聚合函数,以便在跨分区聚合记录之前,可以在本地聚合位于同一分区的记录。 同样,聚合函数接受两个说类型为" V"的值对象,并返回一个类型为" V"的对象。 与GroupByKey相似,ReduceByKey转换也具有三种风格,它们的区别在于通过应用ReduceByKey转换而导致的RDD分区规范。 ReduceByKey可以总结为:

ReduceByKey(PairRDD<K,V>, Function<V,V,V>) => PairRDD<K,V>

在GroupByKey和ReduceByKey中,前者更通用,可以与任何聚合函数一起使用,而后者则更有效,但仅适用于前面所述的一类聚合函数。

RDD或数据集上的Mappartitions:如先前博客中所述,Mappartitions是功能强大的窄转换之一,可在RDD和Dataset(Spark中的数据表示)上使用,以明智地执行各种操作。 这样的操作之一也包括聚合。 但是,唯一需要满足的条件是,属于相同分组关键字的记录应位于单个分区中。 在涉及分组密钥的混排操作中实现的RDD或数据集(要聚合)中可以隐式满足此条件。 同样,可以通过首先基于分组密钥对RDD或数据集进行重新分区来明确实现该条件。

在用于典型聚合流的mappartitions内,必须首先实例化一个Hashmap,将Hashmap与相应的分组键相对应地存储聚合的Value Objects。 然后,在迭代基础分区的数据收集时,将重复更新此Hashmap。 最后,返回包含在映射中的聚合值/对象(可选以及关联的分组键)的迭代器。

由于基于Mappartitions的聚合涉及将Hashmap保留在内存中以保存键和聚合的Value对象,因此,如果大量唯一分组键驻留在基础分区中,则Hashmap将需要大量堆内存,因此可能导致 相应执行程序的内存不足终止的风险。 从此以后,不应该歪曲跨分区的分组密钥分配,否则会由于过度提供执行程序内存来处理偏斜而导致执行程序内存浪费。 此外,由于需要基于堆内存的聚合哈希图,因此与Spark中的专用聚合运算符相比,对内存的相对内存分配更多,但是如果内存不是约束,则基于Mappartitions的聚合可以提供良好的性能提升。

用于数据帧或数据集的UDAF:与上述方法不同,UDAF基于聚合缓冲区的概念以及在此缓冲区上运行的一组方法来实现聚合。

在Apache Spark中执行聚合的五种方法

> Aggregation buffer based aggregation flow in Spark (for Datasets and Dataframe)

 

到目前为止,UDAF是为Spark中的分布式数据收集的Dataframe或Dataset表示编写聚合逻辑的最常用方法。 UDAF在数据收集的无类型视图上工作,在该视图中,数据记录被视为(表的)一行,其架构定义了该行中每一列的类型和可空性。 通过扩展包" org.Apache.spark.sql.expressions"中存在的" UserDefinedAggregationFunction"类并覆盖基类中以下方法的实现,可以在Spark中创建UDAF:

/*Return schema for input column(s) to the UDAF, schema being built using StructType*/
=> public StructType inputSchema()
/*Return schema of aggregation buffer, schema being built using StructType */
=> public StructType bufferSchema()
/*DataType of final aggregation result*/
=> public DataType dataType()
/*Initialize aggregation buffer*/
=> public void initialize(MutableAggregationBuffer buffer)
/*Update aggregation buffer for each of the untyped view (Row) of an input object*/
=> public void update(MutableAggregationBuffer buffer, Row row)
/*Update current aggregation buffer with a partially aggregated buffer*/
=> public void merge(MutableAggregationBuffer buffer, Row buffer)
/*Evaluate final aggregation buffer and return the evaluated value of DataType declared earlier */
=> public Object evaluate(Row buffer)

除了覆盖上述方法外,还可以始终声明其他字段(在UDAF构造函数中使用可选的初始化)和自定义UDAF类中的其他方法,以便在覆盖方法中使用它们以实现聚合目标。

在使用UDAF之前,必须先在Spark框架中注册相同的实例:

spark.udf.register('sampleUDAF, new SampleUDAF());

注册后,可以在Spark SQL查询中使用UDAF来聚合整个数据集/数据框或数据集/数据框中的记录组(通过一列或多列分组)。 除了直接在Spark SQL查询中使用外,还可以通过数据框/数据集聚合API(例如" agg")使用UDAF。

UDAF虽然是定义自定义聚合的一种流行方法,但是当在聚合缓冲区中使用复杂的数据类型(数组或映射)时,会遇到性能问题。 这是由于以下事实:在UDAF中的每次更新操作期间,对于复杂的数据类型,将scala数据类型(用户特定)转换为相应的催化剂数据类型(催化剂内部数据类型)(反之亦然)变得非常昂贵。 从内存和计算的角度来看,此成本都更高。

数据集的聚合器:聚合器是对数据集执行聚合的最新方法,类似于UDAF,它也基于聚合缓冲区的概念以及在该缓冲区上运行的一组方法。 但是,聚合器进行聚合的方式称为类型化聚合,因为它涉及对各种类型的对象进行操作/使用各种类型的对象进行操作。 聚合器的输入,聚合缓冲区和最终的聚合输出(从缓冲区派生)都是具有相应Spark编码器的某些类型的对象。 用户可以通过使用为IN定义的类型(输入记录类型)扩展抽象的通用'Aggregator <IN,BUF,OUT>'类(在包'org.apache.spark.sql.expressions中提供)来定义自己的自定义Aggregator。 ,为BUF(聚合缓冲区)定义的类型和为OUT(输出记录类型)定义的类型,以及在基类中重写以下方法的实现:

/* return Encoder for aggregation buffer of type BUF. This is required for buffer ser/deser during shuffling or disk spilling */
=> public Encoder<BUF> bufferEncoder()
/* return Encoder for output object of type OUT after aggregation is performed */
=> public Encoder<OUT> outputEncoder()
/* return updated aggregation buffer object of type BUF after aggregating the existing buffer object of type BUF with the input object of type IN*/
=> public BUF reduce(BUF buffer, IN input) ()
/* return updated aggregation buffer of type BUF after merging two partially aggregated buffer objects of type BUF */
=> public BUF merge(BUF buffer1, BUF buffer2)
/* return output object of type OUT from evaluation of  aggregation buffer of type BUF */
=> public OUT finish(BUF arg0)
/* return buffer object of type BUF after initializing the same */
=> public BUF zero()

由于Aggregator本机支持将聚合缓冲区作为对象,因此它是高效的,并且不需要与从Scala类型转换为催化剂类型(反之亦然)相关的不必要的开销(与UDAF一样)。 同样,聚合器的聚合方式在编写聚合逻辑时提供了更多的灵活性和编程的美感。 聚合器也已集成到无类型聚合流中,以支持SQL,例如即将发布的版本中的查询。

预定义的聚合功能:Spark提供了各种预构建的聚合功能,可用于分布式数据收集的数据框或数据集表示形式。 这些预先构建的函数可以在SPARK SQL查询表达式中使用,也可以与为Dataframe或Dataset定义的聚合API一起使用。 在org.apache.spark.sql包中,所有预先构建的聚合函数都定义为"函数"类的静态方法。 带下划线的链接可以列出所有这些功能的列表。

预定义的聚合函数经过高度优化,在大多数情况下可以直接与Spark tungusten格式一起使用。 因此,如果" functions"类中存在预先构建的聚合函数,则Spark程序员应始终偏向于使用它们。 万一那里没有所需的聚合函数,那么只有一个可以诉诸于编写自定义聚合函数。

如果您对Spark Aggregation框架有更多查询,请随时在评论部分提问。

(本文翻译自Ajay Gupta的文章《Five Ways to Perform Aggregation in Apache Spark》,参考:https://medium.com/swlh/five-ways-to-perform-aggregation-in-apache-spark-1cdf1651ced4)



Tags:Apache Spark   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
1. 目标在本教程中,我们将讨论Apache Spark和Apache Flink之间的比较。Apache spark和Apache Flink都是用于大规模批处理和流处理的开源平台,为分布式计算提供容错和数据分布...【详细内容】
2020-07-31  Tags: Apache Spark  点击:(107)  评论:(0)  加入收藏
聚合是数据分析任务中广泛使用的运算符,Spark为此提供了坚实的框架。 以下是使用Spark可以针对大数据进行聚合的五种不同方式。RDD上的GroupByKey或ReduceByKey转换:RDD是Spa...【详细内容】
2020-07-24  Tags: Apache Spark  点击:(57)  评论:(0)  加入收藏
> Source: Pixabay Apache Spark是一种开放源代码的分布式计算引擎,目前是用于内存中批处理驱动的数据处理的最受欢迎的框架(它还支持实时数据流传输)。 得益于其先进的查询优...【详细内容】
2020-07-22  Tags: Apache Spark  点击:(77)  评论:(0)  加入收藏
Databricks 上周发布了 Apache Spark 3.0,并将此作为新版 Databricks Runtime 7.0 的一部分。3.0.0 版本包含 3400 多个补丁,为 Python 和 SQL 功能带来了重大进展。Spark 是...【详细内容】
2020-06-22  Tags: Apache Spark  点击:(70)  评论:(0)  加入收藏
▌简易百科推荐
阿里云镜像源地址及安装网站地址https://developer.aliyun.com/mirror/centos?spm=a2c6h.13651102.0.0.3e221b111kK44P更新源之前把之前的国外的镜像先备份一下 切换到yumcd...【详细内容】
2021-12-27  干程序那些事    Tags:CentOS7镜像   点击:(1)  评论:(0)  加入收藏
前言在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? 如何实现断线后重...【详细内容】
2021-12-24  程序猿阿嘴  CSDN  Tags:Netty   点击:(12)  评论:(0)  加入收藏
一. 配置yum源在目录 /etc/yum.repos.d/ 下新建文件 google-chrome.repovim /etc/yum.repos.d/google-chrome.repo按i进入编辑模式写入如下内容:[google-chrome]name=googl...【详细内容】
2021-12-23  有云转晴    Tags:chrome   点击:(7)  评论:(0)  加入收藏
一. HTTP gzip压缩,概述 request header中声明Accept-Encoding : gzip,告知服务器客户端接受gzip的数据 response body,同时加入以下header:Content-Encoding: gzip:表明bo...【详细内容】
2021-12-22  java乐园    Tags:gzip压缩   点击:(8)  评论:(0)  加入收藏
yum -y install gcc automake autoconf libtool makeadduser testpasswd testmkdir /tmp/exploitln -s /usr/bin/ping /tmp/exploit/targetexec 3< /tmp/exploit/targetls -...【详细内容】
2021-12-22  SofM    Tags:Centos7   点击:(7)  评论:(0)  加入收藏
Windows操作系统和Linux操作系统有何区别?Windows操作系统:需支付版权费用,(华为云已购买正版版权,在华为云购买云服务器的用户安装系统时无需额外付费),界面化的操作系统对用户使...【详细内容】
2021-12-21  卷毛琴姨    Tags:云服务器   点击:(6)  评论:(0)  加入收藏
参考资料:Hive3.1.2安装指南_厦大数据库实验室博客Hive学习(一) 安装 环境:CentOS 7 + Hadoop3.2 + Hive3.1 - 一个人、一座城 - 博客园1.安装hive1.1下载地址hive镜像路径 ht...【详细内容】
2021-12-20  zebra-08    Tags:Hive   点击:(9)  评论:(0)  加入收藏
以下是服务器安全加固的步骤,本文以腾讯云的CentOS7.7版本为例来介绍,如果你使用的是秘钥登录服务器1-5步骤可以跳过。1、设置复杂密码服务器设置大写、小写、特殊字符、数字...【详细内容】
2021-12-20  网安人    Tags:服务器   点击:(7)  评论:(0)  加入收藏
项目中,遇到了一个问题,就是PDF等文档不能够在线预览,预览时会报错。错误描述浏览器的console中,显示如下错误:nginx代理服务报Mixed Content: The page at ******** was loaded...【详细内容】
2021-12-17  mdong    Tags:Nginx   点击:(7)  评论:(0)  加入收藏
转自: https://kermsite.com/p/wt-ssh/由于格式问题,部分链接、表格可能会失效,若失效请访问原文密码登录 以及 通过密钥实现免密码登录Dec 15, 2021阅读时长: 6 分钟简介Windo...【详细内容】
2021-12-17  LaLiLi    Tags:SSH连接   点击:(16)  评论:(0)  加入收藏
相关文章
    无相关信息
最新更新
栏目热门
栏目头条