os.sched_getaffinity(0) 与 os.cpu_count()

Posted

技术标签:

【中文标题】os.sched_getaffinity(0) 与 os.cpu_count()【英文标题】:os.sched_getaffinity(0) vs os.cpu_count() 【发布时间】:2021-01-19 04:06:45 【问题描述】:

所以,我知道标题中两种方法的区别,但不知道实际含义。

据我了解:如果您使用的 NUM_WORKERS 多于实际可用的内核数,您将面临性能大幅下降,因为您的操作系统会不断地来回切换以保持并行运行。不知道这是多么真实,但我在这里从比我聪明的人那里读到它。

os.cpu_count() 的文档中说:

返回系统中的 CPU 数量。如果未确定,则返回 None。这个数字不等于 CPU 的数量 当前进程可以使用。可以获得可用CPU的数量 与 len(os.sched_getaffinity(0))

所以,如果一个进程可用的 CPU 比“系统”中的 CPU 多,我正在尝试弄清楚“系统”指的是什么。

我只想安全有效地实现multiprocessing.pool 功能。所以这里是我总结的问题:

有哪些实际意义:

NUM_WORKERS = os.cpu_count() - 1
# vs.
NUM_WORKERS = len(os.sched_getaffinity(0)) - 1

-1 是因为我发现如果我在处理数据的同时尝试工作,我的系统会减少很多延迟。

【问题讨论】:

【参考方案1】:

这两个函数非常不同,NUM_WORKERS = os.sched_getaffinity(0) - 1 会立即失败并显示TypeError,因为您尝试从集合中减去一个整数。 os.cpu_count() 告诉您系统有多少个内核,os.sched_getaffinity(pid) 告诉您某个线程/进程允许在哪些内核上运行。


os.cpu_count()

os.cpu_count() 显示操作系统已知的可用内核数量(虚拟 内核)。很可能您有一半的 物理 核心。如果使用比物理内核更多的进程甚至比虚拟内核更多的进程是有意义的,那么很大程度上取决于你在做什么。计算循环越紧密(指令的多样性很小,缓存未命中的次数很少,...),您就越有可能无法从更多使用的内核中受益(通过使用更多的工作进程),甚至会遇到性能下降。

显然,它还取决于您的系统正在运行的其他内容,因为您的系统会尝试为系统中的每个线程(作为进程的实际执行单元)分配可用内核上的运行时间公平份额。因此,就您应该使用多少工人而言,不可能一概而论。但是,例如,如果您有一个紧密的循环并且您的系统处于空闲状态,那么优化的一个很好的起点是

os.cpu_count() // 2 # same as mp.cpu_count() // 2 

...并从那里增加。

@Frank Yellin 已经提到,multiprocessing.Pool 使用 os.cpu_count() 作为默认的工人数量。

os.sched_getaffinity(pid)

os.sched_getaffinity(pid)

返回具有 PID pid 的进程的 CPU 集(或当前 如果为零则处理)被限制为。

现在 core/cpu/processor/-affinity 是关于您的线程(在您的工作进程内)允许在哪些具体(虚拟)核心上运行。您的操作系统为每个核心提供一个 id,从 0 到 (number-of-cores - 1) 并且更改关联性允许限制(“固定”)某个线程允许在哪些实际核心上运行。

至少在 Linux 上,我发现这意味着如果当前没有可用的允许内核,则子进程的线程将不会运行,即使其他不允许的内核空闲也是如此。所以“亲和力”在这里有点误导。

调整关联性的目标是最大程度地减少上下文切换和核心迁移造成的缓存失效。您的操作系统通常具有更好的洞察力,并且已经尝试通过其调度策略保持缓存“热”,因此除非您知道自己在做什么,否则您不能指望从干扰中轻松获得收益。

默认情况下,关联性设置为所有核心,对于 multiprocessing.Pool,至少在您的系统处于空闲状态时,更改它并没有太大意义。

请注意,尽管此处的文档提到了“进程”,但设置亲和力确实是每个线程的事情。因此,例如,在“子”线程中为“当前进程如果为零”设置亲和性,不会改变主线程或进程内其他线程的亲和性。 但是,子线程从主线程继承它们的亲和性,而子进程(通过它们的主线程)从父进程的主线程继承亲和性。这会影响所有可能的启动方法(“spawn”、“fork”、“forkserver”)。下面的示例演示了这一点以及如何使用multiprocessing.Pool 修改亲和力。

import multiprocessing as mp
import threading
import os


def _location():
    return f"mp.current_process().name threading.current_thread().name"


def thread_foo():
    print(f"_location(), affinity before change: os.sched_getaffinity(0)")
    os.sched_setaffinity(0, 4)
    print(f"_location(), affinity after change: os.sched_getaffinity(0)")


def foo(_, iterations=200e6):

    print(f"_location(), affinity before thread_foo:"
          f" os.sched_getaffinity(0)")

    for _ in range(int(iterations)):  # some dummy computation
        pass

    t = threading.Thread(target=thread_foo)
    t.start()
    t.join()

    print(f"_location(), affinity before exit is unchanged: "
          f"os.sched_getaffinity(0)")

    return _


if __name__ == '__main__':

    mp.set_start_method("spawn")  # alternatives on Unix: "fork", "forkserver"

    # for current process, exclude cores 0,1 from affinity-mask
    print(f"parent affinity before change: os.sched_getaffinity(0)")
    excluded_cores = 0, 1
    os.sched_setaffinity(0, os.sched_getaffinity(0).difference(excluded_cores))
    print(f"parent affinity after change: os.sched_getaffinity(0)")

    with mp.Pool(2) as pool:
        pool.map(foo, range(5))

输出:

parent affinity before change: 0, 1, 2, 3, 4, 5, 6, 7
parent affinity after change: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-1 MainThread, affinity before thread_foo: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 MainThread, affinity before thread_foo: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-1 Thread-1, affinity before change: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-1 Thread-1, affinity after change: 4
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-1 MainThread, affinity before thread_foo: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 Thread-1, affinity before change: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 Thread-1, affinity after change: 4
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 MainThread, affinity before thread_foo: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 Thread-2, affinity before change: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 Thread-2, affinity after change: 4
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 MainThread, affinity before thread_foo: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-1 Thread-2, affinity before change: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-1 Thread-2, affinity after change: 4
SpawnPoolWorker-1 MainThread, affinity before exit is unchanged: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 Thread-3, affinity before change: 2, 3, 4, 5, 6, 7
SpawnPoolWorker-2 Thread-3, affinity after change: 4
SpawnPoolWorker-2 MainThread, affinity before exit is unchanged: 2, 3, 4, 5, 6, 7

【讨论】:

【参考方案2】:

如果您有一个纯 100% CPU 限制的任务,即除了计算什么都不做,那么很明显,如果进程池大小大于您计算机上可用的 CPU 数量,则不会/不可能获得任何东西。但是如果有一个混合的 I/O 被抛出,一个进程会放弃 CPU 等待 I/O 完成(或者,例如,一个从网站返回的 URL,它需要一个相对 长时间)?对我来说,不清楚在这种情况下,如果进程池大小超过 os.cpu_count(),您是否无法提高吞吐量。

更新

这是演示这一点的代码。这段代码可能最好通过使用线程来处理,它正在使用进程。我的桌面上有 8 个内核。该程序简单地同时检索 54 个 URL(或在这种情况下并行)。程序传递了一个参数,即要使用的池的大小。不幸的是,仅仅创建额外的进程就会产生初始开销,因此如果创建太多进程,节省的成本就会开始下降。但是如果任务运行时间很长并且有很多 I/O,那么创建进程的开销最终是值得的:

from concurrent.futures import ProcessPoolExecutor, as_completed
import requests
from timing import time_it

def get_url(url):
    resp = requests.get(url, headers='user-agent': 'my-app/0.0.1')
    return resp.text


@time_it
def main(poolsize):
    urls = [
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
        'https://ibm.com',
        'https://microsoft.com',
        'https://google.com',
    ]
    with ProcessPoolExecutor(poolsize) as executor:
        futures = executor.submit(get_url, url): url for url in urls
        for future in as_completed(futures):
            text = future.result()
            url = futures[future]
            print(url, text[0:80])
            print('-' * 100)

if __name__ == '__main__':
    import sys
    main(int(sys.argv[1]))

8 个进程:(我拥有的核心数):

func: main args: [(8,), ] took: 2.316840410232544 sec.

16 个进程:

func: main args: [(16,), ] took: 1.7964842319488525 sec.

24 个进程:

func: main args: [(24,), ] took: 2.2560818195343018 sec.

【讨论】:

FWIW,我在这个答案here 中有代码,它证明了你的观点。 性能提升是不是因为“虚拟”内核? @rocksNwaves 我有 4 个真实内核 + 4 个虚拟内核 = 8 (== os.cpu_count())。性能的提高是由于正在创建的进程在等待返回 URL 时放弃了它们拥有的核心(真实或虚拟),并且如果有另一个进程在等待核心运行,它现在将有机会。 好的,所以可以创建一个进程但不分配一个核心。本质上,您所说的是我可以启动任意数量的进程,这对于可能需要等待时间的大量 I/O 或操作可能有意义。在等待期间,该进程可以放弃核心并允许其他人工作......所以我唯一的问题是:多处理池是否真的处理这个“我什么都没做,所以我会让我的邻居有一个转”种思维? @rocksNwaves 我相当肯定,当另一个进程运行导致 CPU 变得可用时,现在负责调度进程的是底层操作系统 (OS),例如 Linux 或 Windows进入等待。因此,它在比 Python 的 Process 类更低的级别上完成。但是请记住,与相当轻量级的线程不同,创建无法有效使用的进程(参见我的示例)变得昂贵。这可能就是为什么创建 Python 池时(合理的)默认值是您拥有的实际 CPU 数量的原因。【参考方案3】:

multiprocessing.pool的实现使用

        if processes is None:
            processes = os.cpu_count() or 1

不确定这是否能回答您的问题,但至少它是一个数据点。

【讨论】:

以上是关于os.sched_getaffinity(0) 与 os.cpu_count()的主要内容,如果未能解决你的问题,请参考以下文章

Node.js Sass 版本 7.0.0 与 ^4.0.0 不兼容 || ^5.0.0 || ^6.0.0

数据的表示——0与1

Navicat Premium 12.0.18 / 12.0.24安装与激活

0、800、600、0 与 0、800、0、600 OpenGL

无法与在 0.0.0.0:8443 上侦听的应用程序建立连接

无法与在 0.0.0.0:8443 上侦听的应用程序建立连接