RabbitMq初探——用队列实现RPC
Posted 王大西
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq初探——用队列实现RPC相关的知识,希望对你有一定的参考价值。
rabbitmq构造rpc
前言
rpc——remote procedure call 远程调用。在我接触的使用过http协议、thrift框架来实现远程调用。其实消息队列rabbitmq也可以实现。
原理
我们称调用远程服务者为Client,远程服务提供者为Server。
Client充当生产者,将请求发送到rabbitmq队列中,Server作为消费者,处理Client请求产生结果数据result,此刻Server作为生产者,将result
通过rabbitmq队列传递到Client,Client作为结果数据的消费者,得到result。
代码
rpc_client.php
<?php /** * Created by PhpStorm. * User: 王大西 * Date: 2017/10/23 * Time: 16:36 */ require_once __DIR__ . \'/vendor/autoload.php\'; use PhpAmqpLib\\Connection\\AMQPStreamConnection; use PhpAmqpLib\\Message\\AMQPMessage; class RpcClient { private $connection = null; private $channel = null; private $callbackQueue = null; private $response = null; private $corrId = null; public function __construct() { $this->connection = new AMQPStreamConnection(\'127.0.0.1\', 5672, \'guest\', \'guest\'); $this->channel = $this->connection->channel(); list($this->callbackQueue, ,) = $this->channel->queue_declare("", false, false, true, false); $this->channel->basic_consume($this->callbackQueue, \'\', false, false, false, false, array($this, \'onResponse\')); } public function onResponse($rep) { if ($rep->get(\'correlation_id\') == $this->corrId) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corrId = uniqid(); $msg = new AMQPMessage((string) $n, array( \'correlation_id\' => $this->corrId, \'reply_to\' => $this->callbackQueue )); $this->channel->basic_publish($msg, \'\', \'rpc_queue1\'); while (!$this->response) { $this->channel->wait(); } return intval($this->response); } } $number = isset($argv[1]) ? $argv[1] : 30; $objRpcClient = new RpcClient(); $response = $objRpcClient->call($number); echo " RPC result $response\\n";
rpc_server.php
<?php /** * rpc server * Created by PhpStorm. * User: 王大西 * Date: 2017/10/23 * Time: 16:36 */ require_once __DIR__ . \'/vendor/autoload.php\'; use PhpAmqpLib\\Connection\\AMQPStreamConnection; use PhpAmqpLib\\Message\\AMQPMessage; $connection = new AMQPStreamConnection(\'127.0.0.1\', 5672, \'guest\', \'guest\'); $channel = $connection->channel(); $channel->queue_declare(\'rpc_queue1\', false, false, false, false); function fib($n){ if ($n == 0) { return 0; } if ($n == 1) { return 1; } return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requests\\n"; $callback = function($req){ $n = intval($req->body); //todo $n empty return echo " [.] fib(", $n, ")\\n"; $msg = new AMQPMessage((string) fib($n), array(\'correlation_id\' => $req->get("correlation_id")) ); $req->delivery_info[\'channel\']->basic_publish($msg, \'\', $req->get(\'reply_to\')); $req->delivery_info[\'channel\']->basic_ack($req->delivery_info[\'delivery_tag\']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume(\'rpc_queue1\', \'\', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
测试
server
client
以上是关于RabbitMq初探——用队列实现RPC的主要内容,如果未能解决你的问题,请参考以下文章