您的位置:

Python3 多进程详解

一、多进程概述

多进程是指同一时刻有多个进程同时执行,每个进程都有独立的内存空间、寄存器和程序计数器等。多进程可同时利用多个CPU进行并行处理,提高计算机的执行效率。

在 Python 中,使用 multiprocessing 模块可以轻松实现多进程操作。

二、创建进程

Python 中创建子进程的方式有多种,这里主要介绍两种方式:使用 Process 对象和使用 Pool 对象。

1. 使用 Process 对象

Process 对象用于表示进程,它有一个构造函数,可以传入目标函数和参数,用于启动子进程。

import multiprocessing

def worker(arg):
    print(f'Process {arg} is running.')

if __name__ == '__main__':
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()

上面的代码中,创建了 4 个子进程,并分别传入参数 0、1、2、3,子进程将执行 worker 函数打印输出对应的进程号。

需要注意的是,在 Windows 平台下,由于没有 fork 调用,所以 multiprocessing 模块需要在 __name__ == '__main__' 的保护下运行,以防止无限递归。

2. 使用 Pool 对象

Pool 对象用于管理进程池,它其实就是维护了一个进程的列表,当需要使用的时候,就从这个池中去取,用完之后再还回去。这种方式可以提高进程的重复利用率,从而降低系统开销。

import multiprocessing
import time

def worker(arg):
    time.sleep(1)
    print(f'Process {arg} is running.')

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        pool.map(worker, range(10))

上面的代码中,使用 Pool 对象创建了一个拥有 3 个进程的进程池,然后用 map 函数调用 worker 函数,传入一个包含 10 个元素的列表,每个元素都作为 worker 函数的参数。

可以看到,进程池会维护 3 个进程,每个进程的运行时间为 1 秒,最终输出的信息认为是重叠在一起的结果,但实际上是 10 个进程分别执行后的结果。

三、进程间通信

由于每个进程都有自己独立的内存空间,所以进程间通信(IPC)需要使用特定的机制进行传递数据。

在 Python 中,multiprocess 模块提供了多种进程通信方式,比如 Queue、Pipe 等。

1. Queue 队列

Queue 队列是进程间通信的一种简单方式,用于在多个进程之间安全地传递数据。

import multiprocessing

def worker(q):
    while True:
        value = q.get()
        if value is None:
            break
        print(value)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()

    for i in range(10):
        q.put(i)

    q.put(None)
    p.join()

上面的代码中,创建了一个 Queue 对象,并以它作为参数启动了子进程,主进程通过 put 函数向队列中添加 10 个数字,子进程不断从队列中取出数据并打印,当取出 None 时,子进程停止运行。

2. Pipe 管道

Pipe 管道是另一种用于进程间通信的方式。它返回一对对象,包含两个链接端,一个称为发送端,另一个称为接收端。

import multiprocessing

def worker(conn):
    conn.send('hello')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()

    print(parent_conn.recv())
    parent_conn.send('world')
    p.join()

上面的代码中,创建了一个管道对象,并以它作为参数启动了子进程,主进程向管道中发送字符串 'hello',子进程从管道中接收到数据并打印,然后向管道中发送字符串 'world',主进程从管道中接收到数据并打印,子进程退出。

四、进程池中返回值

使用 pool 对象创建进程池时,可以通过 apply_async 方法指定特定进程要执行的任务,并且可以通过 get 方法获得进程返回的值。

import multiprocessing

def sum(a, b):
    return a + b

if __name__ == '__main__':
    with multiprocessing.Pool(processes=1) as pool:
        results = []
        result = pool.apply_async(sum, (1, 2))
        results.append(result)

        for result in results:
            print(result.get())

上面的代码中,使用 apply_async 方法传入函数和参数,每个进程在完成任务后会返回一个结果,使用 get 方法来得到结果。

五、进程间共享变量

Python 中的多进程间是没有共享变量的,每个进程都有独立的内存空间,但共享部分需要使用特定的机制来进行操作。

1. Value 变量

Value 变量是 Python 中的一种特殊数据类型,它可以在多进程间进行共享。

import multiprocessing

def change_value(value):
    value.value = 100

if __name__ == '__main__':
    num = multiprocessing.Value('d', 0.0)
    p = multiprocessing.Process(target=change_value, args=(num,))
    p.start()
    p.join()

    print(num.value)

上面的代码中,创建了一个 Value 变量,然后以它作为参数启动了一个子进程,子进程执行 change_value 函数,将 Value 变量的值改为 100,最终将结果输出。

2. Array 数组

Array 数组是另一种可以在多进程间使用的数据类型。这个数据类型表示一种可以容纳相同类型数据的可变序列。

import multiprocessing

def modify_array(arr):
    for i in range(len(arr)):
        arr[i] *= 2

if __name__ == '__main__':
    arr = multiprocessing.Array('i', range(10))
    p = multiprocessing.Process(target=modify_array, args=(arr,))
    p.start()
    p.join()

    print(arr[:])

上面的代码中,创建了一个 Array 数组,然后以它作为参数启动了一个子进程,子进程执行 modify_array 函数,将数组中的所有数字都乘以 2,最终将结果输出。

六、进程之间的同步

在多进程的操作中,有时候需要控制进程的执行顺序或者进程之间有依赖关系的时候,需要使用同步机制。

1. Lock 锁

Lock 锁是 Python 中的一种同步机制,它用于控制多个进程对共享资源的访问。

import multiprocessing

def increment(value, lock):
    for idx in range(10000):
        with lock:
            value.value += 1

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    value = multiprocessing.Value('i', 0)

    processes = [multiprocessing.Process(target=increment, args=(value, lock)) for i in range(10)]

    for process in processes:
        process.start()
    for process in processes:
        process.join()

    print(value.value)

上面的代码中,创建了一个 Lock 对象,以 Value 变量和 Lock 对象作为参数启动了 10 个子进程,每个子进程都会将 Value 变量的值加 1,执行锁定,保证多个进程对变量的访问是互斥的。

2. Semaphore 信号量

Semaphore 信号量是 Python 中的另一种同步机制,它可以控制多个进程对共享资源的访问,并设置锁定次数。

import multiprocessing
import time

def worker(s, i):
    with s:
        print(f'Worker {i} start')
        time.sleep(1)
        print(f'Worker {i} end')

if __name__ == '__main__':
    s = multiprocessing.Semaphore(2)

    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(s, i))
        p.start()

上面的代码中,创建了一个 Semaphore 对象,以 Semaphore 对象和进程编号作为参数启动 5 个子进程,每个子进程获得锁之后打印相应的信息,然后等待 1 秒再释放锁。

七、多进程的其他注意事项

在 Python 中,多进程操作有一些需要注意的问题。

1. 进程的资源限制

在 Python 进行多进程操作中,需要注意操作系统的资源限制。有些系统的资源分配是静态的,需要在运行时限制进程数量。在 Linux 中,可以使用 ulimit 命令查看系统资源。

2. 多进程的数据传递问题

在 Python 进行多进程操作时,需要注意数据的传递问题。由于每个进程都有自己独立的内存空间,因此需要使用特定的机制进行数据的传递。

3. 进程的生命周期

在 Python 进行多进程操作时,需要注意进程的生命周期问题。进程的生命周期由操作系统决定,必须等待进程执行完毕才能退出。

4. 进程间的信号处理

在 Python 进行多进程操作时,需要注意进程间的信号处理问题。不同进程之间的信号处理是相互独立的,需要注意在适当的时候的发送和接收信号。

八、总结

在 Python 中,多进程操作是一个非常强大的功能,能够大大提高程序的执行效率。本文对多进程的概念、创建、进程间通信、进程池中返回值、进程间共享变量、进程之间的同步、以及多进程操作中需要注意的一些问题进行了详细的介绍和讲解。