2021 Big Data Flink (37): Table & SQL Case 4
Posted by Lansonli
Case 4
Requirements
Consume data from Kafka, filter out the records whose status is success, and write them back to Kafka.

Sample input records:
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
Create the input and output topics:

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka

Start a console producer to feed test records into input_kafka:

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka

Start a console consumer to watch the results arriving in output_kafka:

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
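If you would rather feed the test records programmatically than paste them into the console producer, a minimal sketch using the plain kafka-clients producer API could look like the following. The class name TestDataProducer and the use of kafka-clients here are illustrative assumptions, not part of the original example:

package cn.itcast.sql;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

// Hypothetical helper (not part of the original example): sends two of the
// sample JSON records to the input_kafka topic.
public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("input_kafka", "{\"user_id\": \"1\", \"page_id\":\"1\", \"status\": \"success\"}"));
            producer.send(new ProducerRecord<>("input_kafka", "{\"user_id\": \"1\", \"page_id\":\"1\", \"status\": \"fail\"}"));
            producer.flush(); // make sure the records reach the broker before the JVM exits
        }
    }
}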
Code Implementation

Reference documentation for the Flink 1.12 Table API and the Kafka connector:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
package cn.itcast.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author itcast
 * Desc Consume from the input_kafka topic, keep only the records with
 *      status = 'success', and write the result to the output_kafka topic.
 */
public class FlinkSQL_Table_Demo06 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Source: declare the Kafka source table
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                "  `user_id` BIGINT,\n" +
                "  `page_id` BIGINT,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'input_kafka',\n" +
                "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")"
        );

        // 3. Sink: declare the Kafka sink table
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                "  `user_id` BIGINT,\n" +
                "  `page_id` BIGINT,\n" +
                "  `status` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'output_kafka',\n" +
                "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                "  'format' = 'json',\n" +
                "  'sink.partitioner' = 'round-robin'\n" +
                ")"
        );

        // 4. Transformation: keep only the records whose status is 'success'
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table resultTable = tEnv.sqlQuery(sql);

        // Print the result to the console for debugging; the Boolean flag marks
        // insertions (true) and retractions (false)
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

        // 5. Sink: concatenating the Table into the SQL string registers it in
        // the catalog under a generated name, so it can be referenced directly
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);

        // 6. Execute
        env.execute();
    }
}
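The intermediate Table and the toRetractStream print above exist only for console debugging. When that output is not needed, the filter and the sink can be combined into a single INSERT statement. A minimal sketch, assuming the same tEnv and the two CREATE TABLE statements from the listing above:

// Assumes tEnv and the input_kafka / output_kafka tables defined above.
tEnv.executeSql(
        "INSERT INTO output_kafka " +
        "SELECT user_id, page_id, status " +
        "FROM input_kafka " +
        "WHERE status = 'success'"
);

In this form executeSql submits the insert as its own job, so the trailing env.execute() call is only required when DataStream operations such as print() are also part of the pipeline.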