AnyEvent::RabbitMQ 关闭通道的问题
Posted
技术标签:
【中文标题】AnyEvent::RabbitMQ 关闭通道的问题【英文标题】:AnyEvent::RabbitMQ issues with closed channels 【发布时间】:2015-11-05 02:37:24 【问题描述】:我正在编写一个用于将消息发布到消息队列 (RabbitMQ) 的主程序。该程序是用 Perl 5 编写的,并使用AnyEvent::RabbitMQ 与 RabbitMQ 进行通信。
以下最小示例(对于我遇到的问题)将在通过同一通道发送第二个命令时失败,并出现错误“通道已关闭”。
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
on_success => sub _on_connect_success( $condvar, $ar, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
$condvar->recv;
$ar->close;
return;
############################################################################
sub _on_connect_success
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
############################################################################
sub _open_channel
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub _on_open_channel_success( $condvar, $ar, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_open_channel_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
_declare_queue( $condvar, $ar, $channel );
return;
############################################################################
sub _declare_queue
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub _on_declare_queue_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_declare_queue_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
############################################################################
sub _bind_queue
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => '',
routing_key => '',
on_success => sub _on_bind_queue_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_bind_queue_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
############################################################################
sub _publish_message
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => '',
routing_key => '',
body => $message,
header => ,
mandatory => 0,
immediate => 0,
on_success =>
sub _on_publish_message_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
on_ack => sub _error( $condvar, $ar, 'ack', @_ ) ,
on_nack => sub _error( $condvar, $ar, 'nack', @_ ) ,
);
return;
############################################################################
sub _on_publish_message_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
sleep 1;
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
return;
############################################################################
sub _error
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
############################################################################
sub _log
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
return;
这个程序应该:
连接到 RabbitMQ 打开一个 RabbitMQ 通道 声明一个简单队列(名为“test”) 绑定到该队列(名为“test”) 发布消息(“Hello, world!”) 成功发布消息后稍等片刻,再发布一条消息这个程序(主程序)应该不消费消息。还有其他程序可以完成这项工作。
最小示例(见上文)将产生以下输出:
2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)
为什么AnyEvent::RabbitMQ
或 RabbitMQ 本身会关闭通道(不是连接还是我错过了什么)?
【问题讨论】:
【参考方案1】:如果您查看 RabbitMQ 服务器日志,您会看到如下内容:
amqp_error,access_refused,"默认交换上不允许操作",'queue.bind'
显然它不允许您在默认交换上绑定队列。所以你需要先声明和绑定你自己的exchange。
sub _declare_exchange
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ exchange...' );
$channel->declare_exchange(
exchange => 'testest',
type => 'fanout',
on_success =>
sub _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_declare_exchange_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ exchange.' );
_bind_exchange( $condvar, $ar, $channel );
return;
############################################################################
sub _bind_exchange
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ exchange...' );
$channel->bind_exchange(
source => 'testest',
destination => 'testest',
routing_key => '',
on_success => sub _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
一旦你设置了这些潜艇,告诉你的程序使用这个自定义交换。
sub _on_open_channel_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
$channel->confirm;
_declare_exchange( $condvar, $ar, $channel );
return;
当您向队列发送消息时,$channel->confirm
是使 RabbitMQ 回答并确认所必需的。如果你不这样做,成功处理程序将永远不会被调用,因为没有成功响应返回。
然后在您的_bind_queue
中,您需要将交换添加到bind_queue()
调用中。
$channel->bind_queue(
queue => 'test',
exchange => 'testest', # <-- here
routing_key => '',
# ...
);
同样需要在_publish_message
和publish()
调用中完成。在那里,您还应该将 on_ack
处理程序替换为实际处理确认的内容。我认为您打算这样做,但出现复制/粘贴错误1。
$channel->publish(
queue => 'test',
exchange => 'testest', # <-- here
routing_key => '',
# ...
on_ack => sub
_on_publish_message_success( $condvar, $ar, $channel, @_ );
,
);
另一件事是,当您使用 AnyEvent 时,_on_publish_message_success
中的 sleep
调用不是一个好主意,因为这将停止整个程序。请改用AE::timer
。
my $t;
$t = AE::timer(1,0,sub
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
undef $t;
);
这是包含所有更改的完整代码。
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/guest',
timeout => 1,
tls => 0,
on_success => sub _on_connect_success( $condvar, $ar, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
$condvar->recv;
$ar->close;
return;
############################################################################
sub _on_connect_success
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
############################################################################
sub _open_channel
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub _on_open_channel_success( $condvar, $ar, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_open_channel_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
$channel->confirm;
_declare_exchange( $condvar, $ar, $channel );
return;
############################################################################
sub _declare_exchange
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ exchange...' );
$channel->declare_exchange(
exchange => 'testest',
type => 'fanout',
on_success =>
sub _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_declare_exchange_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ exchange.' );
_bind_exchange( $condvar, $ar, $channel );
return;
############################################################################
sub _bind_exchange
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ exchange...' );
$channel->bind_exchange(
source => 'testest',
destination => 'testest',
routing_key => '',
on_success => sub _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_bind_exchange_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ exchange.' );
_declare_queue( $condvar, $ar, $channel );
return;
############################################################################
sub _declare_queue
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub _on_declare_queue_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_declare_queue_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
############################################################################
sub _bind_queue
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => 'testest',
routing_key => '',
on_success => sub _on_bind_queue_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
);
return;
############################################################################
sub _on_bind_queue_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
############################################################################
sub _publish_message
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => 'testest',
routing_key => '',
body => $message,
header => ,
mandatory => 0,
immediate => 0,
on_success =>
sub _on_publish_message_success( $condvar, $ar, $channel, @_ ) ,
on_failure => sub _error( $condvar, $ar, 'failure', @_ ) ,
on_read_failure => sub _error( $condvar, $ar, 'read_failure', @_ ) ,
on_return => sub _error( $condvar, $ar, 'return', @_ ) ,
on_close => sub _error( $condvar, $ar, 'close', @_ ) ,
on_ack => sub
_on_publish_message_success( $condvar, $ar, $channel, @_ );
# _error( $condvar, $ar, 'ack', @_ )
,
on_nack => sub _error( $condvar, $ar, 'nack', @_ ) ,
);
return;
############################################################################
sub _on_publish_message_success
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
my $t; $t=AE::timer(1,0,sub
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
undef $t;
);
return;
############################################################################
sub _error
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
############################################################################
sub _log
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
return;
1)在某些地方你需要给你的同事买啤酒:)
【讨论】:
以上是关于AnyEvent::RabbitMQ 关闭通道的问题的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程Channel 通道 ④ ( Channel 通道的热数据流属性 | Channel 通道关闭过程 | Channel 通道关闭代码示例 )