Python多线程未获得所需的性能

Posted

技术标签:

【中文标题】Python多线程未获得所需的性能【英文标题】:Python multithreading not getting desired performance 【发布时间】:2021-08-11 00:01:47 【问题描述】:

我有一堆 pandas 数据帧,我想打印成任何格式(csv、json 等)——并希望根据读取的数据帧的顺序保留顺序。不幸的是,.to_csv() 可能需要一些时间,有时比​​读取数据帧要长 2 倍。

我们以图片为例:

在这里你可以看到线性运行任务,读取数据帧,打印出来,然后对剩余的数据帧重复。这可能比仅读取数据帧花费大约 3 倍的时间。从理论上讲,如果我们可以将打印(to_csv())推送到单独的线程(2 个线程,加上主线程读取),与线性(同步) 版本。当然,只有 3 次读取,它看起来只有一半的速度。但是你读取的数据帧越多,它就会越快(理论上)。

不幸的是,实际情况并非如此。我在性能上获得了非常小的收益。读取时间实际上需要更长的时间。这可能是因为 to_csv() 是 CPU 广泛的,并且使用了进程中的所有资源。而且由于它是多线程的,所以它们都共享相同的资源。因此收益不大。

所以我的问题是,如何改进代码以使性能更接近理论数字。我尝试使用多处理但未能获得工作代码。我怎样才能在多处理中拥有这个?还有其他方法可以提高此类任务的总执行时间吗?

这是我使用多线程的示例代码:

import pandas as pd
import datetime
import os
from threading import Thread
import queue
from io import StringIO
from line_profiler import LineProfiler


NUMS = 500
DEVNULL = open(os.devnull, 'w')

HEADERS = ",a,b,c,d,e,f,g\n"
SAMPLE_CSV = HEADERS + "\n".join([f"x,x,x,x,x,x,x,x" for x in range(4000)])


def linear_test():
    print("------Linear Test-------")
    main_start = datetime.datetime.now()
    total_read_time = datetime.timedelta(0)
    total_add_task = datetime.timedelta(0)
    total_to_csv_time = datetime.timedelta(0)
    total_to_print = datetime.timedelta(0)

    for x in range(NUMS):

        start = datetime.datetime.now()
        df = pd.read_csv(StringIO(SAMPLE_CSV), header=0, index_col=0)
        total_read_time += datetime.datetime.now() - start

        start = datetime.datetime.now()
        #
        total_add_task += datetime.datetime.now() - start

        start = datetime.datetime.now()
        data = df.to_csv()
        total_to_csv_time += datetime.datetime.now() - start

        start = datetime.datetime.now()
        print(data, file=DEVNULL)
        total_to_print += datetime.datetime.now() - start

    print("total_read_time: ".format(total_read_time))
    print("total_add_task: ".format(total_add_task))
    print("total_to_csv_time: ".format(total_to_csv_time))
    print("total_to_print: ".format(total_to_print))
    print("total: ".format(datetime.datetime.now() - main_start))


class Handler():
    def __init__(self, num_workers=1):
        self.num_workers = num_workers
        self.total_num_jobs = 0
        self.jobs_completed = 0
        self.answers_sent = 0
        self.jobs = queue.Queue()
        self.results = queue.Queue()
        self.start_workers()

    def add_task(self, task, *args, **kwargs):
        args = args or ()
        kwargs = kwargs or 
        self.total_num_jobs += 1
        self.jobs.put((task, args, kwargs))

    def start_workers(self):
        for i in range(self.num_workers):
            t = Thread(target=self.worker)
            t.daemon = True
            t.start()

    def worker(self):
        while True:
            item, args, kwargs = self.jobs.get()
            item(*args, **kwargs)
            self.jobs_completed += 1
            self.jobs.task_done()

    def get_answers(self):
        while self.answers_sent < self.total_num_jobs or self.jobs_completed == 0:
            yield self.results.get()
            self.answers_sent += 1
            self.results.task_done()


def task(task_num, df, q):
    ans = df.to_csv()
    q.put((task_num, ans))


def parallel_test():
    print("------Parallel Test-------")
    main_start = datetime.datetime.now()
    total_read_time = datetime.timedelta(0)
    total_add_task = datetime.timedelta(0)
    total_to_csv_time = datetime.timedelta(0)
    total_to_print = datetime.timedelta(0)
    h = Handler(num_workers=2)
    q = h.results
    answers = 
    curr_task = 1
    t = 1

    for x in range(NUMS):
        start = datetime.datetime.now()
        df = pd.read_csv(StringIO(SAMPLE_CSV), header=0, index_col=0)
        total_read_time += datetime.datetime.now() - start

        start = datetime.datetime.now()
        h.add_task(task, t, df, q)
        t += 1
        total_add_task += datetime.datetime.now() - start

        start = datetime.datetime.now()
        #data = df.to_csv()
        total_to_csv_time += datetime.datetime.now() - start

        start = datetime.datetime.now()
        #print(data, file=DEVNULL)
        total_to_print += datetime.datetime.now() - start

    print("total_read_time: ".format(total_read_time))
    print("total_add_task: ".format(total_add_task))
    print("total_to_csv_time: ".format(total_to_csv_time))
    print("total_to_print: ".format(total_to_print))

    for task_num, ans in h.get_answers():
        #print("got back: ".format(task_num, ans))
        answers[task_num] = ans
        if curr_task in answers:
            print(answers[curr_task], file=DEVNULL)
            del answers[curr_task]
            curr_task += 1

    # In case others are left out
    for k, v in answers.items():
        print(k)

    h.jobs.join()  # block until all tasks are done

    print("total: ".format(datetime.datetime.now() - main_start))

if __name__ == "__main__":
    # linear_test()
    # parallel_test()

    lp = LineProfiler()
    lp_wrapper = lp(linear_test)
    lp_wrapper()
    lp.print_stats()

    lp = LineProfiler()
    lp_wrapper = lp(parallel_test)
    lp_wrapper()
    lp.print_stats()

输出将在下面。您可以在线性测试中看到读取数据帧只用了 4.6 秒(占总执行时间的 42%)。但是在并行测试中读取数据帧需要 9.7 秒(占总执行时间的 93%):

------Linear Test-------
total_read_time: 0:00:04.672765
total_add_task: 0:00:00.001000
total_to_csv_time: 0:00:05.582663
total_to_print: 0:00:00.668319
total: 0:00:10.935723
Timer unit: 1e-07 s

Total time: 10.9309 s
File: ./test.py
Function: linear_test at line 33

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    33                                           def linear_test():
    34         1        225.0    225.0      0.0      print("------Linear Test-------")
    35         1         76.0     76.0      0.0      main_start = datetime.datetime.now()
    36         1         32.0     32.0      0.0      total_read_time = datetime.timedelta(0)
    37         1         11.0     11.0      0.0      total_add_task = datetime.timedelta(0)
    38         1          9.0      9.0      0.0      total_to_csv_time = datetime.timedelta(0)
    39         1          9.0      9.0      0.0      total_to_print = datetime.timedelta(0)
    40                                           
    41       501       3374.0      6.7      0.0      for x in range(NUMS):
    42                                           
    43       500       5806.0     11.6      0.0          start = datetime.datetime.now()
    44       500   46728029.0  93456.1     42.7          df = pd.read_csv(StringIO(SAMPLE_CSV), header=0, index_col=0)
    45       500      40199.0     80.4      0.0          total_read_time += datetime.datetime.now() - start
    46                                           
    47       500       6821.0     13.6      0.0          start = datetime.datetime.now()
    48                                                   #
    49       500       6916.0     13.8      0.0          total_add_task += datetime.datetime.now() - start
    50                                           
    51       500       5794.0     11.6      0.0          start = datetime.datetime.now()
    52       500   55843605.0 111687.2     51.1          data = df.to_csv()
    53       500      53640.0    107.3      0.0          total_to_csv_time += datetime.datetime.now() - start
    54                                           
    55       500       6798.0     13.6      0.0          start = datetime.datetime.now()
    56       500    6589129.0  13178.3      6.0          print(data, file=DEVNULL)
    57       500      18258.0     36.5      0.0          total_to_print += datetime.datetime.now() - start
    58                                           
    59         1        221.0    221.0      0.0      print("total_read_time: ".format(total_read_time))
    60         1         95.0     95.0      0.0      print("total_add_task: ".format(total_add_task))
    61         1         87.0     87.0      0.0      print("total_to_csv_time: ".format(total_to_csv_time))
    62         1         85.0     85.0      0.0      print("total_to_print: ".format(total_to_print))
    63         1        112.0    112.0      0.0      print("total: ".format(datetime.datetime.now() - main_start))

------Parallel Test-------
total_read_time: 0:00:09.779954
total_add_task: 0:00:00.016984
total_to_csv_time: 0:00:00.003000
total_to_print: 0:00:00.001001
total: 0:00:10.488563
Timer unit: 1e-07 s

Total time: 10.4803 s
File: ./test.py
Function: parallel_test at line 106

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   106                                           def parallel_test():
   107         1        100.0    100.0      0.0      print("------Parallel Test-------")
   108         1         33.0     33.0      0.0      main_start = datetime.datetime.now()
   109         1         24.0     24.0      0.0      total_read_time = datetime.timedelta(0)
   110         1         10.0     10.0      0.0      total_add_task = datetime.timedelta(0)
   111         1         10.0     10.0      0.0      total_to_csv_time = datetime.timedelta(0)
   112         1         10.0     10.0      0.0      total_to_print = datetime.timedelta(0)
   113         1      13550.0  13550.0      0.0      h = Handler(num_workers=2)
   114         1         15.0     15.0      0.0      q = h.results
   115         1          9.0      9.0      0.0      answers = 
   116         1          7.0      7.0      0.0      curr_task = 1
   117         1          7.0      7.0      0.0      t = 1
   118                                           
   119       501       5017.0     10.0      0.0      for x in range(NUMS):
   120       500       6545.0     13.1      0.0          start = datetime.datetime.now()
   121       500   97761876.0 195523.8     93.3          df = pd.read_csv(StringIO(SAMPLE_CSV), header=0, index_col=0)
   122       500      45702.0     91.4      0.0          total_read_time += datetime.datetime.now() - start
   123                                           
   124       500       8259.0     16.5      0.0          start = datetime.datetime.now()
   125       500     167269.0    334.5      0.2          h.add_task(task, t, df, q)
   126       500       5009.0     10.0      0.0          t += 1
   127       500      11865.0     23.7      0.0          total_add_task += datetime.datetime.now() - start
   128                                           
   129       500       6949.0     13.9      0.0          start = datetime.datetime.now()
   130                                                   #data = df.to_csv()
   131       500       7921.0     15.8      0.0          total_to_csv_time += datetime.datetime.now() - start
   132                                           
   133       500       6498.0     13.0      0.0          start = datetime.datetime.now()
   134                                                   #print(data, file=DEVNULL)
   135       500       8084.0     16.2      0.0          total_to_print += datetime.datetime.now() - start
   136                                           
   137         1       3321.0   3321.0      0.0      print("total_read_time: ".format(total_read_time))
   138         1       4669.0   4669.0      0.0      print("total_add_task: ".format(total_add_task))
   139         1       1995.0   1995.0      0.0      print("total_to_csv_time: ".format(total_to_csv_time))
   140         1     113037.0 113037.0      0.1      print("total_to_print: ".format(total_to_print))
   141                                           
   142       501     176106.0    351.5      0.2      for task_num, ans in h.get_answers():
   143                                                   #print("got back: ".format(task_num, ans))
   144       500       5169.0     10.3      0.0          answers[task_num] = ans
   145       500       4160.0      8.3      0.0          if curr_task in answers:
   146       500    6429159.0  12858.3      6.1              print(answers[curr_task], file=DEVNULL)
   147       500       5646.0     11.3      0.0              del answers[curr_task]
   148       500       4144.0      8.3      0.0              curr_task += 1
   149                                           
   150                                               # In case others are left out
   151         1         24.0     24.0      0.0      for k, v in answers.items():
   152                                                   print(k)
   153                                           
   154         1         61.0     61.0      0.0      h.jobs.join()  # block until all tasks are done
   155                                           
   156         1        328.0    328.0      0.0      print("total: ".format(datetime.datetime.now() - main_start))

【问题讨论】:

docs.python.org/3/library/threading.html 不会有任何好处。阅读 CPython 实现细节 段落。 您能否通过将multithreading 替换为multiprocessing 来发布另一个指标?如果你因为 Gil 而使用某种 i/o 请求,你只会看到在 python 中使用多线程的好处。 正如我所提到的,我无法让它与多处理一起工作,所以我真的没有任何指标 【参考方案1】:

与其削减自己的解决方案,不如查看 Dask - 特别是 Dask 的分布式数据帧,如果你想将多个 CSV 文件读入 1 个“虚拟”大数据帧延迟运行函数,如根据您的示例,跨多个内核并行。如果向下滚动,请在此处查看简单示例:https://docs.dask.org/en/latest/

您的另一个轻量级选择是使用 Joblib 的 Parallel 接口,这看起来与 Delayed 完全一样,但功能要少得多。如果我想要一个轻量级的解决方案,我倾向于选择 Joblib,如果需要更多,我会升级到 Dask:https://joblib.readthedocs.io/en/latest/parallel.html

对于这两种工具,如果您沿着延迟路线走 - 编写一个在 for 串行循环中工作的函数(您已经有了这个),然后将其包装在相应的延迟语法中并且“它应该​​可以工作”。在这两种情况下,默认情况下它都会使用您机器上的所有内核。

【讨论】:

以上是关于Python多线程未获得所需的性能的主要内容,如果未能解决你的问题,请参考以下文章

使用python多线程进行简单的性能测试

Python 多进程多线编程模板

Python 多线程效率不高吗

java 多线程子线程唤醒主线程问题

Python 代码未显示所需的输出但继续运行

Python:兼容性所需的未使用参数。如何避免皮林特抱怨它