一、Semaphore基础知识
1、Semaphore的定义和概念
import threading
sem = threading.Semaphore(3)
Semaphore是一种计数器,它用来保护对共享资源的访问。在任何时刻,同时只能有一个线程访问共享资源。当线程要访问共享资源时,它必须首先获得Semaphore,如果Semaphore计数器的值为0,那么线程就会被阻塞,直到Semaphore有信号为止。
2、Semaphore的构造函数
__init__(self, value=1)
Semaphore的构造函数有一个参数value,这个参数指定了Semaphore的初始计数器的值,默认为1。
3、Semaphore的方法
Semaphore对象有两个主要方法,分别是acquire()和release()方法。
- acquire([blocking])
acquire()方法尝试获取Semaphore,如果Semaphore计数器的值为0,就会阻塞线程。可选参数blocking默认为1,如果设为0,则acquire()方法会立即返回,不会阻塞线程。当acquire()方法成功获取Semaphore时,Semaphore计数器的值会减1。
- release()
release()方法会把Semaphore的计数器的值加1,如果此时Semaphore有阻塞的线程,它就会选择一个线程并唤醒它。
二、Semaphore使用场景
1、实现对公共资源的访问控制
Semaphore的最主要的应用场景就是实现对公共资源的访问控制。Semaphore控制对公共资源的访问,保证同一时刻只有一个线程在访问该资源,而其他线程必须等待。
import threading
class SharedResource:
def __init__(self):
self.sem = threading.Semaphore(1)
self.data = None
def get(self):
self.sem.acquire()
result = self.data
self.sem.release()
return result
def set(self, data):
self.sem.acquire()
self.data = data
self.sem.release()
上述代码中,定义了一个SharedResource类,该类有一个Semaphore成员变量sem,Semaphore的计数器的初始值为1。类中定义了get()和set()方法,这两个方法在访问data成员变量时,都要先获取Semaphore,然后再进行操作。
2、控制程序并发访问线程数
当程序需要同时运行大量的线程的时候,如果没有控制线程数,容易出现线程过多造成CPU调度产生的负担,进而导致程序运行缓慢或者崩溃。这时,可以使用Semaphore来控制并发访问线程数,保证程序有一个平缓的工作负荷。
import threading
import time
sem = threading.Semaphore(3)
def worker():
with sem:
print(f'Thread {threading.get_ident()} started')
time.sleep(1)
print(f'Thread {threading.get_ident()} finished')
for i in range(5):
t = threading.Thread(target=worker)
t.start()
上述代码中,创建了5个线程来执行worker函数。设置了Semaphore的计数器初始值为3,即同一时间只能有3个线程执行worker函数。使用with语句获取Semaphore,当3个线程都在执行时,后续的线程会被阻塞。当某个线程执行结束时,会释放Semaphore,唤醒被阻塞的线程。
三、Semaphore实现生产者-消费者模型
Semaphore还可以用来实现生产者-消费者模型。生产者和消费者之间通过一个队列来进行通讯。当队列中没有可以消费的数据时,消费者线程会阻塞,当队列满了时,生产者线程会阻塞。当生产者生产了新的数据时,会通知阻塞的消费者线程开始消费,当消费者消费了数据时,会通知阻塞的生产者线程开始生产。
import threading
import time
queue = []
sem_producer = threading.Semaphore(10)
sem_consumer = threading.Semaphore(0)
mutex = threading.Lock()
class ProducerThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
global queue
for i in range(20):
sem_producer.acquire()
mutex.acquire()
queue.append(i)
print(f'{self.name} produced {i}', end='\n' if i%10==9 else ', ')
mutex.release()
sem_consumer.release()
time.sleep(0.1)
class ConsumerThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
global queue
while True:
sem_consumer.acquire()
mutex.acquire()
if len(queue) == 0:
mutex.release()
break
i = queue.pop(0)
print(f'{self.name} consumed {i}', end='\n' if i%10==9 else ', ')
mutex.release()
sem_producer.release()
time.sleep(0.1)
producer_threads = [ProducerThread(f'Producer {i}') for i in range(3)]
consumer_threads = [ConsumerThread(f'Consumer {i}') for i in range(2)]
for thread in producer_threads + consumer_threads:
thread.start()
for thread in producer_threads + consumer_threads:
thread.join()
上述代码中,定义了ProducerThread和ConsumerThread两个线程类,ProducerThread负责生产数据,ConsumerThread负责消费数据。queue是一个列表,表示共享数据的队列,Semaphore sem_producer的计数器初始值为10,Semaphore sem_consumer的计数器初始值为0,互斥锁mutex保护对队列的访问。运行3个生产者线程和2个消费者线程,生产者生产20个数据,消费者消费这20个数据。
四、总结
Semaphore是Python多线程编程中非常有用的工具,可以用来控制对公共资源的访问,也可以用来控制并发线程数,还能够实现生产者-消费者模型。但是,使用Semaphore时需要注意两个问题:第一,互斥锁和Semaphore的区别,互斥锁是为了保证对共享资源的互斥访问,Semaphore是为了控制对公共资源的访问。第二,使用Semaphore时需要注意死锁问题,当某个线程无法获取Semaphore时会被阻塞,如果在某个线程获取Semaphore之前另一个线程已经获取了Semaphore并陷入阻塞,就会出现死锁,程序将无法继续运行。