背景
当前的项目需要对大型 numpy 数组进行各种运算(不是深度学习的那种运算),实践发现只开一个 python 进程时,只能使用一个 CPU 核心。所以考虑使用 multiprocessing 模块进行多进程运算。
但是,问题也很明显:用的是 multiprocessing.pool,如果我的 pool 的 size 是 4,一个 GB 级的 ndarray 传给 pool,会复制 4 份到每一个子进程。这首先会在传输时花时间做相应的 pickle 和 unpickle 操作;更重要的是,这坨数据会在内存里复制 4 份——这直接导致能处理的最大数据大小缩小了四分之三。
本文使用的 Python 版本为 3.6 / 3.7,Windows 系统。
在 3.8 版本中,新加入了 multiprocessing.shared_memory 模块,应该能简化这个问题。但是目前为止,项目使用的部分包还不支持 3.8,所以仍需要在旧版本中解决这个问题。
Value 与 Array
在 multiprocessing 包中,提供了一些可共享的对象:Value、Array、RawValue 与 RawArray。基本上,前者没有 Raw 的,可以加锁以进行进程间同步,后面 Raw 的没有锁。项目中用到的 numpy 数组都是只读的,子进程只需要读不需要写,所以选择使用 RawArray。
下面的代码会使用一个 numpy array 创建一个 RawArray,然后把它转回 numpy array:
import multiprocessing import numpy as np arr = np.zeros(5) arr_shared = multiprocessing.RawArray('d', arr) # 'd' for double, which is float64 arr_new = np.frombuffer(arr_shared, dtype=np.double) print(arr_new) # 返回 [0. 0. 0. 0. 0.]
如果 array 是多维的,直接用上面的代码会报错,因为 RawArray 只支持一维。可以这样解决:
import multiprocessing import numpy as np SHAPE = (2, 3) arr = np.zeros(SHAPE) arr_shared = multiprocessing.RawArray('d', arr.ravel()) arr_new = np.frombuffer(arr_shared, dtype=np.double).reshape(SHAPE) print(arr_new) # [[0. 0. 0.] # [0. 0. 0.]]
传给进程池
思路很清晰:在主进程生成 array,转成 RawArray,再传给 Pool。
然而,直接把 RawArray 对象作为参数是会报错的(RuntimeError: c_double_Array_x objects should only be shared between processes through inheritance)。
在网上找到了答案:通过 pool 的 initializer 实现子进程的初始化。这在官方文档里面只有轻描淡写的一句😂。
具体来说,在创建进程池时,需要传入 initializer 函数与 initargs 参数。
initargs 包含了 RawArray 对象,也可以把它的 shape 也传进去(我下面的参考代码懒就不传了)。
initializer 函数会在子进程创建时被调用,并且把 RawArray 对象变为该子进程的全局变量。
initializer 函数及其对应的变量共享,可以用全局变量或全局的字典来实现:
global_arr_shared = None def init_pool(arr_shared): global global_arr_shared global_arr_shared = arr_shared
而进程池是这样创建的:
with multiprocessing.Pool(processes=2, initializer=init_pool, initargs=(arr_shared,)) as pool:
Pool 的 worker 函数中,把 RawArray 转回 numpy array之后,就可以当作普通的 ndarray 操作了。如果修改了数组的内容,也会反映到原数组中,只是需要注意锁的问题。下面是一个很简单的例子。
def worker(i):
arr = np.frombuffer(global_arr_shared, np.double).reshape(SHAPE)
time.sleep(1) # some other operations
return np.sum(arr * i)
总体的程序如下,可以直接运行:
import multiprocessing import time import numpy as np SHAPE = (2, 3) global_arr_shared = None def init_pool(arr_shared): global global_arr_shared global_arr_shared = arr_shared def worker(i): arr = np.frombuffer(global_arr_shared, np.double).reshape(SHAPE) time.sleep(1) # some other operations return np.sum(arr * i) if __name__ == '__main__': arr = np.random.randn(*SHAPE) arr_shared = multiprocessing.RawArray('d', arr.ravel()) with multiprocessing.Pool(processes=2, initializer=init_pool, initargs=(arr_shared,)) as pool: # initargs传入tuple for result in pool.map(worker, [1,2,3]): print(result)
总体来说,就是要先变成 RawArray,然后给 Pool 加上初始化函数以传递 RawArray 给子进程,最后用的时候把 RawArray 转回 numpy array。还是有点麻烦的。
初步测试,性能基本没有受到影响,肯定比 multiprocessing.Manager 快。
相关参考
1. https://research.wmz.ninja/articles/2018/03/on-sharing-large-arrays-when-using-pythons-multiprocessing.html
2. https://stackoverflow.com/questions/52543868/pass-data-to-python-multiprocessing-pool-worker-processes
我要传递的是图片,测试了一下
arr_new = np.frombuffer(arr_shared, dtype=np.double).reshape(SHAPE)
3张图片,运行这句差不多花了5秒。。有没有更快一点的办法呢。。
感觉应该不会呀,时间是耗在 multiprocessing.RawArray(‘d’, arr.ravel()) 这里,因为需要申请新的内存空间并复制数据。
np.frombuffer()应该是很快的。另外如果是图片,可能不需要用浮点数。最后,不知道你的数组有多大呢?
有个疑问,如果子进程只需要读不需要写,那还需要全局变量吗?除了节省内存空间是不是没有其他好处了?
好像是需要的,因为要通过全局变量,使 initializer 能把参数传进去。
不过总体来说,这个写法还是挺不美观的,现在2021年了,该用 multiprocessing.shared_memory 试试了。
如果共享的数据不大,倒是没太大必要共享的,这样写不好维护、阅读的样子