并行迭代 pandas df

Posted

技术标签:

【中文标题】并行迭代 pandas df【英文标题】:iteration over a pandas df in parallel 【发布时间】:2019-05-02 20:39:17 【问题描述】:

所以,我想 所以假设我有 15 行然后我想并行迭代它而不是一个一个地迭代。

df:-

df = pd.DataFrame.from_records([
    'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'blhp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'rbswp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'foxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'rbsxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' ,
    'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' 
   
])

所以,我正在遍历 df 并制作命令行,然后将输出存储在 df 中并进行数据过滤,最后将其存储到 influxdb 中。问题是我在迭代它时一个一个地做。我想并行迭代所有行。

到目前为止,我已经制作了 20 个脚本,并使用多处理并行处理所有脚本。当我必须在所有 20 个脚本中进行更改时,这很痛苦。我的脚本如下所示:-

for index, row in dff.iterrows():
    domain = row['domain']
    duration = str(row['duration'])
    media_file = row['media_file']
    user = row['user']
    channel = row['channel']
    cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + 
    duration + ' -f ' + media_file + ' -u .' + user + '. -c 
    sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- 
    test'

    rows = [shlex.split(line) for line in os.popen(
    cmda).read().splitlines() if line.strip()]

    df = pd.DataFrame(rows)
    """
    Bunch of data filteration and pushing it into influx 
    """

到目前为止,如果我在 df 中使用 15 行并进行如下并行处理,我将拥有 15 个脚本:-

import os
import time
from multiprocessing import Process
os.chdir('/Users/akumar/vivox-sdk-4.9.0002.30719.ebb523a9')
def run_program(cmd):
    # Function that processes will run
    os.system(cmd)

# Creating command to run
commands = ['python testv.py']
commands.extend(['python testv.py'.format(i) for i in range(1, 15)])

# Amount of times your programs will run
runs = 1

for run in range(runs):
    # Initiating Processes with desired arguments
    running_programs = []
    for command in commands:
        running_programs.append(Process(target=run_program, args=(command,)))
        running_programs[-1].daemon = True

    # Start our processes simultaneously
    for program in running_programs:
        program.start()

    # Wait untill all programs are done
    while any(program.is_alive() for program in running_programs):
        time.sleep(1)

问题:- 我如何遍历 df 并使所有 15 行并行运行并在 for 循环中执行所有操作。

【问题讨论】:

【参考方案1】:

我将从 Reddit 复制并粘贴我的答案到这里(以防有人偶然发现类似情况):

import dask.dataframe as ddf

def your_function(row):
    domain = row['domain']
    duration = str(row['duration'])
    media_file = row['media_file']
    user = row['user']
    channel = row['channel']
    cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + 
    duration + ' -f ' + media_file + ' -u .' + user + '. -c 
        sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- test'

    rows = [shlex.split(line) for line in os.popen(
            cmda).read().splitlines() if line.strip()]

df_dask = ddf.from_pandas(df, npartitions=4)   # where the number of partitions is the number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing')

您可能不得不在apply 方法中使用axis 参数。

【讨论】:

更新:对 Pandas 数据帧进行多处理的另一种好方法是 modin 项目:github.com/modin-project/modin 在最后一行中,lambda x: your_function(x) 可以简化为 your_function【参考方案2】:

使用线程并使用参数调用线程函数,而不是启动 15 个进程。 threading.Thread(target=func, args=(i,)) 其中 i 是您的号码,func 是包装整个代码的函数。然后遍历它。您不需要在 15 个项目上并行化迭代。

【讨论】:

以上是关于并行迭代 pandas df的主要内容,如果未能解决你的问题,请参考以下文章

使用多索引迭代地连接 pandas 数据帧

与迭代两个大型 Pandas 数据框相比,效率更高

带有迭代的 Pandas DataFrame 切片

pandas.groupby中的迭代

使用多个 excel 表加快 pandas 迭代

Pandas FutureWarning:字符的列迭代将在未来版本中被弃用