Swoole Redis 连接池的实现

Posted 新亮笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Swoole Redis 连接池的实现相关的知识,希望对你有一定的参考价值。

第 85 篇文章

这是关于 Swoole 入门学习的第九篇文章:Swoole Redis 连接池的实现。

概述

收到读者反馈,“亮哥,文章能多点图片吗?就是将运行结果以图片的形式展示...”

我个人觉得这是比较懒、动手能力差的表现,恩... 要勤快些。

但谁让文章是写给你们看的那,我以后尽量文章写的图文并茂一点。

上篇文章 分享了 mysql 连接池,这篇文章 咱们来分享下 Redis 连接池。

在上篇文章的基础上进行简单调整即可,将实例化 MySQL 的地方,修改成实例化 Redis 即可,还要注意一些方法的调整。

这篇文章仅仅只实现一个 Redis 连接池,篇幅就太少了,顺便将前几篇整合一下。

Demo 中大概包含这些点:

  • 实现 MySQL 连接池

  • 实现 MySQL CURD 方法的定义

  • 实现 Redis 连接池

  • 实现 Redis 方法的定义

  • 满足 HTTP、TCP、WebSocket 调用

  • 提供 Demo 供测试

  • 调整 目录结构

HTTP 调用:

  • 实现 读取 MySQL 中数据的 Demo

  • 实现 读取 Redis 中数据的 Demo

TCP 调用:

  • 实现 读取 MySQL 中数据的 Demo

  • 实现 读取 Redis 中数据的 Demo

Swoole Redis 连接池的实现

WebSocket 调用:

  • 实现 每秒展示 API 调用量 Demo

目录结构

 
   
   
 
  1. ├─ client

  2. ├─ http

  3. ├── mysql.php //测试 MySQL 连接

  4. ├── redis.php //测试 Redis 连接

  5. ├─ tcp

  6. ├── mysql.php //测试 MySQL 连接

  7. ├── redis.php //测试 Redis 连接

  8. ├─ websocket

  9. ├── index.html //实现 API 调用量展示

  10. ├─ controller

  11. ├─ Order.php //实现 MySQL CURD

  12. ├─ Product.php //实现 Redis 调用

  13. ├─ Statistic.php //模拟 API 调用数据

  14. ├─ server

  15. ├─ config

  16. ├── config.php //默认配置

  17. ├── mysql.php //MySQL 配置

  18. ├── redis.php //Redis 配置

  19. ├─ core

  20. ├── Common.php //公共方法

  21. ├── Core.php //核心文件

  22. ├── HandlerException.php //异常处理

  23. ├── callback //回调处理

  24. ├── OnRequest.php

  25. ├── OnReceive.php

  26. ├── OnTask.php

  27. ├── ...

  28. ├── mysql

  29. ├── MysqlDB.php

  30. ├── MysqlPool.php

  31. ├── redis

  32. ├── RedisDB.php

  33. ├── RedisPool.php

  34. ├─ log -- 需要 读/写 权限

  35. ├── ...

  36. ├─ index.php //入口文件

代码

server/core/redis/RedisPool.php

 
   
   
 
  1. <?php


  2. if (!defined('SERVER_PATH')) exit("No Access");


  3. class RedisPool

  4. {

  5. private static $instance;

  6. private $pool;

  7. private $config;


  8. public static function getInstance($config = null)

  9. {

  10. if (empty(self::$instance)) {

  11. if (empty($config)) {

  12. throw new RuntimeException("Redis config empty");

  13. }

  14. self::$instance = new static($config);

  15. }

  16. return self::$instance;

  17. }


  18. public function __construct($config)

  19. {

  20. if (empty($this->pool)) {

  21. $this->config = $config;

  22. $this->pool = new chan($config['master']['pool_size']);

  23. for ($i = 0; $i < $config['master']['pool_size']; $i++) {

  24. go(function() use ($config) {

  25. $redis = new RedisDB();

  26. $res = $redis->connect($config);

  27. if ($res === false) {

  28. throw new RuntimeException("Failed to connect redis server");

  29. } else {

  30. $this->pool->push($redis);

  31. }

  32. });

  33. }

  34. }

  35. }


  36. public function get()

  37. {

  38. if ($this->pool->length() > 0) {

  39. $redis = $this->pool->pop($this->config['master']['pool_get_timeout']);

  40. if (false === $redis) {

  41. throw new RuntimeException("Pop redis timeout");

  42. }

  43. defer(function () use ($redis) { //释放

  44. $this->pool->push($redis);

  45. });

  46. return $redis;

  47. } else {

  48. throw new RuntimeException("Pool length <= 0");

  49. }

  50. }

  51. }

server/core/redis/RedisDB.php

 
   
   
 
  1. <?php


  2. if (!defined('SERVER_PATH')) exit("No Access");


  3. class RedisDB

  4. {

  5. private $master;

  6. private $slave;

  7. private $config;


  8. public function __call($name, $arguments)

  9. {

  10. // TODO 主库的操作

  11. $command_master = ['set', 'hset', 'sadd'];


  12. if (!in_array($name, $command_master)) {

  13. $db = $this->_get_usable_db('slave');

  14. } else {

  15. $db = $this->_get_usable_db('master');

  16. }

  17. $result = call_user_func_array([$db, $name], $arguments);

  18. return $result;

  19. }


  20. public function connect($config)

  21. {

  22. //主库

  23. $master = new SwooleCoroutineRedis();

  24. $res = $master->connect($config['master']['host'], $config['master']['port']);

  25. if ($res === false) {

  26. throw new RuntimeException($master->errCode, $master->errMsg);

  27. } else {

  28. $this->master = $master;

  29. }


  30. //从库

  31. $slave = new SwooleCoroutineRedis();

  32. $res = $slave->connect($config['slave']['host'], $config['slave']['port']);

  33. if ($res === false) {

  34. throw new RuntimeException($slave->errCode, $slave->errMsg);

  35. } else {

  36. $this->slave = $slave;

  37. }


  38. $this->config = $config;

  39. return $res;

  40. }


  41. private function _get_usable_db($type)

  42. {

  43. if ($type == 'master') {

  44. if (!$this->master->connected) {

  45. $master = new SwooleCoroutineRedis();

  46. $res = $master->connect($this->config['master']['host'], $this->config['master']['port']);

  47. if ($res === false) {

  48. throw new RuntimeException($master->errCode, $master->errMsg);

  49. } else {

  50. $this->master = $master;

  51. }

  52. }

  53. return $this->master;

  54. } elseif ($type == 'slave') {

  55. if (!$this->slave->connected) {

  56. $slave = new SwooleCoroutineRedis();

  57. $res = $slave->connect($this->config['slave']['host'], $this->config['slave']['port']);

  58. if ($res === false) {

  59. throw new RuntimeException($slave->errCode, $slave->errMsg);

  60. } else {

  61. $this->slave = $slave;

  62. }

  63. }

  64. return $this->slave;

  65. }

  66. }

  67. }

client/http/redis.php

 
   
   
 
  1. <?php


  2. $demo = [

  3. 'type' => 'SW',

  4. 'token' => 'Bb1R3YLipbkTp5p0',

  5. 'param' => [

  6. 'class' => 'Product',

  7. 'method' => 'set',

  8. 'param' => [

  9. 'key' => 'C4649',

  10. 'value' => '订单-C4649'

  11. ],

  12. ],

  13. ];


  14. $ch = curl_init();

  15. $options = [

  16. CURLOPT_URL => 'http://10.211.55.4:9509/',

  17. CURLOPT_POST => 1,

  18. CURLOPT_POSTFIELDS => json_encode($demo),

  19. ];

  20. curl_setopt_array($ch, $options);

  21. curl_exec($ch);

  22. curl_close($ch);

client/tpc/redis.php

 
   
   
 
  1. <?php


  2. class Client

  3. {

  4. private $client;


  5. public function __construct() {

  6. $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);


  7. $this->client->on('Connect', [$this, 'onConnect']);

  8. $this->client->on('Receive', [$this, 'onReceive']);

  9. $this->client->on('Close', [$this, 'onClose']);

  10. $this->client->on('Error', [$this, 'onError']);

  11. }


  12. public function connect() {

  13. if(!$fp = $this->client->connect("0.0.0.0", 9510, 1)) {

  14. echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;

  15. return;

  16. }

  17. }


  18. public function onConnect() {


  19. fwrite(STDOUT, "测试RPC (Y or N):");

  20. swoole_event_add(STDIN, function() {

  21. $msg = trim(fgets(STDIN));

  22. if ($msg == 'y') {

  23. $this->send();

  24. }

  25. fwrite(STDOUT, "测试RPC (Y or N):");

  26. });

  27. }


  28. public function onReceive($cli, $data) {

  29. echo '[Received]:'.$data;

  30. }


  31. public function send() {

  32. $demo = [

  33. 'type' => 'SW',

  34. 'token' => 'Bb1R3YLipbkTp5p0',

  35. 'param' => [

  36. 'class' => 'Product',

  37. 'method' => 'get',

  38. 'param' => [

  39. 'code' => 'C4649'

  40. ],

  41. ],

  42. ];

  43. $this->client->send(json_encode($demo));

  44. }


  45. public function onClose() {

  46. echo "Client close connection".PHP_EOL;

  47. }


  48. public function onError() {


  49. }

  50. }


  51. $client = new Client();

  52. $client->connect();

client/websocket/index.html

 
   
   
 
  1. <!DOCTYPE html>

  2. <html>

  3. <head>

  4. <meta charset="utf-8">

  5. <meta http-equiv="X-UA-Compatible" content="IE=edge">

  6. <meta name="viewport" content="width=device-width, initial-scale=1">

  7. <meta name="description" content="">

  8. <meta name="keywords" content="">

  9. <title>Demo</title>

  10. <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>

  11. <script src="http://echarts.baidu.com/gallery/vendors/echarts/echarts.min.js"></script>

  12. </head>

  13. <body>

  14. <!-- 为ECharts准备一个具备大小(宽高)的Dom -->

  15. <div id="main" style="width: 900px;height:400px;"></div>

  16. <script type="text/javascript">

  17. if ("WebSocket" in window) {

  18. // 基于准备好的dom,初始化echarts实例

  19. var myChart = echarts.init(document.getElementById('main'));

  20. var wsServer = 'ws://10.211.55.4:9509';

  21. var ws = new WebSocket(wsServer);


  22. ws.onopen = function (evt) {

  23. if (ws.readyState == 1) {

  24. console.log('WebSocket 连接成功...');

  25. } else {

  26. console.log('WebSocket 连接失败...');

  27. }


  28. if (ws.readyState == 1) {

  29. ws.send('开始请求...');

  30. } else {

  31. alert('WebSocket 连接失败');

  32. }

  33. };


  34. ws.onmessage = function (evt) {

  35. console.log('Retrieved data from server: ' + evt.data);

  36. var evt_data = jQuery.parseJSON(evt.data);

  37. myChart.setOption({

  38. xAxis: {

  39. data : evt_data.time

  40. },

  41. series: [{

  42. data: evt_data.value

  43. }]

  44. });


  45. };


  46. ws.onerror = function (evt) {

  47. alert('WebSocket 发生错误');

  48. console.log(evt);

  49. };


  50. ws.onclose = function() {

  51. alert('WebSocket 连接关闭');

  52. console.log('WebSocket 连接关闭...');

  53. };


  54. // 指定图表的配置项和数据

  55. $.ajax({

  56. url : 'http://10.211.55.4:9509/', // 请求url

  57. type : "post", // 提交方式

  58. dataType : "json", // 数据类型

  59. data : {

  60. 'type' : 'SW',

  61. 'token' : 'Bb1R3YLipbkTp5p0',

  62. 'param' : {

  63. 'class' : 'Statistic',

  64. 'method' : 'init'

  65. }

  66. },

  67. beforeSend:function() {


  68. },

  69. success : function(rs) {

  70. if (rs.code != 1) {

  71. alert('获取数据失败');

  72. } else {

  73. var option = {

  74. title: {

  75. text: 'API 调用量',

  76. x:'center'

  77. },

  78. tooltip: {

  79. trigger: 'axis',

  80. axisPointer: {

  81. animation: false

  82. }

  83. },

  84. xAxis: {

  85. type : 'category',

  86. data : rs.data.time

  87. },

  88. yAxis: {

  89. type: 'value',

  90. boundaryGap: [0, '100%'],

  91. name: '使用量',

  92. splitLine: {

  93. show: false

  94. }

  95. },

  96. series: [{

  97. name: '使用量',

  98. type: 'line',

  99. showSymbol: false,

  100. hoverAnimation: false,

  101. data: rs.data.value

  102. }]

  103. };


  104. // 使用刚指定的配置项和数据显示图表。

  105. if (option && typeof option === "object") {

  106. myChart.setOption(option, true);

  107. }

  108. }

  109. },

  110. error : function(){

  111. alert('服务器请求异常');

  112. }

  113. });

  114. } else {

  115. alert("您的浏览器不支持 WebSocket!");

  116. }

  117. </script>

  118. </body>

  119. </html>

还涉及到,OnMessage.php、OnTask.php 、OnWorkerStart.php 等,就不贴代码了。

运行

小框架的启动/关闭/热加载,看看这篇文章:

里面 Demo 在 client 文件夹下。

http 目录下的文件,放到自己虚拟目录下,用浏览器访问。

tcp 目录下的文件,在 CLI 下运行。

websocket 目录下的文件,直接点击在浏览器访问。

扩展

官方协程 Redis 客户端手册:

https://wiki.swoole.com/wiki/page/589.html

大家可以尝试使用官方提供的其他方法。

小结

Demo 代码仅供参考,里面有很多不严谨的地方,根据自己的需要进行修改 ...

上面的 Demo 需要源码的,加我微信。(菜单-> 加我微信-> 扫我)

推荐阅读

本文欢迎转发,转发请注明作者和出处,谢谢!

以上是关于Swoole Redis 连接池的实现的主要内容,如果未能解决你的问题,请参考以下文章

Swoole MySQL 连接池的实现

Swoole MySQL 连接池的实现

Swoole MySQL 连接池的实现

用Swoole4 打造高并发的PHP协程Mysql连接池

Easyswoole Mysqli连接池的使用

Redis-py连接池的实现