使用 commitAsync 提交偏移量时出现 Kafka 异常

Posted

技术标签:

【中文标题】使用 commitAsync 提交偏移量时出现 Kafka 异常【英文标题】:Kafka Exception while Committing Offset Using commitAsync 【发布时间】:2019-01-26 23:41:45 【问题描述】:

我的 Kafka 应用程序读取实时流数据,对其进行处理并存储到 Hive。我正在尝试使用 commitAsync 提交偏移量。 我遇到了以下异常:

引起:java.io.NotSerializableException:对象 org.apache.spark.streaming.kafka010.DirectKafkaInputDStream 正在 序列化可能作为 RDD 操作关闭的一部分。这是 因为 DStream 对象是从内部引用的 关闭。请重写此 DStream 内部的 RDD 操作以避免 这。这已被强制执行,以避免 Spark 任务膨胀 不必要的对象。

以下是我的代码的工作流程:

public void method1(SparkConf conf,String app) 
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() 
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() 
                    @Override
                    public String call(ConsumerRecord<String, String> tuple2) throws Exception 
                        return tuple2.value();
                    
                );

                records.foreachRDD(new VoidFunction<JavaRDD<String>>() 
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception 
                        if(!rdd.isEmpty()) 
                            methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                        
                    
                 );
                ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        
    );
    javaStreamContext.start();
    javaStreamContext.awaitTermination();

感谢任何建议。


以下代码在数据处理后工作并提交偏移量。 但问题是,它在以下情况下处理重复项: 可以说-消费者作业正在运行,并且配置单元表有 0 条记录,当前偏移量是(FORMAT- fromOffest,untilOffset,Difference): 512 512 0 然后我产生了 1000 条记录,当它读取 34 条记录但没有提交时,我杀死了它 512 546 34

我看到此时,34 个记录已经加载到 Hive 表中

接下来,我重新启动了应用程序。 我看到它再次读取了 34 条记录(而不是读取 1000-34=76 条记录),尽管它已经处理了它们并加载到 Hive 512 1512 1000 然后几秒钟后它会更新。 1512 1512 0 Hive 现在有 (34+1000=1034)

这会导致表中出现重复记录(额外 34 条)。 如代码中所述,我仅在处理/加载到 Hive 表后才提交偏移量。

请提出建议。

public void method1(SparkConf conf,String app) 
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
            new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));

            JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() 
                @Override
                public String call(ConsumerRecord<String, String> tuple2) throws Exception 
                    return tuple2.value();
                
            );

            records.foreachRDD(new VoidFunction<JavaRDD<String>>() 
                @Override
                public void call(JavaRDD<String> rdd) throws Exception 
                    if(!rdd.isEmpty()) 
                        methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                    
                
             );

             messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() 
              @Override
              public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);                     
                    for (OffsetRange offset : offsetRanges) 
                        System.out.println(offset.fromOffset() + " " + offset.untilOffset()+ "  "+offset.count());
                    
                     
              );             
javaStreamContext.start();
javaStreamContext.awaitTermination();

【问题讨论】:

如果您只想将 Kafka 数据写入 Hive,那么我的建议是使用 Kafka Connect 而不是编写自己的 Spark 代码。如果您需要在消息到达 Hive 之前对其进行过滤/处理,然后将这些结果写回 Kafka,然后使用该主题的连接器。 但是目前我需要修复上面的 kafka 异常,因为有多个流式作业正在使用 CreateDirectStream API 在 Spark 上运行,并且不可能将它们全部转换为使用 Kafka Connect。感谢任何处理此异常的解决方案 好吧,阅读错误... DirectKafkaInputDStream 可能作为 RDD 操作关闭的一部分被序列化。这是因为 DStream 对象是从闭包中引用的 ...我看不到您的 methodToSaveDataInHive 调用,但问题可能存在,因为您似乎正在这样做 spark.apache.org/docs/latest/… methodToSaveDataInHive 只是我将 rdd 转换为 Dataset 并写入 hive 表的一种方法 【参考方案1】:

尝试移动 ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);超出 foreachRDD 块

public void method1(SparkConf conf,String app) 
    spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
    final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
                new Duration(<spark duration>));
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() 
        @Override
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() 
                    @Override
                    public String call(ConsumerRecord<String, String> tuple2) throws Exception 
                        return tuple2.value();
                    
                );

                records.foreachRDD(new VoidFunction<JavaRDD<String>>() 
                    @Override
                    public void call(JavaRDD<String> rdd) throws Exception 
                        if(!rdd.isEmpty()) 
                            methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                        
                    
                 );
        
    );
     ((CanCommitOffsets)  messages.inputDStream()).commitAsync(offsetRanges);
    javaStreamContext.start();
    javaStreamContext.awaitTermination();

【讨论】:

感谢 Rishi 的回复。在您的代码中,offsetRange 无法访问在块内初始化的变量。如果我在块外用 null 初始化,它会说“在封闭范围内定义的局部变量 offsetRanges 必须是最终的或实际上是最终的”。但是如果我在块外用 null 初始化为 final 并尝试用块内的实际值重新分配它,那么它将不允许。请指教 如果我必须将 commitAsync 带出块,请告诉我如何解决初始化 offsetRanges 的问题 @user1326784 我在 scala 中写过同样的程序,你绝对可以拿出来。 如果我使用 KafkaUtils.createDirectStream 读取消息,我可以使用 CommitSync 吗【参考方案2】:

以下代码有效。 但我不确定这是否会在处理到 hive 后提交偏移量,因为 commitAsync 块在 hive 存储方法调用之前。

public void method1(SparkConf conf,String app) 
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
            new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(javaStreamContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() 
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
    
);
            JavaDStream<String> records = messages.map(new Function<ConsumerRecord<String, String>, String>() 
                @Override
                public String call(ConsumerRecord<String, String> tuple2) throws Exception 
                    return tuple2.value();
                
            );

            records.foreachRDD(new VoidFunction<JavaRDD<String>>() 
                @Override
                public void call(JavaRDD<String> rdd) throws Exception 
                    if(!rdd.isEmpty()) 
                        methodToSaveDataInHive(rdd, <StructTypeSchema>,<OtherParams>);
                    
                
             );

javaStreamContext.start();
javaStreamContext.awaitTermination();

在这段代码中,如果我添加下面的块(在初始化 offsetRanges 之后)来打印偏移详细信息,它不会再次工作,抛出相同的异常

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() 
              @Override
              public void call(JavaRDD<ConsumerRecord<String, String>> rdd) 


                OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

               rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String,String>>>() 
                   @Override
                   public void call(Iterator<org.apache.kafka.clients.consumer.ConsumerRecord<String,String>> arg0) throws Exception 

                   OffsetRange o = offsetRanges[TaskContext.get().partitionId()];

                   System.out.println(o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
                   
            );

                ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);

              
              );

请提供您的cmets

【讨论】:

以上是关于使用 commitAsync 提交偏移量时出现 Kafka 异常的主要内容,如果未能解决你的问题,请参考以下文章

如果我将偏移量设置为非零,则在使用 hyperslab 从 HDF5 文件中读取数据时出现异常

未定义的偏移量:1 laravel

Kafka - 如何在使用高级消费者的每条消息后提交偏移量?

尝试访问 bool PHP 类型值的数组偏移量 [重复]

Kafka consumerGroup 丢失了所有分区中提交的偏移量信息,并从头开始消费偏移量

警告:非法字符串偏移量'city'Symfony表单搜索