-
Python 定时任务
2016-08-19 16:32:22Python 定时任务1. sched
https://docs.python.org/2/library/sched.html
作为一名Linux的SA,我们已经习惯了用crontab,而sched提供了一种延迟处理机制,也可以理解为任务调度的另一种方式的实现。
注意:
sched模块不是循环的,一次调度被执行后就Over了,如果想再执行,请再次enter
run()一直被阻塞,直到所有事件被全部执行完. 每个事件在同一线程中运行,所以如果一个事件的执行时间大于其他事件的延迟时间,那么,就会产生重叠。重叠的解决方法是推迟后来事件的执行时间。这样保证没有丢失任何事件,但这些事件的调用时刻会比原先设定的迟。
从3.3版本开始,scheduler是线程安全的。
举例:
import sched, time s = sched.scheduler(time.time, time.sleep) def do_something(sc): print "Doing stuff..." # do you stuff sc.enter(2, 1, do_something, (sc,)) s.enter(2, 1, do_something, (s,)) s.run()
2. threading.Timer
https://docs.python.org/2/library/threading.html?highlight=timer#timer-objects
Timer 不是循环的,一次调度被执行后就Over了,如果想再执行,请再次enter
举例:
#!/usr/bin/env python from threading import Timer from time import sleep class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") try: sleep(5) finally: rt.stop()
3. APScheduler
https://apscheduler.readthedocs.org/en/latest/
APScheduler 是一个 Python 定时任务框架,使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务、并以 daemon 方式运行应用
举例:
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime import time import os def tick(): print('Tick! The time is: %s' % datetime.now()) scheduler = BlockingScheduler() scheduler.add_job(tick, 'interval', seconds=2) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
后台执行任务:
from datetime import datetime import time import os from apscheduler.schedulers.background import BackgroundScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(tick, 'interval', seconds=2) scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: # This is here to simulate application activity (which keeps the main thread alive). while True: time.sleep(5) except (KeyboardInterrupt, SystemExit): # Not strictly necessary if daemonic mode is enabled but should be done if possible scheduler.shutdown()
4. gevent
Gevent 是一种基于协程的 Python 网络库,它用到 Greenlet 提供的,封装了 libevent 事件循环的高层同步 API。
Gevent 是一个基于 greenlet 的 Python 的并发框架,以微线程 greenlet 为核心,使用了 epoll 事件监听机制以及诸多其他优化而变得高效。
于 greenlet、eventlet 相比,性能略低,但是它封装的 API 非常完善,最赞的是提供了一个 monkey 类,可以将现有基于 Python 线程直接转化为 greenlet,相当于 proxy 了一下(打了 patch)。
举例:
import gevent import gevent.monkey gevent.monkey.patch_all() # 方法一:gevent.spawn_later 创建定时器 def schedule(delay, func, *args, **kw_args): gevent.spawn_later(0, func, *args, **kw_args) gevent.spawn_later(delay, schedule, delay, func, *args, **kw_args) # 方法二:死循环中 gevent.sleep def run_reqularly(self, function, interval, *args, **kwargs): while True: gevent.sleep(interval) function(*args, **kwargs) # 方法三:同方法二,间隔时间更准确 def run_reqularly2(self, function, interval, *args, **kwargs): while True: before = time.time() function(*args, **kwargs) duration = time.time() - before if duration < interval: gevent.sleep(interval-duration) else: warning("function %s duration exceeded %f interval (took %f)" % (function.__name__, interval, duration)) def test1(): print "func test1" def test2(): print "func test2" def test3(): print "func test3" schedule(1,test1) schedule(2,test2) schedule(3,test3) while True: gevent.sleep(10)
5. Twisted
Twisted 是用 Python 实现的基于事件驱动的网络引擎框架。
举例:
from twisted.internet import task from twisted.internet import reactor def runEverySecond(): print "a second has passed" l = task.LoopingCall(runEverySecond) l.start(1.0) # call every second # l.stop() will stop the looping calls reactor.run()
6. Celery
http://celery.readthedocs.io/en/latst/userguide/periodic-tasks.html
举例:
from datetime import timedelta from celery import Celery app = Celery('app1') app.conf.update( CELERYBEAT_SCHEDULE = { 'mytimer': { 'task': 'myfunc', 'schedule': timedelta(seconds=2), }, }, CELERY_TIMEZONE = 'UTC', ) @app.task(name='myfunc') def runEverySecond(): print "a second has passed"
7. Tornado
由 Facebook 开源出来的 web 服务器框架。Tornado 采用多进程 + 非阻塞 + epoll 的模型,可以提供比较强大的网络响应性能
IOLoop 基于 epoll,可以高效的响应网络事件
举例:
from tornado import web, ioloop import datetime class MainHandler(web.RequestHandler): def get(self): self.write('Hello Tornado') def f2s(): print '2s ', datetime.datetime.now() def f5s(): print '5s ', datetime.datetime.now() if __name__ == '__main__': application = web.Application([ (r'/', MainHandler), ]) application.listen(8081) ioloop.PeriodicCallback(f2s, 2000).start() # start scheduler ioloop.PeriodicCallback(f5s, 5000).start() # start scheduler ioloop.IOLoop.instance().start()
参考
- What is the best way to repeatedly execute a function every x seconds in Python?
- Python的sched模块和Timer类
- Python任务调度之sched
- Python任务调度模块 – APScheduler
- APScheduler —— Python化的Cron
- apscheduler.schedulers.background
- 使用gevent spawn_later实现定时计划任务系统
- something like cron (timer) in gevent
- Tornado/Twisted - Celery - Gevent Comparison
- Scheduling tasks for the future
- “Twisted与异步编程入门”系列文章的简体中文翻译
- tornado实现异步计划任务及python常见计划任务方法
- Using Celery to schedule python tasks
- Celery documentation - Periodic Tasks
-eof-
-
Python定时任务
2020-07-14 13:23:47Python定时任务 APScheduler定时任务 上次测试女神听了我的建议,已经做好了要给项目添加定时任务的决定了。但是之前提供的四种方式中,她不知道具体选择哪一个。为了和女神更近一步,我把我入行近10年收藏的干货...Python定时任务
APScheduler定时任务
上次测试女神听了我的建议,已经做好了要给项目添加定时任务的决定了。但是之前提供的四种方式中,她不知道具体选择哪一个。为了和女神更近一步,我把我入行近10年收藏的干货免费拿出来分享给女神,希望女神凌晨2点再找我的时候,不再是因为要给他调程序了。Python中定时任务的解决方案,总体来说有四种,分别是:crontab、 scheduler、 Celery、 APScheduler,其中 crontab不适合多台服务器的配置、scheduler太过于简单、 Celery依赖的软件比较多,比较耗资源。最好的解决方案就是 APScheduler。
APScheduler使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab类型的任务。还可以在程序运行过程中动态的新增任务和删除任务。在任务运行过程中,还可以把任务存储起来,下次启动运行依然保留之前的状态。另外最重要的一个特点是,因为他是基于 Python语言的库,所以是可以跨平台的,一段代码,处处运行!
在这里我来给大家详细介绍一下具体的用法。
一、安装:
安装非常简单,通过 pip install apscheduler即可。二、基本使用:
先来看一段代码,然后再来给大家详细讲解其中的细节:
其中 BlockingScheduler是阻塞性的调度器,是最基本的调度器,下面调用 start方法就会阻塞当前进程,所以如果你的程序除了调度进程没有其他后台进程,那么是可以是否的,否则这个调度器会阻塞你程序的正常执行。接下来就是定义一个 my_clock函数,这个函数就是需要定时调度的任务代码。
然后就是实例化一个 BlockingScheduler对象,并把 my_clock添加到任务调度中。然后看 interval参数,这里用的是间隔的方式来调度,调度频率是 seconds=3,也就是每3秒执行一次。
可以看到每隔3秒钟的时间会执行一次。说明定时任务已经成功执行了!在了解了 APScheduler的基本使用后,再来对 APScheduler的四个基本对象做个了解,这样才能从全局掌握 APScheduler。
三、四个基本对象:
-
触发器(triggers):
触发器就是根据你指定的触发方式,比如是按照时间间隔,还是按照 crontab触发,触发条件是什么等。每个任务都有自己的触发器。 -
任务存储器(job stores):
任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。 -
执行器(executors):
执行器的目的是安排任务到线程池或者进程池中运行的。 -
调度器(schedulers):
任务调度器是属于整个调度的总指挥官。他会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。开发人员很少直接操作触发器、存储器、执行器等。因为这些都由调度器自动来实现了。
四、触发器:
触发器有两种,第一种是 interval,第二种是 crontab。interval可以具体指定多少时间间隔执行一次。crontab可以指定执行的日期策略。以下分别进行讲解。 -
date触发器:
在某个日期时间只触发一次事件。示例代码如下:
更多请参考:https://apscheduler.readthedocs.io/en/stable/modules/triggers/date.html
- interval触发器:
想要在固定的时间间隔触发事件。interval的触发器可以设置以下的触发参数:
weeks:周。整形。
days:一个月中的第几天。整形。
hours:小时。整形。
minutes:分钟。整形。
seconds:秒。整形。
start_date:间隔触发的起始时间。
end_date:间隔触发的结束时间。
jitter:触发的时间误差。
在每天的11点24分触发事件。更多请参考:https://apscheduler.readthedocs.io/en/stable/modules/triggers/interval.html- crontab触发器:
在某个确切的时间周期性的触发事件。可以使用的参数如下:
year:4位数字的年份。
month:1-12月份。
day:1-31日。
week:1-53周。
day_of_week:一个礼拜中的第几天( 0-6或者 mon、 tue、 wed、 thu、 fri、 sat、 sun)。
hour: 0-23小时。
minute: 0-59分钟。
second: 0-59秒。
start_date: datetime类型或者字符串类型,起始时间。
end_date: datetime类型或者字符串类型,结束时间。
timezone:时区。
jitter:任务触发的误差时间。
也可以用表达式类型,可以用以下方式:五、调度器:
BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用 start函数会阻塞当前线程,不能立即返回。
BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用 start后主线程不会阻塞。
AsyncIOScheduler:适用于使用了 asyncio模块的应用程序。
GeventScheduler:适用于使用 gevent模块的应用程序。
TwistedScheduler:适用于构建 Twisted的应用程序。
QtScheduler:适用于构建 Qt的应用程序。
六、任务存储器:
任务存储器的选择有两种。一是内存,也是默认的配置。二是数据库。使用内存的方式是简单高效,但是不好的是,一旦程序出现问题,重新运行的话,会把之前已经执行了的任务重新执行一遍。数据库则可以在程序崩溃后,重新运行可以从之前中断的地方恢复正常运行。有以下几种选择:MemoryJobStore:没有序列化,任务存储在内存中,增删改查都是在内存中完成。
SQLAlchemyJobStore:使用 SQLAlchemy这个 ORM框架作为存储方式。
MongoDBJobStore:使用 mongodb作为存储器。
RedisJobStore:使用 redis作为存储器。
七、执行器:
执行器的选择取决于应用场景。通常默认的 ThreadPoolExecutor已经在大部分情况下是可以满足我们需求的。如果我们的任务涉及到一些 CPU密集计算的操作。那么应该考虑 ProcessPoolExecutor。然后针对每种程序, apscheduler也设置了不同的 executor:ThreadPoolExecutor:线程池执行器。
ProcessPoolExecutor:进程池执行器。
GeventExecutor: Gevent程序执行器。
TornadoExecutor: Tornado程序执行器。
TwistedExecutor: Twisted程序执行器。
AsyncIOExecutor: asyncio程序执行器。
八、定时任务调度配置:
这里我们用一个例子来说明。比如我想这样配置执行器:
配置 default执行器为 ThreadPoolExecutor,并且设置最多的线程数是20个。
<
存储器:配置 default的任务存储器为 SQLAlchemyJobStore(使用SQLite)。
<
任务配置:设置 coalesce为 False:设置这个目的是,比如由于某个原因导致某个任务积攒了很多次没有执行(比如有一个任务是1分钟跑一次,但是系统原因断了5分钟),如果 coalesce=True,那么下次恢复运行的时候,会只执行一次,而如果设置 coalesce=False,那么就不会合并,会5次全部执行。
max_instances=5:同一个任务同一时间最多只能有5个实例在运行。比如一个耗时10分钟的job,被指定每分钟运行1次,如果我 max_instance值5,那么在第6~10分钟上,新的运行实例不会被执行,因为已经有5个实例在跑了。
那么代码如下:九、任务操作:
-
添加任务:
使用 scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)。 -
删除任务:
使用 scheduler.remove_job(job_id,jobstore=None)。 -
暂停任务:
使用 scheduler.pause_job(job_id,jobstore=None)。 -
恢复任务:
使用 scheduler.resume_job(job_id,jobstore=None)。 -
修改某个任务属性信息:
使用 scheduler.modify_job(job_id,jobstore=None,**changes)。 -
修改单个作业的触发器并更新下次运行时间:
使用 scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args) -
输出作业信息:
使用 scheduler.print_jobs(jobstore=None,out=sys.stdout)
十、异常监听:
当我们的任务抛出异常后,我们可以监听到,然后把错误信息进行记录。示例代码如下:
https://mp.weixin.qq.com/s/9yxX1IcLg0-YrZ82O32UoQ -
-
python定时任务
2019-04-02 19:23:34在项目中,我们可能遇到有定时任务的需求。其一:定时执行任务。例如每天早上 8 点定时推送早报。其二:每隔一个时间段就执行任务。比如:每隔一个小时提醒自己...今天,我跟大家分享下 Python 定时任务的实现方法。1、第一种办法是最简单又最暴力。那就是在一个死循环中,使用线程睡眠函数 sleep()。
from datetime import datetime import time ''' 每个 10 秒打印当前时间。 ''' def timedTask(): while True: print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) time.sleep(10) if __name__ == '__main__': timedTask()
这种方法能够执行固定间隔时间的任务。如果timedTask()函数之后还有些操作,我们还使用死循环 + 阻塞线程。这会使得timedTask()一直占有 CPU 资源,导致后续操作无法执行。我建议谨重使用。2、既然第一种方法暴力,那么有没有比较优雅地方法?答案是肯定的。Python 标准库 threading 中有个 Timer 类。它会新启动一个线程来执行定时任务,所以它是非阻塞函式。
如果你有使用多线程的话,需要关心线程安全问题。那么你可以选使用threading.Timer模块。
from datetime import datetime from threading import Timer import time ''' 每个 10 秒打印当前时间。 ''' def timedTask(): ''' 第一个参数: 延迟多长时间执行任务(单位: 秒) 第二个参数: 要执行的任务, 即函数 第三个参数: 调用函数的参数(tuple) ''' Timer(10, task, ()).start() # 定时任务 def task(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) if __name__ == '__main__': timedTask() while True: print(time.time()) time.sleep(5)
运行结果:1512486945.1196375 1512486950.119873 2017-12-05 23:15:50 1512486955.133385
3、使用标准库中sched模块。sched 是事件调度器,它通过 scheduler 类来调度事件,从而达到定时执行任务的效果。sched库使用起来也是非常简单。
1)首先构造一个sched.scheduler类
它接受两个参数:timefunc和 delayfunc。timefunc 应该返回一个数字,代表当前时间,delayfunc 函数接受一个参数,用于暂停运行的时间单元。一般使用默认参数就行,即传入这两个参数 time.time 和 time.sleep.当然,你也可以自己实现时间暂停的函数。
2)添加调度任务
scheduler 提供了两个添加调度任务的函数:enter(delay, priority, action, argument=(), kwargs={})
1
该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。priority为优先级,越小优先级越大。两个任务指定相同的延迟时间,优先级大的任务会向被执行。action 即需要执行的函数,argument 和 kwargs 分别是函数的位置和关键字参数。scheduler.enterabs(time, priority, action, argument=(), kwargs={})
1
添加一项任务,但这个任务会在 time 这时刻执行。因此,time 是绝对时间.其他参数用法与 enter() 中的参数用法是一致。3)把任务运行起来
调用 scheduler.run()函数就完事了。下面是 sche 使用的简单示例:
from datetime import datetime import sched import time ''' 每个 10 秒打印当前时间。 ''' def timedTask(): # 初始化 sched 模块的 scheduler 类 scheduler = sched.scheduler(time.time, time.sleep) # 增加调度任务 scheduler.enter(10, 1, task) # 运行任务 scheduler.run() # 定时任务 def task(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) if __name__ == '__main__': timedTask()
值得注意的是: scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务即可。上面三种办法能实现定时任务,但是都无法做到循环执行定时任务。因此,需要一个能够担当此重任的库。它就是APScheduler。
1 简介
APScheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。2 安装
使用 pip 包管理工具安装 APScheduler 是最方便快捷的。pip install APScheduler # 如果出现因下载失败导致安装不上的情况,建议使用代理 pip --proxy http://代理ip:端口 install APScheduler
3 使用步骤
APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三部曲。新建一个 schedulers (调度器) 。 添加一个调度任务(job stores)。 运行调度任务。 下面是执行每 2 秒报时的简单示例代码: import datetime import time from apscheduler.schedulers.background import BackgroundScheduler def timedTask(): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) if __name__ == '__main__': # 创建后台执行的 schedulers scheduler = BackgroundScheduler() # 添加调度任务 # 调度方法为 timedTask,触发器选择 interval(间隔性),间隔时长为 2 秒 scheduler.add_job(timedTask, 'interval', seconds=2) # 启动调度任务 scheduler.start() while True: print(time.time()) time.sleep(5)
4 基础组件
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。schedulers(调度器)
它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。triggers(触发器)
描述调度任务被触发的条件。不过触发器完全是无状态的。job stores(作业存储器)
任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。executors(执行器)
负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。4.1 schedulers(调度器)
我个人觉得 APScheduler 非常好用的原因。它提供 7 种调度器,能够满足我们各种场景的需要。例如:后台执行某个操作,异步执行操作等。调度器分别是:BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。
4.2 triggers(触发器)
APScheduler 有三种内建的 trigger:
1)date 触发器
date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:参数 说明
run_date (datetime 或 str) 作业的运行日期或时间
timezone (datetime.tzinfo 或 str) 指定时区
date 触发器使用示例如下:from datetime import datetime from datetime import date from apscheduler.schedulers.background import BackgroundScheduler def job_func(text): print(text) scheduler = BackgroundScheduler() # 在 2017-12-13 时刻运行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text']) # 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text']) # 在 2017-12-13 14:00:01 时刻运行一次 job_func 方法 scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text']) scheduler.start()
2)interval 触发器
固定时间间隔触发。interval 间隔调度,参数如下:参数 说明
weeks (int) 间隔几周 days (int) 间隔几天 hours (int) 间隔几小时 minutes (int) 间隔几分钟 seconds (int) 间隔多少秒 start_date (datetime 或 str) 开始日期 end_date (datetime 或 str) 结束日期 timezone (datetime.tzinfo 或str) 时区
interval 触发器使用示例如下: import datetime from apscheduler.schedulers.background import BackgroundScheduler def job_func(text): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 每隔两分钟执行一次 job_func 方法 scheduler .add_job(job_func, 'interval', minutes=2) # 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法 scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10') scheduler.start()
3)cron 触发器
在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。
我们先了解 cron 参数:参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31
week (int 或 str) 周 (范围1-53)
day_of_week (int 或 str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo 或str) 指定时区
这些参数是支持算数表达式,取值格式有如下:cron 触发器使用示例如下:
import datetime from apscheduler.schedulers.background import BackgroundScheduler def job_func(text): print("当前时间:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务 scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3') scheduler.start()
4.3 作业存储(job store)
该组件是对调度任务的管理。
1)添加 job
有两种添加方法,其中一种上述代码用到的 add_job(), 另一种则是scheduled_job()修饰器来修饰函数。这个两种办法的区别是:第一种方法返回一个 apscheduler.job.Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。
第二种添加任务方式的例子:
import datetime from apscheduler.schedulers.background import BackgroundScheduler @scheduler.scheduled_job(job_func, 'interval', minutes=2) def job_func(text): print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]) scheduler = BackgroundScheduler() scheduler.start()
2)移除 job
移除 job 也有两种方法:remove_job() 和 job.remove()。
remove_job() 是根据 job 的 id 来移除,所以要在 job 创建的时候指定一个 id。
job.remove() 则是对 job 执行 remove 方法即可scheduler.add_job(job_func, 'interval', minutes=2, id='job_one') scheduler.remove_job(job_one) job = add_job(job_func, 'interval', minutes=2, id='job_one') job.remvoe()
3)获取 job 列表
通过 scheduler.get_jobs() 方法能够获取当前调度器中的所有 job 的列表修改 job
如果你因计划改变要对 job 进行修改,可以使用Job.modify() 或者 modify_job()方法来修改 job 的属性。但是值得注意的是,job 的 id 是无法被修改的。scheduler.add_job(job_func, 'interval', minutes=2, id='job_one') scheduler.start() # 将触发时间间隔修改成 5分钟 scheduler.modify_job('job_one', minutes=5) job = scheduler.add_job(job_func, 'interval', minutes=2) # 将触发时间间隔修改成 5分钟 job.modify(minutes=5)
5)关闭 job
默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将 wait 选项设置为 False。scheduler.shutdown() scheduler.shutdown(wait=false)
4.4 执行器(executor)
执行器顾名思义是执行调度任务的模块。最常用的 executor 有两种:ProcessPoolExecutor 和 ThreadPoolExecutor下面是显式设置 job store(使用mongo存储)和 executor 的代码的示例。
from pymongo import MongoClient from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor def my_job(): print 'hello world' host = '127.0.0.1' port = 27017 client = MongoClient(host, port) jobstores = { 'mongo': MongoDBJobStore(collection='job', database='test', client=client), 'default': MemoryJobStore() } executors = { 'default': ThreadPoolExecutor(10), 'processpool': ProcessPoolExecutor(3) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults) scheduler.add_job(my_job, 'interval', seconds=5) try: scheduler.start() except SystemExit: client.close()
-
python 定时任务
2018-09-28 17:03:011、python定时任务之sched sched 定时任务 sched的使用步骤如下: s = sched.scheduler(time.time, time.sleep) s.enter(delay, priority, func1, (arg1, arg2, ...)) s.run() 第一步:新建一个调度器,time...1、python定时任务之sched
sched 定时任务
sched的使用步骤如下:
s = sched.scheduler(time.time, time.sleep) s.enter(delay, priority, func1, (arg1, arg2, ...)) s.run()
第一步:新建一个调度器,time.time表示可以返回时间戳, time.sleep表示定时任务阻塞;
第二步:添加调度任务,可以添加多个任务,delay表示间隔事件,相对于调度器添加这个任务时刻的延时,以秒为单位,priority表示优先级(数字越小优先级越高),func指的是调用的函数名,(arg1, arg2, …)是一个元组,里面存放的是函数的参数,若函数没有参数直接传入一个空元组()就可以了;
第三步:运行调度器。例1:
# coding:utf8 import os import time import sched import threading import datetime def task(): now = str(datetime.datetime.now()).split('.')[0] print('当前时间:%s, 进程pid:%s , 线程pid:%s ' % (now, os.getpid(), threading.current_thread().name)) # os.getpid()获取当前进程id os.getppid()获取父进程id if __name__ == '__main__': now = str(datetime.datetime.now()).split('.')[0] print(now) s = sched.scheduler(time.time, time.sleep) s.enter(10, 0, task, ()) # 10秒之后执行函数,该函数没有参数所以传入一个空元组,若有参数,把参数传入元组,并用逗号隔开比如(arg1, arg2, ...) s.run() # 2018-09-28 10:33:25 # 当前时间:2018-09-28 10:33:35, 进程pid:11228 , 线程pid:MainThread
例一中,休眠10秒之后执行task函数。
例2:
import os import time import sched import threading import datetime def task(msg): time.sleep(1) now = str(datetime.datetime.now()).split('.')[0] print('当前时间:%s, 进程pid:%s , 线程pid:%s , message:%s' % (now, os.getpid(), threading.current_thread().name, msg)) # os.getpid()获取当前进程id os.getppid()获取父进程id if __name__ == '__main__': now = str(datetime.datetime.now()).split('.')[0] print(now) s = sched.scheduler(time.time, time.sleep) s.enter(5, 0, task, ('Hello, world !',)) s.enter(5, 0, task, ('Hi, my friend !',)) s.enter(5, 0, task, ('Are you ok ?',)) s.run() # 2018-09-28 11:10:05 # 当前时间:2018-09-28 11:10:11, 进程pid:8892 , 线程pid:MainThread , message:Hello, world ! # 当前时间:2018-09-28 11:10:12, 进程pid:8892 , 线程pid:MainThread , message:Hi, my friend ! # 当前时间:2018-09-28 11:10:13, 进程pid:8892 , 线程pid:MainThread , message:Are you ok ?
总结:sched调度函数可以定时执行,但是3个函数并没有在同一时间点运行,即便把函数的优先级设置为一样的,程序会根据添加任务的先后顺序执行,只有主线程再跑,若想用多线程跑,见例3.
例3
import os import time import sched import threading import datetime def work(msg): now = str(datetime.datetime.now()) print('当前时间:%s, 进程pid:%s , 线程pid:%s , message:%s' % (now, os.getpid(), threading.current_thread().name, msg)) # os.getpid()获取当前进程id os.getppid()获取父进程id def task1(): threading.Thread(target=work, args=('Hello, world !',)).start() def task2(): threading.Thread(target=work, args=('Hi, my friend !',)).start() def task3(): threading.Thread(target=work, args=('Are you ok ?',)).start() if __name__ == '__main__': for _ in range(5): now = str(datetime.datetime.now()).split('.')[0] print(now) s = sched.scheduler(time.time, time.sleep) s.enter(5, 0, task1, ()) s.enter(5, 0, task2, ()) s.enter(5, 0, task3, ()) s.run() # 2018-09-28 11:06:51 # 当前时间:2018-09-28 11:06:56.302816, 进程pid:1908 , 线程pid:Thread-1 , message:Hello, world ! # 当前时间:2018-09-28 11:06:56.303275, 进程pid:1908 , 线程pid:Thread-2 , message:Hi, my friend ! # 当前时间:2018-09-28 11:06:56.303771, 进程pid:1908 , 线程pid:Thread-3 , message:Are you ok ?
使用多线程之后,就不会出现一个主线程运行的问题。
2、python定时任务之schedule
简单的schedule的定时任务如下:
例1:
import datetime import schedule import threading def task(): now = str(datetime.datetime.now()).split('.')[0] print('当前时间:%s, 进程Pid: %s, 线程pid:%s' % (now, os.getppid(), threading.current_thread().name)) if __name__ == '__main__': counter = 0 schedule.every(0.1).minutes.do(task) # 每0.1分钟运行一次,也就是6秒执行一次 while True: schedule.run_pending() # 当前时间:2018-09-28 16:23:48, 进程Pid: 11320, 线程pid:MainThread # 当前时间:2018-09-28 16:23:54, 进程Pid: 11320, 线程pid:MainThread # 当前时间:2018-09-28 16:24:00, 进程Pid: 11320, 线程pid:MainThread # 当前时间:2018-09-28 16:24:06, 进程Pid: 11320, 线程pid:MainThread # 当前时间:2018-09-28 16:24:12, 进程Pid: 11320, 线程pid:MainThread # schedule.every(5).minutes.do(task) 每5分钟运行 # schedule.every(1).hour.do(task) 每1小时运行 # schedule.every().day.at("9:00").do(task) 每天9:00的时候运行 # schedule.every(5).to(10).days.do(task) # schedule.every().monday.do(task) # schedule.every().wednesday.at("13:15").do(task)
schedule多线程定时任务与sched大同小异见例2
例2:
import os import time import schedule import threading import datetime def work(msg): now = str(datetime.datetime.now()) print('当前时间:%s, 进程pid:%s , 线程pid:%s , message:%s' % (now, os.getpid(), threading.current_thread().name, msg)) # os.getpid()获取当前进程id, os.getppid()获取父进程id def task(): for i in range(3): threading.Thread(target=work, args=(str(i),)).start() if __name__ == '__main__': now = str(datetime.datetime.now()).split('.')[0] print(now) schedule.every(5).seconds.do(task) while True: schedule.run_pending() # 2018-09-28 16:43:44 # 当前时间:2018-09-28 16:43:49.331639, 进程pid:11316 , 线程pid:Thread-1 , message:0 # 当前时间:2018-09-28 16:43:49.331639, 进程pid:11316 , 线程pid:Thread-2 , message:1 # 当前时间:2018-09-28 16:43:49.332136, 进程pid:11316 , 线程pid:Thread-3 , message:2
3、python定时任务之Timer
Timer的参数如下:
Time(interval, function, args=None, k=None)
interval — 时间间隔
function — 参数名
args — 默认参数,默认为None
interval — 关键字参数
简单使用如下:例1
import time import datetime import threading from threading import Timer def task(msg): time.sleep(7) now = str(datetime.datetime.now()).split('.')[0] print('当前时间:%s, Pid: %s, message:%s, Threading: %s' % (now, os.getpid(), msg, threading.current_thread().name)) if __name__ == '__main__': now = str(datetime.datetime.now()).split('.')[0] print(now) Timer(10, task, ('WORLD1',)).start() # 2018-09-28 16:48:05 # 当前时间:2018-09-28 16:48:22, Pid: 8572, message:WORLD1, Threading: Thread-1
threading中的Timer定时任务与sched、schedule显然不同,它本身已经封装好了多线程,在上例中可见,定时任务的函数是用子线程运行的,要用多线程那么多使用Timer调用任务就可以了。
-
python定时任务框架_Python定时任务框架APScheduler
2020-12-04 14:15:40APScheduler是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个... -
python 定时任务 全局变量_APScheduler-Python定时任务
2021-01-14 08:12:04它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。2 安装使用 pip 包管理工具安装...