python发送微信
Posted 每周向前一小步
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python发送微信相关的知识,希望对你有一定的参考价值。
申请企业微信
使用python发送信息到企业微信,同时支持python2与python3环境,需要先申请一个企业微信,然后创建应用,获取以下三个信息
企业IP、Agentid、Secret
网信为创建的应用名称
脚本描述
将以上三个信息替换到脚本中,主要是
class WeiXin(object):部分,其他的辅助性工具类,收集的一些常用脚本可不用关注
#!/usr/bin/env python #coding=utf-8 \'\'\' Created on 2018年2月8日 @author: root \'\'\' from datetime import datetime import sys, os, re, json,socket,time from subprocess import Popen, PIPE from sys import version_info if version_info.major == 3 and version_info.minor >=3: import urllib.request as urllib2 pyversion = 3 elif version_info.major == 3: pyversion = 3 else: import urllib2 pyversion = 2 try: if version_info.major and version_info.major == 3: pyversion=3 elif version_info.major and version_info.major == 2: pyversion=2 else: pyversion=2 except Exception as e: pyversion = 2 localpath = os.path.split(os.path.realpath(__file__))[0] class OSCmd(): \'\'\' OS Command:直接可调用执行命令的方法,不包括业务逻辑 本脚本为分好层次的项目中抽出出来的方法,归为3个类,一个是命令类OSCmd,一个是系统检查类; 为保持代码统计,命令类OSCmd是项目是调试好的代码复制过来的,不在此脚本中修改,每次都去项目中取相应的方法 系统检查逻辑类可以修改 \'\'\' def __init__(self): \'\'\' Constructor \'\'\' def exes(self, cmd_shell): \'\'\' call shell command \'\'\' s = Popen(cmd_shell, shell=True, stdout=PIPE); if pyversion == 3: s = s.encode(encoding="utf-8") return (s.communicate()[0]).strip(\'\\n\') def getexesfstval(self, cmd_shell): \'\'\' call shell command 比如在通过ps -ef过滤进程号的时候,在eclipse中执行可以返回正确的结果,然后在shell在测试脚本时却多返回一个数字(比如13742),这里取第一个数字,舍弃多返回的数字 \'\'\' s = Popen(cmd_shell, shell=True, stdout=PIPE); res = (s.communicate()[0]).strip(\'\\n\') ress = res.split(\'\\n\') return ress[0] def exef(self, filename, args): \'\'\' filename : the file is needed to exec as the way like "./filename args" args: list [] for exp: oscmd.exef(\'/scripts/test/t2.py\', [\'a\',\'b\']) \'\'\' args.insert(0, \'\') if os.path.exists(filename): os.execv(filename, args) else: print(\'The {0} is not exist\'.format(filename)) def getLineFromFile(self, targetFile, *param): \'\'\' 文件中,返回某行记录,适合小文件 f:返回首行 l:返回末行 n:返回第n行,n为正整数 默认返回最后一行数据 \'\'\' global f try: f = open(targetFile); pnum = len(param); with open(targetFile, \'r\') as f: # 打开文件,适合小文件 lines = f.readlines() # 读取所有行 first_line = lines[0] # 取第一行 last_line = lines[-1] # 取最后一行 if pnum > 0: if type(param[0]) == type(\'a\') and param[0].lower() == \'f\': return first_line elif type(param[0]) == type(\'a\') and param[0].lower() == \'l\': return last_line else: return lines[int(param[0]) - 1] return last_line finally: f.close(); def timeminustoS(self, t1, t2): t1 = time.localtime(t1) t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1) t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S") # t2=time.localtime(t2) t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2) t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S") return self.total_seconds(t2 - t1) def total_seconds(self, time_delta): \'\'\' python 2.6 has not total_seconds method \'\'\' return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain): \'\'\' 删除指定目录下指定分钟之前的文件,不删除目录,也不递归目录 删除/backup/rman/backdb目录下超过30个小时的文件 rmfileFrmNow(\'/backup/rman/backdb\', 30*60) 删除/backup/rman/backdb目录下超过30个小时,包含_MEMDB_20字符的文件 rmfileFrmNow(\'/backup/rman/backdb\', 30*60,\'_MEMDB_20\') 删除/backup/rman/backdb目录下超过30个小时,同时包含back_full_、_MEMDB_20字符的文件 rmfileFrmNow(\'/backup/rman/backdb\', 30*60,\'back_full\',\'_MEMDB_20\') \'\'\' clen = len(contain) defilist = [] if os.path.isdir(targetDir): for fil in os.listdir(targetDir): if clen > 0: for c in contain: if c in str(fil): defilist.append(fil) #排序 defilist = self.get_filist_bytime(defilist) lsz = len(defilist) if lsz > p_savenum: defilist = defilist[0,lsz - p_savenum] if os.path.isdir(targetDir): for fil in os.listdir(targetDir): flag = True if clen > 0: for c in contain: if not c in str(fil): flag = False if flag: fil = os.path.join(targetDir, fil) if os.path.isfile(fil): if self.isBeforeMins(fil, mins): os.remove(fil) def isBeforeMins(self, fil, mins): \'\'\' 判断一个文件的最后修改时间距离当前时间,是否超过指定的分钟数 \'\'\' if os.path.isfile(fil): mtfile = os.path.getmtime(fil) tnow = time.localtime() sec = self.timeminustoS(mtfile, tnow) mms = round(sec / 60) mins = eval(mins) if mms - mins > 0: return True return False def isMorthanSize(self, fil, siz): \'\'\' 判断一个文件是否超过指定的大小,单位为M \'\'\' if os.path.isfile(fil): filsiz = os.path.getsize(fil) fsiz = eval(siz)*1024*1024 if filsiz - fsiz > 0: return True return False def rmfilMorthanSize(self, targetDir, siz, *contain): \'\'\' 删除指定目录下超过指定大小的文件,不删除目录,也不递归目录 删除/backup/rman/backdb目录下超过2G大小的文件 rmfileFrmNow(\'/backup/rman/backdb\', 2*1024) 删除/backup/rman/backdb目录下超过10G大小,包含_MEMDB_20字符的文件 rmfileFrmNow(\'/backup/rman/backdb\', 10*1024,\'_MEMDB_20\') 删除/backup/rman/backdb目录下超过3G大小,同时包含back_full_、_MEMDB_20字符的文件 rmfileFrmNow(\'/backup/rman/backdb\', 3*1024,\'back_full\',\'_MEMDB_20\') \'\'\' clen = len(contain) if os.path.isdir(targetDir): for fil in os.listdir(targetDir): flag = True if clen > 0: for c in contain: if not c in str(fil): flag = False if flag: fil = os.path.join(targetDir, fil) if os.path.isfile(fil): if self.isMorthanSize(fil, siz): os.remove(fil) def mkdir(self, dr, *mod): # import stat s1 = str(dr) if s1.startswith("~/"): s1 = s1[1:] homedir = os.environ[\'HOME\'] s1 = \'%s%s\' % (homedir, s1) if not os.path.exists(s1): cmd_shell = \'mkdir -p %s\' % (s1) # os.mkdir(dir) 这个命令不识别linux 用户home目录“~”符号 self.exes(cmd_shell) p_num = len(mod) chmod = \'chmod -R 755 %s\' % (s1) if p_num == 1: chmod = \'chmod -R %s %s\' % (mod[0], s1) self.exes(chmod) # os.chmod(dir, mod) else: # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH) 该行会抛出异常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found self.exes(chmod) return s1 def mknod(self, filename, *mod): s1 = str(filename) if s1.startswith("~/"): s1 = s1[1:] homedir = os.environ[\'HOME\'] s1 = \'%s%s\' % (homedir, s1) if not os.path.exists(s1): cmd_shell = \'touch %s\' % (s1) self.exes(cmd_shell) p_num = len(mod) chmod = \'chmod -R 644 %s\' % (s1) if p_num == 1: chmod = \'chmod -R %s %s\' % (mod[0], s1) self.exes(chmod) else: self.exes(chmod) return s1 def get_filist_bytime(self,file_path): dir_list = os.listdir(file_path) if not dir_list: return else: # 注意,这里使用lambda表达式,将文件按照最后修改时间顺序升序排列 # os.path.getmtime() 函数是获取文件最后修改时间 # os.path.getctime() 函数是获取文件最后创建时间 dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x))) return dir_list def shichaByMin(self, t1, t2): \'\'\' 计算以下三种类型之间的时间差 time.time()-浮点型,time.localtime()-struct_time型、datetime.now()-datetime型 \'\'\' t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S") t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S") return round(self.total_seconds(t2 - t1) / 60) def total_seconds(self, time_delta): \'\'\' python 2.6 has not total_seconds method \'\'\' return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 class Properties: # 脚本默认配置路径 oscheck_properties = \'/tmp/.python-eggs/.oscheck.properties\' file_name = \'\' oscmd = OSCmd() def __init__(self, file_name=\'.oscheck.properties\'): \'\'\' 将配置文件转化为列表,以列表的读取方式进行值的替换 \'\'\' dr = \'/tmp/.python-eggs/\' if not os.path.exists(dr): # 当目录以~开头时该行永为True,但下面的语句会自判断 self.oscmd.mkdir(dr,\'777\') if not os.path.exists(file_name): # 当目录以~开头时该行永为True,但下面的语句会自判断 file_name = self.oscmd.mknod(file_name,\'666\') # os.mknod(file_name) ~ self.file_name = file_name self.properties = {} try: fopen = open(self.file_name, \'r\') for line in fopen: line = line.strip() if line.find(\'=\') > 0 and not line.startswith(\'#\'): strs = line.split(\'=\') self.properties[strs[0].strip()] = strs[1].strip() except Exception as e: raise e else: fopen.close() def has_key(self, key): return key in self.properties def keys(self, key): return self.properties.keys(); def get(self, key, default_value=\'\'): if key in self.properties: return self.properties[key] return default_value def put(self, key, value): self.properties[key] = value self.replace_property(self.file_name, key + \'=.*\', key + \'=\' + str(value), True) def putjson(self, key, values): \'\'\' 以json的形式存储prop中的value,可以处理一些特殊字符,比如= \'\'\' pj = {key: values} value = json.dumps(pj) self.put(key, value) def getjsonvalue(self, key, default_value=\'\'): \'\'\' 以json的形式存储prop中的value,可以处理一些特殊字符,比如= \'\'\' if key in self.properties: value = self.get(key) valuedict = json.loads(value) return valuedict[key] return default_value def toasc(self, ss): asclst = [] for em in bytearray(ss): asclst.append(em) return asclst def asctostr(self, asclst): ss = \'\' for em in asclst: ss += str(chr(em)) return ss def putpwd(self, key, value): \'\'\' 字符串以ASCII码存储 \'\'\' asclst = self.toasc(value) self.putjson(key, asclst) def getpwd(self, key): asclst = self.getjsonvalue(key) pwd = self.asctostr(asclst) return pwd def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True): \'\'\' 新写入数据后,替换文件以永久保存 :param file_name: :param from_regex: :param to_str: :param append_on_not_exists: :return: \'\'\' import tempfile tmpfile = tempfile.TemporaryFile() if os.path.exists(file_name): r_open = open(file_name, \'r\') pattern = re.compile(r\'\' + from_regex) found = None for line in r_open: if pattern.search(line) and not line.strip().startswith(\'#\'): found = True line = re.sub(from_regex, to_str, line) if pyversion == 3: line = line.encode(encoding="utf-8") tmpfile.write(line) if not found and append_on_not_exists: to_str = \'\\n\' + to_str if pyversion == 3: to_str = to_str.encode(encoding="utf-8") tmpfile.write(to_str) r_open.close() tmpfile.seek(0) content = tmpfile.read() if os.path.exists(file_name): os.remove(file_name) w_open = open(file_name, \'w\') if pyversion == 3: content = content.decode(\'utf-8\') w_open.write(content) w_open.close() tmpfile.close() else: print("file %s not found" % (file_name)) def parsePro(file_name=\'/tmp/.python-eggs/.oscheck.properties\'): return Properties(file_name) class Log: def __init__(self): \'\'\' Constructor \'\'\' logfile = os.path.join(localpath, \'scheck.log\') def setlog(self, logfile): self.logfile = logfile def log(self, strs, sp=\'a\'): open(self.logfile, sp).write(\'%s\\n\' % (strs)) def inserttime(self, sp=\'a\'): now = datetime.now() strs = now.strftime(\'%Y-%m-%d %H:%M:%S\') oscmd = OSCmd() oscmd.mknod(self.logfile, \'666\') open(self.logfile, sp).write(\'%s\\n\' % (strs)) def frmMsg(self,content): \'\'\' 格式化信息输出 :param content: :return: \'\'\' curtime = datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\') hname = socket.gethostname() IPADDR = socket.gethostbyname(hname) cnt = "内容:{0} <br/>时间:{1} <br/>信息来自 {2} {3}".format(content,curtime,hname,IPADDR) return cnt class WeiXin(object): \'\'\' 发送微信 \'\'\' token_url = \'https://qyapi.weixin.qq.com/cgi-bin/gettoken\' cropmsg ={ \'corpid\' : \'wwe***2ed*********\', \'corpsecret\' : \'Mgyi*****ahx3O-******HkLfg\' } sendmsg = {} access_token_key="weixin_access_token" time_token_key="weixin_tokenkey_gettime" oscmd = OSCmd() prop = parsePro() log = Log() def __init__(self): \'\'\' Constructor \'\'\' def formatContent(self,content): cnt=self.log.frmMsg(content) return cnt def setMsg(self,param): arg_num=len(param) content = param[0] content = self.formatContent(content) if pyversion == 2: content = content.decode(\'utf-8\') if arg_num == 1 : touser="@all" agentid="1000009" elif arg_num == 2 : touser=param[1] agentid="1000009" elif arg_num == 3 : touser=param[1] agentid=param[2] self.sendmsg = { "touser":touser, "agentid":agentid, "msgtype": "text", "text":{ "content":content } } def updateToken(self): access_token_response = self.geturl(self.token_url, self.cropmsg) access_token = (json.loads(access_token_response)[\'access_token\']).encode(\'utf-8\') self.prop.put(self.access_token_key, access_token) self.prop.put(self.time_token_key, datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\')) def get_access_token(self): \'\'\' 获取访问凭证,经过编码的一串字符串数据 每两个小时取一次即可 \'\'\' if not self.prop.has_key(self.time_token_key): self.updateToken() else: curtime = datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\') oldtime = self.prop.get(self.time_token_key) shicha = self.oscmd.shichaByMin(oldtime,curtime) if shicha > 110: self.updateToken() return self.prop.get(self.access_token_key) def encodeurl(self,dic): \'\'\' 将字典转换为url参数传值方式,key1=value1&key2=value2 \'\'\' data = \'\' for k,v in dic.items(): data += \'%s=%s%s\' % (k,v,\'&\') return data def geturl(self,url,data): \'\'\' data为字典类型的参数, 返回类型<type \'unicode\'>,json \'\'\' data = self.encodeurl(data) response = urllib2.urlopen(\'%s?%s\' % (url,data)) return response.read().decode(\'utf-8\') def posturl(self,url,data,isjson = True): \'\'\' 发送json数据 返回类型<type \'unicode\'>,json \'\'\' if isjson: data = json.dumps(data) #dict if pyversion == 3 : data = data.encode(encoding="utf-8") response = urllib2.urlopen(url,data) return response.read().decode(\'utf-8\') def sampleSend(self,content): self.setMsg(content) # 获取企业访问凭证 access_token = self.get_access_token() sendmsg_url = \'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s\' % access_token print (self.posturl(sendmsg_url,self.sendmsg)) def showExample(self): if len(sys.argv) == 4: touser,notuse,content = sys.argv[1:] else: print (\'error segments, now exit\') sys.exit() def send(self,param): ll = Log() arg_num=len(param) if arg_num >=1: self.sampleSend(param) #ll.log(\'%s\'%(param)) return \'\' def showUsing(self): print (\'There must be more than 1 params.以上是关于python发送微信的主要内容,如果未能解决你的问题,请参考以下文章