将 Spark 数据集转换为 JSON 并写入 Kafka Producer

Posted

技术标签:

【中文标题】将 Spark 数据集转换为 JSON 并写入 Kafka Producer【英文标题】:Convert Spark Dataset to JSON and Write to Kafka Producer 【发布时间】:2018-10-05 14:37:32 【问题描述】:

我想从 Hive 读取一个表并写入 Kafka Producer(批处理作业)。

目前,我在我的 java 类中以 Dataset<Row> 的形式读取表格并尝试转换为 json,以便我可以使用 KafkaProducer 将其写入为 json 消息。

Dataset<Row> data = spark.sql("select * from tablename limit 5");
List<Row> rows = data.collectAsList();
for(Row row: rows) 
        List<String> stringList = new ArrayList<String>(Arrays.asList(row.schema().fieldNames())); 
        Seq<String> row_seq = JavaConverters.asScalaIteratorConverter(stringList.iterator()).asScala().toSeq();
        Map map = (Map) row.getValuesMap(row_seq);
        JSONObject json = new JSONObject();
        json.putAll( map);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(SPARK_CONF.get("topic.name"), json.toString());
        producer.send(record);

我收到 ClassCastException

【问题讨论】:

请编辑您的问题以包含您的代码 已包含代码。请建议 【参考方案1】:

一旦你写了collectAsList();,你就不再使用 Spark,而是使用原始的 Kafka Java API。

我的建议是使用Spark Structured Streaming Kafka Integration,你可以这样做

这是一个示例,您需要形成一个至少包含两列的 DataFrame,因为 Kafka 需要键和值。

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic_name")
  .save()

至于将数据输入 JSON,collectToList() 是错误的。不要将数据拉入单个节点。

您可以使用data.map() 将数据集从一种格式转换为另一种格式。

例如,您可以将 Row 映射为 JSON 格式的字符串。

row -> "\"f0\":" + row.get(0) + ""

【讨论】:

感谢您的回复。但是我上面提到的逻辑可以在 scala 中完成。这是代码: val df = spark.sql("select * from table limit 5") val row = df.first() val m = row.getValuesMap(row.schema.fieldNames) JSONObject(m).toString()我正在尝试在 java 中复制相同的逻辑并将其写入生产者。区别就在 scala 中,它能够将 scala.collection.immutable.Map[String,Nothing] 类型直接转换为 JSONObject 类型,而在 Java 的情况下它会出错。请建议 您应该能够从 Java HashMap 创建 new JSONObject,但是您需要知道如何将 Scala 映射转换为 hava 映射,但是,row.first() 不会给您所有数据帧行都作为 Kafka 事件。

以上是关于将 Spark 数据集转换为 JSON 并写入 Kafka Producer的主要内容,如果未能解决你的问题,请参考以下文章