Python multiprocessing 使用手记[3] – 关于Queue
ç»§ç»è®¨è®ºPython multiprocessing,这次讨论的主è¦å†…容是mpåº“çš„æ ¸å¿ƒç»„ä»¶ä¹‹ä¸€çš„Queue。
Queue是mp库当ä¸ç”¨æ¥æä¾›å¤šè¿›ç¨‹å¯¹è±¡äº¤æ¢çš„æ–¹å¼ã€‚对象交æ¢å’Œä¸Šä¸€éƒ¨åˆ†å½“ä¸æåˆ°çš„å¯¹è±¡å…±äº«éƒ½æ˜¯ä½¿å¤šä¸ªè¿›ç¨‹è®¿é—®åŒä¸€ä¸ªå¯¹è±¡çš„æ–¹å¼ï¼Œä¸¤è€…的区别就是,对象共享是多个进程访问åŒä¸€ä¸ªå¯¹è±¡ï¼Œå¯¹è±¡äº¤æ¢åˆ™æ˜¯å°†å¯¹è±¡ä»Žä¸€ä¸ªè¿›ç¨‹ä¼ 输的å¦ä¸€ä¸ªè¿›ç¨‹ã€‚
multiprocessing当ä¸çš„Queue使用方å¼å’ŒPython内置的threading.Queue对象很åƒï¼Œå®ƒæ”¯æŒä¸€ä¸ªputæ“作,将对象放入Queue,也支æŒä¸€ä¸ªgetæ“作,将对象从Queue当ä¸è¯»å‡ºã€‚å’Œthreading.Queueä¸åŒçš„æ˜¯ï¼Œmp.Queueé»˜è®¤ä¸æ”¯æŒjoin()å’Œtask_doneæ“作,这两个支æŒéœ€è¦ä½¿ç”¨mp.JoinableQueue对象。
由于Queueå¯¹è±¡è´Ÿè´£è¿›ç¨‹ä¹‹é—´çš„å¯¹è±¡ä¼ è¾“ï¼Œå› æ¤ç¬¬ä¸€ä¸ªé—®é¢˜å°±æ˜¯å¦‚何在两个进程之间共享这个Queue对象本身。在上一部分所言的三ç§å…±äº«æ–¹å¼å½“ä¸ï¼ŒQueue对象åªèƒ½ä½¿ç”¨ç»§æ‰¿ï¼ˆinheritance)的方å¼å…±äº«ã€‚è¿™æ˜¯å› ä¸ºQueue本身基于unixçš„Pipe对象实现,而Pipe对象的共享需è¦é€šè¿‡ç»§æ‰¿ã€‚å› æ¤ï¼Œåœ¨ä¸€ä¸ªå…¸åž‹çš„应用实现模型当ä¸ï¼Œåº”该是父进程创建Queue,然åŽåˆ›å»ºå进程共享该Queue,由父进程和å进程分别读写。例如下é¢çš„这个例å:
import multiprocessing q = multiprocessing.Queue() def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() q.put(100) reader.join() |
å¦ä¸€ç§å®žçŽ°æ–¹å¼æ˜¯çˆ¶è¿›ç¨‹åˆ›å»ºQueue,创建多个å进程,有的å进程读Queue,有的å进程写Queue,例如:
import multiprocessing q = multiprocessing.Queue() def writer_proc(): q.put(100) def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() writer = multiprocessing.Process(target=writer_proc) writer.start() reader.join() writer.join() |
由于使用继承的方å¼å…±äº«Queueï¼Œå› æ¤ä»£ç 当ä¸å¹¶æ²¡æœ‰æ˜Žæ˜¾çš„ä¼ è¾“Queue对象本身的代ç ,看起æ¥ä¼¼ä¹Žåªè¦å°†multiprocessing当ä¸çš„å¯¹è±¡æ¢æˆthreading当ä¸çš„对象,程åºä»ç„¶èƒ½å¤Ÿå·¥ä½œã€‚å之,拿到一个现有的多线程程åºï¼Œæ˜¯ä¸æ˜¯å°†threading改æˆmultiprocessingå°±å¯ä»¥å·¥ä½œå‘¢ï¼Ÿä¹Ÿè®¸å¯ä»¥ï¼Œä½†æ˜¯æ›´å¯èƒ½çš„æƒ…å†µæ˜¯ä½ ä¼šé‡åˆ°å¾ˆå¤šé—®é¢˜ã€‚
第一个问题就是mpçš„Queue需è¦è€ƒè™‘å¤šè¿›ç¨‹ä¹‹é—´çš„å¯¹è±¡ä¼ è¾“ï¼Œå› æ¤æ‰€ä¼ 输的对象必须是å¯ä»¥pickle的。å¦åˆ™ï¼Œåœ¨Queueçš„putæ“作上会抛出PicklingError。
å…¶ä»–çš„ä¸€äº›å·®å¼‚è¡¨çŽ°åœ¨ä¸€äº›æŠ€æœ¯ç»†èŠ‚ä¸Šï¼Œè¿™äº›ä¸æ˜¯ä»»ä½•高层逻辑å¯ä»¥æŠ½è±¡æŽ‰çš„,ä¸çŸ¥é“这些差异会导致一些潜在的错误,例如æ»é”。在总结这些潜在的犯错的å¯èƒ½çš„åŒæ—¶ï¼Œæˆ‘们会简å•看一下mp当ä¸Queue的实现方å¼ï¼Œä»¥ä¾¿èƒ½å¤Ÿæ–¹ä¾¿çš„ç†è§£ä¸ºä»€ä¹ˆä¼šæœ‰è¿™æ ·çš„行为。这些实现问题仅仅针对Linux,Windows上é¢çš„å®žçŽ°å’Œå‡ºçŽ°çš„é—®é¢˜åœ¨è¿™é‡Œä¸æ¶‰åŠã€‚
mp.Queue建构在系统的Pipeä¹‹ä¸Šï¼Œä½†æ˜¯å®žé™…ä¸Šè¿›ç¨‹å¹¶ä¸æ˜¯ç›´æŽ¥å°†å¯¹è±¡å†™å…¥åˆ°Pipe里é¢ï¼Œè€Œæ˜¯å…ˆå†™å…¥ä¸€ä¸ªæœ¬åœ°çš„buffer,å†ç”±ä¸€ä¸ªä¸“门的feed线程将其放入Pipe当ä¸ã€‚读å–端则是直接从Pipe当ä¸è¯»å‡ºå¯¹è±¡ã€‚ä¹‹æ‰€ä»¥æœ‰è¿™æ ·ä¸€ä¸ªfeed线程,是为了能够æä¾›Queue接å£å‡½æ•°æ‰€éœ€è¦çš„put的超时控制。但是由于这个feed线程的å˜åœ¨ï¼Œmp.Queueæä¾›äº†å‡ 个é¢å¤–çš„å‡½æ•°æ¥æŽ§åˆ¶å®ƒï¼Œä¸€ä¸ªå‡½æ•°closeæ¥åœæ¢è¯¥çº¿ç¨‹ï¼Œä»¥åŠjoin_threadæ¥join该线程。closeåŒæ—¶è´Ÿè´£æŠŠæ‰€æœ‰åœ¨buffer当ä¸çš„对象刷新到Pipe当ä¸ã€‚
但是这个feedçº¿ç¨‹ä¹Ÿæ˜¯ä¸ªéº»çƒ¦åˆ¶é€ è€…ï¼Œä¸ºäº†ä¿è¯æ‰€æœ‰è¢«æ”¾å…¥Queue的东西最终都能够到达å¦å¤–一端的进程,mp库注册了一个atexit的处ç†å‡½æ•°ï¼Œç”¨æ¥åœ¨è¿›ç¨‹é€€å‡ºçš„æ—¶å€™è‡ªåЍclose并且join该feed线程。这个join动作带æ¥äº†å¾ˆå¤šé—®é¢˜ï¼Œæ¯”如潜在的æ»é”。考虑下é¢ä¸€ç§çŠ¶å†µï¼šä¸€ä¸ªçˆ¶è¿›ç¨‹åˆ›å»ºäº†ä¸¤ä¸ªå进程,一个å进程读,å¦ä¸€ä¸ªå进程写。当需è¦åœæ¢è¿™äº›è¿›ç¨‹çš„æ—¶å€™ï¼Œçˆ¶è¿›ç¨‹å¦‚果先把读进程结æŸï¼Œä½†æ˜¯åŒæ—¶å†™è¿›ç¨‹å·²ç»å°†å¤ªå¤šçš„对象写入Queue,导致åŽç»§çš„对象ç‰å¾…在buffer当ä¸ï¼Œåˆ™è¿™ä¸ªè¿›ç¨‹å°†æ— 法终æ¢ï¼Œå› 为atexit的处ç†å‡½æ•°ç‰å¾…把所有buffer当ä¸çš„对象放入Pipe,但是Pipeå·²ç»æ»¡äº†ï¼Œç„¶åŽé™·å…¥äº†æ»é”。
有人å¯èƒ½ä¼šé—®ï¼Œé‚£åªè¦ä¿è¯æ€»æ˜¯æŒ‰ç…§æ•°æ®æµçš„é¡ºåºæ¥åœæ¢è¿›ç¨‹ä¸å°±è¡Œã€‚é—®é¢˜æ˜¯åœ¨å¾ˆå¤šå¤æ‚的系统æµç¨‹å½“ä¸ï¼Œå¯èƒ½å˜åœ¨ä¸€ä¸ªçŽ¯å½¢çš„æ•°æ®æµï¼Œè¿™ç§æƒ…å†µä¸‹ï¼Œæ— è®ºæŒ‰ç…§ä»€ä¹ˆé¡ºåºåœæ¢è¿›ç¨‹ï¼Œç»ˆç©¶æœ‰ä¸€ä¸ªè¿›ç¨‹å¯èƒ½é™·å…¥è¿™ç§æƒ…景当ä¸ã€‚
幸è¿çš„æ˜¯ï¼ŒQueue对象还æä¾›äº†ä¸€ä¸ªæˆå‘˜å‡½æ•°cancel_join_thread,这个函数å¯ä»¥ä½¿å¾—åœ¨è¿›ç¨‹åœæ¢çš„æ—¶å€™ä¸è¿›è¡Œjoinæ“ä½œï¼Œè¿™æ ·å¯ä»¥é¿å…æ»é”,代价就是这个时候尚未刷新到Pipe当ä¸çš„对象都会丢失。鉴于å³ä½¿è°ƒç”¨äº†join_thread,残留在Pipe当ä¸çš„对象ä»ç„¶å¯èƒ½ä¸¢å¤±ï¼Œæ‰€ä»¥ä¸€æ—¦é€‰æ‹©ä½¿ç”¨mpçš„Queue对象,就ä¸è¦å‡è®¾ä¸ä¼šåœ¨æµç¨‹å½“ä¸ä¸¢å¯¹è±¡äº†ã€‚
å¦å¤–一个å¯èƒ½çš„æ–¹æ¡ˆæ˜¯ä½¿ç”¨mp库当ä¸çš„SimpleQueueå¯¹è±¡ã€‚è¿™ä¸ªå¯¹è±¡åœ¨æ–‡æ¡£å½“ä¸æ²¡æœ‰æåŠï¼Œä½†æ˜¯åœ¨multiprocessing.queue模å—当䏿œ‰å®šä¹‰ã€‚这个对象就是去掉了bufferçš„Queueå¯¹è±¡ï¼Œå› æ¤å¯èƒ½èƒ½å¤Ÿé¿å…上é¢è¯´çš„问题的。但是SimpleQueue没有æä¾›putå’Œget的超时处ç†ï¼Œä¸¤ä¸ªåŠ¨ä½œéƒ½æ˜¯é˜»å¡žçš„ã€‚
除了使用multiprocessing.Queue,还å¯ä»¥ä½¿ç”¨multiprocessing.Pipe进行通信。mp.Pipe是Queue的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很åƒã€‚éœ€è¦æ³¨æ„的是Pipeå¸¦æœ‰ä¸€ä¸ªå‚æ•° duplex,当设置为True(默认)的时候,Pipe并䏿˜¯ä½¿ç”¨ç³»ç»Ÿçš„pipeæ¥å®žçŽ°ï¼Œè€Œæ˜¯é€šè¿‡socketpair,å³Unix Domain Socketæ¥å®žçŽ°ã€‚è¿™ä¸ªå’Œpipe相比有些微的性能差异。
å¦å¤–一个使用Queue的方å¼ä¸æ˜¯mpåº“å†…ç½®çš„ã€‚è¿™ç§æ–¹å¼ä½¿ç”¨ä¸Šä¸€ç¯‡æ–‡ç« å½“ä¸æåˆ°çš„server processçš„æ–¹å¼æ¥å…±äº«ä¸€ä¸ªQueue对象。这个Queue对象实际上在server process当ä¸ï¼Œæ‰€æœ‰çš„å进程通过socket连接到server process获å–该Queue的代ç†å¯¹è±¡è¿›è¡Œæ“作。说到这有人会想起æ¥mp库有一个内置的SyncManager对象,å¯ä»¥é€šè¿‡multiprocess.Manager函数获å–到,通过该对象的Queue方法å¯ä»¥èŽ·å–一个Queue的代ç†å¯¹è±¡ã€‚ä¸å¹¸çš„æ˜¯ï¼Œè¿™ä¸ªæ–¹æ³•䏿˜¯æ£ç¡®çš„获å–Queue的方å¼ï¼ŒåŽŸå› æ£å¦‚ä¸Šä¸€ç¯‡æ–‡ç« æ‰€è¯´ï¼ŒSyncManager.Queueæ–¹æ³•çš„æ¯æ¬¡è°ƒç”¨èŽ·å–到的是一个新建对象的代ç†å¯¹è±¡ï¼Œè€Œä¸æ˜¯ä¸€ä¸ªå…±äº«å¯¹è±¡ã€‚æ£ç¡®çš„使用server process当ä¸çš„Queueçš„æ–¹å¼æ˜¯ï¼š
å…±åŒéƒ¨åˆ†ï¼š
import multiprocessing.managers as mpm import Queue class SharedQueueManager(mpm.BaseManager): pass q = Queue.Queue() SharedQueueManager.register('Queue', lambda: q) |
æœåŠ¡è¿›ç¨‹ï¼š
mgr = SharedQueueManager(address=('', 12345)) server = mgr.get_server() server.serve_forever() |
客户进程:
mgr = SharedQueueManager(address=('localhost', 12345)) mgr.connect() q = mgr.Queue() # 这里q就是共享的Queue对象的代ç†å¯¹è±¡ |
è¿™ç§æ–¹å¼æ¯”èµ·mp库内置的Queue,有一些性能上的影å“ï¼Œå› ä¸ºæ¯•ç«Ÿç‰µæ¶‰åˆ°å¤šæ¬¡ç½‘ç»œé€šè®¯ï¼Œä½†æ˜¯å¸¦æ¥çš„好处是没有feed线程带æ¥çš„一系列问题,而且ç†è®ºä¸Šä¸ä¼šå˜åœ¨ä¸¢æ•°æ®çš„问题,除éžserver process崩溃。但是æ£å¦‚上一篇所说,server processæœ¬èº«å°±ä¸æ˜¯å¾ˆé è°±çš„ï¼Œå› æ¤è¿™é‡Œä¹Ÿåªæ˜¯â€œç†è®ºä¸Šâ€ä¸ä¼šä¸¢æ•°æ®è€Œå·²ã€‚
说到性能,这里就列两个性能数æ®ï¼Œä»¥å‰åœ¨twitterä¸Šé¢æåˆ°è¿‡çš„ï¼ˆè¿™ä¸¤ä¸ªè¿žæŽ¥æ— æ³•è®¿é—®çš„è¯·è”系我):
æ“作对象为 pickleåŽ512å—节的对象,通过proxyæ“作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率å¯è¾¾54000次/秒。