Flink Streaming API: Sink
Sink
Unlike Spark, Flink does not offer a foreach-style method that lets users iterate over records and write them out directly; all output to external systems has to go through a Sink. The final output step of a job is wired up roughly like this:
stream.addSink(new MySink(xxxx))
Flink ships with sinks for a number of frameworks; for anything else, users implement their own sink, as sketched below.
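As an illustration of the pattern above, here is a minimal sketch of a hand-written sink. The class name MyPrintSink and the String element type are assumptions for the sketch, not part of the original example:

import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Minimal custom sink: prints every element it receives.
class MyPrintSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = {
    println("custom sink got: " + value)
  }
}

// usage: stream.addSink(new MyPrintSink)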
1 Kafka
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
Full code, with the sink added in the main method:
package com.imau.edu.sinkTest

import java.util.Properties

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    // val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // transform
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        // convert to String so the record can be serialized for output
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble).toString
      })

    // sink
    dataStream.addSink(new FlinkKafkaProducer011[String]("sinkTest", new SimpleStringSchema(), properties))
    dataStream.print()

    env.execute("kafka sink test")
  }
}
2 Redis
pom.xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Define a Redis mapper class that specifies the command used when writing to Redis:
package com.imau.edu.sinkTest

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("F:\\IDEA-DATA\\Flink_Demo\\source.txt")

    // transform
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("localhost")
      .setPort(6379)
      .build()

    // sink
    dataStream.addSink(new RedisSink(conf, new MyRedisMapper()))

    env.execute("redis sink test")
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading] {
  // the Redis command used to store the data
  override def getCommandDescription: RedisCommandDescription = {
    // store sensor id and temperature in a hash: HSET key field value
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }

  // the value written to Redis
  override def getValueFromData(t: SensorReading): String = t.temperature.toString

  // the key (hash field) written to Redis
  override def getKeyFromData(t: SensorReading): String = t.id
}
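To check the result, you can inspect the hash from redis-cli (assuming a local Redis on the default port as configured above): HGETALL sensor_temperature returns each sensor id together with its latest temperature, since the mapper uses the sensor id as the hash field and the temperature as the value.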
3 Elasticsearch
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
Full code:
package com.imau.edu.sinkTest

import java.util

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // builder for the Elasticsearch sink
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          println("saving data: " + element)
          // wrap the record in a Map (or a JsonObject)
          val json = new util.HashMap[String, String]()
          json.put("sensor_id", element.id)
          json.put("temperature", element.temperature.toString)
          json.put("ts", element.timestamp.toString)
          // build an index request for the data
          val indexRequest = Requests.indexRequest()
            .index("sensor")
            .`type`("readingdata")
            .source(json)
          // hand the request to the indexer, which sends it to Elasticsearch
          indexer.add(indexRequest)
          println("data saved.")
        }
      }
    )

    // sink
    dataStream.addSink(esSinkBuilder.build())

    env.execute("es sink test")
  }
}
4 JDBC custom sink
pom.xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
Full code:
package com.imau.edu.sinkTest

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.imau.edu.Flink_StreamAPI.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // source
    val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")

    // transform
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // sink
    dataStream.addSink(new MyJdbcSink())

    env.execute("jdbc sink test")
  }
}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open: create the connection and prepare the statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)")
    updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
  }

  // invoke: execute the SQL for each record
  override def invoke(value: SensorReading): Unit = {
    // try the update first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // if the update touched no row, insert instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // close: clean up resources
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
Add the following in the main method to write the records to MySQL:
dataStream.addSink(new MyJdbcSink())
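Note that the sink assumes the target table already exists in the test database. Based on the SQL statements in MyJdbcSink it would look roughly like CREATE TABLE temperatures (sensor VARCHAR(20), temp DOUBLE); the column types here are an assumption, only the table and column names come from the code above.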