从Kafka主题中读取结构化流
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从Kafka主题中读取结构化流相关的知识,希望对你有一定的参考价值。
我已经读取了一个csv文件,并将值字段转换为字节,并使用Kafka生成器应用程序写入Kafka主题。现在我尝试使用结构化流来读取Kafka主题但不能在值字段上应用自定义kryo反序列化。
谁能告诉我如何在结构化流中使用自定义反序列化?
答案
我有类似的问题,基本上,我在Protobuf上收到了所有Kafka的消息,我用UDF解决了这个问题。
from pyspark.sql.functions import udf
def deserialization_function(message):
#You need to add your code to deserialize your messages
#I returned a json but you can return other structure
json = {"x": x_deserializable,
"y": y_deserializable,
"w": w_deserializable,
"z": z_deserializable,
return json
schema = StructType()
.add("x", TimestampType())
.add("y", StringType())
.add("z", StringType())
.add("w", StringType())
own_udf = udf(deserialization_function, schema)
stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
.option("subscribe", topic)
.load()
query = stream
.select(col("value"))
.select((own_udf("value")).alias("value_udf"))
.select("value_udf.x", "value_udf.y", "value_udf.w", "value_udf.z")
以上是关于从Kafka主题中读取结构化流的主要内容,如果未能解决你的问题,请参考以下文章
如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?
如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?
运行自定义构建的 Kafka 流 DSL 应用程序返回 java.lang.ClassNotFoundException