2021 Big Data Flink (37): Table & SQL, Case 4

Posted Lansonli


Table of Contents

Case 4

Requirements

Code Implementation


Case 4

Requirements

Consume data from Kafka, filter out the records whose status is success, and write them back to Kafka. Sample test data:


{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "success"}

{"user_id": "1", "page_id":"1", "status": "fail"}

Create the Kafka topics and start a console producer and consumer for testing:

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka


/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka


/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka


/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
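
A quick way to verify the whole pipeline once the Flink job below is running: paste the sample messages above into the console producer for input_kafka. Only the records whose status is success should then appear on the console consumer for output_kafka; the fail record is filtered out. Note that the DDL below declares user_id and page_id as BIGINT while the sample JSON quotes them as strings, so if the JSON format in your environment refuses to coerce them, send the fields as plain numbers instead, e.g. {"user_id": 1, "page_id": 1, "status": "success"}.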

 

Code Implementation

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html

 

package cn.itcast.sql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author itcast
 * Desc Consume from Kafka, keep only the records whose status is success, and write them back to Kafka
 */
public class FlinkSQL_Table_Demo06 {
    public static void main(String[] args) throws Exception {
        //1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //2. Source: create the Kafka source table
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );
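        //3. Sink: create the Kafka sink table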
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );

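        //4. Transformation: keep only the records whose status is 'success'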
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";

        Table resultTable = tEnv.sqlQuery(sql);

        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(resultTable, Row.class);
        resultDS.print();

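        //5. Sink: write the filtered result into the output_kafka table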
        tEnv.executeSql("insert into output_kafka select * from " + resultTable);


        //6. Execute
        env.execute();
    }


}
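
A note on the INSERT statement above: concatenating a Table into a SQL string works because Table.toString() registers the table in the catalog under a generated name, and that generated name is what ends up in the query text. In Flink 1.12 the same result can be achieved more directly with Table.executeInsert, which writes a query result into a registered sink table. A minimal sketch of that variant, reusing the table names from the code above:

        // Alternative to the string-concatenation INSERT: write resultTable straight into the sink table.
        // executeInsert() submits its own job; env.execute() is still needed only for the printed retract stream.
        Table resultTable = tEnv.sqlQuery(
                "select user_id, page_id, status from input_kafka where status = 'success'");
        resultTable.executeInsert("output_kafka");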

 
