将 Spark 数据集转换为 JSON 并写入 Kafka Producer

Posted

技术标签:

【中文标题】将 Spark 数据集转换为 JSON 并写入 Kafka Producer【英文标题】:Convert Spark Dataset to JSON and Write to Kafka Producer 【发布时间】:2018-10-05 14:37:32 【问题描述】:

我想从 Hive 读取一个表并写入 Kafka Producer(批处理作业)。

目前,我在我的 java 类中以 Dataset<Row> 的形式读取表格并尝试转换为 json,以便我可以使用 KafkaProducer 将其写入为 json 消息。

Dataset<Row> data = spark.sql("select * from tablename limit 5");
List<Row> rows = data.collectAsList();
for(Row row: rows) 
        List<String> stringList = new ArrayList<String>(Arrays.asList(row.schema().fieldNames())); 
        Seq<String> row_seq = JavaConverters.asScalaIteratorConverter(stringList.iterator()).asScala().toSeq();
        Map map = (Map) row.getValuesMap(row_seq);
        JSONObject json = new JSONObject();
        json.putAll( map);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(SPARK_CONF.get("topic.name"), json.toString());
        producer.send(record);

我收到 ClassCastException

【问题讨论】:

请编辑您的问题以包含您的代码 已包含代码。请建议 【参考方案1】:

一旦你写了collectAsList();,你就不再使用 Spark,而是使用原始的 Kafka Java API。

我的建议是使用Spark Structured Streaming Kafka Integration,你可以这样做

这是一个示例,您需要形成一个至少包含两列的 DataFrame,因为 Kafka 需要键和值。

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic_name")
  .save()

至于将数据输入 JSON,collectToList() 是错误的。不要将数据拉入单个节点。

您可以使用data.map() 将数据集从一种格式转换为另一种格式。

例如,您可以将 Row 映射为 JSON 格式的字符串。

row -> "\"f0\":" + row.get(0) + ""

【讨论】:

感谢您的回复。但是我上面提到的逻辑可以在 scala 中完成。这是代码: val df = spark.sql("select * from table limit 5") val row = df.first() val m = row.getValuesMap(row.schema.fieldNames) JSONObject(m).toString()我正在尝试在 java 中复制相同的逻辑并将其写入生产者。区别就在 scala 中,它能够将 scala.collection.immutable.Map[String,Nothing] 类型直接转换为 JSONObject 类型,而在 Java 的情况下它会出错。请建议 您应该能够从 Java HashMap 创建 new JSONObject,但是您需要知道如何将 Scala 映射转换为 hava 映射,但是,row.first() 不会给您所有数据帧行都作为 Kafka 事件。

以上是关于将 Spark 数据集转换为 JSON 并写入 Kafka Producer的主要内容,如果未能解决你的问题,请参考以下文章

如何将包含 JSON 的输入 CSV 数据转换为 spark 数据集?

将spark结构化流数据帧转换为JSON

通过将键作为列将 json 字典转换为 spark 数据帧

Spark:从异构数据中写入 Paquet

JSON 到 Spark 中的数据集

我如何将平面数据框转换为 spark(scala 或 java)中的嵌套 json