使用 PySpark 将 csv 转换为 .avro - 缺少依赖项
Posted
技术标签:
【中文标题】使用 PySpark 将 csv 转换为 .avro - 缺少依赖项【英文标题】:Converting csv to .avro using PySpark - missing dependencies 【发布时间】:2021-09-21 19:04:58 【问题描述】:我有一个 Python 脚本,它使用 fastavro
库来转换 csv 文件并根据提供的架构对其进行序列化:
from fastavro import writer
from fastavro.schema import load_schema
import csv
schema = load_schema('schema.avsc')
def csv_reader():
with open('data.csv') as f:
yield from csv.DictReader(f)
with open('data.snappy.avro', 'wb') as out:
writer(out, schema, csv_reader(), codec='snappy')
上述方法在小文件上运行良好,但在大文件上非常慢。一个 185MB 大小的 csv 文件需要 4.5 分钟来序列化,而我的一些文件的大小接近 5GB。
所以,我想测试一下 Spark 如何使用 PySpark 2.4.3 处理 csv 到 avro 的转换:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
spark = SparkSession \
.builder \
.appName("Avro testing") \
.getOrCreate()
schema = open("schema.avsc", "r").read()
df = spark.read.csv(path="/data/data.csv",
header=True)
output = df\
.select(from_avro("value", schema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
但这会返回以下错误:
ModuleNotFoundError:没有名为“pyspark.sql.avro”的模块
好的,我知道默认情况下不包含 avro 库,我收到错误是有道理的。
Spark documentation 建议运行 ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.1.2 ...
,但我没有直接在命令行上运行 spark-submit - 它是从我的 Python 代码中调用的。
我的问题是:如何修改我的 Python 代码,以便仍然从代码中调用 Spark 作业,但还包括 pyspark.sql.avro
处缺少的外部 avro 库?
【问题讨论】:
【参考方案1】:如果我理解了这个问题,那么您正在寻找一个环境变量
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.1.2'
spark = SparkSession \
.builder \
.appName("Avro testing") \
.getOrCreate()
但是,如果您正在读取 CSV 文件,则不应使用 from_avro
如果您想将 CSV 转换为 Avro,您只需要想要to_avro
如果你只是在本地运行,在一台机器上,如果 Spark 比普通的 Python 方法更快,我会感到惊讶
【讨论】:
以上是关于使用 PySpark 将 csv 转换为 .avro - 缺少依赖项的主要内容,如果未能解决你的问题,请参考以下文章
使用带有过滤器和其他列的 pyspark 将 CSV 转换为 JSON [关闭]
PySpark:将临时视图转换为表格并在本地驱动器中另存为 .CSV
使用 pyspark 将 Spark 数据框中的列转换为数组 [重复]