由于一些历史原因,我们常用的CPython具有全局解释器锁(GIL)机制,这把全局大锁导致Python的多线程性能非常糟糕,为了解决这个问题,一种方案是使用协程代替多线程,由于自带了yield,Python实现协程相对于C++那种动辄setjmp
或者CPS,抑或利用Linux.ucontext
等基于运行时的组件的方式相比要优雅了一些。不过协程对于IO密集型的程序用处很大,但如果我们想要实现并行,那么使用多进程来取代多线程就是一个必由之路。multiprocessing
和multiprocessing.dummy
是Python下两个常用的多进程和多线程模块。
multiprocessing提供的多进程与多线程机制
multiprocessing
和multiprocessing.dummy
分别为多进程和多线程模块,但是两者的调用方式基本相同
进程池/线程池
multiprocessing.Pool(processes = pop_size)
和multiprocessing.ThreadPool(processes = pop_size)
可以创建不同大小的进程池和线程池。这两个函数还可以传入initializer
和initargs
两个参数,用于进行初始化。默认情况下Pool
会创建process
数量的进程/线程,但是maxtasksperchild
参数可以控制一条进程/线程最多处理的任务数,当超过这个数量时会重新启动一个新的进程/线程。
调用并取回结果
这里我们实际上实现了一个future需要的功能下面我们以多进程为例来展示一下
1 | task_size = len(tasks) |
其中对于多进程,multiprocessing.freeze_support()
语句在windows系统上是必须的,这是因为windows的API不包含fork()
等函数。apply_async
表示异步调用,此时各进程在运行完毕后pool.join()
回到主进程,主进程通过res.get()
函数获得callee
返回结果。
freeze_support
1 | def is_forking(argv): |
进程间通信
对于Python2而言,进程间通信借助于multiprocessing
模块提供的Queue
、Manager
等数据结构
子进程入口点
在使用多进程时,如果需要兼容windows,则最好不要使用全局变量。这是因为Python代码的执行顺序是从前往后的,而window又没有类似Linux的fork机制。所以当子进程被创建后,会重新执行一次全局变量的初始化。这可能就会覆盖掉主进程中已经被修改了的值。对于这种情况,有两种方案,第一就是通过IPC将需要传递的全局函数传过去,第二就是借助于仍然可以正常使用的sys.argv
来重新初始化一遍。
1 | import multiprocessing, subprocess |
子进程import的情况
如果主进程和子进程同时import了一个文件,那么其中的变量是共享的么?
1 | # sub |
multiprocessing使用实例
使用multiprocessing与subprocess协作
这个案例来自于我毕业论文的一个需求。为了提高计算效率,可以编写一段主程序,用它来启动若干个外部程序,并把总的计算任务拆分发送给这些外部程序并行计算。运行外部的可执行程序可以使用subprocess模块,主程序通过PIPE和该外部子程序进行通信,这样的通信会阻塞主程序,不能达到并行的效果。为了能够异步地对多个subprocess进行通信,可以使用multiprocessing的多进程,每条进程中调用subprocess,subprocess在进程结束后取回输出,并交回给主进程合并。
这样的方法对于n
个任务需要启动n
个外部程序,如果外部程序的初始化成本比较大,这样的设计方案成本是划不来的,Windows系统下有不能直接fork。比较好的方法是预先初始化m
个外部程序作为进程池,然后进程池中的每个外部程序依次处理[n/m]
个任务,外部程序和外部程序之间是并行的。为了管理这些子程序,我们可以借助于multiprocessing的多线程,由子线程负责和外部进程进行通信。这样子线程是彼此并行的,而主线程可以阻塞起来,等所有的线程计算完毕join回来即可。虽然说Python自带的GIL给多线程的带来阻碍,但是主要的计算工作主要存在于subprocess所调用的外部程序中,因此性能损失有限。并且采用多进程由于不能共享内存,因此很难将初始化好的外部程序的句柄交给相应的子进程。
初始化外部进程
对于这样的多线程方案,首先通过以下语句启动proc_size
个外部进程,并且注册proc_size
个线程负责和各个外部进程进行交互。主线程使用join
函数等待所有子线程返回结果。
1 | for index in xrange(proc_size): |
有关subprocess模块的用法我已经另外开了一篇文章来记录。
这里的t.daemon = True
表示该线程是主线程的守护线程,守护线程会在主线程退出时自动退出。对每个线程调用start
方法,线程才会启动,这时候线程使用给定的args
参数调用target
参数传入的initial_method
方法。
1 | for i in xrange(proc_size): |
调用外部进程计算
由于n远大于m,因此对每一个外部进程需要使用互斥锁threading.Lock()
维护。将线程i按照模m分组,同剩余系的线程共享一个进程。线程取得进程资源后调用lock.acquire()
为进程资源上锁,这时候如果其他线程再次调用lock.acquire()
则会陷入阻塞状态,直到获得锁的线程调用lock.release()
释放资源。