Flink 流处理 API_Sink

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 流处理 API_Sink相关的知识,希望对你有一定的参考价值。


Sink

Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。


  stream.addSink(new MySink(xxxx)) 

官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。   

Flink

 

1 Kafka

pom.xml



<dependency>

groupId>org.apache.flink</groupId>

artifactId>flink-connector-kafka-0.11_2.11</artifactId>



version>1.7.2</version>

</dependency>

主函数中添加 sink 具体代码

package com.imau.edu.sinkTest

import java.util.Properties

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011, FlinkKafkaProducer011


object KafkaSinkTest
def main(args: Array[String]): Unit =
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
// val inputStream = env.readTextFile("D:\\\\Projects\\\\BigData\\\\FlinkTutorial\\\\src\\\\main\\\\resources\\\\sensor.txt")
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

// Transform操作

val dataStream = inputStream
.map(
data =>
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // 转成String方便序列化输出

)

// sink
dataStream.addSink(new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
dataStream.print()

env.execute("kafka sink test")

 

2 Redis

pom.xml



<dependency>

groupId>org.apache.bahir</groupId>

artifactId>flink-connector-redis_2.11</artifactId>

version>1.0</version>

</dependency>

定义一个 redis 的 mapper 类,用于定义保存到 redis 时调用的命令:

package com.imau.edu.sinkTest

import com.imau.edu.Flink_StreamAPI.SensorReading

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand, RedisCommandDescription, RedisMapper

object RedisSinkTest
def main(args: Array[String]): Unit =
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
val inputStream = env.readTextFile("F:\\\\IDEA-DATA\\\\Flink_Demo\\\\source.txt")

// transform
val dataStream = inputStream
.map(
data =>
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )

)

val conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()

// sink
dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )

env.execute("redis sink test")



class MyRedisMapper() extends RedisMapper[SensorReading]

// 定义保存数据到redis的命令
override def getCommandDescription: RedisCommandDescription =
// 把传感器id和温度值保存成哈希表 HSET key field value
new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )


// 定义保存到redis的value
override def getValueFromData(t: SensorReading): String = t.temperature.toString

// 定义保存到redis的key
override def getKeyFromData(t: SensorReading): String = t.id

 

3 Elasticsearch  

pom.xml



<dependency>

groupId>org.apache.flink</groupId>

artifactId>flink-connector-elasticsearch6_2.11</artifactId>

version>1.7.2</version>

</dependency>

具体代码

package com.imau.edu.sinkTest

import java.util

import com.imau.edu.Flink_StreamAPI.SensorReading


import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction, RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest
def main(args: Array[String]): Unit =
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
val inputStream = env.readTextFile("D:\\\\Projects\\\\BigData\\\\FlinkTutorial\\\\src\\\\main\\\\resources\\\\sensor.txt")

// transform
val dataStream = inputStream
.map(
data =>
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )

)

val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))

// 创建一个esSink 的builder
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading]
override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit =
println("saving data: " + element)
// 包装成一个Map或者JsonObject
val json = new util.HashMap[String, String]()
json.put("sensor_id", element.id)
json.put("temperature", element.temperature.toString)
json.put("ts", element.timestamp.toString)

// 创建index request,准备发送数据
val indexRequest = Requests.indexRequest()
.index("sensor")
.`type`("readingdata")
.source(json)

// 利用index发送请求,写入数据
indexer.add(indexRequest)
println("data saved.")


)

// sink
dataStream.addSink( esSinkBuilder.build() )

env.execute("es sink test")

 



 

4 JDBC 自定义 sink



<dependency>

groupId>mysql</groupId>

artifactId>mysql-connector-java</artifactId>

version>5.1.44</version>

</dependency>

具体代码

package com.imau.edu.sinkTest


import java.sql.Connection, DriverManager, PreparedStatement

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction, SinkFunction
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest
def main(args: Array[String]): Unit =
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
val inputStream = env.readTextFile("D:\\\\Projects\\\\BigData\\\\FlinkTutorial\\\\src\\\\main\\\\resources\\\\sensor.txt")

// transform
val dataStream = inputStream
.map(
data =>
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)

)

// sink
dataStream.addSink( new MyJdbcSink() )

env.execute("jdbc sink test")



class MyJdbcSink() extends RichSinkFunction[SensorReading]
// 定义sql连接、预编译器
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _

// 初始化,创建连接和预编译语句
override def open(parameters: Configuration): Unit =
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")



// 调用连接,执行sql
override def invoke(value: SensorReading): Unit =
// 执行更新语句
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
// 如果update没有查到数据,那么执行插入语句
if( updateStmt.getUpdateCount == 0 )
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()



// 关闭时做清理工作
override def close(): Unit =
insertStmt.close()
updateStmt.close()
conn.close()

在 main 方法中增加,把明细保存到 mysql 中

 dataStream.addSink(new MyJdbcSink())

以上是关于Flink 流处理 API_Sink的主要内容,如果未能解决你的问题,请参考以下文章

Flink流处理的时间窗口

Flink学习之路Flink简介

什么是 Flink (流处理框架)

Flink流处理API大合集:掌握所有flink流处理API技术,看这一篇就够了

Flink流处理API大合集:掌握所有flink流处理API技术,看这一篇就够了

有状态流处理:Flink 状态后端