Dask连接的简单方法(水平,轴= 1,列)

Posted

技术标签:

【中文标题】Dask连接的简单方法(水平,轴= 1,列)【英文标题】:Simple way to Dask concatenate (horizontal, axis=1, columns) 【发布时间】:2018-04-05 07:00:30 【问题描述】:

动作 将两个 csv(data.csv 和 label.csv)读取到单个数据帧中。

df = dd.read_csv(data_files, delimiter=' ', header=None, names=['x', 'y', 'z', 'intensity', 'r', 'g', 'b'])
df_label = dd.read_csv(label_files, delimiter=' ', header=None, names=['label'])

问题 列的连接需要已知的划分。但是设置索引会对数据进行排序,这是我明确不想要的,因为两个文件的顺序是它们的匹配项。

df = dd.concat([df, df_label], axis=1)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-11-e6c2e1bdde55> in <module>()
----> 1 df = dd.concat([df, df_label], axis=1)

/uhome/hemmest/.local/lib/python3.5/site-packages/dask/dataframe/multi.py in concat(dfs, axis, join, interleave_partitions)
    573             return concat_unindexed_dataframes(dfs)
    574         else:
--> 575             raise ValueError('Unable to concatenate DataFrame with unknown '
    576                              'division specifying axis=1')
    577     else:

ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1

试过 添加'id'

df['id'] = pd.Series(range(len(df)))

但是,Dataframe 的长度会导致 Series 大于内存。

问题 显然 Dask 知道两个 Dataframe 的长度相同:

In [15]:
df.index.compute()
Out[15]:
Int64Index([      0,       1,       2,       3,       4,       5,       6,
                  7,       8,       9,
            ...
            1120910, 1120911, 1120912, 1120913, 1120914, 1120915, 1120916,
            1120917, 1120918, 1120919],
           dtype='int64', length=280994776)
In [16]:
df_label.index.compute()
Out[16]:
Int64Index([1, 5, 5, 2, 2, 2, 2, 2, 2, 2,
            ...
            3, 3, 3, 3, 3, 3, 3, 3, 3, 3],
           dtype='int64', length=280994776)

如何利用这些知识进行简单的连接?

【问题讨论】:

添加了连接语句以获得完整的概述 尝试将interleave_partitions=True 添加到您的dd.concat() 添加interleave_partitions=Trueaxis=0 有效,在这种情况下,当它垂直连接时,会导致数据帧的长度增加一倍。但是,axis=1 并不能解决问题。 dask.__version__ 显示什么? 当前运行0.15.4 【参考方案1】:

解决方案(来自@Primer 的 cmets):

重新分区和重置索引 使用分配而不是连接

最终代码;

import os
from pathlib import Path
import dask.dataframe as dd
import numpy as np
import pandas as pd



df = dd.read_csv(['data/untermaederbrunnen_station1_xyz_intensity_rgb.txt'], delimiter=' ', header=None, names=['x', 'y', 'z', 'intensity', 'r', 'g', 'b'])
df_label = dd.read_csv(['data/untermaederbrunnen_station1_xyz_intensity_rgb.labels'], header=None, names=['label'])
# len(df), len(df_label), df_label.label.isnull().sum().compute()

df = df.repartition(npartitions=200)
df = df.reset_index(drop=True)
df_label = df_label.repartition(npartitions=200)
df_label = df_label.reset_index(drop=True)

df = df.assign(label = df_label.label)
df.head()

【讨论】:

跟进上面@AsifAli 的评论,如果连接的数据框有很多列,我真的需要在assign 中通过其名称显式指定每一列吗?目前dask.concat 在连接两个具有未知分区的数据帧时会发出警告(不是错误)。如果我们确定两个 df 的长度相同,那么可以忽略此警告吗?【参考方案2】:

我遇到了同样的问题并通过确保两个数据帧具有相同数量的分区来解决它(因为我们已经知道两者具有相同的长度):

df = df.repartition(npartitions=200)
df_label = df_label.repartition(npartitions=200)
df = dd.concat([df, df_label], axis=1)

【讨论】:

感谢您的建议,但 Dask 只是返回 ValueError: Concatenated DataFrames of different lengths【参考方案3】:

我有类似的问题,解决方案只是计算每个 dask 数组的块大小,我将使用 .compute_chunk_sizes() 放入数据帧中。之后,将它们连接到 axis=1 上的数据框就没有问题了。

【讨论】:

欢迎来到***。回答问题时,请努力解释您的解决方案如何解决问题。例如。简单地计算块大小如何帮助连接?在你的回答中解释这一点。

以上是关于Dask连接的简单方法(水平,轴= 1,列)的主要内容,如果未能解决你的问题,请参考以下文章

dask 如何处理大于内存的数据集

pandas 笔记

Dask 仪表板配置文件选项卡使用情况(又名火焰图)

python dask DataFrame,支持(可简单并行化)行吗?

Dask 到展平字典列

1简单控制