本文目录一览:
python异步有哪些方式
yield相当于return,他将相应的值返回给调用next()或者send()的调用者,从而交出了CPU使用权,而当调用者再次调用next()或者send()的时候,又会返回到yield中断的地方,如果send有参数,还会将参数返回给yield赋值的变量,如果没有就和next()一样赋值为None。但是这里会遇到一个问题,就是嵌套使用generator时外层的generator需要写大量代码,看如下示例:
注意以下代码均在Python3.6上运行调试
#!/usr/bin/env python# encoding:utf-8def inner_generator():
i = 0
while True:
i = yield i if i 10: raise StopIterationdef outer_generator():
print("do something before yield")
from_inner = 0
from_outer = 1
g = inner_generator()
g.send(None) while 1: try:
from_inner = g.send(from_outer)
from_outer = yield from_inner except StopIteration: breakdef main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == '__main__':
main()1234567891011121314151617181920212223242526272829303132333435363738394041
为了简化,在Python3.3中引入了yield from
yield from
使用yield from有两个好处,
1、可以将main中send的参数一直返回给最里层的generator,
2、同时我们也不需要再使用while循环和send (), next()来进行迭代。
我们可以将上边的代码修改如下:
def inner_generator():
i = 0
while True:
i = yield i if i 10: raise StopIterationdef outer_generator():
print("do something before coroutine start") yield from inner_generator()def main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == '__main__':
main()1234567891011121314151617181920212223242526
执行结果如下:
do something before coroutine start123456789101234567891011
这里inner_generator()中执行的代码片段我们实际就可以认为是协程,所以总的来说逻辑图如下:
接下来我们就看下究竟协程是啥样子
协程coroutine
协程的概念应该是从进程和线程演变而来的,他们都是独立的执行一段代码,但是不同是线程比进程要轻量级,协程比线程还要轻量级。多线程在同一个进程中执行,而协程通常也是在一个线程当中执行。它们的关系图如下:
我们都知道Python由于GIL(Global Interpreter Lock)原因,其线程效率并不高,并且在*nix系统中,创建线程的开销并不比进程小,因此在并发操作时,多线程的效率还是受到了很大制约的。所以后来人们发现通过yield来中断代码片段的执行,同时交出了cpu的使用权,于是协程的概念产生了。在Python3.4正式引入了协程的概念,代码示例如下:
import asyncio# Borrowed from countdown(number, n):
while n 0:
print('T-minus', n, '({})'.format(number)) yield from asyncio.sleep(1)
n -= 1loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()12345678910111213141516
示例显示了在Python3.4引入两个重要概念协程和事件循环,
通过修饰符@asyncio.coroutine定义了一个协程,而通过event loop来执行tasks中所有的协程任务。之后在Python3.5引入了新的async await语法,从而有了原生协程的概念。
async await
在Python3.5中,引入了ayncawait 语法结构,通过”aync def”可以定义一个协程代码片段,作用类似于Python3.4中的@asyncio.coroutine修饰符,而await则相当于”yield from”。
先来看一段代码,这个是我刚开始使用asyncawait语法时,写的一段小程序。
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def wait_download(url):
response = await requets.get(url)
print("get {} response complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download(""),
wait_download(""),
wait_download("")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())12345678910111213141516171819202122232425
这里会收到这样的报错:
Task exception was never retrieved
future: Task finished coro=wait_download() done, defined at asynctest.py:9 exception=TypeError("object Response can't be used in 'await' expression",)
Traceback (most recent call last):
File "asynctest.py", line 10, in wait_download
data = await requests.get(url)
TypeError: object Response can't be used in 'await' expression123456
这是由于requests.get()函数返回的Response对象不能用于await表达式,可是如果不能用于await,还怎么样来实现异步呢?
原来Python的await表达式是类似于”yield from”的东西,但是await会去做参数检查,它要求await表达式中的对象必须是awaitable的,那啥是awaitable呢? awaitable对象必须满足如下条件中其中之一:
1、A native coroutine object returned from a native coroutine function .
原生协程对象
2、A generator-based coroutine object returned from a function decorated with types.coroutine() .
types.coroutine()修饰的基于生成器的协程对象,注意不是Python3.4中asyncio.coroutine
3、An object with an await method returning an iterator.
实现了await method,并在其中返回了iterator的对象
根据这些条件定义,我们可以修改代码如下:
#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time
async def download(url): # 通过async def定义的函数是原生的协程对象
response = requests.get(url)
print(response.text)
async def wait_download(url):
await download(url) # 这里download(url)就是一个原生的协程对象
print("get {} data complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([
wait_download(""),
wait_download(""),
wait_download("")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())123456789101112131415161718192021222324252627282930
好了现在一个真正的实现了异步编程的小程序终于诞生了。
而目前更牛逼的异步是使用uvloop或者pyuv,这两个最新的Python库都是libuv实现的,可以提供更加高效的event loop。
uvloop和pyuv
pyuv实现了Python2.x和3.x,但是该项目在github上已经许久没有更新了,不知道是否还有人在维护。
uvloop只实现了3.x, 但是该项目在github上始终活跃。
它们的使用也非常简单,以uvloop为例,只需要添加以下代码就可以了
import asyncioimport uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123
python里怎么实现异步调用
本文实例讲述了python使用multiprocessing模块实现带回调函数的异步调用方法。分享给大家供大家参考。具体分析如下:
multipressing模块是python 2.6版本加入的,通过这个模块可以轻松实现异步调用
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=1)
# Start a worker processes.
result = pool.apply_async(f, [10], callback)
# Evaluate "f(10)" asynchronously calling callback when finished.
希望本文所述对大家的Python程序设计有所帮助。
python 异步是什么意思
异步是计算机多线程的异步处理。与同步处理相对,异步处理不用阻塞当前线程来等待处理完成,而是允许后续操作,直至其它线程将处理完成,并回调通知此线程。
Python 异步任务队列Celery 使用
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思。在工头(生产者)提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农(消费者)等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
其实现架构如下图所示:
可以看到,Celery 主要包含以下几个模块:
celery可以通过pip自动安装。
broker 可选择使用RabbitMQ/redis,backend可选择使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安装请参考对应的官方文档。
------------------------------rabbitmq相关----------------------------------------------------------
官网安装方法:
启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 启动rabbitmq:sbin/rabbitmq-server -detached
rabbitmq已经启动,可以打开页面来看看 地址:
用户名密码都是guest 。进入可以看到具体页面。 关于rabbitmq的配置,网上很多 自己去搜以下就ok了。
------------------------------rabbitmq相关--------------------------------------------------------
项目结构如下:
使用前,需要三个方面:celery配置,celery实例,需执行的任务函数,如下:
Celery 的配置比较多,可以在 官方配置文档: 查询每个配置项的含义。
当然,要保证上述异步任务and下述定时任务都能正常执行,就需要先启动celery worker,启动命令行如下:
需 启动beat ,执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。
命令行启动:
如果你想将celery worker/beat要放到后台运行,推荐可以扔给supervisor。
supervisor.conf如下:
Python异步编程全攻略
如果你厌倦了多线程,不妨试试python的异步编程,再引入async, await关键字之后语法变得更加简洁和直观,又经过几年的生态发展,现在是一个很不错的并发模型。
下面介绍一下python异步编程的方方面面。
因为GIL的存在,所以Python的多线程在CPU密集的任务下显得无力,但是对于IO密集的任务,多线程还是足以发挥多线程的优势的,而异步也是为了应对IO密集的任务,所以两者是一个可以相互替代的方案,因为设计的不同,理论上异步要比多线程快,因为异步的花销更少, 因为不需要额外系统申请额外的内存,而线程的创建跟系统有关,需要分配一定量的内存,一般是几兆,比如linux默认是8MB。
虽然异步很好,比如可以使用更少的内存,比如更好地控制并发(也许你并不这么认为:))。但是由于async/await 语法的存在导致与之前的语法有些割裂,所以需要适配,需要付出额外的努力,再者就是生态远远没有同步编程强大,比如很多库还不支持异步,所以你需要一些额外的适配。
为了不给其他网站带来困扰,这里首先在自己电脑启动web服务用于测试,代码很简单。
本文所有依赖如下:
所有依赖可通过代码仓库的requirements.txt一次性安装。
首先看一个错误的例子
输出如下:
发现花费了3秒,不符合预期呀。。。。这是因为虽然用了协程,但是每个协程是串行的运行,也就是说后一个等前一个完成之后才开始,那么这样的异步代码并没有并发,所以我们需要让这些协程并行起来
为了让代码变动的不是太多,所以这里用了一个笨办法来等待所有任务完成, 之所以在main函数中等待是为了不让ClientSession关闭, 如果你移除了main函数中的等待代码会发现报告异常 RuntimeError: Session is closed ,而代码里的解决方案非常的不优雅,需要手动的等待,为了解决这个问题,我们再次改进代码。
这里解决的方式是通过 asyncio.wait 方法等待一个协程列表,默认是等待所有协程结束后返回,会返回一个完成(done)列表,以及一个待办(pending)列表。
如果我们不想要协程对象而是结果,那么我们可以使用 asyncio.gather
结果输出如下:
通过 asyncio.ensure_future 我们就能创建一个协程,跟调用一个函数差别不大,为了等待所有任务完成之后退出,我们需要使用 asyncio.wait 等方法来等待,如果只想要协程输出的结果,我们可以使用 asyncio.gather 来获取结果。
虽然前面能够随心所欲的创建协程,但是就像多线程一样,我们也需要处理协程之间的同步问题,为了保持语法及使用情况的一致,多线程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用异步的关键字,比如 async with/ await 等
通过锁让并发慢下来,让协程一个一个的运行。
输出如下:
通过观察很容易发现,并发的速度因为锁而慢下来了,因为每次只有一个协程能获得锁,所以并发变成了串行。
通过事件来通知特定的协程开始工作,假设有一个任务是根据http响应结果选择是否激活。
输出如下:
可以看到事件(Event)等待者都是在得到响应内容之后输出,并且事件(Event)可以是多个协程同时等待。
上面的事件虽然很棒,能够在不同的协程之间同步状态,并且也能够一次性同步所有的等待协程,但是还不够精细化,比如想通知指定数量的等待协程,这个时候Event就无能为力了,所以同步原语中出现了Condition。
输出如下:
可以看到,前面两个等待的协程是在同一时刻完成,而不是全部等待完成。
通过创建协程的数量来控制并发并不是非常优雅的方式,所以可以通过信号量的方式来控制并发。
输出如下:
可以发现,虽然同时创建了三个协程,但是同一时刻只有两个协程工作,而另外一个协程需要等待一个协程让出信号量才能运行。
无论是协程还是线程,任务之间的状态同步还是很重要的,所以有了应对各种同步机制的同步原语,因为要保证一个资源同一个时刻只能一个任务访问,所以引入了锁,又因为需要一个任务等待另一个任务,或者多个任务等待某个任务,因此引入了事件(Event),但是为了更精细的控制通知的程度,所以又引入了条件(Condition), 通过条件可以控制一次通知多少的任务。
有时候的并发需求是通过一个变量控制并发任务的并发数而不是通过创建协程的数量来控制并发,所以引入了信号量(Semaphore),这样就可以在创建的协程数远远大于并发数的情况下让协程在指定的并发量情况下并发。
不得不承认异步编程相比起同步编程的生态要小的很多,所以不可能完全异步编程,因此需要一种方式兼容。
多线程是为了兼容同步得代码。
多进程是为了利用CPU多核的能力。
输出如下:
可以看到总耗时1秒,说明所有的线程跟进程是同时运行的。
下面是本人使用过的一些异步库,仅供参考
web框架
http客户端
数据库
ORM
虽然异步库发展得还算不错,但是中肯的说并没有覆盖方方面面。
虽然我鼓励大家尝试异步编程,但是本文的最后却是让大家谨慎的选择开发环境,如果你觉得本文的并发,同步,兼容多线程,多进程不值得一提,那么我十分推荐你尝试以异步编程的方式开始一个新的项目,如果你对其中一些还有疑问或者你确定了要使用的依赖库并且大多数是没有异步库替代的,那么我还是建议你直接按照自己擅长的同步编程开始。
异步编程虽然很不错,不过,也许你并不需要。