一文搞懂Flink生成StreamGraph
时间:2022-07-25
本文章向大家介绍一文搞懂Flink生成StreamGraph,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1.前言
通过一文搞懂这一系列的文章,我们已经知道了,Flink 作业的提交过程:
这篇文章主要聚焦在
我们以简单的代码为例
/**
* @author shengjk1
* @date 2018/11/23
*/
public class FlinkJava8Demo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5);
source.flatMap((Integer number, Collector<String> out)->{
StringBuilder builder = new StringBuilder();
for (int i = 0; i < number; i++) {
builder.append("a");
out.collect(builder.toString());
}
}).returns(Types.STRING).print();
source.map(i-> Tuple2.of(i,i))
.returns(Types.TUPLE(Types.INT,Types.INT))
.print();
env.execute("aa");
}
}
2.FlatMap 的转化
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
flatMap 是 DataStream 的一个方法或者就是我们常数的算子,而 StreamFlatMap 其实才是 StreamOperator
transform 方法
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
// DataStream 的一个子类
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//添加 operator,成为 StreamGraph 的一个 operator
getExecutionEnvironment().addOperator(resultTransform);
// 返回 stream,供下游继续操作
return returnStream;
}
像 filter、map等都会进行类似的操作
- Netty粘包拆包解决方案
- 为什么要用 Node.js
- JavaScript定时器:setTimeout与setInterval 定时器与异步循环数组
- 深入理解javascript原型和闭包(1)——一切都是对象
- 程序员面试50题(2)—二元查找树的后序遍历结果[数据结构]
- 总结了一些指针易出错的常见问题(一)
- Eureka 服务上下线监控
- 程序员面试50题(1)—查找最小的k个元素[算法]
- Netty4自带编解码器详解
- C和指针小结(C/C++程序设计)
- Netty-整合Protobuf高性能数据传输
- Netty-整合kryo高性能数据传输
- 40个重要的HTML 5面试问题及答案
- js调用原生API--陀螺仪和加速器
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- rxjs里scan operator的执行研究
- rxjs pipe和map组合的一个实际例子的单步调试
- Win10+Python2.7.14+cocos2d-x-3.17.2+VS2017环境搭建
- VUE-001-在表格单元格(el-table-column)中添加超链接访问
- 关于vue的title标签中出现的htmlWebpackPlugin.options.title
- dotnet tool 工具安装提示 Could not find a part of the path 安装失败
- dotnet core 进行 XML 序列化抛出 XmlSerializers dll 文件找不到
- C# dotnet 高性能多线程工具 ExecuteOnceAwaiter 只执行一次的任务
- 一道Postgresql递归树题
- 突击并发编程JUC系列-JDK1.8 扩展类型 LongAdder
- 利用tensorflow训练简单的生成对抗网络GAN
- 《Java从入门到失业》第三章:基础语法及基本程序结构(3.7):运算符(基本算数运算符、原码、反码、补码)
- 《Java从入门到失业》第三章:基础语法及基本程序结构(3.6):基本数据类型及字符集编码(字符编码和char型)
- 《Java从入门到失业》第三章:基础语法及基本程序结构(3.6):基本数据类型及字符集编码(整型、浮点型、布尔型)
- 多图详解Spring框架的设计理念与设计模式