Python multiprocessing 使用手记[3] – 关于Queue
继ç»è®¨è®ºPython multiprocessing,这次讨论的主è¦å†…容是mpåº“çš„æ ¸å¿ƒç»„ä»¶ä¹‹ä¸€çš„Queue。
Queue是mp库当ä¸ç”¨æ¥æ供多进程对象交æ¢çš„æ–¹å¼ã€‚对象交æ¢å’Œä¸Šä¸€éƒ¨åˆ†å½“ä¸æ到的对象共享都是使多个进程访问åŒä¸€ä¸ªå¯¹è±¡çš„æ–¹å¼ï¼Œä¸¤è€…的区别就是,对象共享是多个进程访问åŒä¸€ä¸ªå¯¹è±¡ï¼Œå¯¹è±¡äº¤æ¢åˆ™æ˜¯å°†å¯¹è±¡ä»Žä¸€ä¸ªè¿›ç¨‹ä¼ 输的å¦ä¸€ä¸ªè¿›ç¨‹ã€‚
multiprocessing当ä¸çš„Queue使用方å¼å’ŒPython内置的threading.Queue对象很åƒï¼Œå®ƒæ”¯æŒä¸€ä¸ªputæ“作,将对象放入Queue,也支æŒä¸€ä¸ªgetæ“作,将对象从Queue当ä¸è¯»å‡ºã€‚å’Œthreading.Queueä¸åŒçš„是,mp.Queue默认ä¸æ”¯æŒjoin()å’Œtask_doneæ“作,这两个支æŒéœ€è¦ä½¿ç”¨mp.JoinableQueue对象。
由于Queueå¯¹è±¡è´Ÿè´£è¿›ç¨‹ä¹‹é—´çš„å¯¹è±¡ä¼ è¾“ï¼Œå› æ¤ç¬¬ä¸€ä¸ªé—®é¢˜å°±æ˜¯å¦‚何在两个进程之间共享这个Queue对象本身。在上一部分所言的三ç§å…±äº«æ–¹å¼å½“ä¸ï¼ŒQueue对象åªèƒ½ä½¿ç”¨ç»§æ‰¿ï¼ˆinheritance)的方å¼å…±äº«ã€‚è¿™æ˜¯å› ä¸ºQueue本身基于unixçš„Pipe对象实现,而Pipe对象的共享需è¦é€šè¿‡ç»§æ‰¿ã€‚å› æ¤ï¼Œåœ¨ä¸€ä¸ªå…¸åž‹çš„应用实现模型当ä¸ï¼Œåº”该是父进程创建Queue,然åŽåˆ›å»ºå进程共享该Queue,由父进程和å进程分别读写。例如下é¢çš„这个例å:
import multiprocessing q = multiprocessing.Queue() def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() q.put(100) reader.join() |
å¦ä¸€ç§å®žçŽ°æ–¹å¼æ˜¯çˆ¶è¿›ç¨‹åˆ›å»ºQueue,创建多个å进程,有的å进程读Queue,有的å进程写Queue,例如:
import multiprocessing q = multiprocessing.Queue() def writer_proc(): q.put(100) def reader_proc(): print q.get() reader = multiprocessing.Process(target=reader_proc) reader.start() writer = multiprocessing.Process(target=writer_proc) writer.start() reader.join() writer.join() |
由于使用继承的方å¼å…±äº«Queueï¼Œå› æ¤ä»£ç 当ä¸å¹¶æ²¡æœ‰æ˜Žæ˜¾çš„ä¼ è¾“Queue对象本身的代ç ,看起æ¥ä¼¼ä¹Žåªè¦å°†multiprocessing当ä¸çš„对象æ¢æˆthreading当ä¸çš„对象,程åºä»ç„¶èƒ½å¤Ÿå·¥ä½œã€‚å之,拿到一个现有的多线程程åºï¼Œæ˜¯ä¸æ˜¯å°†threading改æˆmultiprocessingå°±å¯ä»¥å·¥ä½œå‘¢ï¼Ÿä¹Ÿè®¸å¯ä»¥ï¼Œä½†æ˜¯æ›´å¯èƒ½çš„æƒ…å†µæ˜¯ä½ ä¼šé‡åˆ°å¾ˆå¤šé—®é¢˜ã€‚
第一个问题就是mpçš„Queue需è¦è€ƒè™‘å¤šè¿›ç¨‹ä¹‹é—´çš„å¯¹è±¡ä¼ è¾“ï¼Œå› æ¤æ‰€ä¼ 输的对象必须是å¯ä»¥pickle的。å¦åˆ™ï¼Œåœ¨Queueçš„putæ“作上会抛出PicklingError。
其他的一些差异表现在一些技术细节上,这些ä¸æ˜¯ä»»ä½•é«˜å±‚逻辑å¯ä»¥æŠ½è±¡æŽ‰çš„,ä¸çŸ¥é“这些差异会导致一些潜在的错误,例如æ»é”。在总结这些潜在的犯错的å¯èƒ½çš„åŒæ—¶ï¼Œæˆ‘们会简å•çœ‹ä¸€ä¸‹mp当ä¸Queue的实现方å¼ï¼Œä»¥ä¾¿èƒ½å¤Ÿæ–¹ä¾¿çš„ç†è§£ä¸ºä»€ä¹ˆä¼šæœ‰è¿™æ ·çš„行为。这些实现问题仅仅针对Linux,Windows上é¢çš„实现和出现的问题在这里ä¸æ¶‰åŠã€‚
mp.Queue建构在系统的Pipe之上,但是实际上进程并ä¸æ˜¯ç›´æŽ¥å°†å¯¹è±¡å†™å…¥åˆ°Pipe里é¢ï¼Œè€Œæ˜¯å…ˆå†™å…¥ä¸€ä¸ªæœ¬åœ°çš„buffer,å†ç”±ä¸€ä¸ªä¸“门的feed线程将其放入Pipe当ä¸ã€‚读å–端则是直接从Pipe当ä¸è¯»å‡ºå¯¹è±¡ã€‚ä¹‹æ‰€ä»¥æœ‰è¿™æ ·ä¸€ä¸ªfeed线程,是为了能够æä¾›Queue接å£å‡½æ•°æ‰€éœ€è¦çš„put的超时控制。但是由于这个feed线程的å˜åœ¨ï¼Œmp.Queueæä¾›äº†å‡ ä¸ªé¢å¤–的函数æ¥æŽ§åˆ¶å®ƒï¼Œä¸€ä¸ªå‡½æ•°closeæ¥åœæ¢è¯¥çº¿ç¨‹ï¼Œä»¥åŠjoin_threadæ¥join该线程。closeåŒæ—¶è´Ÿè´£æŠŠæ‰€æœ‰åœ¨buffer当ä¸çš„对象刷新到Pipe当ä¸ã€‚
但是这个feedçº¿ç¨‹ä¹Ÿæ˜¯ä¸ªéº»çƒ¦åˆ¶é€ è€…ï¼Œä¸ºäº†ä¿è¯æ‰€æœ‰è¢«æ”¾å…¥Queue的东西最终都能够到达å¦å¤–一端的进程,mp库注册了一个atexit的处ç†å‡½æ•°ï¼Œç”¨æ¥åœ¨è¿›ç¨‹é€€å‡ºçš„时候自动close并且join该feed线程。这个join动作带æ¥äº†å¾ˆå¤šé—®é¢˜ï¼Œæ¯”如潜在的æ»é”。考虑下é¢ä¸€ç§çŠ¶å†µï¼šä¸€ä¸ªçˆ¶è¿›ç¨‹åˆ›å»ºäº†ä¸¤ä¸ªå进程,一个å进程读,å¦ä¸€ä¸ªå进程写。当需è¦åœæ¢è¿™äº›è¿›ç¨‹çš„时候,父进程如果先把读进程结æŸï¼Œä½†æ˜¯åŒæ—¶å†™è¿›ç¨‹å·²ç»å°†å¤ªå¤šçš„对象写入Queue,导致åŽç»§çš„对象ç‰å¾…在buffer当ä¸ï¼Œåˆ™è¿™ä¸ªè¿›ç¨‹å°†æ— 法终æ¢ï¼Œå› 为atexit的处ç†å‡½æ•°ç‰å¾…把所有buffer当ä¸çš„对象放入Pipe,但是Pipeå·²ç»æ»¡äº†ï¼Œç„¶åŽé™·å…¥äº†æ»é”。
有人å¯èƒ½ä¼šé—®ï¼Œé‚£åªè¦ä¿è¯æ€»æ˜¯æŒ‰ç…§æ•°æ®æµçš„顺åºæ¥åœæ¢è¿›ç¨‹ä¸å°±è¡Œã€‚问题是在很多å¤æ‚的系统æµç¨‹å½“ä¸ï¼Œå¯èƒ½å˜åœ¨ä¸€ä¸ªçŽ¯å½¢çš„æ•°æ®æµï¼Œè¿™ç§æƒ…å†µä¸‹ï¼Œæ— è®ºæŒ‰ç…§ä»€ä¹ˆé¡ºåºåœæ¢è¿›ç¨‹ï¼Œç»ˆç©¶æœ‰ä¸€ä¸ªè¿›ç¨‹å¯èƒ½é™·å…¥è¿™ç§æƒ…景当ä¸ã€‚
幸è¿çš„是,Queue对象还æ供了一个æˆå‘˜å‡½æ•°cancel_join_thread,这个函数å¯ä»¥ä½¿å¾—在进程åœæ¢çš„时候ä¸è¿›è¡Œjoinæ“ä½œï¼Œè¿™æ ·å¯ä»¥é¿å…æ»é”,代价就是这个时候尚未刷新到Pipe当ä¸çš„对象都会丢失。鉴于å³ä½¿è°ƒç”¨äº†join_thread,残留在Pipe当ä¸çš„对象ä»ç„¶å¯èƒ½ä¸¢å¤±ï¼Œæ‰€ä»¥ä¸€æ—¦é€‰æ‹©ä½¿ç”¨mpçš„Queue对象,就ä¸è¦å‡è®¾ä¸ä¼šåœ¨æµç¨‹å½“ä¸ä¸¢å¯¹è±¡äº†ã€‚
å¦å¤–一个å¯èƒ½çš„方案是使用mp库当ä¸çš„SimpleQueue对象。这个对象在文档当ä¸æ²¡æœ‰æåŠï¼Œä½†æ˜¯åœ¨multiprocessing.queue模å—当ä¸æœ‰å®šä¹‰ã€‚这个对象就是去掉了bufferçš„Queueå¯¹è±¡ï¼Œå› æ¤å¯èƒ½èƒ½å¤Ÿé¿å…上é¢è¯´çš„问题的。但是SimpleQueue没有æä¾›putå’Œget的超时处ç†ï¼Œä¸¤ä¸ªåŠ¨ä½œéƒ½æ˜¯é˜»å¡žçš„。
除了使用multiprocessing.Queue,还å¯ä»¥ä½¿ç”¨multiprocessing.Pipe进行通信。mp.Pipe是Queue的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很åƒã€‚需è¦æ³¨æ„的是Pipe带有一个å‚æ•° duplex,当设置为True(默认)的时候,Pipe并ä¸æ˜¯ä½¿ç”¨ç³»ç»Ÿçš„pipeæ¥å®žçŽ°ï¼Œè€Œæ˜¯é€šè¿‡socketpair,å³Unix Domain Socketæ¥å®žçŽ°ã€‚这个和pipe相比有些微的性能差异。
å¦å¤–一个使用Queueçš„æ–¹å¼ä¸æ˜¯mp库内置的。这ç§æ–¹å¼ä½¿ç”¨ä¸Šä¸€ç¯‡æ–‡ç« 当ä¸æ到的server processçš„æ–¹å¼æ¥å…±äº«ä¸€ä¸ªQueue对象。这个Queue对象实际上在server process当ä¸ï¼Œæ‰€æœ‰çš„å进程通过socket连接到server process获å–该Queue的代ç†å¯¹è±¡è¿›è¡Œæ“作。说到这有人会想起æ¥mp库有一个内置的SyncManager对象,å¯ä»¥é€šè¿‡multiprocess.Manager函数获å–到,通过该对象的Queue方法å¯ä»¥èŽ·å–一个Queue的代ç†å¯¹è±¡ã€‚ä¸å¹¸çš„是,这个方法ä¸æ˜¯æ£ç¡®çš„获å–Queueçš„æ–¹å¼ï¼ŒåŽŸå› æ£å¦‚ä¸Šä¸€ç¯‡æ–‡ç« æ‰€è¯´ï¼ŒSyncManager.Queue方法的æ¯æ¬¡è°ƒç”¨èŽ·å–到的是一个新建对象的代ç†å¯¹è±¡ï¼Œè€Œä¸æ˜¯ä¸€ä¸ªå…±äº«å¯¹è±¡ã€‚æ£ç¡®çš„使用server process当ä¸çš„Queueçš„æ–¹å¼æ˜¯ï¼š
å…±åŒéƒ¨åˆ†ï¼š
import multiprocessing.managers as mpm import Queue class SharedQueueManager(mpm.BaseManager): pass q = Queue.Queue() SharedQueueManager.register('Queue', lambda: q) |
æœåŠ¡è¿›ç¨‹ï¼š
mgr = SharedQueueManager(address=('', 12345)) server = mgr.get_server() server.serve_forever() |
客户进程:
mgr = SharedQueueManager(address=('localhost', 12345)) mgr.connect() q = mgr.Queue() # 这里q就是共享的Queue对象的代ç†å¯¹è±¡ |
è¿™ç§æ–¹å¼æ¯”èµ·mp库内置的Queue,有一些性能上的影å“ï¼Œå› ä¸ºæ¯•ç«Ÿç‰µæ¶‰åˆ°å¤šæ¬¡ç½‘ç»œé€šè®¯ï¼Œä½†æ˜¯å¸¦æ¥çš„好处是没有feed线程带æ¥çš„一系列问题,而且ç†è®ºä¸Šä¸ä¼šå˜åœ¨ä¸¢æ•°æ®çš„问题,除éžserver process崩溃。但是æ£å¦‚上一篇所说,server process本身就ä¸æ˜¯å¾ˆé è°±çš„ï¼Œå› æ¤è¿™é‡Œä¹Ÿåªæ˜¯â€œç†è®ºä¸Šâ€ä¸ä¼šä¸¢æ•°æ®è€Œå·²ã€‚
说到性能,这里就列两个性能数æ®ï¼Œä»¥å‰åœ¨twitter上é¢æåˆ°è¿‡çš„ï¼ˆè¿™ä¸¤ä¸ªè¿žæŽ¥æ— æ³•è®¿é—®çš„è¯·è”系我):
æ“作对象为 pickleåŽ512å—节的对象,通过proxyæ“作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率å¯è¾¾54000次/秒。