From 0 to 1: Flink's Growth Path - Table API & SQL Getting-Started Examples
Posted by 熊老二
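Getting-Started Examples: Stream Processing
The following getting-started examples implement stream processing with the Flink Table API and with Flink SQL, respectively.
SQL Example
A demonstration of streaming data processing with Flink SQL, based on the official example. The code is as follows: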
package xx.xxxxxx.flink.start.stream;

import lombok.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
// Flink 1.10 package; since Flink 1.11 it is deprecated in favor of
// org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.Arrays;

/**
 * Flink SQL streaming data processing demo, based on the official example.
 */
public class StreamSQLDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public Integer amount;
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Data source - source
        // Simulated datasets
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1001L, "beer", 3),
                new Order(1001L, "diaper", 4),
                new Order(1003L, "rubber", 2)
        ));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(1002L, "pen", 3),
                new Order(1002L, "rubber", 3),
                new Order(1004L, "beer", 1)
        ));
        // Convert a DataStream into a Table
        Table tableA = tableEnv.fromDataStream(orderA, "user, product, amount");
        // Register a DataStream as a temporary view named "orderB"
        tableEnv.createTemporaryView("orderB", orderB, "user, product, amount");

        // 3. Transformation - transformation
        // Filter both inputs and merge the results with UNION ALL; concatenating
        // tableA into the SQL string registers it under an auto-generated name
        Table resultTable = tableEnv.sqlQuery(
                "SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL "
                        + "SELECT * FROM orderB WHERE amount > 2"
        );
        // Convert the (append-only) Table back into a DataStream
        DataStream<Order> resultDataStream = tableEnv.toAppendStream(resultTable, Order.class);

        // 4. Data sink - sink
        resultDataStream.printToErr();

        // 5. Trigger execution - execute
        env.execute(StreamSQLDemo.class.getSimpleName());
    }
}
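To see which rows the query should return without starting a cluster, the filter plus UNION ALL logic can be replayed over the same two lists in plain Java. This is a minimal sketch, not Flink code: the class `UnionAllSketch` and its `Order` stand-in are hypothetical names, and the inputs are finite lists rather than streams.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical plain-Java replay of: SELECT * FROM A WHERE amount > 2 UNION ALL SELECT * FROM B WHERE amount > 2
class UnionAllSketch {
    // Minimal stand-in for the Order POJO (no Lombok, no Flink)
    static final class Order {
        final long user;
        final String product;
        final int amount;

        Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order(user=" + user + ", product=" + product + ", amount=" + amount + ")";
        }
    }

    static final List<Order> ORDER_A = Arrays.asList(
            new Order(1001L, "beer", 3),
            new Order(1001L, "diaper", 4),
            new Order(1003L, "rubber", 2));
    static final List<Order> ORDER_B = Arrays.asList(
            new Order(1002L, "pen", 3),
            new Order(1002L, "rubber", 3),
            new Order(1004L, "beer", 1));

    static List<Order> unionAllFiltered(List<Order> a, List<Order> b) {
        // WHERE amount > 2 applied to each side independently
        Stream<Order> left = a.stream().filter(o -> o.amount > 2);
        Stream<Order> right = b.stream().filter(o -> o.amount > 2);
        // UNION ALL keeps every row (no deduplication), so concatenation is enough
        return Stream.concat(left, right).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        unionAllFiltered(ORDER_A, ORDER_B).forEach(System.out::println);
    }
}
```

Four rows survive the filter (beer 3, diaper 4, pen 3, rubber 3). The Flink job should print the same four rows, but with parallelism 2 `printToErr()` prefixes each line with a subtask index, and the interleaving of the two inputs is not guaranteed.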
Table API Example
This example uses the Flink Table API to compute statistics over event-time windows. The dataset is as follows:
1,beer,3,2020-12-12 00:00:01
1,diaper,4,2020-12-12 00:00:02
2,pen,3,2020-12-12 00:00:04
2,rubber,3,2020-12-12 00:00:06
3,rubber,2,2020-12-12 00:00:05
4,beer,1,2020-12-12 00:00:08
2,rubber,3,2020-12-12 00:00:10
3,rubber,2,2020-12-12 00:00:10
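Before reading the code, it may help to work out by hand what a 5-second tumbling window grouped by user should produce for this dataset. The sketch below (plain Java, hypothetical class `TumblingWindowSketch`) assigns each line to the window starting at `ts - ts % 5000` and sums `amount` per window and user. It assumes the timestamps are read as UTC, which shifts absolute times but not the 5-second boundaries, and it ignores watermarks, so it shows the final totals once every window has closed.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of: GROUP BY TUMBLE(event_time, 5 s), user_id -> SUM(amount)
class TumblingWindowSketch {
    static final long WINDOW_MS = 5_000L;
    static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final DateTimeFormatter HMS = DateTimeFormatter.ofPattern("HH:mm:ss");

    static final List<String> SAMPLE = Arrays.asList(
            "1,beer,3,2020-12-12 00:00:01",
            "1,diaper,4,2020-12-12 00:00:02",
            "2,pen,3,2020-12-12 00:00:04",
            "2,rubber,3,2020-12-12 00:00:06",
            "3,rubber,2,2020-12-12 00:00:05",
            "4,beer,1,2020-12-12 00:00:08",
            "2,rubber,3,2020-12-12 00:00:10",
            "3,rubber,2,2020-12-12 00:00:10");

    // Start of the tumbling window that contains ts (window offset 0, ts >= 0)
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Format epoch milliseconds as HH:mm:ss in UTC
    static String hms(long epochMs) {
        return LocalDateTime.ofEpochSecond(epochMs / 1000L, 0, ZoneOffset.UTC).format(HMS);
    }

    // Key "start~end,userId" -> SUM(amount); TreeMap keeps the keys sorted
    static Map<String, Integer> aggregate(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            if (line == null) continue;
            String[] f = line.trim().split(",");
            if (f.length != 4) continue; // same filter rule as the Flink job
            // Assumption: read times as UTC; whole-minute zone offsets keep the 5 s boundaries
            long ts = LocalDateTime.parse(f[3], IN).toEpochSecond(ZoneOffset.UTC) * 1000L;
            long start = windowStart(ts);
            String key = hms(start) + "~" + hms(start + WINDOW_MS) + "," + f[0];
            totals.merge(key, Integer.parseInt(f[2]), Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        aggregate(SAMPLE).forEach((k, v) -> System.out.println(k + " total=" + v));
    }
}
```

This yields seven (window, user) totals; for example, user 1 has 3 + 4 = 7 in the window 00:00:00~00:00:05, and the two records at 00:00:10 fall into the window 00:00:10~00:00:15.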
The example code is as follows:
package xx.xxxxxx.flink.start.stream;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
// Flink 1.10 package; since Flink 1.11 it is deprecated in favor of
// org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink Table API usage: aggregation over event-time tumbling windows.
 */
public class StreamWindowTableDemo {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Set the time characteristic: event time (EventTime)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 2. Data source - source
        /*
        1,beer,3,2020-12-12 00:00:01
        1,diaper,4,2020-12-12 00:00:02
        2,pen,3,2020-12-12 00:00:04
        2,rubber,3,2020-12-12 00:00:06
        3,rubber,2,2020-12-12 00:00:05
        4,beer,1,2020-12-12 00:00:08
        2,rubber,3,2020-12-12 00:00:10
        3,rubber,2,2020-12-12 00:00:10
        */
        // Read lines from a socket; the host is the author's test machine, so replace it
        // with your own (for example, start a server with `nc -lk 9999`)
        DataStreamSource<String> inputStream = env.socketTextStream("node1.itcast.cn", 9999);

        // 3. Transformation
        // Parses in the JVM's default time zone; FastDateFormat is thread-safe and serializable
        FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
        // 3.1 Drop malformed lines (filter) and convert each line into a Row (map)
        SingleOutputStreamOperator<Row> rowStream = inputStream
                .filter(line -> null != line && line.trim().split(",").length == 4)
                .map(new MapFunction<String, Row>() {
                    @Override
                    public Row map(String line) throws Exception {
                        // Sample line: 3,rubber,2,2020-12-12 00:00:05
                        String[] fields = line.trim().split(",");
                        Long orderTime = format.parse(fields[3]).getTime();
                        String userId = fields[0];
                        String productName = fields[1];
                        Integer amount = Integer.parseInt(fields[2]);
                        // Return a Row object
                        return Row.of(orderTime, userId, productName, amount);
                    }
                }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.INT));
        // 3.2 Assign the event-time field and watermarks (0 s of allowed out-of-orderness)
        SingleOutputStreamOperator<Row> timeStream = rowStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (Long) row.getField(0);
                    }
                }
        );
        // Register the DataStream as a temporary view; "event_time.rowtime" appends a
        // rowtime attribute that carries the timestamps assigned above
        tableEnv.createTemporaryView(
                "t_orders",
                timeStream,
                "order_time, user_id, product_name, amount, event_time.rowtime"
        );
        // Analysis with the Table API DSL
        Table resultTable = tableEnv
                .from("t_orders")
                // Define a 5-second tumbling window on event_time
                .window(Tumble.over("5.seconds").on("event_time").as("win"))
                // Group by window first, then by user
                .groupBy("win, user_id")
                .select("win.start, win.end, user_id, amount.sum as total");
        // Convert the Table into a DataStream
        DataStream<Row> resultDataStream = tableEnv.toAppendStream(resultTable, Row.class);

        // 4. Data sink - sink
        resultDataStream.printToErr();

        // 5. Execute the application
        env.execute(StreamWindowTableDemo.class.getSimpleName());
    }
}
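With `Time.seconds(0)` the watermark equals the largest timestamp seen so far, so a window is emitted only after a later record pushes the watermark past the window's last millisecond. The following hypothetical simulation (`WatermarkFiringSketch`, timestamps as milliseconds after 2020-12-12 00:00:00) replays the sample lines in the order listed. It simplifies Flink by advancing the watermark after every record, whereas `BoundedOutOfOrdernessTimestampExtractor` is a periodic assigner whose watermark is emitted on a timer; it also assumes zero allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of event-time tumbling windows driven by bounded-out-of-orderness watermarks
class WatermarkFiringSketch {
    static final long WINDOW_MS = 5_000L;

    // Sample lines in arrival order; event time in ms after 2020-12-12 00:00:00 (assumption)
    static final long[] TS = {1_000, 2_000, 4_000, 6_000, 5_000, 8_000, 10_000, 10_000};
    static final String[] USER = {"1", "1", "2", "2", "3", "4", "2", "3"};
    static final int[] AMOUNT = {3, 4, 3, 3, 2, 1, 3, 2};

    // Returns emitted results in firing order, e.g. "[0,5000) user=1 total=7"
    static List<String> run(long maxOutOfOrdernessMs) {
        // windowStart -> (userId -> running sum); sorted so windows fire in start order
        TreeMap<Long, TreeMap<String, Integer>> open = new TreeMap<>();
        List<String> out = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + maxOutOfOrdernessMs; // low start, no underflow below
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < TS.length; i++) {
            long start = TS[i] - (TS[i] % WINDOW_MS);
            // A window's last timestamp is end - 1; once the watermark reached it, the record is late
            if (start + WINDOW_MS - 1 <= watermark) {
                out.add("late, dropped: t=" + TS[i] + " user=" + USER[i]);
                continue;
            }
            open.computeIfAbsent(start, k -> new TreeMap<>()).merge(USER[i], AMOUNT[i], Integer::sum);
            // Simplification: advance the watermark after every record
            maxTs = Math.max(maxTs, TS[i]);
            watermark = Math.max(watermark, maxTs - maxOutOfOrdernessMs);
            // Fire every open window whose last timestamp (end - 1) is <= the watermark
            while (!open.isEmpty() && open.firstKey() + WINDOW_MS - 1 <= watermark) {
                Map.Entry<Long, TreeMap<String, Integer>> w = open.pollFirstEntry();
                long s = w.getKey();
                w.getValue().forEach((user, total) ->
                        out.add("[" + s + "," + (s + WINDOW_MS) + ") user=" + user + " total=" + total));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        run(0).forEach(System.out::println);
    }
}
```

Under these assumptions the windows 00:00:00~00:00:05 and 00:00:05~00:00:10 are emitted (the first when the 00:00:06 record arrives, the second when a 00:00:10 record arrives), while 00:00:10~00:00:15 stays open until a record at 00:00:15 or later is typed in. The out-of-order record at 00:00:05 is not dropped, because its window has not fired yet when it arrives.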