SEND frame fails if it is in a transaction (STOMP)

Posted: 2014-01-03 01:04:47

Question:

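I am writing my own PHP gateway for STOMP (Apollo), because all the existing solutions are either too poor or too complicated.

When I send a message to a queue without a transaction (but with a receipt), everything works fine. Here is the log: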

SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
receipt:f1dd03450508d938e6eb8196a6d128c4

2149b4a936862b121f7928ed5c060152
75235e0fbc8a56970ede75d2147b538b
fece2e4403b52fd903b8e7a78b1918c6

RECEIPT
receipt-id:f1dd03450508d938e6eb8196a6d128c4

But when a transaction is involved, I get this (I have removed the handshake log, don't worry):

BEGIN
transaction:499cc8a062be1235d312e968e5f30802
receipt:f7c837aed5ee9efd8f27143d85061067

RECEIPT
receipt-id:f7c837aed5ee9efd8f27143d85061067


SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
transaction:499cc8a062be1235d312e968e5f30802
receipt:7048ce3f8a01b55294f5e92fcd501c93

f4c356386a563be82f460900889ea07f
bdd8a503ac3e5ddf0d816f95e9448e1e
e5b7ea903beb7f6300249217e9824ef0

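And then nothing. My wrapper tries to read the RECEIPT frame, but fgets just times out. It looks as if the broker is waiting for more SEND frame data, yet the frame is generated in exactly the same way, with only one extra header (transaction). All the required EOLs and NULL octets are in place.

Apollo v1.6, STOMP v1.2.

What could it be?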
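One way to double-check that the two SEND frames really differ only by the transaction header is to build both and compare the raw bytes. The following is a minimal standalone sketch, not part of the wrapper: `buildFrame` is a hypothetical helper, and the ids `r-1` and `tx-1` are placeholders.

```php
<?php
// Hypothetical helper: serialise a STOMP 1.2 frame
// (command, headers, blank line, body, NULL octet).
function buildFrame(string $command, array $headers, string $body = ''): string
{
    $frame = strtoupper($command) . "\n";
    foreach ($headers as $key => $value) {
        $frame .= $key . ':' . $value . "\n";
    }
    // The blank line ends the headers; the NULL octet ends the frame.
    return $frame . "\n" . $body . "\x00";
}

$body = 'hello';
$base = [
    'destination'    => '/queue/teststst',
    'content-type'   => 'text/plain',
    'content-length' => strlen($body), // bytes, not characters
];

$plain      = buildFrame('send', $base + ['receipt' => 'r-1'], $body);
$transacted = buildFrame('send', $base + ['transaction' => 'tx-1', 'receipt' => 'r-1'], $body);

// Escape control bytes so a stray or missing EOL / NULL octet becomes visible.
echo addcslashes($plain, "\0..\37"), PHP_EOL;
echo addcslashes($transacted, "\0..\37"), PHP_EOL;

// Lines that appear only in the transacted frame.
$extra = array_values(array_diff(explode("\n", $transacted), explode("\n", $plain)));
print_r($extra);
```

If the only extra line is the transaction header and both frames end in a single NULL octet, the framing itself is unlikely to be the cause, and the broker's handling of the transaction is the next thing to look at.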

Update: source code

<?php
class Stompler {
  const EOL = "\n";
  const NULL_OCTET = "\x00";
  const STATE_HEADER = 1;
  const STATE_BODY   = 2;

  protected $subscription = false;
  protected $transactionStack;
  protected $connection;
  protected $socket;
  protected $possibleFrameTypes = [
    'server' => [
      'MESSAGE',
      'RECEIPT',
      'ERROR',
      'CONNECTED',
    ],
  ];

  public function send($message, $queueName, $async = false) {
    $this->connect();

    $params = [
      'destination' => $queueName,
      'content-type' => 'text/plain',
      // content-length counts bytes; compileFrame() appends one EOL to the body
      'content-length' => strlen($message . static::EOL),
    ];

    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('send', $params, $message);

    if ($async === false) $this->checkReceipt($receiptId, 'send');
  }

  public function subscribe($queueName, $async = false, $autoAck = false) {
    $this->connect();

    if ($this->subscription === true) {
      throw new StomplerException('Another subscription has already been started');
    }

    $this->subscription = true;

    $params = [
      'id' => 1,
      'destination' => $queueName,
      'ack' => ($autoAck === true ? 'auto' : 'client'),
    ];

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('subscribe', $params);

    if ($async === false) $this->checkReceipt($receiptId, 'subscribe');
  }

  public function unsubscribe() {
    if ($this->subscription === false) {
      throw new StomplerException('No subscription');
    }

    $this->subscription = false;

    $this->sendFrame('unsubscribe', [
      'id' => 1,
    ]);
  }

  public function connect() {
    if (!empty($this->connection)) return;

    $config = [...];

    $this->socket = fsockopen('tcp://' . $config['host'], $config['port']);
    if (empty($this->socket)) throw new StomplerConnectionException;

    stream_set_timeout($this->socket, 2);

    $this->sendFrame('connect', [
      'accept-version' => '1.2',
      'login'          => $config['login'],
      'passcode'       => $config['password'],
      'virtual-host'   => 'srv',
      'host'           => 'srv',
    ]);
    $frame = $this->readFrame();

    if ($frame['name'] === 'ERROR') {
      throw new StomplerConnectionException("Could not connect to broker: '{$frame['headers']['message']}' ({$frame['body']})");
    }
    if ($frame['name'] !== 'CONNECTED') {
      throw new StomplerConnectionException;
    }

    $this->connection = $frame['headers']['session'];
  }

  public function ack($message, $async = false) {
    $id = is_array($message) ? $message['headers']['ack'] : $message;

    $params = [
      'id' => $id,
    ];

    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('ack', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'ack');
  }

  public function nack($message, $async = false) {
    $id = is_array($message) ? $message['headers']['ack'] : $message;

    $params = [
      'id' => $id,
    ];

    if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
      $params['transaction'] = $this->transactionStack->top();
    }

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('nack', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'nack');
  }

  public function begin($async = false) {
    $this->connect();

    if ($this->transactionStack === null) {
      $this->transactionStack = new \SplStack();
    }

    // push() so that top() and pop() refer to the newest transaction
    $this->transactionStack->push($this->generateRandom());

    $params = ['transaction' => $this->transactionStack->top()];

    if ($async === false) {
      $receiptId = $this->generateRandom();
      $params['receipt'] = $receiptId;
    }

    $this->sendFrame('begin', $params);
    if ($async === false) $this->checkReceipt($receiptId, 'begin');
  }

  public function commit() {
    if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
      throw new StomplerException('No transaction started');
    }

    $this->sendFrame('commit', ['transaction' => $this->transactionStack->pop()]);
  }

  public function abort($transactionId) {
    if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
      throw new StomplerException('No transaction started');
    }

    $this->sendFrame('abort', ['transaction' => $this->transactionStack->pop()]);
  }

  public function readFrame($expectedFrameName = null) {
    $started = time();
    $frame = [
      'name' => null,
      'headers' => [],
      'body' => null,
    ];
    $state = false;
    $frameName = false;

    //echo '-------RECV--------' . PHP_EOL;
    while (true) {
      if (feof($this->socket) || ((time() - $started) > 1)) return false;

      $frameLine = fgets($this->socket);
      // fgets() returns false when the read times out or fails
      if ($frameLine === false) return false;

      echo $frameLine;

      if ($state === static::STATE_HEADER) {
        $header = rtrim($frameLine, static::EOL);
        if ($header !== '') {
          // limit 2: a header value may itself contain ':'
          list($k, $v) = explode(':', $header, 2);
          $frame['headers'][$k] = $v;
        }
      }
      if ($state === static::STATE_BODY) $frame['body'] .= $frameLine;

      if ($state === false) {
        $frameName = $frame['name'] = rtrim($frameLine, static::EOL);
        if (!in_array($frameName, $this->possibleFrameTypes['server'])) {
          if (empty($frameName)) return false;
          throw new StomplerUnknownFrameException($frameName);
        }
        if ($expectedFrameName !== null && $frameName !== mb_strtoupper($expectedFrameName)) {
          throw new StomplerUnexpectedFrameException($frameName);
        }
        $state = static::STATE_HEADER;
      }
      elseif ($state === static::STATE_HEADER && $frameLine === static::EOL) {
        // a blank line separates the headers from the body
        $state = static::STATE_BODY;
      }
      elseif ($state === static::STATE_BODY && $this->detectNullOctet($frameLine)) {
        break;
      }
    }
    //echo '-------RECV--------' . PHP_EOL;

    if ($frame['body'] !== null) $frame['body'] = rtrim($frame['body'], static::EOL . static::NULL_OCTET);
    if ($frame['name'] === null) return false;

    return $frame;
  }

  private function sendFrame($frameName, $frameParams, $body = null) {
    $frame = $this->compileFrame($frameName, $frameParams, $body);
    //echo '=======SEND========' . PHP_EOL;
    echo $frame;
    //echo '=======SEND========' . PHP_EOL;
    $result = fwrite($this->socket, $frame);

    if (empty($result)) {
      $md = stream_get_meta_data($this->socket);
      if ($md['timed_out']) throw new StomplerTimeoutConnectionException;
      throw new StomplerUnknownConnectionException;
    }
  }

  private function compileFrame($name, $headers, $body = null) {
    $result = mb_strtoupper($name) . static::EOL;

    foreach ($headers as $key => $value) {
      $result .= $key;
      if ($value !== false) $result .= ':' . $value;
      $result .= static::EOL;
    }

    if ($body) $result .= static::EOL . $body;

    $result .= static::EOL . static::NULL_OCTET;

    return $result;
  }

  private function detectNullOctet($string) {
    // true when the line ends with NULL followed by one EOL; strlen() counts bytes
    return strpos($string, static::NULL_OCTET) === (strlen($string) - 2);
  }

  private function checkReceipt($receiptId, $frameName) {
    $frameName = mb_strtoupper($frameName);

    try {
      $receiptFrame = $this->readFrame('RECEIPT');
      // readFrame() returns false when nothing arrives before the timeout
      if ($receiptFrame === false) {
        throw new StomplerException("No receipt received for $frameName frame");
      }
      if ($receiptFrame['headers']['receipt-id'] != $receiptId) {
        throw new StomplerException("Wrong receipt for $frameName frame (expected $receiptId, received {$receiptFrame['headers']['receipt-id']})");
      }
    }
    catch (StomplerUnexpectedFrameException $e) {
      throw new StomplerException("Could not receive receipt frame for $frameName frame (received {$e->getMessage()} frame)");
    }
  }

  private function generateRandom() {
    return md5(uniqid('', true));
  }

  public function __destruct() {
    if (empty($this->socket)) return;

    $this->connection = null;
    fclose($this->socket);
  }
}

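Comments:

In English, please. I'm not really sure this is about PHP; it is more about STOMP itself. Once again: there is no difference at all, just two more frames (BEGIN and COMMIT/ABORT) and the transaction header in the frames.

Just kidding, sorry. Basically, where is your PHP? This is tagged as a PHP question, so I would expect to see some code.

There you go, but I'm still skeptical about it. I think this is a STOMP protocol issue rather than a PHP one.

OK, let's see what we can do.

OK, it looks like this code in readFrame is never executed in the infinite loop, right? elseif ($state === static::STATE_BODY && $this->detectNullOctet($frameLine)) break;

Answer 1: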

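Solved. If you have begun a transaction, you should not send the receipt header, so the official STOMP documentation needs to be fixed (or this may be an Apollo-specific issue).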
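In code, the workaround described here amounts to not requesting a receipt on frames sent inside a transaction. The following is a minimal sketch under that assumption: `sendHeaders` is a hypothetical helper, the ids are placeholders, and dropping the receipt reflects the behaviour reported in this answer for Apollo 1.6 rather than anything required by the STOMP 1.2 text.

```php
<?php
// Hypothetical helper: build SEND headers, requesting a receipt
// only when the frame is not part of a transaction.
function sendHeaders(string $destination, string $body, ?string $transactionId, string $receiptId): array
{
    $headers = [
        'destination'    => $destination,
        'content-type'   => 'text/plain',
        'content-length' => strlen($body), // bytes, not characters
    ];
    if ($transactionId !== null) {
        // Inside a transaction: attach the transaction id and skip the receipt.
        $headers['transaction'] = $transactionId;
    } else {
        $headers['receipt'] = $receiptId;
    }
    return $headers;
}

$outside = sendHeaders('/queue/teststst', 'hello', null, 'r-1');
$inside  = sendHeaders('/queue/teststst', 'hello', 'tx-1', 'r-2');

print_r($outside);
print_r($inside);
```

With this shape the caller only waits for a RECEIPT frame when the returned headers actually contain a `receipt` key.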

Answer 2:

Are you sure you are sending the STOMP COMMIT frame? The receiver will not get the transacted SEND frames until the associated transaction is committed.
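To make the ordering concrete, the sketch below assembles the three frames of a transacted send as plain strings. It is an illustration only: `frame` is a hypothetical helper, the ids `tx-1` and `commit-1` are placeholders, and nothing is written to a socket.

```php
<?php
// Hypothetical helper: serialise a STOMP frame
// (command, headers, blank line, body, NULL octet).
function frame(string $command, array $headers, string $body = ''): string
{
    $out = $command . "\n";
    foreach ($headers as $k => $v) {
        $out .= "$k:$v\n";
    }
    return $out . "\n" . $body . "\x00";
}

$tx   = 'tx-1';
$body = 'hello';

$frames = [
    frame('BEGIN', ['transaction' => $tx]),
    frame('SEND', [
        'destination'    => '/queue/teststst',
        'content-length' => strlen($body),
        'transaction'    => $tx,
    ], $body),
    // Until this frame is sent, the SEND above is not delivered to consumers.
    frame('COMMIT', ['transaction' => $tx, 'receipt' => 'commit-1']),
];

foreach ($frames as $f) {
    // Escape control bytes so EOLs and the NULL octet are visible.
    echo addcslashes($f, "\0..\37"), PHP_EOL;
}
```

Putting the receipt on COMMIT is a design choice in this sketch: it gives a single confirmation for the whole transaction.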
