使用共享内存(mmap)和信号量的进程间通信

Posted

技术标签:

【中文标题】使用共享内存(mmap)和信号量的进程间通信【英文标题】:inter process communication using shared memory(mmap) and semaphores 【发布时间】:2018-03-15 23:29:13 【问题描述】:

我正在尝试使用计数信号量即兴创作我为单生产者多消费者多线程编写的程序。我想使用共享内存(mmap() 系统调用)实现进程间通信。我想使用没有后备文件的匿名映射。

这是我想在父进程和它的多个子进程之间共享的结构。

typedef struct Buffer
    char **Tuples;
    sem_t buf_mutex,empty_count,fill_count;
 Buffer;
Buffer buffer[100];

父进程是mapper() 函数,它会根据一些输入产生一些东西并将其放入缓冲区[i]。子进程转到reducer() 函数,该函数消耗放入其缓冲区[j] 中的内容。每个 reducer 或子进程都应该可以访问其缓冲区。子进程在主函数中被 forked(),然后父进程控制转到mapper()。我已将同步原语初始化为进程共享。

我的main() 方法是正确的方法吗?我也收到返回值mmap() 的类型转换错误,这是一个指针,但我不知道如何处理它然后使用它。我还认为 malloc() 不应在第 47 行中用于为元组分配空间,而应使用 mmap() 本身。有人可以帮忙吗? 这是我的程序 -

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>

typedef struct Buffer
    char **Tuples;
    // int count;
    sem_t buf_mutex,empty_count,fill_count;
 Buffer;
Buffer buffer[100];
int numOfSlots;
int numOfReducers;

void mapper()
    //Synchronization primitives (counting semaphores) used for synchronization
    //Produce something and put it in buffer[i]


void reducer(long b)
    //Synchronization primitives (counting semaphores) for synchronization
    //Consume from buffer[b]



int main (int argc, char *argv[])
    if(argc != 3) 
        if(argc < 3)
            printf("Insufficient arguments passed\n");
        else
            printf("Too many arguments passed\n");
    return 1;
    
    int i;
    long r;
    numOfSlots = atoi(argv[1]);
    numOfReducers = atoi(argv[2]);

    for(i=0; i<numOfReducers; i++)
        buffer[i] = (struct Buffer *) mmap(NULL, sizeof(buffer[i]), PROT_READ | PROT_WRITE,
                MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if (buffer[i] == MAP_FAILED)
            errExit("mmap");    

        buffer[i].Tuples = malloc(numOfSlots * sizeof(char *));
        sem_init(&buffer[i].buf_mutex, 1, 1);
        sem_init(&buffer[i].fill_count, 1, 0);
        sem_init(&buffer[i].empty_count, 1, numOfSlots);
    

    for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
        if(fork() == 0)
            printf("[son] pid %d from [parent] pid %d\n",getpid(),getppid());
            Reducer(r);
            exit(0);
        
    
    mapper();
    for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
    wait(NULL);

这些是我想要关注的链接 - https://computing.llnl.gov/tutorials/pthreads/man/pthread_mutexattr_init.txt https://github.com/bradfa/tlpi-dist/blob/master/mmap/anon_mmap.c

谢谢, 哈里什

【问题讨论】:

【参考方案1】:

在网上研究后。我想出了这个可行的解决方案。 我学到的另一件事是char *var="hello" 存储在文本段的只读内存中,这意味着子进程也可以访问它。所以strcpy() 将是其他任何事情的更好选择。

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <string.h>

typedef struct Buffer
    char **Tuples;
    // int count;
    sem_t buf_mutex,empty_count,fill_count;
    int inSlotIndex;
    int outSlotIndex;
 Buffer;
Buffer *buffer;
int numOfSlots;
int numOfReducers;
int *done;

void mapper()
//Synchronization primitives (counting semaphores) used for synchronization
//read continuously from a file, produce something and put it in buffer[i]
//Here is an example
    char *temp = "trail";
    sem_wait(&buffer[1].empty_count);
    sem_wait(&buffer[1].buf_mutex);
    // Use strcpy() for anything other than string literal
    buffer[1].Tuples[0] = temp;
    buffer[1].inSlotIndex = 3;
    buffer[1].outSlotIndex = 4;
    sem_post(&buffer[1].buf_mutex);
    sem_post(&buffer[1].fill_count);
    *done = 1;


void reducer(long tid, Buffer *buffer, int *done)
//Synchronization primitives (counting semaphores) used for synchronization
//Consume from buffer[b]
    sem_wait(&buffer[tid].fill_count);
    sem_wait(&buffer[tid].buf_mutex);
    printf("%s\n", buffer[tid].Tuples[0]);
    printf("%d\n", buffer[tid].inSlotIndex);
    printf("%d\n", buffer[tid].outSlotIndex);
    sem_post(&buffer[tid].buf_mutex);
    sem_post(&buffer[tid].empty_count);
    if(*done == 1)
        printf("DONE\n");



int main (int argc, char *argv[])

    if(argc != 3) 
        if(argc < 3)
            printf("Insufficient arguments passed\n");
        else
            printf("Too many arguments passed\n");
        return 1;
    
    srand(time(NULL));
    int i, j;
    long r;
    char *temp;
    numOfSlots = atoi(argv[1]);
    numOfReducers = atoi(argv[2]);

    buffer = (struct Buffer *)mmap(NULL, numOfReducers * sizeof(struct Buffer), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if(buffer == MAP_FAILED)
        printf("EXITING");
        exit(EXIT_FAILURE);
    
    done = (int *)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (done  == MAP_FAILED)
        printf("exiting\n");
    
    *done = 0;

    for(i=0; i<numOfReducers; i++)
        buffer[i].Tuples = mmap(NULL, numOfSlots * sizeof(char *), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
        if(buffer[i].Tuples == MAP_FAILED)
            printf("EXITING");
            exit(EXIT_FAILURE);
        
        for(j=0; j<numOfSlots; j++)
            temp = (char *)mmap(NULL, 30 * sizeof(char), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
            temp = strcpy(temp, "");
            buffer[i].Tuples[j] = temp;
        
        sem_init(&buffer[i].buf_mutex, 1, 1);
        sem_init(&buffer[i].fill_count, 1, 0);
        sem_init(&buffer[i].empty_count, 1, numOfSlots);
    

    for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
        if(fork() == 0)
            printf("[son] pid %d from [parent] pid %d\n",getpid(),getppid());
            reducer(r, buffer, done);
            exit(0);
        
    
    mapper();
    for(r=0;r<numOfReducers;r++) // loop will run n times (n=5)
    wait(NULL);

【讨论】:

以上是关于使用共享内存(mmap)和信号量的进程间通信的主要内容,如果未能解决你的问题,请参考以下文章

(转载)linux下的僵尸进程处理SIGCHLD信号Linux环境进程间通信: 共享内存(下)

再探进程间通信

再探进程间通信

linux共享内存和mmap的区别

Python进程间通信之共享内存

共享内存原理