Dask map_partitions() 将 `partition_info` 打印为 None

Posted

技术标签:

【中文标题】Dask map_partitions() 将 `partition_info` 打印为 None【英文标题】:Dask map_partitions() prints `partition_info` as None 【发布时间】:2021-09-30 11:41:52 【问题描述】:

我正在尝试使用 Dask 中的 DataFrame.map_partitions() 在每个分区上应用一个函数。该函数接受输入值列表,并且必须在特定列上返回包含这些值的数据帧分区的行(使用loc()isin())。 问题是我收到此错误:

"index = partition_info['number'] - 1 TypeError: 'NoneType' 对象不可下标"

当我打印 partition_info 时,它会打印 None 数百次(但我在循环中只有 60 个元素,所以我们预计只有 60 个打印),打印 None 是否正常,因为它是一个子进程还是我缺少 partition_info 的内容?我找不到这方面的有用信息。

def apply_f(df, barcodes_per_core: List[List[str]], partition_info=None):
    print(partition_info)
    index = partition_info['number'] - 1
    indexes = barcodes_per_core[index]
    return df.loc[df['barcode'].isin(indexes)]

df = from_pandas(df, npartitions=nb_cores)
dfs_per_core = df.map_partitions(apply_f, barcodes_per_core, meta=df)
dfs_per_core = dfs_per_core.compute(scheduler='processes')

=>page 末尾的 partition_info 文档。

【问题讨论】:

【参考方案1】:

目前尚不清楚为什么事情对您不起作用,一件可能的事情是您多次重复使用df。这是一个有效的 MWE:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(range(10), columns=["a"])

ddf = dd.from_pandas(df, npartitions=3)

def my_func(d, x, partition_info='None'):
    print(x, partition_info)

ddf.map_partitions(my_func, 3, meta=df.head()).compute(scheduler='processes')

【讨论】:

以上是关于Dask map_partitions() 将 `partition_info` 打印为 None的主要内容,如果未能解决你的问题,请参考以下文章

将Dask包的Pandas DataFrame转换为单个Dask DataFrame

如何将数据读取到 dask 数据帧并删除坏行

将 Dask 分区写入单个文件

如何将 Dask.DataFrame 转换为 pd.DataFrame?

Dask - 将多列合并为一列

如何将Python Dask Dataframes合并到列中?