06 - Flink 1.10.1 - Flink source API
Posted by 逃跑的沙丁鱼
1 Reading data from a collection in Flink
1.1 Code
package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */
case class SensorReding(id: String, timeStamp: Long, temperature: Double)

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // Define a List that supplies the data
    val dataList = List(
      SensorReding("sensor_1", 1630851513, 36.1),
      SensorReding("sensor_2", 1630851512, 36.2),
      SensorReding("sensor_3", 1630851513, 36.3),
      SensorReding("sensor_4", 1630851514, 36.4),
      SensorReding("sensor_5", 1630851515, 36.5)
    )
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read from the collection data source
    val ds: DataStream[SensorReding] = env.fromCollection(dataList)
    // Print the results to standard output
    ds.print()
    // Start the job
    env.execute("liucf collection source test")
  }
}
1.2 Output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
4> SensorReding(sensor_2,1630851512,36.2)
6> SensorReding(sensor_4,1630851514,36.4)
5> SensorReding(sensor_3,1630851513,36.3)
7> SensorReding(sensor_5,1630851515,36.5)
3> SensorReding(sensor_1,1630851513,36.1)
Process finished with exit code 0
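2 Reading data from a file in Flink
2.1 Data file
The data file sensor.txt holds one comma-separated reading per line (id, timestamp, temperature), for example sensor_1,1630851513,36.1.
2.2 Code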
package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */
object FileSource {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read data from the file, one String per line (path is relative to the working directory)
    val ds = env.readTextFile("src\\main\\resources\\sensor.txt")
    // Print the results to standard output
    ds.print()
    // Start the Flink job
    env.execute("liucf File source test")
  }
}
2.3 Output
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6> sensor_4,1630851514,36.4
5> sensor_3,1630851513,36.3
2> sensor_1,1630851513,36.1
3> sensor_2,1630851512,36.2
8> sensor_5,1630851515,36.5
Process finished with exit code 0
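The file source yields each line as a raw String, while section 1 works with SensorReding objects. The following is a minimal sketch of how one such line could be parsed into that case class. The object name SensorLineParser and the function parseReading are hypothetical and not part of the original project; the sketch also assumes that malformed lines should simply be skipped.

```scala
import scala.util.Try

// Same shape as the case class in section 1 (spelling kept as in the article).
case class SensorReding(id: String, timeStamp: Long, temperature: Double)

// Hypothetical helper, not part of the original project.
object SensorLineParser {
  // Parse one "id,timestamp,temperature" line.
  // Returns None when the line does not have exactly three fields
  // or when the numeric fields cannot be converted.
  def parseReading(line: String): Option[SensorReding] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) if id.nonEmpty =>
        Try(SensorReding(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val lines = List(
      "sensor_1,1630851513,36.1",
      "sensor_2,1630851512,36.2",
      "not a reading" // malformed on purpose, will be dropped
    )
    // flatMap keeps only the lines that parsed successfully
    lines.flatMap(line => parseReading(line)).foreach(println)
  }
}
```

In a Flink job, the same function could probably be applied to the stream with something like ds.flatMap(line => SensorLineParser.parseReading(line).toList), so that unparseable lines are dropped instead of failing the job.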
3 Reading data from Kafka in Flink
3.1 Add the flink-connector-kafka dependency to the pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.10.1</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
3.2 Create a producer
package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.189.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384)//16k
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    // "topic" is not a producer config key; the topic is set on each ProducerRecord in send()
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor, producer)
    // close() waits for pending sends to complete
    producer.close()
  }

  def send(str: String, producer: KafkaProducer[String, String]): Unit = {
    val record = new ProducerRecord[String, String]("sensor_input_csv", str)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        // exception is null when the send succeeded
        if (exception != null) println("Message send failed")
        else println("Message sent")
      }
    })
  }
}
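The producer sends a single hard-coded CSV string. As a rough sketch, assuming the same SensorReding case class as in section 1, readings could be formatted into that CSV layout before being handed to send. The names SensorCsv and toCsv are hypothetical and do not appear in the original project.

```scala
// Same shape as the case class in section 1 (spelling kept as in the article).
case class SensorReding(id: String, timeStamp: Long, temperature: Double)

// Hypothetical helper, not part of the original project.
object SensorCsv {
  // Format a reading as "id,timestamp,temperature",
  // the layout used by the messages in this article.
  def toCsv(r: SensorReding): String =
    s"${r.id},${r.timeStamp},${r.temperature}"

  def main(args: Array[String]): Unit = {
    val readings = List(
      SensorReding("sensor_1", 1617505482L, 36.6),
      SensorReding("sensor_2", 1617505483L, 36.7)
    )
    // Each resulting line could be passed to the producer's send method
    readings.map(toCsv).foreach(println)
  }
}
```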
3.3 Read data with the Flink Kafka API
package com.study.liucf.unbounded.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * @Author liucf
 * @Date 2021/9/7
 */
object KafkaSource {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the Kafka configuration; the broker address must point to the cluster the producer writes to
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    // The topic is passed to FlinkKafkaConsumer directly, so it is not set as a property
    // Add the Kafka data source
    val ds: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props))
    // Print the results to standard output
    ds.print()
    // Start the Flink job
    env.execute("liucf kafka api test")
  }
}
3.4 Test result
Produce data
Flink consumes the data