使用 commitAsync 提交偏移量时出现 Kafka 异常
Posted
技术标签:
【中文标题】使用 commitAsync 提交偏移量时出现 Kafka 异常【英文标题】:Kafka Exception while Committing Offset Using commitAsync 【发布时间】:2019-01-26 23:41:45 【问题描述】:我的 Kafka 应用程序读取实时流数据,对其进行处理并存储到 Hive。我正在尝试使用 commitAsync
提交偏移量。
我遇到了以下异常:
引起:java.io.NotSerializableException:对象 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream 正在 序列化可能作为 RDD 操作关闭的一部分。这是 因为 DStream 对象是从内部引用的 关闭。请重写此 DStream 内部的 RDD 操作以避免 这。这已被强制执行,以避免 Spark 任务膨胀 不必要的对象。
以下是我的代码的工作流程:
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>()
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>()
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception
return tuple2.value();
);
records.foreachRDD(new VoidFunction<JavaRDD<String>>()
@Override
public void call(JavaRDD<String> rdd) throws Exception
if(!rdd.isEmpty())
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
);
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
);
javaStreamContext.start();
javaStreamContext.awaitTermination();
感谢任何建议。
以下代码在数据处理后工作并提交偏移量。 但问题是,它在以下情况下处理重复项: 可以说-消费者作业正在运行,并且配置单元表有 0 条记录,当前偏移量是(FORMAT- fromOffest,untilOffset,Difference): 512 512 0 然后我产生了 1000 条记录,当它读取 34 条记录但没有提交时,我杀死了它 512 546 34
我看到此时,34 个记录已经加载到 Hive 表中
接下来,我重新启动了应用程序。 我看到它再次读取了 34 条记录(而不是读取 1000-34=76 条记录),尽管它已经处理了它们并加载到 Hive 512 1512 1000 然后几秒钟后它会更新。 1512 1512 0 Hive 现在有 (34+1000=1034)
这会导致表中出现重复记录(额外 34 条)。 如代码中所述,我仅在处理/加载到 Hive 表后才提交偏移量。
请提出建议。
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>()
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception
return tuple2.value();
);
records.foreachRDD(new VoidFunction<JavaRDD<String>>()
@Override
public void call(JavaRDD<String> rdd) throws Exception
if(!rdd.isEmpty())
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
);
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>()
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
for (OffsetRange offset : offsetRanges)
System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ " "+offset.count());
);
javaStreamContext.start();
javaStreamContext.awaitTermination();
【问题讨论】:
如果您只想将 Kafka 数据写入 Hive,那么我的建议是使用 Kafka Connect 而不是编写自己的 Spark 代码。如果您需要在消息到达 Hive 之前对其进行过滤/处理,然后将这些结果写回 Kafka,然后使用该主题的连接器。 但是目前我需要修复上面的 kafka 异常,因为有多个流式作业正在使用 CreateDirectStream API 在 Spark 上运行,并且不可能将它们全部转换为使用 Kafka Connect。感谢任何处理此异常的解决方案 好吧,阅读错误... DirectKafkaInputDStream 可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的 ...我看不到您的methodToSaveDataInHive
调用,但问题可能存在,因为您似乎正在这样做 spark.apache.org/docs/latest/…
methodToSaveDataInHive 只是我将 rdd 转换为 Dataset尝试移动 ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);超出 foreachRDD 块
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>()
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>()
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception
return tuple2.value();
);
records.foreachRDD(new VoidFunction<JavaRDD<String>>()
@Override
public void call(JavaRDD<String> rdd) throws Exception
if(!rdd.isEmpty())
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
);
);
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
javaStreamContext.start();
javaStreamContext.awaitTermination();
【讨论】:
感谢 Rishi 的回复。在您的代码中,offsetRange 无法访问在块内初始化的变量。如果我在块外用 null 初始化,它会说“在封闭范围内定义的局部变量 offsetRanges 必须是最终的或实际上是最终的”。但是如果我在块外用 null 初始化为 final 并尝试用块内的实际值重新分配它,那么它将不允许。请指教 如果我必须将 commitAsync 带出块,请告诉我如何解决初始化 offsetRanges 的问题 @user1326784 我在 scala 中写过同样的程序,你绝对可以拿出来。 如果我使用 KafkaUtils.createDirectStream 读取消息,我可以使用 CommitSync 吗【参考方案2】:以下代码有效。 但我不确定这是否会在处理到 hive 后提交偏移量,因为 commitAsync 块在 hive 存储方法调用之前。
public void method1(SparkConf conf,String app)
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>()
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
);
JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>()
@Override
public String call(ConsumerRecord<String, String> tuple2) throws Exception
return tuple2.value();
);
records.foreachRDD(new VoidFunction<JavaRDD<String>>()
@Override
public void call(JavaRDD<String> rdd) throws Exception
if(!rdd.isEmpty())
methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
);
javaStreamContext.start();
javaStreamContext.awaitTermination();
在这段代码中,如果我添加下面的块(在初始化 offsetRanges 之后)来打印偏移详细信息,它不会再次工作,抛出相同的异常
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>()
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd)
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String,String>>>()
@Override
public void call(Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<String,String>> arg0) throws Exception
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
);
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
);
请提供您的cmets
【讨论】:
以上是关于使用 commitAsync 提交偏移量时出现 Kafka 异常的主要内容,如果未能解决你的问题,请参考以下文章
如果我将偏移量设置为非零,则在使用 hyperslab 从 HDF5 文件中读取数据时出现异常
Kafka - 如何在使用高级消费者的每条消息后提交偏移量?