Spark 流式传输 Kafka 消息未使用

Posted

技术标签:

【中文标题】Spark 流式传输 Kafka 消息未使用【英文标题】:Spark streaming Kafka messages not consumed 【发布时间】:2018-06-25 11:47:21 【问题描述】:

我想使用 Spark (1.6.2) Streaming 从 Kafka (broker v 0.10.2.1) 中的某个主题接收消息。

我正在使用 Receiver 方法。代码如下:

public static void main(String[] args) throws Exception

    SparkConf sparkConf = new SparkConf().setAppName("SimpleStreamingApp");
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));
    //
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put("myTopic", 1);
    //
    String zkQuorum = "host1:port1,host2:port2,host3:port3";
    //
    Map<String, String> kafkaParamsMap = new HashMap<>();
    kafkaParamsMap.put("bootstraps.server", zkQuorum);
    kafkaParamsMap.put("metadata.broker.list", zkQuorum);
    kafkaParamsMap.put("zookeeper.connect", zkQuorum);
    kafkaParamsMap.put("group.id", "group_name");
    kafkaParamsMap.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParamsMap.put("security.mechanism", "GSSAPI");
    kafkaParamsMap.put("ssl.kerberos.service.name", "kafka");
    kafkaParamsMap.put("key.deserializer", "kafka.serializer.StringDecoder");
    kafkaParamsMap.put("value.deserializer", "kafka.serializer.DefaultDecoder");
    //
    JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(javaStreamingContext,
                            byte[].class, byte[].class,
                            DefaultDecoder.class, DefaultDecoder.class,
                            kafkaParamsMap,
                            topicMap,
                            StorageLevel.MEMORY_ONLY());

    VoidFunction<JavaPairRDD<byte[], byte[]>> voidFunc = new VoidFunction<JavaPairRDD<byte[], byte[]>> ()
    
       public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception
       
          List<Tuple2<byte[], byte[]>> all = rdd.collect();
          System.out.println("size of red: " + all.size());
       
    

    stream.forEach(voidFunc);

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();

对 Kafka 的访问是kerberized。当我启动时

spark-submit --verbose --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" --files jaas.conf,privKey.der --principal &lt;accountName&gt; --keytab &lt;path to keytab file&gt; --master yarn --jars &lt;comma separated path to all jars&gt; --class &lt;fully qualified java main class&gt; &lt;path to jar file containing main class&gt;

    来自 Kafka 的 VerifiableProperties 类为 kafkaParams 哈希图中包含的属性记录 警告 消息:
INFO KafkaReceiver: connecting to zookeeper: <the correct zookeeper quorum provided in kafkaParams map>

VerifiableProperties: Property auto.offset.reset is overridden to largest
VerifiableProperties: Property enable.auto.commit is not valid.
VerifiableProperties: Property sasl.kerberos.service.name is not valid
VerifiableProperties: Property key.deserializer is not valid
...
VerifiableProperties: Property zookeeper.connect is overridden to ....

我认为是因为这些属性不被接受,所以可能会影响流处理。

** 当我在集群模式--master yarn 下启动时,不会出现这些警告消息**

    稍后,我看到以下日志按照配置每 5 秒重复一次:

    INFO BlockRDD: Removing RDD 4 from persistence list

    INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[4] at createStream at ...

    INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

    INFO ... INFO BlockManager: Removing RDD 4

但是,我没有看到控制台上打印出任何实际的消息

问题:为什么我的代码没有打印任何实际消息?

我的 gradle 依赖项是:

compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version: '1.6.2'

【问题讨论】:

您是否将 Spark 作业提交到远程集群? 是的,现在我使用选项--master yarn 将其提交到远程。我会在某个时候更新一些日志。删除了代码开头的setMaster(..) API 调用。 你为执行者分配的核心数量是多少,有多少执行者? 查看此链接 ***.com/questions/39565827/… 【参考方案1】:

stream 是 JavaPairReceiverInputDStream 的一个对象。将其转换为 Dstream 并使用 foreachRDD 打印从 Kafka 消费的消息

【讨论】:

没用。现在,我面临着不同的问题。我会用调查结果更新问题。【参考方案2】:

Spark 1.6.2 不支持 kafka 0.10,只支持 kafka0.8。对于 kafka 0.10,你应该使用 spark 2

【讨论】:

如果是这样,为什么没有任何错误提示?

以上是关于Spark 流式传输 Kafka 消息未使用的主要内容,如果未能解决你的问题,请参考以下文章

Spark流式传输作业不会删除随机播放文件

使用 Spark 流式传输的 Redshift

流式传输 Kmeans Spark JAVA

Spark 结构化流式传输 - 为不同的 GroupBy 键使用不同的 Windows

Kafka简介

基于kafka分区的结构化流式读取