精华内容
下载资源
问答
  • 任务管理系统 使用Python构建的任务管理系统 :snake: Django框架,Django REST框架,ReactJS和TypeScript
  • python 定时任务框架APScheduler

    千次阅读 2019-04-08 16:37:52
    它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。 2、 安装 使用 pip 包管理...
    1、 简介

    APScheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。

    2、 安装

    使用 pip 包管理工具安装 APScheduler 是最方便快捷的。

    pip install APScheduler

    3 、使用步骤

    APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。

    新建一个 schedulers (调度器) 。
    添加一个调度任务(job stores)。
    运行调度任务。

    下面是执行定时任务的简单示例代码

    #!/usr/bin/env python
    # coding:utf-8
    
    import time
    from apscheduler.schedulers.blocking import BlockingScheduler
    import logging
    
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(filename='mypython.log', level=logging.INFO, format=LOG_FORMAT)
    
    today = time.strftime('%Y%m%d', time.localtime(time.time()))
    nowDateTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    
    global num
    num = 0
    
    class myAP:
        def __init__(self):
            print("hello")
    
    
    def sayHello():
        global num
        num = num + 1
        print(nowDateTime+'  任务一  hello word %d' % num)
    
    def sayHello11():
        global num
        num = num + 1
        print(nowDateTime+'  任务二 o(* ̄︶ ̄*)o hello word %d' % num)
    
    if __name__ == "__main__":
        # BlockingScheduler
        scheduler = BlockingScheduler()
        # scheduler.add_job(sayHello, 'cron', day_of_week='0-6', hour=19, minute=50)
        scheduler.add_job(sayHello, 'interval', seconds=2)
        scheduler.add_job(sayHello11, 'interval', seconds=3)
        scheduler.start()
    

    执行结果如图
    定时任务执行结果

    4、 基础组件

    APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

    schedulers(调度器)
    它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。

    triggers(触发器)
    描述调度任务被触发的条件。不过触发器完全是无状态的。

    job stores(作业存储器)
    任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。

    executors(执行器)
    负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

    4.1 schedulers(调度器)

    我个人觉得 APScheduler 非常好用的原因。它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:

    BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
    BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
    AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
    GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
    TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
    TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
    QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

    4.2 triggers(触发器)

    APScheduler 有三种内建的 trigger:
    1)date 触发器
    date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:

    参数 说明
    run_date (datetime 或 str) 作业的运行日期或时间
    timezone (datetime.tzinfo 或 str) 指定时区

    date 触发器使用示例如下:

    from datetime import datetime
    from datetime import date
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def job_func(text):
        print(text)
    
    scheduler = BackgroundScheduler()
    # 在 2017-12-13 时刻运行一次 job_func 方法
    scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
    # 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
    scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
    # 在 2017-12-13 14:00:01 时刻运行一次 job_func 方法
    scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
    scheduler.start()
    
    
    interval 触发器

    固定时间间隔触发。interval 间隔调度,参数如下:

    参数 说明
    weeks (int) 间隔几周
    days (int) 间隔几天
    hours (int) 间隔几小时
    minutes (int) 间隔几分钟
    seconds (int) 间隔多少秒
    start_date (datetime 或 str) 开始日期
    end_date (datetime 或 str) 结束日期
    timezone (datetime.tzinfo 或str) 时区
    cron 触发器

    在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。
    我们先了解 cron 参数:

    参数 说明
    year (int 或 str) 年,4位数字
    month (int 或 str) 月 (范围1-12)
    day (int 或 str) 日 (范围1-31
    week (int 或 str) 周 (范围1-53)
    day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者mon,tue,wed,thu,fri,sat,sun)
    hour (int 或 str) 时 (范围0-23)
    minute (int 或 str) 分 (范围0-59)
    second (int 或 str) 秒 (范围0-59)
    start_date (datetime 或 str) 最早开始日期(包含)
    end_date (datetime 或 str) 最晚结束时间(包含)
    timezone (datetime.tzinfo 或str) 指定时区

    cron 触发器使用示例如下:

    import datetime
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def job_func(text): print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    scheduler = BackgroundScheduler() 
    
    # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
    scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3') 
    scheduler.start()
    
    
    4.3 作业存储(job store)

    该组件是对调度任务的管理。
    1)添加 job
    有两种添加方法,其中一种上述代码用到的 add_job(), 另一种则是scheduled_job()修饰器来修饰函数。

    这个两种办法的区别是:第一种方法返回一个 apscheduler.job.Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。

    第二种添加任务方式的例子:

    @scheduler.scheduled_job(job_func, 'interval', minutes=2) 
    def job_func(text): 
        print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) 
        scheduler = BackgroundScheduler() 
        scheduler.start()
    
    

    2)移除 job
    移除 job 也有两种方法:remove_job() 和 job.remove()。
    remove_job() 是根据 job 的 id 来移除,所以要在 job 创建的时候指定一个 id。
    job.remove() 则是对 job 执行 remove 方法即可

    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
    scheduler.remove_job(job_one)
    job = add_job(job_func, 'interval', minutes=2, id='job_one')
    job.remvoe()
    

    3)获取 job 列表
    通过 scheduler.get_jobs() 方法能够获取当前调度器中的所有 job 的列表

    修改 job
    如果你因计划改变要对 job 进行修改,可以使用Job.modify() 或者 modify_job()方法来修改 job 的属性。但是值得注意的是,job 的 id 是无法被修改的。

    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
    scheduler.start()
    # 将触发时间间隔修改成 5分钟
    scheduler.modify_job('job_one', minutes=5)
    
    job = scheduler.add_job(job_func, 'interval', minutes=2)
    # 将触发时间间隔修改成 5分钟
    job.modify(minutes=5)
    

    5)关闭 job
    默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将 wait 选项设置为 False。

    scheduler.shutdown()
    scheduler.shutdown(wait=false)
    
    4.4 执行器(executor)

    执行器顾名思义是执行调度任务的模块。最常用的 executor 有两种:ProcessPoolExecutor 和 ThreadPoolExecutor

    下面是显式设置 job store(使用mongo存储)和 executor 的代码的示例。

    from pymongo import MongoClient
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    
    def my_job():
        print 'hello world'
    host = '127.0.0.1'
    port = 27017
    client = MongoClient(host, port)
    
    jobstores = {
        'mongo': MongoDBJobStore(collection='job', database='test', client=client),
        'default': MemoryJobStore()
    }
    executors = {
        'default': ThreadPoolExecutor(10),
        'processpool': ProcessPoolExecutor(3)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
    scheduler.add_job(my_job, 'interval', seconds=5)
    
    try:
        scheduler.start()
    except SystemExit:
        client.close()
    
    
    展开全文
  • Python 定时任务框架

    2021-03-05 10:20:19
    一、APScheduler 简介 在实际开发中我们经常会碰上一些重复性或...但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智

    一、APScheduler 简介

    在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据、一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行。

    在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作。但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智能」了。

    所以将这些定时任务的调度代码化才是能够让我们很好地从这种手动管理的纯人力操作中解脱出来。

    在 Python 生态中对于定时任务的一些操作主要有那么几个:

    1. schedule:第三方模块,该模块适合比较轻量级的一些调度任务,但却不适用于复杂时间的调度

    2. APScheduler:第三方定时任务框架,是对 Java 第三方定时任务框架Quartz 的模仿与移植,能提供比 schedule 更复杂的应用场景,并且各种组件都是模块化,易于使用与二次开发。

    3. Celery Beat:属于 celery 这分布式任务队列第三方库下的一个定时任务组件,如果使用需要配合 RabbitMQ 或 Redis 这类的消息队列套件,需要花费一定的时间在环境搭建上,但在高版本中已经不支持 Windows。

    所以为了满足能够相对复杂的时间条件,又不需要在前期的环境搭建上花费很多时间的前提下,选择 APScheduler 来对我们的调度任务或定时任务进行管理是个性价比极高的选择。而本文主要会带你快速上手有关 APScheduler 的使用。

    二、APScheduler 概念与组件

    虽然说官方文档上的内容不是很多,而且所列举的 API 不是很多,但这侧面也反映了这一框架的简单易用。所以在使用 APScheduler 之前,我们需要对这个框架的一些概念简单了解,主要有那么以下几个:

    • 触发器(trigger)

    • 任务持久化(job stores)

    • 执行器(executor)

    • 调度器(scheduler)

    1.触发器(trigger)

    所谓的触发器就是用以触发定时任务的组件,在 APScheduler 中主要是指时间触发器,并且主要有三类时间触发器可供使用:

    • date:日期触发器。日期触发器主要是在某一日期时间点上运行任务时调用,是 APScheduler 里面最简单的一种触发器。所以通常也适用于一次性的任务或作业调度。

    • interval:间隔触发器。间隔触发器是在日期触发器基础上扩展了对时间部分,比如时、分、秒、天、周这几个部分的设定。是我们用以对重复性任务进行设定或调度的一个常用调度器。设定了时间部分之后,从起始日期开始(默认是当前)会按照设定的时间去执行任务。

    • croncron 表达式触发器。cron 表达式触发器就等价于我们 Linux 上的 crontab,它主要用于更复杂的日期时间进行设定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表达式,最多只支持到 5 位。

    2.任务持久化(job stores)

    任务持久化主要是用于将设定好的调度任务进行存储,即便是程序因为意外情况,如断电、电脑或服务器重启时,只要重新运行程序时,APScheduler 就会根据对存储好的调度任务结果进行判断,如果出现已经过期但未执行的情况会进行相应的操作。

    APScheduler 为我们提供了多种持久化任务的途径,默认是使用 memory 也就是内存的形式,但内存并不是持久化最好的方式。最好的方式则是通过像数据库这样的载体来将我们的定时任务写入到磁盘当中,只要磁盘没有损坏就能将数据给恢复。

    APScheduler 支持的且常用的数据库主要有:

    • sqlalchemy 形式的数据库,这里就主要是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。

    • mongodb 非结构化的 Mongodb 数据库,该类型数据库经常用于对非结构化或版结构化数据的存储或操作,如 JSON。

    • redis 内存数据库,通常用作数据缓存来使用,当然通过一些主从复制等方式也能实现当中数据的持久化或保存。

    通常我们可以在创建 Scheduler 实例时创建,或是单独为任务指定。配置的方式相对简单,我们只需要指定对应的数据库链接即可。

    3.执行器(executor)

    执行器顾名思义就是执行我们任务的对象,在计算机内通常要么是 CPU 调度任务,要么是单独维护一个线程来运行任务。所以 APScheduler 里的执行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 这样的线程池和进程池两种。

    当然如果是和协程或异步相关的任务调度,还可以使用对应的 AsyncIOExecutorTwistedExecutor 和 GeventExecutor 三种执行器。

    4.调度器(scheduler)

    调度器的选择主要取决于你当前的程序环境以及 APScheduler 的用途。根据用途的不同,APScheduler 又提供了以下几种调度器:

    • BlockingScheduler:阻塞调度器,当程序中没有任何存在主进程之中运行东西时,就则使用该调度器。

    • BackgroundScheduler:后台调度器,在不使用后面任何的调度器且希望在应用程序内部运行时的后台启动时才进行使用,如当前你已经开启了一个 Django 或 Flask 服务。

    • AsyncIOSchedulerAsyncIO 调度器,如果代码是通过 asyncio 模块进行异步操作,使用该调度器。

    • GeventSchedulerGevent 调度器,如果代码是通过 gevent 模块进行协程操作,使用该调度器

    • TornadoSchedulerTornado 调度器,在 Tornado 框架中使用

    • TwistedSchedulerTwisted 调度器,在基于 Twisted 的框架或应用程序中使用

    • QtSchedulerQt 调度器,在构建 Qt 应用中进行使用。

    通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。

    基于对以上的概念和组件认识,我们就能基本上摸清 APScheduler 的运行流程:

    1. 设定调度器(scheduler)用以对任务的调度与安排进行全局统筹

    2. 对相应的函数或方法上设定相应的触发器(trigger),并添加到调度器中

    3. 如有任务持久化(job stores)需要则需要设定对应的持久化层,否则默认使用内存存储任务

    4. 当触发器被触发时,就将任务交由执行器(executor)进行执行

     

    三、APScheduler 快速上手

    虽然 APScheduler 里面的概念和组件看起来有点多,但在使用上并不算很复杂,我们可以通过本节的示例就能够很快使用。

    1.选择对应的 scheduler

    在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,我们可以直接通过查看 API 文档或者借助 IDE 补全的提示来获取相应的 scheduler 对象。

    这里我直接选取了最基础的 BlockingScheduler

    # main.py
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    

    2.配置 scheduler

    对于 scheduler 的一些配置我们可以直接在实例化对象时就进行配置,当然也可以在创建实例化对象之后再进行配置。

    实例化时进行参数配置:

    # main.py
    from datetime import datetime
    
    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    # 任务持久化 使用 SQLite
    jobstores = {
        'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
    }
    # 执行器配置
    executors = {
        'default': ThreadPoolExecutor(20),
    }
    # 关于 Job 的相关配置,见官方文档 API
    job_defaults = {
        'coalesce': False,
        'next_run_time': datetime.now()
    }
    scheduler = BlockingScheduler(
      jobstores = jobstores,
      executors = executors,
      job_defaults = job_defaults,
      timezone = 'Asia/Shanghai'
    )
    

    或是通过 scheduler.configure 方法进行同样的操作:

    scheduler = BlockingScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
    

    3.添加并执行你的任务

    创建 scheduler 对象之后,我们需要调用其下的 add_job() 或是 scheduled_job() 方法来将我们需要执行的函数进行注册。前者是以传参的形式指定对应的函数名,而后者则是以装饰器的形式直接对我们要执行的函数进行修饰。

    比如我现在有一个输出此时此刻时间的函数 now()

    from datetime import datetime
    
    def now(trigger):
        print(f"trigger:{trigger} -> {datetime.now()}")
    

    然后我打算每 5 秒的时候运行一次,那我们使用 add_job() 可以这样写:

    if __name__ == '__main__':
        scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
        scheduler.start()
    

    在调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果了:

    trigger:interval -> 2021-01-16 21:19:43.356674
    trigger:interval -> 2021-01-16 21:19:46.679849
    trigger:interval -> 2021-01-16 21:19:48.356595
    

    当然使用 @scheduled_job 的方式来装饰我们的任务或许会更加自由一些,于是上面的例子就可以写成这样:

    @scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
    def now(trigger):
        print(f"trigger:{trigger} -> {datetime.now()}")
    
    if __name__ == '__main__':
        scheduler.start()
    

    运行之后就会在控制台看到同样的结果了。

    不过需要注意的是,添加任务一定要在 start() 方法执行前调用,否则会找不到任务或是抛出异常。

    四、将 APScheduler 集成到 Web 项目中

    如果你是正在做有关的 Web 项目且存在一些定时任务,那么得益于APScheduler 由于多样的调度器,我们能够将其和我们的项目结合到一起。

    如果你正在使用 Flask,那么 Flask-APScheduler 这一别人写好的第三方包装库就很适合你,虽然它没有相关的文档,但只要你了解了前面我所介绍的有关于 APScheduler 的概念和组件,你就能很轻易地看懂这个第三方库仓库里的示例代码。

    如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些对任务或作业的增删改查操作,我们可以自己编写一套合适的 API。

    这里我使用的是 FastAPI 这一目前流行的 Web 框架。demo 项目结构如下:

    temp-scheduler
    ├── config.py       # 配置项
    ├── main.py         # API 文件
    └── scheduler.py    # APScheduler 相关设置
    

    1.安装依赖

    这里我们需要的依赖不多,只需要简单几个即可:

    pip install fastapi apscheduler sqlalchemy uvicorn
    

    2.配置项

    如果项目中模块过多,那么使用一个文件或模块来进行统一管理是最好的选择。这里的 config.py 我们主要像 Flask 的配置那样简单设定:

    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    class SchedulerConfig:
    
        JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
        EXECUTORS = {"default": ThreadPoolExecutor(20)}
        JOB_DEFAULTS = {"coalesce": False}
    
        @classmethod
        def to_dict(cls):
            return {
                "jobstores": cls.JOBSTORES,
                "executors": cls.EXECUTORS,
                "job_defaults": cls.JOB_DEFAULTS,
            }
    

    在 SchedulerConfig 配置项中我们可以自己实现一个 to_dict() 类方法,以便我们后续传参时通过解包的方式直接传入配置参数即可。

    3.Scheduler 相关设置

    scheduler.py 模块的设定也比较简单,即设定对应的 scheduler 调度器即可。由于是演示 demo 我还将要定期执行的任务也放在了这个模块当中:

    import logging
    from datetime import datetime
    
    from apscheduler.schedulers.background import BackgroundScheduler
    
    from config import SchedulerConfig
    
    scheduler = BackgroundScheduler()
    logger = logging.getLogger(__name__)
    
    def init_scheduler() -> None:
        # config scheduler
        scheduler.configure(**SchedulerConfig.to_dict())
    
        logger.info("scheduler is running...")
    
        # schedule test
        scheduler.add_job(
            func=mytask,
            trigger="date",
            args=("APScheduler Initialize.",),
            next_run_time=datetime.now(),
        )
        scheduler.start()
    
    def mytask(message: str) -> None:
        print(f"[{datetime.now()}] message: {message}")
    

    在这一部分中:

    • init_scheduler() 方法主要用于在 API 服务启动时被调用,然后对 scheduler 对象的配置以及测试

    • mytask() 则是我们要定期执行的任务,后续我们可以通过 APScheduler 提供的方法来自行添加任务

    4.API 设置

    在 main.py 模块就主要存放着我们由 FastAPI 所构建的相关 API。如果在后续开发时存在多个接口,此时就需要将不同接口放在不同模块文件中,以达到路由的分发与管理,类似于 Flask 的蓝图模式。

    import logging
    import uuid
    from datetime import datetime
    from typing import Any, Dict, Optional, Sequence, Union
    
    from fastapi import FastAPI
    from pydantic import BaseModel
    
    from scheduler import init_scheduler, mytask, scheduler
    
    logger = logging.getLogger(__name__)
    
    app = FastAPI(title="APScheduler API")
    app.add_event_handler("startup", init_scheduler)
    
    class Job(BaseModel):
        id: Union[int, str, uuid.UUID]
        name: Optional[str] = None
        func: Optional[str] = None
        args: Optional[Sequence[Optional[str]]] = None
        kwargs: Optional[Dict[str, Any]] = None
        executor: Optional[str] = None
        misfire_grace_time: Optional[str] = None
        coalesce: Optional[bool] = None
        max_instances: Optional[int] = None
        next_run_time: Optional[Union[str, datetime]] = None
    
    @app.post("/add")
    def add_job(
        message: str,
        trigger: str,
        trigger_args: Optional[dict],
        id: Union[str, int, uuid.UUID],
    ):
        try:
            scheduler.add_job(
                func=mytask,
                trigger=trigger,
                kwargs={"message": message},
                id=id,
                **trigger_args,
            )
        except Exception as e:
            logger.exception(e.args)
            return {"status_code": 0, "message": "添加失败"}
        return {"status_code": 1, "message": "添加成功"}
    
    @app.delete("/delete/{id}")
    def delete_job(id: Union[str, int, uuid.UUID]):
        """delete exist job by id"""
        try:
            scheduler.remove_job(job_id=id)
        except Exception:
            return dict(
                message="删除失败",
                status_code=0,
            )
        return dict(
            message="删除成功",
            status_code=1,
        )
    
    @app.put("/reschedule/{id}")
    def reschedule_job(
        id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
    ):
        try:
            scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
        except Exception as e:
            logger.exception(e.args)
            return dict(
                message="修改失败",
                status_code=0,
            )
        return dict(
            message="修改成功",
            status_code=1,
        )
    
    @app.get("/job")
    def get_all_jobs():
        jobs = None
        try:
            job_list = scheduler.get_jobs()
            if job_list:
                jobs = [Job(**task.__getstate__()) for task in job_list]
        except Exception as e:
            logger.exception(e.args)
            return dict(
                message="查询失败",
                status_code=0,
                jobs=jobs,
            )
        return dict(
            message="查询成功",
            status_code=1,
            jobs=jobs,
        )
    
    @app.get("/job/{id}")
    def get_job_by_id(id: Union[int, str, uuid.UUID]):
        jobs = []
        try:
            job = scheduler.get_job(job_id=id)
            if job:
                jobs = [Job(**job.__getstate__())]
        except Exception as e:
            logger.exception(e.args)
            return dict(
                message="查询失败",
                status_code=0,
                jobs=jobs,
            )
        return dict(
            message="查询成功",
            status_code=1,
            jobs=jobs,
        )
    

    以上代码看起来很多,其实核心的就那么几点:

    1. FastAPI 对象 app 的初始化。这里用到的 add_event_handler() 方法就有点像 Flask 中的 before_first_request,会在 Web 服务请求伊始进行操作,理解为初始化相关的操作即可。

    2. API 接口路由。路由通过 app 对象下的对应 HTTP 方法来实现,如 GETPOSTPUT 等。这里的装饰器用法其实也和 Flask 很类似,就不多赘述。

    3. scheduler 对象的增删改查。从 scheduler.py 模块中引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:

      1. 增:使用 add_job() 方法,其主要的参数是要运行的函数(或方法)、触发器以及触发器参数等

      2. 删:使用 delete_job() 方法,我们需要传入一个对应任务的 id 参数,用以能够查找到对应的任务

      3. 改:使用 reschedule_job() 方法,这里也需要一个对应任务的 id 参数,以及需要重新修改的触发器及其参数

      4. 查:使用 get_jobs() 和 get_job() 两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了 APScheduler.job.Job 对象的列表,而后者是通过 id 参数来查找对应的任务对象;这里我通过底层源码使用 __getstate__() 来获取到任务的相关信息,这些信息我们通过事先设定好的 Job 对象来对其进行序列化,最后将信息从接口中返回。

    5.运行

    完成以上的所有操作之后,我们就可以打开控制台,进入到该目录下并激活我们的虚拟环境,之后运行:

    uvicorn main:app 
    

    之后我们就能在 FastAPI 默认的地址 http://127.0.0.1:8000/docs  中看到关于全部接口的 Swagger 文档页面了:

    图片

    fastapi 集成的 swagger 页面

    之后我们可以直接在文档里面或使用 Postman 来自己进行接口测试即可。

    五、结尾

    本文介绍了有关于 APScheduler 框架的概念及其用法,并进行了简单的实践。

    得益于 APScheduler 的模块化设计才可以让我们更方便地去理解、使用它,并将其运用到我们实际的开发过程中。

    从 APScheduler 目前的 Github 仓库代码以及 issue 来看,作者已经在开始重构 4.0 版本,当中的一些源代码和 API 也有较大的变动,相信在 4.0 版本中将会引入更多的新特性。

    展开全文
  • APScheduler 简介 在实际开发中我们经常会碰上一些重复性或...但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智能」

    APScheduler 简介

    在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据、一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便其能够在我们设定好的时间内运行。

    在 Windows 上我们可以通过计划任务来手动实现,而在 Linux 系统上往往我们会用到更多关于 crontab 的相关操作。但手动管理并不是一个很好的选择,如果我们需要有十几个不同的定时任务需要管理,那么每次通过人工来进行干预未免有些笨拙,那这时候就真的是「人工智能」了。

    所以将这些定时任务的调度代码化才是能够让我们很好地从这种手动管理的纯人力操作中解脱出来。

    在 Python 生态中对于定时任务的一些操作主要有那么几个:

    1. schedule:第三方模块,该模块适合比较轻量级的一些调度任务,但却不适用于复杂时间的调度

    2. APScheduler:第三方定时任务框架,是对 Java 第三方定时任务框架 Quartz 的模仿与移植,能提供比 schedule 更复杂的应用场景,并且各种组件都是模块化,易于使用与二次开发。

    3. Celery Beat:属于 celery 这分布式任务队列第三方库下的一个定时任务组件,如果使用需要配合 RabbitMQ 或 Redis 这类的消息队列套件,需要花费一定的时间在环境搭建上,但在高版本中已经不支持 Windows。

    所以为了满足能够相对复杂的时间条件,又不需要在前期的环境搭建上花费很多时间的前提下,选择 APScheduler 来对我们的调度任务或定时任务进行管理是个性价比极高的选择。而本文主要会带你快速上手有关 APScheduler 的使用。

    APScheduler 概念与组件

    虽然说官方文档上的内容不是很多,而且所列举的 API 不是很多,但这侧面也反映了这一框架的简单易用。所以在使用 APScheduler 之前,我们需要对这个框架的一些概念简单了解,主要有那么以下几个:

    • 触发器(trigger)

    • 任务持久化(job stores)

    • 执行器(executor)

    • 调度器(scheduler)

    触发器(trigger)

    所谓的触发器就是用以触发定时任务的组件,在 APScheduler中主要是指时间触发器,并且主要有三类时间触发器可供使用:

    • date:日期触发器。日期触发器主要是在某一日期时间点上运行任务时调用,是 APScheduler 里面最简单的一种触发器。所以通常也适用于一次性的任务或作业调度。

    • interval:间隔触发器。间隔触发器是在日期触发器基础上扩展了对时间部分,比如时、分、秒、天、周这几个部分的设定。是我们用以对重复性任务进行设定或调度的一个常用调度器。设定了时间部分之后,从起始日期开始(默认是当前)会按照设定的时间去执行任务。

    • croncron 表达式触发器。cron 表达式触发器就等价于我们 Linux 上的 crontab,它主要用于更复杂的日期时间进行设定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表达式,最多只支持到 5 位。

    任务持久化(job stores)

    任务持久化主要是用于将设定好的调度任务进行存储,即便是程序因为意外情况,如断电、电脑或服务器重启时,只要重新运行程序时,APScheduler 就会根据对存储好的调度任务结果进行判断,如果出现已经过期但未执行的情况会进行相应的操作。

    APScheduler 为我们提供了多种持久化任务的途径,默认是使用memory 也就是内存的形式,但内存并不是持久化最好的方式。最好的方式则是通过像数据库这样的载体来将我们的定时任务写入到磁盘当中,只要磁盘没有损坏就能将数据给恢复。

    APScheduler 支持的且常用的数据库主要有:

    • sqlalchemy 形式的数据库,这里就主要是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。

    • mongodb 非结构化的 Mongodb 数据库,该类型数据库经常用于对非结构化或版结构化数据的存储或操作,如 JSON。

    • redis 内存数据库,通常用作数据缓存来使用,当然通过一些主从复制等方式也能实现当中数据的持久化或保存。

    通常我们可以在创建 Scheduler 实例时创建,或是单独为任务指定。配置的方式相对简单,我们只需要指定对应的数据库链接即可。

    执行器(executor)

    执行器顾名思义就是执行我们任务的对象,在计算机内通常要么是 CPU 调度任务,要么是单独维护一个线程来运行任务。所以 APScheduler 里的执行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 这样的线程池和进程池两种。

    当然如果是和协程或异步相关的任务调度,还可以使用对应的 AsyncIOExecutorTwistedExecutor 和 GeventExecutor 三种执行器。

    调度器(scheduler)

    调度器的选择主要取决于你当前的程序环境以及 APScheduler的用途。根据用途的不同,APScheduler 又提供了以下几种调度器:

    • BlockingScheduler:阻塞调度器,当程序中没有任何存在主进程之中运行东西时,就则使用该调度器。

    • BackgroundScheduler:后台调度器,在不使用后面任何的调度器且希望在应用程序内部运行时的后台启动时才进行使用,如当前你已经开启了一个 Django 或 Flask 服务。

    • AsyncIOSchedulerAsyncIO 调度器,如果代码是通过 asyncio 模块进行异步操作,使用该调度器。

    • GeventSchedulerGevent 调度器,如果代码是通过 gevent 模块进行协程操作,使用该调度器

    • TornadoSchedulerTornado 调度器,在 Tornado 框架中使用

    • TwistedSchedulerTwisted 调度器,在基于 Twisted的框架或应用程序中使用

    • QtSchedulerQt 调度器,在构建 Qt 应用中进行使用。

    通常情况下如果不是和 Web 项目或应用集成共存,那么往往都首选 BlockingScheduler 调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler 调度器,因为它不会干扰当前应用的线程或进程状况。

    基于对以上的概念和组件认识,我们就能基本上摸清 APScheduler 的运行流程:

    1. 设定调度器(scheduler)用以对任务的调度与安排进行全局统筹

    2. 对相应的函数或方法上设定相应的触发器(trigger),并添加到调度器中

    3. 如有任务持久化(job stores)需要则需要设定对应的持久化层,否则默认使用内存存储任务

    4. 当触发器被触发时,就将任务交由执行器(executor)进行执行

    APScheduler 快速上手

    虽然 APScheduler 里面的概念和组件看起来有点多,但在使用上并不算很复杂,我们可以通过本节的示例就能够很快使用。

    选择对应的 scheduler

    在使用之前我们需要先实例化一个 scheduler 对象,所有的 scheduler 对象都被放在了 apscheduler.schedulers 模块下,我们可以直接通过查看 API 文档或者借助 IDE 补全的提示来获取相应的 scheduler 对象。

    这里我直接选取了最基础的 BlockingScheduler

    # main.py
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    

    配置 scheduler

    对于 scheduler 的一些配置我们可以直接在实例化对象时就进行配置,当然也可以在创建实例化对象之后再进行配置。

    实例化时进行参数配置:

    # main.py
    from datetime import datetime
    
    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    # 任务持久化 使用 SQLite
    jobstores = {
        'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
    }
    # 执行器配置
    executors = {
        'default': ThreadPoolExecutor(20),
    }
    # 关于 Job 的相关配置,见官方文档 API
    job_defaults = {
        'coalesce': False,
        'next_run_time': datetime.now()
    }
    scheduler = BlockingScheduler(
      jobstores = jobstores,
      executors = executors,
      job_defaults = job_defaults,
      timezone = 'Asia/Shanghai'
    )
    

    或是通过 scheduler.configure 方法进行同样的操作:

    scheduler = BlockingScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
    

    添加并执行你的任务

    创建 scheduler 对象之后,我们需要调用其下的 add_job()或是 scheduled_job() 方法来将我们需要执行的函数进行注册。前者是以传参的形式指定对应的函数名,而后者则是以装饰器的形式直接对我们要执行的函数进行修饰。

    比如我现在有一个输出此时此刻时间的函数 now()

    from datetime import datetime
    
    def now(trigger):
        print(f"trigger:{trigger} -> {datetime.now()}")
    

    然后我打算每 5 秒的时候运行一次,那我们使用 add_job() 可以这样写:

    if __name__ == '__main__':
        scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
        scheduler.start()
    

    在调用 start() 方法之后调度器就会开始执行,并在控制台上看到对应的结果了:

    trigger:interval -> 2021-01-16 21:19:43.356674
    trigger:interval -> 2021-01-16 21:19:46.679849
    trigger:interval -> 2021-01-16 21:19:48.356595
    

    当然使用 @scheduled_job 的方式来装饰我们的任务或许会更加自由一些,于是上面的例子就可以写成这样:

    @scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
    def now(trigger):
        print(f"trigger:{trigger} -> {datetime.now()}")
    
    if __name__ == '__main__':
        scheduler.start()
    

    运行之后就会在控制台看到同样的结果了。

    不过需要注意的是,添加任务一定要在 start() 方法执行前调用,否则会找不到任务或是抛出异常。

    将 APScheduler 集成到 Web 项目中

    如果你是正在做有关的 Web 项目且存在一些定时任务,那么得益于 APScheduler 由于多样的调度器,我们能够将其和我们的项目结合到一起。

    如果你正在使用 Flask,那么 Flask-APScheduler 这一别人写好的第三方包装库就很适合你,虽然它没有相关的文档,但只要你了解了前面我所介绍的有关于 APScheduler 的概念和组件,你就能很轻易地看懂这个第三方库仓库里的示例代码。

    如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些对任务或作业的增删改查操作,我们可以自己编写一套合适的 API。

    这里我使用的是 FastAPI 这一目前流行的 Web 框架。demo 项目结构如下:

    temp-scheduler
    ├── config.py       # 配置项
    ├── main.py         # API 文件
    └── scheduler.py    # APScheduler 相关设置
    

    安装依赖

    这里我们需要的依赖不多,只需要简单几个即可:

    pip install fastapi apscheduler sqlalchemy uvicorn
    

    配置项

    如果项目中模块过多,那么使用一个文件或模块来进行统一管理是最好的选择。这里的 config.py 我们主要像 Flask 的配置那样简单设定:

    from apscheduler.executors.pool import ThreadPoolExecutor
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    class SchedulerConfig:
    
        JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
        EXECUTORS = {"default": ThreadPoolExecutor(20)}
        JOB_DEFAULTS = {"coalesce": False}
    
        @classmethod
        def to_dict(cls):
            return {
                "jobstores": cls.JOBSTORES,
                "executors": cls.EXECUTORS,
                "job_defaults": cls.JOB_DEFAULTS,
            }
    

    在 SchedulerConfig 配置项中我们可以自己实现一个 to_dict() 类方法,以便我们后续传参时通过解包的方式直接传入配置参数即可。

    Scheduler 相关设置

    scheduler.py 模块的设定也比较简单,即设定对应的 scheduler 调度器即可。由于是演示 demo 我还将要定期执行的任务也放在了这个模块当中:

    import logging
    from datetime import datetime
    
    from apscheduler.schedulers.background import BackgroundScheduler
    
    from config import SchedulerConfig
    
    scheduler = BackgroundScheduler()
    logger = logging.getLogger(__name__)
    
    def init_scheduler() -> None:
        # config scheduler
        scheduler.configure(**SchedulerConfig.to_dict())
    
        logger.info("scheduler is running...")
    
        # schedule test
        scheduler.add_job(
            func=mytask,
            trigger="date",
            args=("APScheduler Initialize.",),
            next_run_time=datetime.now(),
        )
        scheduler.start()
    
    def mytask(message: str) -> None:
        print(f"[{datetime.now()}] message: {message}")
    

    在这一部分中:

    • init_scheduler() 方法主要用于在 API 服务启动时被调用,然后对 scheduler 对象的配置以及测试

    • mytask() 则是我们要定期执行的任务,后续我们可以通过 APScheduler 提供的方法来自行添加任务

    API 设置

    在 main.py 模块就主要存放着我们由 FastAPI 所构建的相关 API。如果在后续开发时存在多个接口,此时就需要将不同接口放在不同模块文件中,以达到路由的分发与管理,类似于 Flask 的蓝图模式。

    import logging
    import uuid
    from datetime import datetime
    from typing import Any, Dict, Optional, Sequence, Union
    
    from fastapi import FastAPI
    from pydantic import BaseModel
    
    from scheduler import init_scheduler, mytask, scheduler
    
    logger = logging.getLogger(__name__)
    
    app = FastAPI(title="APScheduler API")
    app.add_event_handler("startup", init_scheduler)
    
    class Job(BaseModel):
        id: Union[int, str, uuid.UUID]
        name: Optional[str] = None
        func: Optional[str] = None
        args: Optional[Sequence[Optional[str]]] = None
        kwargs: Optional[Dict[str, Any]] = None
        executor: Optional[str] = None
        misfire_grace_time: Optional[str] = None
        coalesce: Optional[bool] = None
        max_instances: Optional[int] = None
        next_run_time: Optional[Union[str, datetime]] = None
    
    @app.post("/add")
    def add_job(
        message: str,
        trigger: str,
        trigger_args: Optional[dict],
        id: Union[str, int, uuid.UUID],
    ):
        try:
            scheduler.add_job(
                func=mytask,
                trigger=trigger,
                kwargs={"message": message},
                id=id,
                **trigger_args,
            )
        except Exception as e:
            logger.exception(e.args)
            return {"status_code": 0, "message": "添加失败"}
        return {"status_code": 1, "message": "添加成功"}
    
    @app.delete("/delete/{id}")
    def delete_job(id: Union[str, int, uuid.UUID]):
        """delete exist job by id"""
        try:
            scheduler.remove_job(job_id=id)
        except Exception:
            return dict(
                message="删除失败",
                status_code=0,
            )
        return dict(
            message="删除成功",
            status_code=1,
        )
    
    @app.put("/reschedule/{id}")
    def reschedule_job(
        id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
    ):
        try:
            scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
        except Exception as e:
            logger.exception(e.args)
            return dict(
                message="修改失败",
                status_code=0,
            )
        return dict(
            message="修改成功",
            status_code=1,
        )
    
    @app.get("/job")
    def get_all_jobs():
        jobs = None
        try:
            job_list = scheduler.get_jobs()
            if job_list:
                jobs = [Job(**task.__getstate__()) for task in job_list]
        except Exception as e:
            logger.exception(e.args)
            return dict(
                message="查询失败",
                status_code=0,
                jobs=jobs,
            )
        return dict(
            message="查询成功",
            status_code=1,
            jobs=jobs,
        )
    
    @app.get("/job/{id}")
    def get_job_by_id(id: Union[int, str, uuid.UUID]):
        jobs = []
        try:
            job = scheduler.get_job(job_id=id)
            if job:
                jobs = [Job(**job.__getstate__())]
        except Exception as e:
            logger.exception(e.args)
            return dict(
                message="查询失败",
                status_code=0,
                jobs=jobs,
            )
        return dict(
            message="查询成功",
            status_code=1,
            jobs=jobs,
        )
    

    以上代码看起来很多,其实核心的就那么几点:

    1. FastAPI 对象 app 的初始化。这里用到的 add_event_handler() 方法就有点像 Flask 中的 before_first_request,会在 Web 服务请求伊始进行操作,理解为初始化相关的操作即可。

    2. API 接口路由。路由通过 app 对象下的对应 HTTP 方法来实现,如 GETPOSTPUT 等。这里的装饰器用法其实也和 Flask 很类似,就不多赘述。

    3. scheduler 对象的增删改查。从 scheduler.py 模块中引入我们创建好的 scheduler 对象之后就可以直接用来做增删改查的操作:

      1. 增:使用 add_job() 方法,其主要的参数是要运行的函数(或方法)、触发器以及触发器参数等

      2. 删:使用 delete_job() 方法,我们需要传入一个对应任务的 id 参数,用以能够查找到对应的任务

      3. 改:使用 reschedule_job() 方法,这里也需要一个对应任务的 id 参数,以及需要重新修改的触发器及其参数

      4. 查:使用 get_jobs() 和 get_job() 两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了 APScheduler.job.Job 对象的列表,而后者是通过 id 参数来查找对应的任务对象;这里我通过底层源码使用 __getstate__() 来获取到任务的相关信息,这些信息我们通过事先设定好的 Job 对象来对其进行序列化,最后将信息从接口中返回。

    展开全文
  • Python APScheduler定时任务调度框架

    千次阅读 2018-12-10 10:01:40
    它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。 2 安装 使用 pip 包管理...

    1 简介

    APScheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。

    2 安装

    使用 pip 包管理工具安装 APScheduler 是最方便快捷的。

    pip install APScheduler
    # 如果出现因下载失败导致安装不上的情况,建议使用代理
    pip --proxy http://代理ip:端口 install APScheduler
    

    3 使用步骤

    APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。

    新建一个 schedulers (调度器) 。
    添加一个调度任务(job stores)。
    运行调度任务。

    下面是执行每 2 秒报时的简单示例代码:

    import datetime
    import time
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def timedTask():
        print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    
    if __name__ == '__main__':
        # 创建后台执行的 schedulers
        scheduler = BackgroundScheduler()  
        # 添加调度任务
        # 调度方法为 timedTask,触发器选择 interval(间隔性),间隔时长为 2 秒
        scheduler.add_job(timedTask, 'interval', seconds=2)
        # 启动调度任务
        scheduler.start()
        
        while True:
            print(time.time())
            time.sleep(5)
    

    4 基础组件

    APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

    • schedulers(调度器)
      它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。

    • triggers(触发器)
      描述调度任务被触发的条件。不过触发器完全是无状态的。

    • job stores(作业存储器)
      任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。

    • executors(执行器)
      负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

    4.1 schedulers(调度器)

    我个人觉得 APScheduler 非常好用的原因。它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:

    • BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
    • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
    • AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
    • GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
    • TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
    • TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
    • QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。

    4.2 triggers(触发器)

    APScheduler 有三种内建的 trigger:
    1)date 触发器
    date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:

    参数 说明
    run_date (datetime 或 str) 作业的运行日期或时间
    timezone (datetime.tzinfo 或 str) 指定时区

    date 触发器使用示例如下:

    from datetime import datetime
    from datetime import date
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def job_func(text):
        print(text)
    
    scheduler = BackgroundScheduler()
    # 在 2017-12-13 时刻运行一次 job_func 方法
    scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
    # 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
    scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
    # 在 2017-12-13 14:00:01 时刻运行一次 job_func 方法
    scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
    
    scheduler.start()
    

    2)interval 触发器
    固定时间间隔触发。interval 间隔调度,参数如下:

    参数 说明
    weeks (int) 间隔几周
    days (int) 间隔几天
    hours (int) 间隔几小时
    minutes (int) 间隔几分钟
    seconds (int) 间隔多少秒
    start_date (datetime 或 str) 开始日期
    end_date (datetime 或 str) 结束日期
    timezone (datetime.tzinfo 或str) 时区

    interval 触发器使用示例如下:

    import datetime
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def job_func(text):
        print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    
    scheduler = BackgroundScheduler()
    # 每隔两分钟执行一次 job_func 方法
    scheduler .add_job(job_func, 'interval', minutes=2)
    # 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
    scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
    
    scheduler.start()
    

    3)cron 触发器
    在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。
    我们先了解 cron 参数:

    参数 说明
    year (int 或 str) 年,4位数字
    month (int 或 str) 月 (范围1-12)
    day (int 或 str) 日 (范围1-31
    week (int 或 str) 周 (范围1-53)
    day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
    hour (int 或 str) 时 (范围0-23)
    minute (int 或 str) 分 (范围0-59)
    second (int 或 str) 秒 (范围0-59)
    start_date (datetime 或 str) 最早开始日期(包含)
    end_date (datetime 或 str) 最晚结束时间(包含)
    timezone (datetime.tzinfo 或str) 指定时区

    这些参数是支持算数表达式,取值格式有如下:

    image.png

    cron 触发器使用示例如下:

    import datetime
    from apscheduler.schedulers.background import BackgroundScheduler
    
    def job_func(text):
        print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    
    scheduler = BackgroundScheduler()
    # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
    scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
    
    scheduler.start()
    

    4.3 作业存储(job store)

    该组件是对调度任务的管理。
    1)添加 job
    有两种添加方法,其中一种上述代码用到的 add_job(), 另一种则是scheduled_job()修饰器来修饰函数。

    这个两种办法的区别是:第一种方法返回一个 apscheduler.job.Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。

    第二种添加任务方式的例子:

    import datetime
    from apscheduler.schedulers.background import BackgroundScheduler
    
    @scheduler.scheduled_job(job_func, 'interval', minutes=2)
    def job_func(text):
        print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
    
    scheduler = BackgroundScheduler()
    scheduler.start()
    

    2)移除 job
    移除 job 也有两种方法:remove_job() 和 job.remove()
    remove_job() 是根据 job 的 id 来移除,所以要在 job 创建的时候指定一个 id。
    job.remove() 则是对 job 执行 remove 方法即可

    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
    scheduler.remove_job(job_one)
    
    job = add_job(job_func, 'interval', minutes=2, id='job_one')
    job.remvoe()
    

    3)获取 job 列表
    通过 scheduler.get_jobs() 方法能够获取当前调度器中的所有 job 的列表

    修改 job
    如果你因计划改变要对 job 进行修改,可以使用Job.modify() 或者 modify_job()方法来修改 job 的属性。但是值得注意的是,job 的 id 是无法被修改的。

    scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
    scheduler.start()
    # 将触发时间间隔修改成 5分钟
    scheduler.modify_job('job_one', minutes=5)
    
    job = scheduler.add_job(job_func, 'interval', minutes=2)
    # 将触发时间间隔修改成 5分钟
    job.modify(minutes=5)
    

    5)关闭 job
    默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将 wait 选项设置为 False。

    scheduler.shutdown()
    scheduler.shutdown(wait=false)
    

    4.4 执行器(executor)

    执行器顾名思义是执行调度任务的模块。最常用的 executor 有两种:ProcessPoolExecutor 和 ThreadPoolExecutor

    下面是显式设置 job store(使用mongo存储)和 executor 的代码的示例。
    注:本代码来源于网络

    from pymongo import MongoClient
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
     
     
    def my_job():
        print 'hello world'
    host = '127.0.0.1'
    port = 27017
    client = MongoClient(host, port)
     
    jobstores = {
        'mongo': MongoDBJobStore(collection='job', database='test', client=client),
        'default': MemoryJobStore()
    }
    executors = {
        'default': ThreadPoolExecutor(10),
        'processpool': ProcessPoolExecutor(3)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
    scheduler.add_job(my_job, 'interval', seconds=5)
     
    try:
        scheduler.start()
    except SystemExit:
        client.close()
    展开全文
  • 经过1-2个星期的开发,现在开发了个半成品(UI现在比较烂,因为我的前端本来就很差,将就下吧),大概功能如下:用户功能(添加、删除、修改),添加部门功能,任务管理功能(添加、删除、修改,详细)、项目管理...
  • 不知是不是 crontab 命令不允许有 qsub 的提交操作,还是管理员设置了 crontab 发起任务的用户没有节点访问权限。。。总之,一向很便利的 crontab 命令居然给我挖坑了。于是,我只得自己写一个定时任务。 当然,核心...
  • 花了两周时间,利用工作间隙时间,开发了一个基于Django的项目任务管理Web应用。项目计划的实时动态,可以方便地被项目成员查看(^_^又重复发明轮子了)。从前台到后台,好好折腾了一把,用到:HTML、CSS、...
  • Python Web开发框架Django

    2019-10-04 23:28:01
    花了两周时间,利用工作间隙时间,开发了一个基于Django的项目任务管理Web应用。项目计划的实时动态,可以方便地被项目成员查看(^_^又重复发明轮子了)。从前台到后台,好好折腾了一把,用到:HTML、CSS、...
  • 初步使用python框架Django,调度任务的程序运行基于python高级调度框架APScheduler,使用Scrapy框架进行数据的爬取,彩蛋游戏使用unity2D进行开发 项目描述: 需求分析: 1.学校图书馆借的书都很容易忘记还,这...
  • Python 爬虫框架 Scrapyd 集群管理的全功能 web UI,支持 Scrapy 日志分析和可视化、自动打包、定时器任务和邮件通知等特色功能
  • 它处理登录,重置和记住用户会话的常见任务 flask-jwt-extended 烧瓶邮件 可以在Flask应用中设置SMTP可以可以发送邮件信息 烧瓶塔利斯曼 处理设置HTTP标头的过程,这些标头可以帮助防止一些常见的Web应用程序安全...
  • Luiti建立在Luigi的基础上,将所有任务分成多个包,并每个Python文件强制执行一个任务。 Luiti任务类可以通过luiti命令进行管理,支持的操作包括ls,new,generate,info,clean,run和webui。 Luiti诞生于建立...
  • django-admin.py是Django的一个用于管理任务的命令行工具。本文将描述它的大概用法。 另外,在每一个Django project中都会有一个manage.py。manage.py是对django-admin.py的简单包装,它额外帮助我们做了两件事情: ...
  • 小编典典(1)你可以使用app.app_context()上下文管理器来设置应用程序上下文。我想用法会像这样:from apscheduler.scheduler import Schedulerdef checkSecondApi():with app.app_context():# Do whatever you ...
  • 它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。 2 安装 #使用 pip 包管理...
  • 每周一荐:Python Web开发框架Django

    千次阅读 2012-08-09 22:39:40
    花了两周时间,利用工作间隙时间,开发了一个基于Django的项目任务管理Web应用。项目计划的实时动态,可以方便地被项目成员查看(^_^又重复发明轮子了)。从前台到后台,好好折腾了一把,用到:HTML、CSS、...
  • 1. Flask:一个用python编写的Web应用框架,它整合了Werzeug(WSGI实用工具)和Jinja2(模板...3. Redmine:一款开源的问题跟踪系统,它以管理项目内的任务和Bug的问题功能为中心,兼具服务于团队开发的功能,比如与...
  • 业务需求,需要让python自动化工作,于是本人开发了一个框架用来管理多线程任务,供大家使用~ # 以下为框架源码 # coding:utf8 # author: Lanyixiao_Eathoublu import threading import time class HTask: def _...
  • 经过1-2个星期的开发,现在开发了个半成品(UI现在比较烂,因为我的前端本来就很差,将就下吧),大概功能如下:用户功能(添加、删除、修改),添加部门功能,任务管理功能(添加、删除、修改,详细)、项目管理...
  • OpsManage是什么? 一款代码部署、应用部署、计划任务、设备资产管理平台。 开发语言与框架: 编程语言:Python2.7 HTML JScripts 前端Web框架
  • 酒店管理数据库管理项目:使用各种前端技术库(HTML,CSS,JavaScript,Bootstrap)设计的响应式和动态网页(UI)。 为系统设计的数据库(SQL,MySQL服务器)。... 使用Python脚本执行DBA任务以测试查询并绘制性能图
  • 现在,您通常可以存储一些代码或文本来执行日常任务。 非常方便,Farinha将使您的工作效率更高。 安装 我强烈建议使用farinha便携式和虚拟环境下,像 。 激活环境后,安装依赖项: $ pip install django $ pip ...

空空如也

空空如也

1 2 3 4 5 ... 19
收藏数 373
精华内容 149
关键字:

python任务管理框架

python 订阅