如何将数据读取到 dask 数据帧并删除坏行

Posted

技术标签:

【中文标题】如何将数据读取到 dask 数据帧并删除坏行【英文标题】:How to read data to dask dataframe and remove bad lines 【发布时间】:2020-02-21 15:30:12 【问题描述】:

我正在尝试使用 dask 聚合一个包含多行不良数据的大型 (66gb) 数据库。

由于 dask 没有删除坏线功能,我首先将所有数据作为 pandas 数据框读取并删除坏线。然后我将其转换为 dask 数据框。我的代码如下:

将 dask.dataframe 导入为 dd 将熊猫导入为 pd 从 dask.distributed 导入客户端

#Groups the average Thresholds by NEATGeneration and finds the mean, standard deviation, minimum and maximum of the data
def group(df):
    res = df.groupby(df["NEATGeneration"]).agg('averageThreshold': ['mean', 'std','max','min']).compute()
    return res


if __name__ == '__main__':

   Client(n_workers=4, threads_per_worker=6,memory_limit='120GB')

   #Loads in the data as a pandas datframe inlcuding bad lines
   df = dd.read_csv("agentsvfitness.txt",error_bad_lines=False,usecols=["NEATGeneration","averageThreshold"])

   #Replaces elements in  the averageThreshold column that are not numeric with NA
   pd.to_numeric(df['averageThreshold'] , errors ='coerce') 

   #Removes rows with NA
   df = df.dropna()

   #runs the group() function in parallel
   df = group(df)

   #Sets all column names and prepares data for writing to csv
   df.columns = ['mean', 'std','max','min']

   #Writes aggregated data to a single csv file
   df.to_csv("averageThreshold.csv")

我遇到的问题是数据被错误地记录在以下方式(粗体):

NEATGeneration,averageFitness,averageResourcesConsumed,averageThreshold

0,8.32,0.8533333333333334,0.48199999999999999

0,8.486666666666666,1.7266666666666666.47333333333333333 #lacking " ,0 "

0,8.0533333333333331.8466666666666667,0.4500000000000001 #缺少“,”

0,8.306666666666667,1.9466666666666668,0.44933131583851454

在将数据读入 dask 数据帧时,我当前的方法无法删除这些行。有没有办法从已经存在的数据框中删除这些坏行?否则有没有办法只读取“好”数据(具有正确数量数据点的数据)?我在一个有 24 个 CPU 和 120GB 内存的集群上运行。

【问题讨论】:

不知道我是否理解,为什么你不能阅读 csv 到 dask? df = dd.read_csv("agentsvfitness.txt", error_bad_lines=False) 将 err_bad_lines 设置为 False 将删除不正确的行 @effy 好的,我使用了dd.read_csv("agentsvfitness.txt", error_bad_lines=False) 并稍微编辑了我的代码以反映这一点。即使这样,仍然会将行读入数据帧中的元素太少,这会导致 groupby 崩溃。 【参考方案1】:

我的猜测是read_csv 关键字的某种组合可以解决您的问题,但我对它们不是很熟悉,所以我将建议另一种方法。

您可以使用 Dask Bag 将文本行读取为文本而不是 Pandas Dataframes。然后,您可以使用 Python 函数过滤掉坏行(也许通过计算逗号或其他东西的数量),然后您可以将其写回文本文件,然后使用 Dask Dataframe 重新读取,因为数据多一点清理干净。可能还有一些不错的方法可以将 Dask Bag 转换为 Dask Dataframe,而无需将中间文件写入磁盘,但这可能稍微复杂一些。

【讨论】:

以上是关于如何将数据读取到 dask 数据帧并删除坏行的主要内容,如果未能解决你的问题,请参考以下文章

如何将单个镶木地板文件从 s3 读入 dask 数据帧?

是否可以将巨大的 dask 数据框保存到镶木地板中?

Dask 数据帧大于内存

Dask 从二进制文件中读取数据

如何使用 dask/fastparquet 从多个目录中读取多个 parquet 文件(具有相同架构)

将列表写入 pandas 数据帧到 csv,从 csv 读取数据帧并再次转换为列表而无需字符串