封装带SSH跳板机的redis
Posted 彼得潘jd
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了封装带SSH跳板机的redis相关的知识,希望对你有一定的参考价值。
一、封装ssh的redis
二、setting的配置
三、应用示例
import redis from sshtunnel import SSHTunnelForwarder from conf.setting import REDIS_online, REDIS_dev class MyRedis(): def __del__(self): self.server.close() def __init__(self,host,ip,ssh_username,ssh_password,db,password): #构造shh-redis链接,使用setting配置文件 try: server = SSHTunnelForwarder( ssh_address_or_host=(host, 22), ssh_username=ssh_username, ssh_password=ssh_password, remote_bind_address=(ip, 6379) ) self.server = server self.server.daemon_forward_servers = True #不加server关闭不了 self.server.start() pool = redis.ConnectionPool(host=‘127.0.0.1‘, port=self.server.local_bind_port, password=password, db=db, decode_responses=True) redis_connect = redis.Redis(connection_pool=pool) self.r = redis_connect except Exception as e: print(‘redis连接失败,错误信息%s‘%e) def str_get(self,k): res = self.r.get(k) return res # def str_set(self,k,v,time=None): # self.r.set(k,v,time) # def delete(self,k): # tag = self.r.exists(k) #判断这个key是否存在 # if tag: # self.r.delete(k) # print(‘删除成功‘) # else: # print(‘这个key不存在‘) # def hash_get(self,name,k): # res = self.r.hget(name,k) # if res: # return res.decode() # def hash_set(self,name,k,v): # self.r.hset(name,k,v) # def hash_getall(self,name): # data = {} # # {b‘12‘: b‘1212‘, b‘3‘: b‘sdad‘, b‘4‘: b‘asdadsa‘} # res = self.r.hgetall(name) # if res: # for k,v in res.items(): # k = k.decode() # v = v.decode() # data[k]=v # return data # def hash_del(self,name,k): # res = self.r.hdel(name,k) # if res: # print(‘删除成功‘) # return 1 # else: # print(‘删除失败,该key不存在‘) # return 0 # @property # def clean_redis(self): # self.r.flushdb() #清1空redis # print(‘清空redis成功!‘) # return 0 # my = MyRedis(**REDIS_dev) # if __name__ == ‘__main__‘: # my = MyRedis(**REDIS_online) # vcode = my.str_get(‘xxx:vcode:xxx:1xx0000xxxx‘) # print(vcode)
1 REDIS_online = { 2 ‘host‘: ‘1xx.xxx.xxx.xxx‘, 3 ‘ip‘ : ‘1xx.xx.xx.xx‘, 4 ‘ssh_username‘ : ‘aaa‘, 5 ‘ssh_password‘: ‘xxx‘, 6 ‘db‘:2, 7 ‘password‘ : ‘vvvv‘ 8 }
1 import unittest,requests 2 from sshtunnel import SSHTunnelForwarder 3 from lib.my_redis import MyRedis 4 # from lib.my_sql import my_sql 5 from conf.setting import BASE_URL,REDIS_online 6 from urllib.parse import urljoin 7 from lib.my_request import MyRequest 8 from lib.tools import login 9 10 class Pt_xxx_xxx(unittest.TestCase): 11 def test_c_xxx(self): 12 ‘‘‘xxxxx‘‘‘ 13 url = ‘/xxx/xxx/messageCode‘ 14 real_url = urljoin(BASE_URL, url) 15 16 data = {‘xx‘: ‘xxx%2C2‘, 17 ‘xx‘: ‘xxxA‘ 18 } 19 res = MyRequest.get(real_url, data) 20 if res.get(‘msg‘) == ‘发xxxxxx,xxx‘: 21 self.assertEqual(401, res.get(‘code‘), msg = ‘发xxxx==>失败url:%s 失败data:%s‘%(real_url,data) ) 22 else: 23 self.assertEqual(200, res.get(‘code‘), msg=‘发xxxx==>失败url:%s 失败data:%s‘%(real_url,data) ) 24 25 26 def test_c_xxxx(self): 27 ‘‘‘xxx‘‘‘ 28 url = ‘/xx/xxxx/messageCode‘ 29 real_url = urljoin(BASE_URL, url) 30 my = MyRedis(**REDIS_online) 31 vcode = my.str_get(‘xx:xx:xxx:1xx000xxx34‘) 32 33 data = {‘xx‘: ‘xx%2C2‘, 34 ‘xxx‘: 2, 35 ‘vcode‘: vcode 36 } 37 res = MyRequest.get(real_url, data) 38 self.assertEqual(200, res.get(‘code‘), msg=‘获取xx败==>失败url:%s 失败data:%s‘ % (real_url, data)) 39 40 41 if __name__ == ‘__main__‘: 42 c=xxx() 43 c.test_c_xx() 44 c.test_c_xx()
以上是关于封装带SSH跳板机的redis的主要内容,如果未能解决你的问题,请参考以下文章