如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?

Posted

技术标签:

【中文标题】如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?【英文标题】:How to fetch offset id while consuming Kafka from Spark, save it in Cassandra and use it to restart Kafka? 【发布时间】:2017-01-03 05:07:45 【问题描述】:

我正在使用 Spark 使用来自 Kafka 的数据并将其保存在 Cassandra 中。我的程序是用 Java 编写的。我正在使用spark-streaming-kafka_2.10:1.6.2 lib 来完成此操作。我的代码是:

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
JavaPairReceiverInputDStream<String, EventLog> messages =
  KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() 
    @Override
    public EventLog call(Tuple2<String, EventLog> tuple2) 
        return tuple2._2();
    
);
lines.foreachRDD(rdd -> 
    javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
);
jssc.start();

在我的 Cassandra 表 event_log 中,有一个名为 offsetid 的列来存储流的偏移 ID。如何获取偏移 id,直到该流读取 Kafka 流并将其存储在 Cassandra 中?

在 Cassandra 中保存后,我想使用最新的偏移 id,以便在 Spark 再次启动时使用。我该怎么做?

【问题讨论】:

【参考方案1】:

以下是参考代码,您可能需要根据需要更改内容。我对代码和方法所做的是为 Cassandra 中的每个主题维护 Kafka 分区明智的偏移量(这可以在 zookeeper 中完成,也可以作为使用其 java api 的建议)。在 EventLog 表中存储或更新接收到的每条字符串消息的主题的最新偏移范围。所以总是从表中检索并查看是否存在,然后从该偏移量创建直接流,否则为新的直接流。

package com.spark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;

public class KafkaChannelFetchOffset 
    public static void main(String[] args) 
        String topicName = "topicName";
        SparkConf sparkConf = new SparkConf().setAppName("name");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicName));
        HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>();
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "127.0.0.1");
        kafkaParams.put("group.id", "GROUP");
        kafkaParams.put("metadata.broker.list", "127.0.0.1");
        List<EventLog> eventLogList = javaFunctions(jssc).cassandraTable("test", "event_log", mapRowTo(EventLog.class))
                .select("topicName", "partion", "fromOffset", "untilOffset").where("topicName=?", topicName).collect();
        JavaDStream<String> kafkaOutStream = null;
        if (eventLogList == null || eventLogList.isEmpty()) 
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                    topicsSet).transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() 
                @Override
                public JavaRDD<String> call(JavaPairRDD<String, String> pairRdd) throws Exception 
                    JavaRDD<String> rdd = pairRdd.map(new Function<Tuple2<String, String>, String>() 
                        @Override
                        public String call(Tuple2<String, String> arg0) throws Exception 
                            return arg0._2;
                        
                    );
                    writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                
            );
         else 
            for (EventLog eventLog : eventLogList) 
                kafkaTopicPartition.put(new TopicAndPartition(topicName, Integer.parseInt(eventLog.getPartition())),
                        Long.parseLong(eventLog.getUntilOffset()));
            
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
                    kafkaParams, kafkaTopicPartition, new Function<MessageAndMetadata<String, String>, String>() 
                        @Override
                        public String call(MessageAndMetadata<String, String> arg0) throws Exception 
                            return arg0.message();
                        
                    ).transform(new Function<JavaRDD<String>, JavaRDD<String>>() 

                @Override
                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception 
                    writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                
            );
        
        // Use kafkaOutStream for further processing.
        jssc.start();
    

    private static void writeOffset(JavaRDD<String> rdd, final OffsetRange[] offsets) 
        for (OffsetRange offsetRange : offsets) 
            EventLog eventLog = new EventLog();
            eventLog.setTopicName(String.valueOf(offsetRange.topic()));
            eventLog.setPartition(String.valueOf(offsetRange.partition()));
            eventLog.setFromOffset(String.valueOf(offsetRange.fromOffset()));
            eventLog.setUntilOffset(String.valueOf(offsetRange.untilOffset()));
            javaFunctions(rdd).writerBuilder("test", "event_log", null).saveToCassandra();
        
    

希望这有助于解决您的问题...

【讨论】:

我的JavaRDDEventLog 类型。其中,是一个包含offsetId 的变量。如何将offsetId 添加到我的rdd?不会创建新对象。如何在 Zookeeper 中保存 offset ID? 我遇到了另一个问题。 javaFunctions(jssc) 显示错误The method javaFunctions(SparkContext) in the type CassandraJavaUtil is not applicable for the arguments (JavaStreamingContext)。你用的是哪个版本? 在(如果阻止)中出现以下错误:org.apache.spark.rdd.MapPartitionsRDD 无法转换为 org.apache.spark.streaming.kafka.HasOffsetRanges【参考方案2】:

所以,您想自己管理 kafka 偏移量。

为此:

    使用 createDirectStream 代替 createStream。这将允许您指定要读取的偏移量 (fromOffsets: Map[TopicAndPartition, Long])

    收集有关您已处理的偏移量的信息。这可以通过保存每条消息的偏移量来完成,或者您可以将此信息汇总在单独的表中。要从 rdd 获取偏移量范围:rdd.asInstanceOf[HasOffsetRanges].offsetRanges。对于java(根据文档)http://spark.apache.org/docs/latest/streaming-kafka-integration.htmlOffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

【讨论】:

我要求提供 Java 代码。您提供的解决方案适用于 Scala。

以上是关于如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?的主要内容,如果未能解决你的问题,请参考以下文章

如何获取 kafka 主题分区的最后/结束偏移量?

Kafka 消费者异常和偏移提交

如何获取 kafka 主题分区的最新偏移量?

如何正确使用 Kafka 消费者“寻找”以返回所有分区的未提交偏移量?

kafka查看消费了多少条数据

kafka查看消费了多少条数据