大数据之发送到卡夫卡

Posted 潇洒哥浩浩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据之发送到卡夫卡相关的知识,希望对你有一定的参考价值。

package com.sjw.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSink {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val stream: DataStream[String] = env.readTextFile("src/main/resources/sensor.txt")

val dataDS: DataStream[String] = stream.map(data => {
val arr: Array[String] = data.split(",")
SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
})
dataDS.addSink(new FlinkKafkaProducer011[String]("sunjunwei1.com:9092","sensor",new SimpleStringSchema()))

 

env.execute("kafka sink")
}
}

以上是关于大数据之发送到卡夫卡的主要内容,如果未能解决你的问题,请参考以下文章

大数据开发之词频统计传参打包成jar包发送到Hadoop运行并创建可执行文件方便运行

精彩解密大数据之精绝古城

大数据架构之:Flume

大数据集群组件之Flume的配置及其使用

大数据框架开发基础之Zookeeper入门

大数据之Redis:Redis的发布和订阅