c++实现RabbitMQ简单的生产者和消费者

Posted MFT小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++实现RabbitMQ简单的生产者和消费者相关的知识,希望对你有一定的参考价值。

基本思想:利用 C++ 调用 rabbitmq-c 客户端库(头文件 amqp.h,链接库 rabbitmq),实现 RabbitMQ 简单的生产者和消费者

CMakeLists.txt

# Build script for the RabbitMQ producer example.
cmake_minimum_required(VERSION 3.16)
project(producer)

# Build as C++14 and fail at configure time if the compiler cannot provide it,
# instead of silently falling back to an older standard.
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# The producer source shown in this article is producer.cpp.
add_executable(producer producer.cpp)

# rabbitmq-c client library (provides amqp.h / amqp_tcp_socket.h).
target_link_libraries(producer PRIVATE rabbitmq)

producer.cpp

#include <iostream>
#include <string>
#include <unistd.h>
#include <amqp.h>
#include <cstring>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
using namespace std;

int main() 
    string hostName = "127.0.0.1"; // ip 默认值
    int port = 5672; // 端口号 默认值
    amqp_socket_t *socket = nullptr;
    amqp_connection_state_t conn;

    conn = amqp_new_connection();
    socket = amqp_tcp_socket_new(conn);

    if (!socket) 
        cout << "create socket failed!";
        exit(1);
    

    if (amqp_socket_open(socket, hostName.c_str(), port)) 
        cout << "opening TCP socket failed" << endl;
        exit(1);
    

    // 登录  ("/" 虚拟机默认值)
    if(1 != amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest").reply_type) 
        cout << "login failed" << endl;
    

    // 创建管道(链接)
    amqp_channel_open(conn, 1);

    // 创建队列
    amqp_bytes_t queue;
    char mac[20] = "queue_test";
    queue.bytes = mac;
    queue.len = strlen(mac);
    //声明队列
    amqp_queue_declare_ok_t *declare = amqp_queue_declare(conn, 1, queue, 0, 0, 0, 1, amqp_empty_table);

    int i = 0;
    while (true) 
        string str = "Hello World " + to_string(i);
        char message[64] = '\\0';
        strcpy(message, str.c_str());
        amqp_bytes_t  message_bytes;
        message_bytes.len = sizeof(message);
        message_bytes.bytes = message;

        // 发送消息
        amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("queue_test"), 0, 0, nullptr, message_bytes);
        cout << "[已发送] " << str << endl;
        i++;
        sleep(1);
        if(i == 5)
            break;
    
    cout << "send msg over! " << endl;

    // 释放资源
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);

//    getchar();
    return 0;

生产者运行结束后,队列 queue_test 中会存有 5 条由生产者发送的消息。

 consumer.cpp

#include <iostream>
#include <string>
#include <amqp.h>
#include <ctime>
#include <unistd.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
using namespace std;

/**
 * Busy-waits until roughly `msec` milliseconds of processor time have been
 * consumed by this program.
 *
 * clock() counts in units of 1/CLOCKS_PER_SEC seconds, not milliseconds, so
 * the requested duration is converted to clock ticks before comparing.
 * The loop spins (burning CPU) rather than sleeping.
 *
 * @param msec Duration in milliseconds; values <= 0 return immediately.
 */
void delay_msec(int msec)
{
    if (msec <= 0) {
        return;
    }
    const std::clock_t ticks = static_cast<std::clock_t>(msec) * CLOCKS_PER_SEC / 1000;
    const std::clock_t start = std::clock();
    while (std::clock() - start < ticks) {
        // spin
    }
}


int main() 

    string name = "received";
    string delayStr = "1";
    //cout << "The delayStr is : " << delayStr << endl;
    int delay = std::stoi(delayStr);
    //cout << "The delay is : " << delay << endl;
    //string name = "one";
    //int delay = 1;

    string hostName = "127.0.0.1";
    int port = 5672;
    amqp_socket_t *socket = nullptr;
    amqp_connection_state_t conn;

    conn = amqp_new_connection();
    socket = amqp_tcp_socket_new(conn);

    if(!socket)
        cout << "create socket failed!";
        exit(1);
    

    if(amqp_socket_open(socket, hostName.c_str(), port)) 
        cout << "opening TCP socket failed" << endl;
        exit(1);
    

    //登录
    if(1 != amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest").reply_type) 
        cout << "login failed" << endl;
        exit(1);
    

    amqp_channel_open(conn, 1);

    while (true) 
        amqp_basic_get(conn, 1, amqp_cstring_bytes("queue_test"), 1);
        amqp_message_t *msg = new amqp_message_t;
        amqp_read_message(conn, 1, msg, 0);
        cout << "[" << name << "] The result is : " << (char *)msg->body.bytes << endl;
        amqp_destroy_message(msg);
        delete msg;
        sleep(1);
//        delay_msec(1000 * delay);
    

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

//    getchar();
    return 0;

以上是关于c++实现RabbitMQ简单的生产者和消费者的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ第二篇:java简单的实现RabbitMQ

RabbitMQ之工作队列

RabbitMQ基础概念详解——环境配置及模拟生产者和消费者简单消息发送

RabbitMQ实战-死信队列

RabbitMQ集群

RabbitMQ简单Java示例——生产者和消费者