Dask 到展平字典列

Posted

技术标签:

【中文标题】Dask 到展平字典列【英文标题】:Dask to Flatten Dictionary Column 【发布时间】:2019-09-30 18:51:48 【问题描述】:

我是 Dask 的新手,正在寻找一种方法来展平 PANDAS 数据框中的字典列。这是 1600 万行数据帧的第一行的屏幕截图:

这里是三行文本的示例:

u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X', u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X', 'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'

我通常会使用以下代码展平 Form990PartVIISectionAGrp 列:

    df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)

我希望在 Dask 中执行此操作,但收到以下错误:“ValueError:计算数据中的列与提供的元数据中的列不匹配。”

我正在使用 Python 2.7。我导入相关包

    from dask import dataframe as dd
    from dask.multiprocessing import get
    from multiprocessing import cpu_count
    nCores = cpu_count()

为了测试代码,我创建了一个随机数据样本:

    dfs = df.sample(1000)

然后生成Dask数据框:

    ddf = dd.from_pandas(dfs, npartitions=nCores)

该列当前为字符串格式,因此我将其转换为字典。通常,我只会写一行代码:

dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval) 

但我改为尝试以更“类似 Dask”的形式在此处执行此操作,因此我编写了以下函数然后应用它:

    def make_dict(dfs):
        dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)   
        return dfs
    ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()

这行得通——它返回一个 PANDAS 数据框,其中 Form990PartVIISectionAGrp 列是字典格式(但是它并不比非 Dask 应用快)。

然后我重新创建 Dask DF:

    ddf = dd.from_pandas(ddf_out, npartitions=nCores)

并编写一个函数来展平列:

    def flatten(ddf_out):
        ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        #ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
    return ddf_out

如果我再运行这段代码:

    result = ddf.map_partitions(flatten)

我得到以下输出,其中列没有被展平:

我还收到有关缺少元数据的错误,并且鉴于上述内容无助于解析字典列,因此我创建了一个由普通 Python 展平列生成的列列表,并使用它来创建字典列和数据类型:

metadir = u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
       u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
       u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
       u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
       u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
       u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
       u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'

然后我用这个元数据应用 flatten 函数:

    result = ddf.map_partitions(flatten, meta=metadir)

我得到以下输出结果:

运行 result.columns 会产生这样的结果:

失败的地方是运行 compute(),我收到以下错误消息:“ValueError:计算数据中的列与提供的元数据中的列不匹配。”无论我写,我都会得到同样的错误:

result.compute()

result.compute(meta=metadir)

我不确定我在这里做错了什么。 result 中的列似乎与 metadir 中的列相匹配。任何建议将不胜感激。

更新: 这是我更新 flatten 功能的尝试。

    meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
     'F9_07_PZ_DIRTRSTKEY_NAME',
     'F9_07_PZ_COMP_OTHER',
     'F9_07_PZ_COMP_RELATED',
     'F9_07_PZ_TITLE',
     'F9_07_PZ_AVE_HOURS_WEEK',
     'F9_07_PZ_COMP_DIRECT',
     'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
     'F9_07_PC_OFFICER',
     'F9_07_PC_HIGH_COMP_EMPLOYEE',
     'BusinessName',
     'F9_07_PC_KEY_EMPLOYEE',
     'F9_07_PC_TRUSTEE_INSTITUTIONAL',
     'NameBusiness',
     'F9_07_PC_FORMER'], dtype="O")

    def flatten(ddf_out):
        ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
        for m in meta:
            if m not in ddf_out:
                df[m] = '' 
        return ddf_out

然后我运行:

result = ddf.map_partitions(flatten, meta=meta).compute()

【问题讨论】:

以上是一些示例数据。 【参考方案1】:

一些注意事项开始:

.apply(literal_eval)

这不是map更好吗?

然后我重新创建 Dask DF:

ddf = dd.from_pandas(ddf_out, npartitions=nCores)

ddf_out 已经是一个 dask 数据帧,我不知道你为什么要这样做。

结果中的列似乎与 metadir 中的列匹配。

result.columns 的值取自您提供的元数据,在您请求之前不会进行计算(dask 在大多数操作中是惰性的)。 ValueError 异常是否没有提供更多信息?

这是一个完整的例子

x = ('F9_07_PZ_COMP_DIRECT': '0',
  'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
  'F9_07_PZ_COMP_OTHER': '0',
  'F9_07_PZ_COMP_RELATED': '0',
  'F9_07_PZ_TITLE': 'CHAIR PERSON',
  'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
  'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X',
 'F9_07_PZ_COMP_DIRECT': '0',
  'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
  'F9_07_PZ_COMP_OTHER': '0',
  'F9_07_PZ_COMP_RELATED': '0',
  'F9_07_PZ_TITLE': 'VICE CHAIR',
  'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
  'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X')
df = pd.DataFrame('a': x)
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT', 
       'F9_07_PZ_DIRTRSTKEY_NAME',
       'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
       'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()

我怎么知道meta 使用什么?我将该函数应用于 pandas 数据框 - 您可以使用一小部分数据框来执行此操作。

一些补充说明:

使用 pandas 加载数据、传递给 dask 工作人员然后将整个结果收集回 pandas(在内存中)数据帧是一种反模式,您不太可能看到这种加速,并且可能会导致很多开销。您最好使用 dd.read_csv 之类的内容进行加载,并使用 dask 函数进行聚合或编写。仅compute() 处理较小或不返回任何内容的内容(因为它涉及写入输出)。官方示例不使用 from_pandas。 string 和 dict 处理是 python 方法,因此持有任何 python 函数的解释器锁 (GIL):线程实际上不会并行运行。要获得并行性,您需要在进程中运行,使用https://docs.dask.org/en/latest/setup/single-distributed.html 最容易实现这一点 分布式调度程序还允许您访问仪表板,其中包含许多有用的信息来诊断系统的运行情况。您还可以对其行为进行大量配置,以防您需要遵守防火墙规则。

【讨论】:

感谢您的留言。首先,这是我第一次使用 Dask。要回答有关 apply(literal_eval) 与 map(literal_eval) 的问题,我真的不知道。我刚刚进入了一张尝试过的地图,它可以工作,但比应用要慢一些。至于 ddf_out,那只是为了测试——但从我上面显示的输出来看,它是一个 PANDAS DF 而不是 Dask DF——这不是计算的作用吗?不过,您是对的,我不需要这样做——我只是检查计算是否适用于literal_eval,它确实如此。 ...我粘贴了一些示例数据。我还通过不在literal_eval 阶段运行计算并且不重新创建Dask 数据帧来重新运行上述代码——同样的错误消息。我在 ValueError 消息中看不到任何重要的信息。谢谢。 谢谢。它在 Python 2.7 中不起作用,但我确实使用您的示例代码让它在 3.6 中起作用。我确实采用了您的方法,并通过将元数据应用到 PANDAS 数据框来确定元数据——这就是我在上面创建元目录的方式。我很抱歉,但我在提供的两个示例中犯了一个错误——两者都有相同的键,但情况并非总是如此。问题可能在于,在整个数据集中,有我在 metadir 中列出的键——但并非每条记录都有每个键。我尝试了您的方法并更新了元,如以下评论所示,但 ValueError 相同: 元= pd.DataFrame(列= [ 'F9_07_PC_TRUSTEE_INDIVIDUAL', 'F9_07_PZ_DIRTRSTKEY_NAME', 'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE', 'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PZ_COMP_DIRECT', 'F9_07_PZ_AVE_HOURS_WEEK_RELATED',' F9_07_PC_OFFICER'、'F9_07_PC_HIGH_COMP_EMPLOYEE'、'BusinessName'、'F9_07_PC_KEY_EMPLOYEE'、'F9_07_PC_TRUSTEE_INSTITUTIONAL'、'NameBusiness'、'F9_07_PC_FORMER']、dtype="O") 我更新了上面的答案,包括三个具有不同键的示例行。【参考方案2】:

给定一个中小型数据集,普通的 PANDAS 解决方案可以工作:

df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)

但是,PANDAS 解决方案有 1600 万行,无法在 16GB RAM 的 Macbook 或 96GB 的 Windows 机器上运行。出于这个原因,我看着达斯克。但是,如上面的答案和 cmets 所示,Dask 解决方案不起作用,因为我的数据集中的每个观察值不一定具有所有字典键。 Form990PartVIISectionAGrp 的 1600 万个观测值总共有以下列表中的 15 个键:

  newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
 'F9_07_PZ_DIRTRSTKEY_NAME',
 'F9_07_PZ_COMP_OTHER',
 'F9_07_PZ_COMP_RELATED',
 'F9_07_PZ_TITLE',
 'F9_07_PZ_AVE_HOURS_WEEK',
 'F9_07_PZ_COMP_DIRECT',
 'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
 'F9_07_PC_OFFICER',
 'F9_07_PC_HIGH_COMP_EMPLOYEE',
 'BusinessName',
 'F9_07_PC_KEY_EMPLOYEE',
 'F9_07_PC_TRUSTEE_INSTITUTIONAL',
 'NameBusiness',
 'F9_07_PC_FORMER']

因此,我的解决方案涉及采用上面@mdurant 提供的一些提示,并首先将任何缺少的键添加到每一行:

for index, row in df[:].iterrows():
    for k in newkeys:
        row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)

在我的 Macbook 上花了 100 分钟。根据 mdurant 的评论,我将数据框保存为 JSON 格式:

df.to_json('df.json', orient='records', lines=True)

并将文件作为文本读入 Dask:

import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)

然后创建一个函数来展平列:

def flatten(record):
    return 
    'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
    'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
    'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
    'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
    'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
    'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],  
    'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],  
    'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
    'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
    'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
    'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
    'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
    'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
    'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
    'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
    'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
    'URL': record['URL'],

然后我可以应用该功能:

df = b.map(flatten).to_dataframe()

并将数据导出为 CSV:

df.to_csv('compensation*.csv')

这就像一个魅力!简而言之,根据上面 mdurant 的有用 cmets,关键是 1) 为所有观察添加缺失的键,以及 2) 不将数据从 PANDAS 读入 Dask(改用文本或 CSV)。处理好这两个问题可以很好地解决这个问题。

【讨论】:

以上是关于Dask 到展平字典列的主要内容,如果未能解决你的问题,请参考以下文章

pandas json_normalize 所有列都有嵌套字典展平

用嵌套列表和嵌套字典列表展平一个非常大的 Json

Python:展平多个嵌套的字典并追加

如何展平此数据框

如何使用嵌套字典列表展平熊猫数据框中的列

在 Julia 中展平字典