ZeroMq PUB/SUB 模式无法正常工作
Posted
技术标签:
【中文标题】ZeroMq PUB/SUB 模式无法正常工作【英文标题】:ZeroMq PUB/SUB pattern not working properly 【发布时间】:2014-12-02 07:30:49 【问题描述】:我的要求:
-
高吞吐量,每秒至少 5000 条消息
交货顺序不重要
很明显,发布者不应该等待响应,也不应该关心订阅者是否正在监听
背景:
我正在为每条消息创建一个新线程,因为如果我不这样做,消息生成部分将超过发送线程的速度并且消息会丢失,因此每条消息的线程似乎是正确的方法
问题:
问题在于以某种方式开始发送 zMQ 消息的线程没有被终止(没有退出/完成)。下面这行好像有问题:
s_send(*client, request.str());
因为如果我删除它,那么线程会正常终止,所以可能是它导致问题的这一行,我的第一个猜测是线程正在等待响应,但是 zmq_PUB 是否等待响应?
这是我的代码:
void *SendHello(void *threadid)
long tid;
tid = (long) threadid;
//cout << "Hello World! Thread ID, " << tid << endl;
std::stringstream request;
//writing the hex as request to be sent to the server
request << tid;
s_send(*client, request.str());
pthread_exit(NULL);
int main()
int sequence = 0;
int NUM_THREADS = 1000;
while (1)
pthread_t threads[NUM_THREADS];
int rc;
int i;
for (i = 0; i < NUM_THREADS; i++)
cout << "main() : creating thread, " << i << endl;
rc = pthread_create(&threads[i], NULL, SendHello, (void *) i);
pthread_detach(threads[i]);
sched_yield();
if (rc)
cout << "Error:unable to create thread," << rc << endl;
exit(-1);
//usleep(1000);
sleep(1);
pthread_exit(NULL);
//delete client;
return 0;
我的问题:
我是否需要调整 zMQ 套接字以便 PUB 不等待回复我做错了什么?
编辑:
添加客户端定义:
static zmq::socket_t * s_client_socket(zmq::context_t & context)
std::cout << "I: connecting to server." << std::endl;
zmq::socket_t * client = new zmq::socket_t(context, ZMQ_SUB);
client->connect("tcp://localhost:5555");
// Configure socket to not wait at close time
int linger = 0;
client->setsockopt(ZMQ_LINGER, &linger, sizeof (linger));
return client;
zmq::context_t context(1);
zmq::socket_t * client = s_client_socket(context);
【问题讨论】:
客户端定义在哪里?你确定这完全有效 ZMQ_PAIR 在您的套接字调用中?那不应该是ZMQ_SUB吗? @DavidBrabant 你是对的,实际上它是 ZMQ_SUB,现在更正它 【参考方案1】:但是 zmq_PUB 会等待响应吗?
不,如果您的插座不是 PUB 插座并且您达到了高水位线,则可能会出现这种情况,但事实并非如此。消息是否被发送?
【讨论】:
是的,他们有,哦你的意思是说如果他们没有收到,线程就不会完成?所以 PUBLISHER 依赖于 SUBSCRIBER?它保存消息直到任何 SUBSCRIBER 消费它? 不,PUB 套接字永远不会阻塞。我已经尝试过这种情况,即使没有订阅者,发布者也会继续发布,这是有道理的。一定是哪里出了问题……以上是关于ZeroMq PUB/SUB 模式无法正常工作的主要内容,如果未能解决你的问题,请参考以下文章
使用 ZeroMQ 使用 PUB/SUB 实现 Master/Worker 模式