什么是改变 Dask Dataframe 的更干净的方法?

Posted

技术标签:

【中文标题】什么是改变 Dask Dataframe 的更干净的方法?【英文标题】:Whats a cleaner way to mutate a Dask Dataframe? 【发布时间】:2018-09-27 14:02:21 【问题描述】:

我必须开始处理一些比内存更大的数据集,这意味着我需要快速熟悉 Dask。到目前为止它还不错,但我刚刚遇到了一个我认为我已经解决了但它并不漂亮的问题,我想看看是否有更好的方法来解决它。

问题: 我将时间序列数据存储在 DataFrame 中。每个列(向量)都需要应用一个函数。该函数返回 3 个附加向量,我想将它们附加到原始 DataFrame 中。

代码: 以下代码的第一部分是我在普通 Pandas 中的解决方案。后半部分是我在 Dask 中所做的。

import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd

#### Helper functions

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://***.com/a/22640362/6029703
    '''
    import numpy as np

    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])


    return [labels, avg_filter, std_filter]


def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0


############# Normal Pandas Solution ########################

bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)

############### Dask Solution ############################
bigdf = make_example_data()
ddf = dd.from_pandas(bigdf, npartitions=10)


columns = list(ddf.columns)
# remove columns that don't have the function applied to them
columns.remove("Time")
columns.remove("session")

# get all the different sessions
sessions = ddf.groupby("session").count().compute().index.tolist()

# column names that get returned by my function
returns = ["_Signal", "_meanFilter", "_stdFilter"]

# list to hold example series for meta data
rcols = []
for col in columns:
    for r in returns:
        s = pd.Series([])
        s.name = col + r
        rcols.append(s)

results = pd.DataFrame(rcols).T
results = dd.from_pandas(results, npartitions=len(sessions))

for session in sessions:
    sess_df = ddf[ddf["session"] == session].compute()
    # making a dask df to store the results in
    sess_results = dd.from_pandas(sess_df, npartitions=1)

    for col in columns:
        # returns a list of 3 lists
        res = peak_detection_smoothed_zscore_v2(sess_df[col], lag, threshold, influence)
        # turn 3 lists into a dataframe of 3 columns
        res = pd.concat([pd.DataFrame(a).T for a in res]).T
        _cols = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
        res.columns = _cols
        # do this iteratively cause I can't figure out how to do it in a single line
        for cc in _cols:
            sess_results[cc] = res[cc]
        # NOTE: If memory is a problem could probably throw this to disk here

    # append session results to main results
    results = results.append(sess_results)

dd_results = results.compute()

print("Are my Dask results the same as my Pandas results?", dd_results.shape == pd_results.shape)

问题:

我正在寻找更好的解决方案。如您所见,Dask 代码更长且有点复杂。有什么办法让它不那么凌乱吗?也许取消forloops?

我预见到的另一个问题是,如果我有一个小到足以放入内存的 Dask 分区会怎样。当我再创建 3 个长度相等的向量时会发生什么?我的系统会死机吗?

如果没有真正的清理方法。我是否至少尽可能高效地做事?

谢谢

【问题讨论】:

【参考方案1】:

在解决这个问题将近一周后,我想我有一个解决方案。它不像我想要的那样简洁,但它确实可以防止一次将过多的内容加载到内存中。我不是 100% 清楚我是否将其扩展并仅从我的笔记本电脑上关闭,它是否会将任务分配给其他工作节点。

我最终做的是将数据从羽毛文件移动到 bcolz ctables。这让我可以在没有 Dask 引入的麻烦的情况下改变数据帧/ctables。而且我很确定我不必担心我的计算机内存不足。

import bcolz
import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd
from copy import copy


def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence, lst=True):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://***.com/a/22640362/6029703
    '''
    import numpy as np

    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
                filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return [labels, avg_filter, std_filter]


def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf

def ctable_append(cts):
    """
    A function to append multiple ctables and clean up the disk entries along the 0 axis
    similar to pd.concat([df1, df2], axis=0)


    :param cts: a string containing the root directory path or a list of ctables
    :return: ctable
    """
    import shutil

    ctables = []
    first = True

    # check if we are getting a list or a root dir
    if type(cts) == str:
        cts = bcolz.walk(cts)

    for ct in cts:
        if first is True:
            ct1 = ct
        else:
            ct1.append(ct)
            shutil.rmtree(ct.rootdir)
        first = False

    return ct1

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)

bigdf = make_example_data()
sessions = list(set(bigdf['session']))
root_dir = os.path.join(os.getcwd(), 'example_data')

# breaking this example dataset out into something a little more like my real dataset
for session in sessions:
    sdf = bigdf[bigdf['session'] == session]
    sess_dir = os.path.join(root_dir, session)
    bcolz.ctable.fromdataframe(sdf, rootdir=sess_dir)

dnapply_cols = [
    'session',
    'Time'
]  # columns that are not signals to find peaks in

lazy_apply = []
# apply my function to all the data.. making the extra columns
# don't think that Dask is really needed here as I'm not sure if it actually distributes the tasks
# when I ran this on a lot more data I only had one maybe two cores doing anything.
# this could have been because of the cost of memory but my ram didn't really go beyond being
# half used.
for ct in bcolz.walk(root_dir):
    for column in ct.cols.names:
        if column not in dnapply_cols:
            #             signal, mean_filter, std_filter = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            res = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            lazy_apply.append(delayed(ct.addcol)(res[0], name=column + "_Signal"))
            lazy_apply.append(delayed(ct.addcol)(res[1], name=column + "_meanFilter"))
            lazy_apply.append(delayed(ct.addcol)(res[2], name=column + "_stdFilter"))

dask.compute(*lazy_apply)

# combine all ctables into a single ctable

ct1 = ctable_append(root_dir)
dd_results = dd.from_bcolz(ct1, chunksize=74)  # chose a chunk size of 74 cause thats about how long each session df was
print(dd_results.head(), dd_results.compute().shape, pd_results.shape)
print("Are my Dask results the same as my Pandas results?", dd_results.compute().shape == pd_results.shape)

【讨论】:

我不知道 Dash,但是峰值检测算法只需要最后一个 lag 元素就可以正常工作。我的原始算法每次调用时都会遍历所有数据点,这是非常低效的(我也警告不要这样做)。因此,理想情况下,您希望使用移动窗口循环数据,仅在内存中加载 lag 元素,更新算法,从内存中删除数据并继续到下一个窗口(+1 新观察)。这应该是最节省内存的方式。

以上是关于什么是改变 Dask Dataframe 的更干净的方法?的主要内容,如果未能解决你的问题,请参考以下文章

将Dask包的Pandas DataFrame转换为单个Dask DataFrame

如何将 Dask.DataFrame 转换为 pd.DataFrame?

dask dataframe.persist() 是不是保留下一个查询的结果?

懒惰地从 PostgreSQL / Cassandra 创建 Dask DataFrame

Dask 如何旋转 DataFrame

使用 dask.dataframe 从 CSV 文件中按分区读取尾部