Flink Series: Consuming Data from a Kafka Topic in Real Time with Flink and Scala

Posted by 勇敢羊羊在飞奔


1. Add the Flink dependencies

    <groupId>com.bigdata</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

2. Store the Kafka connection settings in a Properties object

    // Store the Kafka connection settings in a Properties object
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("auto.offset.reset", "earliest")
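
The same settings can also be assembled without hard-coding the credentials in the source file. The following is only a minimal sketch under stated assumptions: the `KafkaProps` object, its `build` method, and the environment variable names `KAFKA_USERNAME` and `KAFKA_PASSWORD` are hypothetical and not part of the original program. It uses nothing beyond `java.util.Properties`.

```scala
import java.util.Properties

// Hypothetical helper (not part of the original program): builds the same
// consumer settings as above, with the credentials passed in by the caller.
object KafkaProps {

  def build(bootstrapServers: String, groupId: String,
            username: String, password: String): Properties = {
    // JAAS line for the PLAIN mechanism, same shape as in the article
    val jaas =
      "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        s"""username="$username" password="$password";"""

    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("sasl.jaas.config", jaas)
    props.setProperty("security.protocol", "SASL_PLAINTEXT")
    props.setProperty("sasl.mechanism", "PLAIN")
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", "earliest")
    props
  }

  def main(args: Array[String]): Unit = {
    // Assumption: credentials are exported as environment variables
    val user = sys.env.getOrElse("KAFKA_USERNAME", "debezium")
    val pass = sys.env.getOrElse("KAFKA_PASSWORD", "")
    val props = build(
      "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092",
      "flink-test", user, pass)
    // Print a non-secret setting to confirm the object was built
    println(props.getProperty("group.id"))
  }
}
```

The returned `Properties` object can be passed to the Kafka consumer in place of the hand-written `properties` value.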

3. Set up the Flink streaming execution environment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRestartStrategy(RestartStrategies.noRestart())

4. Add the Kafka source and process the data

    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("debezium-test-optics_uds", new SimpleStringSchema(), properties))
    lines.print()
    // Trigger execution
    env.execute()
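
One detail worth knowing about the source: by default `FlinkKafkaConsumer` starts from the offsets committed for the consumer group, and `auto.offset.reset` is only consulted when no committed offset is found for a partition. If the job should always re-read the topic from the beginning, the start position can be set on the consumer itself. The sketch below assumes the dependencies from section 1 and a reachable broker; the object name, the job name, and the `localhost:9092` address are placeholders, and the SASL settings from section 2 are omitted for brevity.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Hypothetical variant of the job: forces the start position explicitly.
object SourceKafkaStartPosition {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties()
    // Assumption: placeholder broker address, replace with your own
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "flink-test")

    val consumer = new FlinkKafkaConsumer[String](
      "debezium-test-optics_uds", new SimpleStringSchema(), properties)
    // Ignore committed group offsets and read from the earliest record
    consumer.setStartFromEarliest()

    env.addSource(consumer).print()
    env.execute("kafka-start-position-sketch")
  }
}
```

`setStartFromLatest()` and `setStartFromGroupOffsets()` are the sibling methods; the latter is the default behavior used by the code in this article.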

5. Complete code



import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object SourceKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRestartStrategy(RestartStrategies.noRestart())

    // Store the Kafka connection settings in a Properties object
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("auto.offset.reset", "earliest")

    // Add the Kafka source and print each record
    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("debezium-test-optics_uds", new SimpleStringSchema(), properties))
    lines.print()
    // Trigger execution
    env.execute()
  }
}



6. Run the program and inspect the consumed data


{
    "schema":{
        "type":"struct",
        "fields":[
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"int32",
                        "optional":false,
                        "field":"sid"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"sname"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "name":"io.debezium.time.Timestamp",
                        "version":1,
                        "field":"updatetime"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"ssex"
                    }
                ],
                "optional":true,
                "name":"debezium_test_optics_uds.Value",
                "field":"before"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"int32",
                        "optional":false,
                        "field":"sid"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"sname"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "name":"io.debezium.time.Timestamp",
                        "version":1,
                        "field":"updatetime"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"ssex"
                    }
                ],
                "optional":true,
                "name":"debezium_test_optics_uds.Value",
                "field":"after"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"string",
                        "optional":false,
                        "field":"version"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"connector"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"name"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"ts_ms"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "name":"io.debezium.data.Enum",
                        "version":1,
                        "parameters":{
                            "allowed":"true,last,false,incremental"
                        },
                        "default":"false",
                        "field":"snapshot"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"db"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"sequence"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"table"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"server_id"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"gtid"
                    },
                    {
                        "type":"string",
                        "optional":false,
                        "field":"file"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"pos"
                    },
                    {
                        "type":"int32",
                        "optional":false,
                        "field":"row"
                    },
                    {
                        "type":"int64",
                        "optional":true,
                        "field":"thread"
                    },
                    {
                        "type":"string",
                        "optional":true,
                        "field":"query"
                    }
                ],
                "optional":false,
                "name":"io.debezium.connector.mysql.Source",
                "field":"source"
            },
            {
                "type":"string",
                "optional":false,
                "field":"op"
            },
            {
                "type":"int64",
                "optional":true,
                "field":"ts_ms"
            },
            {
                "type":"struct",
                "fields":[
                    {
                        "type":"string",
                        "optional":false,
                        "field":"id"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"total_order"
                    },
                    {
                        "type":"int64",
                        "optional":false,
                        "field":"data_collection_order"
                    }
                ],
                "optional":true,
                "field":"transaction"
            }
        ],
        "optional":false,
        "name":"debezium_test_optics_uds.Envelope"
    },
    "payload":{
        "before":null,
        "after":{
            "sid":3600,
            "sname":"f",
            "updatetime":1661126400000,
            "ssex":"a"
        },
        "source":{
            "version":"1.9.6.Final",
            "connector":"mysql",
            "name":"debezium-uds8-optics8-test_1h",
            "ts_ms":1665155935000,
            "snapshot":"false",
            "db":"dw",
            "sequence":null,
            "table":"student",
            "server_id":223344,
            "gtid":null,
            "file":"mysql-bin.000012",
            "pos":6193972,
            "row":0,
            "thread":66072,
            "query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')"
        },
        "op":"c",
        "ts_ms":1665155935640,
        "transaction":null
    }
}

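A few fields in the `payload` are easier to read once decoded. The `op` code identifies the kind of change (`c` for an insert, as in this record), and both `ts_ms` and fields typed `io.debezium.time.Timestamp`, such as `updatetime`, hold epoch milliseconds. The helper below is a small sketch for interpreting them; the `DebeziumFields` object and its method names are hypothetical and not part of the original program, and it uses only the standard library.

```scala
import java.time.Instant

// Hypothetical helper for reading Debezium payload fields by eye.
object DebeziumFields {

  // Maps the common Debezium operation codes to readable names
  def opName(op: String): String = op match {
    case "c"   => "create"
    case "u"   => "update"
    case "d"   => "delete"
    case "r"   => "read (snapshot)"
    case other => s"unknown ($other)"
  }

  // Converts an epoch-millisecond value to a UTC instant
  def toInstant(epochMillis: Long): Instant = Instant.ofEpochMilli(epochMillis)

  def main(args: Array[String]): Unit = {
    println(opName("c"))                 // create
    println(toInstant(1661126400000L))   // 2022-08-22T00:00:00Z
    println(toInstant(1665155935640L))   // time the connector processed the event
  }
}
```

The decoded `updatetime` value lines up with the `'20220822'` literal in the captured `insert` statement.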