封装带SSH跳板机的redis

Posted 彼得潘jd

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了封装带SSH跳板机的redis相关的知识,希望对你有一定的参考价值。

一、封装ssh的redis

二、setting的配置

三、应用示例

import redis
from sshtunnel import SSHTunnelForwarder
from conf.setting import REDIS_online, REDIS_dev

class MyRedis():

    def __del__(self):
        self.server.close()    

    def __init__(self,host,ip,ssh_username,ssh_password,db,password):
        #构造shh-redis链接,使用setting配置文件
        try:
            server = SSHTunnelForwarder(
                ssh_address_or_host=(host, 22),
                ssh_username=ssh_username,
                ssh_password=ssh_password,
                remote_bind_address=(ip, 6379)
            )
            self.server = server
            self.server.daemon_forward_servers = True #不加server关闭不了
            self.server.start()
            pool = redis.ConnectionPool(host=127.0.0.1, port=self.server.local_bind_port, password=password, db=db,
                                        decode_responses=True)
            redis_connect = redis.Redis(connection_pool=pool)
            self.r = redis_connect
        except Exception as e:
            print(redis连接失败,错误信息%s%e)

    def str_get(self,k):
        res = self.r.get(k)
        return res

    # def str_set(self,k,v,time=None):
    #     self.r.set(k,v,time)

    # def delete(self,k):
    #     tag = self.r.exists(k) #判断这个key是否存在
    #     if tag:
    #         self.r.delete(k)
    #         print(‘删除成功‘)
    #     else:
    #         print(‘这个key不存在‘)
    # def hash_get(self,name,k):
    #     res = self.r.hget(name,k)
    #     if res:
    #         return res.decode()
    # def hash_set(self,name,k,v):
    #     self.r.hset(name,k,v)
    # def hash_getall(self,name):
    #     data = {}
    #     # {b‘12‘: b‘1212‘, b‘3‘: b‘sdad‘, b‘4‘: b‘asdadsa‘}
    #     res = self.r.hgetall(name)
    #     if res:
    #         for k,v in res.items():
    #             k =  k.decode()
    #             v = v.decode()
    #             data[k]=v
    #     return data
    # def hash_del(self,name,k):
    #     res = self.r.hdel(name,k)
    #     if res:
    #         print(‘删除成功‘)
    #         return 1
    #     else:
    #         print(‘删除失败,该key不存在‘)
    #         return 0


    # @property
    # def clean_redis(self):
    #     self.r.flushdb()  #清1空redis
    #     print(‘清空redis成功!‘)
    #     return 0


# my = MyRedis(**REDIS_dev)

# if __name__ == ‘__main__‘:
#     my = MyRedis(**REDIS_online)
#     vcode = my.str_get(‘xxx:vcode:xxx:1xx0000xxxx‘)
#     print(vcode)

 

1 REDIS_online = {
2     host: 1xx.xxx.xxx.xxx,
3     ip : 1xx.xx.xx.xx,
4     ssh_username : aaa,
5     ssh_password: xxx,
6     db:2,
7     password : vvvv
8 }

 

技术图片
 1 import unittest,requests
 2 from sshtunnel import SSHTunnelForwarder
 3 from lib.my_redis import MyRedis
 4 # from lib.my_sql import my_sql
 5 from conf.setting import BASE_URL,REDIS_online
 6 from urllib.parse import urljoin
 7 from lib.my_request import MyRequest
 8 from lib.tools import login
 9 
10 class Pt_xxx_xxx(unittest.TestCase):
11     def test_c_xxx(self):
12         ‘‘‘xxxxx‘‘‘
13         url = /xxx/xxx/messageCode
14         real_url = urljoin(BASE_URL, url)
15 
16         data = {xx: xxx%2C2,
17                 xx: xxxA
18                 }
19         res = MyRequest.get(real_url, data)
20         if res.get(msg) == 发xxxxxx,xxx:
21             self.assertEqual(401, res.get(code), msg = 发xxxx==>失败url:%s 失败data:%s%(real_url,data) )
22         else:
23             self.assertEqual(200, res.get(code), msg=发xxxx==>失败url:%s 失败data:%s%(real_url,data) )
24 
25 
26     def test_c_xxxx(self):
27         ‘‘‘xxx‘‘‘
28         url = /xx/xxxx/messageCode
29         real_url = urljoin(BASE_URL, url)
30         my = MyRedis(**REDIS_online)
31         vcode = my.str_get(xx:xx:xxx:1xx000xxx34)
32 
33         data = {xx: xx%2C2,
34                 xxx: 2,
35                 vcode: vcode
36                 }
37         res = MyRequest.get(real_url, data)
38         self.assertEqual(200, res.get(code), msg=获取xx败==>失败url:%s 失败data:%s % (real_url, data))
39 
40 
41 if __name__ == __main__:
42     c=xxx()
43     c.test_c_xx()
44     c.test_c_xx()
应用

 

以上是关于封装带SSH跳板机的redis的主要内容,如果未能解决你的问题,请参考以下文章

ssh远程登陆脚本(带跳板机)

jmeter连接配置带跳板机(SSH)的mysql服务器

XShell SSH 跳板配置之jmeter连接跳板数据库

mysql ssh 跳板机(堡垒机???)连接服务器

跳板机是什么

Jumpserver