Flink --- Data Sources (Source)

Posted by Shall潇


public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // Constructor, getters, setters and toString() are omitted here
}
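For reference, the omitted boilerplate might look like the sketch below. The three-argument constructor is inferred from the new SensorReading(id, timestamp, temperature) calls used later; the no-arg constructor and getters/setters are what Flink's POJO type rules expect, and toString() just formats the output of print():

public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // Flink's POJO type extraction needs a public no-arg constructor
    public SensorReading() {}

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp + ", temperature=" + temperature + "}";
    }
}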

1. In-memory collection

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class Source1_Collection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<SensorReading> list = new ArrayList<>();
        list.add(new SensorReading("sensor_1", System.currentTimeMillis(), 37.5));
        list.add(new SensorReading("sensor_2", System.currentTimeMillis(), 37.2));
        list.add(new SensorReading("sensor_3", System.currentTimeMillis(), 37.4));
        list.add(new SensorReading("sensor_4", System.currentTimeMillis(), 36.9));
        list.add(new SensorReading("sensor_5", System.currentTimeMillis(), 36.8));
        list.add(new SensorReading("sensor_6", System.currentTimeMillis(), 37.1));

        // Read the data from memory; each record is wrapped in a SensorReading object
        DataStreamSource<SensorReading> dss = env.fromCollection(list);

        dss.print("sensor");

        try {
            env.execute("collectionSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
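If you only need a handful of test records, fromElements on the same StreamExecutionEnvironment skips the intermediate List entirely. A minimal sketch (the class name Source1_Elements is just for illustration):

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source1_Elements {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // fromElements builds a bounded stream directly from the given values
        DataStreamSource<SensorReading> dss = env.fromElements(
                new SensorReading("sensor_1", System.currentTimeMillis(), 37.5),
                new SensorReading("sensor_2", System.currentTimeMillis(), 36.9));

        dss.print("sensor");
        env.execute("elementsSource");
    }
}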

2. File

[Create a.txt yourself; the content can be anything]

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source2_File {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "D:\\peixun\\soft\\projects\\Flink\\resource\\a.txt";

        // Read the file line by line as a stream of Strings
        DataStreamSource<String> stringDataStreamSource = env.readTextFile(path);

        stringDataStreamSource.print();

        try {
            env.execute("FileExample");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
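Note that readTextFile reads the file once and the job then finishes. If the job should keep watching the path for new data, Flink's readFile variant with FileProcessingMode.PROCESSING_CONTINUOUSLY can be used instead. A sketch, with an arbitrarily chosen 1000 ms scan interval (be aware that in this mode a modified file is re-read in full):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class Source2_FileMonitor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "D:\\peixun\\soft\\projects\\Flink\\resource\\a.txt";

        // Re-scan the path every 1000 ms and emit newly appearing data
        DataStreamSource<String> lines = env.readFile(
                new TextInputFormat(new Path(path)), path,
                FileProcessingMode.PROCESSING_CONTINUOUSLY, 1000L);

        lines.print();
        env.execute("FileMonitorExample");
    }
}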

3. Kafka

Java

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Source3_Kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Create the FlinkKafkaConsumer (topic, SimpleStringSchema, Kafka properties)
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>("flinkKafka", new SimpleStringSchema(), properties);

        // Hand the consumer to addSource
        DataStreamSource<String> dss = env.addSource(consumer);

        dss.print();

        try {
            env.execute("kafka-example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
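The consumer above prints raw strings. Assuming each Kafka message is a CSV line such as sensor_1,1618000000000,36.6 (an assumption about the producer, not something stated here; the Scala version below makes the same assumption), the stream can be parsed into SensorReading objects by continuing from the dss stream above:

import org.apache.flink.streaming.api.datastream.DataStream;

// ...continuing from the DataStreamSource<String> dss defined above:
DataStream<SensorReading> sensors = dss.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});

sensors.print("kafka");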

Scala

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig

object Sources_kafka {
  def main(args: Array[String]): Unit = {

    // Import the implicit TypeInformation conversions
    import org.apache.flink.api.scala._
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)

    val value: DataStream[String] = env.addSource(kafkaSource)

//    value.print("se")

    // Parse each CSV message into an (id, timestamp, temperature) tuple
    val s: DataStream[(String, Long, Double)] = value.map(x => {
      val strs: Array[String] = x.split(",")
      (strs(0), strs(1).toLong, strs(2).toDouble)
    })

    s.print("ka")
    env.execute("kafka-source")
  }
}

4. Custom source

import Flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class Source4_MySource {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Pass an instance of the custom source class to addSource
        DataStreamSource<SensorReading> ssd = env.addSource(new MySensorSource());

        ssd.print();

        try {
            env.execute("MySource-example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Custom source: implements SourceFunction and overrides its run and cancel methods
    private static class MySensorSource implements SourceFunction<SensorReading> {
        // volatile, because cancel() is invoked from a different thread than run()
        volatile boolean flag = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            // Keep emitting while flag is true, simulating a never-ending stream of readings
            while (flag) {
                sourceContext.collect(new SensorReading("sensor_" + new Random().nextInt(10),
                        System.currentTimeMillis(), new Random().nextInt(9) + 30.0));
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
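As written, the while loop emits readings as fast as the CPU allows and will flood the console. If you want output at a readable pace, one common adjustment (an addition, not in the original) is to sleep between records inside run:

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            Random random = new Random();
            while (flag) {
                sourceContext.collect(new SensorReading("sensor_" + random.nextInt(10),
                        System.currentTimeMillis(), random.nextInt(9) + 30.0));
                Thread.sleep(1000);   // throttle to roughly one reading per second
            }
        }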

Scala

import java.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/*
*   Custom source: implement the SourceFunction trait and override run and cancel
* */

object Sources_MySource {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val ds: DataStream[String] = env.addSource(new MySourceFunction())
    ds.print("MySource")

    env.execute("Source-MySource")
  }

  // Inner class implementing SourceFunction, overriding run and cancel
  class MySourceFunction extends SourceFunction[String] {

    // @volatile, because cancel() runs on a different thread than run()
    @volatile var flag: Boolean = true

    override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
      while (flag) sourceContext.collect("sensor_" + new Random().nextInt(10))
    }

    override def cancel(): Unit = flag = false
  }
}
