spark分区与任务切分
我们都知道在spark中,RDD是其基本的抽象数据集,其中每个RDD由多个Partition组成。在job的运行期间,参与运算的Parttion数据分布在多台机器中,进行并行计算,所以分区是计算大数据量的措施。
分区数越多越好吗?
不是的,分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
分区太少有什么影响?
分区数太少的话,会导致一些结点没有分配到任务;另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。分区的目的就是要避免存在单任务处理时间过长。
合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor?
一般合理的分区数设置为总核数的2~3倍
分区数就是任务数吗?
一般来说任务数对应为分区数量,默认情况下为每一个HDFS分区创建一个分区,默认为128MB,但如果文件中的行太长(比块大小更长),则分区将会更少。RDD创建与HDFS分区一致数量的分区。
当使用textFile压缩文件(file.txt.gz不是file.txt或类似的)时,Spark禁用拆分,这使得只有1个分区的RDD(因为对gzip文件的读取无法并行化)。在这种情况下,要更改应该重新分区的分区数
但有时候你需要为你的应用程序,调整分区的大小,或者使用另一种分区方案。
设置多大分区数 ?
Spark只能为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量。因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是该分区的2-3倍)。
此外,分区数决定了将RDD保存到文件的操作生成的文件数。
划分RDD:repartition
repartition(numPartitions: Int)
rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)
请注意,Spark禁用拆分压缩文件,并创建只有1个分区的RDD。在这种情况下,使用sc.textFile('demo.gz')和重新分区是有帮助的,rdd.repartition(100)
rdd.repartition(N)做一个shuffle分割数据来匹配N
划分RDD:coalesce
coalesce(numPartitions: Int, shuffle: Boolean = false)
该coalesce转变是用来改变分区的数量。它可以根据标志触发RDD混洗shuffle(默认情况下禁用,即false)。
shuffle = true 和repartition是一致的。
分区的3种方式
1.HashPartitioner
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new HashPartitioner(3))
HashPartitioner确定分区的方式:partition = key.hashCode () % numPartitions
2.RangePartitioner
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new RangePartitioner(3,counts))
RangePartitioner会对key值进行排序,然后将key值被划分成3份key值集合。 3.CustomPartitioner
val counts = sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))
class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int =
{
if(key==1)){
0
}else if
(key==2){
1}else{ 2 }} override def equals(AcadGild: Any): Boolean = AcadGild match { case test: CustomPartitioner => test.numPartitions == numPartitions case _ => false }}
CustomPartitioner可以根据自己具体的应用需求,自定义分区。
- Java XML解析工具 dom4j介绍及使用实例
- redis学习教程之一基本命令
- 在java中使用redis
- springmvc学习笔记--json--返回json的日期格式问题
- springmvc学习笔记--mybatis--使用插件自动生成实体和mapper
- velocity的一些用法
- String.split()用法以及特殊分隔符注意,ps:|
- Mybatis在idea中错误:Invalid bound statement (not found)
- java字符编码和oracle乱码
- ehcache报错
- java继承覆盖与向上转型,权限
- java变量的加载顺序
- java类的初始化和对象的创建顺序
- 附近的人位置距离计算方法
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法