RabbitMQ作为一个工业级的消æ¯é˜Ÿåˆ—æœåŠ¡å™¨ï¼Œåœ¨å…¶å®¢æˆ·ç«¯æ‰‹å†Œåˆ—表的Python段当ä¸æŽ¨è了一篇blog,作为RabbitMQ+Python的入门手册å†åˆé€‚ä¸è¿‡äº†ã€‚ä¸è¿‡ï¼Œæ£å¦‚å…¶æ ‡é¢˜Rabbit and Warrens(兔åå’Œå…»å…”åœºï¼‰ä¸€æ ·ï¼Œè¿™ç¯‡è‹±æ–‡å†™çš„ç›¸å½“ä¿çš®ï¼Œä»¥è‡³äºŽå¯¹äºŽæˆ‘ç‰éžè‹±æ–‡è¯»è€…æ¥è¯´ä¸åƒä¸€èˆ¬çš„技术文档那么好懂,所以,翻译一下å§ã€‚翻译过了,希望其他人å¯ä»¥å°‘用一些时间。翻译水平有é™ï¼Œä¸å¯èƒ½åƒåŽŸæ–‡ä¸€æ ·ä¿çš®ï¼Œéƒ¨åˆ†åœ°æ–¹å¯èƒ½å°±æ„译了,希望以容易懂为准。想看看è€å¤–的幽默的,推è去看原文,其实,也ä¸æ˜¯é‚£ä¹ˆéš¾ç†è§£â€¦â€¦
原文:http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/
å…”å和兔åçª
当时我们的动机很简å•ï¼šä»Žç”Ÿäº§çŽ¯å¢ƒçš„电å邮件处ç†æµç¨‹å½“ä¸åˆ†æ”¯å‡ºä¸€ä¸ªç‰¹å®šçš„离线分æžæµç¨‹ã€‚我们开始用的MySQL,将è¦å¤„ç†çš„东西放在表里é¢ï¼Œå¦ä¸€ä¸ªç¨‹åºä»Žä¸å–。ä¸è¿‡å¾ˆå¿«ï¼Œè¿™ç§è®¾è®¡çš„丑陋之处就显现出æ¥äº†â€¦â€¦ ä½ æƒ³è¦å¤šä¸ªç¨‹åºä»Žä¸€ä¸ªé˜Ÿåˆ—当ä¸å–æ•°æ®æ¥å¤„ç†ï¼Ÿæ²¡é—®é¢˜ï¼Œæˆ‘们硬编ç 程åºçš„个数好了……什么?还è¦èƒ½å¤Ÿå…许程åºåŠ¨æ€åœ°å¢žåŠ å’Œå‡å°‘的时候动æ€è¿›è¡ŒåŽ‹åŠ›åˆ†é…?
是的,当年我们想的简å•çš„东西(åšä¸€ä¸ªåˆ†æ”¯å¤„ç†ï¼‰é€æ¸å˜æˆäº†ä¸€ä¸ªæ£˜æ‰‹çš„问题。以å‰æ‹¿ç€é”¤å(MySQL)看所有东西都是钉å(表)的年代是多么美好……
在æœç´¢äº†ä¸€ä¸‹ä¹‹åŽï¼Œæˆ‘们走进了消æ¯é˜Ÿåˆ—(message queue)的大门。ä¸ä¸ï¼Œæˆ‘们当然知é“消æ¯é˜Ÿåˆ—是什么,我们å¯æ˜¯ä»¥åšç”µå邮件程åºè°‹ç”Ÿçš„。我们实现过å„ç§å„æ ·çš„ä¸“ä¸šçš„ï¼Œé«˜é€Ÿçš„å†…å˜é˜Ÿåˆ—用æ¥åšç”µå邮件处ç†ã€‚我们ä¸çŸ¥é“的是那一大类现æˆçš„ã€é€šç”¨çš„消æ¯é˜Ÿåˆ—(MQ)æœåŠ¡å™¨â€”â€”æ— è®ºæ˜¯ç”¨ä»€ä¹ˆè¯è¨€å†™å‡ºçš„,ä¸éœ€è¦å¤æ‚的装é…的,å¯ä»¥è‡ªç„¶çš„在网络上的应用程åºä¹‹é—´ä¼ é€æ•°æ®çš„一类程åºã€‚ä¸ç”¨æˆ‘们自己写?看看å†è¯´ã€‚
è®©å¤§å®¶çœ‹çœ‹ä½ ä»¬çš„Queueå§â€¦â€¦
过去的4年里,人们写了有好多好多的开æºçš„MQæœåŠ¡å™¨å•Šã€‚å…¶ä¸å¤§å¤šæ•°éƒ½æ˜¯æŸå…¬å¸ä¾‹å¦‚LiveJournal写出æ¥ç”¨æ¥è§£å†³ç‰¹å®šé—®é¢˜çš„。它们的确ä¸å…³å¿ƒä¸Šé¢è·‘的是什么类型的消æ¯ï¼Œä¸è¿‡ä»–们的设计æ€æƒ³é€šå¸¸æ˜¯å’Œåˆ›å»ºè€…æ¯æ¯ç›¸å…³çš„(消æ¯çš„æŒä¹…化,崩溃æ¢å¤ç‰é€šå¸¸ä¸åœ¨ä»–们考虑范围内)。ä¸è¿‡ï¼Œæœ‰ä¸‰ä¸ªä¸“门设计用æ¥åšåŠå…¶çµæ´»çš„消æ¯é˜Ÿåˆ—的程åºå€¼å¾—关注:
Apache ActiveMQ æ›å…‰çŽ‡æœ€é«˜ï¼Œä¸è¿‡çœ‹èµ·æ¥å®ƒæœ‰äº›é—®é¢˜ï¼Œå¯èƒ½ä¼šé€ æˆä¸¢æ¶ˆæ¯ã€‚ä¸å¯æŽ¥å—,下一个。
ZeroMQ å’Œ RabbitMQ 都支æŒä¸€ä¸ªå¼€æºçš„消æ¯å议,æˆä¸ºAMQP。AMQP的一个优点是它是一个çµæ´»å’Œå¼€æ”¾çš„å议,以便和å¦å¤–两个商业化的Message Queue (IBMå’ŒTibco)竞争,很好。ä¸è¿‡ZeroMQä¸æ”¯æŒæ¶ˆæ¯æŒä¹…化和崩溃æ¢å¤ï¼Œä¸å¤ªå¥½ã€‚剩下的åªæœ‰RabbitMQäº†ã€‚å¦‚æžœä½ ä¸åœ¨æ„消æ¯æŒä¹…化和崩溃æ¢å¤ï¼Œè¯•è¯•ZeroMQå§ï¼Œå»¶è¿Ÿå¾ˆä½Žï¼Œè€Œä¸”支æŒçµæ´»çš„拓扑。
剩下的åªæœ‰è¿™ä¸ªåƒèƒ¡èåœçš„家伙了……
当我读到它是用Erlang写的时候,RabbitMQ震了我一下。Erlang 是爱立信开å‘的高度并行的è¯è¨€ï¼Œç”¨æ¥è·‘在电è¯äº¤æ¢æœºä¸Šã€‚是的,那些è¦æ±‚6个9的在线时间的东西。在Erlang当ä¸ï¼Œå……æ–¥ç€å¤§é‡è½»é‡è¿›ç¨‹ï¼Œå®ƒä»¬ä¹‹é—´ç”¨æ¶ˆæ¯ä¼ 递æ¥é€šä¿¡ã€‚å¬èµ·æ¥æ€è·¯å’Œæˆ‘们用消æ¯é˜Ÿåˆ—çš„æ€è·¯æ˜¯ä¸€æ ·çš„,ä¸æ˜¯ä¹ˆï¼Ÿ
而且,RabbitMQ支æŒæŒä¹…化。是的,如果RabbitMQæ»æŽ‰äº†ï¼Œæ¶ˆæ¯å¹¶ä¸ä¼šä¸¢å¤±ï¼Œå½“队列é‡å¯ï¼Œä¸€åˆ‡éƒ½ä¼šå›žæ¥ã€‚而且,æ£å¦‚在DigiTar(注:原文作者的公å¸ï¼‰åšäº‹æƒ…æœŸæœ›çš„é‚£æ ·ï¼Œå®ƒå¯ä»¥å’ŒPythonæ— ç¼ç»“åˆã€‚除æ¤ä¹‹å¤–,RabbitMQ的文档相当的……ææ€–ã€‚å¦‚æžœä½ æ‡‚AMQP,这些文档还好,但是有多少人懂AMQP?这些文档就åƒMySQL的文档å‡è®¾ä½ å·²ç»æ‡‚了SQLä¸€æ ·â€¦â€¦ä¸è¿‡æ²¡å…³ç³»å•¦ã€‚
好了,废è¯å°‘说。这里是花了一周时间阅读关于AMQP和关于它如何在RabbitMQ上工作的文档之åŽçš„一个总结,还有,怎么在Python当ä¸ä½¿ç”¨ã€‚
开始å§
AMQP当ä¸æœ‰å››ä¸ªæ¦‚念éžå¸¸é‡è¦ï¼šè™šæ‹Ÿä¸»æœºï¼ˆvirtual host),交æ¢æœºï¼ˆexchange),队列(queue)和绑定(binding)。一个虚拟主机æŒæœ‰ä¸€ç»„交æ¢æœºã€é˜Ÿåˆ—和绑定。为什么需è¦å¤šä¸ªè™šæ‹Ÿä¸»æœºå‘¢ï¼Ÿå¾ˆç®€å•ï¼ŒRabbitMQ当ä¸ï¼Œç”¨æˆ·åªèƒ½åœ¨è™šæ‹Ÿä¸»æœºçš„粒度进行æƒé™æŽ§åˆ¶ã€‚å› æ¤ï¼Œå¦‚果需è¦ç¦æ¢A组访问B组的交æ¢æœº/队列/绑定,必须为Aå’ŒB分别创建一个虚拟主机。æ¯ä¸€ä¸ªRabbitMQæœåŠ¡å™¨éƒ½æœ‰ä¸€ä¸ªé»˜è®¤çš„虚拟主机“/â€ã€‚如果这就够了,那现在就å¯ä»¥å¼€å§‹äº†ã€‚
交æ¢æœºï¼Œé˜Ÿåˆ—,还有绑定……天哪ï¼
刚开始我æ€ç»´çš„列车就是在这里脱轨的…… 这些鬼东西怎么结åˆèµ·æ¥çš„?
队列(Queuesï¼‰æ˜¯ä½ çš„æ¶ˆæ¯ï¼ˆmessages)的终点,å¯ä»¥ç†è§£æˆè£…消æ¯çš„容器。消æ¯å°±ä¸€ç›´åœ¨é‡Œé¢ï¼Œç›´åˆ°æœ‰å®¢æˆ·ç«¯ï¼ˆä¹Ÿå°±æ˜¯æ¶ˆè´¹è€…,Consumer)连接到这个队列并且将其å–走为æ¢ã€‚ä¸è¿‡ã€‚ä½ å¯ä»¥å°†ä¸€ä¸ªé˜Ÿåˆ—é…ç½®æˆè¿™æ ·çš„:一旦消æ¯è¿›å…¥è¿™ä¸ªé˜Ÿåˆ—,biu~,它就烟消云散了。这个有点跑题了……
需è¦è®°ä½çš„是,队列是由消费者(Consumer)通过程åºå»ºç«‹çš„,ä¸æ˜¯é€šè¿‡é…置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已ç»å˜åœ¨çš„队列,RabbitMQ就会起æ¥æ‹æ‹ä»–的脑袋,笑一笑,然åŽå¿½ç•¥è¿™ä¸ªè¯·æ±‚ã€‚å› æ¤ä½ å¯ä»¥å°†æ¶ˆæ¯é˜Ÿåˆ—çš„é…置写在应用程åºçš„代ç 里é¢ã€‚这个概念ä¸é”™ã€‚
OKï¼Œä½ å·²ç»åˆ›å»ºå¹¶ä¸”è¿žæŽ¥åˆ°äº†ä½ çš„é˜Ÿåˆ—ï¼Œä½ çš„æ¶ˆè´¹è€…ç¨‹åºæ£åœ¨ç™¾æ— èŠèµ–的敲ç€æ‰‹æŒ‡ç‰å¾…消æ¯çš„到æ¥ï¼Œæ•²å•Šï¼Œæ•²å•Šâ€¦â€¦ 没有消æ¯ã€‚å‘ç”Ÿäº†ä»€ä¹ˆï¼Ÿä½ å½“ç„¶éœ€è¦å…ˆæŠŠä¸€ä¸ªæ¶ˆæ¯æ”¾è¿›é˜Ÿåˆ—æ‰è¡Œã€‚ä¸è¿‡è¦åšè¿™ä¸ªï¼Œä½ 需è¦ä¸€ä¸ªäº¤æ¢æœºï¼ˆExchange)……
交æ¢æœºå¯ä»¥ç†è§£æˆå…·æœ‰è·¯ç”±è¡¨çš„路由程åºï¼Œä»…æ¤è€Œå·²ã€‚æ¯ä¸ªæ¶ˆæ¯éƒ½æœ‰ä¸€ä¸ªç§°ä¸ºè·¯ç”±é”®ï¼ˆrouting key)的属性,就是一个简å•çš„å—符串。交æ¢æœºå½“ä¸æœ‰ä¸€ç³»åˆ—的绑定(binding),å³è·¯ç”±è§„则(routes),例如,指明具有路由键 “X†的消æ¯è¦åˆ°å为timbuku的队列当ä¸åŽ»ã€‚å…ˆä¸è®¨è®ºè¿™ä¸ªï¼Œæˆ‘们有点超å‰äº†ã€‚
ä½ çš„æ¶ˆè´¹è€…ç¨‹åºè¦è´Ÿè´£åˆ›å»ºä½ 的交æ¢æœºä»¬ï¼ˆå¤æ•°ï¼‰ã€‚å•¥ï¼Ÿä½ æ˜¯è¯´ä½ å¯ä»¥æœ‰å¤šä¸ªäº¤æ¢æœºï¼Ÿæ˜¯çš„,这个å¯ä»¥æœ‰ï¼Œä¸è¿‡ä¸ºå•¥ï¼Ÿå¾ˆç®€å•ï¼Œæ¯ä¸ªäº¤æ¢æœºåœ¨è‡ªå·±ç‹¬ç«‹çš„进程当ä¸æ‰§è¡Œï¼Œå› æ¤å¢žåŠ 多个交æ¢æœºå°±æ˜¯å¢žåŠ 多个进程,å¯ä»¥å……分利用æœåŠ¡å™¨ä¸Šçš„CPUæ ¸ä»¥ä¾¿è¾¾åˆ°æ›´é«˜çš„æ•ˆçŽ‡ã€‚ä¾‹å¦‚ï¼Œåœ¨ä¸€ä¸ª8æ ¸çš„æœåŠ¡å™¨ä¸Šï¼Œå¯ä»¥åˆ›å»º5个交æ¢æœºæ¥ç”¨5ä¸ªæ ¸ï¼Œå¦å¤–3ä¸ªæ ¸ç•™ä¸‹æ¥åšæ¶ˆæ¯å¤„ç†ã€‚类似的,在RabbitMQ的集群当ä¸ï¼Œä½ å¯ä»¥ç”¨ç±»ä¼¼çš„æ€è·¯æ¥æ‰©å±•äº¤æ¢æœºä¸€è¾¹èŽ·å–更高的åžåé‡ã€‚
OKï¼Œä½ å·²ç»åˆ›å»ºäº†ä¸€ä¸ªäº¤æ¢æœºã€‚但是他并ä¸çŸ¥é“è¦æŠŠæ¶ˆæ¯é€åˆ°å“ªä¸ªé˜Ÿåˆ—ã€‚ä½ éœ€è¦è·¯ç”±è§„则,å³ç»‘定(bindingï¼‰ã€‚ä¸€ä¸ªç»‘å®šå°±æ˜¯ä¸€ä¸ªç±»ä¼¼è¿™æ ·çš„è§„åˆ™ï¼šå°†äº¤æ¢æœºâ€œdesertï¼ˆæ²™æ¼ ï¼‰â€å½“ä¸å…·æœ‰è·¯ç”±é”®â€œé˜¿é‡Œå·´å·´â€çš„消æ¯é€åˆ°é˜Ÿåˆ—“hideout(山洞)â€é‡Œé¢åŽ»ã€‚æ¢å¥è¯è¯´ï¼Œä¸€ä¸ªç»‘定就是一个基于路由键将交æ¢æœºå’Œé˜Ÿåˆ—连接起æ¥çš„路由规则。例如,具有路由键“auditâ€çš„消æ¯éœ€è¦è¢«é€åˆ°ä¸¤ä¸ªé˜Ÿåˆ—,“log-foreverâ€å’Œâ€œalert-the-big-dudeâ€ã€‚è¦åšåˆ°è¿™ä¸ªï¼Œå°±éœ€è¦åˆ›å»ºä¸¤ä¸ªç»‘定,æ¯ä¸ªéƒ½è¿žæŽ¥ä¸€ä¸ªäº¤æ¢æœºå’Œä¸€ä¸ªé˜Ÿåˆ—,两者都是由“auditâ€è·¯ç”±é”®è§¦å‘。在这ç§æƒ…况下,交æ¢æœºä¼šå¤åˆ¶ä¸€ä»½æ¶ˆæ¯å¹¶ä¸”把它们分别å‘é€åˆ°ä¸¤ä¸ªé˜Ÿåˆ—当ä¸ã€‚交æ¢æœºä¸è¿‡å°±æ˜¯ä¸€ä¸ªç”±ç»‘定构æˆçš„路由表。
现在å¤æ‚的东西æ¥äº†ï¼šäº¤æ¢æœºæœ‰å¤šç§ç±»åž‹ã€‚他们都是åšè·¯ç”±çš„,ä¸è¿‡æŽ¥å—ä¸åŒç±»åž‹çš„绑定。为什么ä¸åˆ›å»ºä¸€ç§äº¤æ¢æœºæ¥å¤„ç†æ‰€æœ‰ç±»åž‹çš„è·¯ç”±è§„åˆ™å‘¢ï¼Ÿå› ä¸ºæ¯ç§è§„则用æ¥åšåŒ¹é…分åçš„CPU开销是ä¸åŒçš„。例如,一个“topicâ€ç±»åž‹çš„交æ¢æœºè¯•å›¾å°†æ¶ˆæ¯çš„路由键与类似“dogs.*â€çš„模å¼è¿›è¡ŒåŒ¹é…。匹é…è¿™ç§æœ«ç«¯çš„通é…符比直接将路由键与“dogsâ€æ¯”较(“directâ€ç±»åž‹çš„交æ¢æœºï¼‰è¦æ¶ˆè€—更多的CPUã€‚å¦‚æžœä½ ä¸éœ€è¦â€œtopicâ€ç±»åž‹çš„交æ¢æœºå¸¦æ¥çš„çµæ´»æ€§ï¼Œä½ å¯ä»¥é€šè¿‡ä½¿ç”¨â€œdirectâ€ç±»åž‹çš„交æ¢æœºèŽ·å–更高的处ç†æ•ˆçŽ‡ã€‚那么有哪些类型,他们åˆæ˜¯æ€Žä¹ˆå¤„ç†çš„呢?
Fanout Exchange – ä¸å¤„ç†è·¯ç”±é”®ã€‚ä½ åªéœ€è¦ç®€å•çš„将队列绑定到交æ¢æœºä¸Šã€‚一个å‘é€åˆ°äº¤æ¢æœºçš„消æ¯éƒ½ä¼šè¢«è½¬å‘到与该交æ¢æœºç»‘定的所有队列上。很åƒå网广æ’,æ¯å°å网内的主机都获得了一份å¤åˆ¶çš„消æ¯ã€‚Fanout交æ¢æœºè½¬å‘消æ¯æ˜¯æœ€å¿«çš„。
Direct Exchange – 处ç†è·¯ç”±é”®ã€‚需è¦å°†ä¸€ä¸ªé˜Ÿåˆ—绑定到交æ¢æœºä¸Šï¼Œè¦æ±‚该消æ¯ä¸Žä¸€ä¸ªç‰¹å®šçš„路由键完全匹é…。这是一个完整的匹é…。如果一个队列绑定到该交æ¢æœºä¸Šè¦æ±‚路由键 “dogâ€ï¼Œåˆ™åªæœ‰è¢«æ ‡è®°ä¸ºâ€œdogâ€çš„消æ¯æ‰è¢«è½¬å‘,ä¸ä¼šè½¬å‘dog.puppy,也ä¸ä¼šè½¬å‘dog.guard,åªä¼šè½¬å‘dog。
Topic Exchange – 将路由键和æŸæ¨¡å¼è¿›è¡ŒåŒ¹é…。æ¤æ—¶é˜Ÿåˆ—需è¦ç»‘定è¦ä¸€ä¸ªæ¨¡å¼ä¸Šã€‚符å·â€œ#â€åŒ¹é…一个或多个è¯ï¼Œç¬¦å·â€œ*â€åŒ¹é…ä¸å¤šä¸å°‘一个è¯ã€‚å› æ¤â€œaudit.#â€èƒ½å¤ŸåŒ¹é…到“audit.irs.corporateâ€ï¼Œä½†æ˜¯â€œaudit.*†åªä¼šåŒ¹é…到“audit.irsâ€ã€‚我在RedHat的朋å‹åšäº†ä¸€å¼ ä¸é”™çš„图,æ¥è¡¨æ˜Žtopic交æ¢æœºæ˜¯å¦‚何工作的:
Source: Red Hat Messaging Tutorial: 1.3 Topic Exchange
æŒä¹…化这些å°ä¸œè¥¿ä»¬
ä½ èŠ±äº†å¤§é‡çš„时间æ¥åˆ›å»ºé˜Ÿåˆ—ã€äº¤æ¢æœºå’Œç»‘定,然åŽï¼Œç °ï½žæœåŠ¡å™¨ç¨‹åºæŒ‚äº†ã€‚ä½ çš„é˜Ÿåˆ—ã€äº¤æ¢æœºå’Œç»‘å®šæ€Žä¹ˆæ ·äº†ï¼Ÿè¿˜æœ‰ï¼Œæ”¾åœ¨é˜Ÿåˆ—é‡Œé¢ä½†æ˜¯å°šæœªå¤„ç†çš„消æ¯ä»¬å‘¢ï¼Ÿ
放æ¾ï½žå¦‚æžœä½ æ˜¯ç”¨é»˜è®¤å‚æ•°æž„é€ çš„è¿™ä¸€åˆ‡çš„è¯ï¼Œé‚£ä¹ˆï¼Œä»–们,都,biu~,ç°é£žçƒŸç了。是的,RabbitMQé‡å¯ä¹‹åŽä¼šå¹²å‡€çš„åƒä¸ªæ–°ç”Ÿå„¿ã€‚ä½ å¿…é¡»é‡åšæ‰€æœ‰çš„一切,亡羊补牢,如何é¿å…å°†æ¥å†åº¦å‘生æ¤ç±»æ¯å…·ï¼Ÿ
队列和交æ¢æœºæœ‰ä¸€ä¸ªåˆ›å»ºæ—¶å€™æŒ‡å®šçš„æ ‡å¿—durable,直译å«åšåšå›ºçš„。durable的唯一å«ä¹‰å°±æ˜¯å…·æœ‰è¿™ä¸ªæ ‡å¿—的队列和交æ¢æœºä¼šåœ¨é‡å¯ä¹‹åŽé‡æ–°å»ºç«‹ï¼Œå®ƒä¸è¡¨ç¤ºè¯´åœ¨é˜Ÿåˆ—当ä¸çš„消æ¯ä¼šåœ¨é‡å¯åŽæ¢å¤ã€‚那么如何æ‰èƒ½åšåˆ°ä¸åªæ˜¯é˜Ÿåˆ—和交æ¢æœºï¼Œè¿˜æœ‰æ¶ˆæ¯éƒ½æ˜¯æŒä¹…的呢?
ä½†æ˜¯é¦–å…ˆä¸€ä¸ªé—®é¢˜æ˜¯ï¼Œä½ çœŸçš„éœ€è¦æ¶ˆæ¯æ˜¯æŒä¹…çš„å—?对于一个需è¦åœ¨é‡å¯ä¹‹åŽå›žå¤çš„消æ¯æ¥è¯´ï¼Œå®ƒéœ€è¦è¢«å†™å…¥åˆ°ç£ç›˜ä¸Šï¼Œè€Œå³ä½¿æ˜¯æœ€ç®€å•çš„ç£ç›˜æ“作也是è¦æ¶ˆè€—时间的。如果和消æ¯çš„å†…å®¹ç›¸æ¯”ï¼Œä½ æ›´çœ‹é‡çš„是消æ¯å¤„ç†çš„速度,那么ä¸è¦ä½¿ç”¨æŒä¹…化的消æ¯ã€‚ä¸è¿‡å¯¹äºŽæˆ‘们@DigiTaræ¥è¯´ï¼ŒæŒä¹…化很é‡è¦ã€‚
å½“ä½ å°†æ¶ˆæ¯å‘布到交æ¢æœºçš„时候,å¯ä»¥æŒ‡å®šä¸€ä¸ªæ ‡å¿—“Delivery Modeâ€ï¼ˆæŠ•é€’模å¼ï¼‰ã€‚æ ¹æ®ä½ 使用的AMQP的库ä¸åŒï¼ŒæŒ‡å®šè¿™ä¸ªæ ‡å¿—的方法å¯èƒ½ä¸å¤ªä¸€æ ·ï¼ˆæˆ‘们åŽé¢ä¼šè®¨è®ºå¦‚何用Pythonæžå®šï¼‰ã€‚简å•çš„说,就是将Delivery Mode设置æˆ2,也就是æŒä¹…的(persistent)å³å¯ã€‚一般的AMQP库都是将Delivery Mode设置æˆ1,也就是éžæŒä¹…的。所以è¦æŒä¹…化消æ¯çš„æ¥éª¤å¦‚下:
- 将交æ¢æœºè®¾æˆ durable。
- å°†é˜Ÿåˆ—è®¾æˆ durable。
- 将消æ¯çš„ Delivery Mode 设置æˆ2 。
å°±è¿™æ ·ï¼Œä¸æ˜¯å¾ˆå¤æ‚,起ç æ²¡æœ‰é€ ç«ç®å¤æ‚,ä¸è¿‡ä¹Ÿæœ‰å¯èƒ½çŠ¯ç‚¹å°é”™è¯¯ã€‚
下é¢è¿˜è¦ç½—嗦一个东西……绑定(Bindingsï¼‰æ€Žä¹ˆåŠžï¼Ÿæˆ‘ä»¬æ— æ³•åœ¨åˆ›å»ºç»‘å®šçš„æ—¶å€™è®¾ç½®æˆdurableã€‚æ²¡é—®é¢˜ï¼Œå¦‚æžœä½ ç»‘å®šäº†ä¸€ä¸ªdurable的队列和一个durable的交æ¢æœºï¼ŒRabbitMQ会自动ä¿ç•™è¿™ä¸ªç»‘å®šã€‚ç±»ä¼¼çš„ï¼Œå¦‚æžœåˆ é™¤äº†æŸä¸ªé˜Ÿåˆ—或交æ¢æœºï¼ˆæ— 论是ä¸æ˜¯durable),ä¾èµ–å®ƒçš„ç»‘å®šéƒ½ä¼šè‡ªåŠ¨åˆ é™¤ã€‚
注æ„两点:
- RabbitMQ ä¸å…è®¸ä½ ç»‘å®šä¸€ä¸ªéžåšå›ºï¼ˆnon-durable)的交æ¢æœºå’Œä¸€ä¸ªdurable的队列。å之亦然。è¦æƒ³æˆåŠŸå¿…须队列和交æ¢æœºéƒ½æ˜¯durable的。
- 一旦创建了队列和交æ¢æœºï¼Œå°±ä¸èƒ½ä¿®æ”¹å…¶æ ‡å¿—了。例如,如果创建了一个non-durable的队列,然åŽæƒ³æŠŠå®ƒæ”¹å˜æˆdurableçš„ï¼Œå”¯ä¸€çš„åŠžæ³•å°±æ˜¯åˆ é™¤è¿™ä¸ªé˜Ÿåˆ—ç„¶åŽé‡çŽ°åˆ›å»ºã€‚å› æ¤ï¼Œæœ€å¥½ä»”ç»†æ£€æŸ¥åˆ›å»ºçš„æ ‡å¿—ã€‚
开始喂蛇了~
ã€è¯‘æ³¨ã€‘è¯´å–‚è›‡æ˜¯å› ä¸ºPythonçš„å›¾æ ‡æ˜¯æ¡è›‡ã€‚
AMQP的一个空白地带是如何在Python当ä¸ä½¿ç”¨ã€‚对于其他è¯è¨€æœ‰ä¸€å¤§å¨æ料。
但是对Pythonè€å…„æ¥è¯´ï¼Œä½ 需è¦èŠ±ç‚¹æ—¶é—´æ¥æŒ–æŽ˜ä¸€ä¸‹ã€‚æ‰€ä»¥æˆ‘å†™äº†è¿™ä¸ªï¼Œè¿™æ ·åˆ«çš„å®¶ä¼™ä»¬å°±ä¸éœ€è¦ç»åŽ†æˆ‘è¿™ç§æŠ“狂的过程了。
首先,我们需è¦ä¸€ä¸ªPythonçš„AMQP库。有两个å¯é€‰ï¼š
æ ¹æ®ä½ 的需求,py-amqplib或者txAMQP都是å¯ä»¥çš„ã€‚å› ä¸ºæ˜¯åŸºäºŽTwisted的,txAMQPå¯ä»¥ä¿è¯ç”¨å¼‚æ¥IO构建超高性能的AMQP程åºã€‚但是Twistedç¼–ç¨‹æœ¬èº«å°±æ˜¯ä¸€ä¸ªå¾ˆå¤§çš„ä¸»é¢˜â€¦â€¦å› æ¤æ¸…æ™°èµ·è§ï¼Œæˆ‘们打算用 py-amqplib。更新:请å‚è§Esteve Fernandez关于txAMQP的使用和代ç æ ·ä¾‹çš„å›žå¤ã€‚
AMQP支æŒåœ¨ä¸€ä¸ªTCP连接上å¯ç”¨å¤šä¸ªMQ通信channel,æ¯ä¸ªchannel都å¯ä»¥è¢«åº”用作为通信æµã€‚æ¯ä¸ªAMQP程åºè‡³å°‘è¦æœ‰ä¸€ä¸ªè¿žæŽ¥å’Œä¸€ä¸ªchannel。
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel() |
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
æ¯ä¸ªchannel都被分é…äº†ä¸€ä¸ªæ•´æ•°æ ‡è¯†ï¼Œè‡ªåŠ¨ç”±Connection()类的.channel()æ–¹æ³•ç»´æŠ¤ã€‚æˆ–è€…ï¼Œä½ å¯ä»¥ä½¿ç”¨.channel(x)æ¥æŒ‡å®šchannelæ ‡è¯†ï¼Œå…¶ä¸xæ˜¯ä½ æƒ³è¦ä½¿ç”¨çš„channelæ ‡è¯†ã€‚é€šå¸¸æƒ…å†µä¸‹ï¼ŒæŽ¨è使用.channel()方法æ¥è‡ªåŠ¨åˆ†é…channelæ ‡è¯†ï¼Œä»¥ä¾¿é˜²æ¢å†²çªã€‚
现在我们已ç»æœ‰äº†ä¸€ä¸ªå¯ä»¥ç”¨çš„连接和channel。现在,我们的代ç 将分æˆä¸¤ä¸ªåº”用,生产者(producer)和消费者(consumer)。我们先创建一个消费者程åºï¼Œä»–会创建一个å«åšâ€œpo_boxâ€çš„队列和一个å«â€œsorting_roomâ€çš„交æ¢æœºï¼š
chan.queue_declare(queue="po_box", durable=True,
exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
auto_delete=False,) |
chan.queue_declare(queue="po_box", durable=True,
exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
auto_delete=False,)
这段代ç 干了啥?首先,它创建了一个åå«â€œpo_boxâ€çš„队列,它是durable的(é‡å¯ä¹‹åŽä¼šé‡æ–°å»ºç«‹ï¼‰ï¼Œå¹¶ä¸”最åŽä¸€ä¸ªæ¶ˆè´¹è€…æ–开的时候ä¸ä¼šè‡ªåŠ¨åˆ 除(auto_delete=False)。在创建durable的队列(或者交æ¢æœºï¼‰çš„时候,将auto_delete设置æˆfalse是很é‡è¦çš„,å¦åˆ™é˜Ÿåˆ—将会在最åŽä¸€ä¸ªæ¶ˆè´¹è€…æ–开的时候消失,与durable与å¦æ— 关。如果将durableå’Œauto_delete都设置æˆTrue,åªæœ‰å°šæœ‰æ¶ˆè´¹è€…活动的队列å¯ä»¥åœ¨RabbitMQæ„外崩溃的时候自动æ¢å¤ã€‚
ï¼ˆä½ å¯ä»¥æ³¨æ„到了å¦ä¸€ä¸ªæ ‡å¿—,称为“exclusiveâ€ã€‚如果设置æˆTrue,åªæœ‰åˆ›å»ºè¿™ä¸ªé˜Ÿåˆ—的消费者程åºæ‰å…许连接到该队列。这ç§é˜Ÿåˆ—对于这个消费者程åºæ˜¯ç§æœ‰çš„)。
还有å¦ä¸€ä¸ªäº¤æ¢æœºå£°æ˜Žï¼Œåˆ›å»ºäº†ä¸€ä¸ªåå—å«â€œsorting_roomâ€çš„交æ¢æœºã€‚auto_deleteå’Œdurableçš„å«ä¹‰å’Œé˜Ÿåˆ—æ˜¯ä¸€æ ·çš„ã€‚ä½†æ˜¯ï¼Œ.excange_declare() 还有å¦å¤–一个å‚æ•°å«åštype,用æ¥æŒ‡å®šè¦åˆ›å»ºçš„交æ¢æœºçš„类型(如å‰é¢åˆ—出的): fanout, direct å’Œ topic.
到æ¤ä¸ºæ¢ï¼Œä½ å·²ç»æœ‰äº†ä¸€ä¸ªå¯ä»¥æŽ¥æ”¶æ¶ˆæ¯çš„队列和一个å¯ä»¥å‘é€æ¶ˆæ¯çš„交æ¢æœºã€‚ä¸è¿‡æˆ‘们需è¦åˆ›å»ºä¸€ä¸ªç»‘定,把它们连接起æ¥ã€‚
chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,
routing_key=”jason”)
这个绑定的过程éžå¸¸ç›´æŽ¥ã€‚任何é€åˆ°äº¤æ¢æœºâ€œsorting_roomâ€çš„具有路由键“jason†的消æ¯éƒ½è¢«è·¯ç”±åˆ°å为“po_box†的队列。
çŽ°åœ¨ï¼Œä½ æœ‰ä¸¤ç§æ–¹æ³•ä»Žé˜Ÿåˆ—当ä¸å–出消æ¯ã€‚第一个是调用chan.basic_get(),主动从队列当ä¸æ‹‰å‡ºä¸‹ä¸€ä¸ªæ¶ˆæ¯ï¼ˆå¦‚果队列当ä¸æ²¡æœ‰æ¶ˆæ¯ï¼Œchan.basic_get()会返回None, å› æ¤ä¸‹é¢ä»£ç 当ä¸print msg.body 会在没有消æ¯çš„时候崩掉):
msg = chan.basic_get("po_box")
print msg.body
chan.basic_ack(msg.delivery_tag) |
msg = chan.basic_get("po_box")
print msg.body
chan.basic_ack(msg.delivery_tag)
ä½†æ˜¯å¦‚æžœä½ æƒ³è¦åº”用程åºåœ¨æ¶ˆæ¯åˆ°è¾¾çš„时候立å³å¾—到通知怎么办?这ç§æƒ…况下ä¸èƒ½ä½¿ç”¨chan.basic_get()ï¼Œä½ éœ€è¦ç”¨chan.basic_consume()注册一个新消æ¯åˆ°è¾¾çš„回调。
def recv_callback(msg):
  print 'Received: ' + msg.body
chan.basic_consume(queue='po_box', no_ack=True,
callback=recv_callback, consumer_tag="testtag")
while True:
chan.wait()
chan.basic_cancel("testtag") |
def recv_callback(msg):
  print 'Received: ' + msg.body
chan.basic_consume(queue='po_box', no_ack=True,
callback=recv_callback, consumer_tag="testtag")
while True:
chan.wait()
chan.basic_cancel("testtag")
chan.wait() æ”¾åœ¨ä¸€ä¸ªæ— é™å¾ªçŽ¯é‡Œé¢ï¼Œè¿™ä¸ªå‡½æ•°ä¼šç‰å¾…在队列上,直到下一个消æ¯åˆ°è¾¾é˜Ÿåˆ—。chan.basic_cancel() 用æ¥æ³¨é”€è¯¥å›žè°ƒå‡½æ•°ã€‚å‚æ•°consumer_tag 当ä¸æŒ‡å®šçš„å—符串和chan.basic_consume() 注册的一直。在这个例å当ä¸chan.basic_cancel() ä¸ä¼šè¢«è°ƒç”¨åˆ°ï¼Œå› 为上é¢æ˜¯ä¸ªæ— é™å¾ªçŽ¯â€¦â€¦ ä¸è¿‡ä½ 需è¦çŸ¥é“这个调用,所以我把它放在了代ç 里。
需è¦æ³¨æ„çš„å¦ä¸€ä¸ªä¸œè¥¿æ˜¯no_ackå‚数。这个å‚æ•°å¯ä»¥ä¼ ç»™chan.basic_get()å’Œchan.basic_consume(),默认是false。当从队列当ä¸å–出一个消æ¯çš„时候,RabbitMQ需è¦åº”用显å¼åœ°å›žé¦ˆè¯´å·²ç»èŽ·å–到了该消æ¯ã€‚如果一段时间内ä¸å›žé¦ˆï¼ŒRabbitMQ会将该消æ¯é‡æ–°åˆ†é…ç»™å¦å¤–一个绑定在该队列上的消费者。å¦ä¸€ç§æƒ…况是消费者æ–开连接,但是获å–到的消æ¯æ²¡æœ‰å›žé¦ˆï¼Œåˆ™RabbitMQåŒæ ·é‡æ–°åˆ†é…。如果将no_ack å‚数设置为true,则py-amqplib会为下一个AMQPè¯·æ±‚æ·»åŠ ä¸€ä¸ªno_ack属性,告诉AMQPæœåŠ¡å™¨ä¸éœ€è¦ç‰å¾…å›žé¦ˆã€‚ä½†æ˜¯ï¼Œå¤§å¤šæ•°æ—¶å€™ï¼Œä½ ä¹Ÿè®¸æƒ³è¦è‡ªå·±æ‰‹å·¥å‘é€å›žé¦ˆï¼Œä¾‹å¦‚,需è¦åœ¨å›žé¦ˆä¹‹å‰å°†æ¶ˆæ¯å˜å…¥æ•°æ®åº“。回馈通常是通过调用chan.basic_ack()方法,使用消æ¯çš„delivery_tag属性作为å‚数。å‚è§chan.basic_get() 的实例代ç 。
好了,这就是消费者的全部代ç 。(下载:amqp_consumer.py)
ä¸è¿‡æ²¡æœ‰äººå‘é€æ¶ˆæ¯çš„è¯ï¼Œè¦æ¶ˆè´¹è€…何用?所以需è¦ä¸€ä¸ªç”Ÿäº§è€…。下é¢çš„代ç 示例表明如何将一个简å•æ¶ˆæ¯å‘é€åˆ°äº¤æ¢åŒºâ€œsorting_roomâ€ï¼Œå¹¶ä¸”æ ‡è®°ä¸ºè·¯ç”±é”®â€œjason†:
msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason") |
msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
ä½ ä¹Ÿè®¸æ³¨æ„到我们设置消æ¯çš„delivery_mode属性为2ï¼Œå› ä¸ºé˜Ÿåˆ—å’Œäº¤æ¢æœºéƒ½è®¾ç½®ä¸ºdurable的,这个设置将ä¿è¯æ¶ˆæ¯èƒ½å¤ŸæŒä¹…化,也就是说,当它还没有é€è¾¾æ¶ˆè´¹è€…之å‰å¦‚æžœRabbitMQé‡å¯åˆ™å®ƒèƒ½å¤Ÿè¢«æ¢å¤ã€‚
剩下的最åŽä¸€ä»¶äº‹æƒ…(生产者和消费者都需è¦è°ƒç”¨çš„)是关é—channel和连接:
chan.close()
conn.close() |
chan.close()
conn.close()
很简å•å§ã€‚(下载:amqp_publisher.py)
æ¥çœŸå®žåœ°è·‘一下å§â€¦â€¦
现在我们已ç»å†™å¥½äº†ç”Ÿäº§è€…和消费者,让他们跑起æ¥å§ã€‚å‡è®¾ä½ çš„RabbitMQ在localhost上安装并且è¿è¡Œã€‚
打开一个终端,执行python ./amqp_consumer.py让消费者è¿è¡Œï¼Œå¹¶ä¸”创建队列ã€äº¤æ¢æœºå’Œç»‘定。
然åŽåœ¨å¦ä¸€ä¸ªç»ˆç«¯è¿è¡Œpython ./amqp_publisher.py “AMQP rocks.â€ ã€‚å¦‚æžœä¸€åˆ‡è‰¯å¥½ï¼Œä½ åº”è¯¥èƒ½å¤Ÿåœ¨ç¬¬ä¸€ä¸ªç»ˆç«¯çœ‹åˆ°è¾“å‡ºçš„æ¶ˆæ¯ã€‚
付诸使用å§
我知é“这个教程是éžå¸¸ç²—浅的关于AMQP/RabbitMQ和如何使用Python访问的教程。希望这个å¯ä»¥è¯´æ˜Žæ‰€æœ‰çš„概念如何在Python当ä¸è¢«ç»„åˆèµ·æ¥ã€‚å¦‚æžœä½ å‘现任何错误,请è”系原作者(williamsjj@digitar.com) ã€è¯‘注:如果是翻译问题请è”系译者】。åŒæ—¶ï¼Œæˆ‘很高兴回ç”我知é“的问题。ã€è¯‘æ³¨ï¼šè¯‘è€…ä¹Ÿæ˜¯ä¸€æ ·çš„ã€‘ã€‚æŽ¥ä¸‹æ¥æ˜¯ï¼Œé›†ç¾¤åŒ–(clustering)ï¼ä¸è¿‡æˆ‘需è¦å…ˆæŠŠå®ƒå¼„懂å†è¯´ã€‚
注:关于RabbitMQ的知识我主è¦æ¥è‡ªè¿™äº›æ¥æºï¼ŒæŽ¨è阅读:
–完–