Flink SQL 自定义函数指南 - 以读取 GBK 编码的数据库为例
背景介绍
近期我们遇到了一位客户提出的问题:MySQL 建表时,数据库表定义的字符集是 latin1,里面的数据是以 GBK 编码的方式写入的。当 Flink 的 JDBC Connector 在读取此维表时,输出数据的中文出现了乱码现象,如下图:
原因分析
对于 Oceanus 平台而言,内部的数据处理都是以 Unicode 为标准的。对于非 Unicode 的字符集,在 JDBC Connector 读取时,可能会出现各种异常情况,即使 JDBC 连接 URL 参数中指定了characterEncoding
也无法避免中文乱码问题。
对于 MySQL 数据而言,最怕的不是数据乱码,而是变成问号 (????)。通常来讲,如果遇到了全是问号的情况,则数据基本无法还原了;而对于乱码来说,很可能源数据还在,只是编码选错了,通过恰当的解码方式,还是有希望恢复原有的数据。
因此我们需要编写一个 UDF(用户自定义函数),将 JDBC Connector 读到的 Latin1(这里实际上是 GBK)数据进行解码。
首先我们来看一下数据库中的原始数据(首先需要将终端的编码改为 GBK,否则显示的仍然是乱码):
以 id 为 1 的数据为例,这里喵的 GBK 编码是0xDF 0xF7
。
那问题来了,既然 Flink 并没有报类型错误,说明输入输出还是当作字符串看待的,只是字符串没有经过妥善解码。那 Flink 将其读取成了什么呢?我们来写一个 UDF 自定义函数看看。
UDF 编写
对于这种编解码的场景,适合使用 Flink 的标量函数(Scalar Function),即单条记录进,单条记录出,无需聚合等复杂操作。
在当前的流计算 Oceanus 版本中,已经支持通过CREATE TEMPORARY SYSTEM FUNCTION
的方式来 声明 UDF。声明 UDF 后,在 程序包管理 界面,可以上传具体的实现类 JAR 包。
我们先编写一个打印出 String 里每个 Char 内容的函数,类名为DecodeLatin1
.
初步代码
请先在 pom.xml 中引入 Flink 相关依赖,随后可以开始编写 UDF:
package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.stream.IntStream;
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
char[] inputCharArray = input.toCharArray(); // 不能用 getBytes() 方法, 否则原始内容会被再次编码
IntStream.range(0, inputCharArray.length).forEach(i -> {
LOGGER.info("{}", Integer.toHexString(inputCharArray[i]));
});
return input;
}
}
编写完成并打包后,可以将程序包上传(对于自建的 Flink 集群,则是放入 Flink 的 lib 目录):
随后可以在 SQL 代码中,引用这个程序包:
作业提交运行后,我们可以尝试读取 id=1 的数据,发现打印出来的日志里,字符串中实际上保留了原始字符的 GBK 编码,只是没有经过妥善解码,导致输出时误当作 Unicode 处理了。
另外还注意到,对于原始 Latin1 而言,每个字符占 1 个字节,而这里 Java String 中使用的是 Char 结构,每个字符占了 2 个字节,且高位字节恒为 0. 此猜想在 这篇 MySQL 官方文档 中得到了验证。
那么给我们的启示是:可以直接将 char[] 数组转为等长的 byte[] 数组,而不能按照传统思路,创建一个长度为 char[] 数组两倍的 byte[] 数组。
改版后的代码
按照上面的思路,我们重新实现了一版,该版本可以实现解码并重新生成正确 String。
package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.stream.IntStream;
/**
* 如果 JDBC 数据库的 VARCHAR 为 Latin1 (或 GBK 等) 编码
* 可以使用这个函数转换为标准字符串
*
* SQL 代码声明方式:
* CREATE TEMPORARY SYSTEM FUNCTION DECODE_LATIN1 AS 'com.tencent.cloud.oceanus.udf.DecodeLatin1' LANGUAGE JAVA;
*/
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
return eval(input, "latin1");
}
public String eval(String input, String fromCharset) {
char[] inputCharArray = input.toCharArray();
// JDBC Driver 读取的 Latin1 字符, 高 8 位都是 0x00, 因此只考虑低 8 位即可, byte 和 char 数据部分等长, 长度无需乘以二
byte[] inputBytes = new byte[inputCharArray.length];
IntStream.range(0, inputCharArray.length).forEach(i -> {
inputBytes[i] = (byte) inputCharArray[i];
LOGGER.debug("{}", String.format("0x%02X ", inputBytes[i]));
});
try {
return new String(inputBytes, fromCharset);
} catch (UnsupportedEncodingException e) {
// 与 GET_JSON_OBJECT 的异常处理方式保持一致, 遇到异常数据时输出 null, 避免日志过量打印
LOGGER.debug("Unsupported charset {} for input string {}", fromCharset, input, e);
return null;
}
}
}
上传新版的 UDF,然后再次运行(注意本次增加了一个新字段FromCharset
,表示解码使用的实际字符集):
然后我们再读取数据库中 id 为 1 的数据,现在输出就正常了:
总结
在遇到数据乱码等原生 Flink 无法轻易解决的问题时,可以尝试自定义函数来定位和排查,一旦确认问题根源,可以同样使用自定义函数来对数据进行校正。大大扩展了 Flink SQL 的功能。
另外,程序包可以分版本在不同的作业之间复用,基础包(UDF)和业务包(调用 UDF 的主程序)可以实现解耦。如果有更优化的实现,可以只更新基础包,避免对业务包的改动引入的风险。
- PYTHON黑帽编程 4.1 SNIFFER(嗅探器)之数据捕获--补充
- es 5 数组reduce方法记忆
- CSS3与动画有关的属性transition、animation、transform对比
- 总结CSS3新特性(Transiton篇)
- 【实战】MS14-068域权限提升漏洞总结
- 总结CSS3新特性(Transform篇)
- Python 黑帽编程 4.2 Sniffer之数据本地存储和加载
- 老司机教你下载tumblr上视频和图片的正确姿势
- 总结CSS3新特性(媒体查询篇)
- 总结CSS3新特性(选择器篇)
- python无线网络安全入门案例【翻译】
- 总结CSS3新特性(颜色篇)
- RedTigers Hackit SQL 注入题解
- 【翻译】旧技术成就新勒索软件,Petya添加蠕虫特性
- MySQL 教程
- MySQL 安装
- MySQL 管理与配置
- MySQL PHP 语法
- MySQL 连接
- MySQL 创建数据库
- MySQL 删除数据库
- MySQL 选择数据库
- MySQL 数据类型
- MySQL 创建数据表
- MySQL 删除数据表
- MySQL 插入数据
- MySQL 查询数据
- MySQL where 子句
- MySQL UPDATE 查询
- MySQL DELETE 语句
- MySQL LIKE 子句
- mysql order by
- Mysql Join的使用
- MySQL NULL 值处理
- MySQL 正则表达式
- MySQL 事务
- MySQL ALTER命令
- MySQL 索引
- MySQL 临时表
- MySQL 复制表
- 查看MySQL 元数据
- MySQL 序列 AUTO_INCREMENT
- MySQL 处理重复数据
- MySQL 及 SQL 注入
- MySQL 导出数据
- MySQL 导入数据
- MYSQL 函数大全
- MySQL Group By 实例讲解
- MySQL Max()函数实例讲解
- mysql count函数实例
- MYSQL UNION和UNION ALL实例
- MySQL IN 用法
- MySQL between and 实例讲解
- MongoDB中的CURD操作
- 高可用的Redis主从复制集群,从理论到实践
- SpringBoot实战(一):使用Lombok简化你的代码
- Kubernetes Ingress入门指南和实践练习
- [译]Go语言常用文件操作汇总
- Redis常用数据类型对应的数据结构
- 详解卷积中的Winograd加速算法
- SpringMVC源码学习(一) - DispatcherSerlet和相关组件
- SpringMVC源码学习(二) - DispatcherServlet和相关组件
- 微服务使用 Hystrix 实现服务降级
- SpringMVC源码学习(三) - 请求处理的流程
- Hadoop框架:集群模式下分布式环境搭建
- 微服务 Hystrix 实现服务熔断
- 微服务 Gateway 的基本配置
- 有赞营销逆向域的探索与实践