Python定时任务--apscheduler

APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

triggers: 任务触发器组件,提供任务触发方式

job stores: 任务商店组件,提供任务保存方式

executors: 任务调度组件,提供任务调度方式

schedulers: 任务调度组件,提供任务工作方式

官网文档:https://apscheduler.readthedocs.io/en/3.3.1/

1.安装

$ pip install apscheduler # python3 使用pip3

2.简单实例

from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 实例化一个调度器
scheduler = BlockingScheduler()

def job_task():
    print "%s: 执行任务"  % time.asctime()

# 添加任务并设置触发方式为3s一次
scheduler.add_job(job_task, 'interval', seconds=3)

# 开始运行调度器
scheduler.start()

输出:

$ python first.py
Fri Sep  8 20:41:55 2017: 执行任务
Fri Sep  8 20:41:58 2017: 执行任务
...

3.trigger组件

trigger提供任务的触发方式,共三种方式:

  • date:只在某个时间点执行一次run_date(datetime|str)

    scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 19), args=[])
    scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 19, 21, 30, 5), args=[])
    scheduler.add_job(my_job, 'date', run_date='2019-4-19 21:30:05', args=[])
    # The 'date' trigger and datetime.now() as run_date are implicit
    sched.add_job(my_job, args=[[])
  • interval: 每隔一段时间执行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None

scheduler.add_job(my_job, 'interval', hours=2)
scheduler.add_job(my_job, 'interval', hours=2, start_date='2019-4-19 21:30:00', end_date='2019-04-20 21:30:00)

@scheduler.scheduled_job('interval', id='my_job_id', hours=2)
def my_job():
    print("Hello World")
  • cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
sched.add_job(my_job, 'cron', hour=3, minute=30)
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30')

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

4.scheduler组件

scheduler组件提供执行的方式,在不同的运用环境中选择合适的方式

  • BlockingScheduler: 进程中只运行调度器时的方式
from apscheduler.schedulers.blocking import BlockingScheduler
import time

scheduler = BlockingScheduler()

def job1():
    print "%s: 执行任务"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()
  • BackgroundScheduler: 不想使用任何框架时的方式
from apscheduler.schedulers.background import BackgroundScheduler
import time

scheduler = BackgroundScheduler()

def job1():
    print "%s: 执行任务"  % time.asctime()

scheduler.add_job(job1, 'interval', seconds=3)
scheduler.start()

while True:
    pass
  • AsyncIOScheduler: asyncio module的方式(Python3)
from apscheduler.schedulers.asyncio import AsyncIOScheduler
try:
    import asyncio
except ImportError:
    import trollius as asyncio
...
...
# while True:pass 
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
  • GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler

...
...

g = scheduler.start()
# while True:pass
try:
    g.join()
except (KeyboardInterrupt, SystemExit):
    pass
  • ornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop
from apscheduler.schedulers.tornado import TornadoScheduler

...
...

# while True:pass
try:
    IOLoop.instance().start()
except (KeyboardInterrupt, SystemExit):
    pass
  • TwistedScheduler: Twisted方式
from twisted.internet import reactor
from apscheduler.schedulers.twisted import TwistedScheduler

...
...

# while True:pass
try:
    reactor.run()
except (KeyboardInterrupt, SystemExit):
    pass
  • QtScheduler: Qt方式

5.executors组件

executors组件提供任务的调度方式

  • base
  • debug
  • gevent
  • pool(max_workers=10)
  • twisted

6.jobstore组件

jobstore提供任务的各种持久化方式

  • base
  • memory
  • mongodb scheduler.add_jobstore('mongodb', collection='example_jobs')
  • redis scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
  • rethinkdb scheduler.add_jobstore('rethinkdb', database='apscheduler_example')
  • sqlalchemy scheduler.add_jobstore('sqlalchemy', url=url)
  • zookeeper scheduler.add_jobstore('zookeeper', path='/example_jobs')

7.任务操作

  • 添加任务add_job 果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启都会创建任务的副本 开启后任务不会马上启动,可修改trigger参数

  • 删除任务remove_job

# 根据任务实例删除
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()

# 根据任务id删除
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
  • 任务的暂停pause_job和继续resume_job
job = scheduler.add_job(myfunc, 'interval', minutes=2)
# 根据任务实例
job.pause()
job.resume()

# 根据任务id暂停
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
  • 任务的修饰modify和重设reschedule_job
    修饰:job.modify(max_instances=6, name='Alternate name')
    重设:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

8.调度器操作

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
声明:原创文章,版权所有,转载请注明出处,https://litets.com。