找回密码
 立即注册
首页 业界区 安全 Flink和StreamPark自定义UDF函数的使用

Flink和StreamPark自定义UDF函数的使用

汤昕昕 昨天 14:28
本文分享自天翼云开发者社区《Flink和StreamPark自定义UDF函数的使用》,作者:王****帅
1、什么是函数

    在 SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这就是“函数”(functions)。Flink 的 Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同:Table API 中的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串 str 转换成全大写的形式,Table API 的写法是调用 str 这个 String对象的 upperCase()方法:
  1. str.upperCase();
复制代码
    而 SQL 中的写法就是直接引用 UPPER()函数,将 str 作为参数传入:
  1. UPPER(str)
复制代码
    由于 Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较少使用。下面我们主要介绍 Flink SQL 中函数的使用。Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。
2、什么是自定义UDF函数

    系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。事实上,系统内置函数仍然在不断扩充,如果我们认为自己实现的自定义函数足够通用、应用非常广泛,也可以在项目跟踪工具 JIRA 上向 Flink 开发团队提出“议题”(issue),请求将新的函数添加到系统函数中。
2.1 编写自定义UDF函数

    自定义一个ScalarFunction,传入一个String类型的参数,输出这个参数的hashCode
  1. public class HashScalarFunction extends ScalarFunction {
  2.     public String eval(String str){
  3.         return String.valueOf(str.hashCode());
  4.     }
  5. }
复制代码
2.2 在代码中以SQL方式使用UDF函数

2.2.1 读取mysql数据使用UDF函数转换并输出到控制台
  1. package cn.ctyun.demo.flinksql;
  2. import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.Table;
  5. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  6. /**
  7. * @Date 2023/4/14 14:38
  8. * @Description 读取mysql数据使用UDF函数转换并输出到控制台
  9. */
  10. public class FlinkSqlUdfMysql2Print {
  11.     public static void main(String[] args) {
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13.         env.setParallelism(1);
  14.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  15.         // 1. 创建读取表,使用mysql进行
  16.         String source_ddl = "CREATE TABLE UserSource (" +
  17.                 " id INT, " +
  18.                 " name VARCHAR, " +
  19.                 " phone VARCHAR, " +
  20.                 " sex INT " +
  21.                 ") WITH (" +
  22.                 " 'connector.type' = 'jdbc', " +
  23.                 " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " +
  24.                 " 'connector.table' = 'test_user_table', " +
  25.                 " 'connector.username' = 'root', " +
  26.                 " 'connector.password' = '******'" +
  27.                 ")";
  28.         tableEnv.executeSql(source_ddl);
  29.         // 3. 注册自定义标量函数
  30.         tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);
  31.         // 4. 调用UDF查询转换
  32.         Table resultTable = tableEnv.sqlQuery("select id, name, phone, sex, MyHash(name) as name_hash from UserSource");
  33.         // 5. 输出到控制台
  34.         tableEnv.executeSql("create table output (" +
  35.                 "id INT, " +
  36.                 "name STRING, " +
  37.                 "phone STRING, " +
  38.                 "sex INT, " +
  39.                 "name_hash STRING ) " +
  40.                 "WITH (" +
  41.                 "'connector' = 'print')");
  42.         resultTable.executeInsert("output");
  43.     }
  44. }
复制代码
2.2.2 读取mysql数据使用UDF函数转换并输出到mysql
  1. package cn.ctyun.demo.flinksql;
  2. import cn.ctyun.demo.flinksql.udf.HashScalarFunction;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. /**
  6. * @Date 2023/4/14 14:50
  7. * @Description 读取mysql数据使用UDF函数转换并输出到mysql
  8. */
  9. public class FlinkSqlUdfMysql2Mysql {
  10.     public static void main(String[] args) {
  11.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12.         env.setParallelism(1);
  13.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  14.         // 1. 创建读取表,使用mysql进行
  15.         String source_ddl = "CREATE TABLE UserSource (" +
  16.                 " id INT, " +
  17.                 " name VARCHAR, " +
  18.                 " phone VARCHAR, " +
  19.                 " sex INT " +
  20.                 ") WITH (" +
  21.                 " 'connector.type' = 'jdbc', " +
  22.                 " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_source?useSSL=false', " +
  23.                 " 'connector.table' = 'test_user_table', " +
  24.                 " 'connector.username' = 'root', " +
  25.                 " 'connector.password' = '*******'" +
  26.                 ")";
  27.         tableEnv.executeSql(source_ddl);
  28.         //  2. 创建写出表,使用mysql进行
  29.         String sink_ddl = "CREATE TABLE UserSink (" +
  30.                 "id INT, " +
  31.                 "name STRING, " +
  32.                 "phone STRING, " +
  33.                 "sex INT, " +
  34.                 "name_hash STRING " +
  35.                 ") WITH (" +
  36.                 " 'connector.type' = 'jdbc', " +
  37.                 " 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false', " +
  38.                 " 'connector.table' = 'test_user_table_udf', " +
  39.                 " 'connector.username' = 'root', " +
  40.                 " 'connector.password' = '********'" +
  41.                 ")";
  42.         tableEnv.executeSql(sink_ddl);
  43.         // 3. 注册自定义标量函数
  44.         tableEnv.createTemporarySystemFunction("MyHash", HashScalarFunction.class);
  45.         // 4. 使用insert语句进行数据输出,在这里进行UDF查询转换
  46.         String insertSql = "INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource";
  47.         tableEnv.executeSql(insertSql);
  48.     }
  49. }
复制代码
2.3 在StreamPark中以SQL方式使用UDF函数

在StreamPark创建作业,导入作业依赖:
flink-connector-jdbc_2.12-1.14.3.jar
flink-demo-jar-job-1.0-SNAPSHOT.jar
mysql-connector-java-8.0.21.jar
FlinkSQL为:
  1. CREATE FUNCTION MyHash AS 'cn.ctyun.demo.flinksql.udf.HashScalarFunction';
  2. CREATE TABLE UserSource (
  3. id INT,
  4. name VARCHAR,
  5. phone VARCHAR,
  6. sex INT
  7. ) WITH (
  8. 'connector.type' = 'jdbc',
  9. 'connector.url' = 'jdbc:mysql://********:3306/flink_test_source?useSSL=false',
  10. 'connector.table' = 'test_user_table',
  11. 'connector.username' = 'root',
  12. 'connector.password' = '*********'
  13. );
  14. CREATE TABLE UserSink (
  15. id INT,
  16. name STRING,
  17. phone STRING,
  18. sex INT,
  19. name_hash STRING
  20. ) WITH (
  21. 'connector.type' = 'jdbc',
  22. 'connector.url' = 'jdbc:mysql://*******:3306/flink_test_sink?useSSL=false',
  23. 'connector.table' = 'test_user_table_udf',
  24. 'connector.username' = 'root',
  25. 'connector.password' = '**********'
  26. );
  27. INSERT INTO UserSink select id, name, phone, sex, MyHash(name) as name_hash from UserSource;
复制代码
运行作业后mysql可正常插入数据

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册