您的位置:

FlaskAPScheduler--轻量级的Python任务调度框架

一、FlaskAPScheduler安装

FlaskAPScheduler是一个轻量级的Python任务调度框架,它可以让我们在Flask中轻松实现后台任务的自动化调度。安装FlaskAPScheduler的方法有两种:

第一种方法,使用pip安装:

pip install Flask-APScheduler

第二种方法,从Github上下载源码,然后手动安装:

git clone https://github.com/APScheduler/flask-apscheduler.git
cd flask-apscheduler
python setup.py install

二、示例:每隔一分钟向服务器上报一次状态

下面,我们通过一个简单的示例来演示如何使用FlaskAPScheduler来实现每隔一分钟向服务器上报一次状态。

首先,在Flask应用创建的时候,我们需要创建一个APScheduler对象,然后将任务调度器初始化。代码如下:


from flask_apscheduler import APScheduler

scheduler = APScheduler()

def create_app():
    app = Flask(__name__)

    # 设置任务调度器的配置
    app.config['SCHEDULER_API_ENABLED'] = True
    app.config['JOBS'] = [
        {
            'id': 'report_status_job',
            'func': 'report_status',
            'args': (),
            'trigger': 'interval',
            'seconds': 60
        }
    ]

    # 将任务调度器注册到Flask应用中
    scheduler.init_app(app)
    scheduler.start()

    return app

在上面的代码中,我们首先从flask_apscheduler模块中导入APScheduler对象,然后创建一个scheduler对象。然后,我们在创建Flask应用的时候,将任务调度器的配置信息添加到应用的config中。其中,'SCHEDULER_API_ENABLED' 配置项的值为True,表示开启任务调度器的API(任务调度器API是一个web接口,可以使用它来管理任务)。

然后,我们定义了一个名为'report_status_job'的任务,每60秒执行一次。其中func参数是要执行的任务名,我们需要在项目中定义一个名为'report_status'的函数来实现任务逻辑。

接下来,在Flask应用中,我们定义'report_status'函数,用于实现向服务器上报状态的任务逻辑。代码如下:


from datetime import datetime

def report_status():
    status = {'timestamp': str(datetime.now()), 'status': 'working'}
    # 将状态上报给服务器
    print(status)

在'report_status'函数中,我们首先获取当前系统时间,并将其转换为字符串形式,并将状态信息打印到控制台。

最后,在Flask应用的路由中,我们定义了一个'/hello'接口,用于测试任务调度器是否正常工作。代码如下:


@app.route('/hello')
def hello():
    return 'hello, world!'

现在,我们重新启动Flask应用,让任务调度器开始工作。我们在浏览器中访问'http://127.0.0.1:5000/hello'接口,然后回到命令行中,可以看到每隔60秒会输出一条状态信息。

三、自定义任务

除了定时任务,我们还可以通过FlaskAPScheduler来实现自定义任务。

首先,我们需要在Flask应用中,定义一个任务。代码如下:


def my_task(arg1, arg2):
    # 任务逻辑
    print('arg1:', arg1)
    print('arg2:', arg2)

接下来,我们需要将任务添加到任务调度器中。代码如下:


scheduler.add_job(id='my_task', func=my_task, args=('hello', 'world'))

在上面的代码中,我们使用add_job方法来添加一个任务。其中,id参数是任务的ID,我们可以通过该ID来查找和管理任务;func参数是要执行的任务函数,args参数是任务函数的参数。

最后,我们需要启动任务调度器。代码如下:


scheduler.start()

现在,我们的自定义任务已经被添加到了任务调度器中,并且可以被调度执行了。

四、任务调度器API

任务调度器API是一个web接口,可以使用它来管理任务。下面,我们介绍一下任务调度器API的使用方法。

首先,我们需要在Flask应用中,开启任务调度器API。代码如下:


app.config['SCHEDULER_API_ENABLED'] = True

接下来,我们可以通过'/scheduler/jobs'接口来获取任务列表。代码如下:


import requests

url = 'http://127.0.0.1:5000/scheduler/jobs'
response = requests.get(url)
print(response.status_code)
print(response.json())

在上面的代码中,我们使用requests库来向'/scheduler/jobs'接口发送GET请求,获取任务列表。response.json()方法返回的是一个JSON格式的任务列表。

如果我们想要添加一个任务,可以通过'/scheduler/jobs'接口来添加。代码如下:


import json

data = {
    'id': 'my_task',
    'func': 'my_task',
    'args': ('hello', 'world'),
    'trigger': 'interval',
    'seconds': 60
}

headers = {'Content-Type': 'application/json'}
url = 'http://127.0.0.1:5000/scheduler/jobs'
response = requests.post(url, data=json.dumps(data), headers=headers)
print(response.status_code)
print(response.json())

在上面的代码中,我们使用requests库来发送POST请求,添加一个名为'my_task'的任务。其中,'func'参数是要执行的任务函数名;'args'参数是任务函数的参数列表;'trigger'参数是触发条件,'interval'表示每隔一段时间触发一次;'seconds'参数表示每隔60秒触发一次。