Flink --- Data Sources (Source)
All of the examples below share the following SensorReading POJO:

public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;
    // constructor, getters, setters and toString() are omitted here
}
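For reference, the omitted members follow the standard JavaBean pattern; a minimal illustrative sketch of what would live inside SensorReading (not taken from the original article):

public SensorReading() {
}

public SensorReading(String id, Long timestamp, Double temperature) {
    this.id = id;
    this.timestamp = timestamp;
    this.temperature = temperature;
}

public String getId() { return id; }
public void setId(String id) { this.id = id; }
// getters and setters for timestamp and temperature are analogous

@Override
public String toString() {
    return "SensorReading{id='" + id + "', timestamp=" + timestamp + ", temperature=" + temperature + "}";
}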
1. In-Memory Collection
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source1_Collection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<SensorReading> list = new ArrayList<>();
        list.add(new SensorReading("sensor_1", System.currentTimeMillis(), 37.5));
        list.add(new SensorReading("sensor_2", System.currentTimeMillis(), 37.2));
        list.add(new SensorReading("sensor_3", System.currentTimeMillis(), 37.4));
        list.add(new SensorReading("sensor_4", System.currentTimeMillis(), 36.9));
        list.add(new SensorReading("sensor_5", System.currentTimeMillis(), 36.8));
        list.add(new SensorReading("sensor_6", System.currentTimeMillis(), 37.1));

        // Read the data from memory; the records are wrapped in POJOs
        DataStreamSource<SensorReading> dss = env.fromCollection(list);
        dss.print("sensor");

        try {
            env.execute("collectionSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
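Besides fromCollection, the environment also offers fromElements, which takes the records directly as varargs and is handy for quick tests. A minimal sketch using the same POJO:

DataStreamSource<SensorReading> dss2 = env.fromElements(
        new SensorReading("sensor_1", System.currentTimeMillis(), 37.5),
        new SensorReading("sensor_2", System.currentTimeMillis(), 37.2));
dss2.print("elements");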
2. File

(Create a.txt yourself; its contents can be anything.)
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Source2_File {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "D:\\peixun\\soft\\projects\\Flink\\resource\\a.txt";
        // Read the file as a stream of lines
        DataStreamSource<String> stringDataStreamSource = env.readTextFile(path);
        stringDataStreamSource.print();

        try {
            env.execute("FileExample");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
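readTextFile yields raw strings, so in practice you usually parse each line into the POJO before further processing. A hedged sketch, assuming comma-separated lines such as sensor_1,1612345678000,37.5 (the file format is not specified in the original); it goes after readTextFile in main and needs imports of org.apache.flink.streaming.api.datastream.DataStream and org.apache.flink.api.common.functions.MapFunction:

DataStream<SensorReading> readings = stringDataStreamSource
        .map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String line) {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        });
readings.print("file");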
3. Kafka
Java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Source3_Kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Create a FlinkKafkaConsumer (topic, SimpleStringSchema, Kafka properties)
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>("flinkKafka", new SimpleStringSchema(), properties);

        // Hand the consumer to addSource
        DataStreamSource<String> dss = env.addSource(consumer);
        dss.print();

        try {
            env.execute("kafka-example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
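Note that the Flink Kafka consumer can also set its start position on the consumer object itself, which takes precedence over auto.offset.reset; a brief sketch:

consumer.setStartFromGroupOffsets(); // default: resume from committed group offsets
// consumer.setStartFromEarliest();  // read the topic from the beginning
// consumer.setStartFromLatest();    // read only newly arriving records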
Scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig

object Sources_kafka {
  def main(args: Array[String]): Unit = {
    // Import the implicit TypeInformation conversions
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)
    val value: DataStream[String] = env.addSource(kafkaSource)
    // value.print("se")

    // Parse each comma-separated line into an (id, timestamp, temperature) tuple
    val s: DataStream[(String, Long, Double)] = value.map(x => {
      val strs: Array[String] = x.split(",")
      (strs(0), strs(1).toLong, strs(2).toDouble)
    })
    s.print("ka")

    env.execute("kafka-source")
  }
}
4. Custom Source
import Flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class Source4_MySource {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hand the custom source class to addSource
        DataStreamSource<SensorReading> ssd = env.addSource(new MySensorSource());
        ssd.print();

        try {
            env.execute("MySource-example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Custom source: implements SourceFunction and overrides run() and cancel()
    private static class MySensorSource implements SourceFunction<SensorReading> {
        boolean flag = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            while (flag) { // keep emitting records to simulate a continuous stream
                sourceContext.collect(new SensorReading("sensor_" + new Random().nextInt(10),
                        System.currentTimeMillis(), new Random().nextInt(9) + 30.0));
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
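As written, the while loop emits records as fast as the CPU allows and creates two Random objects per record. For a readable demo you may want to reuse a single Random and throttle the loop; a hedged sketch of the reworked run():

private final Random random = new Random(); // reuse one generator instead of new Random() per record

@Override
public void run(SourceContext<SensorReading> sourceContext) throws Exception {
    while (flag) {
        sourceContext.collect(new SensorReading("sensor_" + random.nextInt(10),
                System.currentTimeMillis(), random.nextInt(9) + 30.0));
        Thread.sleep(500); // illustrative pause: roughly two readings per second
    }
}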
Scala
import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/*
 * Custom source type: implement the SourceFunction trait and override run() and cancel()
 * */
object Sources_MySource {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val ds: DataStream[String] = env.addSource(new MySourceFunction())
    ds.print("MySource")

    env.execute("Source-MySource")
  }

  // Inner class implementing the SourceFunction interface, overriding run() and cancel()
  class MySourceFunction extends SourceFunction[String] {
    var flag: Boolean = true

    override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
      while (flag) sourceContext.collect("sensor_" + new Random().nextInt(10))
    }

    override def cancel(): Unit = flag = false
  }
}
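A plain SourceFunction always runs with parallelism 1. If you need a parallel custom source, Flink provides the ParallelSourceFunction (and RichParallelSourceFunction) interfaces, where every parallel subtask runs its own copy of run(). A hedged Java sketch with a hypothetical ParallelNumberSource:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

// Each parallel subtask executes its own instance of run()
public class ParallelNumberSource implements ParallelSourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long n = 0;
        while (running) {
            ctx.collect(n++);
            Thread.sleep(1000); // illustrative throttle
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}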