PHP Hyperf连接Nacos RPC调用并发内存泄露(协程安全问题)

Posted qq_540616979

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP Hyperf连接Nacos RPC调用并发内存泄露(协程安全问题)相关的知识,希望对你有一定的参考价值。

php单例协程安全问题

想通过Hyperf +nacos 搭建微服务项目,在测试途中发现框架在使用nacos时会出现内存泄露问题…随着接口调用并发提高很可能把服务器搞死,这里分析一下具体原因
hyperf+nacos 微服务搭建的流程可以看我上个文章
PHP微服务 hyperf+nacos使用
一开始我以为是我使用不当才会造成内存泄露,后面测试按照官方使用方法也可能会内存泄露;

Rpc调用方法


public function rpc_test()
         $cus=   ApplicationContext::getContainer()->get(DefaultConsumer::class);
         $d= $cus->get("add",['a'=>1,'b'=>2]);
        return [
            '本机ip' => get_server_ip(),
            'message' => "微服务调用结果=$d",
        ];
    

调用类

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2021/11/25
 * Time: 15:00
 */

namespace App\\JsonRpc;


use Hyperf\\Rpc\\Protocol;
use Hyperf\\Rpc\\ProtocolManager;
use Hyperf\\RpcClient\\AbstractServiceClient;
use Hyperf\\RpcClient\\Client;
use Hyperf\\Utils\\ApplicationContext;
use Psr\\Container\\ContainerInterface;
use function Swoole\\Coroutine\\Http\\get;

class DefaultConsumer extends AbstractServiceClient


    /**
     * 定义对应服务提供者的服务名称
     * @var string
     */
    protected $serviceName = 'DefaultService';


    /**
     * 定义对应服务提供者的服务协议
     * @var string
     */
    protected $protocol = 'jsonrpc-http';



    public function get(string $method, array $parms)
    
        return $this->__request($method, ['pms'=>$parms]);
    



简单分享一下调用流程
1:ApplicationContext::getContainer()->get(DefaultConsumer::class);
通过容器实例化一个消费者类,容器会保存已有实例,没有则实例化一个保存起来(单例模式,这里不安全)

2:实例化 AbstractServiceClient

   public function __construct(ContainerInterface $container)
    
        $this->container = $container;
        $this->loadBalancerManager = $container->get(LoadBalancerManager::class);
        $protocol = new Protocol($container, $container->get(ProtocolManager::class), $this->protocol, $this->getOptions());
        $loadBalancer = $this->createLoadBalancer(...$this->createNodes());
        $transporter = $protocol->getTransporter()->setLoadBalancer($loadBalancer);
        $this->client = make(Client::class)
            ->setPacker($protocol->getPacker())
            ->setTransporter($transporter);
        $this->idGenerator = $this->getIdGenerator();
        $this->pathGenerator = $protocol->getPathGenerator();
        $this->dataFormatter = $protocol->getDataFormatter();
    

这里会做如下的事
a:创建 LoadBalancer,根据serviceName 去配置里找对应的server,并获取节点配置(这也是serviceName 不可改的原因,自己new之后再set已经没有意义了)
b:这里当服务节点Node配置了服务中心时创建node 会主动访问服务中心获取节点信息,生成网络请求Cline
c:通过网络请求发起Rpc调用
这里的问题就在:
创建node会时主动访问服务中心获取节点信息会造成协程切换,并且需要一定IO时间

//获取节点信息
 public function getNodes(string $uri, string $name, array $metadata): array
    
        if (!empty(self::$last_node[$name]['data'])&&time()-self::$last_node[$name]['time']<5)
            return  self::$last_node[$name]['data'];
        
        self::$last_node[$name]=['data'=>null,'time'=>time()];
        $response = $this->client->instance->list($name, [
            'groupName' => $this->config->get('services.drivers.nacos.group_name'),
            'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
        ]);

        if ($response->getStatusCode() !== 200) 
            throw new RequestException((string) $response->getBody(), $response->getStatusCode());
        

        $data = Json::decode((string) $response->getBody());
        $hosts = $data['hosts'] ?? [];
        $nodes = [];
        foreach ($hosts as $node) 
            if (isset($node['ip'], $node['port']) && ($node['healthy'] ?? false)) 
                $nodes[] = [
                    'host' => $node['ip'],
                    'port' => $node['port'],
                    'weight' => $this->getWeight($node['weight'] ?? 1),
                ];
            
        
        $cacheNode=['data'=>$nodes,'time'=>time()];
        self::$last_node[$name]=$cacheNode;
        return $nodes;
    

3:创建负载均衡器会主动定时刷新节点,代码如下

//AbstractLoadBalancer 类里
  public function refresh(callable $callback, int $tickMs = 5000)
    
        $timerId = Timer::tick($tickMs, function () use ($callback) 
            $nodes = call($callback);
            //这log是我打的
           //Log::get("dev_05")->info("refresh 测试");
            is_array($nodes) && $this->setNodes($nodes);
        );
        Coroutine::create(function () use ($timerId) 
            CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
            Timer::clear($timerId);
        );
    

这代码也可能会有问题,这会导致整个节点实例跟负载均衡器等无法被系统回收,泄露的对象永远无法回收;

图:并发测试后明显的内存泄露,并且同一个服务会同时多并发刷新nocs中心,且刷新不随接口并发结束而结束

访问流程log

//主worker进程 最前面的数字代表进程号 in_rpc_test_time:表示刚进入接口   rpc_get_node_befor:表示实例化消费者客户端时联网获取节点配置前   rpc_get_node_after:代表获取节点配置成功  get_cus:代表实例化消费者客户端完成
[110456 in_rpc_test_time=1662004837.715]  [110456 rpc_get_node_befor]  [110456 rpc_get_node_after]  [110456 get_cus_time=1662004837.8963]  
[110456 in_rpc_test_time=1662004837.9779]  [110456 in_rpc_test_time=1662004837.9787]  [110456 in_rpc_test_time=1662004837.9808]  
[110456 in_rpc_test_time=1662004837.9814]  [110456 get_cus_time=1662004837.9819]  
[110456 in_rpc_test_time=1662004837.9831]  [110456 get_cus_time=1662004837.9837]  
[110456 get_cus_time=1662004837.9851]  [110456 get_cus_time=1662004837.9853]  
[110456 get_cus_time=1662004837.9855]  [110456 in_rpc_test_time=1662004838.057]  
[110456 in_rpc_test_time=1662004838.0629]  [110456 in_rpc_test_time=1662004838.0699]  [110456 in_rpc_test_time=1662004838.0708]  
[110456 get_cus_time=1662004838.0712]  [110456 in_rpc_test_time=1662004838.0769]  [110456 get_cus_time=1662004838.0777]  
[110456 get_cus_time=1662004838.0779]  [110456 get_cus_time=1662004838.0882]  [110456 get_cus_time=1662004838.0885]  
[110456 in_rpc_test_time=1662004838.1327]  [110456 in_rpc_test_time=1662004838.146]  [110456 in_rpc_test_time=1662004838.1484]  
[110456 in_rpc_test_time=1662004838.1579]  [110456 get_cus_time=1662004838.1587]  [110456 get_cus_time=1662004838.1589]  
[110456 get_cus_time=1662004838.1591]  [110456 get_cus_time=1662004838.1753]    

//其他worker进程
[110455 in_rpc_test_time=1662004837.9808]  [110455 rpc_get_node_befor] 
[110455 in_rpc_test_time=1662004837.9845]  [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9855]  [110455 rpc_get_node_befor]  
[110455 in_rpc_test_time=1662004837.9871]  [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9855]  [110455 rpc_get_node_befor]  
[110455 in_rpc_test_time=1662004837.9871]  [110455 rpc_get_node_befor]
[110455 in_rpc_test_time=1662004837.9883]  [110455 rpc_get_node_befor]
[110455 rpc_get_node_after]  [110455 rpc_get_node_after]
[110455 get_cus_time=1662004838.2421]  [110455 get_cus_time=1662004838.2423]  
[110455 rpc_get_node_after]  [110455 get_cus_time=1662004838.2973]  
[110455 rpc_get_node_after]  [110455 rpc_get_node_after]  
[110455 get_cus_time=1662004838.3056]  [110455 get_cus_time=1662004838.3064]  

这里有几点非常奇怪
1:在主worker进程里,这个初始化必定是同步的,看主进程第一行log,所以当worker 进程设置为1 时并不会出现这个协程安全问题,这个哪怕在里面co::sleep 也能保证是同步的…

2:通过网络getNode在非主进程下肯定是导致协程切换了, (看其他进程log前面几行)

3:就算不联网获取node,单 ApplicationContext::getContainer()->get(DefaultConsumer::class) 也会导致协程切换(看主进程第二行log,这里是已经实例化DefaultConsumer了,并且打container->get里打log 也是走到静态return但返回后就会导致协程切换);后面通过去除联网获取nodel,一样也是无法保证单例的情况,说明不单是联网获取Node导致协程切换了

泄露前提:
A:进程内还没有实例化过 AbstractServiceClient
B:Rpc 调用瞬时出现并发
这种情况只在压测时比较常见

解决方案

1:单worker进程,保证初始化时是同步的,但这个对高并发服务是不可接受的

2:生成DefaultConsumer 其他具体服务类继承DefaultConsumer,在DefaultConsumer里添加等待父方法,如下

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2021/11/25
 * Time: 15:00
 */

namespace App\\JsonRpc;


use Hyperf\\Rpc\\Protocol;
use Hyperf\\Rpc\\ProtocolManager;
use Hyperf\\RpcClient\\AbstractServiceClient;
use Hyperf\\RpcClient\\Client;
use Hyperf\\Utils\\ApplicationContext;
use Psr\\Container\\ContainerInterface;
use function Swoole\\Coroutine\\Http\\get;

class DefaultConsumer extends AbstractServiceClient


    /**
     * 定义对应服务提供者的服务名称
     * @var string
     */
    protected $serviceName = 'DefaultService';


    /**
     * 定义对应服务提供者的服务协议
     * @var string
     */
    protected $protocol = 'jsonrpc-http';

    protected static $is_initing=0;


    public function __construct(ContainerInterface $container)
    
        self::$is_initing=1;
        parent::__construct($container);
        self::$is_initing=2;
    

    /**
     * @return bool
     * 保证单例同步
     */
    public static function waitInit()
        while(self::$is_initing===1)
            //睡眠1ms 等待第一个实例化
            time_nanosleep(0,1000000);
        
        return true;
    


    public function get(string $method, array $parms)
    
        return $this->__request($method, ['pms'=>$parms]);
    
    

CalculatorServiceConsumer 只需继承 DefaultConsumer 代码如下

<?php
/**
 * Created by PhpStorm.
 * User: 05
 * Date: 2021/11/25
 * Time: 15:00
 */

namespace App\\JsonRpc;

class CalculatorServiceConsumer extends DefaultConsumer


    /**
     * 定义对应服务提供者的服务名称
     * @var string
     */
    protected $serviceName = 'CaculatorService';



调用服务时先判断是否正在实例化,保证单例协程安全即可

 public function rpc_test()
		//判断是否有对象正在实例化中,保证单例协程安全
        if (CalculatorServiceConsumer::waitInit())
            $cus=   ApplicationContext::getContainer()->get(CalculatorServiceConsumer::class);
            $d= $cus->get("add",['a'=>1,'b'=>2]);
        ;
        if (is_array($d))
             $d=json_encode($d);
         
        return [
            '本机ip' => get_server_ip(),
            'message' => "微服务调用结果=".$d,
        ];
    

最后结果:启用服务(2个worker进程)后 ab并发100测试,5s后的刷新日志,每个进程保证一个实例,问题解决

总结:

Swoole 框架无疑大大提高了PHP服务的并发能力,但是由于协程的加入,并且在swoole底层对一些函数的hook,让很多本来是同步的代码变成协程异步了,导致很多逻辑与原始PHP的逻辑有出入,大大提高了PHP的使用门槛,相信没有几个phper 会关心单例安全这个事, 哪怕像hyperf这样比较流行的swoole框架也不能完全避免,暂时来看,非必要还是PHP+FPM会比较稳妥

Springboot+Dubbo+Nacos实现RPC调用

Springboot+Dubbo+Nacos 注解方式实现微服务调用

1、项目结构

|-- dubbo-demo (父级工程)
    |-- dubbo-demo-core (基础工程)
    |-- dubbo-consumer (消费者)
    |-- dubbo-provider (生产者)

SpringBoot版本:2.2.x

Dubbo版本:2.7.3

Nacos版本:1.1.4

2、启动Nacos注册中心


3、搭建项目

dubbo-demo父工程版本控制:

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.2.9.RELEASE</version>
</parent>

<properties>
	<java.version>1.8</java.version>
	<compiler.plugin.version>3.8.1</compiler.plugin.version>
	<war.plugin.version>3.2.3</war.plugin.version>
	<jar.plugin.version>3.1.2</jar.plugin.version>
	<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
	<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
	<dubbo-spring-boot-starter.version>2.7.3</dubbo-spring-boot-starter.version>
	<nacos-client.version>1.1.4</nacos-client.version>
</properties>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-alibaba-dependencies</artifactId>
			<version>$spring-cloud-alibaba.version</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>$spring-cloud.version</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.dubbo</groupId>
			<artifactId>dubbo-spring-boot-starter</artifactId>
			<version>$dubbo-spring-boot-starter.version</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba.nacos</groupId>
			<artifactId>nacos-client</artifactId>
			<version>$nacos-client.version</version>
		</dependency>
		<dependency>
			<groupId>org.apache.dubbo</groupId>
			<artifactId>dubbo-registry-nacos</artifactId>
			<version>$dubbo-spring-boot-starter.version</version>
		</dependency>
		<dependency>
			<groupId>org.apache.dubbo</groupId>
			<artifactId>dubbo</artifactId>
			<version>$dubbo-spring-boot-starter.version</version>
		</dependency>
	</dependencies>
</dependencyManagement>

Consumer和Provider的Maven依赖如下:

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.dubbo</groupId>
		<artifactId>dubbo-spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>com.alibaba.nacos</groupId>
		<artifactId>nacos-client</artifactId>
	</dependency>
	<dependency>
		<groupId>org.apache.dubbo</groupId>
		<artifactId>dubbo-registry-nacos</artifactId>
		<version>$dubbo-spring-boot-starter.version</version>
	</dependency>
	<dependency>
		<groupId>org.apache.dubbo</groupId>
		<artifactId>dubbo</artifactId>
	</dependency>
	<dependency>
		<groupId>org.example</groupId>
		<artifactId>dubbo-demo-core</artifactId>
		<version>1.0.0</version>
	</dependency>
</dependencies>

Provider配置如下:

server.port=8081
spring.application.name=dubbo-provider-demo

# Dubbo
dubbo.scan.base-packages=com.nari.dubbo.provider.service
dubbo.application.name=$spring.application.name
# 禁用QOS同一台机器可能会有端口冲突现象
#dubbo.application.name.qos-enable=false
#dubbo.application.name.qos-accept-foreign-ip=false
# Dubbo Protocol
dubbo.protocol.name=dubbo
dubbo.protocol.port=12345
## Dubbo Registry
dubbo.registry.address=nacos://localhost:8848

Consumer配置如下:

server.port=8082
spring.application.name=dubbo-consumer-demo
#dubbo.application.name=$spring.application.name

## Dubbo Registry
dubbo.registry.address=nacos://localhost:8848
#dubbo.consumer.timeout=4000

4、core工程编写

package com.nari.dubbo.core;

public interface DemoService 
    String sayHello(String name);

5、Provider工程编写

在启动类上面不要忘记加上@EnableDubbo注解

package com.nari.dubbo.provider;

import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableDubbo
@SpringBootApplication
public class ProviderApplication 

    public static void main(String[] args) 
        SpringApplication.run(ProviderApplication.class, args);
    

实现DemoService接口,注意这里的@Serivce引用的是dubbo的包

package com.nari.dubbo.provider.service;

import com.nari.dubbo.core.DemoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.Service;//dubbo包下
import org.springframework.beans.factory.annotation.Value;

@Slf4j
@Service(version = "$dubbo-demo.version")
public class DemoServiceImpl implements DemoService 

    @Value("$dubbo.application.name")
    private String serviceName;

    @Override
    public String sayHello(String name) 
        log.info("provider被调用");
        return String.format("[%s] : Hello, %s", serviceName, name);
    

6、Consumer工程编写

和Provider工程的启动类一样,加上@EnableDubbo注解

编写测试接口:

package com.nari.dubbo.consumer.controller;

import com.nari.dubbo.core.DemoService;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/test")
public class TestController 
    @Reference(version = "$dubbo-demo.version")
    private DemoService demoService;

    @GetMapping(value = "/test/name")
    public String sayHello(@PathVariable("name") String name) 
        return demoService.sayHello(name);
    

7、测试

启动Provider工程和Consumer工程,这个时候Nacos会有对应的服务:

使用浏览器请求consumer:

http://localhost:8082/test/test/测试

provider控制台打印日志,说明调用成功

以上是关于PHP Hyperf连接Nacos RPC调用并发内存泄露(协程安全问题)的主要内容,如果未能解决你的问题,请参考以下文章

RPC 中的最大并发连接数是多少?

hyperf实现简单的rpc服务(win10 + docker+consul)

Springboot+Dubbo+Nacos实现RPC调用

Yii2 EasySwoole Hyperf 并发对比

微服务架构第一阶段(nacos,gateWay,RPC)

71 nacos 实现微服务的注册与发现