blog/_posts/2022-09-21-cron.md

4.7 KiB
Raw Blame History

layout title tags
post 使用Python制作可热载的定时调度器
Python
程序
标志

定时任务用CRON难道不够吗

起因

最近因为写的Python脚本比较多另外也有好多脚本都是定时运行的脚本然后Linux自带的CRON可能不是很直观不然为什么那么多人开发CRON表达式生成器😂另外CRON不能和脚本绑定到一起也不能在Windows上使用所以就想用Python来实现一个。
当然如果只是想用Python为一个程序做一个定时任务还是很简单的不想用CRON也能直接用一个死循环if判断时间来做比如想要每小时什么时候执行那就不停的判断直到那个时间到了以后然后去执行就好了。不过很多个脚本都用这种方法的话就显得太凌乱了想要停止哪个脚本还不太好整所以写一个定时调度器来统一管理所有的定时脚本也许会更好一些。

探索过程

其实我最开始是想着把CRON表达式放到脚本里然后让管理器像配置文件那样读取它不过我自己解析CRON表达式有点麻烦一时也没找到好的办法。不过在找这个东西的时候找到了一个有意思的库叫做schedule用pip就能安装。它可以用类似自然语言的语法结构去写定时语句看起来很有意思于是我就打算用这个库来写我的调度器。
写它不算很复杂这个库还是挺方便理解的比据说Python常用的什么APScheduler那个库好用多了那个玩意看着就不怎么人性化。功能很快就写好了但是有个问题就是既然我要写一个可以热载自动重加载的调度器用什么办法监视文件比较好呢最开始我实现的时候是想着用列出目录和stat方法来读取文件的元数据然后轮询如果内容有变化就进行重载。不过这样写起来也麻烦准确来说也不算准确而且性能也不怎么好还要轮询文件一多整个程序的执行效率就会变低。然后我想着之前我用的Django好像就有这样的功能它到底用的是什么方法呢一般我们运行Django项目的时候它第一句会写“Watching for file changes with StatReloader”那看来StatReloader就是它用来监视文件的模块听这名字怎么和我之前想的差不多😂另外也没有找到叫这个名字的库所以就算了我还是搜一下找找别的库吧。后来我找到了一个看起来不错的库叫做watchdog看起来好像用法也不算很复杂而且据说是用的内核的什么东西来监测性能比轮询好很多所以我就整了这个库感觉效果还不错可靠性也很好文件一有修改程序就会检测到然后进行重载。

代码

管理器

import threading
import time
import schedule
import os
import importlib
from watchdog.observers import Observer
from watchdog.events import *

reload_status = [0]

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
    
class FileEventHandler(FileSystemEventHandler):
    def __init__(self, reload_status):
        self.reload_status = reload_status
    
    def on_any_event(self, event):
        if not event.is_directory:
            self.reload_status[0] = 1

observer = Observer()
event_handler = FileEventHandler(reload_status)
observer.schedule(event_handler, "tasks", recursive=False)
observer.start()
while True:
    reload_status[0] = 0
    taskList = os.listdir("tasks")
    for task in taskList:
        if "__" in task or task.rsplit(".", 1)[-1] != "py":
            continue
        try:
            importlib.reload(importlib.import_module("tasks." + task.split(".")[0])).run(run_threaded, schedule)
        except:
            print(f"Task {task.split('.')[0]} import failure")

    while True:
        if reload_status[0]:
            print("Task change, reloading...")
            time.sleep(1)
            schedule.clear()
            break
        else:
            schedule.run_pending()
        time.sleep(1)

被管理脚本示例

def run(run_threaded, schedule):
    schedule.every().second.do(run_threaded, job)

def job():
    print("The job is running.")

脚本应该放到管理器所在文件夹下的“tasks”文件夹具体定时的写法可以看看schedule官方示例

感想

感觉程序果然还是写的越简单越好功能也是越单一越好。据说APScheduler是用Python实现的像Java的Quartz那样的东西看着就很难受像我这个50行写的管理器看起来还挺不错的吧😁