python发送微信

Posted 每周向前一小步

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python发送微信相关的知识,希望对你有一定的参考价值。

申请企业微信

使用python发送信息到企业微信,同时支持python2与python3环境,需要先申请一个企业微信,然后创建应用,获取以下三个信息

企业IP、Agentid、Secret

网信为创建的应用名称

 

 脚本描述

 将以上三个信息替换到脚本中,主要是

class WeiXin(object):部分,其他的辅助性工具类,收集的一些常用脚本可不用关注
#!/usr/bin/env python 
#coding=utf-8
\'\'\'
Created on 2018年2月8日

@author: root
\'\'\'


from datetime import datetime
import sys, os, re, json,socket,time
from subprocess import Popen, PIPE
from sys import version_info
if version_info.major == 3 and version_info.minor >=3:
    import urllib.request as urllib2
    pyversion = 3
elif version_info.major == 3:
    pyversion = 3
else:
    import urllib2
    pyversion = 2


try:
    if version_info.major and version_info.major == 3:
        pyversion=3
    elif version_info.major and version_info.major == 2:
        pyversion=2
    else:
        pyversion=2
except Exception as e:
    pyversion = 2

localpath = os.path.split(os.path.realpath(__file__))[0]


class OSCmd():
    \'\'\'
    OS Command:直接可调用执行命令的方法,不包括业务逻辑
    本脚本为分好层次的项目中抽出出来的方法,归为3个类,一个是命令类OSCmd,一个是系统检查类;
    为保持代码统计,命令类OSCmd是项目是调试好的代码复制过来的,不在此脚本中修改,每次都去项目中取相应的方法
    系统检查逻辑类可以修改
    \'\'\'

    def __init__(self):
        \'\'\'
        Constructor
        \'\'\'

    def exes(self, cmd_shell):
        \'\'\'
        call shell command
        \'\'\'
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        if pyversion == 3:
            s = s.encode(encoding="utf-8")
        return (s.communicate()[0]).strip(\'\\n\')

    def getexesfstval(self, cmd_shell):
        \'\'\'
        call shell command
        比如在通过ps -ef过滤进程号的时候,在eclipse中执行可以返回正确的结果,然后在shell在测试脚本时却多返回一个数字(比如13742),这里取第一个数字,舍弃多返回的数字
        \'\'\'
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        res = (s.communicate()[0]).strip(\'\\n\')
        ress = res.split(\'\\n\')
        return ress[0]

    def exef(self, filename, args):
        \'\'\'
        filename : the file is needed to exec as the way like "./filename args"
        args: list []
        for exp: oscmd.exef(\'/scripts/test/t2.py\', [\'a\',\'b\'])
        \'\'\'
        args.insert(0, \'\')
        if os.path.exists(filename):
            os.execv(filename, args)
        else:
            print(\'The {0} is not exist\'.format(filename))

    def getLineFromFile(self, targetFile, *param):
        \'\'\'
        文件中,返回某行记录,适合小文件
        f:返回首行
        l:返回末行
        n:返回第n行,n为正整数
        默认返回最后一行数据
        \'\'\'
        global f
        try:
            f = open(targetFile);
            pnum = len(param);
            with open(targetFile, \'r\') as f:  # 打开文件,适合小文件
                lines = f.readlines()  # 读取所有行
                first_line = lines[0]  # 取第一行
                last_line = lines[-1]  # 取最后一行

            if pnum > 0:
                if type(param[0]) == type(\'a\') and param[0].lower() == \'f\':
                    return first_line
                elif type(param[0]) == type(\'a\') and param[0].lower() == \'l\':
                    return last_line
                else:
                    return lines[int(param[0]) - 1]
            return last_line
        finally:
            f.close();

    

    def timeminustoS(self, t1, t2):
        t1 = time.localtime(t1)
        t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1)
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")

        # t2=time.localtime(t2)
        t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2)
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return self.total_seconds(t2 - t1)

    def total_seconds(self, time_delta):
        \'\'\'
        python 2.6 has not total_seconds method
        \'\'\'
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6

    def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain):
        \'\'\'
        删除指定目录下指定分钟之前的文件,不删除目录,也不递归目录

        删除/backup/rman/backdb目录下超过30个小时的文件
        rmfileFrmNow(\'/backup/rman/backdb\', 30*60)

        删除/backup/rman/backdb目录下超过30个小时,包含_MEMDB_20字符的文件
        rmfileFrmNow(\'/backup/rman/backdb\', 30*60,\'_MEMDB_20\')

        删除/backup/rman/backdb目录下超过30个小时,同时包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow(\'/backup/rman/backdb\', 30*60,\'back_full\',\'_MEMDB_20\')
        \'\'\'
        clen = len(contain)
        defilist = []
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                if clen > 0:
                    for c in contain:
                        if c in str(fil):
                            defilist.append(fil)
        #排序
        defilist = self.get_filist_bytime(defilist)
        lsz = len(defilist)
        if  lsz > p_savenum:
            defilist = defilist[0,lsz - p_savenum]

        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isBeforeMins(fil, mins):
                            os.remove(fil)

    def isBeforeMins(self, fil, mins):
        \'\'\'
        判断一个文件的最后修改时间距离当前时间,是否超过指定的分钟数
        \'\'\'
        if os.path.isfile(fil):
            mtfile = os.path.getmtime(fil)
            tnow = time.localtime()
            sec = self.timeminustoS(mtfile, tnow)
            mms = round(sec / 60)
            mins = eval(mins)
            if mms - mins > 0:
                return True
            return False

    def isMorthanSize(self, fil, siz):
        \'\'\'
        判断一个文件是否超过指定的大小,单位为M
        \'\'\'
        if os.path.isfile(fil):
            filsiz = os.path.getsize(fil)
            fsiz = eval(siz)*1024*1024
            if filsiz - fsiz > 0:
                return True
            return False

    def rmfilMorthanSize(self, targetDir, siz, *contain):
        \'\'\'
        删除指定目录下超过指定大小的文件,不删除目录,也不递归目录

        删除/backup/rman/backdb目录下超过2G大小的文件
        rmfileFrmNow(\'/backup/rman/backdb\', 2*1024)

        删除/backup/rman/backdb目录下超过10G大小,包含_MEMDB_20字符的文件
        rmfileFrmNow(\'/backup/rman/backdb\', 10*1024,\'_MEMDB_20\')

        删除/backup/rman/backdb目录下超过3G大小,同时包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow(\'/backup/rman/backdb\', 3*1024,\'back_full\',\'_MEMDB_20\')
        \'\'\'
        clen = len(contain)
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isMorthanSize(fil, siz):
                            os.remove(fil)


    def mkdir(self, dr, *mod):
        # import stat
        s1 = str(dr)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ[\'HOME\']
            s1 = \'%s%s\' % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = \'mkdir -p %s\' % (s1)
            # os.mkdir(dir)  这个命令不识别linux 用户home目录“~”符号
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = \'chmod -R 755 %s\' % (s1)
            if p_num == 1:
                chmod = \'chmod -R %s %s\' % (mod[0], s1)
                self.exes(chmod)
                # os.chmod(dir, mod)
            else:
                # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH)   该行会抛出异常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found
                self.exes(chmod)
            return s1

    def mknod(self, filename, *mod):
        s1 = str(filename)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ[\'HOME\']
            s1 = \'%s%s\' % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = \'touch %s\' % (s1)
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = \'chmod -R 644 %s\' % (s1)
            if p_num == 1:
                chmod = \'chmod -R %s %s\' % (mod[0], s1)
                self.exes(chmod)
            else:
                self.exes(chmod)
        return s1

    def get_filist_bytime(self,file_path):
        dir_list = os.listdir(file_path)
        if not dir_list:
            return
        else:
            # 注意,这里使用lambda表达式,将文件按照最后修改时间顺序升序排列
            # os.path.getmtime() 函数是获取文件最后修改时间
            # os.path.getctime() 函数是获取文件最后创建时间
            dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x)))
            return dir_list

    def shichaByMin(self, t1, t2):
        \'\'\'
        计算以下三种类型之间的时间差
    time.time()-浮点型,time.localtime()-struct_time型、datetime.now()-datetime型
        \'\'\'
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return round(self.total_seconds(t2 - t1) / 60)

    def total_seconds(self, time_delta):
        \'\'\'
        python 2.6 has not total_seconds method
        \'\'\'
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6


class Properties:
    # 脚本默认配置路径
    oscheck_properties = \'/tmp/.python-eggs/.oscheck.properties\'
    file_name = \'\'
    oscmd = OSCmd()

    def __init__(self, file_name=\'.oscheck.properties\'):
        \'\'\'
        将配置文件转化为列表,以列表的读取方式进行值的替换
        \'\'\'
        dr = \'/tmp/.python-eggs/\'
        if not os.path.exists(dr):  # 当目录以~开头时该行永为True,但下面的语句会自判断
            self.oscmd.mkdir(dr,\'777\')

        if not os.path.exists(file_name):  # 当目录以~开头时该行永为True,但下面的语句会自判断
            file_name = self.oscmd.mknod(file_name,\'666\')
            # os.mknod(file_name)   ~

        self.file_name = file_name
        self.properties = {}
        try:
            fopen = open(self.file_name, \'r\')
            for line in fopen:
                line = line.strip()
                if line.find(\'=\') > 0 and not line.startswith(\'#\'):
                    strs = line.split(\'=\')
                    self.properties[strs[0].strip()] = strs[1].strip()
        except Exception as e:
            raise e
        else:
            fopen.close()

    def has_key(self, key):
        return key in self.properties

    def keys(self, key):
        return self.properties.keys();

    def get(self, key, default_value=\'\'):
        if key in self.properties:
            return self.properties[key]
        return default_value

    def put(self, key, value):
        self.properties[key] = value
        self.replace_property(self.file_name, key + \'=.*\', key + \'=\' + str(value), True)

    def putjson(self, key, values):
        \'\'\'
        以json的形式存储prop中的value,可以处理一些特殊字符,比如=
        \'\'\'
        pj = {key: values}
        value = json.dumps(pj)
        self.put(key, value)

    def getjsonvalue(self, key, default_value=\'\'):
        \'\'\'
        以json的形式存储prop中的value,可以处理一些特殊字符,比如=
        \'\'\'
        if key in self.properties:
            value = self.get(key)
            valuedict = json.loads(value)
            return valuedict[key]
        return default_value

    def toasc(self, ss):
        asclst = []
        for em in bytearray(ss):
            asclst.append(em)
        return asclst

    def asctostr(self, asclst):
        ss = \'\'
        for em in asclst:
            ss += str(chr(em))
        return ss

    def putpwd(self, key, value):
        \'\'\'
        字符串以ASCII码存储
        \'\'\'
        asclst = self.toasc(value)
        self.putjson(key, asclst)

    def getpwd(self, key):
        asclst = self.getjsonvalue(key)
        pwd = self.asctostr(asclst)
        return pwd

    def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True):
        \'\'\'
        新写入数据后,替换文件以永久保存
        :param file_name:
        :param from_regex:
        :param to_str:
        :param append_on_not_exists:
        :return:
        \'\'\'
        import tempfile
        tmpfile = tempfile.TemporaryFile()

        if os.path.exists(file_name):
            r_open = open(file_name, \'r\')
            pattern = re.compile(r\'\' + from_regex)
            found = None
            for line in r_open:
                if pattern.search(line) and not line.strip().startswith(\'#\'):
                    found = True
                    line = re.sub(from_regex, to_str, line)
                if pyversion == 3:
                    line = line.encode(encoding="utf-8")
                tmpfile.write(line)
            if not found and append_on_not_exists:
                to_str = \'\\n\' + to_str
                if pyversion == 3:
                    to_str = to_str.encode(encoding="utf-8")
                tmpfile.write(to_str)
            r_open.close()
            tmpfile.seek(0)

            content = tmpfile.read()

            if os.path.exists(file_name):
                os.remove(file_name)

            w_open = open(file_name, \'w\')
            if pyversion == 3:
                content = content.decode(\'utf-8\')
            w_open.write(content)
            w_open.close()
            tmpfile.close()
        else:
            print("file %s not found" % (file_name))


def parsePro(file_name=\'/tmp/.python-eggs/.oscheck.properties\'):
    return Properties(file_name)


class Log:
    def __init__(self):
        \'\'\'
        Constructor
        \'\'\'

    logfile = os.path.join(localpath, \'scheck.log\')

    def setlog(self, logfile):
        self.logfile = logfile

    def log(self, strs, sp=\'a\'):
        open(self.logfile, sp).write(\'%s\\n\' % (strs))

    def inserttime(self, sp=\'a\'):
        now = datetime.now()
        strs = now.strftime(\'%Y-%m-%d %H:%M:%S\')
        oscmd = OSCmd()
        oscmd.mknod(self.logfile, \'666\')
        open(self.logfile, sp).write(\'%s\\n\' % (strs))

    def frmMsg(self,content):
        \'\'\'
        格式化信息输出
        :param content:
        :return:
        \'\'\'
        curtime = datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\')
        hname = socket.gethostname()
        IPADDR = socket.gethostbyname(hname)
        cnt = "内容:{0} <br/>时间:{1} <br/>信息来自 {2} {3}".format(content,curtime,hname,IPADDR)
        return cnt


class WeiXin(object):
    \'\'\'
    发送微信
    \'\'\'
    token_url = \'https://qyapi.weixin.qq.com/cgi-bin/gettoken\'
    cropmsg ={
        \'corpid\' : \'wwe***2ed*********\',
        \'corpsecret\' : \'Mgyi*****ahx3O-******HkLfg\' 
        }
    sendmsg = {}
    access_token_key="weixin_access_token"
    time_token_key="weixin_tokenkey_gettime"
    oscmd = OSCmd()
    prop = parsePro()
    log = Log()

    def __init__(self):
        \'\'\'
        Constructor
        \'\'\'

    def formatContent(self,content):
        cnt=self.log.frmMsg(content)
        return cnt
        
    def setMsg(self,param):
        arg_num=len(param)
        content = param[0]
        content = self.formatContent(content)
        if pyversion == 2:
            content = content.decode(\'utf-8\')


        if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            
        self.sendmsg = {
              "touser":touser,
              "agentid":agentid,
              "msgtype": "text",
              "text":{
                      "content":content
                      }
              }

    def updateToken(self):
        access_token_response = self.geturl(self.token_url, self.cropmsg)
        access_token = (json.loads(access_token_response)[\'access_token\']).encode(\'utf-8\')
        self.prop.put(self.access_token_key, access_token)
        self.prop.put(self.time_token_key, datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\'))


    def get_access_token(self):
        \'\'\'
        获取访问凭证,经过编码的一串字符串数据
        每两个小时取一次即可
        \'\'\'
        if not self.prop.has_key(self.time_token_key):
            self.updateToken()
        else:
            curtime = datetime.now().strftime(\'%Y-%m-%d %H:%M:%S\')
            oldtime = self.prop.get(self.time_token_key)
            shicha = self.oscmd.shichaByMin(oldtime,curtime)
            if shicha > 110:
                self.updateToken()

        return self.prop.get(self.access_token_key)
    

    def encodeurl(self,dic):
        \'\'\'
        将字典转换为url参数传值方式,key1=value1&key2=value2
        \'\'\'
        data = \'\'
        for k,v in dic.items():
            data += \'%s=%s%s\' % (k,v,\'&\')
        return data
    
    
    def geturl(self,url,data):
        \'\'\'
        data为字典类型的参数,
        返回类型<type \'unicode\'>,json
        \'\'\'
        data = self.encodeurl(data)
        response = urllib2.urlopen(\'%s?%s\' % (url,data))
        return response.read().decode(\'utf-8\')
                
    def posturl(self,url,data,isjson = True):
        \'\'\'
        发送json数据
        返回类型<type \'unicode\'>,json
        \'\'\'
        if isjson:
            data = json.dumps(data)    #dict
            if pyversion == 3 :
                data = data.encode(encoding="utf-8")
            response = urllib2.urlopen(url,data)
            return response.read().decode(\'utf-8\')
        
        
    def sampleSend(self,content):
        self.setMsg(content)
        # 获取企业访问凭证
        access_token = self.get_access_token()
        sendmsg_url = \'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s\' % access_token 
        print (self.posturl(sendmsg_url,self.sendmsg))
        
    def showExample(self):
        if len(sys.argv) == 4:
            touser,notuse,content = sys.argv[1:] 
        else:
            print (\'error segments, now exit\')
            sys.exit()
        
    def send(self,param):
        ll = Log()
        arg_num=len(param)
        if arg_num >=1:
            self.sampleSend(param)
            #ll.log(\'%s\'%(param))
            
        return \'\'
    
    def showUsing(self):
        print (\'There must be more than 1 params. 

以上是关于python发送微信的主要内容,如果未能解决你的问题,请参考以下文章

脱离微信客户端发送微信消息

微信小程序代码片段分享

python调用企业微信接口发送群聊消息代码参考

python调用企业微信api实现发送群机器人消息实例

Python 微信公众号发送消息

python实现通过企业微信发送消息