From 0 to 1: Flink's Growth Path - Table API & SQL Consolidation Example

Posted by 熊老二

Stream Computing

Requirement
Use Flink SQL to perform basic statistical analysis of real-time transaction order data:

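Every 5 seconds, compute for each user the total number of orders, the maximum order amount and the minimum order amount over the last 5 seconds (i.e. a 5-second tumbling window).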
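To make the requirement concrete, here is a plain-Java sketch (no Flink dependency) of what a 5-second tumbling-window aggregation computes: every order is assigned to exactly one window [start, start + 5s), and count / max / min are computed per (window, user). It assumes windows are aligned to multiples of the window size with no offset, which matches Flink's default epoch alignment. The class and method names (`TumblingWindowSketch`, `windowStart`, `aggregate`) are hypothetical and exist only for illustration; records require Java 16+.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration (no Flink) of a tumbling-window aggregation:
 * per user and per window, the order count, maximum and minimum amount.
 */
public class TumblingWindowSketch {

    /** Simplified order record: userId, money, createTime in epoch millis. */
    record Order(int userId, int money, long createTime) {}

    /** Aggregated result for one (window, user) key. */
    record Stats(long count, int max, int min) {}

    /** Start of the tumbling window containing ts (assumes ts >= 0 and no window offset). */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Groups orders by "windowStart|userId" and merges count / max / min. */
    static Map<String, Stats> aggregate(List<Order> orders, long sizeMs) {
        Map<String, Stats> result = new TreeMap<>();
        for (Order o : orders) {
            String key = windowStart(o.createTime(), sizeMs) + "|" + o.userId();
            result.merge(key, new Stats(1, o.money(), o.money()),
                (a, b) -> new Stats(a.count() + b.count(),
                                    Math.max(a.max(), b.max()),
                                    Math.min(a.min(), b.min())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order(1, 30, 1_000L),
            new Order(1, 80, 4_999L),
            new Order(2, 10, 3_000L),
            new Order(1, 55, 5_000L)   // belongs to the next window [5000, 10000)
        );
        aggregate(orders, 5_000L).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

An order created at exactly 5000 ms starts a new window, because window ends are exclusive; this is also how `TUMBLE_END` in the SQL below should be read.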

A custom data source (Source) simulates the transaction order data. Order entity class: OrderInfo

```java
package xx.xxxxxx.flink.stream;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Order entity: order ID, user ID, order amount and creation time (epoch millis).
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderInfo {
    private String orderId;
    private Integer userId;
    private Integer money;
    private Long createTime;
}
```

SQL Implementation
Use the Flink SQL API to analyze the order data, as follows:

```java
package xx.xxxxxx.flink.stream;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
// Flink 1.10 package; since Flink 1.11 the class lives in org.apache.flink.table.api.bridge.java
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Uses Flink SQL to compute per-user order statistics over 5-second tumbling event-time windows.
 */
public class FlinkStreamTableDemo {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Data source (source): a custom source emitting one random order per second
        DataStreamSource<OrderInfo> orderDataStream = env.addSource(
            new RichSourceFunction<OrderInfo>() {
                // volatile: cancel() is called from a different thread than run()
                private volatile boolean isRunning = true;

                @Override
                public void run(SourceContext<OrderInfo> ctx) throws Exception {
                    Random random = new Random();
                    FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                    while (isRunning) {
                        OrderInfo info = new OrderInfo(
                            UUID.randomUUID().toString().substring(0, 18), // orderId
                            random.nextInt(3) + 1,                          // userId: 1..3
                            random.nextInt(100) + 1,                        // money: 1..100
                            System.currentTimeMillis()                      // createTime (epoch millis)
                        );
                        System.out.println(
                            info.getUserId() + ", " + info.getMoney() + ", " + format.format(info.getCreateTime())
                        );
                        ctx.collect(info);
                        TimeUnit.SECONDS.sleep(1);
                    }
                }

                @Override
                public void cancel() {
                    isRunning = false;
                }
            }
        );

        // Assign event timestamps (createTime) and watermarks; no out-of-orderness is tolerated
        SingleOutputStreamOperator<OrderInfo> eventTimeDataStream = orderDataStream.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<OrderInfo>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(OrderInfo orderInfo) {
                    return orderInfo.getCreateTime();
                }
            }
        );

        // 3. Transformation
        // Register the stream as a view; "createTime.rowtime" declares createTime as the event-time attribute
        tableEnv.createTemporaryView(
            "t_orders", eventTimeDataStream, "orderId, userId, money, createTime.rowtime"
        );
        // SQL query: per-user statistics over 5-second tumbling windows
        String sql = "SELECT userId AS user_id, " +
            "TUMBLE_START(createTime, INTERVAL '5' SECOND) AS win_start, " +
            "TUMBLE_END(createTime, INTERVAL '5' SECOND) AS win_end, " +
            "COUNT(1) AS total_count, " +
            "MAX(money) AS max_money, " +
            "MIN(money) AS min_money " +
            "FROM t_orders " +
            "GROUP BY userId, TUMBLE(createTime, INTERVAL '5' SECOND)";
        Table table = tableEnv.sqlQuery(sql);
        // Convert the Table into a retract DataStream of (flag, row)
        DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(table, Row.class);

        // 4. Sink
        resultDataStream.printToErr();

        // 5. Trigger execution
        env.execute(FlinkStreamTableDemo.class.getSimpleName());
    }
}
```

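When processing real-time data with Flink SQL, converting the result Table back into a DataStream requires one of two methods, toAppendStream or toRetractStream:

toAppendStream: appends each computed row to the resulting DataStream. This mode can only be used when the dynamic Table is modified exclusively by INSERT changes, i.e. it is append-only and previously emitted results are never updated.

toRetractStream: can always be used. Each element is a Tuple2<Boolean, Row> whose Boolean flag marks the kind of change: true means the row is inserted (added), false means a previously emitted row is retracted (deleted).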
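In the windowed query above, each (user, window) result should be emitted once, when the watermark passes the window end, so every flag printed by the job should be true and toAppendStream should work as well. Retractions appear with updating results, such as a non-windowed `GROUP BY userId` count. The plain-Java sketch below (no Flink; `RetractStreamSketch`, `Change`, `changelog` and `materialize` are hypothetical names) simulates the messages such a query would produce and shows how a consumer rebuilds the current result from them. Records require Java 16+.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (no Flink) of retract-stream semantics for a
 * non-windowed "SELECT userId, COUNT(*) FROM t GROUP BY userId".
 * Each message is (flag, userId, count): true = add (insert), false = retract (delete).
 */
public class RetractStreamSketch {

    record Change(boolean add, int userId, long count) {}

    /** Produces the changelog a running GROUP BY count would emit for the given user IDs. */
    static List<Change> changelog(int... userIds) {
        Map<Integer, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (int userId : userIds) {
            Long old = counts.get(userId);
            if (old != null) {
                out.add(new Change(false, userId, old)); // retract the previously emitted row
            }
            long updated = (old == null) ? 1L : old + 1;
            counts.put(userId, updated);
            out.add(new Change(true, userId, updated)); // emit the updated row
        }
        return out;
    }

    /** A downstream consumer rebuilds the current result by applying adds and retractions. */
    static Map<Integer, Long> materialize(List<Change> changes) {
        Map<Integer, Long> view = new HashMap<>();
        for (Change c : changes) {
            if (c.add()) {
                view.put(c.userId(), c.count());
            } else {
                view.remove(c.userId(), c.count()); // remove only if it still holds this row
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> log = changelog(1, 2, 1);
        log.forEach(System.out::println);
        System.out.println(materialize(log));
    }
}
```

The second order from user 1 produces two messages, a retraction of the old count followed by an insert of the new one; an append-only stream has no way to express that update.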

Notes on writing event-time window analysis in the Table API:

[Figure: event-time window analysis in the Table API]
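Event-time windows in both implementations depend on two things: the rowtime attribute (declared with `createTime.rowtime` when registering the view) and the watermarks assigned on the DataStream beforehand. The plain-Java sketch below (no Flink; `WatermarkSketch`, `onEvent` and `windowCanFire` are hypothetical names) mimics the bounded-out-of-orderness rule of BoundedOutOfOrdernessTimestampExtractor, where the watermark is the largest timestamp seen so far minus the maximum out-of-orderness, and the rule of Flink's default event-time trigger that a window [start, end) fires once the watermark reaches end - 1. It simplifies one thing: Flink emits these watermarks periodically, not after every event.

```java
/**
 * Plain-Java illustration (no Flink) of bounded-out-of-orderness watermarks
 * and of when an event-time tumbling window may fire.
 */
public class WatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that subtracting the bound cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records an event timestamp and returns the watermark after seeing it. */
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    static boolean windowCanFire(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(0); // same bound as Time.seconds(0)
        long windowEnd = 5_000L;                     // window [0, 5000)
        for (long ts : new long[] {1_000L, 3_000L, 4_999L, 6_000L}) {
            long watermark = wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + watermark
                + " fire[0,5000)=" + windowCanFire(watermark, windowEnd));
        }
    }
}
```

With a bound of 0, as in the code above, a window closes as soon as an event at or past its last millisecond arrives; a larger bound delays every result by that amount in exchange for tolerating late events.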

Table API Implementation
Use the Flink Table API to analyze the order data, as follows:

```java
package xx.xxxxxx.flink.stream;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
// Flink 1.10 package; since Flink 1.11 the class lives in org.apache.flink.table.api.bridge.java
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Uses the Flink Table API to compute per-user order statistics over 5-second tumbling event-time windows.
 */
public class FlinkStreamSQLDemo {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Data source (source): a custom source emitting one random order per second
        DataStreamSource<OrderInfo> orderStream = env.addSource(
            new RichSourceFunction<OrderInfo>() {
                // volatile: cancel() is called from a different thread than run()
                private volatile boolean isRunning = true;

                @Override
                public void run(SourceContext<OrderInfo> ctx) throws Exception {
                    Random random = new Random();
                    FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                    while (isRunning) {
                        OrderInfo info = new OrderInfo(
                            UUID.randomUUID().toString().substring(0, 18), // orderId
                            random.nextInt(3) + 1,                          // userId: 1..3
                            random.nextInt(100) + 1,                        // money: 1..100
                            System.currentTimeMillis()                      // createTime (epoch millis)
                        );
                        System.out.println(
                            info.getUserId() + ", " + info.getMoney() + ", "
                                + format.format(info.getCreateTime())
                        );
                        ctx.collect(info);
                        TimeUnit.SECONDS.sleep(1);
                    }
                }

                @Override
                public void cancel() {
                    isRunning = false;
                }
            }
        );

        // Assign event timestamps (createTime) and watermarks; no out-of-orderness is tolerated
        SingleOutputStreamOperator<OrderInfo> timeStream = orderStream.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<OrderInfo>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(OrderInfo orderInfo) {
                    return orderInfo.getCreateTime();
                }
            }
        );

        // 3. Transformation
        // Register the stream as a view; "createTime.rowtime" declares createTime as the event-time attribute
        tableEnv.createTemporaryView(
            "t_orders", timeStream, "orderId, userId, money, createTime.rowtime"
        );
        // Print the schema of the registered table
        tableEnv.from("t_orders").printSchema();
        // Table API query
        Table resultTable = tableEnv
            .from("t_orders")                                           // read the registered view as a Table
            .window(Tumble.over("5.second").on("createTime").as("win")) // 5-second tumbling event-time window
            .groupBy("win, userId")                                     // group by window and user
            .select(
                "userId AS user_id, win.start AS wstart, win.end AS wend, " +
                "COUNT(1) AS total_count, MAX(money) AS max_money, MIN(money) AS min_money"
            );
        // Convert the Table into a retract DataStream of (flag, row)
        DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(resultTable, Row.class);

        // 4. Sink
        resultDataStream.printToErr();

        // 5. Trigger execution
        env.execute(FlinkStreamSQLDemo.class.getSimpleName());
    }
}
```
