Flink学习之Table API(python版本)

Posted 柳小葱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink学习之Table API(python版本)相关的知识,希望对你有一定的参考价值。

⛄️昨天学习完一些比较基础的Datastream的API后,让我们继续学习Datastream上一层的Table API和SQL,这两个API都是处理关系型数据的,可以降低flink的开发门槛。对往期内容感兴趣的同学可以参考如下内容👇:

💦Table API和SQL可以通过一种更加直观的方式对数据流进行多种处理,比如选择、过滤、分组、求和以及多表连接,也支持窗口操作。让我们开始今日份的学习吧!

目录

1. 开发环境的构建

我们在上一章DataStream API已经介绍过一遍,这里仔简单介绍一下:

  • 准备一台带java8或11、python(3.6,3.7,3.8)的机器
  • 写入命令👇
python3 -m pip install apache-flink

TableEnvironment的开发环境构建完成,后续就可以在IDEA中进行Table API和SQL的编码工作了。

2. 创建TableEnvironment

开发Table API和SQL必须先声明执行上下文环境为TableEnvironment,它是一个接口,主要有如下功能

  • 创建 Table
  • 将 Table 注册成临时表
  • 执行 SQL 查询,更多细节可查阅 SQL
  • 注册用户自定义的 (标量,表值,或者聚合) 函数
  • 配置作业
  • 管理 Python 依赖
  • 提交作业执行

例如:创建流和批的TableEnvironment

from pyflink.table import EnvironmentSettings, TableEnvironment
#创建流TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 创建批TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

3. 连接外部数据源

Flink Table API和SQL可以通过Connector与外部文件系统进行交互,用于流批数据的读取操作。外部文件系统包含传统的关系型数据库、K-V内存数据库、消息队列以及分布式文件系统等。

3.1 连接文件系统的文件

# 环境配置
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())
#连接文件系统文件
create table test01(
id int,
name string
)WITH(
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/Users/liuxiaocong/data01.csv'
)

其中WITH(…)中的内容为外部连接器的配置,不同的外部连接器涉及的配置不同,且必选的和可选的配置都不同。

解释:‘connector.type’ = ‘filesystem’说明外部连接器的类型为文件系统,‘connector.path’ =’/Users/liuxiaocong/data01.csv’指定了外部文件系统连接器的文件路径,‘format.type’ = 'csv’则说明文件系统的数据格式为CSV。

3.2 连接消息队列kafka

create table test02(
id int,
name string
)WITH(
'connector. type' = 'kafka'
'connector.version'='0.11'
'connector. topic'='topic name'
'connector.properties.bootstrap. servers'='localhost:9092'
'connector.properties.group.id'='my group id'
'connector.startup-mode'='earliest-offset'
'format.type '='json')

解释:‘connector.type’ = 'kafka’表示此外部连接器的类型为Kafka,‘connector.topic’ = 'topic_name’指定Kafka消息主题,‘connector.properties.bootstrap.servers’ = 'localhost:9092’指定Kafka服务的地址。‘format.type’ = 'json’指定数据格式为JSON。

3.3 连接ElasticSearch(ES)

create table test02(
id int,
name string
)
WITH
(
'connector. type'='elasticsearch',
'connector.version'='7',
'connector. hosts'='http://hostl:9092;http://host2:9093',
'connector. index'='myusers',
'connector. document-type'='user',
'format. type'='json',
'update-mode'='append',
'connector. key-delimiter'='$',
'connector. key-null-literal'='n/a',
'connector. failure-handler'='fail',
'connector. flush-on-checkpoint'='true',
'connector. bulk-flush.max-actions'='42',
'connector.bulk-flush.max-size'='42 mb',
'connector.bulk-flush. interval'='60000',
'connector. bulk-flush.backoff.type'='disabled',
'connector.bulk-flush.backoff.max-retries'='3',
'connector, bulk-flush.backoff.delay'='30000',
'connector. connection-max-retry-timeout'='3'

解释:connector.type’ = 'elasticsearch’表示此外部连接器的类型为Elasticsearch。‘connector.hosts’='http://host1:9092;http://host2:9093’指定Elasticsearch服务的地址。‘format.type’ ='json’指定数据格式为JSON。

3.4 Table API根据SQL建表

不知道大家看完上面有没有产生疑惑,上面都是SQL和Table有啥关系,这是个好问题,execute_sql可来执行Create Table相关DDL语句,来获取Table

例如:

from pyflink.table import *

# 环境配置
t_env = TableEnvironment.create(
    environment_settings=EnvironmentSettings.in_batch_mode())

# 在表环境中注册 Orders 表
source_data_path = "/Users/liuxiaocong/test/"
source_ddl = f"""
        create table Orders(
            a VARCHAR,
            b BIGINT,
            c BIGINT,
            rowtime TIMESTAMP(3),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = 'source_data_path'
        )
        """
#execute_sql建表
t_env.execute_sql(source_ddl)
#获取Orders表
orders = t_env.from_path("Orders") 

4. Table的操作

下面将介绍一些比较常见的操作,以下所有操作都使用python编写,并都支持Streaming和Batch

4.1 from_path

和 SQL 查询的 FROM 子句类似,执行一个注册过的表的扫描。

orders = t_env.from_path("Orders")

4.2 from_elements

和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。

#1.不指定类型,自己猜
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')])
#2.指定类型
table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')],
  schema=DataTypes.Row([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD('name', DataTypes.STRING())]))

4.3 select

和 SQL 的 SELECT 子句类似。 执行一个选择操作。

#1.select基础用法
orders = t_env.from_path("Orders")
result = orders.select(orders.a, orders.c.alias('d'))
#2.也可以select*
from pyflink.table.expressions import col
result = orders.select(col("*"))

4.4 as

重命名字段

orders = t_env.from_path("Orders")
result = orders.alias("x, y, z, t")

4.5 where和filter

和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。

#1.where
orders = t_env.from_path("Orders")
result = orders.where(orders.a == 'red')
#2.filter
orders = t_env.from_path("Orders")
result = orders.filter(orders.a == 'red')

4.6 add_columns

执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。

from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
result = orders.add_columns(concat(orders.c, 'sunny'))

4.7 add_or_replace_columns

执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。

from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))

4.8 drop_column

删除某列

orders = t_env.from_path("Orders")
result = orders.drop_columns(orders.b, orders.c)

4.9 rename_columns

执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。

orders = t_env.from_path("Orders")
result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))

4.10 group_by

和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。

orders = t_env.from_path("Orders")
result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))

4.11 Window group_by

使用分组窗口结合单个或者多个分组键对表进行分组和聚合。

from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col

orders = t_env.from_path("Orders")
result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \\ 
               .group_by(orders.a, col('w')) \\
               .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d'))

4.12 over_window

和 SQL 的 OVER 子句类似,所有的聚合必须定义在同一个窗口上,比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围(无界或有界)的窗口。尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。

from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE

orders = t_env.from_path("Orders")
result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
                            .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
                            .alias("w")) \\
               .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))

4.13 distinct

和 SQL DISTINCT 聚合子句类似,例如 COUNT(DISTINCT a)。 Distinct 聚合声明的聚合函数(内置或用户定义的)仅应用于互不相同的输入值.

#1.distinct和聚合函数一起用
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
orders = t_env.from_path("Orders")
# 按属性分组后的的互异(互不相同、去重)聚合
group_by_distinct_result = orders.group_by(orders.a) \\
                                 .select(orders.a, orders.b.sum.distinct.alias('d'))
# 按属性、时间窗口分组后的互异(互不相同、去重)聚合
group_by_window_distinct_result = orders.window(
    Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \\
    .select(orders.a, orders.b.sum.distinct.alias('d'))
# over window 上的互异(互不相同、去重)聚合
result = orders.over_window(Over
                       .partition_by(orders.a)
                       .order_by(orders.rowtime)
                       .preceding(UNBOUNDED_RANGE)
                       .alias("w")) \\
                       .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w')))

#2.单独去重
orders = t_env.from_path("Orders")
result = orders.distinct()

4.14 Inner Join

和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名,并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。

from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e)

4.15 Outer Join

和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。

from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
# 左连接
left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
#右链接
right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)
#全连接
full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e)

5. 时间概念

在前面我们提到过Flink可以分为事件时间和处理时间,同样地,在Flink Table API和SQL中,也有时间的概念,需要注意的点如下:

  • 在Table中,时间概念需要通过Table的时间属性(Time Attributes)来进行确定,它是Table Schema的一部分。
  • Table的时间属性可以用Create Table DDL语法或从DataStream创建表时进行定义。一旦定义了时间属性,就可以引用该字段。

为了对乱序事件数据进行处理,Table API和SQL程序需要知道每一行事件数据的时间戳,并且还需要定期来生成水位线Watermark。目前可以在CREATE TABLE DDL或DataStream到Table的转换过程中定义事件时间属性以及Watermark生成策略。

5.1 Watermark生成策略

  1. 严格的单调递增水印
-- 以当前事件数据中最大的时间戳为水印进行发送。当事件数据中的时间戳比水位印大时,则视为非迟到数据。
WATERMARK FOR rowtime_column AS rowtime_column
  1. 单调递增水印
-- 以当前事件数据中最大的时间戳减去1毫秒为水印进行发送。当事件数据中的时间戳比水印大或者相等时,则视为非迟到数据。
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
  1. 固定延迟水印
--当前事件数据中最大的时间戳减去一个固定的延迟时间长度为水印进行发送”
WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND

6. 总结

本章节本来是想将flink的Table API和SQL一起写了,可是发现光是Table API的内容就够多了,而且Table API这一部分中的流的转换、窗口操作、与pandas的转换等都还没讲述,后续将继续补充。

7. 参考资料

《PyDocs》(pyflink官方文档)
《Flink入门与实战》
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》

以上是关于Flink学习之Table API(python版本)的主要内容,如果未能解决你的问题,请参考以下文章