任何人都可以在 Scala 中分享 Flink Kafka 示例吗?

Posted

技术标签:

【中文标题】任何人都可以在 Scala 中分享 Flink Kafka 示例吗?【英文标题】:Can anyone share a Flink Kafka example in Scala? 【发布时间】:2015-10-05 10:28:35 【问题描述】:

任何人都可以在 Scala 中分享 Flink Kafka(主要从 Kafka 接收消息)的工作示例吗?我知道 Spark 中有一个 KafkaWordCount 示例。我只需要在 Flink 中打印出 Kafka 消息。真的很有帮助。

【问题讨论】:

这个问题怎么样***.com/questions/31419408/… 那里列出的代码主要用于编译工作。它不会产生和消费 Kafka 消息。我尝试添加“val stream = env.addSource(new KafkaSource[String](....)).addSink(new KafkaSink[String](...)),但它不起作用,运行时错误为”无法复制数据流接收器”,我无法通过谷歌搜索或查看 Flink 文档来弄清楚。 【参考方案1】:

与 Robert 添加的相反,下面是一段用于向 Kafka 主题发送消息的应用程序代码。

import org.apache.kafka.clients.producer.KafkaProducer, ProducerRecord

object KafkaProducer 

  def main(args: Array[String]): Unit = 
    KafkaProducer.sendMessageToKafkaTopic("localhost:9092", "topic_name")
      

  def sendMessageToKafkaTopic(server: String, topic:String): Unit = 
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String,String](props)
    val record = new ProducerRecord[String,String](topic, "HELLO WORLD!")
    producer.send(record)
    producer.close()
  

【讨论】:

【参考方案2】:

以下代码展示了如何使用 Flink 的 Scala DataStream API 从 Kafka 主题中读取数据:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main 
  def main(args: Array[String]) 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
      .print

    env.execute("Flink Kafka Example")
  

【讨论】:

属性来自什么进口?我试图在 flink-scala shell 中复制它,但它找不到属性类型。 我是import java.util.Properties;

以上是关于任何人都可以在 Scala 中分享 Flink Kafka 示例吗?的主要内容,如果未能解决你的问题,请参考以下文章

在 Android 中分享一个词

让淘宝链接在微信中分享,GO

FLUME在最近的巡演回放中分享了自己的新音乐

在 Google Play 游戏中分享成就

Web 项目中分享到微博QQ空间等分享功能

如何在 FirefoxOS 上的社交网络中分享文本