SEND frame fails if it is in a transaction (STOMP)
Asked: 2014-01-03 01:04:47
Question:
I'm writing my own PHP gateway for STOMP (Apollo), because all the existing solutions are either too poor or too complicated.
When I send a message to a queue without a transaction (but with a receipt), everything works. Here is the log:
SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
receipt:f1dd03450508d938e6eb8196a6d128c4

2149b4a936862b121f7928ed5c060152
75235e0fbc8a56970ede75d2147b538b
fece2e4403b52fd903b8e7a78b1918c6

RECEIPT
receipt-id:f1dd03450508d938e6eb8196a6d128c4
But when a transaction is involved, I get this (I have left out the handshake log, don't worry):
BEGIN
transaction:499cc8a062be1235d312e968e5f30802
receipt:f7c837aed5ee9efd8f27143d85061067

RECEIPT
receipt-id:f7c837aed5ee9efd8f27143d85061067

SEND
destination:/queue/teststst
content-type:text/plain
content-length:100
transaction:499cc8a062be1235d312e968e5f30802
receipt:7048ce3f8a01b55294f5e92fcd501c93

f4c356386a563be82f460900889ea07f
bdd8a503ac3e5ddf0d816f95e9448e1e
e5b7ea903beb7f6300249217e9824ef0
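And then nothing. My wrapper tries to read the RECEIPT frame, but fgets just times out. It looks as if the broker is waiting for more SEND frame data, yet the frame is built exactly the same way as before, with only one extra header (transaction). All the necessary EOLs and null octets are in place.
Apollo v1.6, STOMP v1.2.
What could it be?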
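The frame layout described above (command line, headers, blank line, body, null octet) can be checked in isolation before any broker is involved. The following is a minimal sketch, not the asker's code: `buildStompFrame()` and the `tx-1` transaction id are hypothetical names introduced for illustration, and header-value escaping is left out. It also shows that `content-length` is a count of octets, so `strlen()` is used rather than a character count.

```php
<?php
// Hypothetical helper for illustration; not part of the code in the question.
function buildStompFrame(string $command, array $headers, string $body = ''): string
{
    if ($body !== '') {
        // STOMP's content-length is an octet count, so count bytes with strlen().
        $headers['content-length'] = strlen($body);
    }
    $frame = strtoupper($command) . "\n";
    foreach ($headers as $name => $value) {
        $frame .= $name . ':' . $value . "\n";
    }
    // A blank line ends the headers; a single NUL octet ends the frame.
    return $frame . "\n" . $body . "\x00";
}

// "h\xC3\xA9llo\n" is 6 characters in UTF-8 but 7 octets.
$frame = buildStompFrame('send', [
    'destination'  => '/queue/teststst',
    'content-type' => 'text/plain',
    'transaction'  => 'tx-1', // placeholder transaction id
], "h\xC3\xA9llo\n");

// Dump the frame with the NUL octet made visible for inspection.
echo str_replace("\x00", '^@', $frame), PHP_EOL;
```

Dumping a frame this way makes it easy to compare the transactional and non-transactional SEND byte for byte.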
Update: source code
<?php

class Stompler
{
    const EOL = "\n";
    const NULL_OCTET = "\x00";

    // Parser states used by readFrame()
    const STATE_HEADER = 1;
    const STATE_BODY = 2;

    protected $subscription = false;
    protected $transactionStack;
    protected $connection;
    protected $socket;
    protected $possibleFrameTypes = [
        'server' => [
            'MESSAGE',
            'RECEIPT',
            'ERROR',
            'CONNECTED',
        ],
    ];

    public function send($message, $queueName, $async = false)
    {
        $this->connect();
        $params = [
            'destination' => $queueName,
            'content-type' => 'text/plain',
            // content-length is an octet count, so strlen() rather than mb_strlen()
            'content-length' => strlen($message . static::EOL),
        ];
        if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
            $params['transaction'] = $this->transactionStack->top();
        }
        if ($async === false) {
            $receiptId = $this->generateRandom();
            $params['receipt'] = $receiptId;
        }
        $this->sendFrame('send', $params, $message);
        if ($async === false) $this->checkReceipt($receiptId, 'send');
    }

    public function subscribe($queueName, $async = false, $autoAck = false)
    {
        $this->connect();
        if ($this->subscription === true) {
            throw new StomplerException('Another subscription has already been started');
        }
        $this->subscription = true;
        $params = [
            'id' => 1,
            'destination' => $queueName,
            'ack' => ($autoAck === true ? 'auto' : 'client'),
        ];
        if ($async === false) {
            $receiptId = $this->generateRandom();
            $params['receipt'] = $receiptId;
        }
        $this->sendFrame('subscribe', $params);
        if ($async === false) $this->checkReceipt($receiptId, 'subscribe');
    }

    public function unsubscribe()
    {
        if ($this->subscription === false) {
            throw new StomplerException('No subscription');
        }
        $this->subscription = false;
        $this->sendFrame('unsubscribe', [
            'id' => 1,
        ]);
    }

    public function connect()
    {
        if (!empty($this->connection)) return;
        $config = [...]; // connection settings elided in the original post
        $this->socket = fsockopen('tcp://' . $config['host'], $config['port']);
        if (empty($this->socket)) throw new StomplerConnectionException;
        stream_set_timeout($this->socket, 2);
        $this->sendFrame('connect', [
            'accept-version' => '1.2',
            'login' => $config['login'],
            'passcode' => $config['password'],
            'virtual-host' => 'srv',
            'host' => 'srv',
        ]);
        $frame = $this->readFrame();
        if ($frame['name'] === 'ERROR') {
            throw new StomplerConnectionException("Could not connect to broker: '{$frame['headers']['message']}' ({$frame['body']})");
        }
        if ($frame['name'] !== 'CONNECTED') {
            throw new StomplerConnectionException;
        }
        $this->connection = $frame['headers']['session'];
    }

    public function ack($message, $async = false)
    {
        // Accept either a received frame or a bare ack id
        $id = is_array($message) ? $message['headers']['ack'] : $message;
        $params = [
            'id' => $id,
        ];
        if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
            $params['transaction'] = $this->transactionStack->top();
        }
        if ($async === false) {
            $receiptId = $this->generateRandom();
            $params['receipt'] = $receiptId;
        }
        $this->sendFrame('ack', $params);
        if ($async === false) $this->checkReceipt($receiptId, 'ack');
    }

    public function nack($message, $async = false)
    {
        $id = is_array($message) ? $message['headers']['ack'] : $message;
        $params = [
            'id' => $id,
        ];
        if (isset($this->transactionStack) && !$this->transactionStack->isEmpty()) {
            $params['transaction'] = $this->transactionStack->top();
        }
        if ($async === false) {
            $receiptId = $this->generateRandom();
            $params['receipt'] = $receiptId;
        }
        $this->sendFrame('nack', $params);
        if ($async === false) $this->checkReceipt($receiptId, 'nack');
    }

    public function begin($async = false)
    {
        $this->connect();
        if ($this->transactionStack === null) {
            $this->transactionStack = new \SplStack();
        }
        // push() rather than unshift(): top() and pop() work on the pushed end
        $this->transactionStack->push($this->generateRandom());
        $params = ['transaction' => $this->transactionStack->top()];
        if ($async === false) {
            $receiptId = $this->generateRandom();
            $params['receipt'] = $receiptId;
        }
        $this->sendFrame('begin', $params);
        if ($async === false) $this->checkReceipt($receiptId, 'begin');
    }

    public function commit()
    {
        if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
            throw new StomplerException('No transaction started');
        }
        $this->sendFrame('commit', ['transaction' => $this->transactionStack->pop()]);
    }

    public function abort($transactionId)
    {
        if (empty($this->transactionStack) || $this->transactionStack->isEmpty()) {
            throw new StomplerException('No transaction started');
        }
        $this->sendFrame('abort', ['transaction' => $this->transactionStack->pop()]);
    }

    public function readFrame($expectedFrameName = null)
    {
        $started = time();
        $frame = [
            'name' => null,
            'headers' => [],
            'body' => null,
        ];
        $state = false; // false until the command line has been read
        $frameName = false;
        //echo '-------RECV--------' . PHP_EOL;
        while (true) {
            // Give up on EOF or after roughly one second
            if (feof($this->socket) || ((time() - $started) > 1)) return false;
            $frameLine = fgets($this->socket);
            echo $frameLine;
            if ($state === static::STATE_HEADER) {
                $header = rtrim($frameLine, static::EOL);
                if (!empty($header)) {
                    // Limit of 2 keeps colons inside the header value
                    list($k, $v) = explode(':', $header, 2);
                    $frame['headers'][$k] = $v;
                }
            }
            if ($state === static::STATE_BODY) $frame['body'] .= $frameLine;
            if ($state === false) {
                $frameName = $frame['name'] = rtrim($frameLine, static::EOL);
                if (!in_array($frameName, $this->possibleFrameTypes['server'])) {
                    if (empty($frameName)) return false;
                    throw new StomplerUnknownFrameException($frameName);
                }
                if ($expectedFrameName !== null && $frameName !== mb_strtoupper($expectedFrameName)) {
                    throw new StomplerUnexpectedFrameException($frameName);
                }
                $state = static::STATE_HEADER;
            } elseif ($state === static::STATE_HEADER && $frameLine === static::EOL) {
                // Blank line: headers are done, the body follows
                $state = static::STATE_BODY;
            } elseif ($state === static::STATE_BODY && $this->detectNullOctet($frameLine)) {
                break;
            }
        }
        //echo '-------RECV--------' . PHP_EOL;
        if ($frame['body'] !== null) $frame['body'] = rtrim($frame['body'], static::EOL . static::NULL_OCTET);
        if ($frame['name'] === null) return false;
        return $frame;
    }

    private function sendFrame($frameName, $frameParams, $body = null)
    {
        $frame = $this->compileFrame($frameName, $frameParams, $body);
        //echo '=======SEND========' . PHP_EOL;
        echo $frame;
        //echo '=======SEND========' . PHP_EOL;
        $result = fwrite($this->socket, $frame);
        if (empty($result)) {
            $md = stream_get_meta_data($this->socket);
            if ($md['timed_out']) throw new StomplerTimeoutConnectionException;
            throw new StomplerUnknownConnectionException;
        }
    }

    private function compileFrame($name, $headers, $body = null)
    {
        $result = mb_strtoupper($name) . static::EOL;
        foreach ($headers as $key => $value) {
            $result .= $key;
            if ($value !== false) $result .= ':' . $value;
            $result .= static::EOL;
        }
        // The blank line after the headers comes from here when there is a body,
        // otherwise from the EOL appended just before the null octet
        if ($body) $result .= static::EOL . $body;
        $result .= static::EOL . static::NULL_OCTET;
        return $result;
    }

    private function detectNullOctet($string)
    {
        // True when the line ends with a null octet followed by one more byte (the EOL)
        return strpos($string, static::NULL_OCTET) === (strlen($string) - 2);
    }

    private function checkReceipt($receiptId, $frameName)
    {
        $frameName = mb_strtoupper($frameName);
        try {
            $receiptFrame = $this->readFrame('RECEIPT');
            if ($receiptFrame === false) {
                // readFrame() timed out or hit EOF
                throw new StomplerException("No receipt received for $frameName frame");
            }
            if ($receiptFrame['headers']['receipt-id'] != $receiptId) {
                throw new StomplerException("Wrong receipt for $frameName frame (expected $receiptId, received {$receiptFrame['headers']['receipt-id']})");
            }
        } catch (StomplerUnexpectedFrameException $e) {
            throw new StomplerException("Could not receive receipt frame for $frameName frame (received {$e->getMessage()} frame)");
        }
    }

    private function generateRandom()
    {
        return md5(uniqid('', true));
    }

    public function __destruct()
    {
        if (empty($this->socket)) return;
        $this->connection = null;
        fclose($this->socket);
    }
}
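Comments:
Please answer in English. I'm not sure this is about PHP; it is more about STOMP itself. Again: there is no difference at all, only two extra frames (BEGIN and COMMIT/ABORT) and the transaction header in the frames.
Just kidding, sorry. Basically, where is your PHP? This is tagged as a PHP question, so I would expect to see some code.
Here you go, but I'm still skeptical. I think this is a STOMP protocol issue rather than a PHP one.
OK, let's see what we can do.
OK, it looks like this piece of code in readFrame is never executed inside the infinite loop, right? elseif ($state === static::STATE_BODY && $this->detectNullOctet($frameLine)) break;

Answer 1:
Solved. If you begin a transaction, the receipt header is not needed, so the official STOMP documentation should be fixed (or this may be an Apollo-specific issue).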
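The comment about `detectNullOctet` concerns how the end of a frame is recognized when reading line by line with `fgets`. One alternative, sketched below, is to read up to the null octet itself with PHP's `stream_get_line()`, so that frame detection does not depend on an EOL following the terminator. This is only an illustration under stated assumptions: `readStompFrame()` is a hypothetical helper, line endings are assumed to be `"\n"`, bodies are assumed to contain no embedded null octets, and an in-memory stream stands in for the broker socket.

```php
<?php
// Hypothetical helper for illustration; not part of the Stompler class.
// Assumes "\n" line endings and a body without embedded NUL octets.
function readStompFrame($stream, int $maxBytes = 65536): ?array
{
    // Read up to (and excluding) the NUL octet that terminates a frame.
    $raw = stream_get_line($stream, $maxBytes, "\x00");
    if ($raw === false) {
        return null; // nothing could be read (EOF or timeout)
    }
    // EOLs may appear between frames; drop them.
    $raw = ltrim($raw, "\r\n");
    if ($raw === '') {
        return null;
    }
    // Command and headers are separated from the body by the first blank line.
    $parts = explode("\n\n", $raw, 2);
    $lines = explode("\n", $parts[0]);
    $frame = [
        'name'    => array_shift($lines),
        'headers' => [],
        'body'    => $parts[1] ?? '',
    ];
    foreach ($lines as $line) {
        // A limit of 2 keeps colons inside header values intact.
        [$key, $value] = array_pad(explode(':', $line, 2), 2, '');
        $frame['headers'][$key] = $value;
    }
    return $frame;
}

// An in-memory stream stands in for the socket in this sketch.
$stream = fopen('php://memory', 'r+');
fwrite($stream, "RECEIPT\nreceipt-id:abc\n\n\x00\n");
fwrite($stream, "MESSAGE\ndestination:/queue/a\nmessage-id:1:2\n\nhello\x00\n");
rewind($stream);

$first  = readStompFrame($stream);
$second = readStompFrame($stream);
$third  = readStompFrame($stream);
```

With a real socket, the timeout set by `stream_set_timeout()` still applies, so a missing receipt would surface as a `null` return here rather than a partially parsed frame.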
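Answer 2:
Are you sure you are sending the STOMP COMMIT frame? Receivers will not get transacted SEND frames until the associated transaction has been committed.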
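Taken together, the two answers suggest a frame sequence in which the transacted SEND frames carry only the `transaction` header and the transaction is closed with COMMIT. A minimal sketch of that sequence follows. The names `frame()` and `transactionFrames()`, the `tx-1` and `r-1` ids, and the choice to request a receipt on COMMIT only are assumptions made for illustration; whether Apollo withholds receipts for transacted SEND frames until commit is not established by the answers.

```php
<?php
// Hypothetical helpers for illustration; not part of the code in the question.
function frame(string $command, array $headers, string $body = ''): string
{
    $out = $command . "\n";
    foreach ($headers as $name => $value) {
        $out .= $name . ':' . $value . "\n";
    }
    // Blank line after the headers, NUL octet after the body.
    return $out . "\n" . $body . "\x00";
}

function transactionFrames(string $txId, string $destination, array $messages, string $receiptId): array
{
    $frames = [frame('BEGIN', ['transaction' => $txId])];
    foreach ($messages as $message) {
        // Transacted SEND: transaction header, no receipt header.
        $frames[] = frame('SEND', [
            'destination'    => $destination,
            'transaction'    => $txId,
            'content-length' => strlen($message),
        ], $message);
    }
    // Assumption: a single receipt is requested when the transaction is committed.
    $frames[] = frame('COMMIT', ['transaction' => $txId, 'receipt' => $receiptId]);
    return $frames;
}

$frames = transactionFrames('tx-1', '/queue/teststst', ['first', 'second'], 'r-1');
foreach ($frames as $f) {
    // Print each frame with the NUL octet made visible.
    echo str_replace("\x00", '^@', $f), PHP_EOL;
}
```

Until the COMMIT frame is written to the socket, subscribers on the destination should not expect to see the two messages.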