06-flink-1.10.1-flink source api

Posted by 逃跑的沙丁鱼



Contents

1 Flink: reading data from a collection

1.1 Code

1.2 Execution result

2 Flink: reading data from a file

2.1 Data file

2.2 Code

2.3 Output

3 Flink: reading data from Kafka

3.1 Add the dependency

3.2 Create a producer

3.3 Read data with the Flink Kafka API

3.4 Test result


1 Flink: reading data from a collection

1.1 Code

package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */

case class SensorReding(id: String, timeStamp: Long, temperature: Double)

object CollectionSource {
  def main(args: Array[String]): Unit = {
    // Define a List that supplies the data
    val dataList = List(
      SensorReding("sensor_1", 1630851513, 36.1),
      SensorReding("sensor_2", 1630851512, 36.2),
      SensorReding("sensor_3", 1630851513, 36.3),
      SensorReding("sensor_4", 1630851514, 36.4),
      SensorReding("sensor_5", 1630851515, 36.5)
    )

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the collection as a data source
    val ds: DataStream[SensorReding] = env.fromCollection(dataList)
    // Print the result to the console
    ds.print()
    // Trigger job execution
    env.execute("liucf collection source test")
  }
}
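As an aside, for a handful of test values env.fromElements builds the same kind of stream without declaring a List first (a minimal sketch; ds2 is just an illustrative name):

val ds2: DataStream[SensorReding] = env.fromElements(
  SensorReding("sensor_1", 1630851513, 36.1),
  SensorReding("sensor_2", 1630851512, 36.2)
)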


1.2 Execution result

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
4> SensorReding(sensor_2,1630851512,36.2)
6> SensorReding(sensor_4,1630851514,36.4)
5> SensorReding(sensor_3,1630851513,36.3)
7> SensorReding(sensor_5,1630851515,36.5)
3> SensorReding(sensor_1,1630851513,36.1)

Process finished with exit code 0
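The N> prefix on each line (for example 4>) is the index of the parallel subtask that emitted the record via ds.print(); the subtasks print concurrently, so the lines do not appear in the original list order.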

2 Flink: reading data from a file

2.1 Data file
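Judging from the output in 2.3, src/main/resources/sensor.txt contains lines like:

sensor_1,1630851513,36.1
sensor_2,1630851512,36.2
sensor_3,1630851513,36.3
sensor_4,1630851514,36.4
sensor_5,1630851515,36.5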

2.2 Code

package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

/**
 * @Author liucf
 * @Date 2021/9/5
 */
object FileSource {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read the data from a file
    val ds = env.readTextFile("src/main/resources/sensor.txt")
    // Print the result to the console
    ds.print()
    // Trigger job execution
    env.execute("liucf File source test")
  }
}

2.3 Output

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
6> sensor_4,1630851514,36.4
5> sensor_3,1630851513,36.3
2> sensor_1,1630851513,36.1
3> sensor_2,1630851512,36.2
8> sensor_5,1630851515,36.5

Process finished with exit code 0
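The raw lines can be turned into the SensorReding case class from section 1 with a simple map. A minimal sketch under the same package (FileSourceParsed is an illustrative name; it assumes every line is a well-formed id,timestamp,temperature triple):

package com.study.liucf.bounded.api.source

import org.apache.flink.streaming.api.scala._

object FileSourceParsed {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.readTextFile("src/main/resources/sensor.txt")
    // Split each CSV line and build a SensorReding record;
    // toLong/toDouble will throw on malformed lines
    val sensorDs: DataStream[SensorReding] = ds.map { line =>
      val fields = line.split(",")
      SensorReding(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    }
    sensorDs.print()
    env.execute("liucf file source parse test")
  }
}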

3 Flink: reading data from Kafka

3.1 Add the dependency

First add the Flink Kafka connector to the project's pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.10.1</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
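Note: the _2.12 suffix in the artifact id is the Scala binary version; it has to match the Scala version your project is built with.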

3.2 Create a producer

package com.study.liucf.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SensorProduce2 {
  def main(args: Array[String]): Unit = {
    val kafkaProp = new Properties()
    kafkaProp.put("bootstrap.servers", "192.168.189.151:9092")
    kafkaProp.put("acks", "1")
    kafkaProp.put("retries", "3")
    //kafkaProp.put("batch.size", 16384) // 16 KB
    kafkaProp.put("key.serializer", classOf[StringSerializer].getName)
    kafkaProp.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](kafkaProp)
    val sensor = "sensor_1,1617505482,36.6"
    send(sensor, producer)
    producer.close()
  }

  def send(str: String, producer: KafkaProducer[String, String]): Unit = {
    // The topic is specified on the record itself ("topic" is not a producer config)
    val record = new ProducerRecord[String, String]("sensor_input_csv", str)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (metadata != null) {
          println("send succeeded")
        }
        if (exception != null) {
          println("send failed")
        }
      }
    })
  }
}
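To check that the record actually reaches the topic, the stock Kafka console consumer can be pointed at the broker (the path assumes a standard Kafka installation on the broker host):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.151:9092 --topic sensor_input_csv --from-beginning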

3.3 Read data with the Flink Kafka API

package com.study.liucf.unbounded.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * @Author liucf
 * @Date 2021/9/7
 */
object KafkaSource {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the Kafka consumer properties; group.id is required by the
    // Kafka consumer (the value here is just an example)
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.109.151:9092")
    props.setProperty("group.id", "liucf-flink-consumer")
    // Add the Kafka source; the topic is passed to the FlinkKafkaConsumer
    // constructor rather than through the properties
    val ds: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("sensor_input_csv", new SimpleStringSchema(), props))
    // Print the result to the console
    ds.print()
    // Trigger job execution
    env.execute("liucf kafka api test")
  }
}
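The strings arriving from Kafka are the same CSV format as in section 2, so the map sketched after 2.3 can be reused to turn them into SensorReding records.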

3.4 Test result

Generating data:

Flink consumes the data:
