使用 Spark SQL 流时缺少 Avro 自定义标头

Posted

技术标签:

【中文标题】使用 Spark SQL 流时缺少 Avro 自定义标头【英文标题】:Missing Avro Custom Header when using Spark SQL Streaming 【发布时间】:2020-06-08 21:22:58 【问题描述】:

在向 Kafka 发送 Avro GenericRecord 之前,会像这样插入一个 Header。

ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, key, message);
record.headers().add("schema", schema);

消费记录。

使用 Spark Streaming 时,ConsumerRecord 的标头是完整的。

    KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParams)).foreachRDD(rdd -> 
          rdd.foreach(record -> 

            System.out.println(new String(record.headers().headers("schema").iterator().next().value()));
          );
        );
    ;

但在使用 Spark SQL Streaming 时,似乎缺少标头。

   StreamingQuery query = dataset.writeStream().foreach(new ForeachWriter<>() 

      ...

      @Override
      public void process(Row row) 
        String topic = (String) row.get(2);
        int partition = (int) row.get(3);
        long offset = (long) row.get(4);
        String key = new String((byte[]) row.get(0));
        byte[] value = (byte[]) row.get(1);

        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>(topic, partition, offset, key,
            value);

        //I need the schema to decode the Avro!

      
    ).start();

使用 Spark SQL Streaming 方法时,我在哪里可以找到自定义标头值?

版本:

<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>

更新

我尝试了 spark-sql_2.12 和 spark-sql-kafka-0-10_2.12 的 3.0.0-preview2。我加了

.option("includeHeaders", true)

但我仍然只能从 Row 中获取这些列。

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+

【问题讨论】:

【参考方案1】:

仅从 3.0 开始支持结构化流中的 Kafka 标头:https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html 详情请关注includeHeaders

【讨论】:

从 3.0.0-preview2 开始,Row 模式还不包括标题?我在线程“main” org.apache.spark.sql.AnalysisException: cannot resolve 'headers' given input columns: [offset, value, topic, timestamp, timestampType, partition, key]中得到异常; 请看代码:github.com/apache/spark/blob/… 必须设置includeHeaders 否则不起作用。 嗯,我看到你已经添加了includeHeaders。如果里面有bug,我会深入研究并修复它...... 不确定您的问题是什么,但它在这里完美运行:github.com/gaborgsomogyi/spark/blob/… 当我执行测试时,生成了以下条目:aaaaaaaaaaaa: WrappedArray([a,[B@3fcf9f74], [c,[B@2bd9204b]) 请分析您的应用程序与上述应用程序之间的区别火花代码。

以上是关于使用 Spark SQL 流时缺少 Avro 自定义标头的主要内容,如果未能解决你的问题,请参考以下文章

Spark sql怎么使用Kafka Avro序列化器

如何在 Spark SQL 中查询 Avro 表

databricks avro 架构无法转换为 Spark SQL 结构类型

Spark 2.4.0 Avro Java - 无法解析方法from_avro

在 Spark 中读取 Avro 文件

在 Spark 2.0 中从 AVRO 写入镶木地板时出现 NullPointerException