c++实现RabbitMQ简单的生产者和消费者
Posted MFT小白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++实现RabbitMQ简单的生产者和消费者相关的知识,希望对你有一定的参考价值。
基本思想:利用c++实现RabbitMQ简单的生产者和消费者
CMakeLists.txt
cmake_minimum_required(VERSION 3.16)
project(producer)

set(CMAKE_CXX_STANDARD 14)

# The producer source in this article is listed as producer.cpp.
add_executable(producer producer.cpp)
# Link against the RabbitMQ C client library (provides amqp.h).
target_link_libraries(producer rabbitmq)
producer.cpp
#include <iostream>
#include <string>
#include <unistd.h>
#include <amqp.h>
#include <cstring>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
using namespace std;
int main()
string hostName = "127.0.0.1"; // ip 默认值
int port = 5672; // 端口号 默认值
amqp_socket_t *socket = nullptr;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket)
cout << "create socket failed!";
exit(1);
if (amqp_socket_open(socket, hostName.c_str(), port))
cout << "opening TCP socket failed" << endl;
exit(1);
// 登录 ("/" 虚拟机默认值)
if(1 != amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest").reply_type)
cout << "login failed" << endl;
// 创建管道(链接)
amqp_channel_open(conn, 1);
// 创建队列
amqp_bytes_t queue;
char mac[20] = "queue_test";
queue.bytes = mac;
queue.len = strlen(mac);
//声明队列
amqp_queue_declare_ok_t *declare = amqp_queue_declare(conn, 1, queue, 0, 0, 0, 1, amqp_empty_table);
int i = 0;
while (true)
string str = "Hello World " + to_string(i);
char message[64] = '\\0';
strcpy(message, str.c_str());
amqp_bytes_t message_bytes;
message_bytes.len = sizeof(message);
message_bytes.bytes = message;
// 发送消息
amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("queue_test"), 0, 0, nullptr, message_bytes);
cout << "[已发送] " << str << endl;
i++;
sleep(1);
if(i == 5)
break;
cout << "send msg over! " << endl;
// 释放资源
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
// getchar();
return 0;
生产者运行结束后,队列 queue_test 中存有 5 条消息
consumer.cpp
#include <iostream>
#include <string>
#include <amqp.h>
#include <ctime>
#include <unistd.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
using namespace std;
/**
 * Busy-waits until at least `msec` milliseconds have elapsed as measured by
 * clock(), i.e. in processor time used by the program.
 *
 * @param msec Delay in milliseconds; zero or negative returns immediately.
 */
void delay_msec(int msec)
{
    if (msec <= 0)
    {
        return;
    }
    // clock() counts CLOCKS_PER_SEC ticks per second, not milliseconds, so
    // the requested delay has to be converted before comparing.
    const std::clock_t ticks =
        static_cast<std::clock_t>(static_cast<long long>(msec) * CLOCKS_PER_SEC / 1000);
    const std::clock_t start = std::clock();
    while (std::clock() - start < ticks)
    {
        // spin
    }
}
int main()
string name = "received";
string delayStr = "1";
//cout << "The delayStr is : " << delayStr << endl;
int delay = std::stoi(delayStr);
//cout << "The delay is : " << delay << endl;
//string name = "one";
//int delay = 1;
string hostName = "127.0.0.1";
int port = 5672;
amqp_socket_t *socket = nullptr;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if(!socket)
cout << "create socket failed!";
exit(1);
if(amqp_socket_open(socket, hostName.c_str(), port))
cout << "opening TCP socket failed" << endl;
exit(1);
//登录
if(1 != amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest").reply_type)
cout << "login failed" << endl;
exit(1);
amqp_channel_open(conn, 1);
while (true)
amqp_basic_get(conn, 1, amqp_cstring_bytes("queue_test"), 1);
amqp_message_t *msg = new amqp_message_t;
amqp_read_message(conn, 1, msg, 0);
cout << "[" << name << "] The result is : " << (char *)msg->body.bytes << endl;
amqp_destroy_message(msg);
delete msg;
sleep(1);
// delay_msec(1000 * delay);
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
// getchar();
return 0;
以上是关于c++实现RabbitMQ简单的生产者和消费者的主要内容,如果未能解决你的问题,请参考以下文章