multiprocessing模块用法

由于一些历史原因,我们常用的CPython具有全局解释器锁(GIL)机制,这把全局大锁导致Python的多线程性能非常糟糕,为了解决这个问题,一种方案是使用协程代替多线程,由于自带了yield,Python实现协程相对于C++那种动辄setjmp或者CPS,抑或利用Linux.ucontext等基于运行时的组件的方式相比要优雅了一些。不过协程对于IO密集型的程序用处很大,但如果我们想要实现并行,那么使用多进程来取代多线程就是一个必由之路。
multiprocessingmultiprocessing.dummy是Python下两个常用的多进程和多线程模块。

multiprocessing提供的多进程与多线程机制

multiprocessingmultiprocessing.dummy分别为多进程和多线程模块,但是两者的调用方式基本相同

进程池/线程池

multiprocessing.Pool(processes = pop_size)multiprocessing.ThreadPool(processes = pop_size)可以创建不同大小的进程池和线程池。这两个函数还可以传入initializerinitargs两个参数,用于进行初始化。默认情况下Pool会创建process数量的进程/线程,但是maxtasksperchild参数可以控制一条进程/线程最多处理的任务数,当超过这个数量时会重新启动一个新的进程/线程。

调用并取回结果

这里我们实际上实现了一个future需要的功能下面我们以多进程为例来展示一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
task_size = len(tasks)
ans = [None] * task_size
result = []
multiprocessing.freeze_support()
pool = multiprocessing.Pool(processes = task_size)
for (i, task) in zip(count(0), tasks):
run = pool.apply_async(callee, args = (task, i))
result.append( (i, callee) )
try:
pool.close()
pool.join()
for res in result:
ans[res[0]] = res[1].get()
except Exception, e:
print e
return ans

其中对于多进程,multiprocessing.freeze_support()语句在windows系统上是必须的,这是因为windows的API不包含fork()等函数。
apply_async表示异步调用,此时各进程在运行完毕后pool.join()回到主进程,主进程通过res.get()函数获得callee返回结果。

freeze_support

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def is_forking(argv):
# 子进程带有--multiprocessing-fork标记
if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
assert len(argv) == 3
return True
else:
return False

def freeze_support():
'''
Run code for process object if this in not the main process
'''
if is_forking(sys.argv):
main()
sys.exit()

def main():
'''
Run code specified by data received over pipe
'''
assert is_forking(sys.argv)

handle = int(sys.argv[-1])
fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
from_parent = os.fdopen(fd, 'rb')

process.current_process()._inheriting = True
preparation_data = load(from_parent)
prepare(preparation_data)
self = load(from_parent)
process.current_process()._inheriting = False

from_parent.close()

exitcode = self._bootstrap()
exit(exitcode)

进程间通信

对于Python2而言,进程间通信借助于multiprocessing模块提供的QueueManager等数据结构

子进程入口点

在使用多进程时,如果需要兼容windows,则最好不要使用全局变量。这是因为Python代码的执行顺序是从前往后的,而window又没有类似Linux的fork机制。所以当子进程被创建后,会重新执行一次全局变量的初始化。这可能就会覆盖掉主进程中已经被修改了的值。对于这种情况,有两种方案,第一就是通过IPC将需要传递的全局函数传过去,第二就是借助于仍然可以正常使用的sys.argv来重新初始化一遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing, subprocess
from multiprocessing import Process, Array
import os

g = 1000
print "进程{}:g的初始值是{}".format(os.getpid(), g)

def inner():
print "进程{}:g在子进程入口函数inner的值是{}".format(os.getpid(), g)

if __name__ == '__main__':
global g
g = 2000
print "进程{}:在主进程fork前的值是{}".format(os.getpid(), g)
proc = multiprocessing.Process(target=inner)
proc.start()
g = 3000
print "进程{}:在主进程fork后的值是{}".format(os.getpid(), g)

print "进程{}:最终的值是{}".format(os.getpid(), g)

子进程import的情况

如果主进程和子进程同时import了一个文件,那么其中的变量是共享的么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# sub
global_x = 1000

# main
from sub import global_x
import multiprocessing, subprocess
from multiprocessing import Process, Array

def inner():
print("global_x before inner is {}".format(global_x))
global_x = 3000
print("global_x after inner is {}".format(global_x))

if __name__ == '__main__':
global_x = 2000
print("global_x before sub is {}".format(global_x))
proc = multiprocessing.Process(target=inner)
proc.start()
proc.join()
print("global_x after sub is {}".format(global_x))

multiprocessing使用实例

使用multiprocessing与subprocess协作

这个案例来自于我毕业论文的一个需求。为了提高计算效率,可以编写一段主程序,用它来启动若干个外部程序,并把总的计算任务拆分发送给这些外部程序并行计算。运行外部的可执行程序可以使用subprocess模块,主程序通过PIPE和该外部子程序进行通信,这样的通信会阻塞主程序,不能达到并行的效果。为了能够异步地对多个subprocess进行通信,可以使用multiprocessing的多进程,每条进程中调用subprocess,subprocess在进程结束后取回输出,并交回给主进程合并。
这样的方法对于n个任务需要启动n个外部程序,如果外部程序的初始化成本比较大,这样的设计方案成本是划不来的,Windows系统下有不能直接fork。比较好的方法是预先初始化m个外部程序作为进程池,然后进程池中的每个外部程序依次处理[n/m]个任务,外部程序和外部程序之间是并行的。为了管理这些子程序,我们可以借助于multiprocessing的多线程,由子线程负责和外部进程进行通信。这样子线程是彼此并行的,而主线程可以阻塞起来,等所有的线程计算完毕join回来即可。虽然说Python自带的GIL给多线程的带来阻碍,但是主要的计算工作主要存在于subprocess所调用的外部程序中,因此性能损失有限。并且采用多进程由于不能共享内存,因此很难将初始化好的外部程序的句柄交给相应的子进程。

初始化外部进程

对于这样的多线程方案,首先通过以下语句启动proc_size个外部进程,并且注册proc_size个线程负责和各个外部进程进行交互。主线程使用join函数等待所有子线程返回结果。

1
2
3
4
5
6
for index in xrange(proc_size):
subproc = subprocess.Popen(['XXX.exe'], stdin = subprocess.PIPE, stdout = subprocess.PIPE
, stderr = subprocess.PIPE, bufsize=1, close_fds='posix' in sys.builtin_module_names)
t = Thread(target = initial_method, args = (index, subproc, call_back))
t.daemon = True
ths.append(t)

有关subprocess模块的用法我已经另外开了一篇文章来记录。
这里的t.daemon = True表示该线程是主线程的守护线程,守护线程会在主线程退出时自动退出。对每个线程调用start方法,线程才会启动,这时候线程使用给定的args参数调用target参数传入的initial_method方法。

1
2
3
4
for i in xrange(proc_size):
ths[i].start()
for i in xrange(proc_size):
ths[i].join()

调用外部进程计算

由于n远大于m,因此对每一个外部进程需要使用互斥锁threading.Lock()维护。将线程i按照模m分组,同剩余系的线程共享一个进程。线程取得进程资源后调用lock.acquire()为进程资源上锁,这时候如果其他线程再次调用lock.acquire()则会陷入阻塞状态,直到获得锁的线程调用lock.release()释放资源。