参考文档:https://schedule.readthedocs.io/en/stable/
schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期
或者自定义事件执行时间
,便于执行轻量级任务。
安装
模块使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import time import schedule import datetime
def job(): print(datetime.datetime.now(),'I am working...')
schedule.every(10).seconds.do(job) schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at("10:30").do(job) schedule.every(5).to(10).minutes.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)
|
PS: 函数带有参数时在schedule
模块添加:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import schedule import time
def job(name): print("her name is : ", name)
name = xiaona schedule.every(10).minutes.do(job, name) schedule.every().hour.do(job, name) schedule.every().day.at("10:30").do(job, name) schedule.every(5).to(10).days.do(job, name) schedule.every().monday.do(job, name) schedule.every().wednesday.at("13:15").do(job, name)
while True: schedule.run_pending() time.sleep(1)
|
并行执行
schedule方法是串行的,也就是说,如果各个任务之间时间不冲突,那是没问题的;如果时间有冲突的话,会串行的执行命令。可以改为多线程的方式执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import schedule import time import threading
def job(): print("I'm working... in job1 start") time.sleep(15) print("I'm working... in job1 end")
def job2(): print("I'm working... in job2")
def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start()
schedule.every(10).seconds.do(run_threaded,job) schedule.every(10).seconds.do(run_threaded,job2)
while True: schedule.run_pending() time.sleep(1)
|
局限性
1.需要定时运行的函数job不应当是死循环类型的,也就是说,这个线程应该有一个执行完毕的出口。一是因为线程万一僵死,会是非常棘手的问题;二是下一次定时任务还会开启一个新的线程,执行次数多了就会演变成灾难。
2.如果schedule的时间间隔设置得比job执行的时间短,一样会线程堆积形成灾难,也就是说,我job的执行时间是1个小时,但是我定时任务设置的是5分钟一次,那就会一直堆积线程。
由于Schedule 模块中存在的问题,可以采用其它方式实现定时执行代码。本人只是选的最简单一种来试试水😂!
参考:Python3-定时任务四种实现方式
联系作者