通过Redis的list来实现 Server - Client 的同步通信
Posted leijiangtao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过Redis的list来实现 Server - Client 的同步通信相关的知识,希望对你有一定的参考价值。
Redis实现类似同步方法调用的功能(一)
首先声明,这么干纯粹是为了好玩。
通常我们用Redis主要是为了存储一些数据,由于数据在内存里,所以查询更新很快。同时我们也可以利用 Pub/Sub 功能来实现消息发布/订阅。但是今天我们来说说怎么通过Redis的list来实现 Server - Client 的同步通信。
具体需求
Client 端运行后监听 Server 端派发的请求,然后执行一些操作,并将结果返回给 Server 端。
实现想法
- 利用 Redis 的 list 数据结构,使用阻塞 pop 的方式实现 Client 端等待派发命令和 Server 端等待返回结果。
- 首先Server端生成一个全局唯一的key,并将key和data一起push到我们指定的一个队列里,这里是“myqueue”。lpush之后,Server端就使用brpop等待从“key”队列返回结果,并设置超时时间为2秒。
- Client端启动后,使用brpop从指定的队列里获取派发的命令,一旦收到Server端派发的数据,Client就会获取key和data,然后做自己的一些处理,处理完成后,就往“key”队列里lpush执行结果。
- 最后,Server端会从“key”队列里使用brpop获取执行结果。
实现代码
import redis import time import json import threading host = ‘localhost‘ port = 6322 queue = ‘myqueue‘ class Server(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): pool = redis.BlockingConnectionPool(host=host, port=port, db=‘0‘) conn = redis.Redis(connection_pool=pool) idx = 0 while True: idx = idx + 1 key = str(idx) data = "request_" + key request = {‘id‘: key, ‘data‘: data} print ‘Server: Send request: %s‘ % request conn.lpush(queue, json.dumps(request)) response = conn.brpop(key, 2) if response: print ‘Server: Receive response: %s‘ % response[1] else: print "Server: Timeout!!!" time.sleep(1) class Client(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): pool = redis.BlockingConnectionPool(host=host, port=port, db=‘0‘) conn = redis.Redis(connection_pool=pool) while True: msg = conn.brpop(queue)[1] print ‘Client: Receive request: %s‘ % msg time.sleep(0.1) d = json.loads(msg) key = d.get(‘id‘) d[‘data‘] = "response_" + key print ‘Client: Send response: %s‘ % d conn.lpush(key, json.dumps(d)) conn.expire(key, 5) server = Server() server.start() client = Client() client.start()
Redis实现类似同步方法调用的功能(二)
接上一篇,这么干纯粹是为了好玩。
上一篇的博客中的例子只能处理一个Server对一个Client的情况,今天修改了一版,可以支持一个Server对多个Client。实现方式就是Server每派发一个动作就扔到一个线程里去,Client也类似每收到一个数据,就起一个线程去做自己的逻辑。这样看起来就有点像socket变成了。
import redis
import time
import json
import threading
host = ‘localhost‘
port = 6322
queue = ‘myqueue‘
class Server(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
pool = redis.BlockingConnectionPool(host=host, port=port, db=‘0‘)
conn = redis.Redis(connection_pool=pool)
idx = 0
while True:
idx = idx + 1
key = str(idx)
data = "request_" + key
threading.Thread(target=ServerHandler(conn, key, data).handle).start()
time.sleep(1)
class ServerHandler(object):
def __init__(self, conn, key, data):
self.conn = conn
self.key = key
self.data = data
def handle(self):
request = {‘id‘: self.key, ‘data‘: self.data}
print ‘Server: Send request: %s‘ % request
self.conn.lpush(queue, json.dumps(request))
response = self.conn.brpop(self.key, 2)
if response:
print ‘Server: Receive response: %s‘ % response[1]
else:
print "Server: Timeout!!!"
class Client(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
pool = redis.BlockingConnectionPool(host=host, port=port, db=‘0‘)
conn = redis.Redis(connection_pool=pool)
while True:
msg = conn.brpop(queue)[1]
threading.Thread(target=ClientHandler(conn, msg).handle).start()
class ClientHandler(object):
def __init__(self, conn, msg):
self.conn = conn
self.msg = msg
def handle(self):
print ‘Client: Receive request: %s‘ % self.msg
time.sleep(0.1)
d = json.loads(self.msg)
key = d.get(‘id‘)
d[‘data‘] = "response_" + key
print ‘Client: Send response: %s‘