用python怎样实现异步redis客户端
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用python怎样实现异步redis客户端相关的知识,希望对你有一定的参考价值。
参考技术A 问题里,访问redis的瓶颈在于网络IO开销太大,跟不上cpu的计算速度。有几个办法,可以考虑一下,大前提:cpu计算的数据,前后没什么关联,否则只能用同步方式。想更好的学习python请关注微信公众号“Python基础教程”!1. 内建多个queue,将需要写回redis的数据,另起多线程(与queue一对一)处理。
2. 使用pipeline方式读写redis,减少网络交互开销。
想更好的学习python请关注微信公众号“Python基础教程”! 参考技术B 题主的问题里,访问redis的瓶颈在于网络IO开销太大,跟不上cpu的计算速度。有几个办法,题主可以考虑一下,大前提:cpu计算的数据,前后没什么关联,否则只能用同步方式。
1. 内建多个queue,将需要写回redis的数据,另起多线程(与queue一对一)处理。
2. 使用pipeline方式读写redis,减少网络交互开销。
异步redis队列实现 数据入库
业务需求
app客户端向服务端接口发送来json 数据 每天 发一次 清空缓存后会再次发送
出问题之前业务逻辑:
php 接口 首先将 json 转为数组 去重 在一张大表中插入不存在的数据
该用户已经存在 和新增的id
入另一种详情表
问题所在:
当用户因特殊情况清除缓存 导致app 发送json串 入库并发高 导致CPU 暴增到88% 并且居高不下
优化思路:
1、异步队列处理
2、redis 过滤(就是只处理当天第一次请求)
3、redis 辅助存储app名称(验证过后批量插入数据app名称表中)
4、拼接插入的以及新增的如详细表中
解决办法:
1、接口修改 redis 过滤 + 如list队列 并将结果存入redis中
首先 redis将之前的历史数据放在redis 哈希里面 中文为键名 id 为键值
<?php
/**
* Created by haiyong.
* User: jia
* Date: 2017/9/18
* Time: 20:06
*/
namespace App\\Http\\Controllers\\App;
use App\\Http\\Controllers\\Controller;
use Illuminate\\Http\\Request;
use Illuminate\\Support\\Facades\\DB;
use Illuminate\\Support\\Facades\\Redis;
class OtherAppController extends Controller
/**
* app应用统计接口
* @param Request $request
* @return string
*/
public function appTotal(Request $request)
// //历史数据入库
//$redis = Redis::connection('web_active');
// $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
// $str = '';
// foreach ($app_name as $key => $val)
// $str.= "$val $key ";
//
// $redis->hmset('app_name', $app_name);
// echo $str;exit;
$result = $request->input('res');
$list = json_decode($result, true);
if (empty ($list) || !is_array($list))
return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);
$data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
$data['time'] = date('Y-m-d');
$redis_key = 'log_app:'.$data['time'];
//redis 过滤
$redis = Redis::connection('web_active');
//redis 键值过期设置
if (empty($redis->exists($redis_key)))
$redis->hset($redis_key, 1, 'start');
$redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));
//值确定
if ($redis->hexists($redis_key, $data['uid']))
return json_encode(['result' => 'SUCCESS']);
else
//推入队列
$redis->hset($redis_key, $data['uid'], $result);
$redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);
return json_encode(['result' => 'SUCCESS']);
2、php 脚本循环 监控redis 队列 执行逻辑 防止内存溢出
mget 获取该用户的app id 不存在就会返回null
通过判断null 运用redis 新值作为自增id指针 将null 补齐 之后批量入mysql 并跟新redis 哈希 和指针值 并入库 详情表
<?php
namespace App\\Console\\Commands;
use Illuminate\\Console\\Command;
use Illuminate\\Support\\Facades\\Redis;
use Illuminate\\Support\\Facades\\DB;
use Illuminate\\Support\\Facades\\Storage;
class AppTotal extends Command
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'AppTotal:run';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Command description';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
parent::__construct();
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
//历史数据入库
// $redis = Redis::connection('web_active');
// $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
// $redis->hmset('app_name', $app_name);
// exit;
while(1)
$redis = Redis::connection('web_active');
//队列名称
$res = $redis->lpop('log_app_list');
//开关按钮
$lock = $redis->get('log_app_lock');
if (!empty($res))
list($date,$uid) = explode(':',$res);
$result = $redis->hget('log_app:'.$date, $uid);
if (!empty($result))
$table_name = 'app_total'.date('Ym');
$list = json_decode($result, true);
$data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
$data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
$data['device'] = isset($list['device']) ? $list['device'] : '' ;
$data['appList'] = isset($list['list']) ? $list['list'] : '' ;
//数据去重 flip比unique更节约性能
$data['appList'] = array_flip($data['appList']);
$data['appList'] = array_flip($data['appList']);
$data['time'] = date('Y-m-d');
//app应用过滤
$app_res = $redis->hmget('app_name', $data['appList']);
//新增加app数组
$new_app = [];
//mysql 入库数组
$mysql_new_app = [];
//获取当前redis 自增指针
$total = $redis->get('app_name_total');
foreach ($app_res as $key =>& $val)
if (is_null($val))
$total += 1;
$new_app[$data['appList'][$key]] = $total;
$val = $total;
array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);
if (count($new_app))
$str = "INSERT IGNORE INTO app_set_name (id,appName) values";
foreach ($new_app as $key => $val)
$str.= "(".$val.",'".$key."'),";
$str = trim($str, ',');
//$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
$mysql_res = DB::connection('phpLog')->statement($str);
if ($mysql_res)
// 设置redis 指针
$redis->set('app_name_total', $total);
// redis 数据入库
$redis->hmset('app_name', $new_app);
// 详情数据入库
$data['appList'] = implode(',', $app_res);
//app统计入库
DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList)
values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
//log 记录 当文件达到123MB的时候产生内存保错 所有这个地方可是利用日志切割 或者 不写入 日志
Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\\n");
else
Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\\n");
//执行间隔
sleep(1);
//结束按钮
if ($lock == 2)
exit;
//内存检测
if(memory_get_usage()>1000*1024*1024)
exit('内存溢出');//大于100M内存退出程序,防止内存泄漏被系统杀死导致任务终端
3、执定 定时任务监控脚本执行情况
crontab -e
/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1
test.sh 内容 (查看执行命令返回的进程id 如果没有就执行命令开启)
#!/bin/bash
alive=`ps -ef | grep AppTotal | grep -v grep | awk 'print $2'`
if [ ! $alive ]
then
/usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
fi
记得授权哦 chmod +x test.sh
笔者用的laravel 框架 将命令激活丢入后台
执行命令
/usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
完事直接 ctrl -c 结束就行 命令以在后台运行 可以用shell 中的命令查看进程id
这样就实现队列异步入库
还有很多问题需要优化!!大致功能已经实现!!!!!!
优化完成后cpu
以上是关于用python怎样实现异步redis客户端的主要内容,如果未能解决你的问题,请参考以下文章