本文目录一览:
- 1、Python中的多进程与多线程/分布式该如何使用
- 2、python基础(21)-线程通信
- 3、Python多线程总结
- 4、Python中的线程池是什么
- 5、小白都看懂了,Python 中的线程和进程精讲,建议收藏
Python中的多进程与多线程/分布式该如何使用
Python提供了非常好用的多进程包multiprocessing,你只需要定义一个函数,Python会替你完成其他所有事情。
借助这个包,可以轻松完成从单进程到并发执行的转换。
1、新建单一进程
如果我们新建少量进程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用进程池
是的,你没有看错,不是线程池。它可以让你跑满多核CPU,而且使用方法非常简单。
注意要用apply_async,如果落下async,就变成阻塞版本了。
processes=4是最多并发进程数量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,并需要关注结果
更多的时候,我们不仅需要多进程执行,还需要关注每个进程的执行结果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."1234567891011121314151617181920
2014.12.25更新
根据网友评论中的反馈,在Windows下运行有可能崩溃(开启了一大堆新窗口、进程),可以通过如下调用来解决:
multiprocessing.freeze_support()1
附录(自己的脚本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用进程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加线程到线程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #创建多线程任务
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待线程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break
python基础(21)-线程通信
到这里,我们要聊一下线程通信的内容;
首先,我们抛开语言不谈,先看看比较基础的东西,线程间通信的方式;其实也就是哪几种(我这里说的,是我的所谓的知道的。。。)事件,消息队列,信号量,条件变量(锁算不算?我只是认为是同步的一种);所以我们也就是要把这些掌握了,因为各有各的好处嘛;
条件变量我放到了上面的线程同步里面讲了,我总感觉这算是同步的一种,没有很多具体信息的沟通;同时吧,我认为条件变量比较重要,因为这种可以应用于线程池的操作上;所以比较重要;这里,抛开条件变量不谈,我们看看其他的东西;
1、消息队列:
queue 模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。
关于这三个队列类的简单介绍如下:
queue.Queue(maxsize=0):代表 FIFO(先进先出)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。
queue.LifoQueue(maxsize=0):代表 LIFO(后进先出)的队列,与 Queue 的区别就是出队列的顺序不同。
PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。
这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:
Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
Queue.empty():判断队列是否为空。
Queue.full():判断队列是否已满。
Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
其实我们想想,这个队列,是python进行封装的,那么我们可以用在线程间的通信;同时也是可以用做一个数据结构;先进先出就是队列,后进先出就是栈;我们用这个栈写个十进制转二进制的例子:
没毛病,可以正常的打印;其中需要注意的就是,maxsize在初始化的时候如果是0或者是个负数的话,那么就会是不限制大小;
那么其实我们想想,我们如果用做线程通信的话,我们两个线程,可以把队列设置为1的大小,如果是1对多,比如是创建者和消费者的关系,我们完全可以作为消息队列,比如说创建者一直在创建一些东西,然后放入到消息队列里面,然后供消费着使用;就是一个很好的例子;所以,其实说是消息队列,也就是队列,没差;
=====================================================================
下面来看一下事件
Event 是一种非常简单的线程通信机制,一个线程发出一个 Event,另一个线程可通过该 Event 被触发。
Event 本身管理一个内部旗标,程序可以通过 Event 的 set() 方法将该旗标设置为 True,也可以调用 clear() 方法将该旗标设置为 False。程序可以调用 wait() 方法来阻塞当前线程,直到 Event 的内部旗标被设置为 True。
Event 提供了如下方法:
is_set():该方法返回 Event 的内部旗标是否为True。
set():该方法将会把 Event 的内部旗标设置为 True,并唤醒所有处于等待状态的线程。
clear():该方法将 Event 的内部旗标设置为 False,通常接下来会调用 wait() 方法来阻塞当前线程。
wait(timeout=None):该方法会阻塞当前线程。
这里我想解释一下;其实对于事件来说,事件可以看成和条件变量是一样的,只是我们说说不一样的地方;
1、对于事件来说,一旦触发了事件,也就是说,一旦set为true了,那么就会一直为true,需要clear调内部的标志,才能继续wait;但是conditon不是,他是一次性的唤醒其他线程;
2、conditon自己带锁;事件呢?不是的;没有自己的锁;比如说有一个存钱的线程,有一个是取钱的线程;那么存钱的线程要存钱;需要怎么办呢?1、发现银行没有钱了(is_set判断);2、锁住银行;3、存钱;4、释放银行;5、唤醒事件;对于取钱的人;1、判断是否有钱;2、被唤醒了,然后锁住银行;3、开始取钱;4、清理告诉存钱的人,我没钱了(clear);5、释放锁;6、等着钱存进去;
其实说白了,就是记住一点;这个旗标需要自己clear就对了
写个例子,怕以后忘了怎么用;
其实时间和信号量比较像;但是信号量不用自己清除标志位;但是事件是需要的;
Python多线程总结
在实际处理数据时,因系统内存有限,我们不可能一次把所有数据都导出进行操作,所以需要批量导出依次操作。为了加快运行,我们会采用多线程的方法进行数据处理, 以下为我总结的多线程批量处理数据的模板:
主要分为三大部分:
共分4部分对多线程的内容进行总结。
先为大家介绍线程的相关概念:
在飞车程序中,如果没有多线程,我们就不能一边听歌一边玩飞车,听歌与玩 游戏 不能并行;在使用多线程后,我们就可以在玩 游戏 的同时听背景音乐。在这个例子中启动飞车程序就是一个进程,玩 游戏 和听音乐是两个线程。
Python 提供了 threading 模块来实现多线程:
因为新建线程系统需要分配资源、终止线程系统需要回收资源,所以如果可以重用线程,则可以减去新建/终止的开销以提升性能。同时,使用线程池的语法比自己新建线程执行线程更加简洁。
Python 为我们提供了 ThreadPoolExecutor 来实现线程池,此线程池默认子线程守护。它的适应场景为突发性大量请求或需要大量线程完成任务,但实际任务处理时间较短。
其中 max_workers 为线程池中的线程个数,常用的遍历方法有 map 和 submit+as_completed 。根据业务场景的不同,若我们需要输出结果按遍历顺序返回,我们就用 map 方法,若想谁先完成就返回谁,我们就用 submit+as_complete 方法。
我们把一个时间段内只允许一个线程使用的资源称为临界资源,对临界资源的访问,必须互斥的进行。互斥,也称间接制约关系。线程互斥指当一个线程访问某临界资源时,另一个想要访问该临界资源的线程必须等待。当前访问临界资源的线程访问结束,释放该资源之后,另一个线程才能去访问临界资源。锁的功能就是实现线程互斥。
我把线程互斥比作厕所包间上大号的过程,因为包间里只有一个坑,所以只允许一个人进行大号。当第一个人要上厕所时,会将门上上锁,这时如果第二个人也想大号,那就必须等第一个人上完,将锁解开后才能进行,在这期间第二个人就只能在门外等着。这个过程与代码中使用锁的原理如出一辙,这里的坑就是临界资源。 Python 的 threading 模块引入了锁。 threading 模块提供了 Lock 类,它有如下方法加锁和释放锁:
我们会发现这个程序只会打印“第一道锁”,而且程序既没有终止,也没有继续运行。这是因为 Lock 锁在同一线程内第一次加锁之后还没有释放时,就进行了第二次 acquire 请求,导致无法执行 release ,所以锁永远无法释放,这就是死锁。如果我们使用 RLock 就能正常运行,不会发生死锁的状态。
在主线程中定义 Lock 锁,然后上锁,再创建一个子 线程t 运行 main 函数释放锁,结果正常输出,说明主线程上的锁,可由子线程解锁。
如果把上面的锁改为 RLock 则报错。在实际中设计程序时,我们会将每个功能分别封装成一个函数,每个函数中都可能会有临界区域,所以就需要用到 RLock 。
一句话总结就是 Lock 不能套娃, RLock 可以套娃; Lock 可以由其他线程中的锁进行操作, RLock 只能由本线程进行操作。
Python中的线程池是什么
多线程的做法是,可以同时创建多个线程放入等待执行的序列中。某个线程执行完毕就将它从序列中移除并销毁。不然的话,即时创建,然后就一定要等到它销毁,那这不是多线程,这是单线程.
进程中根据需要,为一些需要慢资源、竟争性资源的任务创建线程,排除等候执行。
线程需要等待分配,如果短时间建立了多个线程,哪个线程先开始执行,由调度程序决定;
...调度执行循环
当线程执行完毕,销毁线程。
比如说,下载图片:我有一个列表,记录了要下载的300张图片的URL。每个图片的来源可能是不同网站(服务器)。那么,主循环里只需要创建300个【下载】线程。每个线程负责一个URL的下载任务。
然后,调序程序开始调度:线程1有数据过来了,分配时间片给线程1处理这段数据...线程n执行完毕,销毁线程n...线程1又有数据过来了,分配时间片给线程1处理这段数据......销毁线程n,没有等待中的线程,调度暂停。
于是,所有的图片下载完了。
这个过程与单线程的不同是,在多线程中,最开始同一时间有300个请求在等待若干个服务器返回数据,而单线程则总是只有一个请求在等待服务器返回数据或者正在处理数据,另外299个请求根本不存在。
这才是多线程与单线程最主要的差别:在等待某个资源的时候,把其它资源给别的线程去使用。
小白都看懂了,Python 中的线程和进程精讲,建议收藏
目录
众所周知,CPU是计算机的核心,它承担了所有的计算任务。而操作系统是计算机的管理者,是一个大管家,它负责任务的调度,资源的分配和管理,统领整个计算机硬件。应用程序是具有某种功能的程序,程序运行与操作系统之上
在很早的时候计算机并没有线程这个概念,但是随着时代的发展,只用进程来处理程序出现很多的不足。如当一个进程堵塞时,整个程序会停止在堵塞处,并且如果频繁的切换进程,会浪费系统资源。所以线程出现了
线程是能拥有资源和独立运行的最小单位,也是程序执行的最小单位。一个进程可以拥有多个线程,而且属于同一个进程的多个线程间会共享该进行的资源
① 200 多本 Python 电子书(和经典的书籍)应该有
② Python标准库资料(最全中文版)
③ 项目源码(四五十个有趣且可靠的练手项目及源码)
④ Python基础入门、爬虫、网络开发、大数据分析方面的视频(适合小白学习)
⑤ Python学习路线图(告别不入流的学习)
私信我01即可获取大量Python学习资源
进程时一个具有一定功能的程序在一个数据集上的一次动态执行过程。进程由程序,数据集合和进程控制块三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时需要的数据和工作区;程序控制块(PCB)包含程序的描述信息和控制信息,是进程存在的唯一标志
在Python中,通过两个标准库 thread 和 Threading 提供对线程的支持, threading 对 thread 进行了封装。 threading 模块中提供了 Thread , Lock , RLOCK , Condition 等组件
在Python中线程和进程的使用就是通过 Thread 这个类。这个类在我们的 thread 和 threading 模块中。我们一般通过 threading 导入
默认情况下,只要在解释器中,如果没有报错,则说明线程可用
守护模式:
现在我们程序代码中,有多个线程, 并且在这个几个线程中都会去 操作同一部分内容,那么如何实现这些数据的共享呢?
这时,可以使用 threading库里面的锁对象 Lock 去保护
Lock 对象的acquire方法 是申请锁
每个线程在操作共享数据对象之前,都应该申请获取操作权,也就是调用该共享数据对象对应的锁对象的acquire方法,如果线程A 执行了 acquire() 方法,别的线程B 已经申请到了这个锁, 并且还没有释放,那么 线程A的代码就在此处 等待 线程B 释放锁,不去执行后面的代码。
直到线程B 执行了锁的 release 方法释放了这个锁, 线程A 才可以获取这个锁,就可以执行下面的代码了
如:
到在使用多线程时,如果数据出现和自己预期不符的问题,就可以考虑是否是共享的数据被调用覆盖的问题
使用 threading 库里面的锁对象 Lock 去保护
Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
守护模式:
其使用方法和线程的那个 Lock 使用方法类似
Manager的作用是提供多进程共享的全局变量,Manager()方法会返回一个对象,该对象控制着一个服务进程,该进程中保存的对象运行其他进程使用代理进行操作
语法:
线程池的基类是 concurrent.futures 模块中的 Executor , Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor ,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定
Exectuor 提供了如下常用方法:
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表
Future 提供了如下方法:
使用线程池来执行线程任务的步骤如下:
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
也可以低于 CPU 核心数
使用线程池来执行线程任务的步骤如下:
关于进程的开启代码一定要放在 if __name__ == '__main__': 代码之下,不能放到函数中或其他地方
开启进程的技巧
开启进程的数量最好低于最大 CPU 核心数