将 Spark 数据集转换为 JSON 并写入 Kafka Producer
Posted
技术标签:
【中文标题】将 Spark 数据集转换为 JSON 并写入 Kafka Producer【英文标题】:Convert Spark Dataset to JSON and Write to Kafka Producer 【发布时间】:2018-10-05 14:37:32 【问题描述】:我想从 Hive 读取一个表并写入 Kafka Producer(批处理作业)。
目前,我在我的 java 类中以 Dataset<Row>
的形式读取表格并尝试转换为 json,以便我可以使用 KafkaProducer 将其写入为 json 消息。
Dataset<Row> data = spark.sql("select * from tablename limit 5");
List<Row> rows = data.collectAsList();
for(Row row: rows)
List<String> stringList = new ArrayList<String>(Arrays.asList(row.schema().fieldNames()));
Seq<String> row_seq = JavaConverters.asScalaIteratorConverter(stringList.iterator()).asScala().toSeq();
Map map = (Map) row.getValuesMap(row_seq);
JSONObject json = new JSONObject();
json.putAll( map);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(SPARK_CONF.get("topic.name"), json.toString());
producer.send(record);
我收到 ClassCastException
【问题讨论】:
请编辑您的问题以包含您的代码 已包含代码。请建议 【参考方案1】:一旦你写了collectAsList();
,你就不再使用 Spark,而是使用原始的 Kafka Java API。
我的建议是使用Spark Structured Streaming Kafka Integration,你可以这样做
这是一个示例,您需要形成一个至少包含两列的 DataFrame,因为 Kafka 需要键和值。
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic_name")
.save()
至于将数据输入 JSON,collectToList()
是错误的。不要将数据拉入单个节点。
您可以使用data.map()
将数据集从一种格式转换为另一种格式。
例如,您可以将 Row 映射为 JSON 格式的字符串。
row -> "\"f0\":" + row.get(0) + ""
【讨论】:
感谢您的回复。但是我上面提到的逻辑可以在 scala 中完成。这是代码: val df = spark.sql("select * from table limit 5") val row = df.first() val m = row.getValuesMap(row.schema.fieldNames) JSONObject(m).toString()我正在尝试在 java 中复制相同的逻辑并将其写入生产者。区别就在 scala 中,它能够将 scala.collection.immutable.Map[String,Nothing] 类型直接转换为 JSONObject 类型,而在 Java 的情况下它会出错。请建议 您应该能够从 Java HashMap 创建new JSONObject
,但是您需要知道如何将 Scala 映射转换为 hava 映射,但是,row.first()
不会给您所有数据帧行都作为 Kafka 事件。以上是关于将 Spark 数据集转换为 JSON 并写入 Kafka Producer的主要内容,如果未能解决你的问题,请参考以下文章