您的位置:

Semaphore使用详解

一、Semaphore基础知识

1、Semaphore的定义和概念

import threading

sem = threading.Semaphore(3)

Semaphore是一种计数器,它用来保护对共享资源的访问。在任何时刻,同时只能有一个线程访问共享资源。当线程要访问共享资源时,它必须首先获得Semaphore,如果Semaphore计数器的值为0,那么线程就会被阻塞,直到Semaphore有信号为止。

2、Semaphore的构造函数

__init__(self, value=1)

Semaphore的构造函数有一个参数value,这个参数指定了Semaphore的初始计数器的值,默认为1。

3、Semaphore的方法

Semaphore对象有两个主要方法,分别是acquire()和release()方法。

  • acquire([blocking])

    acquire()方法尝试获取Semaphore,如果Semaphore计数器的值为0,就会阻塞线程。可选参数blocking默认为1,如果设为0,则acquire()方法会立即返回,不会阻塞线程。当acquire()方法成功获取Semaphore时,Semaphore计数器的值会减1。

  • release()

    release()方法会把Semaphore的计数器的值加1,如果此时Semaphore有阻塞的线程,它就会选择一个线程并唤醒它。

二、Semaphore使用场景

1、实现对公共资源的访问控制

Semaphore的最主要的应用场景就是实现对公共资源的访问控制。Semaphore控制对公共资源的访问,保证同一时刻只有一个线程在访问该资源,而其他线程必须等待。

import threading

class SharedResource:
    def __init__(self):
        self.sem = threading.Semaphore(1)
        self.data = None
    
    def get(self):
        self.sem.acquire()
        result = self.data
        self.sem.release()
        return result
    
    def set(self, data):
        self.sem.acquire()
        self.data = data
        self.sem.release()

上述代码中,定义了一个SharedResource类,该类有一个Semaphore成员变量sem,Semaphore的计数器的初始值为1。类中定义了get()和set()方法,这两个方法在访问data成员变量时,都要先获取Semaphore,然后再进行操作。

2、控制程序并发访问线程数

当程序需要同时运行大量的线程的时候,如果没有控制线程数,容易出现线程过多造成CPU调度产生的负担,进而导致程序运行缓慢或者崩溃。这时,可以使用Semaphore来控制并发访问线程数,保证程序有一个平缓的工作负荷。

import threading
import time

sem = threading.Semaphore(3)

def worker():
    with sem:
        print(f'Thread {threading.get_ident()} started')
        time.sleep(1)
        print(f'Thread {threading.get_ident()} finished')
        
for i in range(5):
    t = threading.Thread(target=worker)
    t.start()

上述代码中,创建了5个线程来执行worker函数。设置了Semaphore的计数器初始值为3,即同一时间只能有3个线程执行worker函数。使用with语句获取Semaphore,当3个线程都在执行时,后续的线程会被阻塞。当某个线程执行结束时,会释放Semaphore,唤醒被阻塞的线程。

三、Semaphore实现生产者-消费者模型

Semaphore还可以用来实现生产者-消费者模型。生产者和消费者之间通过一个队列来进行通讯。当队列中没有可以消费的数据时,消费者线程会阻塞,当队列满了时,生产者线程会阻塞。当生产者生产了新的数据时,会通知阻塞的消费者线程开始消费,当消费者消费了数据时,会通知阻塞的生产者线程开始生产。

import threading
import time

queue = []
sem_producer = threading.Semaphore(10)
sem_consumer = threading.Semaphore(0)
mutex = threading.Lock()

class ProducerThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    
    def run(self):
        global queue
        for i in range(20):
            sem_producer.acquire()
            mutex.acquire()
            queue.append(i)
            print(f'{self.name} produced {i}', end='\n' if i%10==9 else ', ')
            mutex.release()
            sem_consumer.release()
            time.sleep(0.1)
        
class ConsumerThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    
    def run(self):
        global queue
        while True:
            sem_consumer.acquire()
            mutex.acquire()
            if len(queue) == 0:
                mutex.release()
                break
            i = queue.pop(0)
            print(f'{self.name} consumed {i}', end='\n' if i%10==9 else ', ')
            mutex.release()
            sem_producer.release()
            time.sleep(0.1)

producer_threads = [ProducerThread(f'Producer {i}') for i in range(3)]
consumer_threads = [ConsumerThread(f'Consumer {i}') for i in range(2)]

for thread in producer_threads + consumer_threads:
    thread.start()

for thread in producer_threads + consumer_threads:
    thread.join()

上述代码中,定义了ProducerThread和ConsumerThread两个线程类,ProducerThread负责生产数据,ConsumerThread负责消费数据。queue是一个列表,表示共享数据的队列,Semaphore sem_producer的计数器初始值为10,Semaphore sem_consumer的计数器初始值为0,互斥锁mutex保护对队列的访问。运行3个生产者线程和2个消费者线程,生产者生产20个数据,消费者消费这20个数据。

四、总结

Semaphore是Python多线程编程中非常有用的工具,可以用来控制对公共资源的访问,也可以用来控制并发线程数,还能够实现生产者-消费者模型。但是,使用Semaphore时需要注意两个问题:第一,互斥锁和Semaphore的区别,互斥锁是为了保证对共享资源的互斥访问,Semaphore是为了控制对公共资源的访问。第二,使用Semaphore时需要注意死锁问题,当某个线程无法获取Semaphore时会被阻塞,如果在某个线程获取Semaphore之前另一个线程已经获取了Semaphore并陷入阻塞,就会出现死锁,程序将无法继续运行。