hyperf实现简单的rpc服务(win10 + docker+consul)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hyperf实现简单的rpc服务(win10 + docker+consul)相关的知识,希望对你有一定的参考价值。

参考技术A 服务端配置完成

启动服务端,服务将会自动推送到consul

启动客户端(消费者)

在消费者端编写测试文件

访问接口 http://127.0.0.1:9601/index/rpc
返回结果

PHP Hyperf连接Nacos RPC调用并发内存泄露(协程安全问题)

PHP单例协程安全问题

想通过Hyperf +nacos 搭建微服务项目,在测试途中发现框架在使用nacos时会出现内存泄露问题…随着接口调用并发提高很可能把服务器搞死,这里分析一下具体原因
hyperf+nacos 微服务搭建的流程可以看我上个文章
PHP微服务 hyperf+nacos使用
一开始我以为是我使用不当才会造成内存泄露,后面测试按照官方使用方法也可能会内存泄露;

Rpc调用方法


public function rpc_test()
         $cus=   ApplicationContext::getContainer()->get(DefaultConsumer::class);
         $d= $cus->get("add",['a'=>1,'b'=>2]);
        return [
            '本机ip' => get_server_ip(),
            'message' => "微服务调用结果=$d",
        ];
    

调用类

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2021/11/25
 * Time: 15:00
 */

namespace App\\JsonRpc;


use Hyperf\\Rpc\\Protocol;
use Hyperf\\Rpc\\ProtocolManager;
use Hyperf\\RpcClient\\AbstractServiceClient;
use Hyperf\\RpcClient\\Client;
use Hyperf\\Utils\\ApplicationContext;
use Psr\\Container\\ContainerInterface;
use function Swoole\\Coroutine\\Http\\get;

class DefaultConsumer extends AbstractServiceClient


    /**
     * 定义对应服务提供者的服务名称
     * @var string
     */
    protected $serviceName = 'DefaultService';


    /**
     * 定义对应服务提供者的服务协议
     * @var string
     */
    protected $protocol = 'jsonrpc-http';



    public function get(string $method, array $parms)
    
        return $this->__request($method, ['pms'=>$parms]);
    



简单分享一下调用流程
1:ApplicationContext::getContainer()->get(DefaultConsumer::class);
通过容器实例化一个消费者类,容器会保存已有实例,没有则实例化一个保存起来(单例模式,这里不安全)

2:实例化 AbstractServiceClient

   public function __construct(ContainerInterface $container)
    
        $this->container = $container;
        $this->loadBalancerManager = $container->get(LoadBalancerManager::class);
        $protocol = new Protocol($container, $container->get(ProtocolManager::class), $this->protocol, $this->getOptions());
        $loadBalancer = $this->createLoadBalancer(...$this->createNodes());
        $transporter = $protocol->getTransporter()->setLoadBalancer($loadBalancer);
        $this->client = make(Client::class)
            ->setPacker($protocol->getPacker())
            ->setTransporter($transporter);
        $this->idGenerator = $this->getIdGenerator();
        $this->pathGenerator = $protocol->getPathGenerator();
        $this->dataFormatter = $protocol->getDataFormatter();
    

这里会做如下的事
a:创建 LoadBalancer,根据serviceName 去配置里找对应的server,并获取节点配置(这也是serviceName 不可改的原因,自己new之后再set已经没有意义了)
b:这里当服务节点Node配置了服务中心时创建node 会主动访问服务中心获取节点信息,生成网络请求Cline
c:通过网络请求发起Rpc调用
这里的问题就在:
创建node会时主动访问服务中心获取节点信息会造成协程切换,并且需要一定IO时间

//获取节点信息
 public function getNodes(string $uri, string $name, array $metadata): array
    
        if (!empty(self::$last_node[$name]['data'])&&time()-self::$last_node[$name]['time']<5)
            return  self::$last_node[$name]['data'];
        
        self::$last_node[$name]=['data'=>null,'time'=>time()];
        $response = $this->client->instance->list($name, [
            'groupName' => $this->config->get('services.drivers.nacos.group_name'),
            'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
        ]);

        if ($response->getStatusCode() !== 200) 
            throw new RequestException((string) $response->getBody(), $response->getStatusCode());
        

        $data = Json::decode((string) $response->getBody());
        $hosts = $data['hosts'] ?? [];
        $nodes = [];
        foreach ($hosts as $node) 
            if (isset($node['ip'], $node['port']) && ($node['healthy'] ?? false)) 
                $nodes[] = [
                    'host' => $node['ip'],
                    'port' => $node['port'],
                    'weight' => $this->getWeight($node['weight'] ?? 1),
                ];
            
        
        $cacheNode=['data'=>$nodes,'time'=>time()];
        self::$last_node[$name]=$cacheNode;
        return $nodes;
    

3:创建负载均衡器会主动定时刷新节点,代码如下

//AbstractLoadBalancer 类里
  public function refresh(callable $callback, int $tickMs = 5000)
    
        $timerId = Timer::tick($tickMs, function () use ($callback) 
            $nodes = call($callback);
            //这log是我打的
           //Log::get("dev_05")->info("refresh 测试");
            is_array($nodes) && $this->setNodes($nodes);
        );
        Coroutine::create(function () use ($timerId) 
            CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
            Timer::clear($timerId);
        );
    

这代码也可能会有问题,这会导致整个节点实例跟负载均衡器等无法被系统回收,泄露的对象永远无法回收;

图:并发测试后明显的内存泄露,并且同一个服务会同时多并发刷新nocs中心,且刷新不随接口并发结束而结束

访问流程log

//主worker进程 最前面的数字代表进程号 in_rpc_test_time:表示刚进入接口   rpc_get_node_befor:表示实例化消费者客户端时联网获取节点配置前   rpc_get_node_after:代表获取节点配置成功  get_cus:代表实例化消费者客户端完成
[110456 in_rpc_test_time=1662004837.715]  [110456 rpc_get_node_befor]  [110456 rpc_get_node_after]  [110456 get_cus_time=1662004837.8963]  
[110456 in_rpc_test_time=1662004837.9779]  [110456 in_rpc_test_time=1662004837.9787]  [110456 in_rpc_test_time=1662004837.9808]  
[110456 in_rpc_test_time=1662004837.9814]  [110456 get_cus_time=1662004837.9819]  
[110456 in_rpc_test_time=1662004837.9831]  [110456 get_cus_time=1662004837.9837]  
[110456 get_cus_time=1662004837.9851]  [110456 get_cus_time=1662004837.9853]  
[110456 get_cus_time=1662004837.9855]  [110456 in_rpc_test_time=1662004838.057]  
[110456 in_rpc_test_time=1662004838.0629]  [110456 in_rpc_test_time=1662004838.0699]  [110456 in_rpc_test_time=1662004838.0708]  
[110456 get_cus_time=1662004838.0712]  [110456 in_rpc_test_time=1662004838.0769]  [110456 get_cus_time=1662004838.0777]  
[110456 get_cus_time=1662004838.0779]  [110456 get_cus_time=1662004838.0882]  [110456 get_cus_time=1662004838.0885]  
[110456 in_rpc_test_time=1662004838.1327]  [110456 in_rpc_test_time=1662004838.146]  [110456 in_rpc_test_time=1662004838.1484]  
[110456 in_rpc_test_time=1662004838.1579]  [110456 get_cus_time=1662004838.1587]  [110456 get_cus_time=1662004838.1589]  
[110456 get_cus_time=1662004838.1591]  [110456 get_cus_time=1662004838.1753]    

//其他worker进程
[110455 in_rpc_test_time=1662004837.9808]  [110455 rpc_get_node_befor] 
[110455 in_rpc_test_time=1662004837.9845]  [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9855]  [110455 rpc_get_node_befor]  
[110455 in_rpc_test_time=1662004837.9871]  [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9855]  [110455 rpc_get_node_befor]  
[110455 in_rpc_test_time=1662004837.9871]  [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9883]  [110455 rpc_get_node_befor]
[110455 rpc_get_node_after]  [110455 rpc_get_node_after]
[110455 get_cus_time=1662004838.2421]  [110455 get_cus_time=1662004838.2423]  
[110455 rpc_get_node_after]  [110455 get_cus_time=1662004838.2973]  
[110455 rpc_get_node_after]  [110455 rpc_get_node_after]  
[110455 get_cus_time=1662004838.3056]  [110455 get_cus_time=1662004838.3064]  

这里有几点非常奇怪
1:在主worker进程里,这个初始化必定是同步的,看主进程第一行log,所以当worker 进程设置为1 时并不会出现这个协程安全问题,这个哪怕在里面co::sleep 也能保证是同步的…

2:通过网络getNode在非主进程下肯定是导致协程切换了, (看其他进程log前面几行)

3:就算不联网获取node,单 ApplicationContext::getContainer()->get(DefaultConsumer::class) 也会导致协程切换(看主进程第二行log,这里是已经实例化DefaultConsumer了,并且打container->get里打log 也是走到静态return但返回后就会导致协程切换);后面通过去除联网获取nodel,一样也是无法保证单例的情况,说明不单是联网获取Node导致协程切换了

泄露前提:
A:进程内还没有实例化过 AbstractServiceClient
B:Rpc 调用瞬时出现并发
这种情况只在压测时比较常见

解决方案

1:单worker进程,保证初始化时是同步的,但这个对高并发服务是不可接受的

2:生成DefaultConsumer 其他具体服务类继承DefaultConsumer,在DefaultConsumer里添加等待父方法,如下

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2021/11/25
 * Time: 15:00
 */

namespace App\\JsonRpc;


use Hyperf\\Rpc\\Protocol;
use Hyperf\\Rpc\\ProtocolManager;
use Hyperf\\RpcClient\\AbstractServiceClient;
use Hyperf\\RpcClient\\Client;
use Hyperf\\Utils\\ApplicationContext;
use Psr\\Container\\ContainerInterface;
use function Swoole\\Coroutine\\Http\\get;

class DefaultConsumer extends AbstractServiceClient


    /**
     * 定义对应服务提供者的服务名称
     * @var string
     */
    protected $serviceName = 'DefaultService';


    /**
     * 定义对应服务提供者的服务协议
     * @var string
     */
    protected $protocol = 'jsonrpc-http';

    protected static $is_initing=0;


    public function __construct(ContainerInterface $container)
    
        self::$is_initing=1;
        parent::__construct($container);
        self::$is_initing=2;
    

    /**
     * @return bool
     * 保证单例同步
     */
    public static function waitInit()
        while(self::$is_initing===1)
            //睡眠1ms 等待第一个实例化
            time_nanosleep(0,1000000);
        
        return true;
    


    public function get(string $method, array $parms)
    
        return $this->__request($method, ['pms'=>$parms]);
    
    

CalculatorServiceConsumer 只需继承 DefaultConsumer 代码如下

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2021/11/25
 * Time: 15:00
 */

namespace App\\JsonRpc;

class CalculatorServiceConsumer extends DefaultConsumer


    /**
     * 定义对应服务提供者的服务名称
     * @var string
     */
    protected $serviceName = 'CaculatorService';



调用服务时先判断是否正在实例化,保证单例协程安全即可

 public function rpc_test()
		//判断是否有对象正在实例化中,保证单例协程安全
        if (CalculatorServiceConsumer::waitInit())
            $cus=   ApplicationContext::getContainer()->get(CalculatorServiceConsumer::class);
            $d= $cus->get("add",['a'=>1,'b'=>2]);
        ;
        if (is_array($d))
             $d=json_encode($d);
         
        return [
            '本机ip' => get_server_ip(),
            'message' => "微服务调用结果=".$d,
        ];
    

最后结果:启用服务(2个worker进程)后 ab并发100测试,5s后的刷新日志,每个进程保证一个实例,问题解决

总结:

Swoole 框架无疑大大提高了PHP服务的并发能力,但是由于协程的加入,并且在swoole底层对一些函数的hook,让很多本来是同步的代码变成协程异步了,导致很多逻辑与原始PHP的逻辑有出入,大大提高了PHP的使用门槛,相信没有几个phper 会关心单例安全这个事, 哪怕像hyperf这样比较流行的swoole框架也不能完全避免,暂时来看,非必要还是PHP+FPM会比较稳妥

以上是关于hyperf实现简单的rpc服务(win10 + docker+consul)的主要内容,如果未能解决你的问题,请参考以下文章

Hyperf jsonrpc 服务的搭建

Swoole系列6.3Hyperf 运行各种网络服务

Swoole系列6.3Hyperf 运行各种网络服务

基于Hyperf实现RabbitMQ+WebSocket消息推送

Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送

基于Redis的分布式锁算法RedLock及RedLock-Hyperf实现