Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python

Posted

技术标签:

【中文标题】Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python【英文标题】:Pyspark 2.4.0, read avro from kafka with read stream - Python 【发布时间】:2019-07-08 15:21:09 【问题描述】:

我正在尝试使用 PySpark 2.4.0 从 Kafka 读取 avro 消息。

spark-avro 外部模块可以提供这个解决方案来读取 avro 文件:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

但是,我需要阅读流式传输的 avro 消息。库文档建议使用 from_avro() 函数,该函数仅适用于 Scala 和 Java。

是否还有其他模块支持读取从 Kafka 流式传输的 avro 消息?

【问题讨论】:

【参考方案1】:

您可以包含 spark-avro 包,例如使用 --packages(调整版本以匹配 spark 安装):

bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

并提供您自己的包装器:

from pyspark.sql.column import Column, _to_java_column 

def from_avro(col, jsonFormatSchema): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema)) 


def to_avro(col): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col))) 

使用示例(取自the official test suite):

from pyspark.sql.functions import col, struct


avro_type_struct = """

  "type": "record",
  "name": "struct",
  "fields": [
    "name": "col1", "type": "long",
    "name": "col2", "type": "string"
  ]
"""


df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
|                                          [0, 0]|
|                                          [1, 1]|
|                                          [2, 2]|
+------------------------------------------------+
only showing top 3 rows

【讨论】:

这里要注意我在使用 spark-submit 导入包时遇到的一个问题是 $spark-submit job.py --packages org.apache.spark:spark-avro_2.11:2.4 .0 不起作用。相反,它应该像这样写 $spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 job.py 这不适用于来自 Confluent Schema Registry 的 Avro。为此,这个答案似乎更好***.com/a/55786881/2308683

以上是关于Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python的主要内容,如果未能解决你的问题,请参考以下文章

使用火花流从数据库流式读取

使用 spark 结构化流从 s3 读取 avro 文件

如何使用 C++ 中的流从文件末尾读取给定数量的行?

net.jpounz.lz4 使用火花流从 kafka 读取时出现异常

有没有办法修改此代码以让火花流从 json 中读取?

带有 PySpark 2.4 的 Pandas UDF [重复]