Flink: Various Data Sources (Source)
Posted by Shall潇

This post walks through the common ways to feed data into Flink: in-memory collections, files, Kafka, and custom sources. All of the examples share the following SensorReading POJO:
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;
    // constructor, getters, setters, and toString() omitted for brevity
}
1. In-Memory Collection
public class Source1_Collection {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<SensorReading> list = new ArrayList<>();
        list.add(new SensorReading("sensor_1", System.currentTimeMillis(), 37.5));
        list.add(new SensorReading("sensor_2", System.currentTimeMillis(), 37.2));
        list.add(new SensorReading("sensor_3", System.currentTimeMillis(), 37.4));
        list.add(new SensorReading("sensor_4", System.currentTimeMillis(), 36.9));
        list.add(new SensorReading("sensor_5", System.currentTimeMillis(), 36.8));
        list.add(new SensorReading("sensor_6", System.currentTimeMillis(), 37.1));

        // build the stream from the in-memory collection of SensorReading objects
        DataStreamSource<SensorReading> dss = env.fromCollection(list);
        dss.print("sensor");

        try {
            env.execute("collectionSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
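For quick experiments the intermediate list is not strictly needed; a minimal sketch of the same stream built with fromElements (same imports and environment setup as above):

// equivalent one-step construction via fromElements
DataStreamSource<SensorReading> dss2 = env.fromElements(
        new SensorReading("sensor_1", System.currentTimeMillis(), 37.5),
        new SensorReading("sensor_2", System.currentTimeMillis(), 37.2));
dss2.print("sensor");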
2. File
(Create a.txt yourself; its content can be anything.)
public class Source2_File {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String path = "D:\\peixun\\soft\\projects\\Flink\\resource\\a.txt";
        // read the file line by line
        DataStreamSource<String> stringDataStreamSource = env.readTextFile(path);
        stringDataStreamSource.print();

        try {
            env.execute("FileExample");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
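Note that readTextFile processes the file once and the job then finishes. If the job should keep watching the path for new content, readFile with FileProcessingMode.PROCESSED_CONTINUOUSLY can be used instead; a minimal sketch (1000L is the scan interval in milliseconds; it needs additional imports for TextInputFormat, Path, and FileProcessingMode):

// re-scans the path every second; a modified file is re-processed in full
TextInputFormat format = new TextInputFormat(new Path(path));
DataStreamSource<String> watched =
        env.readFile(format, path, FileProcessingMode.PROCESSED_CONTINUOUSLY, 1000L);
watched.print();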
3. Kafka

The examples below use the FlinkKafkaConsumer011 connector, which targets Kafka 0.11.x (the flink-connector-kafka-0.11 dependency must be on the classpath); newer Flink versions ship a universal FlinkKafkaConsumer instead.
Java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Source3_Kafka {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // create the FlinkKafkaConsumer (topic, deserialization schema, Kafka properties)
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>("flinkKafka", new SimpleStringSchema(), properties);
        // hand the consumer to addSource
        DataStreamSource<String> dss = env.addSource(consumer);
        dss.print();

        try {
            env.execute("kafka-example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
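The Java version above only prints the raw strings. Assuming each Kafka record is a comma-separated line such as sensor_1,1618000000000,36.8 (the same format the Scala example below parses), it can be mapped into the SensorReading POJO; a sketch (needs an additional import for DataStream):

DataStream<SensorReading> sensors = dss.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
});
sensors.print("kafka");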
Scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.kafka.clients.consumer.ConsumerConfig

object Sources_kafka {
  def main(args: Array[String]): Unit = {
    // import the implicit TypeInformation conversions
    import org.apache.flink.api.scala._
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-3")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val kafkaSource = new FlinkKafkaConsumer011[String]("flinkKafka", new SimpleStringSchema(), properties)
    val value: DataStream[String] = env.addSource(kafkaSource)
    // value.print("se")

    // each record is expected to be a comma-separated line: id,timestamp,temperature
    val s: DataStream[(String, Long, Double)] = value.map(x => {
      val strs: Array[String] = x.split(",")
      (strs(0), strs(1).toLong, strs(2).toDouble)
    })
    s.print("ka")
    env.execute("kafka-source")
  }
}
4. Custom Source
import Flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class Source4_MySource {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // pass the custom source to addSource
        DataStreamSource<SensorReading> ssd = env.addSource(new MySensorSource());
        ssd.print();

        try {
            env.execute("MySource-example");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // custom source: implements SourceFunction and overrides run and cancel
    private static class MySensorSource implements SourceFunction<SensorReading> {
        private volatile boolean flag = true; // volatile: cancel() is called from another thread
        private final Random random = new Random();

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            while (flag) { // keep emitting, simulating a continuous stream
                sourceContext.collect(new SensorReading("sensor_" + random.nextInt(10),
                        System.currentTimeMillis(), random.nextInt(9) + 30.0));
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
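A plain SourceFunction always runs with parallelism 1. If each of several parallel subtasks should generate data, the class can implement ParallelSourceFunction instead (or extend RichParallelSourceFunction when the runtime context is needed); a minimal sketch under the same SensorReading assumptions, with an extra import for ParallelSourceFunction:

// same run/cancel contract, but Flink may run several instances in parallel
private static class MyParallelSensorSource implements ParallelSourceFunction<SensorReading> {
    private volatile boolean flag = true;
    private final Random random = new Random();

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        while (flag) {
            ctx.collect(new SensorReading("sensor_" + random.nextInt(10),
                    System.currentTimeMillis(), random.nextInt(9) + 30.0));
            Thread.sleep(500); // throttle each subtask's output
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}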
Scala
import java.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/*
 * Custom source: implement the SourceFunction trait and override run and cancel.
 */
object Sources_MySource {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds: DataStream[String] = env.addSource(new MySourceFunction())
    ds.print("MySource")
    env.execute("Source-MySource")
  }

  // inner class implementing SourceFunction, overriding run and cancel
  class MySourceFunction extends SourceFunction[String] {
    var flag: Boolean = true
    override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
      while (flag) sourceContext.collect("sensor_" + new Random().nextInt(10))
    }
    override def cancel(): Unit = { flag = false }
  }
}