hyperf实现简单的rpc服务(win10 + docker+consul)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hyperf实现简单的rpc服务(win10 + docker+consul)相关的知识,希望对你有一定的参考价值。
参考技术A 服务端配置完成启动服务端,服务将会自动推送到consul
启动客户端(消费者)
在消费者端编写测试文件
访问接口 http://127.0.0.1:9601/index/rpc
返回结果
PHP Hyperf连接Nacos RPC调用并发内存泄露(协程安全问题)
PHP单例协程安全问题
想通过Hyperf +nacos 搭建微服务项目,在测试途中发现框架在使用nacos时会出现内存泄露问题…随着接口调用并发提高很可能把服务器搞死,这里分析一下具体原因
hyperf+nacos 微服务搭建的流程可以看我上个文章
PHP微服务 hyperf+nacos使用
一开始我以为是我使用不当才会造成内存泄露,后面测试按照官方使用方法也可能会内存泄露;
Rpc调用方法
public function rpc_test()
$cus= ApplicationContext::getContainer()->get(DefaultConsumer::class);
$d= $cus->get("add",['a'=>1,'b'=>2]);
return [
'本机ip' => get_server_ip(),
'message' => "微服务调用结果=$d",
];
调用类
<?php
/**
* Created by PhpStorm.
* User: 05
* Date: 2021/11/25
* Time: 15:00
*/
namespace App\\JsonRpc;
use Hyperf\\Rpc\\Protocol;
use Hyperf\\Rpc\\ProtocolManager;
use Hyperf\\RpcClient\\AbstractServiceClient;
use Hyperf\\RpcClient\\Client;
use Hyperf\\Utils\\ApplicationContext;
use Psr\\Container\\ContainerInterface;
use function Swoole\\Coroutine\\Http\\get;
class DefaultConsumer extends AbstractServiceClient
/**
* 定义对应服务提供者的服务名称
* @var string
*/
protected $serviceName = 'DefaultService';
/**
* 定义对应服务提供者的服务协议
* @var string
*/
protected $protocol = 'jsonrpc-http';
public function get(string $method, array $parms)
return $this->__request($method, ['pms'=>$parms]);
简单分享一下调用流程
1:ApplicationContext::getContainer()->get(DefaultConsumer::class);
通过容器实例化一个消费者类,容器会保存已有实例,没有则实例化一个保存起来(单例模式,这里不安全)
2:实例化 AbstractServiceClient
public function __construct(ContainerInterface $container)
$this->container = $container;
$this->loadBalancerManager = $container->get(LoadBalancerManager::class);
$protocol = new Protocol($container, $container->get(ProtocolManager::class), $this->protocol, $this->getOptions());
$loadBalancer = $this->createLoadBalancer(...$this->createNodes());
$transporter = $protocol->getTransporter()->setLoadBalancer($loadBalancer);
$this->client = make(Client::class)
->setPacker($protocol->getPacker())
->setTransporter($transporter);
$this->idGenerator = $this->getIdGenerator();
$this->pathGenerator = $protocol->getPathGenerator();
$this->dataFormatter = $protocol->getDataFormatter();
这里会做如下的事
a:创建 LoadBalancer,根据serviceName 去配置里找对应的server,并获取节点配置(这也是serviceName 不可改的原因,自己new之后再set已经没有意义了)
b:这里当服务节点Node配置了服务中心时创建node 会主动访问服务中心获取节点信息,生成网络请求Cline
c:通过网络请求发起Rpc调用
这里的问题就在:
创建node会时主动访问服务中心获取节点信息会造成协程切换,并且需要一定IO时间
//获取节点信息
public function getNodes(string $uri, string $name, array $metadata): array
if (!empty(self::$last_node[$name]['data'])&&time()-self::$last_node[$name]['time']<5)
return self::$last_node[$name]['data'];
self::$last_node[$name]=['data'=>null,'time'=>time()];
$response = $this->client->instance->list($name, [
'groupName' => $this->config->get('services.drivers.nacos.group_name'),
'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
]);
if ($response->getStatusCode() !== 200)
throw new RequestException((string) $response->getBody(), $response->getStatusCode());
$data = Json::decode((string) $response->getBody());
$hosts = $data['hosts'] ?? [];
$nodes = [];
foreach ($hosts as $node)
if (isset($node['ip'], $node['port']) && ($node['healthy'] ?? false))
$nodes[] = [
'host' => $node['ip'],
'port' => $node['port'],
'weight' => $this->getWeight($node['weight'] ?? 1),
];
$cacheNode=['data'=>$nodes,'time'=>time()];
self::$last_node[$name]=$cacheNode;
return $nodes;
3:创建负载均衡器会主动定时刷新节点,代码如下
//AbstractLoadBalancer 类里
public function refresh(callable $callback, int $tickMs = 5000)
$timerId = Timer::tick($tickMs, function () use ($callback)
$nodes = call($callback);
//这log是我打的
//Log::get("dev_05")->info("refresh 测试");
is_array($nodes) && $this->setNodes($nodes);
);
Coroutine::create(function () use ($timerId)
CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
Timer::clear($timerId);
);
这代码也可能会有问题,这会导致整个节点实例跟负载均衡器等无法被系统回收,泄露的对象永远无法回收;
图:并发测试后明显的内存泄露,并且同一个服务会同时多并发刷新nocs中心,且刷新不随接口并发结束而结束
访问流程log
//主worker进程 最前面的数字代表进程号 in_rpc_test_time:表示刚进入接口 rpc_get_node_befor:表示实例化消费者客户端时联网获取节点配置前 rpc_get_node_after:代表获取节点配置成功 get_cus:代表实例化消费者客户端完成
[110456 in_rpc_test_time=1662004837.715] [110456 rpc_get_node_befor] [110456 rpc_get_node_after] [110456 get_cus_time=1662004837.8963]
[110456 in_rpc_test_time=1662004837.9779] [110456 in_rpc_test_time=1662004837.9787] [110456 in_rpc_test_time=1662004837.9808]
[110456 in_rpc_test_time=1662004837.9814] [110456 get_cus_time=1662004837.9819]
[110456 in_rpc_test_time=1662004837.9831] [110456 get_cus_time=1662004837.9837]
[110456 get_cus_time=1662004837.9851] [110456 get_cus_time=1662004837.9853]
[110456 get_cus_time=1662004837.9855] [110456 in_rpc_test_time=1662004838.057]
[110456 in_rpc_test_time=1662004838.0629] [110456 in_rpc_test_time=1662004838.0699] [110456 in_rpc_test_time=1662004838.0708]
[110456 get_cus_time=1662004838.0712] [110456 in_rpc_test_time=1662004838.0769] [110456 get_cus_time=1662004838.0777]
[110456 get_cus_time=1662004838.0779] [110456 get_cus_time=1662004838.0882] [110456 get_cus_time=1662004838.0885]
[110456 in_rpc_test_time=1662004838.1327] [110456 in_rpc_test_time=1662004838.146] [110456 in_rpc_test_time=1662004838.1484]
[110456 in_rpc_test_time=1662004838.1579] [110456 get_cus_time=1662004838.1587] [110456 get_cus_time=1662004838.1589]
[110456 get_cus_time=1662004838.1591] [110456 get_cus_time=1662004838.1753]
//其他worker进程
[110455 in_rpc_test_time=1662004837.9808] [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9845] [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9855] [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9871] [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9855] [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9871] [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9883] [110455 rpc_get_node_befor]
[110455 rpc_get_node_after] [110455 rpc_get_node_after]
[110455 get_cus_time=1662004838.2421] [110455 get_cus_time=1662004838.2423]
[110455 rpc_get_node_after] [110455 get_cus_time=1662004838.2973]
[110455 rpc_get_node_after] [110455 rpc_get_node_after]
[110455 get_cus_time=1662004838.3056] [110455 get_cus_time=1662004838.3064]
这里有几点非常奇怪
1:在主worker进程里,这个初始化必定是同步的,看主进程第一行log,所以当worker 进程设置为1 时并不会出现这个协程安全问题,这个哪怕在里面co::sleep 也能保证是同步的…
2:通过网络getNode在非主进程下肯定是导致协程切换了, (看其他进程log前面几行)
3:就算不联网获取node,单 ApplicationContext::getContainer()->get(DefaultConsumer::class) 也会导致协程切换(看主进程第二行log,这里是已经实例化DefaultConsumer了,并且打container->get里打log 也是走到静态return但返回后就会导致协程切换);后面通过去除联网获取nodel,一样也是无法保证单例的情况,说明不单是联网获取Node导致协程切换了
泄露前提:
A:进程内还没有实例化过 AbstractServiceClient
B:Rpc 调用瞬时出现并发
这种情况只在压测时比较常见
解决方案
1:单worker进程,保证初始化时是同步的,但这个对高并发服务是不可接受的
2:生成DefaultConsumer 其他具体服务类继承DefaultConsumer,在DefaultConsumer里添加等待父方法,如下
<?php
/**
* Created by PhpStorm.
* User: 05
* Date: 2021/11/25
* Time: 15:00
*/
namespace App\\JsonRpc;
use Hyperf\\Rpc\\Protocol;
use Hyperf\\Rpc\\ProtocolManager;
use Hyperf\\RpcClient\\AbstractServiceClient;
use Hyperf\\RpcClient\\Client;
use Hyperf\\Utils\\ApplicationContext;
use Psr\\Container\\ContainerInterface;
use function Swoole\\Coroutine\\Http\\get;
class DefaultConsumer extends AbstractServiceClient
/**
* 定义对应服务提供者的服务名称
* @var string
*/
protected $serviceName = 'DefaultService';
/**
* 定义对应服务提供者的服务协议
* @var string
*/
protected $protocol = 'jsonrpc-http';
protected static $is_initing=0;
public function __construct(ContainerInterface $container)
self::$is_initing=1;
parent::__construct($container);
self::$is_initing=2;
/**
* @return bool
* 保证单例同步
*/
public static function waitInit()
while(self::$is_initing===1)
//睡眠1ms 等待第一个实例化
time_nanosleep(0,1000000);
return true;
public function get(string $method, array $parms)
return $this->__request($method, ['pms'=>$parms]);
CalculatorServiceConsumer 只需继承 DefaultConsumer 代码如下
<?php
/**
* Created by PhpStorm.
* User: 05
* Date: 2021/11/25
* Time: 15:00
*/
namespace App\\JsonRpc;
class CalculatorServiceConsumer extends DefaultConsumer
/**
* 定义对应服务提供者的服务名称
* @var string
*/
protected $serviceName = 'CaculatorService';
调用服务时先判断是否正在实例化,保证单例协程安全即可
public function rpc_test()
//判断是否有对象正在实例化中,保证单例协程安全
if (CalculatorServiceConsumer::waitInit())
$cus= ApplicationContext::getContainer()->get(CalculatorServiceConsumer::class);
$d= $cus->get("add",['a'=>1,'b'=>2]);
;
if (is_array($d))
$d=json_encode($d);
return [
'本机ip' => get_server_ip(),
'message' => "微服务调用结果=".$d,
];
最后结果:启用服务(2个worker进程)后 ab并发100测试,5s后的刷新日志,每个进程保证一个实例,问题解决
总结:
Swoole 框架无疑大大提高了PHP服务的并发能力,但是由于协程的加入,并且在swoole底层对一些函数的hook,让很多本来是同步的代码变成协程异步了,导致很多逻辑与原始PHP的逻辑有出入,大大提高了PHP的使用门槛,相信没有几个phper 会关心单例安全这个事, 哪怕像hyperf这样比较流行的swoole框架也不能完全避免,暂时来看,非必要还是PHP+FPM会比较稳妥
以上是关于hyperf实现简单的rpc服务(win10 + docker+consul)的主要内容,如果未能解决你的问题,请参考以下文章
基于Hyperf实现RabbitMQ+WebSocket消息推送