rabbitmq消息流转分析

Posted 独立特行

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq消息流转分析相关的知识,希望对你有一定的参考价值。

连接(connection):消费者或者生产者与消息中间件建立的tcp连接;

频道(channel):tcp连接建立之后,必须现在连接上开频道,才能进行其他操作(原因?

登录(logging):建立频道之后,要登录到特定的虚拟机,一组虚拟机持有一组交换机和队列,其他虚拟机用户无法访问当前用户对应的虚拟机中的交换机和队列;

交换机(exchange):在rabbitmq消息中间件启动时就会创建一个默认的交换机(当然也可以人为创建),与连接无关,负责整个消息中间件中消息的投递;交换机不会存储消息,

如果没有任何队列与之绑定,那么交换机会丢弃收到的消息;

队列(queue):用来存储交换机投递过来的消息,通过路由键与交换机绑定,进行消息的持久化存储;

队列由消费者或者生产者连上消息中间件后自行创建,人为指定队列名称,如果当前创建的队列rabbitmq上已经存在,rabbitmq不会重复创建;

路由键(routingkey):交换机和队列进行消息投递的识别码,人为指定;

一、生产者发送消息:

conn = amqp_new_connection();//

socket = amqp_tcp_socket_new(conn);

status = amqp_socket_open(socket, hostname, port);

amqp_login(conn, "ois", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "ois", "1");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

amqp_bytes_t queuename;

{

???? amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("myqueue"), 0, 0, 0, 1,

???????? amqp_empty_table);

???? die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

???? queuename = amqp_bytes_malloc_dup(r->queue);

???? if (queuename.bytes == NULL) {

???????? fprintf(stderr, "Out of memory while copying queue name");

???????? return 1;

???? }

}//创建名称为myqueue的队列

amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(routingkey),

???? amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");//将队列myqueue通过路由键routingkey绑定到交换机exchange上

?

for (;;)

{

amqp_basic_properties_t props;

props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

props.content_type = amqp_cstring_bytes("text/plain");

props.delivery_mode = 2; /* persistent delivery mode */

die_on_error(amqp_basic_publish(conn,

1,

amqp_cstring_bytes(exchange),

amqp_cstring_bytes(routingkey),

0,

0,

&props,

amqp_cstring_bytes("test message")),

"Publishing");

????microsleep(1*1000*100);

}//生产者发布消息,发布消息需要指定接收消息的交换机,以及路由键,交换机需要根据路由键投递消息到具体的队列中

二、消费者获取消息进行处理

static void run(amqp_connection_state_t conn)

{

uint64_t start_time = now_microseconds();

int received = 0;

int previous_received = 0;

uint64_t previous_report_time = start_time;

uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;

?

amqp_frame_t frame;

?

uint64_t now;

?

for (;;) {

amqp_rpc_reply_t ret;

amqp_envelope_t envelope;

?

now = now_microseconds();

if (now > next_summary_time) {

int countOverInterval = received - previous_received;

double intervalRate = countOverInterval / ((now - previous_report_time) / 1000000.0);

printf("%d ms: Received %d - %d since last report (%d Hz)\n",

(int)(now - start_time) / 1000, received, countOverInterval, (int) intervalRate);

?

previous_received = received;

previous_report_time = now;

next_summary_time += SUMMARY_EVERY_US;

}

?

amqp_maybe_release_buffers(conn);

ret = amqp_consume_message(conn, &envelope, NULL, 0);//定时获取队列中的消息

?

if (AMQP_RESPONSE_NORMAL != ret.reply_type) {

if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&

AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {

if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {

return;

}

?

if (AMQP_FRAME_METHOD == frame.frame_type) {

switch (frame.payload.method.id) {

case AMQP_BASIC_ACK_METHOD:

/* if we‘ve turned publisher confirms on, and we‘ve published a message

* here is a message being confirmed

*/

?

break;

case AMQP_BASIC_RETURN_METHOD:

/* if a published message couldn‘t be routed and the mandatory flag was set

* this is what would be returned. The message then needs to be read.

*/

{

amqp_message_t message;

ret = amqp_read_message(conn, frame.channel, &message, 0);

if (AMQP_RESPONSE_NORMAL != ret.reply_type) {

return;

}

?

amqp_destroy_message(&message);

}

?

break;

?

case AMQP_CHANNEL_CLOSE_METHOD:

/* a channel.close method happens when a channel exception occurs, this

* can happen by publishing to an exchange that doesn‘t exist for example

*

* In this case you would need to open another channel redeclare any queues

* that were declared auto-delete, and restart any consumers that were attached

* to the previous channel

*/

return;

?

case AMQP_CONNECTION_CLOSE_METHOD:

/* a connection.close method happens when a connection exception occurs,

* this can happen by trying to use a channel that isn‘t open for example.

*

* In this case the whole connection must be restarted.

*/

return;

?

default:

fprintf(stderr ,"An unexpected method was received %u\n", frame.payload.method.id);

return;

}

}

}

?

} else {

amqp_destroy_envelope(&envelope);

}

?

received++;

}

}

?

int main(int argc, char const *const *argv)

{

conn = amqp_new_connection();

socket = amqp_tcp_socket_new(conn);

status = amqp_socket_open(socket, hostname, port);

die_on_amqp_error(amqp_login(conn, "ot", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),

"Logging in");

amqp_channel_open(conn, 1);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

{

amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,

amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");

queuename = amqp_bytes_malloc_dup(r->queue);

if (queuename.bytes == NULL) {

fprintf(stderr, "Out of memory while copying queue name");

return 1;

}

}

?

amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),

amqp_empty_table);//绑定消息队列

die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

?

amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");//指定这是一个消费者

?

run(conn);

?

die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");

die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");

die_on_error(amqp_destroy_connection(conn), "Ending connection");

?

return 0;

}

?

?

?

?

?

?

?

以上是关于rabbitmq消息流转分析的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ存储模型

RabbitMq初探——发布与订阅

rabbitmq学习笔记

rabbitmq学习笔记

RabbitMQ研究高级使用

RabbitMQ研究高级使用