如何将火花流 DF 写入 Kafka 主题
Posted
技术标签:
【中文标题】如何将火花流 DF 写入 Kafka 主题【英文标题】:How to write spark streaming DF to Kafka topic 【发布时间】:2015-10-13 22:47:07 【问题描述】:我正在使用 Spark Streaming 处理两个 Kafka 队列之间的数据,但我似乎找不到从 Spark 在 Kafka 上写入的好方法。我试过这个:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach
case x: String =>
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
)
)
它可以按预期工作,但是在实际环境中为每条消息实例化一个新的 KafkaProducer 显然是不可行的,我正在尝试解决它。
我想为每个进程保留对单个实例的引用,并在需要发送消息时访问它。如何从 Spark Streaming 写入 Kafka?
【问题讨论】:
Spark 2.2 and above - Both read and write operations on Kafka possible 【参考方案1】:是的,不幸的是,Spark(1.x,2.x)并没有直接说明如何以有效的方式写入 Kafka。
我建议采用以下方法:
每个执行程序进程/JVM 使用(和重复使用)一个KafkaProducer
实例。
这是此方法的高级设置:
-
首先,您必须“包装”Kafka 的
KafkaProducer
,因为正如您所提到的,它不可序列化。包装它允许您将它“运送”给执行者。这里的关键思想是使用lazy val
,这样您就可以在第一次使用生产者之前延迟实例化生产者,这实际上是一种解决方法,因此您不必担心KafkaProducer
无法序列化。
您可以使用广播变量将包装的生产者“运送”到每个执行者。
在您的实际处理逻辑中,您通过广播变量访问包装的生产者,并使用它将处理结果写回 Kafka。
从 Spark 2.0 开始,以下代码 sn-ps 可与 Spark Streaming 一起使用。
第 1 步:包装 KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.KafkaProducer, ProducerRecord, RecordMetadata
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
object MySparkKafkaProducer
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] =
val createProducerFunc = () =>
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
producer
new MySparkKafkaProducer(createProducerFunc)
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
第 2 步:使用广播变量为每个执行程序提供自己的包装 KafkaProducer
实例
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext =
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] =
val kafkaProducerConfig =
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
第 3 步:从 Spark Streaming 写入 Kafka,重复使用相同的包装 KafkaProducer
实例(针对每个执行程序)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD rdd =>
rdd.foreachPartition partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map record =>
kafkaProducer.value.send("my-output-topic", record)
.toStream
metadata.foreach metadata => metadata.get()
希望这会有所帮助。
【讨论】:
如果我可能会问,如何在 Python 中实现这个想法,尤其是lazy
部分?
第三步:元数据有什么用处:Stream[Future[RecordMetadata]]。我认为我没有看到它在任何地方使用。它可以用来做什么?
元数据按字面意思在下一条语句中使用,它确保只有在当前分区的所有期货都完成后(参见 metadata.get())才处理下一个分区(通过 foreachPartition)。
感谢您的回复!我不确定如果我们不等待所有未来的分区完成处理会出现什么问题?如果我们需要等待每个分区先处理后再转到下一个分区,为什么还要使用未来?
嗨,广播延迟对象的任何理由。初始化 Spark 集群时,我不能为每个工作节点创建 MySparkKafkaProducer 对象吗??【参考方案2】:
我的第一个建议是尝试在 foreachPartition 中创建一个新实例,并衡量它是否足够快以满足您的需求(在 foreachPartition 中实例化重物是官方文档所建议的)。
另一种选择是使用对象池,如本例所示:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
然而,我发现在使用检查点时很难实现。
另一个对我来说运行良好的版本是下面博客文章中描述的工厂,您只需要检查它是否提供足够的并行度来满足您的需求(查看 cmets 部分):
http://allegro.tech/2015/08/spark-kafka-integration.html
【讨论】:
您在检查点方面遇到了什么问题? 如果我们使用固定数量的 RDD,foreachPartition
会很好,但在 Spark Streaming(我们有微批处理)中,RDD 和分区都是永久创建的。如何在 Spark Streaming 中规避这个问题?
请包含链接的内容,以便在它们破坏您的答案时仍然有价值。【参考方案3】:
使用 Spark >= 2.2
使用结构化流 API 在 Kafka 上可以进行读取和写入操作
从 Kafka 主题构建流
// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
.readStream // use `read` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("subscribe", "source-topic1")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
读取键和值并为两者应用架构,为简单起见,我们将它们都转换为 String
类型。
val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
由于dsStruc
具有架构,它接受所有SQL 类型的操作,如filter
、agg
、select
..等。
将流写入 Kafka 主题
dsStruc
.writeStream // use `write` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("topic", "target-topic1")
.start()
更多configuration for Kafka integration to read or write
要添加到应用程序中的关键工件
"org.apache.spark" % "spark-core_2.11" % 2.2.0,
"org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
"org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
【讨论】:
只是想问一下,当你做结构化流的方式时,使用了多少生产者实例。生产者实例会被广播吗?喜欢this的第2步【参考方案4】:有一个由 Cloudera 维护的 Streaming Kafka Writer(实际上是从 Spark JIRA [1] 中分离出来的)。它基本上为每个分区创建一个生产者,它将创建“重”对象所花费的时间摊销到一个(希望是大的)元素集合上。
作者可以在这里找到:https://github.com/cloudera/spark-kafka-writer
【讨论】:
404 未找到该项目,已删除? github.com/cloudera/spark-kafka-writer 现在有github.com/BenFradet/spark-kafka-writer(同名,但不确定是不是同一个代码)【参考方案5】:我遇到了同样的问题,发现this post。
作者通过为每个执行者创建 1 个生产者来解决问题。他没有发送生产者本身,而是仅发送一个“配方”,如何通过广播在执行器中创建生产者。
val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
他使用了一个懒惰地创建生产者的包装器:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
object KafkaSink
def apply(config: Map[String, Object]): KafkaSink =
val f = () =>
val producer = new KafkaProducer[String, String](config)
sys.addShutdownHook
producer.close()
producer
new KafkaSink(f)
包装器是可序列化的,因为 Kafka 生产者在第一次在执行器上使用之前被初始化。驱动程序保留对包装器的引用,包装器使用每个执行器的生产者发送消息:
dstream.foreachRDD rdd =>
rdd.foreach message =>
kafkaSink.value.send("topicName", message)
【讨论】:
是什么阻止我在我的 JAR 中拥有一个单例类,其中包含 kafka 生产者。这样,我不需要广播变量。仅拥有一个单例 KafkaSink 将确保每个执行程序有一个 KafkaSink,因为每个 JVM(也称为执行程序)将初始化一次单例。【参考方案6】:为什么不可行?从根本上说,每个 RDD 的每个分区都将独立运行(并且很可能在不同的集群节点上运行),因此您必须在每个分区的任务开始时重做连接(以及任何同步)。如果这样做的开销太高,那么您应该增加StreamingContext
中的批量大小,直到它变得可以接受(显然,这样做会产生延迟成本)。
(如果您不在每个分区中处理数千条消息,您确定需要 spark-streaming 吗?您会使用独立应用程序做得更好吗?)
【讨论】:
【参考方案7】:这可能是您想要做的。您基本上为每个记录分区创建一个生产者。
input.foreachRDD(rdd =>
rdd.foreachPartition(
partitionOfRecords =>
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
partitionOfRecords.foreach
case x:String=>
println(x)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
)
)
希望有帮助
【讨论】:
新的 KafkaProducer 可以被广播并重新用于每个分区......因为它异步缓冲了它的优化方式。看到这个***.com/a/39539234/647053 step-2 @BdEngineer 我认为您只需要更好地了解 DataFrames。已经有一个答案提到了这一点。如果您可以尝试了解 Dataframe 的上下文并阅读每个分区等的 api 文档,您将建立联系。【参考方案8】:使用 Spark
由于没有直接的方式将消息从 Spark Streaming 写入 Kafka
创建一个 KafkaSinkWritter
import java.util.Properties
import org.apache.kafka.clients.producer._
import org.apache.spark.sql.ForeachWriter
class KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)]
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val results = new scala.collection.mutable.HashMap[String, String]
var producer: KafkaProducer[String, String] = _
def open(partitionId: Long,version: Long): Boolean =
producer = new KafkaProducer(kafkaProperties)
true
def process(value: (String, String)): Unit =
producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
def close(errorOrNull: Throwable): Unit =
producer.close()
使用 SinkWriter 编写消息
val topic = "<topic2>"
val brokers = "<server:ip>"
val writer = new KafkaSink(topic, brokers)
val query =
streamingSelectDF
.writeStream
.foreach(writer)
.outputMode("update")
.trigger(ProcessingTime("25 seconds"))
.start()
参考link
【讨论】:
以上是关于如何将火花流 DF 写入 Kafka 主题的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 pyspark 将经过火花转换的数据写回 kafka 代理?