使用信号量进行共享内存同步
Posted
技术标签:
【中文标题】使用信号量进行共享内存同步【英文标题】:Shared memory sync using semaphore 【发布时间】:2018-04-07 06:30:17 【问题描述】:我有两个代码:PRODUCER (PR) 和 CONSUMER (CO)。有一块内存(Mat)(准确地说是一个 3D 矩阵)需要在两个程序之间共享。我目前正在使用基于共享内存的 IPC 函数来共享两个代码之间的内存空间。
约束:
-
PR 是
Mat
的所有者,并执行更改矩阵值的迭代。 CO 是 Mat
的用户,仅读取值并用于进一步计算
PR 应先写入数据,然后应等待 CO 读取并使用 Matrix
的值,然后发出信号 PR 继续随着进一步的迭代,它应该像这样继续下去。
我目前使用的是-
制片人
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#define NOT_READY -1
#define FILLED 0
#define TAKEN 1
#define nx (400)
#define ny (400)
#define nz (400)
struct Memory
int status;
double u_x[nx+1][ny+2][nz+2];
int
main(int argc, char *argv[])
key_t ShmKEY;
int ShmID;
struct Memory *ShmPTR;
int i, j, k;
int niter = 5;
int sumX[niter],sumY[niter],sumZ[niter];
ShmKEY = ftok(".", 'x'); // getting the unique identifier key from directory location
ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);
if (ShmID < 0)
printf("*** shmget error (server) ***\n");
exit(1);
ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
if ((int) ShmPTR == -1)
printf("*** shmat error (server) ***\n");
exit(1);
printf("Server attached the memory to its virtual space...\n");
ShmPTR->status = NOT_READY; // setting the status to be not ready before filling it
for (int m = 0; m < niter; m++)
for (i=0; i<=nx; i++) for (j=0; j<=ny+1; j++) for (k=0; k<=nz+1; k++)
ShmPTR->u_x[i][j][k] = m; // filling the array with iteration number (just for depiction purpose)
ShmPTR->status = FILLED; // change the status to Filled
//printf("Please start the client in another window...\n");
while (ShmPTR->status != TAKEN)
sleep(1);
printf("Server has detected the completion of its child...\n");
shmdt((void *) ShmPTR);
printf("Server has detached its shared memory...\n");
shmctl(ShmID, IPC_RMID, NULL);
printf("Server has removed its shared memory...\n");
printf("Server exits...\n");
exit(0);
消费者
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#define NOT_READY -1
#define FILLED 0
#define TAKEN 1
#define nx (400)
#define ny (400)
#define nz (400)
struct Memory
int status;
double u_x[nx+1][ny+2][nz+2];
int
main(void)
key_t ShmKEY;
int ShmID;
struct Memory *ShmPTR;
int i, j, k;
int niter = 5;
int sumX[niter];
ShmKEY = ftok(".", 'x');
ShmID = shmget(ShmKEY, sizeof(struct Memory), 0666);
if (ShmID < 0)
printf("*** shmget error (client) ***\n");
exit(1);
printf("Client has received a shared memory...\n");
ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
if ((int) ShmPTR == -1)
printf("*** shmat error (client) ***\n");
exit(1);
printf("Client has attached the shared memory to it's virtual memory space...\n");
for (int m =0; m<niter; m++)
sumX[m] = 0;
while (ShmPTR->status != FILLED)
;
printf("Client found the data is ready, performing sanity check...\n");
// read the integers and check for the sum
for (i=0; i<=nx; i++) for (j=0; j<=ny+1; j++) for (k=0; k<=nz+1; k++)
sumX[m] += ShmPTR->u_x[i][j][k];
printf("Cycle %d : sumX-> %d\n", m,sumX[m);
ShmPTR->status = TAKEN;
printf("Client has informed server data have been taken...\n");
shmdt((void *) ShmPTR);
printf("Client has detached its shared memory...\n");
printf("Client exits...\n");
exit(0);
我现在正在做的是使用名为status
的结构成员来防止出现竞争条件。从我到现在为止的阅读中,信号量允许在 IPC 中进行类似的操作。
问题:如何在此使用信号量,以便需要共享的内存空间只是数组而不是将其包装在具有自定义标志的结构中?
Edit1:或者 mutex
如果它比这个应用程序的信号量更好的话。
Edit2:遵循适用于此代码的@Stargateur 答案,但在nx
、ny
和nz
是变量的生产代码中,如何为结构由一个成员组成,该成员是一个可变长度多维数组? (当然它会一直持续到调用shmdt
和shmctl
)
【问题讨论】:
【参考方案1】:我会建议您使用两个信号量来实现您的功能,一个用于解锁 produser,一个用于解锁消费者。
如何在其中使用信号量,以便需要共享的内存空间只是数组而不是将其包装在带有自定义标志的结构中?
是的,但为什么要将数据和与数据关联的信号量分开呢?
我会做以下事情:
struct Memory
sem_t prod;
sem_t cons;
double u_x[nx+1][ny+2][nz+2];
;
// produser
sem_init(&ShmPTR->cons, !0, 0);
sem_init(&ShmPTR->prod, !0, 1);
for (int m = 0; m < niter; m++)
sem_wait(&ShmPTR->prod);
// ...
sem_post(&ShmPTR->cons);
// consumer
for (int m =0; m<niter; m++)
sem_wait(&ShmPTR->cons);
// ...
sem_post(&ShmPTR->prod);
或者如果它比信号量更好,那么可能是互斥锁 应用。
互斥量不能在进程之间共享。
顺便说一句,你使用int
来迭代数组,你应该使用size_t
【讨论】:
有点挣扎于信号量和数据的结合(还有更多这样的数组,我刚刚展示了一个这样的数组 u_x 存在 u_y 和 u_z )。有没有特别的地方可以参考我,了解更多关于信号量的知识。 @datapanda 好吧,我不知道是否有这样的资源。我的方法是阅读文档,这对我来说通常就足够了,man7.org/linux/man-pages/man7/sem_overview.7.html,当你完成对机制的理解时,唯一的限制就是你的想象力。如果您需要更详细的答案,您可以更新或创建新问题。 这确实在示例代码中无缝运行,让我在生产代码中尝试一下。附带说明一下,为什么在编译带有信号量的程序时必须链接 pthread 库(semaphore.h) @datapanda 历史原因,“使用 POSIX 信号量 API 的程序必须使用 cc -pthread 编译以链接实时库 librt。”,不是很重要,但它是必需的。 并希望它们在运行 Linux 的 HPC 集群上是可移植的,这些集群确实符合 POSIX。谢谢@Stargateur【参考方案2】:如果您打算将 PR 和 CO 保留为单独的进程,您可以尝试将其中一个进程与另一个进程分叉以使它们同步。在这种特殊情况下,我的建议是从 PR 流程中分叉 CO。以下是我的想法:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/mman.h>
int main(int argc, char* argv[])
const char *name="SHARED";
const int SIZE = 4096;
pid_t pidA;
pidA = fork();
if (pidA < 0)
printf("forkA Failed" );
return 1;
else if (pidA == 0) // Child process A
// Read from the shared memory object.
exit(0);
else // Parent process
int shm_fd;
/* pointer to shared memory obect */
void* ptr;
/* create the shared memory object */
shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
/* configure the size of the shared memory object */
ftruncate(shm_fd, SIZE);
/* memory map the shared memory object */
ptr = mmap(0, SIZE, PROT_WRITE, MAP_SHARED, shm_fd, 0);
/* write to the shared memory object */
// Wait for child to read matrix
wait(NULL);
printf("Program finished------\n");
但我们都知道线程更轻量级,因此首选。您可以将信号量与两个线程一起使用,如下所示:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
sem_t can_read,can_write; // declare two global semaphore
void* threadPR(void* arg)
while(true)
//wait
sem_wait(&can_write);
//Write to matrix
//signal
sem_post(&can_read);
void* threadCO(void* arg)
while(true)
//wait
sem_wait(&can_read);
//Read the matrix
//signal
sem_post(&can_write);
int main()
// initialize the semaphore
sem_init(&mutex, 0, 1);
// declare two threads
pthread_t t1,t2;
pthread_create(&t1,NULL,threadPR,NULL); // Run the PR thread
// do whatever needed before running CO
pthread_create(&t2,NULL,threadCO,NULL); // Run the CO thread
// wait for threads to join
pthread_join(t1,NULL);
pthread_join(t2,NULL);
// free the semaphore
sem_destroy(&mutex);
return 0;
您可以将所需的初始化作为全局变量添加到此实现中。
【讨论】:
在我的情况下,预期的用例不允许我分叉,因为有两个单独的代码,一个是 openMP 并行化的,另一个是 MPI 并行化的代码。 MPI 代码需要读取访问由 openMP 代码在每次迭代中创建和修改的内存块。 :( 你需要一个共享的公共存储来存储你的 openMP 和 MPI 并行程序。或者,如果适合您,您可以更改其中一个程序。或者你可以为这两个程序强制使用相同的上下文,我不建议并行程序,它会带来巨大的开销。 问题是 OpenMP 代码是一个遗留代码,很难以直接的方式移植到 MPI,而且 MPI 代码严重依赖于默认情况下需要 MPI 运行的开源库。所以我有点卡在一个奇怪的情况下,让我在 MPI 代码中需要的内存块由 OpenMP 代码共享。 我明白了。如果我是你,我会使用共享文件系统并在文件系统级别处理同步。 您能详细说明一下吗? (也许它会帮助我决定解决方案)我认为文件系统会导致磁盘 IO。为了给你更多的上下文,讨论中的两个代码都是 CFD 代码,我有兴趣分享巨大的速度矩阵。以上是关于使用信号量进行共享内存同步的主要内容,如果未能解决你的问题,请参考以下文章
深入详解Linux进程间通信之共享内存(Shared Memory)+信号量同步
深入详解Linux进程间通信之共享内存(Shared Memory)+信号量同步
深入详解Linux进程间通信之共享内存(Shared Memory)+信号量同步
在不同步的情况下读写 SysV 共享内存(使用信号量、C/C++、Linux)