Python 多进程共享内存、NumPy 数组 | Sharing NumPy Array When Using Python Multiprocessing

背景

当前的项目需要对大型 numpy 数组进行各种运算(不是深度学习的那种运算),实践发现只开一个 python 进程时,只能使用一个 CPU 核心。所以考虑使用 multiprocessing 模块进行多进程运算。

但是,问题也很明显:用的是 multiprocessing.pool,如果我的 pool 的 size 是 4,一个 GB 级的 ndarray 传给 pool,会复制 4 份到每一个子进程。这首先会在传输时花时间做相应的 pickle 和 unpickle 操作;更重要的是,这坨数据会在内存里复制 4 份——这直接导致能处理的最大数据大小缩小了四分之三。

本文使用的 Python 版本为 3.6 / 3.7,Windows 系统。
在 3.8 版本中,新加入了 multiprocessing.shared_memory 模块,应该能简化这个问题。但是目前为止,项目使用的部分包还不支持 3.8,所以仍需要在旧版本中解决这个问题。

Value 与 Array

在 multiprocessing 包中,提供了一些可共享的对象:Value、Array、RawValue 与 RawArray。基本上,前者没有 Raw 的,可以加锁以进行进程间同步,后面 Raw 的没有锁。项目中用到的 numpy 数组都是只读的,子进程只需要读不需要写,所以选择使用 RawArray。

下面的代码会使用一个 numpy array 创建一个 RawArray,然后把它转回 numpy array:

import multiprocessing
import numpy as np

arr = np.zeros(5)
arr_shared = multiprocessing.RawArray('d', arr) # 'd' for double, which is float64

arr_new = np.frombuffer(arr_shared, dtype=np.double)
print(arr_new) # 返回 [0. 0. 0. 0. 0.]

如果 array 是多维的,直接用上面的代码会报错,因为 RawArray 只支持一维。可以这样解决:

import multiprocessing
import numpy as np

SHAPE = (2, 3)
arr = np.zeros(SHAPE)
arr_shared = multiprocessing.RawArray('d', arr.ravel())

arr_new = np.frombuffer(arr_shared, dtype=np.double).reshape(SHAPE)
print(arr_new)
# [[0. 0. 0.]
#  [0. 0. 0.]]

传给进程池

思路很清晰:在主进程生成 array,转成 RawArray,再传给 Pool。

然而,直接把 RawArray 对象作为参数是会报错的(RuntimeError: c_double_Array_x objects should only be shared between processes through inheritance)。

在网上找到了答案:通过 pool 的 initializer 实现子进程的初始化。这在官方文档里面只有轻描淡写的一句😂。

具体来说,在创建进程池时,需要传入 initializer 函数与 initargs 参数。
initargs 包含了 RawArray 对象,也可以把它的 shape 也传进去(我下面的参考代码懒就不传了)。
initializer 函数会在子进程创建时被调用,并且把 RawArray 对象变为该子进程的全局变量。

initializer 函数及其对应的变量共享,可以用全局变量或全局的字典来实现:

global_arr_shared = None

def init_pool(arr_shared):
    global global_arr_shared
    global_arr_shared = arr_shared

而进程池是这样创建的:

with multiprocessing.Pool(processes=2, initializer=init_pool, initargs=(arr_shared,)) as pool:

Pool 的 worker 函数中,把 RawArray 转回 numpy array之后,就可以当作普通的 ndarray 操作了。如果修改了数组的内容,也会反映到原数组中,只是需要注意锁的问题。下面是一个很简单的例子。

def worker(i):
    arr = np.frombuffer(global_arr_shared, np.double).reshape(SHAPE)
    time.sleep(1)  # some other operations
    return np.sum(arr * i)

总体的程序如下,可以直接运行:

import multiprocessing 
import time
import numpy as np 

SHAPE = (2, 3)
global_arr_shared = None

def init_pool(arr_shared):
    global global_arr_shared
    global_arr_shared = arr_shared

def worker(i):
    arr = np.frombuffer(global_arr_shared, np.double).reshape(SHAPE)
    time.sleep(1)  # some other operations
    return np.sum(arr * i)


if __name__ == '__main__':
    arr = np.random.randn(*SHAPE)
    arr_shared = multiprocessing.RawArray('d', arr.ravel())

    with multiprocessing.Pool(processes=2, initializer=init_pool, initargs=(arr_shared,)) as pool:  # initargs传入tuple
        for result in pool.map(worker, [1,2,3]):
            print(result)

总体来说,就是要先变成 RawArray,然后给 Pool 加上初始化函数以传递 RawArray 给子进程,最后用的时候把 RawArray 转回 numpy array。还是有点麻烦的。
初步测试,性能基本没有受到影响,肯定比 multiprocessing.Manager 快。

相关参考

1. https://research.wmz.ninja/articles/2018/03/on-sharing-large-arrays-when-using-pythons-multiprocessing.html
2. https://stackoverflow.com/questions/52543868/pass-data-to-python-multiprocessing-pool-worker-processes


已有7条评论 发表评论

  1. x /

    我要传递的是图片,测试了一下
    arr_new = np.frombuffer(arr_shared, dtype=np.double).reshape(SHAPE)

    3张图片,运行这句差不多花了5秒。。有没有更快一点的办法呢。。

    1. 7forz / 本文作者

      感觉应该不会呀,时间是耗在 multiprocessing.RawArray(‘d’, arr.ravel()) 这里,因为需要申请新的内存空间并复制数据。
      np.frombuffer()应该是很快的。另外如果是图片,可能不需要用浮点数。最后,不知道你的数组有多大呢?

  2. 匿名 /

    有个疑问,如果子进程只需要读不需要写,那还需要全局变量吗?除了节省内存空间是不是没有其他好处了?

    1. 7forz / 本文作者

      好像是需要的,因为要通过全局变量,使 initializer 能把参数传进去。
      不过总体来说,这个写法还是挺不美观的,现在2021年了,该用 multiprocessing.shared_memory 试试了。
      如果共享的数据不大,倒是没太大必要共享的,这样写不好维护、阅读的样子

回复给匿名 取消回复