我怎样才能从 kafka 主题接收数据到我的 Streaming Structured DataFrame?

Posted

技术标签:

【中文标题】我怎样才能从 kafka 主题接收数据到我的 Streaming Structured DataFrame?【英文标题】:How could i get receive the data from kafka topic to my Streaming Structured DataFrame? 【发布时间】:2021-12-22 01:23:31 【问题描述】:

我知道如何使用我的 Kafka 主题中的数据,但我无法在正确的列中获取正确的数据。

我在 value 列中收到所有数据,格式如下:

"timestamp":"2021-11-09T11:03:48.955+01:00","time":"1","duration":"0","SourceComputer":"C1707","SourcePort":"N1","DestinationComputer":"C925","start/end":" "

有些字段是空的“”,而其他字段里面有一些数据(例如:“C1707”)。 我在想我可以使用这个功能:

DataFrame=DataFrame.withColumn(ColumnName[i],split(DataFrame["value"],',').getItem(i))

但我无法获得确切列中的确切数据。

+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
|               value|           timestamp|time|duration|SourceComputer|SourcePort|DestinationComputer|start/end|
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+
|"timestamp":"202...|"timestamp":"202...|null|    null|          null|      null|               null|     null|
+--------------------+--------------------+----+--------+--------------+----------+-------------------+---------+

知道如何以正确的方式接收数据吗?

谢谢!

【问题讨论】:

请出示完整的 Spark 代码。其他,请阅读databricks.com/blog/2017/04/26/… 【参考方案1】:

使用 from_json 方法,您可以将 Spark DataFrame 列上的 JSON 字符串转换为结构类型。 然后你可以将你的结构类型转换为所需的数据帧

import org.apache.spark.sql.functions.from_json
val schema = new StructType()
    .add("col1", StringType, true)
    .add("col2", StringType, true)
    .add("col3", StringType, true)
val df4=df.withColumn("value",from_json(col("value"),schema))
val df5=df4.select(col("value.*"))

请参考this link,这里都说明了。

【讨论】:

以上是关于我怎样才能从 kafka 主题接收数据到我的 Streaming Structured DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

如何在spring cloud stream和kafka中从同一主题发送和接收

从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题

Kafka Connect:一个接收器连接器,用于从一个主题写入多个表

如何从 Swift 的闭包中接收方法的输出?

如何配置Kafka RPC调用者主题和组

使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?