10-flink-1.10.1 - Flink Sink API (output operators)

Posted by 逃跑的沙丁鱼


Contents

1 Flink sink

2 File sink

3 Kafka sink

3.1 Producer writes to topic sensor_input_csv

3.2 Flink code

3.3 Consumer reads from topic sensor_out

4 Redis sink

4.1 Add the dependency

4.2 Code

4.3 Verifying the result

5 ES sink

5.1 Add the dependency

5.2 Code

5.3 Verifying the result

6 Custom Flink sink

6.1 Add the MySQL JDBC driver dependency

6.2 Code

6.3 Summary


1 Flink sink
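
Flink writes results out of a job through sink operators: any DataStream can be terminated with addSink(someSinkFunction), and convenience methods such as print() are thin wrappers around the same mechanism. Before the concrete connectors below, here is a minimal sketch of that common shape (the object name, the in-memory test element and the job name are illustrative and not part of the original examples):

package com.study.liucf.unbounded.sink

import org.apache.flink.streaming.api.scala._

// Minimal sketch of the generic sink pattern: every concrete sink below
// (file, Kafka, Redis, Elasticsearch, JDBC) is attached the same way.
object SinkPatternSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Illustrative in-memory source; the later examples read files or Kafka instead.
    val stream: DataStream[String] = env.fromElements("sensor_1,1617505482,36.6")
    stream.print()                       // built-in convenience sink
    // stream.addSink(someSinkFunction)  // any SinkFunction / RichSinkFunction works here
    env.execute("sink pattern sketch")
  }
}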

2 File sink

package com.study.liucf.unbounded.sink

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/13
 */
object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    // Convert each String line into a LiucfSensorReding
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // Print to the console
    ds.print()
    // Write to a file
//    ds
//      .writeAsCsv("src\\main\\resources\\sensor.csv")
//      .setParallelism(1) // by default this runs in parallel and writes one file per subtask; parallelism 1 yields a single file
    ds.addSink(StreamingFileSink.forRowFormat(
      new Path("src\\main\\resources\\sensor2.csv"),
      new SimpleStringEncoder[LiucfSensorReding]()
    ).build())
    // writeAsCsv is deprecated, which is why StreamingFileSink is used instead
    // Start the Flink job
    env.execute("liucf sink api")
  }
}
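
One practical note on the code above: StreamingFileSink finalizes its part files when a checkpoint completes, so with checkpointing disabled the output tends to stay in in-progress/pending files. Below is a minimal sketch with checkpointing switched on; the 10-second interval, the socket source and the object name are illustrative assumptions rather than part of the original example:

package com.study.liucf.unbounded.sink

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Sketch: the same row-format file sink, but with checkpointing enabled so that
// StreamingFileSink can move part files from "in-progress" to "finished".
object FileSinkWithCheckpointing {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000L) // illustrative 10 s interval; tune for your job
    // Illustrative unbounded source so the job keeps running and checkpoints fire.
    val ds: DataStream[String] = env.socketTextStream("localhost", 7777)
    ds.addSink(StreamingFileSink.forRowFormat(
      new Path("src\\main\\resources\\sensor3.csv"),
      new SimpleStringEncoder[String]()
    ).build())
    env.execute("liucf file sink with checkpointing")
  }
}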

3 Kafka sink

This example reads data from one Kafka topic, sensor_input_csv, and writes it out to another Kafka topic, sensor_out.

3.1 Producer writes to topic sensor_input_csv

package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.109.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384) // 16 KB
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("topic", "sensor_input_csv")
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor, producer)
    producer.close()
  }

  def send(str: String, producer: KafkaProducer[String, String]): Unit = {
    val record = new ProducerRecord[String, String]("sensor_input_csv", str)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null) {
          println("send succeeded")
        }
        if (exception != null) {
          println("send failed")
        }
      }
    })
  }
}

3.2 Flink code

The Flink job reads from the Kafka topic sensor_input_csv and writes the result to another Kafka topic, sensor_out.

package com.study.liucf.unbounded.sink

import java.util.Properties

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

/**
 * @Author liucf
 * @Date 2021/9/18
 *       This example reads from the Kafka topic sensor_input_csv
 *       and writes the result to another Kafka topic, sensor_out.
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    props.setProperty("topic", "sensor_input_csv")
    // Add the Kafka source
    val inputDs: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props))
    val transDs: DataStream[String] = inputDs.map(d => {
      val arr = d.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble).toString
    })
    // Print the result to the console
    transDs.print()
    // Write to another Kafka topic
    val outProps = new Properties()
    outProps.setProperty("bootstrap.servers", "192.168.109.151:9092")
    transDs.addSink(new FlinkKafkaProducer[String](
      "sensor_out", new SimpleStringSchema(), outProps))
    // Start the Flink job
    env.execute("liucf kafka sink api test")
  }
}
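
A small variation that is often useful when testing this pipeline: FlinkKafkaConsumer lets you pick where reading starts instead of relying on whatever offsets the consumer group has committed. The sketch below reuses the broker address and topics from the code above; the group.id value and the object name are illustrative assumptions:

package com.study.liucf.unbounded.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._

// Sketch: the same Kafka-to-Kafka pipeline, with an explicit consumer group and start position.
object KafkaSinkWithStartPosition {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    props.setProperty("group.id", "liucf-flink-consumer") // illustrative consumer group
    val consumer = new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props)
    consumer.setStartFromEarliest()        // replay the topic from the beginning
    // consumer.setStartFromLatest()       // or: only read records produced after the job starts
    // consumer.setStartFromGroupOffsets() // or: the default, resume from committed group offsets
    val inputDs: DataStream[String] = env.addSource(consumer)
    val outProps = new Properties()
    outProps.setProperty("bootstrap.servers", "192.168.109.151:9092")
    inputDs.addSink(new FlinkKafkaProducer[String]("sensor_out", new SimpleStringSchema(), outProps))
    env.execute("liucf kafka sink with explicit start position")
  }
}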

3.3 Consumer reads from topic sensor_out

Consume the data and print it out.

package com.study.liucf.kafka

import java.util
import java.util.{Properties, UUID}

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import scala.collection.JavaConversions._

/**
 * @author liuchangfu@easyretailpro.com
 * @date 2021/1/14 20:44
 * @version 1.0
 */
object SimpleConsumer {
  def main(args: Array[String]): Unit = {
    /** Make sure the topic to consume was passed in */
    if (args.length == 0) {
      println("Enter topic name")
      return
    }
    // Kafka consumer configuration settings
    /** The topic name is passed in as the first argument */
    val topicName = args(0)
    /** Create a Properties instance to hold the consumer configuration */
    val props: Properties = new Properties()
    props.put("bootstrap.servers", "192.168.109.151:9092")
    props.put("acks", "all")
    props.put("retries", "0")

    /** Assign this consumer to a consumer group */
    props.put("group.id", "test")

    /** If true, offsets are committed automatically; otherwise they are not committed */
    props.put("enable.auto.commit", "true")

    /** How often the consumed offsets are committed back to Kafka */
    props.put("auto.commit.interval.ms", "1000")

    /** How long the broker waits without a heartbeat before it considers this consumer
     * dead and rebalances its partitions */
    props.put("session.timeout.ms", "30000")

    /** Deserializer for record keys */
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /** Deserializer for record values */
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    /** If the group already has committed offsets for a partition, consumption resumes from them
     * (i.e. from where the previous consumer of that group left off, whoever it was);
     * if there are no committed offsets, consumption starts from the beginning of the topic. */
    props.put("auto.offset.reset", "earliest")

    /** "auto.offset.reset" = "earliest" combined with a random group id
     * (props.put("group.id", UUID.randomUUID().toString)) reproduces the
     * --from-beginning behaviour of the console consumer. */
    props.put("group.id", UUID.randomUUID.toString)

    /** With "latest": if the group already has committed offsets, consumption resumes from them;
     * if there are none, only records produced after this consumer starts are consumed,
     * and records produced earlier are skipped. */
    //        props.put("auto.offset.reset", "latest");
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer(props)
    /** The Kafka consumer subscribes to the list of topics here. */
    consumer.subscribe(util.Arrays.asList(topicName))
    println("Subscribed to topic " + topicName)
    while (true) {
      val records: ConsumerRecords[String, String] = consumer.poll(100)
      for (record <- records) {
        /** Print the offset, key and value of each consumer record. */
        printf("offset = %d, key = %s, value = %s\n", record.offset, record.key, record.value)
      }
    }
  }
}

4 Redis sink

For Redis installation, see: redis 安装_m0_37813354的博客-CSDN博客

4.1 Add the dependency

<!-- Flink Redis connector dependency -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

4.2 Code

package com.study.liucf.unbounded.sink

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * @Author liucf
 * @Date 2021/9/19
 */
object RedisSinkLiucf {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    // Convert each String line into a LiucfSensorReding
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // Print to the console
    ds.print()
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("192.168.109.151")
      .setPort(6379)
      .setPassword("liucf123456")
      .build()
    ds.addSink(new RedisSink(conf, new LiucfRedisMapper()))
    env.execute("flink sink api : redis sink")
  }
}

/**
 * Define a RedisMapper
 */
class LiucfRedisMapper extends RedisMapper[LiucfSensorReding] {
  // Define the Redis write command: HSET <hash name> <field> <value>
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  }

  // Use the sensor id as the hash field (key)
  override def getKeyFromData(data: LiucfSensorReding): String = {
    data.id
  }

  // Use the temperature as the value
  override def getValueFromData(data: LiucfSensorReding): String = {
    data.temperature.toString
  }
}
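
The mapper above packs every sensor into one Redis hash (sensor_temp). If you would rather have one plain string key per sensor, the same RedisMapper interface works with RedisCommand.SET and the single-argument RedisCommandDescription. A sketch of that alternative (the liucf:sensor: key prefix is an illustrative choice, not from the original code):

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Sketch: write one plain string key per sensor (SET liucf:sensor:<id> <temperature>).
class LiucfRedisStringMapper extends RedisMapper[LiucfSensorReding] {
  // SET needs no additional key, so the single-argument description is enough.
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET)

  override def getKeyFromData(data: LiucfSensorReding): String =
    "liucf:sensor:" + data.id

  override def getValueFromData(data: LiucfSensorReding): String =
    data.temperature.toString
}

It plugs into the same sink as before: ds.addSink(new RedisSink(conf, new LiucfRedisStringMapper())).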

4.3 Verifying the result

192.168.109.151:6379> keys *
(empty array)
192.168.109.151:6379> hget sensor_temp sensor_1   ## get the value of field sensor_1 in hash sensor_temp
"36.9"
192.168.109.151:6379> hgetall sensor_temp   ## get all fields and values in hash sensor_temp
 1) "sensor_5"
 2) "36.5"
 3) "sensor_1"
 4) "36.9"
 5) "sensor_10"
 6) "38.1"
 7) "sensor_4"
 8) "36.4"
 9) "sensor_21"
10) "38.1"
11) "sensor_3"
12) "36.3"
13) "sensor_2"
14) "36.2"

5 ES sink

For installing Elasticsearch 6.6.0 on CentOS 7, see: centos7安装elasticsearch-6.6.0_m0_37813354的博客-CSDN博客

5.1 Add the dependency

<!-- Elasticsearch connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

5.2 Code

package com.study.liucf.unbounded.sink

import java.util

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

/**
 * @Author liucf
 * @Date 2021/9/20
 */
object LiucfEsSinke {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    // Convert each String line into a LiucfSensorReding
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // Print to the console
    ds.print()
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("192.168.109.151", 9200))
    // Define the sink function as an anonymous class
    val myEsSinkFunction = new ElasticsearchSinkFunction[LiucfSensorReding] {
      override def process(element: LiucfSensorReding,
                           ctx: RuntimeContext,
                           indexer: RequestIndexer): Unit = {
        // Wrap the fields in a map that serves as the document source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("sensor_id", element.id)
        dataSource.put("timestamp", element.timestamp.toString)
        dataSource.put("temperature", element.temperature.toString)
        // Create the IndexRequest that will be sent as an HTTP request
        val indexRequest: IndexRequest = Requests
          .indexRequest()
          .index("liucf_sensor")
          .`type`("temperature")
          .source(dataSource)
        // Hand the request to the indexer
        indexer.add(indexRequest)
      }
    }
    // Write to Elasticsearch
    ds.addSink(new ElasticsearchSink.Builder[LiucfSensorReding](
      httpHosts,
      myEsSinkFunction
    ).build())
    env.execute("flink sink api : elasticsearch sink")
  }
}
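
Worth knowing when you test this with a small input file: the Elasticsearch sink buffers index requests and sends them in bulk, so documents may not show up immediately. A sketch of forcing a flush after every record, meant as a drop-in replacement for the ds.addSink(...) call in the code above (it reuses httpHosts, myEsSinkFunction and ds from that example; setBulkFlushMaxActions(1) trades throughput for immediacy):

// Sketch: flush every single IndexRequest instead of batching (useful for small tests only).
val esSinkBuilder = new ElasticsearchSink.Builder[LiucfSensorReding](httpHosts, myEsSinkFunction)
esSinkBuilder.setBulkFlushMaxActions(1) // send each request as soon as it is added
ds.addSink(esSinkBuilder.build())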

5.3 Verifying the result

6 Custom Flink sink

Here we implement a custom sink that writes data to MySQL.

6.1 Add the MySQL JDBC driver dependency

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>

6.2 Code

package com.study.liucf.unbounded.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/20
 */
object LiucfDefinedJdbcSink {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the input data
    val inputStream: DataStream[String] = env.readTextFile("src\\main\\resources\\sensor.txt")
    // Convert each String line into a LiucfSensorReding
    val ds = inputStream.map(r => {
      val arr = r.split(",")
      LiucfSensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    // Print to the console
    ds.print()
    ds.addSink(new LiucfJDBCSink())
    env.execute("flink sink api:liucf jdbc sink ")
  }
}

class LiucfJDBCSink() extends RichSinkFunction[LiucfSensorReding] {
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.109.151:3306/liucf_db?useSSL=false",
      "root",
      "Lcf#123456")
    insertStmt = conn.prepareStatement("insert into sensor_tb (sensor_id,`timestamp`,temperature) values (?,?,?)")
    updateStmt = conn.prepareStatement("update sensor_tb set `timestamp`=?,temperature=? where sensor_id=?")
  }

  override def invoke(in: LiucfSensorReding): Unit = {
    // Try the update first; if the sensor already exists, the row is updated
    updateStmt.setLong(1, in.timestamp)
    updateStmt.setDouble(2, in.temperature)
    updateStmt.setString(3, in.id)
    updateStmt.execute()
    // If the update did not match any row, insert a new one
    if (updateStmt.getUpdateCount() == 0) {
      insertStmt.setString(1, in.id)
      insertStmt.setLong(2, in.timestamp)
      insertStmt.setDouble(3, in.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
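
The update-then-insert logic above sends two statements for every record from a new sensor. If sensor_tb has a unique (or primary) key on sensor_id, which the post does not show and is therefore an assumption here, MySQL's INSERT ... ON DUPLICATE KEY UPDATE collapses the same behaviour into a single statement. A sketch of that variant:

package com.study.liucf.unbounded.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.study.liucf.bean.LiucfSensorReding
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Sketch: single-statement upsert variant of LiucfJDBCSink.
// Assumes sensor_tb has a UNIQUE or PRIMARY KEY index on sensor_id (not shown in the original post).
class LiucfJDBCUpsertSink extends RichSinkFunction[LiucfSensorReding] {
  var conn: Connection = _
  var upsertStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.109.151:3306/liucf_db?useSSL=false",
      "root",
      "Lcf#123456")
    upsertStmt = conn.prepareStatement(
      "insert into sensor_tb (sensor_id, `timestamp`, temperature) values (?, ?, ?) " +
        "on duplicate key update `timestamp` = values(`timestamp`), temperature = values(temperature)")
  }

  override def invoke(in: LiucfSensorReding): Unit = {
    upsertStmt.setString(1, in.id)
    upsertStmt.setLong(2, in.timestamp)
    upsertStmt.setDouble(3, in.temperature)
    upsertStmt.execute()
  }

  override def close(): Unit = {
    upsertStmt.close()
    conn.close()
  }
}

Wire it in exactly like the original: ds.addSink(new LiucfJDBCUpsertSink()).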

Result:

6.3 Summary

① Define a custom Flink JDBC sink class

LiucfJDBCSink extends RichSinkFunction

② Implement the invoke() method

The actual write of each incoming record happens here.

③ Create the JDBC connection in the lifecycle method open()

Opening the connection once instead of once per record reduces the load on MySQL and improves performance.

④ Release resources in the lifecycle method close()

⑤ Finally, register the custom class as a sink:

ds.addSink(new LiucfJDBCSink())

 
