在文章中介绍了multiprocessing模块用法,下面将详细介绍这个模块的实现。其内容包括:
- Manager
- conenction.Pipe
Manager
Manager对象会使用一个Server
进程来维护需要共享的对象,而其他进程需要通过Proxy来访问这些共享对象。
SyncManager和BaseManager
register
当我们调用multiprocessing.Manager()
时,实际上创建了一个SyncManager
,并调用它的start
。在start
中,会创建一个子进程来维护共享对象。
1 | # __init__.py |
我们可以通过SyncManager.dict()
等方法创建共享对象,并返回其代理。我们首先来查看它的实现。
1 | class SyncManager(BaseManager): |
从源码上看,SyncManager
继承了一个BaseManager
,通过register
方法在这之中添加了对不同类型共享对象的支持。如果我们需要增加自定义的类,那么就需要放在这里
1 | # 'dict', dict, DictProxy |
这个函数主要是围绕传入的几个参数的,我们结合整个函数的源码看这些参数的作用。
cls
类似于成员函数里的self,用来在类里面表示自己,在这里也就是我们的SyncManagertypeid
是一个字符串,对应于”Queue”/“dict”等callable
用来创建typeid对应的对象,对应于上面的Queue.Queue
和dict
等。如果一个manager实例通过from_address()
创建,或者create_method
参数时False的,那么callable就可以被设置为Noneproxytype
是BaseProxy
的子类,用来创建一个用来访问这个共享对象的代理,如果设置为None,就会默认使用Autoproxy
1
2if proxytype is None:
proxytype = AutoProxyexposed
用来限定proxy上的这个方法名字是不是public的。
如果请求访问一个没有exposed的接口,那么就会AttributeError
并且调用fallback_func
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# 来自serve_client方法
try:
...
if methodname not in exposed:
raise AttributeError(
'method %r of %r object is not in exposed=%r' %
(methodname, type(obj), exposed)
)
...
except AttributeError:
if methodname is None:
msg = ('#TRACEBACK', format_exc())
else:
try:
fallback_func = self.fallback_mapping[methodname]
result = fallback_func(
self, conn, ident, obj, *args, **kwds
)
msg = ('#RETURN', result)
except Exception:
msg = ('#TRACEBACK', format_exc())
...如果指定
exposed
为None,那么就会使用proxytype._exposed_
,如果proxytype._exposed_
也是None,那么就使用这个共享对象的所有的public方法。通过public_methods
得到所有不以_
开头的方法。1
2
3
4
5
6
7
8
9
10
11exposed = exposed or getattr(proxytype, '_exposed_', None)
def all_methods(obj):
temp = []
for name in dir(obj):
func = getattr(obj, name)
if hasattr(func, '__call__'):
temp.append(name)
return temp
def public_methods(obj):
return [name for name in all_methods(obj) if name[0] != '_']method_to_typeid
这个是一个dict
,列出了所有返回值是一个proxy的方法。如果没有指定,那么就使用proxytype._method_to_typeid_
,如果仍然为空,那么所有的返回值都按值复制。我们可以参考下面的方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# 在BaseManager.register中
method_to_typeid = method_to_typeid or \
getattr(proxytype, '_method_to_typeid_', None)
if method_to_typeid:
for key, value in method_to_typeid.items():
assert type(key) is str, '%r is not a string' % key
assert type(value) is str, '%r is not a string' % value
# 在全局空间
PoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult',
'map_async': 'AsyncResult',
'imap': 'Iterator',
'imap_unordered': 'Iterator'
}create_method
是一个布尔量,默认True。要不要创建一个和typeid
同名的方法,并且用这个方法来通知Server
进程创建一个共享对象并返回一个proxy。这个只有对Iterator
类是False的,也就是说Manager
不想提供一个接口让你显式创建一个Iterator
。1
2
3
4
5if create_method:
def temp(self, *args, **kwds):
...
temp.__name__ = typeid
setattr(cls, typeid, temp)这个
if
分支中实际上是创建了一个temp
函数,并且通过setattr
将这个temp函数注册为cls中的typeid
的方法。这样当我们在调用manager.XXX()
时,就会执行这个temp
里面的内容。我们将在后面深入探讨这个代码。
register
函数除了和几个参数有关的代码之外,剩下就这几行代码了,它们的作用就是将描述这个typeid
的信息打包,存储到_registry
中。
1 | if '_registry' not in cls.__dict__: |
我们打印一下register完所有类的cls._registry
为JoinableQueue,Namespace,BoundedSemaphore,Iterator,Lock,list,RLock,Queue,Array,dict,Pool,Semaphore,Value,Event,Condition
创建共享对象(Client)
这一章节的开始,我们来看register
函数中是如何创建共享对象的,主要过程就是通读一下register
后面的代码和_create
函数的实现。在创建完共享对象之后,这个共享对象就可以看做是远程对象(稍后将讲解的Server端)的一个代理。if create_method
中创建的temp
入口方法的主要功能是:
- 先通过
_create
通知Server
创建一个共享变量,并返回其唯一标识token
- 创建一个连接到这个
token
的proxy,这个proxy还作为这个函数的返回值 decref
,有关引用计数的问题将在Server
章节中讨论
1 | # managers.py |
_create
首先查看_create
这个方法,首先不得不提的是它接受一堆*args
和**kwds
的参数,这两个鬼东西它自己不用,而是传给dispatch
,而dispatch
也不用,而是直接send
出去。
下面来看流程,首先创建了一个_Client
类型的变量conn
。By the way,那么Server在哪里呢,稍后解释。
1 | # managers.py |
查看__init__
方法得知,这个实际上就是一个套接字。特别地,如果机器支持AF_UNIX
的话,会使用UNIX Domain Socket来进行同一主机上的IPC,省去了网络协议栈的层层封装等流程。但从中也能看出,实际上BaseManager在设计上是能够跨机器进行通信的
1 | # managers.py |
在创建好conn
之后,就会通过dispatch
调用conn.send
发送一个对methodname为”create”的调用,并且在参数表头部附上当前的typeid
。当这个调用被传到Server类时,会调用Server.create
方法,在Server子进程空间中创建一个共享对象,这个将在下一节讲解有关Server部分中提到。
1 | # 继续是_create函数 |
dispatch函数
下面介绍这个大boss,dispatch
函数。
1 | # dispatch函数 |
接着dispatch
函数就调用conn.recv()
获取返回值。返回值的kind
一共有#RETURN
、#ERROR
、#TRACEBACK
、#UNSERIALIZABLE
四种。如果当前的dispatch
调用返回正常的话,他应该会拿到一个exposed
,包含了共享对象中所能够调用的所有方法名。
在调用dispatch
函数之后,_create
就返回了一个Token
类型和从dispatch
拿到的exposed
。Token
的定义如下,被用来唯一确定一个共享对象。
1 | class Token(object): |
proxy
在调用_create
通知Server
创建完共享对象之后,temp
函数就创建一个proxy来代理这个远程的共享对象,这里的proxytype
是由register
函数中传入的,一般是AutoProxy
,我们将在后面详细介绍。
1 | proxy = proxytype( |
创建共享对象(Server)
上面的内容是Client部分的初始化过程,下面我们查看Server部分。需要注意的是,虽然是Server/Client结构,但是因为Manager
的Server和Client都在一台机器上,并且由同样的代码进行管理,所以在形式上和传统的Socket是不一样的。由于Server是作为一个子进程存在,并且这个子进程和Server本身都是由BaseManager
来管理的,并且也需要通过BaseManager
提供的方法来操控。所以我们可以看到很多Server部分的工作是在BaseManager
而不是Server
内部处理的。
根据文档,一旦BaseManager
被创建,就需要调用start
或者get_server().serve_forever()
来确保这个manager是和一个进程关联的。查看代码可以发现,这个start是在__init__.py
上面被调用的
1 | # __init__.py |
因此要了解Server如何被创建,就要先看BaseManager
是如何被创建的。
构造函数
首先来全面查看一下BaseManager
的构造函数
1 | # managers.py |
我们查看一下authkey
的相关实现,由于其中涉及了不少Process
相关的实现,在这里暂时不提
1 | # process.py |
start方法
我们查看BaseManager.start
方法,首先创建了一个Process
,并通过connection.Pipe
和这个子进程进行通信。关于这个管道的实现,在后面进行说明。
1 | # managers.py |
self._registry
这个就是之前在register
方法中保存下来的_registry
字典。-
self._address
来自于BaseManager
的__init__
-
self._authkey
来自于BaseManager
的__init__
下面暂停对start
的阅读,而step in查看Process
中执行的主函数_run_server
,可以看到在这个方法中:
- 首先执行
initializer
,这个initializer
是start
在args
里面传进来的,如果是从__init__.py
调用的,那么就是None
- 创建一个
Server
对象_Server
- 向管道里面写入
server.address
,这个会被在主进程的reader
读取 - 调用
serve_forever
1 |
|
关于Server
类,我们稍后来查看,现在我们继续看完start
1 | # 继续start方法 |
额外说明一下,这里的shutdown
在__exit__
中被调用。这里的__enter__
和__exit__
被用来实现with
1 | def __enter__(self): |
实际上,util.Finalize
是调用type(self)._finalize_manager
,使用type(self)
是因为_finalize_manager
是一个类方法。一般涉及到Process
有关的函数,都不能是带有上下文的,而必须是自由函数。在_finalize_manager
中主要就是先创建一个_Client
,向Server
进程self._process
发送shutdown
指令,接着尝试join这个进程。如果在0.2秒之后这个进程还在的话,就尝试去terminate
它。从Process
源码来看,它是始终有terminate
这个方法的,不知道为啥还需要hasattr
来判断下。
1 |
|
Server
先前围绕BaseManager
类讨论了Server部分的初始化和析构。现在我们来看Server
这个对象,以便了解它具体负责的内容。
首先,照例是构造函数,包含了和套接口API很像的Listener。后面的id_to_obj
就是Server所维护的所有共享对象了。id_to_obj
是一个dict,它的key是由'%x' % id(obj)
来生成的,之所以需要转换成字符串类型是因为xmlrpclib
这个库只支持32位的int,所以用str保险一点(当然之前提到,实际用的是pickle)。id_to_refcount
则用来标记每个obj的生命周期。
1 | # managers.py |
回忆前面的内容,Client会调用dispatch(conn, None, 'create', (typeid,)+args, kwds)
来通知子进程创建一个共享对象。看源码显而易见,肯定最终是调用Server.create
这个方法的,但在这之前,会经过一个serve_client
方法分发的过程。
1 | # managers.py |
Proxy
下面我们来看Proxy部分的实现。在SyncManager.register('dict', dict, DictProxy)
中用到的DictProxy
是由MakeProxyType
来生成的,这个函数就是一个创建类的简易的“宏”。它创建名字为name
的类,继承自BaseProxy
,并且传入一个tuple作为这个子类所拥有的方法。对于每个方法,实际调用BaseProxy
里面的_callmethod
来执行。
1 | DictProxy = MakeProxyType('DictProxy', ( |
下面,我们来看看BaseProxy
这个类是如何通过_callmethod
转发方法的。可以看到,BaseProxy
通过self._tls.connection.send()
这个方法将需要调用的方法发送出去,看起来这个_tls
就是个套接口了。
1 | def _callmethod(self, methodname, args=(), kwds={}): |
我们查看BaseProxy
的初始化部分,看看_tls
是何方神圣
1 | class BaseProxy(object): |
补充
Manager的Util部分
看看这个ForkAwareThreadLock
实现
1 | class ForkAwareThreadLock(object): |
这里需要跟踪fork之后锁的原因是The child process is created with a single thread–the one that called fork(),因此可能拥有锁的子线程并不在子进程之中。
可以发现,在fork之后,会执行_run_after_forkers
方法,这个方法会遍历出_afterfork_registry
中的所含有lock,并且将它们换成新的threading.Lock()
connection.Pipe实现
可以想象,根据是否是POSIX系统分为了两种实现。根据sys.platform
来判断系统类型。
POSIX
对于POSIX系统,借助socketpair
实现双向管道,pipe
实现单向管道。相比于popen和pclose,socketpair直接提供了一个双向的读写管道。
我们知道可以通过dup2
来进行重定向,例如dup2(f, STDOUT_FILENO)
可以将标准输出指向文件f
。那么这里为什么要复制一份s1.fileno()
并来创建一个_multiprocessing.Connection
对象呢?
1 | import multiprocessing |
要想研究这个问题,需要查看_multiprocessing.Connection
这个用C实现的模块。
通过观察_multiprocessing
下面的目录结构可以看出,它会在c文件里面写上一堆特化的实现,然后在最后#include connection.h
来做一个公共的实现。从#define CONNECTION_NAME "Connection"
可以看出,_multiprocessing.Connection
实际上是socket_connection
。
分析一个模块,首先要找它的PyMODINIT_FUNC
函数、PyMethodDef
结构、PyModuleDef
结构。但_multiprocessing.Connection
被定义为一个对象,所以我们分析它的PyVarObject_HEAD_INIT(NULL, 0)
的定义。可以看到,它的初始化方法是connection_new
。
我们尝试了下面的代码,发现在Linux下面抛出IOError: [Errno 9] Bad file descriptor
的错误。因此可以猜测到s1
和s2
在销毁时会释放自己的fd,这里提前复制一份出来是单纯为了复用这两个fd。
WINDOWS
对于WINDOWS系统,借助于CreateNamedPipe
实现。
注意事项
multiprocessing.Manager()
的dict()
不可以嵌套,也就是说下面的语句实际上会报错
1 | def inner(metainfo): |