线程同步

Posted 谭兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程同步相关的知识,希望对你有一定的参考价值。

 

线程同步

多个线程共享相同的内存时, 需要确保每个线程看到的数据视图一致.   如果每个线程的数据在其他线程不会用到,  或者变量是只读的, 这样没有一致性问题. 但是如果多个线程需要共享变量时, 就需要进行线程同步了.

举个例子,  有线程A读取变量并加1, 可分为以下三步:

1.从内存读入变量到寄存器

2.寄存器中进行变量值增加

3.把新值写回内存单元.

这时如果有线程B在 2 和 3 期间读取变量值, 就可能会得到不一致的值. 下面介绍几种常用同步机制.

 

互斥量

互斥量确保同一时间只有一个线程访问数据, 从本质上来说是一把锁.  在访问数据前要加锁, 访问数据后释放锁.  

如果有多个线程同时试图锁住数据, 则只有一个线程能获取锁, 变成运行状态( 谁能第一个获取锁与具体实现有关).  其它线程将被阻塞直到锁释放. 在这种方式下, 只有一个线程可以向前执行.

 

互斥变量使用 pthread_mutex_t 数据类型表示. 在使用前需要用 pthread_mutex_init( ) 初始化, 释放内存前需要调用 pthread_mutex_destroy( );

如果线程不希望被阻塞, 它可以使用 pthread_mutex_trylock ( ) 尝试对互斥量加锁,  如果互斥量可用, 则成功锁住; 否则锁住失败, 返回EBUSY, 线程不阻塞.

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,const struct timespec *restrict tsptr);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
                                            All return: 0 if OK, error number on failure

 

 

下面是是APUE上面一个简单的程序.   由于pthread不是系统默认库, 所以在编译是注意加入-lpthread.

 gcc mutex.c -o mutex -lpthread

#include "apue.h"
#include <pthread.h>

/*
 *
 *    加入引用计数, 确保在所有使用该对象的线程完成数据访问之前, 该对象内存空间不会被释放.
 *    在加1, 减1 以及判断引用计数是否为0之前必须锁住互斥量.
 *
 *
 * */

struct foo{
    int    f_count;
    pthread_mutex_t f_lock;    //互斥变量
};


//调用malloc() 动态分配互斥变量
struct foo *foo_alloc(void){
    
    struct foo *fp;
    
    if((fp = malloc(sizeof(struct foo))) != NULL){
        fp->f_count = 1;
        //初始化互斥变量
        if(pthread_mutex_init(&fp->f_lock, NULL) != 0){
            free(fp);
            return (NULL);
        }
    }
    return (fp);
}

//加1操作
void foo_hold(struct foo *fp){
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;    
    printf("f_count = %d\\n", fp->f_count);
    pthread_mutex_unlock(&fp->f_lock);
}

//减1操作,  如果最后一个引用被释放, 释放对象内存空间.
void foo_rele(struct foo *fp){
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count--;
    printf("f_count = %d\\n", fp->f_count);
    if(0 == fp->f_count){    
        printf("free fp\\n");
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }else{    
        pthread_mutex_unlock(&fp->f_lock);
    }
}


//线程 1 执行函数
void *thr_fn1(void *arg){

    struct foo *fp = (struct foo*)arg;
    printf("thread 1 starting\\n");
    foo_rele(fp);
    printf("thread 1 exit\\n");
    pthread_exit((void *)1);
}


//线程 2 执行函数
void *thr_fn2(void *arg){

    struct foo *fp = (struct foo *)arg;
    printf("thread 2 starting\\n");
    foo_hold(fp);
    foo_hold(fp);
    printf("thread 2 exit\\n");
    pthread_exit((void *)2);
}

int main(void){

    
    pthread_t tid1, tid2;
    int err;
    void *pret;
    struct foo *fp;
    
    fp = foo_alloc();

    //创建线程
    if((err = pthread_create(&tid1, NULL, thr_fn1, (void *)fp)) != 0){
        err_quit("create thread 1 error\\n", strerror(err));
    }
    
    if((err = pthread_create(&tid2, NULL, thr_fn2, (void *)fp)) != 0){
        err_quit("create thread 2 error\\n", strerror(err));
    }
    //获取线程退出状态
    pthread_join(tid1, &pret);
    printf("thread 1 exit code is : %ld\\n", (long)pret);
    pthread_join(tid2, &pret);
    printf("thread 2 exit code is : %ld\\n", (long)pret);
    exit(0);
}
View Code

 

程序运行结果如下,  这里有个问题,  那就是线程1在线程2运行之前就已经结束.  既然 fp 指针已经被释放掉了.  为何仍然在线程2中得以引用呢 ?    答案是野指针 !   之前 fp 指向的内存没有被占用, 但是也可能被占用, 引起内存泄露. 

thread 1 starting
thread 2 starting
f_count = 0
free fp
thread 1 exit
f_count = 1
f_count = 2
thread 2 exit
thread 1 exit code is : 1
thread 2 exit code is : 2

 

避免死锁

产生死锁的几种情况

1.线程对同一互斥量多次加锁, 类似下面这种

pthread_mutex_lock(mutex);
pthread_mutex_lock(mutex);
pthread_mutex_unlock(mutex);
pthread_mutex_unlock(mutex);

第二行阻塞, 发生死锁.  这种情况比较容易避免,  取消多次加锁即可.

 

2.多个互斥量,  且不同线程各锁住一个互斥量, 并都在请求另一个互斥量阻塞.

//thread A
pthread_mutex_lock(mutex1);
pthread_mutex_lock(mutex2);
pthread_mutex_unlock(mutex2);
pthread_mutex_unlock(mutex1);
//thread B
pthread_mutex_lock(mutex2);
pthread_mutex_lock(mutex1);
pthread_mutex_unlock(mutex1);
pthread_mutex_unlock(mutex2);

 

互相请求对方所占资源, 导致死锁.  避免办法是各个进程对互斥量加锁的顺序要一致.  可以是使用一个hash表锁实现.  

下面程序来自APUE, 只是一种解决办法, 没有具体测试代码.   这里有两个参考文章.

http://www.cnblogs.com/xbf9xbf/p/4764747.html

https://segmentfault.com/q/1010000007227951

#include "apue.h"
#include <pthread.h>

#define NHASH 29                                    //哈希大值
#define HASH(fp) (((unsigned long)fp) % NHASH)        //简单hash取模
struct foo *fh[NHASH];                                //哈希表, 链表解决散列冲突

//初始化  只对静态变量分配的互斥量
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo{
    int    f_count;
    pthread_mutex_t f_lock;    //互斥变量
    struct foo *f_next;
    int f_id;
};


//调用malloc() 动态分配互斥变量
struct foo *foo_alloc(void){
    
    int idx;
    struct foo *fp;
    
    if((fp = malloc(sizeof(struct foo))) != NULL){
        fp->f_count = 1;
        //初始化互斥变量
        if(pthread_mutex_init(&fp->f_lock, NULL) != 0){
            free(fp);
            return (NULL);
        }

        idx = HASH(fp);
        // 先对散列表加锁
        pthread_mutex_lock(&hashlock);
        //头插
        fp->f_next = fh[idx];
        fh[idx] =fp->f_next;
        // 这里要加锁,  对于其它线程是全局可见的, 所以要先阻塞其它请求新结构的线程
        pthread_lock(&fp->f_lock);
        pthread_unlock(&hashlock);
        
        /* 其它初始化操作 */

        pthread_mutex_unlock(&fp->f_lock);
    }
    return (fp);
}

//加1操作
void foo_hold(struct foo *fp){
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;    
    pthread_mutex_unlock(&fp->f_lock);
}


/* 第二版这个函数有错误, 这里是第三版的 */
struct foo *foo_find(int id){

    struct foo  *fp;
    /* 锁住hash表 */
    pthread_mutex_lock(&hashlock);
    for(fp = fh[HASH(id)]; fp != NULL, fp = fp->f_next){
        //命中 增加其引用计数
        if(fp->f_id == id){
            foo_hold(fp);
            break;
        }
    }

    pthread_mutex_unlock(&hashlock);
    return (fp);
}

//减1操作.
void foo_rele(struct foo *fp){

    struct foo *tfp;
    int idx;
    //锁住此节点
    pthread_mutex_lock(&fp->f_lock);
    
    //是最后一个结构
    if(1 == fp->f_count){    
        
        // 这里需要先解锁然后重新获取
        // 为什么要重新获取呢 ?    因为释放节点必须先对其解锁, 才能进行操作!
        // 为什么是先获取hashlock 再获取 fp节点的锁 ?  考虑一种情况
        // foo_rele获取fp的锁, 请求hashlock锁,  而foo_find获取了此hashlock的锁, 请求此fp, 造成死锁
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_lock(&fp->f_lock);
        

        //重新判断, 防止前面调整锁的顺序, 而其它线程又对此引用计数加1.
        if(1 != fp_f_count){
            fp->f_count--;
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            return;
        }

        idx = HASH(fp);
        tfp = fh[idx];

        //链表查找
        if(tfp == tp){
            fh[idx] = fp->f_next;
        }else{
            while(tfp->f_index != fp) tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }else{    
        //不是最后一个结构
        fp->f_count--;
        pthread_mutex_unlock(&fp->f_lock);
    }
}
View Code

 

这里可以对锁进行简化, 用hash锁赖保护结构引用计数,  结构互斥量保护结构体内的其它任何数据. 这种锁的粒度较粗, 但是围绕hash锁和结构互斥量锁的排序问题就没有了

对上面程序稍作修改

#include "apue.h"
#include <pthread.h>

#define NHASH 29                                    //哈希大值
#define HASH(fp) (((unsigned long)fp) % NHASH)        //简单hash取模
struct foo *fh[NHASH];                                //哈希表, 链表解决散列冲突

//初始化  只对静态变量分配的互斥量
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo{
    int    f_count;
    pthread_mutex_t f_lock;    //互斥变量
    struct foo *f_next;
    int f_id;
};


//调用malloc() 动态分配互斥变量
struct foo *foo_alloc(void){
    
    int idx;
    struct foo *fp;
    
    if((fp = malloc(sizeof(struct foo))) != NULL){
        fp->f_count = 1;
        //初始化互斥变量
        if(pthread_mutex_init(&fp->f_lock, NULL) != 0){
            free(fp);
            return (NULL);
        }

        idx = HASH(fp);
        // 先对散列表加锁
        pthread_mutex_lock(&hashlock);
        //头插
        fp->f_next = fh[idx];
        fh[idx] =fp->f_next;
        // 这里要加锁,  对于其它线程是全局可见的, 所以要先阻塞其它请求新结构的线程
        pthread_lock(&fp->f_lock);
        pthread_unlock(&hashlock);
        
        /* 其它初始化操作 */

        pthread_mutex_unlock(&fp->f_lock);    //第二版中此处应该有错误, 少了这句话.
    }
    return (fp);
}

//加1操作
void foo_hold(struct foo *fp){
    pthread_mutex_lock(&hashlock);
    fp->f_count++;    
    pthread_mutex_unlock(&hashlock);
}


/* 第二版这个函数有错误, 这里是第三版的 */
struct foo *foo_find(int id){

    struct foo  *fp;
    /*hash锁保护对象引用计数 */
    pthread_mutex_lock(&hashlock);
    for(fp = fh[HASH(id)]; fp != NULL, fp = fp->f_next){
        //命中 增加其引用计数
        if(fp->f_id == id){
            fp->f_count++;
            break;
        }
    }

    pthread_mutex_unlock(&hashlock);
    return (fp);
}

//减1操作.
void foo_rele(struct foo *fp){

    struct foo *tfp;
    int idx;
    //锁住hash表
    pthread_mutex_lock(&hashlock);
    
    //是最后一个结构
    if(--fp->f_count == 0){    

        idx = HASH(fp);
        tfp = fh[idx];

        //链表查找
        if(tfp == tp){
            fh[idx] = fp->f_next;
        }else{
            while(tfp->f_index != fp) tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }else{    
        //不是最后一个结构
        pthread_mutex_unlock(&fp->f_lock);
    }
}
View Code

 

 

读写锁

读写锁可以使得读操作比互斥量具有更高的并行性, 它有三种状态, 读加锁, 写加锁, 不加锁.  非常适合读操作远多余写操作的数据结构.  可以在读数据前加锁, 读完后释放, 写操作同样如此.

关键点

  1.  读写锁在读加锁状态时, 其它任何读操作都可以加锁.
  2.  读写锁在写加锁状态时, 其它任何操作都被阻塞(不管是加读锁还是写锁).
  3.  读写锁只有在无所状态下才能加写锁.

读写锁实现方式各不相同, 但是当读写锁处于读模式锁定状态时, 如果有线程试图加写锁操作, 那么其后的读锁请求将会被阻塞,  避免等待的写锁一直得不到满足.

其中一些API如下

初始与销毁

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t*restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
                                               Both return: 0 if OK, error number on failure

 

加锁与解锁

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
                                         All return: 0 if OK, error number on failure
#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
                                        Both return: 0 if OK, error number on failure
#include <pthread.h>
#include <time.h>
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrictrwlock,const struct timespec*restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrictrwlock,const struct timespec *restric ttsptr);

 

 

下面程序说明了读写锁的使用, 四个线程, 两个读线程, 两个写线程.

#include "apue.h"
#include <pthread.h>

/* 初始化读写锁 */
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;

/* 全局资源 */
int global_num = 10;

/* 读锁线程 */
void *thread_read_lock(void *arg){
    
    char *pthread_name = (char *)arg;
    while(1){    
        /* 读加锁 */
        pthread_rwlock_rdlock(&rwlock);
        printf("读线程 %s 进入临界区, global_num = %d\\n", pthread_name, global_num);
        sleep(1);
        printf("读线程 %s 退出临界区\\n", pthread_name);
    
        pthread_rwlock_unlock(&rwlock);
        sleep(1);
    }
    return NULL;
}

/* 写锁线程 */
void *thread_write_lock(void *arg){
        
    char *pthread_name = (char *)arg;

    while(1){    
        /* 写加锁 */
        pthread_rwlock_wrlock(&rwlock);

        global_num++;
        printf("写线程 %s 进入临界区, global_num = %d\\n",pthread_name, global_num);
        sleep(1);
        printf("写线程 %s 退出临界区\\n", pthread_name);
        
        pthread_rwlock_unlock(&rwlock);

        /* 这里多一秒 是因为两个写锁一起请求会导致后面的读锁饥饿. */
        sleep(2);
    }
}


int main(void){

    pthread_t tid_r1, tid_r2, tid_w1, tid_w2;

    int err;
    //创建四个线程, 两个读, 两个写.
    if((err = pthread_create(&tid_r1, NULL, thread_read_lock, "read1")) != 0)  err_quit("create thread r1 error\\n", strerror(err));
    if((err = pthread_create(&tid_r2, NULL, thread_read_lock, "read2")) != 0)  err_quit("create thread r1 error\\n", strerror(err));
    if((err = pthread_create(&tid_w1, NULL, thread_write_lock, "write1")) != 0)  err_quit("create thread w1 error\\n", strerror(err));
    if((err = pthread_create(&tid_w2, NULL, thread_write_lock, "write2")) != 0)  err_quit("create thread w2 error\\n", strerror(err));
    
    /* 防止主线程提前退出 */
    if((err = pthread_join(tid_r1, NULL)) != 0) err_quit("cant\'t join with thread 1\\n", strerror(err));

    exit(0);
}
View Code

 

 

运行结果 :

读线程 read2 进入临界区, global_num = 10
读线程 read1 进入临界区, global_num = 10
读线程 read2 退出临界区
读线程 read1 退出临界区
写线程 write1 进入临界区, global_num = 11
写线程 write1 退出临界区
写线程 write2 进入临界区, global_num = 12
写线程 write2 退出临界区
读线程 read2 进入临界区, global_num = 12
读线程 read1 进入临界区, global_num = 12
读线程 read2 退出临界区
读线程 read1 退出临界区
写线程 write1 进入临界区, global_num = 13
写线程 write1 退出临界区
写线程 write2 进入临界区, global_num = 14
写线程 write2 退出临界区
读线程 read2 进入临界区, global_num = 14
读线程 read1 进入临界区, global_num = 14

 

可以看出, 读锁可以同时进入临界区, 而写锁中间等待了一秒都没有其它线程能进来.

 

条件变量

配合一个互斥量来使用, 允许线程以无竞争的方式来等待条件. 

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


struct foo{
    int f_count;
    pthread_mutex_t f_lock;
    pthread_cond_t f_cond;
};

/* 初始化互斥量与条件变量 */
struct foo * foo_alloc(){
    struct foo *fp;
    if((fp = malloc(sizeof(struct foo))) != NULL){
        fp->f_count = 0;
        pthread_mutex_init(&fp->f_lock, NULL);
        pthread_cond_init(&fp->f_cond, NULL);
    }
    return fp;
}

/* 加法 */
void *foo_increase(void *arg){
    
    struct foo *fp;

    fp = (struct foo*)arg;

    while(1){
        pthread_mutex_lock(&fp->f_lock);
    
        fp->f_count++;
        /* 大于等于100时发送条件 */
        if(fp->f_count >= 100){
            pthread_cond_signal(&fp->f_cond);
            
            pthread_cond_wait(&fp->f_cond, &fp->f_lock);
        }

        pthread_mutex_unlock(&fp->f_lock);
    }
}

/* 重新置0 */
void *foo_print(void *arg){
    struct foo *fp;
    fp = (struct foo*)arg;

    while(1){
        pthread_mutex_lock(&fp->f_lock);
        
        while(fp->f_count < 100){
            //释放掉锁, 等待条件为真返回, 再次锁住.
            pthread_cond_wait(&fp->f_cond, &fp->f_lock);
        }
        printf("重置 : %d\\n", fp->f_count);    
        /* 重新置0 */    
        fp->f_count = 0;
        
        pthread_cond_signal(&fp->f_cond);
        pthread_mutex_unlock(&fp->f_lock);
    }
}


int main(void){

    struct foo *fp;
    pthread_t tid_increase1, tid_print;
    //初始化
    fp = foo_alloc();

    //加法线程
    pthread_create(&tid_increase1, NULL, foo_increase, fp);
    
    //重置线程
    pthread_create(&tid_print, NULL, foo_print, fp);

    //防止主线程提前退出
    sleep(20);
    exit(0);
}
View Code

 

以上是关于线程同步的主要内容,如果未能解决你的问题,请参考以下文章

起底多线程同步锁(iOS)

多线程编程

第十次总结 线程的异步和同步

详解C++多线程

进程线程同步异步

配置 kafka 同步刷盘