您的位置:

并发编程必备:Python多进程之队列(Queue)使用详解

并发编程是现代程序设计中非常重要的一部分,尤其是在处理大型数据和网络任务中。在Python语言中,多进程是一种方便且高效的方式来实现并发编程。通过进程之间的通信,可以实现各种并发任务,包括网络任务、数据处理、图像处理等。启动多个进程并进行通信时,我们可以使用Python提供的队列(Queue)来实现进程间的数据共享,这对于多进程编程至关重要。本文将详细讲解并发编程中Python多进程之队列的使用。

一、什么是Python多进程之队列(Queue)?

队列(Queue)是一种先进先出的数据结构,它可以在多个进程之间共享数据。在Python的多进程编程中,队列被广泛应用于多进程之间的数据传送。Python提供了两种类型的队列:进程队列(Process Queue)和线程队列(Thread Queue)。这两种队列都可以被用于多进程之间的数据传送,只是应用场景不同而已。

在Python多进程编程中,使用进程队列(Process Queue)时,需要从multiprocessing模块中引入Queue。该类实例化后,可以用来在多个进程之间传输数据。由于数据被存储在队列中,所以多进程之间不需要再次使用文件或套接字进行通信。


import multiprocessing

# 创建一个进程队列
queue = multiprocessing.Queue()

二、Python多进程之队列(Queue)的基本用法

Python多进程之队列的基本用法非常简单,只需要通过put()方法向队列中添加元素,通过get()方法从队列中取出元素即可。

下面是多进程队列的一个简单例子,它创建了两个进程,一个进程向队列中添加元素,另一个进程从队列中获取元素。在程序运行过程中,我们通过不断向队列中添加元素,检查另一个进程是否能够从队列中成功获取这些元素。


import multiprocessing

# 向队列中添加元素的进程函数
def add_data(queue):
    # 向队列中添加数据
    for i in range(10):
        queue.put(i)
        print(f"Put {i} into queue")
        
# 从队列中获取元素的进程函数
def read_data(queue):
    # 从队列中获取数据
    while True:
        data = queue.get()
        print(f"Get {data} from queue")

if __name__ == '__main__':
    # 创建进程队列
    queue = multiprocessing.Queue()

    # 创建进程
    add_process = multiprocessing.Process(target=add_data, args=(queue,))
    read_process = multiprocessing.Process(target=read_data, args=(queue,))

    # 启动进程
    add_process.start()
    read_process.start()
    
    # 等待两个进程结束
    add_process.join()
    read_process.terminate()

当程序运行时,首先创建了一个进程队列,然后创建了两个进程,分别是向队列中添加元素的add_data()函数和从队列中获取元素的read_data()函数。最后分别启动两个进程并等待它们结束。

通过这个例子,你可以看到向队列中添加元素和从队列中获取元素的简单方法。Python多进程之队列(Queue)允许你在多个进程之间传递数据,无需担心数据冲突问题。

三、Python多进程之队列(Queue)的高级用法

除了向进程队列中添加元素和从队列中取出元素的基本用法,Python的多进程之队列还有很多高级用法。在本节中,我们将介绍一些高级的操作,包括队列大小的限制、队列的阻塞操作、非阻塞操作尝试获取队列元素、可等待对象等。

(1)设置队列的大小限制

在某些情况下,我们需要控制进程队列中存储的元素数量。Python的Queue提供了一个可选的maxsize参数来指定队列的最大元素数量,如果队列已满,将不再接受新的元素。


import multiprocessing

# 向队列中添加元素的进程函数
def add_data(queue):
    # 向队列中添加数据
    for i in range(10):
        queue.put(i)
        print(f"Put {i} into queue")
        
# 从队列中获取元素的进程函数
def read_data(queue):
    # 从队列中获取数据
    while True:
        data = queue.get()
        print(f"Get {data} from queue")

if __name__ == '__main__':
    # 创建进程队列,设置大小为5
    queue = multiprocessing.Queue(maxsize=5)

    # 创建进程
    add_process = multiprocessing.Process(target=add_data, args=(queue,))
    read_process = multiprocessing.Process(target=read_data, args=(queue,))

    # 启动进程
    add_process.start()
    read_process.start()
    
    # 等待两个进程结束
    add_process.join()
    read_process.terminate()

在这个例子中,我们将进程队列maxsize设置为5。当添加进程向队列中添加元素时,当队列存储元素数量已满,添加进程会被阻塞,直到队列中的某个元素被一个获取进程取出。

(2)阻塞队列操作

队列的阻塞操作是指在队列上执行某些操作时,如果队列为空或已满,则会阻塞操作,直到队列不为空或不满为止。Python队列模块提供了以下阻塞队列操作:

  • put():向队列中添加一个元素,如果队列已满,此操作将被阻塞。
  • put_nowait():向队列中添加一个元素,如果队列已满,此操作将不会被阻塞,并引发Full异常。
  • get():从队列中获取一个元素,如果队列为空,此操作将被阻塞。
  • get_nowait():从队列中获取一个元素,如果队列为空,此操作不会被阻塞,并引发Empty异常。

下面是一个简单的例子,它演示了如何在Python的多进程中使用队列的阻塞操作。


import multiprocessing
import time

# 向队列中添加元素的进程函数
def add_data(queue):
    # 向队列中添加数据
    for i in range(10):
        queue.put(i, True, 2)
        print(f"Put {i} into queue")
        time.sleep(1)

# 从队列中获取元素的进程函数
def read_data(queue):
    # 从队列中获取数据
    while True:
        data = queue.get(True, 2)
        print(f"Get {data} from queue")
        time.sleep(1)

if __name__ == '__main__':
    # 创建进程队列,设置大小为5
    queue = multiprocessing.Queue(maxsize=5)

    # 创建进程
    add_process = multiprocessing.Process(target=add_data, args=(queue,))
    read_process = multiprocessing.Process(target=read_data, args=(queue,))

    # 启动进程
    add_process.start()
    read_process.start()
    
    # 等待两个进程结束
    add_process.join()
    read_process.terminate()

(3)非阻塞操作尝试获取队列元素

除了阻塞操作之外,Python队列还提供了非阻塞操作,这意味着如果队列为空或已满,则操作将不会被阻塞,而是立即返回结果。以下是Python队列的非阻塞操作:

  • put():向队列中添加一个元素,如果队列已满,此操作将不会被阻塞,而是立即引发Full异常。
  • put_nowait():向队列中添加一个元素,如果队列已满,此操作将不会被阻塞,并引发Full异常。
  • get():从队列中获取一个元素,如果队列为空,此操作将不会被阻塞,而是立即引发Empty异常。
  • get_nowait():从队列中获取一个元素,如果队列为空,此操作将不会被阻塞,并引发Empty异常。

以下是一个演示Python队列非阻塞操作的简单例子。


import multiprocessing
import time

# 向队列中添加元素的进程函数
def add_data(queue):
    # 向队列中添加数据
    for i in range(10):
        try:
            queue.put_nowait(i)
            print(f"Put {i} into queue")
        except Exception as e:
            print(e)
        time.sleep(1)

# 从队列中获取元素的进程函数
def read_data(queue):
    # 从队列中获取数据
    while True:
        try:
            data = queue.get_nowait()
            print(f"Get {data} from queue")
        except Exception as e:
            print(e)
        time.sleep(1)

if __name__ == '__main__':
    # 创建进程队列,设置大小为5
    queue = multiprocessing.Queue(maxsize=5)

    # 创建进程
    add_process = multiprocessing.Process(target=add_data, args=(queue,))
    read_process = multiprocessing.Process(target=read_data, args=(queue,))

    # 启动进程
    add_process.start()
    read_process.start()
    
    # 等待两个进程结束
    add_process.join()
    read_process.terminate()

(4)可等待对象

在Python的多进程编程中,有些时候我们需要等待一个进程完成任务后再执行某些操作。Python的multiprocessing模块提供了许多同步工具,包括锁、事件、条件变量等。此外,Python多进程之队列(Queue)还提供了一些同步方法,这些方法允许一个进程等待另一个进程完成任务。

以下是Python多进程之队列(Queue)提供的同步方法:

  • qsize():返回队列元素的数量,此方法不会阻塞进程。
  • empty():如果队列为空,返回True,否则返回False。
  • full():如果队列已满,返回True,否则返回False。
  • join():阻塞进程,直到队列中的所有元素都被取出。

以下是一个演示Python多进程之队列可等待对象的简单例子。


import multiprocessing

# 向队列中添加元素的进程函数
def add_data(queue):
    # 向队列中添加数据
    for i in range(10):
        queue.put(i)

# 读