如何使用 pyspark 从 Spark 获取批量行

Posted

技术标签:

【中文标题】如何使用 pyspark 从 Spark 获取批量行【英文标题】:How do you get batches of rows from Spark using pyspark 【发布时间】:2020-03-11 22:30:30 【问题描述】:

我有一个包含超过 60 亿行数据的 Spark RDD,我想使用它来训练深度学习模型,使用 train_on_batch。我无法将所有行都放入内存中,因此我想一次获得 10K 左右的数据,以批处理成 64 或 128 的块(取决于模型大小)。我目前正在使用 rdd.sample() 但我认为这不能保证我会得到所有行。有没有更好的方法对数据进行分区以使其更易于管理,以便我可以编写一个生成器函数来获取批次?我的代码如下:

data_df = spark.read.parquet(PARQUET_FILE)
print(f'RDD Count: data_df.count()') # 6B+
data_sample = data_df.sample(True, 0.0000015).take(6400) 
sample_df = data_sample.toPandas()

def get_batch():
  for row in sample_df.itertuples():
    # TODO: put together a batch size of BATCH_SIZE
    yield row

for i in range(10):
    print(next(get_batch()))

【问题讨论】:

只是提醒你的命名是错误的,那不是 rdd 那是一个数据框。 另外我不相信你在使用 pandas 迭代 spark 上没有什么可取胜的,如果你在 python 中分块阅读这个会更好。 我更改了命名以反映它是一个数据框。我需要将数据作为 Pandas 数据框输入现有模型。 我想问一下为什么这个问题被否决了。我花了很多时间在谷歌上搜索如何做到这一点。如果答案是显而易见的,或者有一个记录/很好理解的机制,我会很感激一个链接,我会为浪费时间道歉。 【参考方案1】:

我不相信 spark 会让你对数据进行偏移或分页。

但是你可以添加一个索引,然后对其进行分页,首先:

    from pyspark.sql.functions import lit
    data_df = spark.read.parquet(PARQUET_FILE)
    count = data_df.count()
    chunk_size = 10000

    # Just adding a column for the ids
    df_new_schema = data_df.withColumn('pres_id', lit(1))

    # Adding the ids to the rdd 
    rdd_with_index = data_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row) + [rowId+1]))

    # Creating a dataframe with index
    df_with_index = spark.createDataFrame(chunk_rdd,schema=df_new_schema.schema)

    # Iterating into the chunks
    for chunk_size in range(0,count+1 ,chunk_size):
        initial_page = page_num*chunk_size
        final_page = initial_page + chunk_size 
        where_query = ('pres_id > 0 and pres_id <= 1').format(initial_page,final_page)
        chunk_df = df_with_index.where(where_query).toPandas()
        train_on_batch(chunk_df) # <== Your function here        

这不是最优的,由于使用了 pandas 数据框,它会严重利用 spark,但会解决您的问题。

如果这会影响您的功能,请不要忘记删除 id。

【讨论】:

【参考方案2】:

试试这个:

 from pyspark.sql import functions as F
 sample_dict = 

 # Read the parquet file
 df = spark.read.parquet("parquet file")

 # add the partition_number as a column
 df = df.withColumn('partition_num', F.spark_partition_id())
 df.persist()

 total_partition = [int(row.partition_num) for row in 
 df.select('partition_num').distinct().collect()]

 for each_df in total_partition:
     sample_dict[each_df] = df.where(df.partition_num == each_df) 

【讨论】:

这对我有用,特别是因为我有一个很大的镶木地板文件【参考方案3】:

我了解到您正计划训练深度学习模型。查看专为该用例创建的 Petastorm 开源库。

https://docs.databricks.com/applications/machine-learning/load-data/petastorm.html

Petastorm 是一个开源数据访问库。该库支持直接从 Apache Parquet 格式的数据集和已作为 Apache Spark DataFrames 加载的数据集对深度学习模型进行单节点或分布式训练和评估。 Petastorm 支持流行的基于 Python 的机器学习 (ML) 框架,例如 Tensorflow、PyTorch 和 PySpark。有关 Petastorm 的更多信息,请参阅 Petastorm GitHub 页面和Petastorm API documentation。

【讨论】:

以上是关于如何使用 pyspark 从 Spark 获取批量行的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pyspark 从 REST API 获取数据到 Spark Dataframe

从数据框批量插入到数据库,忽略 Pyspark 中的失败行

pyspark:如何获取 spark 数据帧的 Spark SQLContext?

如何保存从 PySpark 中的 URL 获取的 JSON 数据?

如何使用 Pyspark 中的 Graphframes 和 Spark Dataframe 中的原始数据获取连接的组件?

如何从 PySpark 中的 JavaSparkContext 获取 SparkContext?