Workerman创建WebSocket客户端和服务端推送数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Workerman创建WebSocket客户端和服务端推送数据相关的知识,希望对你有一定的参考价值。

参考技术A 本文场景:
服务器A:服务端,目的是要接收外部数据处理后放入消息队列。
服务器B:客户端,向服务器A提供数据。

前面有文章测试了基于HTTP的方案,但都是基于一台服务器自己测试。实际在外部服务器测试时单机请求性能很差,远远达不到处理能力,网路性能制约严重。

原基于HTTP的方案,每次推送都要重新建立连接,消耗过大。所以想到了Socket长连接方案。
扩展:HTTP使用TCP 三次握手建立连接,客户端和服务器需要交换3个包。HTTPS除了 TCP 的三个包,还要加上 SSL握手需要的9个包,一共是12个包。

服务端和客户端都是基于Workerman建立的

之前HTTP方案的测试受制于发送瓶颈太严重,单台请求能力才几百每秒,多服务器请求服务端速率正常倍增。

通过GatewayWorker/Workerman搭建Websocket微服务

背景

最近在一些项目需要用到 Websocket实时推送给分组的用户,前端需要传输给后端的信息比较少,通过多方考虑选择了通过 GatewayWorker框架(基于Workerman)搭建微服务。

介绍

Workerman

Workerman是一款纯PHP开发的开源高性能的PHP socket 服务框架。

Workerman不是重复造轮子,它不是一个MVC框架,而是一个更底层更通用的socket服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。

实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常住内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协以及各种自定义协议。拥有定时器、异步socket客户端、异步Mysql、异步Redis、异步Http、异步消息队列等众多高性能组件。

文档:http://doc.workerman.net/315110

GatewayWorker

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等

GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。

GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。

文档:http://doc2.workerman.net/326102

Workerman与GatewayWorker的关系

Workerman可以看做是一个纯粹的socket类库,可以开发几乎所有的网络应用,不管是TCP的还是UDP的,长连接的还是短连接的。Workerman代码精简,功能强大,使用灵活,能够快速开发出各种网络应用。同时Workerman相比GatewayWorker也更底层,需要开发者有一定的多进程编程经验。

因为绝大多数开发者的目标是基于Workerman开发TCP长连接应用,而长连接应用服务端有很多共同之处,例如它们有相同的进程模型以及单发、群发、广播等接口需求。所以才有了GatewayWorker框架,GatewayWorker是基于Workerman开发的一个TCP长连接框架,实现了单发、群送、广播等长连接必用的接口。GatewayWorker框架实现了Gateway Worker进程模型,天然支持分布式多服务器部署,扩容缩容非常方便,能够应对海量并发连接。可以说GatewayWorker是基于Workerman实现的一个更完善的专门用于实现TCP长连接的项目框架。

GatewayClient

GatewayClient是GatewayWorker的客户端程序,可以进行推送、分组、统计等操作。

websocket微服务介绍

总体原则,websocket微服务不处理业务逻辑,仅仅是一个单向连接,只负责推送信息。但客户端连接websocket微服务时,websocket微服务返回给客户端clientId,客户端调用接口把clientId传给后端,此时后端就可以通过GatewayClient绑定用户到具体分组。但需要推送时,通过text协议与GatewayWorker通信,把要推送的clientId或者分组传给GatewayWorker,GatewayWorker再推送给客户端。图示如下:

具体实现

安装GatewayWorker内核

新建一个空白项目(不在Laravel/Lumen/ThinkPHP 等PHP框架里),执行

 
   
   
 
  1. composer require workerman/gateway-worker

启动文件

在根目录新建start.php作为启动文件,代码:

 
   
   
 
  1. <?php

  2. ini_set('display_errors', 'on');

  3. use Workerman\Worker;

  4. if(strpos(strtolower(PHP_OS), 'win') === 0)

  5. {

  6.    exit("start.php not support windows, please use start_for_win.bat\n");

  7. }

  8. // 检查扩展

  9. if(!extension_loaded('pcntl'))

  10. {

  11.    exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n");

  12. }

  13. if(!extension_loaded('posix'))

  14. {

  15.    exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n");

  16. }

  17. // 标记是全局启动

  18. define('GLOBAL_START', 1);

  19. require_once __DIR__ . '/vendor/autoload.php';

  20. // 加载所有Applications/*/start.php,以便启动所有服务

  21. foreach(glob(__DIR__.'/src/start*.php') as $start_file)

  22. {

  23.    require_once $start_file;

  24. }

  25. // 运行所有服务

  26. Worker::runAll();

注册Register类

GatewayWorker工作原理

src/start_register.php (目录名可以自己定义) 代码:

 
   
   
 
  1. <?php

  2. use \GatewayWorker\Register;

  3. // register 服务必须是text协议

  4. $register = new Register('text://0.0.0.0:1238');

注册Gateway类

Gateway类用于初始化Gateway进程。Gateway进程是暴露给客户端的让其连接的进程。所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。

src/start_gateway.php 代码:

 
   
   
 
  1. <?php

  2. use \Workerman\Worker;

  3. use \Workerman\WebServer;

  4. use \GatewayWorker\Gateway;

  5. use \GatewayWorker\BusinessWorker;

  6. use \Workerman\Autoloader;

  7. // gateway 进程

  8. $gateway = new Gateway("websocket://0.0.0.0:8282");

  9. // gateway名称,status方便查看

  10. $gateway->name = 'business-gateway';

  11. // gateway进程数

  12. $gateway->count = 2;

  13. // 本机ip,分布式部署时使用内网ip

  14. $gateway->lanIp = '127.0.0.1';

  15. // 内部通讯起始端口,假如$gateway->count=4,起始端口为4000

  16. // 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口

  17. $gateway->startPort = 2900;

  18. $gateway->registerAddress = '127.0.0.1:1238';

注册BusinessWorker类

BusinessWorker是运行业务逻辑的进程,BusinessWorker收到Gateway转发来的事件及请求时会默认调用Events.php中的onConnect onMessage onClose方法处理事件及数据,开发者正是通过实现这些回调控制业务及流程。

src/start_businessworker.php 代码:

 
   
   
 
  1. <?php

  2. use \Workerman\Worker;

  3. use \Workerman\WebServer;

  4. use \GatewayWorker\Gateway;

  5. use \GatewayWorker\BusinessWorker;

  6. use \Workerman\Autoloader;

  7. // bussinessWorker 进程

  8. $worker = new BusinessWorker();

  9. // worker名称

  10. $worker->name = 'Steam-BusinessWorker';

  11. // bussinessWorker进程数量

  12. $worker->count = 1;

  13. $worker->registerAddress = '127.0.0.1:1238';

Events类

Events类用于捕获GatewayWorker事件,在这里可以写一些回调信息。

src/Events.php 代码:

 
   
   
 
  1. <?php

  2. /**

  3. * 用于检测业务代码死循环或者长时间阻塞等问题

  4. * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload

  5. * 然后观察一段时间workerman.log看是否有process_timeout异常

  6. */

  7. //declare(ticks=1);

  8. use \GatewayWorker\Lib\Gateway;

  9. /**

  10. * 主逻辑

  11. * 主要是处理 onConnect onMessage onClose 三个方法

  12. * onConnect 和 onClose 如果不需要可以不用实现并删除

  13. */

  14. class Events

  15. {

  16.    /**

  17.     * 当客户端连接时触发

  18.     * 如果业务不需此回调可以删除onConnect

  19.     *

  20.     * @param int $client_id 连接id

  21.     */

  22.    public static function onConnect($client_id)

  23.    {

  24.        // 向当前client_id发送数据

  25.        Gateway::sendToClient($client_id, json_encode([

  26.            'clientId' => $client_id,

  27.        ]));

  28.    }

  29.    /**

  30.     * 当客户端发来消息时触发

  31.     * @param int $client_id 连接id

  32.     * @param mixed $message 具体消息

  33.     */

  34.    public static function onMessage($client_id, $message)

  35.    {

  36.    }

  37.    /**

  38.     * 当用户断开连接时触发

  39.     * @param int $client_id 连接id

  40.     */

  41.    public static function onClose($client_id)

  42.    {

  43.    }

  44. }

在PHP项目分组或推送给客户端

这是在你自己的项目写的代码,过程:前端调用接口传来clientId,后端绑定到分组,再推送信息给分组或者指定的clientId客户端。

需要在项目中引用GatewayClient包

 
   
   
 
  1. composer require workerman/gatewayclient

代码:

 
   
   
 
  1. // GatewayClient 3.0.0版本以后加了命名空间

  2. use GatewayClient\Gateway;

  3. /**

  4. * === 指定registerAddress表明与哪个GatewayWorker(集群)通讯。===

  5. * GatewayWorker里用Register服务来区分集群,即一个GatewayWorker(集群)只有一个Register服务,

  6. * 其中ip为Register服务运行的ip(如果GatewayWorker是单机部署则ip就是运行GatewayWorker的服务器ip),

  7. * 端口是对应ip的服务器上start_register.php文件中监听的端口,也就是GatewayWorker启动时看到的Register的端口。

  8. * GatewayClient要想推送数据给客户端,必须知道客户端位于哪个GatewayWorker(集群),

  9. * 然后去连这个GatewayWorker(集群)Register服务的 ip:端口,才能与对应GatewayWorker(集群)通讯。

  10. * 这个 ip:端口 在GatewayClient一侧使用 Gateway::$registerAddress 来指定。

  11. *

  12. * === 如果GatewayClient和GatewayWorker不在同一台服务器需要以下步骤 ===

  13. * 1、需要设置start_gateway.php中的lanIp为实际的本机内网ip(如不在一个局域网也可以设置成外网ip),设置完后要重启GatewayWorker

  14. * 2、GatewayClient这里的Gateway::$registerAddress的ip填写填写上面步骤1lanIp所指定的ip,端口

  15. * 3、需要开启GatewayWorker所在服务器的防火墙,让以下端口可以被GatewayClient所在服务器访问,

  16. *    端口包括Rgister服务的端口以及start_gateway.php中lanIp与startPort指定的几个端口

  17. *

  18. * === 如果GatewayClient和GatewayWorker在同一台服务器 ===

  19. * GatewayClient和Register服务都在一台服务器上,ip填写127.0.0.1及即可,无需其它设置。

  20. **/

  21. Gateway::$registerAddress = '127.0.0.1:1236';

  22. // GatewayClient支持GatewayWorker中的所有接口(Gateway::closeCurrentClient Gateway::sendToCurrentClient除外)

  23. Gateway::sendToAll($data);

  24. Gateway::sendToClient($client_id, $data);

  25. Gateway::closeClient($client_id);

  26. Gateway::isOnline($client_id);

  27. Gateway::bindUid($client_id, $uid);

  28. Gateway::isUidOnline($uid);

  29. Gateway::getClientIdByUid($client_id);

  30. Gateway::unbindUid($client_id, $uid);

  31. Gateway::sendToUid($uid, $dat);

  32. Gateway::joinGroup($client_id, $group);

  33. Gateway::sendToGroup($group, $data);

  34. Gateway::leaveGroup($client_id, $group);

  35. Gateway::getClientCountByGroup($group);

  36. Gateway::getClientSessionsByGroup($group);

  37. Gateway::getAllClientCount();

  38. Gateway::getAllClientSessions();

  39. Gateway::setSession($client_id, $session);

  40. Gateway::updateSession($client_id, $session);

  41. Gateway::getSession($client_id);


以上是关于Workerman创建WebSocket客户端和服务端推送数据的主要内容,如果未能解决你的问题,请参考以下文章

F2 workerman 整合入项目

使用workerman实现在线聊天-第一版

osstatus -9801 workerman websocket 小程序不带端口

通过GatewayWorker/Workerman搭建Websocket微服务

swoole和workerman哪个更易开发

WorkerMan中php后端及时推送消息给客户端