Python multiprocessing 使用手记[2] – 跨进程对象共享
继ç»å†™å…³äºŽPython multiprocessing的使用手记,继上次的进程模型之åŽï¼Œè¿™æ¬¡å±•å¼€è®¨è®ºä¸€ä¸‹multiprocessing当ä¸çš„跨进程对象共享的问题。
在mp库当ä¸ï¼Œè·¨è¿›ç¨‹å¯¹è±¡å…±äº«æœ‰ä¸‰ç§æ–¹å¼ï¼Œç¬¬ä¸€ç§ä»…适用于原生机器类型,å³python.ctypes当ä¸çš„类型,这ç§åœ¨mp库的文档当ä¸ç§°ä¸ºshared memoryæ–¹å¼ï¼Œå³é€šè¿‡å…±äº«å†…å˜å…±äº«å¯¹è±¡ï¼›å¦å¤–一ç§ç§°ä¹‹ä¸ºserver process,å³æœ‰ä¸€ä¸ªæœåŠ¡å™¨è¿›ç¨‹è´Ÿè´£ç»´æŠ¤æ‰€æœ‰çš„对象,而其他进程连接到该进程,通过代ç†å¯¹è±¡æ“作æœåŠ¡å™¨è¿›ç¨‹å½“ä¸çš„对象;最åŽä¸€ç§åœ¨mp文档当ä¸æ²¡æœ‰å•ç‹¬æ出,但是在其ä¸å¤šæ¬¡æ到,而且是mp库当ä¸æœ€é‡è¦çš„一ç§å…±äº«æ–¹å¼ï¼Œç§°ä¸ºinheritance,å³ç»§æ‰¿ï¼Œå¯¹è±¡åœ¨çˆ¶è¿›ç¨‹å½“ä¸åˆ›å»ºï¼Œç„¶åŽåœ¨çˆ¶è¿›ç¨‹æ˜¯é€šè¿‡multiprocessing.Process创建å进程之åŽï¼Œå进程自动继承了父进程当ä¸çš„对象,并且å进程对这些对象的æ“作都是åæ˜ åˆ°äº†åŒä¸€ä¸ªå¯¹è±¡ã€‚
这三者共享方å¼å„有特色,在这里进行一些简å•çš„比较。
首先是共享方å¼æ‰€åº”对的对象类型,看这个表:
å…±äº«æ–¹å¼ | 支æŒçš„类型 |
Shared memory | ctypes当ä¸çš„类型,通过RawValue,RawArrayç‰åŒ…装类æä¾› |
Inheritance | ç³»ç»Ÿå†…æ ¸å¯¹è±¡ï¼Œä»¥åŠåŸºäºŽè¿™äº›å¯¹è±¡å®žçŽ°çš„对象。包括Pipe, Queue, JoinableQueue, åŒæ¥å¯¹è±¡(Semaphore, Lock, RLock, Condition, Eventç‰ç‰) |
Server process | 所有对象,å¯èƒ½éœ€è¦è‡ªå·±æ‰‹å·¥æ供代ç†å¯¹è±¡(Proxy) |
这个表总结了三ç§ä¸åŒçš„共享方å¼æ‰€æ”¯æŒçš„类型,下é¢ä¸€ä¸ªä¸ªå±•å¼€è®¨è®ºã€‚
å…¶ä¸æœ€å•çº¯ç®€å•çš„就是shared memoryè¿™ç§æ–¹å¼ï¼Œåªæœ‰ctypes当ä¸çš„æ•°æ®ç±»åž‹å¯ä»¥é€šè¿‡è¿™ç§æ–¹å¼å…±äº«ã€‚由于mp库本身缺少命å的机制,å³åœ¨ä¸€ä¸ªè¿›ç¨‹å½“ä¸åˆ›å»ºçš„å¯¹è±¡ï¼Œæ— æ³•åœ¨å¦å¤–一个进程当ä¸é€šè¿‡åå—æ¥å¼•ç”¨ï¼Œå› æ¤ï¼Œè¿™ç§å…±äº«æ–¹å¼ä¾èµ–于继承,对象应该由父进程创建,然åŽç”±å进程引用。关于这ç§æœºåˆ¶çš„例å,å¯ä»¥å‚è§Python文档当ä¸çš„例å Synchronization types like locks, conditions and queues,å‚考其ä¸çš„test_sharedvalues函数。
然åŽæ˜¯ç»§æ‰¿æ–¹å¼ã€‚首先关于继承方å¼éœ€è¦æœ‰è¯´æ˜Žï¼Œç»§æ‰¿æœ¬è´¨ä¸Šå¹¶ä¸æ˜¯ä¸€ç§å¯¹è±¡å…±äº«çš„机制,对象共享åªæ˜¯å…¶å‰¯ä½œç”¨ã€‚å进程从父进程继承æ¥çš„对象并ä¸ä¸€å®šæ˜¯å…±äº«çš„。继承本质上是父进程fork出的å进程自动继承父进程的内å˜çŠ¶æ€å’Œå¯¹è±¡æè¿°ç¬¦ã€‚å› æ¤ï¼Œå®žé™…上å进程å¤åˆ¶äº†ä¸€ä»½çˆ¶è¿›ç¨‹çš„对象,åªä¸è¿‡ï¼Œå½“è¿™ä¸ªå¯¹è±¡åŒ…è£…äº†ä¸€äº›ç³»ç»Ÿå†…æ ¸å¯¹è±¡çš„æ述符的时候,拷è´è¿™ä¸ªå¯¹è±¡ï¼ˆåŠå…¶åŒ…装的æè¿°ç¬¦ï¼‰å®žçŽ°äº†å¯¹è±¡çš„å…±äº«ã€‚å› æ¤ï¼Œåœ¨ä¸Šé¢çš„表当ä¸ï¼Œåªæœ‰ç³»ç»Ÿå†…æ ¸å¯¹è±¡ï¼Œå’ŒåŸºäºŽè¿™äº›å¯¹è±¡å®žçŽ°çš„å¯¹è±¡ï¼Œæ‰èƒ½å¤Ÿé€šè¿‡ç»§æ‰¿æ¥å…±äº«ã€‚通过继承共享的对象在linuxå¹³å°ä¸Šæ²¡æœ‰ä»»ä½•é™åˆ¶ï¼Œä½†æ˜¯åœ¨Windows上é¢ç”±äºŽæ²¡æœ‰forkçš„å®žçŽ°ï¼Œå› æ¤æœ‰ä¸€äº›é¢å¤–çš„é™åˆ¶æ¡ä»¶ï¼Œå› æ¤ï¼Œåœ¨Windows上é¢ï¼Œç»§æ‰¿æ–¹å¼æ˜¯å‡ ä¹Žæ— æ³•ç”¨çš„ã€‚
最åŽå°±æ˜¯Server Processè¿™ç§æ–¹å¼ã€‚è¿™ç§æ–¹å¼å¯ä»¥æ”¯æŒçš„类型比å¦å¤–两ç§éƒ½å¤šï¼Œå› ä¸ºå…¶æ¨¡åž‹æ˜¯è¿™æ ·çš„ï¼š
在这个模型当ä¸ï¼Œæœ‰ä¸€ä¸ªmanager进程,负责管ç†å®žé™…的对象。真æ£çš„对象也是在manager进程的内å˜ç©ºé—´å½“ä¸ã€‚所有需è¦è®¿é—®è¯¥å¯¹è±¡çš„进程都需è¦å…ˆè¿žæŽ¥åˆ°è¯¥ç®¡ç†è¿›ç¨‹ï¼Œç„¶åŽèŽ·å–到对象的一个代ç†å¯¹è±¡ï¼ˆProxy object),通常情况下,这个代ç†å¯¹è±¡æ供了实际对象的公共函数的代ç†ï¼Œå°†å‡½æ•°å‚数进行pickle,然åŽé€šè¿‡è¿žæŽ¥ä¼ é€åˆ°ç®¡ç†è¿›ç¨‹å½“ä¸ï¼Œç®¡ç†è¿›ç¨‹å°†å‚æ•°unpickle之åŽï¼Œè½¬å‘给相应的实际对象的函数,返回值(或者异常)åŒæ ·ç»è¿‡ç®¡ç†è¿›ç¨‹pickle之åŽï¼Œé€šè¿‡è¿žæŽ¥ä¼ 回到客户进程,å†ç”±proxy对象进行unpickle,返回给调用者或者抛出异常。
很明显,这个模型是一个典型的RPCï¼ˆè¿œç¨‹è¿‡ç¨‹è°ƒç”¨ï¼‰çš„æ¨¡åž‹ã€‚å› ä¸ºæ¯ä¸ªå®¢æˆ·è¿›ç¨‹å®žé™…上都是在访问manager进程当ä¸çš„å¯¹è±¡ï¼Œå› æ¤å®Œå…¨å¯ä»¥é€šè¿‡è¿™ä¸ªå®žçŽ°å¯¹è±¡å…±äº«ã€‚
managerå’Œproxy之间的连接å¯ä»¥æ˜¯åŸºäºŽsocket的网络连接,也å¯ä»¥æ˜¯unix pipe。如果是使用基于socket的连接方å¼ï¼Œåœ¨ä½¿ç”¨proxy之å‰ï¼Œéœ€è¦è°ƒç”¨manager对象的connect函数与远程的manager进程建立连接。由于manager进程会打开端å£æŽ¥æ”¶è¯¥è¿žæŽ¥ï¼Œå› æ¤å¿…è¦çš„身份验è¯æ˜¯éœ€è¦çš„,å¦åˆ™ä»»ä½•äººéƒ½å¯ä»¥è¿žä¸Šmanagerå¼„ä¹±ä½ çš„å…±äº«å¯¹è±¡ã€‚mp库通过authkeyçš„æ–¹å¼æ¥è¿›è¡Œèº«ä»½éªŒè¯ã€‚
在实现当ä¸ï¼Œmanager进程通过multiprocessing.Manager类或者BaseManagerçš„å类实现。BaseManageræ供了函数register注册一个函数æ¥èŽ·å–共享对象的proxy。这个函数会被客户进程调用,然åŽåœ¨manager进程当ä¸æ‰§è¡Œã€‚这个函数å¯ä»¥è¿”回一个共享的对象(对所有的调用返回åŒä¸€ä¸ªå¯¹è±¡ï¼‰ï¼Œæˆ–者å¯ä»¥ä¸ºæ¯ä¸€ä¸ªè°ƒç”¨åˆ›å»ºä¸€ä¸ªæ–°çš„对象,通过å‰è€…å°±å¯ä»¥å®žçŽ°å¤šä¸ªè¿›ç¨‹å…±äº«ä¸€ä¸ªå¯¹è±¡ã€‚关于这个的用法å¯ä»¥å‚考Python文档当ä¸çš„例å“Demonstration of how to create and use customized managers and proxiesâ€ã€‚
典型的导出一个共享对象的代ç 是:
ObjectType object_ class ObjectManager(multiprocessing.managers.BaseManager): pass ObjectManager.register("object", lambda: object_) |
注æ„上é¢ä»‹ç»proxy对象的时候,我æ到的“公共函数â€å››ä¸ªå—。æ¯ä¸ªproxy对象åªä¼šå¯¼å‡ºå®žé™…对象的公共函数。这里é¢æœ‰ä¸¤ä¸ªå«ä¹‰ï¼Œä¸€ä¸ªæ˜¯â€œå…¬å…±â€ï¼Œå³æ‰€æœ‰éžä¸‹åˆ’线开头的æˆå‘˜ï¼Œå¦ä¸€ä¸ªæ˜¯â€œå‡½æ•°â€ï¼Œå³æ‰€æœ‰callableçš„æˆå‘˜ã€‚这就带æ¥ä¸€äº›é™åˆ¶ï¼Œä¸€æ˜¯æ— æ³•å¯¼å‡ºå±žæ€§ï¼ŒäºŒæ˜¯æ— æ³•å¯¼å‡ºä¸€äº›å…¬å…±çš„ç‰¹æ®Šå‡½æ•°ï¼Œä¾‹å¦‚__get__, __next__ç‰ç‰ã€‚对于这个mp库有一套处ç†ï¼Œå³è‡ªå®šä¹‰proxy对象。首先是BaseManagerçš„registerå¯ä»¥æ供一个proxy_type作为第三个å‚数,这个å‚数指定了哪些æˆå‘˜éœ€è¦è¢«å¯¼å‡ºã€‚详细的使用方法å¯ä»¥å‚è§æ–‡æ¡£å½“ä¸çš„第一个例å。
å¦å¤–manager还有一些细节的问题需è¦æ³¨æ„。由于Proxy对象ä¸æ˜¯çº¿ç¨‹å®‰å…¨çš„ï¼Œå› æ¤å¦‚果需è¦åœ¨ä¸€ä¸ªå¤šçº¿ç¨‹ç¨‹åºå½“ä¸ä½¿ç”¨proxy,mp库会为æ¯ä¸ªçº¿ç¨‹åˆ›å»ºä¸€ä¸ªproxy对象,而æ¯ä¸ªproxy对象都会对server process创建一个连接,而manager那边对于æ¯ä¸ªè¿žæŽ¥éƒ½åˆ›å»ºä¸€ä¸ªå•ç‹¬çš„线程æ¥ä¸ºå…¶æœåŠ¡ã€‚è¿™æ ·å¸¦æ¥çš„问题就是,如果客户进程有很多线程,很容易会导致manager进程的fd数目达到ulimitçš„é™åˆ¶ï¼Œå³ä½¿æ²¡æœ‰è¾¾åˆ°é™åˆ¶ï¼Œä¹Ÿä¼šå› 为manager进程当ä¸æœ‰å¤ªå¤šçº¿ç¨‹è€Œä¸¥é‡å½±å“manager的性能。解决方案å¯ä»¥æ˜¯ä¸€ä¸ªè¿›ç¨‹å†…cache,åªæœ‰ä¸€ä¸ªå•ç‹¬çš„线程å¯ä»¥åˆ›å»ºproxy对象访问共享对象,其余线程åªèƒ½è®¿é—®è¯¥è¿›ç¨‹å½“ä¸çš„cache。
一旦managerå› ä¸ºè¾¾åˆ°ulimité™åˆ¶æˆ–者其他异常,manager会直接退出,é—憾的是,这时候已ç»å»ºç«‹çš„proxy会试图é‡æ–°è¿žæŽ¥manager – 但是它已ç»ä¸å˜åœ¨äº†ã€‚这个会导致客户进程hang在对proxy的函数调用上,这个时候,目å‰é™¤äº†æ€æŽ‰è¿›ç¨‹æ²¡æœ‰æ‰¾åˆ°åˆ«çš„办法。
å¦å¤–proxy使用socketçš„æ–¹å¼æ¯”较trickyï¼Œå› æ¤å’Œå†…置的socket库有很多冲çªï¼Œæ¯”如socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout调用了之åŽï¼Œè¿›ç¨‹å½“ä¸æ‰€æœ‰é€šè¿‡socket模å—建立的socket都是被设置为unblock模å¼çš„,但是mp库并ä¸çŸ¥é“这一点,而且它总是å‡è®¾socket都是block模å¼çš„,于是,一旦调用了setdefaulttimeout,所有对于proxy的函数调用都会抛出OSError,错误代ç 为11ï¼Œé”™è¯¯åŽŸå› æ˜¯éžå¸¸æœ‰è¯¯å¯¼æ€§çš„“Resource temporarily unavailableâ€ï¼Œå®žé™…上就是EAGAIN。这个错误å¯ä»¥é€šè¿‡æˆ‘æ供的一个patchæ¥è¡¥æ•‘(这个patch当ä¸è¿˜åŒ…å«å…¶ä»–的一些修å¤ï¼Œæ‰€ä»¥è¯·è‡ªè¡ŒæŸ¥çœ‹å¹¶ä¿®æ”¹è¯¥patch)。
ç”±äºŽä»¥ä¸Šçš„ä¸€äº›åŽŸå› ï¼Œserver process模å¼ä½œä¸ºä¸€ä¸ªå¯¹è±¡çš„共享模å¼ï¼Œèƒ½å¤Ÿæ供最为çµæ´»çš„共享方å¼ï¼Œä½†æ˜¯ä¹Ÿæœ‰æœ€å¤šçš„问题。这个在使用过程当ä¸å°±é 自己去衡é‡äº†ã€‚ç›®å‰æˆ‘们的系统对于数æ®å¯é 性方é¢è¦æ±‚ä¸é«˜ï¼Œä¸¢å¤±æ•°æ®æ˜¯å¯ä»¥æŽ¥å—的,但是也åªç”¨è¿™ç§æ¨¡å¼æ¥ç»´æŠ¤ç»Ÿè®¡å€¼ï¼Œä¸æ•¢ç”¨æ¥ç»´æŠ¤æ›´å¤šçš„东西。
关于跨进程共享对象的问题就写到这里,åŽé¢å†…容待ç»â€¦â€¦
test_sharedvalues