spark kafka producer 可序列化

Posted

技术标签:

【中文标题】spark kafka producer 可序列化【英文标题】:spark kafka producer serializable 【发布时间】:2017-03-22 21:55:42 【问题描述】:

我想出了一个例外:

ERROR yarn.ApplicationMaster:用户类抛出异常: org.apache.spark.SparkException:任务不可序列化 org.apache.spark.SparkException:任务不可序列化 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889) 在 org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:306) 在 org.apache.spark.rdd.RDD.foreach(RDD.scala:888) 在 com.Boot$.test(Boot.scala:60) 在 com.Boot$.main(Boot.scala:36) 在 com.Boot.main(Boot.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525) 引起:java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer 序列化栈: - 对象不可序列化(类:org.apache.kafka.clients.producer.KafkaProducer,值: org.apache.kafka.clients.producer.KafkaProducer@77624599) - 字段(类:com.Boot$$anonfun$test$1,名称:producer$1,类型:类 org.apache.kafka.clients.producer.KafkaProducer) - org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 处的对象(com.Boot$$anonfun$test$1 类) 在 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 在 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) 在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

//    @transient
val sparkConf = new SparkConf()

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

//    @transient
val sc = new SparkContext(sparkConf)

val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")

//    @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
//    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
//    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")

//    @transient
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

requestSet.foreachPartition((partisions: Iterator[String]) => 
  partisions.foreach((line: String) => 
    try 
      producer.send(new ProducerRecord[String, String]("testtopic", line))
     catch 
      case ex: Exception => 
        log.warn(ex.getMessage, ex)
      
    
  )
)

producer.close()

在这个程序中,我尝试从 hdfs 路径读取记录并将它们保存到 kafka。 问题是当我删除有关向 kafka 发送记录的代码时,它运行良好。 我错过了什么?

【问题讨论】:

【参考方案1】:

KafkaProducer 不可序列化。您需要将实例的创建移至 foreachPartition 内部:

requestSet.foreachPartition((partitions: Iterator[String]) => 
  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
  partitions.foreach((line: String) => 
    try 
      producer.send(new ProducerRecord[String, String]("testtopic", line))
     catch 
      case ex: Exception => 
        log.warn(ex.getMessage, ex)
      
    
  )
)

请注意,KafkaProducer.send 返回一个 Future[RecordMetadata],如果无法序列​​化键或值,则唯一可以从它传播的异常是 SerializationException

【讨论】:

非常感谢。我按照你的方式更改了我的代码。它有效。 @Steven.Prgm 他提到了什么? 顺便说一句,我的一位同事提到了驱动变量和执行变量,它们之间有什么区别吗? @Steven.Prgm 您创建它的第一种方式,驱动程序必须将KafkaProducer 序列化给工作人员。我提出的方式,KafkaProducer为每个数据分区创建一次,代码在每个worker节点的executor内部运行。 所以,更像是驱动程序无法序列化 KafkaProducer 对象并传输给工作人员。但如果 KafkaProducer 是由 executor 创建的,则不需要考虑序列化和传输。 (如果我认为这样可以吗?)【参考方案2】:

我不推荐 Yuval Itzchakov 的答案,因为您打开并关闭了很多套接字,即使在使用 kafka 的代理中打开连接又重又慢,所以我强烈建议阅读此博客 https://allegro.tech/2015/08/spark-kafka-integration.html 我使用它并进行了测试它也是我投入生产环境的最佳选择。

【讨论】:

欢迎提供指向解决方案的链接,但请确保您的答案在没有它的情况下有用:add context around the link 这样您的其他用户就会知道它是什么以及它存在的原因,然后引用最相关的您链接到的页面的一部分,以防目标页面不可用。 Answers that are little more than a link may be deleted. 如果您的帖子难以阅读,因为缺少标点符号和含义完全分开的句子,请原谅我的夸大其词,同时删除空格。

以上是关于spark kafka producer 可序列化的主要内容,如果未能解决你的问题,请参考以下文章

使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka

设计- Kafka Producer 可以写成 Spark-job 吗?

将 Spark 数据集转换为 JSON 并写入 Kafka Producer

kafka producer serializer序列化

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码

Kafka源码分析-序列2 -Producer