您的位置:

scheduled每分钟执行一次的详细阐述

一、基本概念

在编写程序时,我们经常需要在规定的时间点运行代码来执行某些任务。其中,scheduled任务允许我们预定义任务执行的时间和频率。

scheduled任务通常是指重复执行的什么任务,在一定的时间间隔内执行,即使脚本正在运行也是如此。这种方式可以被广泛应用,如清理无用文件、发生邮件、备份文件等。

每分钟执行一次是其中一种常见的方式。下面是一个基本的示例代码:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(1).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

在这个示例中,定义了一个函数job(),并使用schedule库中的every()方法指示每隔1分钟执行该函数。在while循环中,schedule.run_pending()会让schedule定时任务尽可能运行,time.sleep(1)是为了确保Python不会在构建旋转调度器时占用太多CPU。

二、任务定时管理

scheduled任务的管理可以使用schedule库,该库可以让我们方便地添加任务、修改任务、删除任务和查询任务。下面从这几个方面详细阐述如何管理scheduled任务。

1. 添加任务

使用schedule库添加任务时,可以使用每个线程间隔的时间来定义任务执行的特定时间。可以使用以下不同的参数定义任务:

a. interval:任务的运行间隔。

b. start_time:首次运行任务的时间,可以是datetime、date或time对象。

c. end_time:可选的任务结束时间,可以是datetime、date或time对象。

d. last_run_at:任务上次运行的时间。

e. next_run_at:任务下次运行的时间。

因此,在定义新任务时,需要给定任务名称、任务执行函数和任务执行时间间隔。下面是一个添加任务的示例代码:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(1).minutes.do(job).tag('task1')

while True:
    schedule.run_pending()
    time.sleep(1)

2. 修改任务

一旦我们添加了一个任务,就可以根据需要修改该任务。在schedule库中,我们可以使用label来唯一地标识每个任务。下面是一个修改任务的示例代码:

import schedule
import time

def job():
    print("I'm working...")

j = schedule.every(1).minutes.do(job).tag('task1')
j.interval(2)

while True:
    schedule.run_pending()
    time.sleep(1)

3. 删除任务

我们还可以根据具体的需求从schedule调度器中删除任务。删除任务的操作比添加任务和修改任务操作更简单,只需要调用schedule的cancel()方法,如下所示:

import schedule
import time

def job():
    print("I'm working...")

j = schedule.every(1).minutes.do(job).tag('task1')

schedule.cancel_job(j)

while True:
    schedule.run_pending()
    time.sleep(1)

4. 查询任务

如果要查询所有正在运行的任务,可以用schedule的下面方法来输出所有被调用过的函数列表:

import schedule
import time

def job():
    print("I'm working...")

schedule.every(1).minutes.do(job)

while True:
    print(schedule.jobs) # 输出任务列表
    schedule.run_pending()
    time.sleep(1)

三、任务执行情况监控

如果一个脚本需要长时间执行,我们需要对执行情况进行监控。schedule库为这个目的提供了一些内置功能。例如,我们可以使用schedule的on_success()回调函数记录每次任务成功执行的时间。下面是一个监控任务执行情况的示例代码:

import schedule
import time

def job():
    print("I'm working...")

def on_success(job):
    print('Task {0} ran successfully at {1}'.format(job.name, datetime.datetime.now()))

schedule.every(1).minutes.do(job).tag('task1')
schedule.every().day.at("10:30").do(lambda: on_success(), job)

while True:
    schedule.run_pending()
    time.sleep(1)

在上面的代码中,on_success()函数将在每次成功运行任务时执行。如果您希望在执行任务时发生错误,可以使用on_error()回调函数。

四、总结

在本文中,我们详细介绍了如何使用Python中的schedule库来管理和监控scheduled任务。我们首先介绍了scheduled任务的概念,然后讨论了如何添加、修改、删除和查询任务。最后,我们提供了一个例子,演示了如何使用回调函数来监视任务的执行情况。