您的位置:

python任务队列框架,python队列的基本操作

本文目录一览:

Python实现简单多线程任务队列

Python实现简单多线程任务队列

最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):

defgradient_descent(): # the gradient descent code plotly.write(X, Y)

一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。

一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。

那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。

fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue):

首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。

def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()

初始化的时候,我们可以不用考虑工作线程的数量。

defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))

我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。

defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()

我们为每个 worker 创建一个线程,然后在后台删除。

下面是 worker 函数的代码:

defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()

worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:

我们可以通过下面的代码测试:

defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”

Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。

defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。fromthreadingimportThreadimportQueueimporttime classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()

Python数据结构-队列与广度优先搜索(Queue)

队列(Queue) :简称为队,一种线性表数据结构,是一种只允许在表的一端进行插入操作,而在表的另一端进行删除操作的线性表。

我们把队列中允许插入的一端称为 「队尾(rear)」 ;把允许删除的另一端称为 「队头(front)」 。当表中没有任何数据元素时,称之为 「空队」 。

广度优先搜索算法(Breadth First Search) :简称为 BFS,又译作宽度优先搜索 / 横向优先搜索。是一种用于遍历或搜索树或图的算法。该算法从根节点开始,沿着树的宽度遍历树或图的节点。如果所有节点均被访问,则算法中止。

广度优先遍历 类似于树的层次遍历过程 。呈现出一层一层向外扩张的特点。先看到的节点先访问,后看到的节点后访问。遍历到的节点顺序符合「先进先出」的特点,所以广度优先搜索可以通过「队列」来实现。

力扣933

游戏时,队首始终是持有土豆的人

模拟游戏开始,队首的人出队,之后再到队尾(类似于循环队列)

传递了num次之后,将队首的人移除

如此反复,直到队列中剩余一人

多人共用一台打印机,采取“先到先服务”的队列策略来执行打印任务

需要解决的问题:1 打印系统的容量是多少?2 在能够接受的等待时间内,系统可容纳多少用户以多高的频率提交打印任务?

输入:abba

输出:False

思路:1 先将需要判定的词从队尾加入 deque; 2从两端同时移除字符并判断是否相同,直到deque中剩余0个(偶数)或1个字符(奇数)

内容参考:

Python 异步任务队列Celery 使用

在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思。在工头(生产者)提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农(消费者)等着取出一个个任务准备着手做。这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

其实现架构如下图所示:

可以看到,Celery 主要包含以下几个模块:

celery可以通过pip自动安装。

broker 可选择使用RabbitMQ/redis,backend可选择使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安装请参考对应的官方文档。

------------------------------rabbitmq相关----------------------------------------------------------

官网安装方法:

启动管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 启动rabbitmq:sbin/rabbitmq-server -detached

rabbitmq已经启动,可以打开页面来看看 地址:

用户名密码都是guest 。进入可以看到具体页面。 关于rabbitmq的配置,网上很多 自己去搜以下就ok了。

------------------------------rabbitmq相关--------------------------------------------------------

项目结构如下:

使用前,需要三个方面:celery配置,celery实例,需执行的任务函数,如下:

Celery 的配置比较多,可以在 官方配置文档: 查询每个配置项的含义。

当然,要保证上述异步任务and下述定时任务都能正常执行,就需要先启动celery worker,启动命令行如下:

需 启动beat ,执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。

命令行启动:

如果你想将celery worker/beat要放到后台运行,推荐可以扔给supervisor。

supervisor.conf如下:

python爬虫用什么框架

python爬虫框架概述

爬虫框架中比较好用的是 Scrapy 和PySpider。pyspider上手更简单,操作更加简便,因为它增加了 WEB 界面,写爬虫迅速,集成了phantomjs,可以用来抓取js渲染的页面。Scrapy自定义程度高,比 PySpider更底层一些,适合学习研究,需要学习的相关知识多,不过自己拿来研究分布式和多线程等等是非常合适的。

PySpider

PySpider是binux做的一个爬虫架构的开源化实现。主要的功能需求是:

抓取、更新调度多站点的特定的页面

需要对页面进行结构化信息提取

灵活可扩展,稳定可监控

pyspider的设计基础是:以python脚本驱动的抓取环模型爬虫

通过python脚本进行结构化信息的提取,follow链接调度抓取控制,实现最大的灵活性

通过web化的脚本编写、调试环境。web展现调度状态

抓取环模型成熟稳定,模块间相互独立,通过消息队列连接,从单进程到多机分布式灵活拓展

pyspider的架构主要分为 scheduler(调度器), fetcher(抓取器), processor(脚本执行):

各个组件间使用消息队列连接,除了scheduler是单点的,fetcher 和 processor 都是可以多实例分布式部署的。 scheduler 负责整体的调度控制

任务由 scheduler 发起调度,fetcher 抓取网页内容, processor 执行预先编写的python脚本,输出结果或产生新的提链任务(发往 scheduler),形成闭环。

每个脚本可以灵活使用各种python库对页面进行解析,使用框架API控制下一步抓取动作,通过设置回调控制解析动作。

Scrapy

Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架。 可以应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序中。

其最初是为了页面抓取 (更确切来说, 网络抓取 )所设计的, 也可以应用在获取API所返回的数据(例如 Amazon Associates Web Services ) 或者通用的网络爬虫。Scrapy用途广泛,可以用于数据挖掘、监测和自动化测试

Scrapy主要包括了以下组件:

引擎(Scrapy): 用来处理整个系统的数据流处理, 触发事务(框架核心)

调度器(Scheduler): 用来接受引擎发过来的请求, 压入队列中, 并在引擎再次请求的时候返回. 可以想像成一个URL(抓取网页的网址或者说是链接)的优先队列, 由它来决定下一个要抓取的网址是什么, 同时去除重复的网址

下载器(Downloader): 用于下载网页内容, 并将网页内容返回给蜘蛛(Scrapy下载器是建立在twisted这个高效的异步模型上的)

爬虫(Spiders): 爬虫是主要干活的, 用于从特定的网页中提取自己需要的信息, 即所谓的实体(Item)。用户也可以从中提取出链接,让Scrapy继续抓取下一个页面

项目管道(Pipeline): 负责处理爬虫从网页中抽取的实体,主要的功能是持久化实体、验证实体的有效性、清除不需要的信息。当页面被爬虫解析后,将被发送到项目管道,并经过几个特定的次序处理数据。

下载器中间件(Downloader Middlewares): 位于Scrapy引擎和下载器之间的框架,主要是处理Scrapy引擎与下载器之间的请求及响应。

爬虫中间件(Spider Middlewares): 介于Scrapy引擎和爬虫之间的框架,主要工作是处理蜘蛛的响应输入和请求输出。

调度中间件(Scheduler Middewares): 介于Scrapy引擎和调度之间的中间件,从Scrapy引擎发送到调度的请求和响应。

Scrapy运行流程大概如下:

首先,引擎从调度器中取出一个链接(URL)用于接下来的抓取

引擎把URL封装成一个请求(Request)传给下载器,下载器把资源下载下来,并封装成应答包(Response)

然后,爬虫解析Response

若是解析出实体(Item),则交给实体管道进行进一步的处理。

若是解析出的是链接(URL),则把URL交给Scheduler等待抓取

python多任务之进程队列queen

python的多进程之间无法用全局变量,需要只用队列queen进行通讯。

1. 创建。q=multiprocessing.Queen(num),num最大存放多少数据

2.进程使用队列,需要在创建进程时做为参数传进去。p=multiprocessing.Process(target=fun_name,args=(q,))

3.队列使用。队列是先进先出的,p.put(任何数据类型),放进数据,当队列满时会进程会堵塞等待。p.get()取出数据,当队列中无数据是,进程会堵塞等待。p.full()是否已满,p.empty()是否空了。