一、FlaskAPScheduler安装
FlaskAPScheduler是一个轻量级的Python任务调度框架,它可以让我们在Flask中轻松实现后台任务的自动化调度。安装FlaskAPScheduler的方法有两种:
第一种方法,使用pip安装:
pip install Flask-APScheduler
第二种方法,从Github上下载源码,然后手动安装:
git clone https://github.com/APScheduler/flask-apscheduler.git
cd flask-apscheduler
python setup.py install
二、示例:每隔一分钟向服务器上报一次状态
下面,我们通过一个简单的示例来演示如何使用FlaskAPScheduler来实现每隔一分钟向服务器上报一次状态。
首先,在Flask应用创建的时候,我们需要创建一个APScheduler对象,然后将任务调度器初始化。代码如下:
from flask_apscheduler import APScheduler
scheduler = APScheduler()
def create_app():
app = Flask(__name__)
# 设置任务调度器的配置
app.config['SCHEDULER_API_ENABLED'] = True
app.config['JOBS'] = [
{
'id': 'report_status_job',
'func': 'report_status',
'args': (),
'trigger': 'interval',
'seconds': 60
}
]
# 将任务调度器注册到Flask应用中
scheduler.init_app(app)
scheduler.start()
return app
在上面的代码中,我们首先从flask_apscheduler模块中导入APScheduler对象,然后创建一个scheduler对象。然后,我们在创建Flask应用的时候,将任务调度器的配置信息添加到应用的config中。其中,'SCHEDULER_API_ENABLED' 配置项的值为True,表示开启任务调度器的API(任务调度器API是一个web接口,可以使用它来管理任务)。
然后,我们定义了一个名为'report_status_job'的任务,每60秒执行一次。其中func参数是要执行的任务名,我们需要在项目中定义一个名为'report_status'的函数来实现任务逻辑。
接下来,在Flask应用中,我们定义'report_status'函数,用于实现向服务器上报状态的任务逻辑。代码如下:
from datetime import datetime
def report_status():
status = {'timestamp': str(datetime.now()), 'status': 'working'}
# 将状态上报给服务器
print(status)
在'report_status'函数中,我们首先获取当前系统时间,并将其转换为字符串形式,并将状态信息打印到控制台。
最后,在Flask应用的路由中,我们定义了一个'/hello'接口,用于测试任务调度器是否正常工作。代码如下:
@app.route('/hello')
def hello():
return 'hello, world!'
现在,我们重新启动Flask应用,让任务调度器开始工作。我们在浏览器中访问'http://127.0.0.1:5000/hello'接口,然后回到命令行中,可以看到每隔60秒会输出一条状态信息。
三、自定义任务
除了定时任务,我们还可以通过FlaskAPScheduler来实现自定义任务。
首先,我们需要在Flask应用中,定义一个任务。代码如下:
def my_task(arg1, arg2):
# 任务逻辑
print('arg1:', arg1)
print('arg2:', arg2)
接下来,我们需要将任务添加到任务调度器中。代码如下:
scheduler.add_job(id='my_task', func=my_task, args=('hello', 'world'))
在上面的代码中,我们使用add_job方法来添加一个任务。其中,id参数是任务的ID,我们可以通过该ID来查找和管理任务;func参数是要执行的任务函数,args参数是任务函数的参数。
最后,我们需要启动任务调度器。代码如下:
scheduler.start()
现在,我们的自定义任务已经被添加到了任务调度器中,并且可以被调度执行了。
四、任务调度器API
任务调度器API是一个web接口,可以使用它来管理任务。下面,我们介绍一下任务调度器API的使用方法。
首先,我们需要在Flask应用中,开启任务调度器API。代码如下:
app.config['SCHEDULER_API_ENABLED'] = True
接下来,我们可以通过'/scheduler/jobs'接口来获取任务列表。代码如下:
import requests
url = 'http://127.0.0.1:5000/scheduler/jobs'
response = requests.get(url)
print(response.status_code)
print(response.json())
在上面的代码中,我们使用requests库来向'/scheduler/jobs'接口发送GET请求,获取任务列表。response.json()方法返回的是一个JSON格式的任务列表。
如果我们想要添加一个任务,可以通过'/scheduler/jobs'接口来添加。代码如下:
import json
data = {
'id': 'my_task',
'func': 'my_task',
'args': ('hello', 'world'),
'trigger': 'interval',
'seconds': 60
}
headers = {'Content-Type': 'application/json'}
url = 'http://127.0.0.1:5000/scheduler/jobs'
response = requests.post(url, data=json.dumps(data), headers=headers)
print(response.status_code)
print(response.json())
在上面的代码中,我们使用requests库来发送POST请求,添加一个名为'my_task'的任务。其中,'func'参数是要执行的任务函数名;'args'参数是任务函数的参数列表;'trigger'参数是触发条件,'interval'表示每隔一段时间触发一次;'seconds'参数表示每隔60秒触发一次。