Flink --- Various Sinks (Outputs)

Posted by Shall潇



1. Kafka

Java

import Flink.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class sink1_kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");   // read only new messages

        // create the FlinkKafkaConsumer (topic, SimpleStringSchema, Kafka properties)
        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        // transformation: parse each line into a SensorReading and convert it back to a String
        SingleOutputStreamOperator<String> result = dss.map(line -> {
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr.toString();
        });

        // sink: use a Kafka producer to write the result into a new topic
        DataStreamSink<String> flinkKafka = result.addSink(new FlinkKafkaProducer09<String>(
                "192.168.159.100:9092", "sensorOut", new SimpleStringSchema()));

        try {
            env.execute("kafka-sink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
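
For reference, the SensorReading bean imported above is not shown in the original post. A minimal sketch, assuming only the three fields implied by the map function (id, timestamp, temperature):

// sketch of Flink.beans.SensorReading; field names are assumed from the examples in this post
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {}   // no-arg constructor, so Flink can treat the class as a POJO

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    public void setId(String id) { this.id = id; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return id + "," + timestamp + "," + temperature;
    }
}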

Scala

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer010}
import org.apache.kafka.clients.consumer.ConsumerConfig

object Sink1_Kafka {
  def main(args: Array[String]): Unit = {
    // import the implicit TypeInformation conversions for the Scala API
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Kafka consumer configuration
    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    // source: consume from the flinkKafka topic
    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)

    val value: DataStream[String] = env.addSource(kafkaSource)

    // sink: produce into the sensorOut topic
    val sinkStream: DataStreamSink[String] = value.addSink(new FlinkKafkaProducer010[String]("192.168.159.100:9092", "sensorOut", new SimpleStringSchema()))

    env.execute("kafka-sink")
  }
}
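
To try either version: write a few comma-separated records such as sensor_1,1547718199,35.8 (a made-up sample) into the flinkKafka topic with any Kafka producer; the job re-emits the parsed readings to the sensorOut topic, where a console consumer can read them back.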

2. MySQL

Java
[Read from Kafka, then write the results into MySQL]

import Flink.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;

public class sink2_Mysql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // create the Kafka consumer
        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        // parse each line into a SensorReading
        SingleOutputStreamOperator<SensorReading> result = dss.map(line -> {
            String[] split = line.split(",");
            SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
            return sr;
        });

        // write with the custom sink below
        result.addSink(new MyJDBCSink());

        try {
            env.execute("sink-mysql");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Custom sink class. Implementing SinkFunction would be enough, but RichSinkFunction
    // additionally provides the open()/close() lifecycle methods, which we need here
    // to manage the JDBC connection.
    private static class MyJDBCSink extends RichSinkFunction<SensorReading> {
        Connection conn = null;

        // open: set up the database connection
        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://192.168.159.100:3306/flinkDemo",
                    "root",
                    "root"
            );
        }

        // invoke: runs the SQL for every record
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            String sql = "insert into sensor_temp values (?,?)";
            PreparedStatement prep = conn.prepareStatement(sql);
            PreparedStatement upprep = conn.prepareStatement("update sensor_temp set temp = ? where id = ?");
            upprep.setDouble(1, value.getTemperature());
            upprep.setString(2, value.getId());
            upprep.execute();
            if (upprep.getUpdateCount() == 0) {
                prep.setString(1, value.getId());
                prep.setDouble(2, value.getTemperature());
                prep.execute();
            }
        }

        // close: release the connection
        @Override
        public void close() throws Exception {
            conn.close();
        }
    }
}
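
Both the Java version above and the Scala version below assume the target table already exists in the flinkDemo database (sensor_temp with an id column and a temp column here, sensor1_temp in the Scala code); the sink does not create it. The invoke method is a hand-rolled upsert: it first runs an UPDATE for the sensor id and only INSERTs when getUpdateCount() reports that no row was changed, so the table ends up holding the latest temperature per sensor.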

Scala

import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig

case class Sensors(id: String, timestamp: Long, tempature: Double)  // case class used to hold the parsed records

/*
 *  Read messages from Kafka, process them with Flink, and store the results in MySQL.
 */

object Sink2_Mysql {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)

    val kafkaStream: DataStream[String] = env.addSource(kafkaSource)

    // parse each line into a Sensors record
    val value: DataStream[Sensors] = kafkaStream.map(line => {
      val strs: Array[String] = line.split(",")
      val sensors = Sensors(strs(0), strs(1).toLong, strs(2).toDouble)
      sensors
    })

    value.addSink(new MySinkFunction())
    env.execute("Sink-Mysql")
  }

  class MySinkFunction extends RichSinkFunction[Sensors] {
    var conn: sql.Connection = null

    // open: set up the JDBC connection
    override def open(parameters: Configuration): Unit = {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://192.168.159.100:3306/flinkDemo", "root", "root")
    }

    // invoke: run the SQL for every record (UPDATE first, INSERT when nothing was updated)
    override def invoke(value: Sensors, context: SinkFunction.Context[_]): Unit = {
      val sql = "insert into sensor1_temp values (?,?)"
      val prep: PreparedStatement = conn.prepareStatement(sql)
      val upprep: PreparedStatement = conn.prepareStatement("update sensor1_temp set temp = ? where id = ?")
      upprep.setDouble(1, value.tempature)
      upprep.setString(2, value.id)
      upprep.execute()
      if (upprep.getUpdateCount == 0) {
        prep.setString(1, value.id)
        prep.setDouble(2, value.tempature)
        prep.execute()
      }
    }

    // close: release the connection
    override def close(): Unit = {
      conn.close()
    }
  }
}

3. Redis

package Flink.sink;

import Flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class sink3_Redis {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // read the input file
        String filePath = "D:\\peixun\\soft\\projects\\Flink\\resource\\b.txt";
        DataStreamSource<String> dataStreamSource = env.readTextFile(filePath);

        // transformation: parse each line into a SensorReading
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(line -> {
            String[] splits = line.split(",");
            return new SensorReading(splits[0], Long.valueOf(splits[1]), Double.valueOf(splits[2]));
        });

        // Redis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.184.156")
                .setPort(6379)
                .setDatabase(0)
                .build();

        // addSink(new RedisSink<>(config, RedisMapper)): the mapper decides how each record is written
        map.addSink(new RedisSink<>(config, new RedisMapper<SensorReading>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // assumed completion: write with HSET into a hash named "sensor_temp"
                // (the hash name is chosen here for illustration)
                return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
            }

            @Override
            public String getKeyFromData(SensorReading data) {
                // hash field: the sensor id
                return data.getId();
            }

            @Override
            public String getValueFromData(SensorReading data) {
                // hash value: the temperature
                return String.valueOf(data.getTemperature());
            }
        }));

        try {
            env.execute("redis-sink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
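
With an HSET mapping like the one sketched above, every reading is written into a single Redis hash: the sensor id becomes the hash field and the temperature its value, so Redis always holds just the latest temperature per sensor.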
