Spark UDF实现demo
Spark UDF实现demo
1 前言
使用Spark开发代码过程时,很多时候当前库中的算子不能满足业务需求。此时,UDFs(user defined functions) 派上非常大的作用。基于DataFrame(或者DataSet) 的Java(或Python、Scale) 可以轻松的定义注册UDF,但是想在SQL(SparkSQL、Hive) 中自定义或者想共用就遇到困难。这时,可以先按照一定规约自定义函数,再向Spark(或Hive)注册为永久函数,实现在Spark和Hive共享UDF的目的。
2 具体实现
根据官网1,可以知道,要实现UDF,至少需要继承UDAF、AbstractGenericUDAFResolver、GenericUDF、 GenericUDTF、UserDefinedAggregateFunction中的一个。如下已继承UDF为列进行说明:</br>
整体的实现包括两部:
- 继承父类开发UDF
- 注册UDF
2.1 继承父类开发UDF
2.1.1 基于java实现2
maven工程的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sogo</groupId>
<artifactId>sparkudf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spark.version>2.3.1</spark.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.4</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
一个类实现一个evaluate方法,定义一个UDF</br>
类中的main仅用于测试,打包前请先注解掉</br>
StringLengthUdf.java
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan@sogou-inc.com
* @Date: 2020/7/26
* @Time: 23:54
* @des:
*/
public class StringLengthUdf extends UDF {
// 默认调用 "evaluate" 方法
public int evaluate(String str) {
if (null == str) {
return 0;
} else {
return str.length();
}
}
// public static void main(String[] args) {
// StringLengthUdf stringLengthUdf = new StringLengthUdf();
// String str = "test";
// System.out.println("out:" + stringLengthUdf.evaluate(str));
// }
}
UDF输入多个参数 StringContainUdf.java
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.io.Serializable;
public class StringContainUdf extends UDF implements Serializable {
// 修改evaluate的形参,满足UDF不同输入参数及类型的场景
public Boolean evaluate(String s1, String s2) {
if (null == s1 || null == s2) {
return false;
} else return s1.contains(s2);
}
}
2. 注册UDF
2.1 语法
2.1.1 通用语法
CREATE [ OR REPLACE ] [ TEMPORARY ] FUNCTION [ IF NOT EXISTS ] function_name AS class_name [ resource_locations ]
2.1.2 基于jar的语法
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf' USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';
2.2 注册
实验发现,在SparkSQL中注册的UDF需要在Hive客户端再次启动时生效;而在Hive中注册的UDF立即在SparkSQL中生效。
有时明明注册了UDF,客户端也重新连接了,但依然找不到UDF,可能是不在同一数据库,这点也需要重点关注下。</br>
2.2.1 查看已注册的functions
# 查看已注册的function(hive、SparkSQL)
show functions;
## 查看已注册的UDF(SparkSQL)
show user functions;
2.2.2 在Hive中注册
# 进入hive环境(若没有指定数据库,UDF将归当前数据库所有)
> hive
# 添加jar包
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
# 注册为临时UDF
CREATE TEMPORARY FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# 注册为永久UDF
CREATE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# 更新永久UDF(这种方法在hive中不可用)
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# 不更新,类似追加的方式
CREATE FUNCTION IF NOT EXISTS strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
每次添加显得麻烦,我们可以把将注册语句写入脚本,在进入hive前让它初始化。我们在配置SparkSQL时将这样做。
2.2.3 在SparkSQL中注册
在SparkSQL中,可以采用在Hive中注册的方法。下面采用初始化配置sql脚本的方式说明。</br>
.bashrc配置
alias spark_sql="/opt/spark/bin/spark-sql
--master yarn
--deploy-mode client
--driver-memory 4G
--executor-memory 10G
--num-executors 80
--executor-cores 4
--name 'pyspark_cluster_lzx'
--queue adx_online
--database bigdata_lzx
--conf spark.dynamicAllocation.minExecutors=40
--conf spark.dynamicAllocation.maxExecutors=80
--conf spark.default.parallelism=1200
--conf spark.sql.shuffle.partitions=1200
--conf spark.eventLog.enabled=true
--conf spark.sql.autoBroadcastJoinThreshold=104857600
--hiveconf spark.hadoop.hive.cli.print.current.db=true
--hiveconf spark.hadoop.hive.cli.print.header=true
--hiveconf spark.hadoop.hive.resultset.use.unique.column.names=false
--jars file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
-i /search/work/bigdata/liuzhixuan/sparkudf/spark_udf.sql"
注:--jars参数添加UDF的java实现到集群</br>
-i参数为预执行的代码</br>
spark_udf.sql
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf' USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';
执行
> spark_sql
> show user functions;
结果
spark-sql (default)> show user functions;
function
bigdata_lzx.strlen_udf_int
Time taken: 0.549 seconds, Fetched 1 row(s)
spark-sql (default)> select strlen_udf_int("liu");
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Added [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar]
bigdata_lzx.strlen_udf_int(liu)
3
参考文献:
- 现代Web开发需要学习的15大技术
- 《剑指Offer》附加题_用两个队列实现一个栈_C++版
- MFC中的CListCtrl的最左边一列必须左对齐吗?
- 查找数组中重复的数字
- Lua学习笔记
- 使用Jpeglib
- 赋值运算符函数__from <剑指Offer>
- 从static变量导出问题解析 __declspec(dllexport) 和 __declspec(dllimport)的作用
- php实现SESSION跨域
- 使用cJSON解析JSON字符串
- 逻辑回归 | TensorFlow深度学习笔记
- MakeSureDirectoryPathExists与CreateDirectory的区别
- 粗略的物体碰撞预测及检测
- 讨厌算法的程序员 1 | 插入排序
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法