python 龙卷风pubsub
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 龙卷风pubsub相关的知识,希望对你有一定的参考价值。
from tornado import ioloop
import json
class Subscriber:
def __init__(self, io_loop=None, poll_interval=100, redis_ip=None, is_json=True):
'''
Parameters:
io_loop: The io_loop to use, None by default
redis_name: Which redis instance to connect to, defaults to None
poll_interval: At what interval we should poll redis in milliseconds (100ms by default)
'''
self.io_loop = io_loop or ioloop.IOLoop.current()
self.connection = redis.StrictRedis(redis_ip, port=6379)
self.pubsub = self.connection.pubsub()
self.callbacks = {}
self.last_values = {}
self.is_json = is_json
self.poller = ioloop.PeriodicCallback(self.poll_redis, poll_interval, io_loop=self.io_loop)
self.next_sub_id = 1
self.max_dispatch = 10
def list_topics(self, channel, topic_pattern=None):
'''List topics (and their values) for a channel and specified topic pattern (wildcards supported) without opening a subscription.
'''
topics_and_values = self.connection.hscan_iter('state_'+channel, match=topic_pattern)
return [ (k, self._deserialize(v)) for k, v in topics_and_values ]
def snap_value(self, channel, topic):
'''Snaps the latest value for a channel/topic without opening a subscription.
Returns None if there is no last value.
'''
last_value = self.connection.hget('state_'+channel, topic)
return self._deserialize(last_value)
def subscribe(self, channel, topic, callback):
'''Subscribe to channel and topic with a callback to be invoked on new messages.
Parameters:
channel: Name of a channel to subscribe
topic: Topic name to subscribe, note wildcards are supported, so * will subscribe to all topics.
callback: A callback function with a signature (channel, topic, message)
Returns a subscription_id which you can use to unsubscribe. Note callbacks are invoked via the
tornado IOLoop.
Notes: If you use a wildcard subscription then you won't get a lastValue
'''
channel_prefix = ('rt_'+channel+'_').encode('utf-8')
full_topic_name = channel_prefix+topic.encode('utf-8')
state_name = ('state_'+channel).encode('utf-8')
# add callback to subscriptions dictionary
was_subscribed = len(self.callbacks) > 0
cbs = self.callbacks.get(full_topic_name)
sub_id = self.next_sub_id
self.next_sub_id += 1
if cbs is None:
if '*' in channel or '*' in topic:
self.pubsub.psubscribe(full_topic_name)
else:
self.pubsub.subscribe(full_topic_name)
cbs = self.callbacks[full_topic_name] = {sub_id: callback}
else:
cbs[sub_id] = callback
is_subscribed = len(self.callbacks) > 0
if not was_subscribed and is_subscribed:
self.poller.start()
# TODO- we should probably integrate this with the io_loop since we will block until
# redis returns all matches
if '*' in topic:
last_values = list(self.connection.hscan_iter(state_name, match=topic))
else:
last_values = [(topic.encode('utf-8'), self.connection.hget(state_name, topic))]
for k, v in last_values:
self.last_values[channel_prefix+k] = v
if not v: continue # no value for this topic
self.io_loop.add_callback(callback, channel, k.decode('utf-8'), {'type': 'last_value', 'data': self._deserialize(v)})
return sub_id
def unsubscribe(self, channel, topic, sub_id):
'''Unsubscribe from channel/topic given a subscription id
'''
full_topic_name = ('rt_'+channel+'_'+topic).encode('utf-8')
cbs = self.callbacks.get(full_topic_name)
if cbs.pop(sub_id, None) is not None:
if len(cbs) == 0:
self.pubsub.unsubscribe(full_topic_name)
del self.callbacks[full_topic_name]
self.last_values.pop(full_topic_name, None)
if len(self.callbacks) == 0:
self.poller.stop()
def callback_from_thread(self, result):
# TODO- plug in redis threaded listener to invoke this function when we get a subscription message
self.io_loop.add_callback(self.handle_sub_result, result)
def handle_sub_result(self, result, verbose=False):
if result is None:
return False # Nothing more to do
if result['type'] not in ('message', 'pmessage'):
# skip redis control message like redis telling us that we have successfully subscribed?
# TODO- Daniel- please look into why we get messages with type: 'subscribe' and if we need to do anything
if verbose:
print('Dropping', result)
# nothing more to do
return False
full_topic_name = result['channel']
pattern = result['pattern']
if pattern is not None:
cbs = self.callbacks.get(pattern)
else:
cbs = self.callbacks.get(full_topic_name)
if cbs is not None:
full_topic_name_str = full_topic_name.decode('utf-8')
_, channel, topic = full_topic_name_str.split('_', 2)
for cb in cbs.values():
result['data'] = self._deserialize(result['data'])
self.io_loop.add_callback(cb, channel, topic, result)
return True
def poll_redis(self):
if len(self.callbacks) == 0:
return
for ndispatch in range(self.max_dispatch):
result = self.pubsub.get_message()
if not self.handle_sub_result(result):
break
def _deserialize(self, msg):
if self.is_json:
if type(msg) is bytes:
msg = msg.decode('utf-8')
if msg is None:
return None
else:
return json.loads(msg)
return msg
以上是关于python 龙卷风pubsub的主要内容,如果未能解决你的问题,请参考以下文章