python版 定时任务机制

Posted 谢礼森

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python版 定时任务机制相关的知识,希望对你有一定的参考价值。

定时任务的原理
服务器执行一个python脚本
这个脚本,循环执行配置的定时任务地址

Python请求地址, 该地址应该返回, 下次再来执行的秒数. 也就是任务的频率
比如任务希望每3秒执行一次, 那么任务结束后,应该返回一个3的数字

python脚本拿到任务返回的数字, 算出下次执行任务的时间. 当时间条件满足是, python脚本会继续访问该任务

不同的任务, 直接修改 init里面的配置就可以了
python脚本如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
import urllib.request
import os,stat

class MyThread(object):
    ‘‘‘
    多线程
    ‘‘‘
    def __init__(self, func_list=None,timeout=5):
        self.threads = []
        self.rs = {}
        self.timeout = timeout
        self.func_list = func_list
        self.start()

    #封装的线程函数
    def trace_func(self, func, name , *args, **kwargs):
        ret = func(*args, **kwargs)
        self.rs[name] = ret

    #执行多线程
    def start(self):
        for v in self.func_list:
            if v["args"]:
                lists = []
                lists.append(v["func"])
                lists.append(v["name"])
                for arg in v["args"]:
                    lists.append(arg)
                    tuples = tuple(lists)
                    t = threading.Thread(target=self.trace_func, args=tuples)
            else:
                t = threading.Thread(target=self.trace_func, args=(v["func"],v["name"],))
            self.threads.append(t)
        for t in self.threads:
            t.start()
        for t in self.threads:
            t.join()
            #t.join(self.timeout)

‘‘‘
执行任务
‘‘‘
def task(url, last_time):
    #当前时间, 最后执行时间
    cur_time  = int(time.time())
    last_time = int(last_time)
    if last_time==0:
        last_time = cur_time

    #满足条件就执行任务
    if cur_time>=last_time:
        print("请求:%s"%(url))
        try:
            req = urllib.request.urlopen(url)
            content = req.read()
            content = str(content, ‘utf-8‘)
            content = content.strip()
            if content.isdigit():
                last_time += int(content)
            else:
                last_time += 1000
        except:
            print(‘发生了异常, 重置文件‘)
            init()
    return last_time

‘‘‘
多线程调用定时任务
‘‘‘
def main():
    #读取文件
    with open("config.txt",‘r‘) as f:
        lines = f.readlines()
        f.close()

    #多线程执行
    func_list = []
    for v in lines:
        v = v.split(‘|‘)
        v[1] = int(v[1])
        func_list.append({"func":task,"args":(v[0], v[1]), "name":v[0]})
    mt = MyThread(func_list)
    d = mt.rs
    #重新写入文件
    content = ‘‘
    for k in d:
        content += "%s|%s
" % (k,str(d[k]))

    with open("config.txt",‘w‘) as f:
        f.write(content)
        f.close()

‘‘‘
初始化要执行的定时任务
‘‘‘
def init():
    urls = [
        ‘http://admin.yqxv1.local/task/withdraw‘,
        ‘http://baidu.com‘
    ]
    content = ‘‘
    for v in urls:
        content += "%s|%s
" % (v,0)
    with open("config.txt",‘w+‘) as f:
        f.write(content)
        f.close()
    os.chmod("config.txt",stat.S_IRWXU)

init()
while True:
    main()




以上是关于python版 定时任务机制的主要内容,如果未能解决你的问题,请参考以下文章

44. Python Celery多实例 定时任务

5种Python使用定时调度任务的方式

jekins配置python脚本定时任务

python celery多worker多队列定时任务

高级技巧之使用定时任务

Jenkins可持续集成Python自动化脚本(Windows版)