Flink: Various Output Sinks
Posted by Shall潇
1. Kafka
Java
import Flink.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class sink1_kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // start from the latest messages

        // Create a FlinkKafkaConsumer (topic, SimpleStringSchema, Kafka properties)
        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        // Transformation: parse each line into a SensorReading, then back to a String
        SingleOutputStreamOperator<String> result = dss.map(line -> {
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr.toString();
        });

        // Sink: use a Kafka producer to write the results into a new topic
        DataStreamSink<String> flinkKafka = result.addSink(new FlinkKafkaProducer09<String>(
                "192.168.159.100:9092", "sensorOut", new SimpleStringSchema()));

        try {
            env.execute("kafka-sink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
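The SensorReading bean imported above is not shown in the post. Here is a minimal sketch of it, inferred from the constructor, getters, and toString() used in these examples; the field names and the timestamp unit are assumptions:

// Hypothetical SensorReading POJO, inferred from its usage in the examples.
public class SensorReading {
    private String id;          // sensor id
    private Long timestamp;     // event time, assumed to be milliseconds
    private Double temperature; // temperature reading

    public SensorReading() {}   // no-arg constructor, required for Flink POJO serialization

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    @Override
    public String toString() {
        return id + "," + timestamp + "," + temperature;
    }
}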
Scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer010}
import org.apache.kafka.clients.consumer.ConsumerConfig

object Sink1_Kafka {
  def main(args: Array[String]): Unit = {
    // Import the implicit TypeInformation conversions
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)
    val value: DataStream[String] = env.addSource(kafkaSource)

    val sinkStream: DataStreamSink[String] = value.addSink(
      new FlinkKafkaProducer010[String]("192.168.159.100:9092", "sensorOut", new SimpleStringSchema()))

    env.execute("kafka-sink")
  }
}
2. MySQL
Java
Read from Kafka, process with Flink, then write the results into MySQL.
import Flink.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;

public class sink2_Mysql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Create the consumer
        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        // Transformation: parse each line into a SensorReading
        SingleOutputStreamOperator<SensorReading> result = dss.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });

        // Write with the custom sink defined below
        result.addSink(new MyJDBCSink());

        try {
            env.execute("sink-mysql");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Custom sink extending RichSinkFunction. A plain SinkFunction would suffice,
    // but RichSinkFunction (a subclass) additionally provides lifecycle methods
    // such as open() and close().
    private static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        Connection conn = null;

        // open: establish the database connection
        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://192.168.159.100:3306/flinkDemo",
                    "root",
                    "root"
            );
        }

        // invoke: SQL handling per record (try an update first; insert if no row was updated)
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            PreparedStatement prep = conn.prepareStatement("insert into sensor_temp values (?,?)");
            PreparedStatement upprep = conn.prepareStatement("update sensor_temp set temp = ? where id = ?");
            upprep.setDouble(1, value.getTemperature());
            upprep.setString(2, value.getId());
            upprep.execute();
            if (upprep.getUpdateCount() == 0) {
                prep.setString(1, value.getId());
                prep.setDouble(2, value.getTemperature());
                prep.execute();
            }
        }

        // close: release resources
        @Override
        public void close() throws Exception {
            conn.close();
        }
    }
}
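One refinement worth noting: MyJDBCSink prepares both statements on every invoke() call. A common variant (a sketch, not from the original post) prepares them once in open() and only rebinds the parameters per record:

// Suggested variant of MyJDBCSink: statements are prepared once in open().
private static class MyJDBCSinkV2 extends RichSinkFunction<SensorReading> {
    Connection conn = null;
    PreparedStatement insertStmt = null;
    PreparedStatement updateStmt = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection("jdbc:mysql://192.168.159.100:3306/flinkDemo", "root", "root");
        insertStmt = conn.prepareStatement("insert into sensor_temp values (?,?)");
        updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?");
    }

    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        updateStmt.setDouble(1, value.getTemperature());
        updateStmt.setString(2, value.getId());
        updateStmt.execute();
        if (updateStmt.getUpdateCount() == 0) { // no row updated: the id is new, so insert
            insertStmt.setString(1, value.getId());
            insertStmt.setDouble(2, value.getTemperature());
            insertStmt.execute();
        }
    }

    @Override
    public void close() throws Exception {
        insertStmt.close();
        updateStmt.close();
        conn.close();
    }
}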
Scala
import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig

// Case class used to hold each record
case class Sensors(id: String, timestamp: Long, tempature: Double)

/*
 * Read messages from Kafka, process them with Flink, and store the results in MySQL.
 */
object Sink2_Mysql {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)
    val kafkaStream: DataStream[String] = env.addSource(kafkaSource)

    val value: DataStream[Sensors] = kafkaStream.map(line => {
      val strs: Array[String] = line.split(",")
      Sensors(strs(0), strs(1).toLong, strs(2).toDouble)
    })

    value.addSink(new MySinkFunction())
    env.execute("Sink-Mysql")
  }

  class MySinkFunction extends RichSinkFunction[Sensors] {
    var conn: sql.Connection = _

    override def open(parameters: Configuration): Unit = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://192.168.159.100:3306/flinkDemo", "root", "root")
    }

    // SQL handling per record happens in invoke (update first; insert if no row was updated)
    override def invoke(value: Sensors, context: SinkFunction.Context[_]): Unit = {
      val prep: PreparedStatement = conn.prepareStatement("insert into sensor1_temp values (?,?)")
      val upprep: PreparedStatement = conn.prepareStatement("update sensor1_temp set temp = ? where id = ?")
      upprep.setDouble(1, value.tempature)
      upprep.setString(2, value.id)
      upprep.execute()
      if (upprep.getUpdateCount == 0) {
        prep.setString(1, value.id)
        prep.setDouble(2, value.tempature)
        prep.execute()
      }
    }

    override def close(): Unit = {
      conn.close()
    }
  }
}
3. Redis
package Flink.sink;

import Flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class sink3_Redis {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read from a file
        String filePath = "D:\\peixun\\soft\\projects\\Flink\\resource\\b.txt";
        DataStreamSource<String> dataStreamSource = env.readTextFile(filePath);

        // Transformation: parse each line into a SensorReading
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(line -> {
            String[] splits = line.split(",");
            return new SensorReading(splits[0], Long.valueOf(splits[1]), Double.valueOf(splits[2]));
        });

        // Redis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.184.156")
                .setPort(6379)
                .setDatabase(0)
                .build();

        // addSink(config, RedisMapper). The original listing breaks off inside the
        // mapper, so the body below is an assumed completion following the standard
        // RedisMapper pattern: HSET each reading into a hash, keyed by sensor id.
        map.addSink(new RedisSink<>(config, new RedisMapper<SensorReading>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
            }

            @Override
            public String getKeyFromData(SensorReading data) {
                return data.getId();
            }

            @Override
            public String getValueFromData(SensorReading data) {
                return String.valueOf(data.getTemperature());
            }
        }));

        try {
            env.execute("redis-sink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
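To check what the sink wrote, you can read the hash back with the Jedis client that the Redis connector itself builds on. A minimal sketch; the host, port, and hash key match the configuration above:

import java.util.Map;
import redis.clients.jedis.Jedis;

// Reads back the "sensor_temp" hash written by the RedisSink above.
public class CheckRedisSink {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("192.168.184.156", 6379)) {
            Map<String, String> all = jedis.hgetAll("sensor_temp");
            all.forEach((id, temp) -> System.out.println(id + " -> " + temp));
        }
    }
}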