flink教程-详解flink 1.11 中的JDBC Catalog
- 背景
- 示例
- 源码解析
- AbstractJdbcCatalog
- PostgresCatalog
背景
1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库或读取 changelog 时,必须要手动创建对应的 schema。但是这样会有一个问题,当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 任务以保持类型匹配,任何不匹配都会造成运行时报错使作业失败。这个操作冗余且繁琐,体验极差。
实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。
1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。
示例
目前对于jdbc catalog,flink仅提供了postgres catalog,我们基于postgres的catalog讲解一下如何使用flink的catalog ,
- 引入pom
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
- 新建PostgresCatalog 目前flink通过一个静态类来创建相相应的jdbc catalog,对于PostgresCatalog,没有提供public类型的构造方法。
通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时这五个参数都是必填项,其中baseUrl要求是不能带有数据库名的
String catalogName = "mycatalog";
String defaultDatabase = "postgres";
String username = "postgres";
String pwd = "postgres";
String baseUrl = "jdbc:postgresql://localhost:5432/";
PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
catalogName,
defaultDatabase,
username,
pwd,
baseUrl);
访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:
<catalog>.<db>.`<schema.table>`
其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
如果非缺省schema,则不能被省略:
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
- 常见操作
我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。
tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
tEnv.useCatalog(postgresCatalog.getName());
- 列出来所有的数据库:
System.out.println("list databases :");
String[] databases = tEnv.listDatabases();
Stream.of(databases).forEach(System.out::println);
- 列出来所有的table
tEnv.useDatabase(defaultDatabase);
System.out.println("list tables :");
String[] tables = tEnv.listTables(); // 也可以使用 postgresCatalog.listTables(defaultDatabase);
Stream.of(tables).forEach(System.out::println);
- 列出所有函数
System.out.println("list functions :");
String[] functions = tEnv.listFunctions();
Stream.of(functions).forEach(System.out::println);
- 获取table的schema
CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
defaultDatabase,
"table1"));
TableSchema tableSchema = catalogBaseTable.getSchema();
System.out.println("tableSchema --------------------- :");
System.out.println(tableSchema);
- 查询表的数据
List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
.execute()
.collect());
results.stream().forEach(System.out::println);
- 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");
完整的代码请参考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java
源码解析
AbstractJdbcCatalog
这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
PostgresCatalog
在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:
从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。
@Override
public List<String> listDatabases() throws CatalogException {
List<String> pgDatabases = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
if (!builtinDatabases.contains(dbName)) {
pgDatabases.add(rs.getString(1));
}
}
return pgDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
.........................
参考资料: [1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
- WordPress 标签页面只有一篇文章时自动跳转到该文章
- OS X 上使用.NET开发应用程序
- [C#6] 2-nameof 运算符
- Key-Value Coding(KVC),Key-Value Observing(KVO)和Cocoa Bindings for MonoMac
- [C#6] 7-索引初始化器
- jquery mobile 移动web(3)
- 卷积神经网络详解(二)——自己手写一个卷积神经网络
- VS 2010 SP1的一个功能(添加可部署依赖项)
- 一组扁平化组件推荐下载(PSD 格式)
- [C#6] 6-表达式形式的成员函数
- 在启用了IPV6的机器上获取客户端ipv4地址
- 使用 MDT 2010 进行可伸缩部署
- 性能优化工具 MVC Mini Profiler
- 在ASP.NET应用启动的时候初始化的几种方法
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 给IMX6ULL编译鸿蒙Liteos-a
- 鸿蒙源码下载并编译
- g2o、Eigen、Mat矩阵类型转换
- 相见恨晚!OLAP数仓基础入门大全
- Activiti7 流程变量(UEL-Value方式)
- 给IE9及其以下等不支持classList属性的浏览器,添加classList属性
- Docker使用手册 嵌入式Linux环境搭建
- Activiti7 流程部署
- Activiti7 启动流程实例
- linux文本处理工具及正则表达式
- linux目录结构及文件管理
- centos7-httpd虚拟主机
- k8s1.13.0二进制部署-node节点(四)
- k8s1.13.0二进制部署-flannel网络(二)
- k8s1.13.0二进制部署-master节点(三)