textFile构建RDD的分区及compute计算策略
1,textFile
A),第一点,就是输入格式,key,value类型及并行度的意义。
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
//输入文件的格式TextInputFormat,key的类型LongWritable ,value的类型Text
//最小分区数defaultMinPartitions
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
并行度
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
真正意义是啥?实际是决定我们goalSize的值。并不决定我们的分区数。
B),hadoopRDD的getPartition方法。
主要是获取分片的过程通过调用FileInputFormat.getSplits方法来实现分片。主要有一下几个步骤:
1) ,获取所有 FileStatus
FileStatus[] files = listStatus(job);
ListStatus方法里面:
1,判断是否需要递归
boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
2,接着是创建路径过滤器,筛选掉一些我们不需要的文件,入以_,.开头的
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
3,根据mapreduce.input.fileinputformat.list-status.num-threads决定是并发还是单线程
FileStatus[] result;
int numThreads = job
.getInt(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
Stopwatch sw = new Stopwatch().start();
if (numThreads == 1) {
List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job, dirs, recursive, inputFilter, false);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Iterables.toArray(locatedFiles, FileStatus.class);
}
2) ,获取目标分片goalsize和最小minsize
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
3) ,判断文件是否支持切分,不压缩或者压缩方式为BZip2Codec支持切分
protected boolean isSplitable(FileSystem fs, Path file) {
final CompressionCodec codec = compressionCodecs.getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
支持切分就进行切分分片,切分分片大小为
Math.max(minSize, Math.min(maxSize, blockSize));
不支持切分的话就直接返回一个文件一个分片
最终,用InputSplit构建HadoopPartition
C),接着进入compute方法
重点掌握根据指定分片获取reader
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
实际上是在TextInputFormat构建了
new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
还有就是识别不同系统的过程,比如hdfs ,本地file,tachyon。
final FileSystem fs = file.getFileSystem(job);
里面会根据uri获取scheme,然后构建为"fs." + scheme + ".impl" 通过反射的到相应的对象。
clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
类加载器为Configuration对象里面初始化的
private ClassLoader classLoader;
{
classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = Configuration.class.getClassLoader();
}
}
而此,configuration对象是在compute方法中通过jobConf = getJobConf()获得的实际是
从Driver端发送过来的。
val conf: Configuration = broadcastedConf.value.value
由此可以得到结论是tachyon使用是依赖,必须方法系统类加载器的Classpath中去
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- [Oracle 11g RAC安装]RAC环境搭建
- [Oracle 11g RAC安装]Grid安装
- [Oracle 11g RAC安装]Oracle安装
- [Oracle 11g RAC安装]UDEV设置
- Oracle参数(Undo_Retention)
- Oracle基本概念(Undo空间)
- [AWR报告]Buffer Hit %
- [AWR报告]Library Hit %
- [Oracle 9i安装]Redhat 4.8的配置
- [Oracle 9i安装]Oracle软件的安装
- [AWR报告]Latch Hit %
- [Python运维]Python3.6的安装
- [Python运维]cx_Oracle模块的安装
- C#中抽象类与抽象方法的作用与实例
- C++ 基础扫盲(1)