使用 PySpark 将 csv 转换为 .avro - 缺少依赖项

Posted

技术标签:

【中文标题】使用 PySpark 将 csv 转换为 .avro - 缺少依赖项【英文标题】:Converting csv to .avro using PySpark - missing dependencies 【发布时间】:2021-09-21 19:04:58 【问题描述】:

我有一个 Python 脚本,它使用 fastavro 库来转换 csv 文件并根据提供的架构对其进行序列化:

from fastavro import writer
from fastavro.schema import load_schema
import csv

schema = load_schema('schema.avsc')

def csv_reader():
    with open('data.csv') as f:
        yield from csv.DictReader(f)
with open('data.snappy.avro', 'wb') as out:
    writer(out, schema, csv_reader(), codec='snappy')

上述方法在小文件上运行良好,但在大文件上非常慢。一个 185MB 大小的 csv 文件需要 4.5 分钟来序列化,而我的一些文件的大小接近 5GB。

所以,我想测试一下 Spark 如何使用 PySpark 2.4.3 处理 csv 到 avro 的转换:

from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro

spark = SparkSession \
    .builder \
    .appName("Avro testing") \
    .getOrCreate()

schema = open("schema.avsc", "r").read()

df = spark.read.csv(path="/data/data.csv",
                    header=True)
output = df\
  .select(from_avro("value", schema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

但这会返回以下错误:

ModuleNotFoundError:没有名为“pyspark.sql.avro”的模块

好的,我知道默认情况下不包含 avro 库,我收到错误是有道理的。

Spark documentation 建议运行 ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.2 ...,但我没有直接在命令行上运行 spark-submit - 它是从我的 Python 代码中调用的。

我的问题是:如何修改我的 Python 代码,以便仍然从代码中调用 Spark 作业,但还包括 pyspark.sql.avro 处缺少的外部 avro 库?

【问题讨论】:

【参考方案1】:

如果我理解了这个问题,那么您正在寻找一个环境变量

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.1.2' 

spark = SparkSession \
    .builder \
    .appName("Avro testing") \
    .getOrCreate()

但是,如果您正在读取 CSV 文件,则不应使用 from_avro

如果您想将 CSV 转换为 Avro,您只需要想要to_avro

如果你只是在本地运行,在一台机器上,如果 Spark 比普通的 Python 方法更快,我会感到惊讶

【讨论】:

以上是关于使用 PySpark 将 csv 转换为 .avro - 缺少依赖项的主要内容,如果未能解决你的问题,请参考以下文章

使用带有过滤器和其他列的 pyspark 将 CSV 转换为 JSON [关闭]

PySpark:将临时视图转换为表格并在本地驱动器中另存为 .CSV

使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]

如何从 pyspark 数据框中更快地保存 csv 文件?

将 pandas 数据框转换为 PySpark RDD 时出现问题?

Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串