Dask 到展平字典列
Posted
技术标签:
【中文标题】Dask 到展平字典列【英文标题】:Dask to Flatten Dictionary Column 【发布时间】:2019-09-30 18:51:48 【问题描述】:我是 Dask 的新手,正在寻找一种方法来展平 PANDAS 数据框中的字典列。这是 1600 万行数据帧的第一行的屏幕截图:
这里是三行文本的示例:
u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'DEBRA MEALY', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'CHAIR PERSON', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X', u'F9_07_PZ_COMP_DIRECT': u'0', u'F9_07_PZ_DIRTRSTKEY_NAME': u'HELEN GORDON', u'F9_07_PZ_COMP_OTHER': u'0', u'F9_07_PZ_COMP_RELATED': u'0', u'F9_07_PZ_TITLE': u'VICE CHAIR', u'F9_07_PZ_AVE_HOURS_WEEK': u'1.00', u'F9_07_PC_TRUSTEE_INDIVIDUAL': u'X', 'F9_07_PC_HIGH_COMP_EMPLOYEE': 'X', 'F9_07_PZ_DIRTRSTKEY_NAME': 'ROB S KHANUJA', 'F9_07_PZ_COMP_OTHER': '14902', 'F9_07_PZ_COMP_RELATED': '0', 'F9_07_PZ_TITLE': 'EXEC. DIR. OPERATIONS', 'F9_07_PZ_AVE_HOURS_WEEK': '40.00', 'F9_07_PZ_COMP_DIRECT': '133173'
我通常会使用以下代码展平 Form990PartVIISectionAGrp 列:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].swifter.apply(pd.Series)], axis=1)
我希望在 Dask 中执行此操作,但收到以下错误:“ValueError:计算数据中的列与提供的元数据中的列不匹配。”
我正在使用 Python 2.7。我导入相关包
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count
nCores = cpu_count()
为了测试代码,我创建了一个随机数据样本:
dfs = df.sample(1000)
然后生成Dask数据框:
ddf = dd.from_pandas(dfs, npartitions=nCores)
该列当前为字符串格式,因此我将其转换为字典。通常,我只会写一行代码:
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
但我改为尝试以更“类似 Dask”的形式在此处执行此操作,因此我编写了以下函数然后应用它:
def make_dict(dfs):
dfs['Form990PartVIISectionAGrp'] = dfs['Form990PartVIISectionAGrp'].apply(literal_eval)
return dfs
ddf_out = ddf.map_partitions(make_dict, meta=dfs[:0]).compute()
这行得通——它返回一个 PANDAS 数据框,其中 Form990PartVIISectionAGrp 列是字典格式(但是它并不比非 Dask 应用快)。
然后我重新创建 Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
并编写一个函数来展平列:
def flatten(ddf_out):
ddf_out = pd.concat([ddf_out.drop(['Form990PartVIISectionAGrp'], axis=1), ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
#ddf_out = ddf_out['Form990PartVIISectionAGrp'].apply(pd.Series)
return ddf_out
如果我再运行这段代码:
result = ddf.map_partitions(flatten)
我得到以下输出,其中列没有被展平:
我还收到有关缺少元数据的错误,并且鉴于上述内容无助于解析字典列,因此我创建了一个由普通 Python 展平列生成的列列表,并使用它来创建字典列和数据类型:
metadir = u'BusinessName': 'O', u'F9_07_PC_FORMER': 'O', u'F9_07_PC_HIGH_COMP_EMPLOYEE': 'O',
u'F9_07_PC_KEY_EMPLOYEE': 'O', u'F9_07_PC_OFFICER': 'O',
u'F9_07_PC_TRUSTEE_INDIVIDUAL': 'O', u'F9_07_PC_TRUSTEE_INSTITUTIONAL': 'O',
u'F9_07_PZ_AVE_HOURS_WEEK': 'O', u'F9_07_PZ_AVE_HOURS_WEEK_RELATED': 'O',
u'F9_07_PZ_COMP_DIRECT': 'O', u'F9_07_PZ_COMP_OTHER': 'O',
u'F9_07_PZ_COMP_RELATED': 'O', u'F9_07_PZ_DIRTRSTKEY_NAME': 'O',
u'F9_07_PZ_TITLE': 'O', u'NameBusiness': 'O', u'URL': 'O'
然后我用这个元数据应用 flatten 函数:
result = ddf.map_partitions(flatten, meta=metadir)
我得到以下输出结果:
运行 result.columns 会产生这样的结果:
失败的地方是运行 compute(),我收到以下错误消息:“ValueError:计算数据中的列与提供的元数据中的列不匹配。”无论我写,我都会得到同样的错误:
result.compute()
或
result.compute(meta=metadir)
我不确定我在这里做错了什么。 result 中的列似乎与 metadir 中的列相匹配。任何建议将不胜感激。
更新: 这是我更新 flatten 功能的尝试。
meta = pd.DataFrame(columns=['URL', 'F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER'], dtype="O")
def flatten(ddf_out):
ddf_out = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
for m in meta:
if m not in ddf_out:
df[m] = ''
return ddf_out
然后我运行:
result = ddf.map_partitions(flatten, meta=meta).compute()
【问题讨论】:
以上是一些示例数据。 【参考方案1】:一些注意事项开始:
.apply(literal_eval)
这不是map
更好吗?
然后我重新创建 Dask DF:
ddf = dd.from_pandas(ddf_out, npartitions=nCores)
ddf_out
已经是一个 dask 数据帧,我不知道你为什么要这样做。
结果中的列似乎与 metadir 中的列匹配。
result.columns
的值取自您提供的元数据,在您请求之前不会进行计算(dask 在大多数操作中是惰性的)。 ValueError 异常是否没有提供更多信息?
这是一个完整的例子
x = ('F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'DEBRA MEALY',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'CHAIR PERSON',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X',
'F9_07_PZ_COMP_DIRECT': '0',
'F9_07_PZ_DIRTRSTKEY_NAME': 'HELEN GORDON',
'F9_07_PZ_COMP_OTHER': '0',
'F9_07_PZ_COMP_RELATED': '0',
'F9_07_PZ_TITLE': 'VICE CHAIR',
'F9_07_PZ_AVE_HOURS_WEEK': '1.00',
'F9_07_PC_TRUSTEE_INDIVIDUAL': 'X')
df = pd.DataFrame('a': x)
d = dd.from_pandas(df, 1)
meta = pd.DataFrame(columns=['F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PC_TRUSTEE_INDIVIDUAL'], dtype="O")
d.map_partitions(lambda df: df.a.apply(pd.Series), meta=meta).compute()
我怎么知道meta
使用什么?我将该函数应用于 pandas 数据框 - 您可以使用一小部分数据框来执行此操作。
一些补充说明:
使用 pandas 加载数据、传递给 dask 工作人员然后将整个结果收集回 pandas(在内存中)数据帧是一种反模式,您不太可能看到这种加速,并且可能会导致很多开销。您最好使用dd.read_csv
之类的内容进行加载,并使用 dask 函数进行聚合或编写。仅compute()
处理较小或不返回任何内容的内容(因为它涉及写入输出)。官方示例不使用 from_pandas。
string 和 dict 处理是 python 方法,因此持有任何 python 函数的解释器锁 (GIL):线程实际上不会并行运行。要获得并行性,您需要在进程中运行,使用https://docs.dask.org/en/latest/setup/single-distributed.html 最容易实现这一点
分布式调度程序还允许您访问仪表板,其中包含许多有用的信息来诊断系统的运行情况。您还可以对其行为进行大量配置,以防您需要遵守防火墙规则。
【讨论】:
感谢您的留言。首先,这是我第一次使用 Dask。要回答有关 apply(literal_eval) 与 map(literal_eval) 的问题,我真的不知道。我刚刚进入了一张尝试过的地图,它可以工作,但比应用要慢一些。至于 ddf_out,那只是为了测试——但从我上面显示的输出来看,它是一个 PANDAS DF 而不是 Dask DF——这不是计算的作用吗?不过,您是对的,我不需要这样做——我只是检查计算是否适用于literal_eval,它确实如此。 ...我粘贴了一些示例数据。我还通过不在literal_eval 阶段运行计算并且不重新创建Dask 数据帧来重新运行上述代码——同样的错误消息。我在 ValueError 消息中看不到任何重要的信息。谢谢。 谢谢。它在 Python 2.7 中不起作用,但我确实使用您的示例代码让它在 3.6 中起作用。我确实采用了您的方法,并通过将元数据应用到 PANDAS 数据框来确定元数据——这就是我在上面创建元目录的方式。我很抱歉,但我在提供的两个示例中犯了一个错误——两者都有相同的键,但情况并非总是如此。问题可能在于,在整个数据集中,有我在 metadir 中列出的键——但并非每条记录都有每个键。我尝试了您的方法并更新了元,如以下评论所示,但 ValueError 相同: 元= pd.DataFrame(列= [ 'F9_07_PC_TRUSTEE_INDIVIDUAL', 'F9_07_PZ_DIRTRSTKEY_NAME', 'F9_07_PZ_COMP_OTHER', 'F9_07_PZ_COMP_RELATED', 'F9_07_PZ_TITLE', 'F9_07_PZ_AVE_HOURS_WEEK', 'F9_07_PZ_COMP_DIRECT', 'F9_07_PZ_AVE_HOURS_WEEK_RELATED',' F9_07_PC_OFFICER'、'F9_07_PC_HIGH_COMP_EMPLOYEE'、'BusinessName'、'F9_07_PC_KEY_EMPLOYEE'、'F9_07_PC_TRUSTEE_INSTITUTIONAL'、'NameBusiness'、'F9_07_PC_FORMER']、dtype="O") 我更新了上面的答案,包括三个具有不同键的示例行。【参考方案2】:给定一个中小型数据集,普通的 PANDAS 解决方案可以工作:
df = pd.concat([df.drop(['Form990PartVIISectionAGrp'], axis=1), df['Form990PartVIISectionAGrp'].apply(pd.Series)], axis=1)
但是,PANDAS 解决方案有 1600 万行,无法在 16GB RAM 的 Macbook 或 96GB 的 Windows 机器上运行。出于这个原因,我看着达斯克。但是,如上面的答案和 cmets 所示,Dask 解决方案不起作用,因为我的数据集中的每个观察值不一定具有所有字典键。 Form990PartVIISectionAGrp 的 1600 万个观测值总共有以下列表中的 15 个键:
newkeys = ['F9_07_PC_TRUSTEE_INDIVIDUAL',
'F9_07_PZ_DIRTRSTKEY_NAME',
'F9_07_PZ_COMP_OTHER',
'F9_07_PZ_COMP_RELATED',
'F9_07_PZ_TITLE',
'F9_07_PZ_AVE_HOURS_WEEK',
'F9_07_PZ_COMP_DIRECT',
'F9_07_PZ_AVE_HOURS_WEEK_RELATED',
'F9_07_PC_OFFICER',
'F9_07_PC_HIGH_COMP_EMPLOYEE',
'BusinessName',
'F9_07_PC_KEY_EMPLOYEE',
'F9_07_PC_TRUSTEE_INSTITUTIONAL',
'NameBusiness',
'F9_07_PC_FORMER']
因此,我的解决方案涉及采用上面@mdurant 提供的一些提示,并首先将任何缺少的键添加到每一行:
for index, row in df[:].iterrows():
for k in newkeys:
row['Form990PartVIISectionAGrp'].setdefault(k, np.nan)
在我的 Macbook 上花了 100 分钟。根据 mdurant 的评论,我将数据框保存为 JSON 格式:
df.to_json('df.json', orient='records', lines=True)
并将文件作为文本读入 Dask:
import json
import dask.bag as db
b = db.read_text('df.json').map(json.loads)
然后创建一个函数来展平列:
def flatten(record):
return
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'F9_07_PZ_COMP_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_RELATED'],
'F9_07_PC_TRUSTEE_INDIVIDUAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INDIVIDUAL'],
'F9_07_PZ_DIRTRSTKEY_NAME': record['Form990PartVIISectionAGrp']['F9_07_PZ_DIRTRSTKEY_NAME'],
'F9_07_PZ_COMP_DIRECT': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_DIRECT'],
'F9_07_PZ_COMP_OTHER': record['Form990PartVIISectionAGrp']['F9_07_PZ_COMP_OTHER'],
'BusinessName': record['Form990PartVIISectionAGrp']['BusinessName'],
'F9_07_PC_FORMER': record['Form990PartVIISectionAGrp']['F9_07_PC_FORMER'],
'F9_07_PC_HIGH_COMP_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_HIGH_COMP_EMPLOYEE'],
'F9_07_PC_KEY_EMPLOYEE': record['Form990PartVIISectionAGrp']['F9_07_PC_KEY_EMPLOYEE'],
'F9_07_PC_OFFICER': record['Form990PartVIISectionAGrp']['F9_07_PC_OFFICER'],
'F9_07_PC_TRUSTEE_INSTITUTIONAL': record['Form990PartVIISectionAGrp']['F9_07_PC_TRUSTEE_INSTITUTIONAL'],
'F9_07_PZ_AVE_HOURS_WEEK': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK'],
'F9_07_PZ_AVE_HOURS_WEEK_RELATED': record['Form990PartVIISectionAGrp']['F9_07_PZ_AVE_HOURS_WEEK_RELATED'],
'F9_07_PZ_TITLE': record['Form990PartVIISectionAGrp']['F9_07_PZ_TITLE'],
'NameBusiness': record['Form990PartVIISectionAGrp']['NameBusiness'],
'URL': record['URL'],
然后我可以应用该功能:
df = b.map(flatten).to_dataframe()
并将数据导出为 CSV:
df.to_csv('compensation*.csv')
这就像一个魅力!简而言之,根据上面 mdurant 的有用 cmets,关键是 1) 为所有观察添加缺失的键,以及 2) 不将数据从 PANDAS 读入 Dask(改用文本或 CSV)。处理好这两个问题可以很好地解决这个问题。
【讨论】:
以上是关于Dask 到展平字典列的主要内容,如果未能解决你的问题,请参考以下文章