AnyEvent::RabbitMQ 关闭通道的问题

Posted

技术标签:

【中文标题】AnyEvent::RabbitMQ 关闭通道的问题【英文标题】:AnyEvent::RabbitMQ issues with closed channels 【发布时间】:2015-11-05 02:37:24 【问题描述】:

我正在编写一个用于将消息发布到消息队列 (RabbitMQ) 的主程序。该程序是用 Perl 5 编写的,并使用AnyEvent::RabbitMQ 与 RabbitMQ 进行通信。

以下最小示例(对于我遇到的问题)将在通过同一通道发送第二个命令时失败,并出现错误“通道已关闭”。

use strictures 2;

use AnyEvent::RabbitMQ;

main();

############################################################################
# Entry point. Connects to RabbitMQ on localhost:5672 and then waits on a
# condition variable; in this file only _error() sends on it, so the wait
# ends on the first reported error. Afterwards the connection is closed.
sub main {
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',
    port            => 5672,
    user            => 'guest',
    pass            => 'guest',
    vhost           => '/',
    timeout         => 1,
    tls             => 0,
    # Each callback forwards the shared condvar and connection plus
    # whatever arguments the library passes in (@_).
    on_success      => sub { _on_connect_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
  );
  # Blocks until something calls $condvar->send.
  $condvar->recv;
  $ar->close;
  return;
}

############################################################################
# on_success callback for connect(). Receives the condvar, the original
# connection object and the object passed by the library ($new_ar), and
# opens a channel on $new_ar.
sub _on_connect_success {
  my ( $condvar, $ar, $new_ar ) = @_;
  _log( debug => 'Connected to RabbitMQ.' );
  _open_channel( $condvar, $new_ar );
  return;
}

############################################################################
# Opens a channel on the connection $ar. On success the new channel is
# handed to _on_open_channel_success(); every other callback is routed to
# _error() tagged with the callback name.
sub _open_channel {
  my ( $condvar, $ar ) = @_;
  _log( debug => 'Opening RabbitMQ channel...' );
  $ar->open_channel(
    on_success      => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for open_channel(): logs and moves on to declaring
# the queue on the freshly opened $channel.
sub _on_open_channel_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  _declare_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
# Declares the queue named 'test' on $channel (auto-delete, non-durable,
# non-exclusive). Success continues with _on_declare_queue_success(); all
# other callbacks go to _error().
sub _declare_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ queue...' );
  $channel->declare_queue(
    queue       => 'test',
    auto_delete => 1,
    passive     => 0,
    durable     => 0,
    exclusive   => 0,
    no_ack      => 1,
    ticket      => 0,
    on_success =>
      sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for declare_queue(): logs and proceeds to bind the
# queue. Extra arguments supplied by the library are ignored.
sub _on_declare_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ queue.' );
  _bind_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
# Binds queue 'test' with an empty routing key. Success continues with
# _on_bind_queue_success(); all other callbacks go to _error().
sub _bind_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ queue...' );
  $channel->bind_queue(
    queue       => 'test',
    # NOTE(review): '' names the default exchange. The server log quoted in
    # the answer further down reports queue.bind being refused on it, which
    # appears to be what closes the channel in this example.
    exchange    => '',
    routing_key => '',
    on_success      => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for bind_queue(): setup is complete, so log that and
# publish the first message.
sub _on_bind_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binded RabbitMQ queue.' );
  _log( info  => 'Master ready to publish messages.' );
  _publish_message( $condvar, $ar, $channel, 'Hello, world!' );
  return;
}

############################################################################
# Publishes $message on $channel with an empty header table. on_success
# continues with _on_publish_message_success(); every other callback,
# including on_ack and on_nack, is routed to _error().
sub _publish_message {
  my ( $condvar, $ar, $channel, $message ) = @_;
  _log( debug => "Publishing RabbitMQ message ($message)..." );
  $channel->publish(
    queue       => 'test',
    exchange    => '',
    routing_key => '',
    body        => $message,
    header      => {},
    mandatory   => 0,
    immediate   => 0,
    on_success =>
      sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
    # NOTE(review): treating an ack as an error looks like a copy/paste
    # slip; the answer further down replaces this handler.
    on_ack          => sub { _error( $condvar, $ar, 'ack',          @_ ) },
    on_nack         => sub { _error( $condvar, $ar, 'nack',         @_ ) },
  );
  return;
}

############################################################################
# Success callback for publish(): waits a second and publishes another
# message containing the current epoch time, so publishing repeats.
sub _on_publish_message_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => "Published RabbitMQ message." );
  # NOTE(review): sleep blocks the whole process, event loop included; the
  # answer further down suggests a timer instead.
  sleep 1;
  _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
  return;
}

############################################################################
# Shared error handler. $type names the callback that fired and @error
# holds whatever the library passed to it. Logs both, then sends on
# $condvar, which releases the recv() in main().
sub _error {
  my ( $condvar, $ar, $type, @error ) = @_;
  # @error elements are stringified by join, so object arguments show up
  # as e.g. Net::AMQP::Frame::Method=HASH(0x...).
  _log( error => sprintf '%s - %s', $type, join ', ', @error );
  $condvar->send( $condvar, $ar, $type, @error );
  return;
}

############################################################################
# Writes one log line to STDERR:
#   <UTC timestamp> [<level>] <message> at <file> line <n> (<sub>; from <file> line <n>)
# The first location is where _log() was called; <sub> and the second
# location come from the frame one level further up the call stack.
sub _log {
  my ( $level, $message ) = @_;
  my @time = gmtime time;
  $time[5] += 1900;    # gmtime returns years since 1900
  $time[4] += 1;       # gmtime months are 0-based
  my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
  my @caller0    = caller(0);
  my @caller1    = caller(1);
  my $subroutine = $caller1[3];
  # Drop the package prefix (e.g. "main::") from the sub name.
  $subroutine =~ s/^$caller0[0]:://;
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
  return;
}
这个程序应该:

1. 连接到 RabbitMQ
2. 打开一个 RabbitMQ 通道
3. 声明一个简单队列(名为“test”)
4. 绑定到该队列(名为“test”)
5. 发布消息(“Hello, world!”)
6. 成功发布消息后稍等片刻,再发布一条消息

这个程序(主程序)不应该消费消息,另有其他程序负责这项工作。

最小示例(见上文)将产生以下输出:

2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)

为什么 AnyEvent::RabbitMQ 或 RabbitMQ 本身会关闭通道(而不是连接)?还是我遗漏了什么?

【问题讨论】:

【参考方案1】:

如果您查看 RabbitMQ 服务器日志,您会看到如下内容:

{amqp_error,access_refused,"operation not permitted on the default exchange",'queue.bind'}

显然它不允许您在默认交换上绑定队列。所以你需要先声明和绑定你自己的exchange

# Declares a fanout exchange named 'testest' on $channel. Success continues
# with _on_declare_exchange_success(); all other callbacks go to _error().
sub _declare_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ exchange...' );
  $channel->declare_exchange(
    exchange        => 'testest',
    type            => 'fanout',
    on_success =>
      sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for declare_exchange(): logs and proceeds to bind the
# exchange.
sub _on_declare_exchange_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ exchange.' );
  _bind_exchange( $condvar, $ar, $channel );
  return;
}

############################################################################
# Issues an exchange-to-exchange binding with an empty routing key. Success
# continues with _on_bind_exchange_success(); all other callbacks go to
# _error().
sub _bind_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ exchange...' );
  $channel->bind_exchange(
    # NOTE(review): source and destination are the same exchange here;
    # confirm this self-binding is intended.
    source      => 'testest',
    destination => 'testest',
    routing_key => '',
    on_success      => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}
一旦你定义好这些子程序(sub),就让你的程序使用这个自定义交换机。

# on_success callback for open_channel(): enables publisher confirms on
# the channel and then starts the setup chain by declaring the exchange.
sub _on_open_channel_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  # Ask the broker to acknowledge published messages.
  $channel->confirm;
  _declare_exchange( $condvar, $ar, $channel );
  return;
}
当您向队列发送消息时,$channel->confirm 是使 RabbitMQ 回答并确认所必需的。如果你不这样做,成功处理程序将永远不会被调用,因为没有成功响应返回。

然后在您的_bind_queue 中,您需要将交换添加到bind_queue() 调用中。

  $channel->bind_queue(
    queue       => 'test',
    exchange    => 'testest', # <-- use the custom exchange instead of ''
    routing_key => '',
    # ... (callbacks unchanged)
  );

同样的修改也需要在 _publish_message 中的 publish() 调用里完成。在那里,您还应该将 on_ack 处理程序替换为实际处理确认的内容。我认为您本来就打算这样做,只是出现了复制/粘贴错误[1]。

$channel->publish(
  queue       => 'test',
  exchange    => 'testest', # <-- use the custom exchange here as well
  routing_key => '',
  # ... (remaining arguments unchanged)
  # Treat a broker ack as a successful publish instead of an error.
  on_ack          => sub {
    _on_publish_message_success( $condvar, $ar, $channel, @_ );
  },
);

另一件事是,当您使用 AnyEvent 时,_on_publish_message_success 中的 sleep 调用不是一个好主意,因为这将停止整个程序。请改用AE::timer

# Timer arguments: fire after 1 second, with a repeat interval of 0.
# $t is declared first so the callback can refer to it.
my $t;
$t = AE::timer(1,0,sub {
  _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
  # Drop the watcher once it has fired.
  undef $t;
});

这是包含所有更改的完整代码。

use strictures 2;

use AnyEvent::RabbitMQ;

main();

############################################################################
# Entry point. Connects to RabbitMQ on localhost:5672 and then waits on a
# condition variable; in this file only _error() sends on it, so the wait
# ends on the first reported error. Afterwards the connection is closed.
sub main {
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',
    port            => 5672,
    user            => 'guest',
    pass            => 'guest',
    # NOTE(review): the question's listing uses vhost '/'; confirm that
    # '/guest' exists on the target broker.
    vhost           => '/guest',
    timeout         => 1,
    tls             => 0,
    # Each callback forwards the shared condvar and connection plus
    # whatever arguments the library passes in (@_).
    on_success      => sub { _on_connect_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
  );
  # Blocks until something calls $condvar->send.
  $condvar->recv;
  $ar->close;
  return;
}

############################################################################
# on_success callback for connect(). Receives the condvar, the original
# connection object and the object passed by the library ($new_ar), and
# opens a channel on $new_ar.
sub _on_connect_success {
  my ( $condvar, $ar, $new_ar ) = @_;
  _log( debug => 'Connected to RabbitMQ.' );
  _open_channel( $condvar, $new_ar );
  return;
}

############################################################################
# Opens a channel on the connection $ar. On success the new channel is
# handed to _on_open_channel_success(); every other callback is routed to
# _error() tagged with the callback name.
sub _open_channel {
  my ( $condvar, $ar ) = @_;
  _log( debug => 'Opening RabbitMQ channel...' );
  $ar->open_channel(
    on_success      => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for open_channel(): enables publisher confirms on
# the channel and then starts the setup chain by declaring the exchange.
sub _on_open_channel_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  # Ask the broker to acknowledge published messages.
  $channel->confirm;
  _declare_exchange( $condvar, $ar, $channel );
  return;
}

############################################################################
# Declares a fanout exchange named 'testest' on $channel. Success continues
# with _on_declare_exchange_success(); all other callbacks go to _error().
sub _declare_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ exchange...' );
  $channel->declare_exchange(
    exchange        => 'testest',
    type            => 'fanout',
    on_success =>
      sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for declare_exchange(): logs and proceeds to bind the
# exchange.
sub _on_declare_exchange_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ exchange.' );
  _bind_exchange( $condvar, $ar, $channel );
  return;
}

############################################################################
# Issues an exchange-to-exchange binding with an empty routing key. Success
# continues with _on_bind_exchange_success(); all other callbacks go to
# _error().
sub _bind_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ exchange...' );
  $channel->bind_exchange(
    # NOTE(review): source and destination are the same exchange here;
    # confirm this self-binding is intended.
    source      => 'testest',
    destination => 'testest',
    routing_key => '',
    on_success      => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for bind_exchange(): the exchange is ready, so move
# on to declaring the queue.
sub _on_bind_exchange_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binded RabbitMQ exchange.' );
  _declare_queue( $condvar, $ar, $channel );
  return;
}


############################################################################
# Declares the queue named 'test' on $channel (auto-delete, non-durable,
# non-exclusive). Success continues with _on_declare_queue_success(); all
# other callbacks go to _error().
sub _declare_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ queue...' );
  $channel->declare_queue(
    queue       => 'test',
    auto_delete => 1,
    passive     => 0,
    durable     => 0,
    exclusive   => 0,
    no_ack      => 1,
    ticket      => 0,
    on_success =>
      sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for declare_queue(): logs and proceeds to bind the
# queue. Extra arguments supplied by the library are ignored.
sub _on_declare_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ queue.' );
  _bind_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
# Binds queue 'test' to the custom exchange 'testest' with an empty routing
# key. Success continues with _on_bind_queue_success(); all other callbacks
# go to _error().
sub _bind_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ queue...' );
  $channel->bind_queue(
    queue       => 'test',
    exchange    => 'testest',
    routing_key => '',
    on_success      => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
# on_success callback for bind_queue(): setup is complete, so log that and
# publish the first message.
sub _on_bind_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binded RabbitMQ queue.' );
  _log( info  => 'Master ready to publish messages.' );
  _publish_message( $condvar, $ar, $channel, 'Hello, world!' );
  return;
}

############################################################################
# Publishes $message to exchange 'testest' on $channel with an empty header
# table. Both on_success and on_ack continue with
# _on_publish_message_success(); the remaining callbacks go to _error().
sub _publish_message {
  my ( $condvar, $ar, $channel, $message ) = @_;
  _log( debug => "Publishing RabbitMQ message ($message)..." );
  $channel->publish(
    queue       => 'test',
    exchange    => 'testest',
    routing_key => '',
    body        => $message,
    header      => {},
    mandatory   => 0,
    immediate   => 0,
    on_success =>
      sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
    # A broker ack counts as success; this handler previously called
    # _error( $condvar, $ar, 'ack', @_ ).
    on_ack          => sub {
        _on_publish_message_success( $condvar, $ar, $channel, @_ );
    },
    on_nack         => sub { _error( $condvar, $ar, 'nack',         @_ ) },
  );
  return;
}

############################################################################
# Success/ack callback for publish(): schedules the next message one
# second later with a timer instead of blocking the process.
sub _on_publish_message_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => "Published RabbitMQ message." );
  # Timer arguments: fire after 1 second, with a repeat interval of 0.
  # $t is declared first so the callback can refer to it.
  my $t;
  $t = AE::timer(1,0,sub {
      _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
      # Drop the watcher once it has fired.
      undef $t;
  });
  return;
}

############################################################################
# Shared error handler. $type names the callback that fired and @error
# holds whatever the library passed to it. Logs both, then sends on
# $condvar, which releases the recv() in main().
sub _error {
  my ( $condvar, $ar, $type, @error ) = @_;
  # @error elements are stringified by join, so object arguments show up
  # as e.g. Net::AMQP::Frame::Method=HASH(0x...).
  _log( error => sprintf '%s - %s', $type, join ', ', @error );
  $condvar->send( $condvar, $ar, $type, @error );
  return;
}

############################################################################
# Writes one log line to STDERR:
#   <UTC timestamp> [<level>] <message> at <file> line <n> (<sub>; from <file> line <n>)
# The first location is where _log() was called; <sub> and the second
# location come from the frame one level further up the call stack.
sub _log {
  my ( $level, $message ) = @_;
  my @time = gmtime time;
  $time[5] += 1900;    # gmtime returns years since 1900
  $time[4] += 1;       # gmtime months are 0-based
  my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
  my @caller0    = caller(0);
  my @caller1    = caller(1);
  my $subroutine = $caller1[3];
  # Drop the package prefix (e.g. "main::") from the sub name.
  $subroutine =~ s/^$caller0[0]:://;
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
  return;
}

1)在某些地方你需要给你的同事买啤酒:)

【讨论】:

以上是关于AnyEvent::RabbitMQ 关闭通道的问题的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Channel 通道 ④ ( Channel 通道的热数据流属性 | Channel 通道关闭过程 | Channel 通道关闭代码示例 )

运行了“自动关闭空闲的IDE通道”怎么恢复?

信号 goroutine 在通道关闭时停止

关闭和发送到通道之间的竞争条件

如何测试通道是不是关闭并仅在未关闭时发送给它

Golang✔️走进 Go 语言✔️ 第十八课 通道关闭 & 工作池