聊聊多线程程序的load balance

Posted 21CTO

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊多线程程序的load balance相关的知识,希望对你有一定的参考价值。

说起load balance,一般比较容易想到的是大型服务在多个replica之间的load balance、和kernal的load balance。前者一般只是在流量入口做一下流量分配,逻辑相对简单;而后者则比较复杂,需要不断发现正在运行的各个进程之间的imbalance,然后通过将进程在CPU之间进行迁移,使得各个CPU都被充分利用起来。

而本文想要讨论的load balance有别于以上两种,它是多线程(多进程)server程序内部,各个worker线程(进程)之间的load balance。
考虑一种常用的server模型:一个receiver线程负责接收请求,后面有一个线程池装了一堆worker线程,收到的请求被分派给这些worker进行处理。receiver与worker之间通过pthread_cond+request_queue来进行通信。一般的做法是:receiver将收到的请求放入queue,然后signal一下cond,就OK了。具体哪个worker会被唤醒,那是kernel的事情(实际上kernel会遵循先来后到原则,唤醒先进入等待的进程,参阅《linux futex浅析》)。通常情况下这样做就足够了,receiver唤醒worker不需要涉及load balance的逻辑。但是有时候我们还是可以做一些load balance的工作,来提高server的性能。

kernel load balance概述

由于这里的load balance跟kernel的load balance息息相关,所以我们有必要先看看kernel的load balance都做了些什么。详细的内容请参阅《linux内核SMP负载均衡浅析》,这里只做一些简要的概括。

说白了,kernel的load balance就做一件事情: 让系统中RUNNING状态的进程尽可能的被分摊,在每一个调度域上看都是balance的 。怎么理解呢?现在CPU的结构一般有:物理CPU、core、超线程、这么几个层次。”在每一个调度域上看都balance”可以理解为在每一个层次上都balance:每个物理CPU上的总load相当、每个core上的总load相当、每个超线程上的load也相当。

我们在系统中看到的”CPU”都是最底层的超线程这个层次,我们可能会直观的认为把RUNNING状态的进程分摊到每一个”CPU”上就行了,但是实际上kernel的load balance还有更高的要求。假设我们的机器有2个物理CPU、每个物理CPU有2个core、每个core有2个超线程,共8个”CPU”。如果现在有8个RUNNING状态的进程(假设优先级都相同),每个”CPU”各分摊一个进程,那么自然就是balance的。但是如果现在只有4个RUNNING状态的进程(假设优先级都相同),真正的balance并不仅仅是每个进程各自落到一个”CPU”上就行了,而是进一步要求每个物理CPU上跑两个进程、每个core上跑一个进程。
为什么要有这样的强约束呢?因为尽管各个”CPU”逻辑上是独立的(不存在主从关系之类),但它们并非孤立存在。相同物理CPU下的”CPU”会共享cache、相同core下的”CPU”会共享计算资源(所谓的超线程也就是一套流水线跑两个线程)。而共享也就意味着争抢。所以,在RUNNING状态的进程并非正好均摊给每一个”CPU”的情况下,需要考虑更高层次的CPU是否被均摊,以避免cache和CPU流水线的争抢(当然,除了性能,这也体现了kernel的公平性)。

最后再多提一点,kernel的load balance是异步的。为避免占用过多资源,kernel肯定不可能实时监控各个”CPU”的情况,然后面对变化实时的做出反应(当然,实时进程除外,但这不在我们讨论范围内)。

server的load balance考虑

有了kernel的load balance作为铺垫,看看我们server上的receiver线程能做些什么吧。

首先是worker线程的数量问题。如果worker数量过多会发生什么情况?还是假设我们的机器有上述的8个”CPU”,假设我们开了80个worker,再假设这80个线程被平均分派到每一个”CPU”上,等待处理任务。当一堆请求陆续到来的时候,由于我们的receiver没有任何load balance的策略,被唤醒的worker出现在哪个”CPU”上可以说是随机的。你想想,”同时”到来的8个请求正好落到8个不同”CPU”上的概率是多少?是:(70*60*50*40*30*20*10)/(79*78*77*76*75*74*73)=0.34%。也就是说几乎肯定会出现某些”CPU”要处理多个请求、某些”CPU”却闲着没事干的情况,系统的性能可想而知。而等到后知后觉的kernel load balance将这些请求balance到每一个”CPU”上时,可能请求已经处理得差不多了,等到下一批请求到来时,load又还是凌乱的。因为刚刚已经balance好的那些worker线程又被放回到了cond等待队列的尾部,而优先响应新请求的则是那些位于队列头部的未曾被balance过的worker。
那么会不会经历几轮请求之后就能达到balance了呢?如果请求真的是一轮一轮的过来,并且每个请求的处理时间完全相同,那么有可能会达到balance,但是实际情况肯定相差甚远。
解决办法是什么呢?将cond先进先出的队列式等待逻辑改为后进先出的栈式逻辑,或许可以解决问题,但是更好的办法应该是限制worker的数目等于或者略小于”CPU”数目,这样很自然的就balance了。

第二个问题,既然我们承认kernel在各个调度域上的load balance的有意义的,我们server中的receiver线程是不是也可以通过类似的办法来获得收益呢?现在我们吸取了之前的教训,只开了8个worker线程。依靠kernel load balance的作用,这8个线程基本会固定在每一个”CPU”上。假设现在一下子来了4个请求,它们会落到4个不同的”CPU”上,如果运气好,这4个”CPU”分别属于不同的core,那么处理请求的过程就不会涉及CPU资源的争抢;反之,可能形成2个core非常忙、2个core闲着的局面。
要解决这个问题需要做到两点,继续以我们之前的server程序为例。首先,receiver线程要知道各个worker线程都落在哪一个”CPU”上;然后在分派任务时还需要有balance的眼光。要做到第一点,最好是借助sched_affinity功能将线程固定在某个”CPU”上,避免kernel load balance把问题搞复杂了。既然前面我们已经得出了工作线程数等于或略小于CPU数的结论,现在每个线程固定在一个CPU上就是可行的。第二点,我们需要在现有pthread_cond的基础上做一些改进,给进入等待状态的worker线程赋一个优先级,比如每个core的第一个超线程作为第一优先级,第二个超线程为第二优先级。那么在cond唤醒工作线程的时候,我们就可以尽量让worker线程不落到同一个core上。实现上可以利用futex的bitset系列功能,通过bitset来标识优先级,以便在唤醒指定的worker线程。(参阅《linux futex浅析》。)

例子

好了,纸上谈兵讲了这么多,得来点实际的例子验证一下。为了简单,就不写什么server程序了,只需要一个生产者线程和若干消费者线程。生产者线程生成一些任务,通过cond+queue将其传递给消费者线程。为了观察在不同任务负载下的程序表现,我们需要控制任务负载。消费者线程在完成任务后通过另一组cond+queue把任务应答给生产者线程,于是生产者就知道当前有多少个任务正在处理中,以便控制生产新任务的节奏。最后,我们通过观察在不同条件下完成一批任务的时间来体会程序的性能。

这里面比较关键的是任务本身的处理逻辑,既然我们讨论的是CPU的负载,任务肯定应该是CPU密集型的任务。然后,单个任务的处理时间不宜太短,否则可能调度过程会成为程序的瓶颈,体现不出CPU的负载问题;另一方面,单个任务的处理时间也不宜太长,否则后知后觉的kernel load balance也能解决问题,体现不出我们主动做load balance的好处(比如任务处理时间是10秒,kernel load balance花费几十毫秒来解决balance问题其实也无伤大雅)。

代码贴在文章最后,编译出来的bin文件是这样的:

$g++ cond.cpp -pthread -O2
$./a.out
usage: ./a.out -j job_kind=shm|calc [-t thread_count=1] 
[-o job_load=1] [-c job_count=10] [-a affinity=0] [-l] 
[-f filename="./TEST" -n filelength=128M]
  • 代码里面准备了两种任务逻辑,”-j shm”是mmap一个文件,然后读取上面的数据做一些运算(文件及其长度由-f和-n参数来限定);”-j calc”是做一些算术运算;

  • “-t”参数指定工作线程的线程数;

  • “-o”指定任务负载;

  • “-c”指定单个线程处理任务的个数;

  • “-a”指定是否设置sched_affinity,并且指明跳几个”CPU”放一个worker线程。比如”-a 1″表示把worker线程顺序固定在1、2、3、……号”CPU”上,而”-a 2″表示固定在2、4、6、……号”CPU”上,以此类推。需要注意的是,邻近的”CPU”号并不表示”CPU”在物理上是邻近的,比如在我测试用的机器上,共24个”CPU”,0~11号是每个core的第一个超线程、12~23是第二个超线程。这个细节需要读/proc/cpuinfo来确定。

  • “-l”参数指定启用我们增强版的分级cond,启用的话会将0~11号worker作为第一优先级,12~23作为第二优先级(当然,需要配合”-a”参数才有实际意义,否则也不确定这些worker都落在哪些”CPU”上);

首先来看worker线程过多所带来的问题(以下case各运行5次取时间最小值)。

case-1,启240个worker线程,24个任务负载:
$./a.out -j calc -t 240 -o 24
total cost: 23790
$./a.out -j shm -t 240 -o 24
total cost: 16827case-2,启24个worker线程,24个任务负载:
$./a.out -j calc -t 24 -o 24
total cost: 23210
$./a.out -j shm -t 24 -o 24
total cost: 16121

case-2效果明显要好略一些。并且在运行过程中如果用top观察的话,你会发现case-1只能压到2200%左右的CPU,而case-2几乎能达到2400%。

在case-1的基础上,如果禁止kernel load balance会怎样?加affinity试试看:

case-3,启240个worker线程,24个任务负载,加affinity:
$./a.out -j calc -t 240 -o 24 -a 1
total cost: 27170
$./a.out -j shm -t 240 -o 24 -a 1
total cost: 15351

calc任务比较符合预期,没有kernel load balance的情况下,性能继续下降。
而shm任务则让人大跌眼镜,性能居然提升了!其实这个任务除了CPU之外还很依赖于内存,因为所有任务都工作在同一个文件的mmap上,”CPU”挨得近反而更能发挥内存cache。(可见在这种情况下,kernel load balance其实是帮了倒忙。)
那么,我们将工作线程再调回24,是不是应该更理想?

case-3'
$./a.out -j shm -t 24 -o 24 -a 1
total cost: 15133

再来看第二个问题,worker线程站位不均所带来的影响。

case-4,启24个worker线程,12个任务负载:
$./a.out -j calc -t 24 -o 12
total cost: 14686
$./a.out -j shm -t 24 -o 12
total cost: 13265case-5,启24个worker线程,12个任务负载,加affinity,启用分级cond:
$./a.out -j calc -t 24 -o 12 -a 1 -l
total cost: 12206
$./a.out -j shm -t 24 -o 12 -a 1 -l
total cost: 12376

效果还是不错的。改一下”-a”参数,让同一个core的两个超线程都分在同一优先级呢?

case-5'
$./a.out -j calc -t 24 -o 12 -a 2 -l
total cost: 23510
$./a.out -j shm -t 24 -o 12 -a 2 -l
total cost: 15063

由于争抢CPU资源,calc任务性能变得很差,几乎减半。而shm任务由于cache复用所带来的好处,情况还好(比case-3还略好一些)。

这里的任务只是举了calc和shm两个例子,实际情况可能是很复杂的。尽管load balance的问题肯定存在,但是任务会因共享cache而得利、还是因争抢cache而失利?争抢CPU流水线又会造成多大的损失?这些都只能具体问题具体分析。kernel的load balance将负载尽量均摊到离得远的”CPU”上,大多数情况下没有问题。不过我们也看到shm任务中cache共享的收益还是很大的,如果例子更极端一点,肯定会出现承受负载的CPU离得越近,反而效果越好的情况。
另一方面,争抢CPU流水线会有多大损失,也可以简单的分析一下。超线程相当于两个线程共用一套CPU流水线,如果单个线程的代码上下文依赖很严重,指令基本上只能串行工作,无法充分利用流水线,那么流水线的空余能力就可以留给第二个线程使用。反之如果一个线程就能把流水线填满,硬塞两个线程进来肯定就只能有50%的性能(上述calc的例子就差不多是这样)。
为了说明这个问题,我们给calc任务加了一个SERIAL_CALC的宏开关,让它的运算逻辑变成上下文强依赖。然后重跑case-5中的两个命令,我们会看到其实在这种情况下承受负载的CPU离得近一些似乎也问题不大:

case-6,采用SERIAL_CALC运算逻辑,重跑case-5中的calc任务
$g++ cond.cpp -pthread -O2 -DSERIAL_CALC
$./a.out -j calc -t 24 -o 12 -a 1 -l
total cost: 51269
$./a.out -j calc -t 24 -o 12 -a 2 -l
total cost: 56753

最后是代码,有兴趣你还可以尝试更多的case,have fun!

#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sched.h>
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <linux/futex.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <math.h>
#include <sys/syscall.h>

#define CPUS    24
#define FUTEX_WAIT_BITSET   9
#define FUTEX_WAKE_BITSET   10

struct Job
{
    long _input;
    long _output;
};

class JobRunner
{
public:
    virtual void run(Job* job) = 0;
};

class ShmJobRunner : public JobRunner
{
public:
    ShmJobRunner(const char* filepath, size_t length)
            : _length(length) {
        int fd = open(filepath, O_RDONLY);
        _base = (long*)mmap(NULL, _length*sizeof(long),
                PROT_READ, MAP_SHARED|MAP_POPULATE, fd, 0);
        if (_base == MAP_FAILED) {
            printf("FATAL: mmap %s(%lu) failed!\n",
                    filepath, _length*sizeof(long));
            abort();
        }
        close(fd);
    }
    virtual void run(Job* job) {
        long i = job->_input % _length;
        long j = i + _length - 1;
        const int step = 4;
        while (i + step < j) {
            if (_base[i%_length] * _base[j%_length] > 0) {
                j -= step;
            }
            else {
                i += step;
            }
        }
        job->_output = _base[i%_length];
    }
private:
    const long* _base;
    size_t _length;
};

class CalcJobRunner : public JobRunner
{
public:
    virtual void run(Job* job) {
        long v1 = 1;
        long v2 = 1;
        long v3 = 1;
        for (int i = 0; i < job->_input; i++) {
#ifndef SERIAL_CALC
            v1 += v2 + v3;
            v2 *= 3;
            v3 *= 5;
#else
            v1 += v2 + v3;
            v2 = v1 * 5 + v2 * v3;
            v3 = v1 * 3 + v1 * v2;
#endif
        }
        job->_output = v1;
    }
};

class JobRunnerCreator
{
public:
    static JobRunner* create(const char* name,
            const char* filepath, size_t filelength) {
        if (strcmp(name, "shm") == 0) {
            printf("share memory job\n");
            return new ShmJobRunner(filepath, filelength);
        }
        else if (strcmp(name, "calc") == 0) {
            printf("caculation job\n");
            return new CalcJobRunner();
        }
        printf("unknown job '%s'\n", name);
        return NULL;
    }
};

class Cond
{
public:
    virtual void lock() = 0;
    virtual void unlock() = 0;
    virtual void wait(size_t) = 0;
    virtual void wake() = 0;
};

class NormalCond : public Cond
{
public:
    NormalCond() {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_cond, NULL);
    }
    ~NormalCond() {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    void lock() { pthread_mutex_lock(&_mutex); }
    void unlock() { pthread_mutex_unlock(&_mutex); }
    void wait(size_t) { pthread_cond_wait(&_cond, &_mutex); }
    void wake() { pthread_cond_signal(&_cond); }
private:
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;
};

class LayeredCond : public Cond
{
public:
    LayeredCond(size_t layers = 1) : _value(0), _layers(layers) {
        pthread_mutex_init(&_mutex, NULL);
        if (_layers > sizeof(int)*8) {
            printf("FATAL: cannot support such layer %u (max %u)\n",
                    _layers, sizeof(int)*8);
            abort();
        }
        _waiters = new size_t[_layers];
        memset(_waiters, 0, sizeof(size_t)*_layers);
    }
    ~LayeredCond() {
        pthread_mutex_destroy(&_mutex);
        delete _waiters;
        _waiters = NULL;
    }
    void lock() {
        pthread_mutex_lock(&_mutex);
    }
    void unlock() {
        pthread_mutex_unlock(&_mutex);
    }
    void wait(size_t layer) {
        if (layer >= _layers) {
            printf("FATAL: layer overflow (%u/%u)\n", layer, _layers);
            abort();
        }
        _waiters[layer]++;
        while (_value == 0) {
            int value = _value;
            unlock();
            syscall(__NR_futex, &_value, FUTEX_WAIT_BITSET, value,
                    NULL, NULL, layer2mask(layer));
            lock();
        }
        _waiters[layer]--;
        _value--;
    }
    void wake() {
        int mask = ~0;
        lock();
        for (size_t i = 0; i < _layers; i++) {
            if (_waiters[i] > 0) {
                mask = layer2mask(i);
                break;
            }
        }
        _value++;
        unlock();
        syscall(__NR_futex, &_value, FUTEX_WAKE_BITSET, 1,
                NULL, NULL, mask);
    }
private:
    int layer2mask(size_t layer) {
        return 1 << layer;
    }
private:
    pthread_mutex_t _mutex;
    int _value;
    size_t* _waiters;
    size_t _layers;
};

template<class T>
class Stack
{
public:
    Stack(size_t size, size_t cond_layers = 0) : _size(size), _sp(0) {
        _buf = new T*[_size];
        _cond = (cond_layers > 0) ?
            (Cond*)new LayeredCond(cond_layers) : (Cond*)new NormalCond();
    }
    ~Stack() {
        delete []_buf;
        delete _cond;
    }
    T* pop(size_t layer = 0) {
        T* ret = NULL;
        _cond->lock();
        do {
            if (_sp > 0) {
                ret = _buf[--_sp];
            }
            else {
                _cond->wait(layer);
            }
        } while (ret == NULL);
        _cond->unlock();
        return ret;
    }
    void push(T* obj) {
        _cond->lock();
        if (_sp >= _size) {
            printf("FATAL: stack overflow\n");
            abort();
        }
        _buf[_sp++] = obj;
        _cond->unlock();
        _cond->wake();
    }
private:
    const size_t _size;
    size_t _sp;
    T** _buf;
    Cond* _cond;
};

inline struct timeval cost_begin()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv;
}

inline long cost_end(struct timeval &tv)
{
    struct timeval tv2;
    gettimeofday(&tv2, NULL);
    tv2.tv_sec -= tv.tv_sec;
    tv2.tv_usec -= tv.tv_usec;
    return tv2.tv_sec*1000+tv2.tv_usec/1000;
}

struct ThreadParam
{
    size_t layer;
    Stack<Job>* inputQ;
    Stack<Job>* outputQ;
    JobRunner* runner;
};

void* thread_func(void *data)
{
    size_t layer = ((ThreadParam*)data)->layer;
    Stack<Job>* inputQ = ((ThreadParam*)data)->inputQ;
    Stack<Job>* outputQ = ((ThreadParam*)data)->outputQ;
    JobRunner* runner = ((ThreadParam*)data)->runner;

    while (1) {
        Job* job = inputQ->pop(layer);
        runner->run(job);
        outputQ->push(job);
    }
    return NULL;
}

void force_cpu(pthread_t t, int n)
{
    cpu_set_t cpus;
    CPU_ZERO(&cpus);
    CPU_SET(n, &cpus);
    if (pthread_setaffinity_np(t, sizeof(cpus), &cpus) != 0) {
        printf("FATAL: force cpu %d failed: %s\n", n, strerror(errno));
        abort();
    }
}

void usage(const char* bin)
{
    printf("usage: %s -j job_kind=shm|calc "
        "[-t thread_count=1] [-o job_load=1] [-c job_count=10] "
        "[-a affinity=0] [-l] "
        "[-f filename=\"./TEST\" -n filelength=128M]\n", bin);
    abort();
}

int main(int argc, char* const* argv)
{
    int THREAD_COUNT = 1;
    int JOB_LOAD = 1;
    int JOB_COUNT = 10;
    int AFFINITY = 0;
    int LAYER = 0;
    char JOB_KIND[16] = "";
    char FILEPATH[1024] = "./TEST";
    size_t LENGTH = 128*1024*1024;
    for (int i = EOF;
        (i = getopt(argc, argv, "t:o:c:a:j:lf:n:")) != EOF;) {
        switch (i) {
        case 't': THREAD_COUNT = atoi(optarg); break;
        case 'o': JOB_LOAD = atoi(optarg); break;
        case 'c': JOB_COUNT = atoi(optarg); break;
        case 'a': AFFINITY = atoi(optarg); break;
        case 'l': LAYER = 2; break;
        case 'j': strncpy(JOB_KIND, optarg, sizeof(JOB_KIND)-1); break;
        case 'f': strncpy(FILEPATH, optarg, sizeof(FILEPATH)-1); break;
        case 'n': LENGTH = atoi(optarg); break;
        default: usage(argv[0]); break;
        }
    }
    JobRunner* runner = JobRunnerCreator::create(
            JOB_KIND, FILEPATH, LENGTH);
    if (!runner) {
        usage(argv[0]);
    }

    srand(0);
    Job jobs[JOB_LOAD];

#ifdef TEST_LOAD
    for (int i = 0; i < JOB_LOAD; i++) {
        jobs[i]._input = rand();
        struct timeval tv = cost_begin();
        runner->run(&jobs[i]);
        long cost = cost_end(tv);
        printf("job[%d](%ld)=(%ld) costs: %ld\n",
                i, jobs[i]._input, jobs[i]._output, cost);
    }
    delete runner;
    return 0;
#endif

    printf("use layer %d\n", LAYER);
    Stack<Job> inputQ(JOB_LOAD, LAYER);
    Stack<Job> outputQ(JOB_LOAD, LAYER);

    pthread_t t;
    ThreadParam param[THREAD_COUNT];

    printf("thread init: ");
    for (int i = 0; i < THREAD_COUNT; i++) {
        int cpu = AFFINITY ? (i/AFFINITY+i%AFFINITY*CPUS/2)%CPUS : -1;
        size_t layer = !!(LAYER && i % CPUS >= CPUS/2);
        param[i].inputQ = &inputQ;
        param[i].outputQ = &outputQ;
        param[i].runner = runner;
        param[i].layer = layer;
        pthread_create(&t, NULL, thread_func, (void*)&param[i]);
        if (cpu >= 0) {
            printf("%d(%d|%d),", i, cpu, layer);
            force_cpu(t, cpu);
        }
        else {
            printf("%d(*|%d),", i, layer);
        }
        usleep(1000);
    }
    printf("\n");

    struct timeval tv = cost_begin();
    for (int i = 0; i < JOB_LOAD; i++) {
        jobs[i]._input = rand();
        inputQ.push(&jobs[i]);
    }
    for (int i = 0; i < JOB_LOAD*JOB_COUNT; i++) {
        Job* job = outputQ.pop();
        job->_input = rand();
        inputQ.push(job);
    }
    for (int i = 0; i < JOB_LOAD; i++) {
        outputQ.pop();
    }
    long cost = cost_end(tv);
    printf("total cost: %ld\n", cost);

    delete runner;
    return 0;
}


若您对版权有异议,请联系21CTO社区编辑:QQ:623166 或 info@21cto.com

欢迎各位会员&研发专家原创投稿,点击查看原文可加入21CTO社区——中国互联网第一技术人脉与社交平台。



关于21CTO社区

21CTO.com是中国互联网第一技术人脉与社交平台。我们为国内最优秀的开发者提供社交、学习等产品,帮助企业快速对接开发者,包括人才招聘,项目研发,顾问咨询服务。

看微信文章不过瘾,欢迎到网站看更多文章和分享。

投稿邮箱:info@21cto.com





以上是关于聊聊多线程程序的load balance的主要内容,如果未能解决你的问题,请参考以下文章

负载均衡 lb load balance

多节点 cassandra 集群 - load_balancing_policy

04_Flume多节点load_balance实践

python 复习—并发编程实战——线程多进程多协程加速程序运行实例(多线程和多进程的对比)

线程学习知识点总结

多个用户访问同一段代码