存档

文章标签 ‘python’

[翻译] [RabbitMQ+Python入门经典] 兔子和兔子窝

2010年3月14日 16 条评论

RabbitMQ作为一个工业级的消息队列服务器,在其客户端手册列表的Python段当中推荐了一篇blog,作为RabbitMQ+Python的入门手册再合适不过了。不过,正如其标题Rabbit and Warrens(兔子和养兔场)一样,这篇英文写的相当俏皮,以至于对于我等非英文读者来说不像一般的技术文档那么好懂,所以,翻译一下吧。翻译过了,希望其他人可以少用一些时间。翻译水平有限,不可能像原文一样俏皮,部分地方可能就意译了,希望以容易懂为准。想看看老外的幽默的,推荐去看原文,其实,也不是那么难理解……

原文:http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/

兔子和兔子窝

当时我们的动机很简单:从生产环境的电子邮件处理流程当中分支出一个特定的离线分析流程。我们开始用的MySQL,将要处理的东西放在表里面,另一个程序从中取。不过很快,这种设计的丑陋之处就显现出来了…… 你想要多个程序从一个队列当中取数据来处理?没问题,我们硬编码程序的个数好了……什么?还要能够允许程序动态地增加和减少的时候动态进行压力分配?

是的,当年我们想的简单的东西(做一个分支处理)逐渐变成了一个棘手的问题。以前拿着锤子(MySQL)看所有东西都是钉子(表)的年代是多么美好……

在搜索了一下之后,我们走进了消息队列(message queue)的大门。不不,我们当然知道消息队列是什么,我们可是以做电子邮件程序谋生的。我们实现过各种各样的专业的,高速的内存队列用来做电子邮件处理。我们不知道的是那一大类现成的、通用的消息队列(MQ)服务器——无论是用什么语言写出的,不需要复杂的装配的,可以自然的在网络上的应用程序之间传送数据的一类程序。不用我们自己写?看看再说。

让大家看看你们的Queue吧……

过去的4年里,人们写了有好多好多的开源的MQ服务器啊。其中大多数都是某公司例如LiveJournal写出来用来解决特定问题的。它们的确不关心上面跑的是什么类型的消息,不过他们的设计思想通常是和创建者息息相关的(消息的持久化,崩溃恢复等通常不在他们考虑范围内)。不过,有三个专门设计用来做及其灵活的消息队列的程序值得关注:

Apache ActiveMQ 曝光率最高,不过看起来它有些问题,可能会造成丢消息。不可接受,下一个。

ZeroMQ 和 RabbitMQ 都支持一个开源的消息协议,成为AMQP。AMQP的一个优点是它是一个灵活和开放的协议,以便和另外两个商业化的Message Queue (IBM和Tibco)竞争,很好。不过ZeroMQ不支持消息持久化和崩溃恢复,不太好。剩下的只有RabbitMQ了。如果你不在意消息持久化和崩溃恢复,试试ZeroMQ吧,延迟很低,而且支持灵活的拓扑。

剩下的只有这个吃胡萝卜的家伙了……

当我读到它是用Erlang写的时候,RabbitMQ震了我一下。Erlang 是爱立信开发的高度并行的语言,用来跑在电话交换机上。是的,那些要求6个9的在线时间的东西。在Erlang当中,充斥着大量轻量进程,它们之间用消息传递来通信。听起来思路和我们用消息队列的思路是一样的,不是么?

而且,RabbitMQ支持持久化。是的,如果RabbitMQ死掉了,消息并不会丢失,当队列重启,一切都会回来。而且,正如在DigiTar(注:原文作者的公司)做事情期望的那样,它可以和Python无缝结合。除此之外,RabbitMQ的文档相当的……恐怖。如果你懂AMQP,这些文档还好,但是有多少人懂AMQP?这些文档就像MySQL的文档假设你已经懂了SQL一样……不过没关系啦。

好了,废话少说。这里是花了一周时间阅读关于AMQP和关于它如何在RabbitMQ上工作的文档之后的一个总结,还有,怎么在Python当中使用。

开始吧

AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。如果这就够了,那现在就可以开始了。

交换机,队列,还有绑定……天哪!

刚开始我思维的列车就是在这里脱轨的…… 这些鬼东西怎么结合起来的?

队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。不过。你可以将一个队列配置成这样的:一旦消息进入这个队列,biu~,它就烟消云散了。这个有点跑题了……

需要记住的是,队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。这没什么问题,如果一个消费者试图创建一个已经存在的队列,RabbitMQ就会起来拍拍他的脑袋,笑一笑,然后忽略这个请求。因此你可以将消息队列的配置写在应用程序的代码里面。这个概念不错。

OK,你已经创建并且连接到了你的队列,你的消费者程序正在百无聊赖的敲着手指等待消息的到来,敲啊,敲啊…… 没有消息。发生了什么?你当然需要先把一个消息放进队列才行。不过要做这个,你需要一个交换机(Exchange)……

交换机可以理解成具有路由表的路由程序,仅此而已。每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes),例如,指明具有路由键 “X” 的消息要到名为timbuku的队列当中去。先不讨论这个,我们有点超前了。

你的消费者程序要负责创建你的交换机们(复数)。啥?你是说你可以有多个交换机?是的,这个可以有,不过为啥?很简单,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程,可以充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,可以创建5个交换机来用5个核,另外3个核留下来做消息处理。类似的,在RabbitMQ的集群当中,你可以用类似的思路来扩展交换机一边获取更高的吞吐量。

OK,你已经创建了一个交换机。但是他并不知道要把消息送到哪个队列。你需要路由规则,即绑定(binding)。一个绑定就是一个类似这样的规则:将交换机“desert(沙漠)”当中具有路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。例如,具有路由键“audit”的消息需要被送到两个队列,“log-forever”和“alert-the-big-dude”。要做到这个,就需要创建两个绑定,每个都连接一个交换机和一个队列,两者都是由“audit”路由键触发。在这种情况下,交换机会复制一份消息并且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。

现在复杂的东西来了:交换机有多种类型。他们都是做路由的,不过接受不同类型的绑定。为什么不创建一种交换机来处理所有类型的路由规则呢?因为每种规则用来做匹配分子的CPU开销是不同的。例如,一个“topic”类型的交换机试图将消息的路由键与类似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。如果你不需要“topic”类型的交换机带来的灵活性,你可以通过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢?

Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。

Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。我在RedHat的朋友做了一张不错的图,来表明topic交换机是如何工作的:

Source: Red Hat Messaging Tutorial: 1.3 Topic Exchange

持久化这些小东西们

你花了大量的时间来创建队列、交换机和绑定,然后,砰~服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面但是尚未处理的消息们呢?

放松~如果你是用默认参数构造的这一切的话,那么,他们,都,biu~,灰飞烟灭了。是的,RabbitMQ重启之后会干净的像个新生儿。你必须重做所有的一切,亡羊补牢,如何避免将来再度发生此类杯具?

队列和交换机有一个创建时候指定的标志durable,直译叫做坚固的。durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢?

但是首先一个问题是,你真的需要消息是持久的吗?对于一个需要在重启之后回复的消息来说,它需要被写入到磁盘上,而即使是最简单的磁盘操作也是要消耗时间的。如果和消息的内容相比,你更看重的是消息处理的速度,那么不要使用持久化的消息。不过对于我们@DigiTar来说,持久化很重要。

当你将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不同,指定这个标志的方法可能不太一样(我们后面会讨论如何用Python搞定)。简单的说,就是将Delivery Mode设置成2,也就是持久的(persistent)即可。一般的AMQP库都是将Delivery Mode设置成1,也就是非持久的。所以要持久化消息的步骤如下:

  1. 将交换机设成 durable。
  2. 将队列设成 durable。
  3. 将消息的 Delivery Mode 设置成2 。

就这样,不是很复杂,起码没有造火箭复杂,不过也有可能犯点小错误。

下面还要罗嗦一个东西……绑定(Bindings)怎么办?我们无法在创建绑定的时候设置成durable。没问题,如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是durable),依赖它的绑定都会自动删除。

注意两点:

  • RabbitMQ 不允许你绑定一个非坚固(non-durable)的交换机和一个durable的队列。反之亦然。要想成功必须队列和交换机都是durable的。
  • 一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。因此,最好仔细检查创建的标志。

开始喂蛇了~

【译注】说喂蛇是因为Python的图标是条蛇。

AMQP的一个空白地带是如何在Python当中使用。对于其他语言有一大坨材料。

但是对Python老兄来说,你需要花点时间来挖掘一下。所以我写了这个,这样别的家伙们就不需要经历我这种抓狂的过程了。

首先,我们需要一个Python的AMQP库。有两个可选:

  • py-amqplib – 通用的AMQP
  • txAMQP – 使用 Twisted 框架的AMQP库,因此允许异步I/O。

根据你的需求,py-amqplib或者txAMQP都是可以的。因为是基于Twisted的,txAMQP可以保证用异步IO构建超高性能的AMQP程序。但是Twisted编程本身就是一个很大的主题……因此清晰起见,我们打算用 py-amqplib。更新:请参见Esteve Fernandez关于txAMQP的使用和代码样例的回复。

AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流。每个AMQP程序至少要有一个连接和一个channel。

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()

每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。或者,你可以使用.channel(x)来指定channel标识,其中x是你想要使用的channel标识。通常情况下,推荐使用.channel()方法来自动分配channel标识,以便防止冲突。

现在我们已经有了一个可以用的连接和channel。现在,我们的代码将分成两个应用,生产者(producer)和消费者(consumer)。我们先创建一个消费者程序,他会创建一个叫做“po_box”的队列和一个叫“sorting_room”的交换机:

chan.queue_declare(queue="po_box", durable=True,
exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
auto_delete=False,)

这段代码干了啥?首先,它创建了一个名叫“po_box”的队列,它是durable的(重启之后会重新建立),并且最后一个消费者断开的时候不会自动删除(auto_delete=False)。在创建durable的队列(或者交换机)的时候,将auto_delete设置成false是很重要的,否则队列将会在最后一个消费者断开的时候消失,与durable与否无关。如果将durable和auto_delete都设置成True,只有尚有消费者活动的队列可以在RabbitMQ意外崩溃的时候自动恢复。

(你可以注意到了另一个标志,称为“exclusive”。如果设置成True,只有创建这个队列的消费者程序才允许连接到该队列。这种队列对于这个消费者程序是私有的)。

还有另一个交换机声明,创建了一个名字叫“sorting_room”的交换机。auto_delete和durable的含义和队列是一样的。但是,.excange_declare() 还有另外一个参数叫做type,用来指定要创建的交换机的类型(如前面列出的): fanout, direct 和 topic.

到此为止,你已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过我们需要创建一个绑定,把它们连接起来。

chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,
routing_key=”jason”)

这个绑定的过程非常直接。任何送到交换机“sorting_room”的具有路由键“jason” 的消息都被路由到名为“po_box” 的队列。

现在,你有两种方法从队列当中取出消息。第一个是调用chan.basic_get(),主动从队列当中拉出下一个消息(如果队列当中没有消息,chan.basic_get()会返回None, 因此下面代码当中print msg.body 会在没有消息的时候崩掉):

msg = chan.basic_get("po_box")
print msg.body
chan.basic_ack(msg.delivery_tag)

但是如果你想要应用程序在消息到达的时候立即得到通知怎么办?这种情况下不能使用chan.basic_get(),你需要用chan.basic_consume()注册一个新消息到达的回调。

def recv_callback(msg):
    print 'Received: ' + msg.body
chan.basic_consume(queue='po_box', no_ack=True,
callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

chan.wait() 放在一个无限循环里面,这个函数会等待在队列上,直到下一个消息到达队列。chan.basic_cancel() 用来注销该回调函数。参数consumer_tag 当中指定的字符串和chan.basic_consume() 注册的一直。在这个例子当中chan.basic_cancel() 不会被调用到,因为上面是个无限循环…… 不过你需要知道这个调用,所以我把它放在了代码里。

需要注意的另一个东西是no_ack参数。这个参数可以传给chan.basic_get()和chan.basic_consume(),默认是false。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将no_ack 参数设置为true,则py-amqplib会为下一个AMQP请求添加一个no_ack属性,告诉AMQP服务器不需要等待回馈。但是,大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用chan.basic_ack()方法,使用消息的delivery_tag属性作为参数。参见chan.basic_get() 的实例代码。

好了,这就是消费者的全部代码。(下载:amqp_consumer.py)

不过没有人发送消息的话,要消费者何用?所以需要一个生产者。下面的代码示例表明如何将一个简单消息发送到交换区“sorting_room”,并且标记为路由键“jason” :

msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")

你也许注意到我们设置消息的delivery_mode属性为2,因为队列和交换机都设置为durable的,这个设置将保证消息能够持久化,也就是说,当它还没有送达消费者之前如果RabbitMQ重启则它能够被恢复。

剩下的最后一件事情(生产者和消费者都需要调用的)是关闭channel和连接:

chan.close()
conn.close()

很简单吧。(下载:amqp_publisher.py)

来真实地跑一下吧……

现在我们已经写好了生产者和消费者,让他们跑起来吧。假设你的RabbitMQ在localhost上安装并且运行。

打开一个终端,执行python ./amqp_consumer.py让消费者运行,并且创建队列、交换机和绑定。

然后在另一个终端运行python ./amqp_publisher.py “AMQP rocks.” 。如果一切良好,你应该能够在第一个终端看到输出的消息。

付诸使用吧

我知道这个教程是非常粗浅的关于AMQP/RabbitMQ和如何使用Python访问的教程。希望这个可以说明所有的概念如何在Python当中被组合起来。如果你发现任何错误,请联系原作者(williamsjj@digitar.com) 【译注:如果是翻译问题请联系译者】。同时,我很高兴回答我知道的问题。【译注:译者也是一样的】。接下来是,集群化(clustering)!不过我需要先把它弄懂再说。

注:关于RabbitMQ的知识我主要来自这些来源,推荐阅读:

–完–

Python multiprocessing 使用手记[3] – 关于Queue

2009年12月13日 没有评论

继续讨论Python multiprocessing,这次讨论的主要内容是mp库的核心组件之一的Queue。

Queue是mp库当中用来提供多进程对象交换的方式。对象交换和上一部分当中提到的对象共享都是使多个进程访问同一个对象的方式,两者的区别就是,对象共享是多个进程访问同一个对象,对象交换则是将对象从一个进程传输的另一个进程。

multiprocessing当中的Queue使用方式和Python内置的threading.Queue对象很像,它支持一个put操作,将对象放入Queue,也支持一个get操作,将对象从Queue当中读出。和threading.Queue不同的是,mp.Queue默认不支持join()和task_done操作,这两个支持需要使用mp.JoinableQueue对象。

由于Queue对象负责进程之间的对象传输,因此第一个问题就是如何在两个进程之间共享这个Queue对象本身。在上一部分所言的三种共享方式当中,Queue对象只能使用继承(inheritance)的方式共享。这是因为Queue本身基于unix的Pipe对象实现,而Pipe对象的共享需要通过继承。因此,在一个典型的应用实现模型当中,应该是父进程创建Queue,然后创建子进程共享该Queue,由父进程和子进程分别读写。例如下面的这个例子:

import multiprocessing
 
q = multiprocessing.Queue()
 
def reader_proc():
    print q.get()
 
reader = multiprocessing.Process(target=reader_proc)
reader.start()
 
q.put(100)
reader.join()

另一种实现方式是父进程创建Queue,创建多个子进程,有的子进程读Queue,有的子进程写Queue,例如:

import multiprocessing
 
q = multiprocessing.Queue()
 
def writer_proc():
    q.put(100)
 
def reader_proc():
    print q.get()
 
reader = multiprocessing.Process(target=reader_proc)
reader.start()
writer = multiprocessing.Process(target=writer_proc)
writer.start()
 
reader.join()
writer.join()

由于使用继承的方式共享Queue,因此代码当中并没有明显的传输Queue对象本身的代码,看起来似乎只要将multiprocessing当中的对象换成threading当中的对象,程序仍然能够工作。反之,拿到一个现有的多线程程序,是不是将threading改成multiprocessing就可以工作呢?也许可以,但是更可能的情况是你会遇到很多问题。

第一个问题就是mp的Queue需要考虑多进程之间的对象传输,因此所传输的对象必须是可以pickle的。否则,在Queue的put操作上会抛出PicklingError。

其他的一些差异表现在一些技术细节上,这些不是任何高层逻辑可以抽象掉的,不知道这些差异会导致一些潜在的错误,例如死锁。在总结这些潜在的犯错的可能的同时,我们会简单看一下mp当中Queue的实现方式,以便能够方便的理解为什么会有这样的行为。这些实现问题仅仅针对Linux,Windows上面的实现和出现的问题在这里不涉及。

mp.Queue建构在系统的Pipe之上,但是实际上进程并不是直接将对象写入到Pipe里面,而是先写入一个本地的buffer,再由一个专门的feed线程将其放入Pipe当中。读取端则是直接从Pipe当中读出对象。之所以有这样一个feed线程,是为了能够提供Queue接口函数所需要的put的超时控制。但是由于这个feed线程的存在,mp.Queue提供了几个额外的函数来控制它,一个函数close来停止该线程,以及join_thread来join该线程。close同时负责把所有在buffer当中的对象刷新到Pipe当中。

但是这个feed线程也是个麻烦制造者,为了保证所有被放入Queue的东西最终都能够到达另外一端的进程,mp库注册了一个atexit的处理函数,用来在进程退出的时候自动close并且join该feed线程。这个join动作带来了很多问题,比如潜在的死锁。考虑下面一种状况:一个父进程创建了两个子进程,一个子进程读,另一个子进程写。当需要停止这些进程的时候,父进程如果先把读进程结束,但是同时写进程已经将太多的对象写入Queue,导致后继的对象等待在buffer当中,则这个进程将无法终止,因为atexit的处理函数等待把所有buffer当中的对象放入Pipe,但是Pipe已经满了,然后陷入了死锁。

有人可能会问,那只要保证总是按照数据流的顺序来停止进程不就行。问题是在很多复杂的系统流程当中,可能存在一个环形的数据流,这种情况下,无论按照什么顺序停止进程,终究有一个进程可能陷入这种情景当中。

幸运的是,Queue对象还提供了一个成员函数cancel_join_thread,这个函数可以使得在进程停止的时候不进行join操作,这样可以避免死锁,代价就是这个时候尚未刷新到Pipe当中的对象都会丢失。鉴于即使调用了join_thread,残留在Pipe当中的对象仍然可能丢失,所以一旦选择使用mp的Queue对象,就不要假设不会在流程当中丢对象了。

另外一个可能的方案是使用mp库当中的SimpleQueue对象。这个对象在文档当中没有提及,但是在multiprocessing.queue模块当中有定义。这个对象就是去掉了buffer的Queue对象,因此可能能够避免上面说的问题的。但是SimpleQueue没有提供put和get的超时处理,两个动作都是阻塞的。

除了使用multiprocessing.Queue,还可以使用multiprocessing.Pipe进行通信。mp.Pipe是Queue的底层结构,但是没有feed线程和put/get的超时控制。一定程度上和SimpleQueue很像。需要注意的是Pipe带有一个参数 duplex,当设置为True(默认)的时候,Pipe并不是使用系统的pipe来实现,而是通过socketpair,即Unix Domain Socket来实现。这个和pipe相比有些微的性能差异。

另外一个使用Queue的方式不是mp库内置的。这种方式使用上一篇文章当中提到的server process的方式来共享一个Queue对象。这个Queue对象实际上在server process当中,所有的子进程通过socket连接到server process获取该Queue的代理对象进行操作。说到这有人会想起来mp库有一个内置的SyncManager对象,可以通过multiprocess.Manager函数获取到,通过该对象的Queue方法可以获取一个Queue的代理对象。不幸的是,这个方法不是正确的获取Queue的方式,原因正如上一篇文章所说,SyncManager.Queue方法的每次调用获取到的是一个新建对象的代理对象,而不是一个共享对象。正确的使用server process当中的Queue的方式是:

共同部分:

import multiprocessing.managers as mpm
import Queue
 
class SharedQueueManager(mpm.BaseManager): pass
q = Queue.Queue()
SharedQueueManager.register('Queue', lambda: q)

服务进程:

mgr = SharedQueueManager(address=('', 12345))
server = mgr.get_server()
server.serve_forever()

客户进程:

mgr = SharedQueueManager(address=('localhost', 12345))
mgr.connect()
q = mgr.Queue() # 这里q就是共享的Queue对象的代理对象

这种方式比起mp库内置的Queue,有一些性能上的影响,因为毕竟牵涉到多次网络通讯,但是带来的好处是没有feed线程带来的一系列问题,而且理论上不会存在丢数据的问题,除非server process崩溃。但是正如上一篇所说,server process本身就不是很靠谱的,因此这里也只是“理论上”不会丢数据而已。

说到性能,这里就列两个性能数据,以前在twitter上面提到过的(这两个连接无法访问的请联系我):

操作对象为 pickle后512字节的对象,通过proxy操作Queue的性能大约是7000次/秒(本机)或1100次/秒(多机),如果使用 multiprocessing.Queue,效率可达54000次/秒。

Python multiprocessing 使用手记[2] – 跨进程对象共享

2009年12月7日 没有评论

继续写关于Python multiprocessing的使用手记,继上次的进程模型之后,这次展开讨论一下multiprocessing当中的跨进程对象共享的问题。

在mp库当中,跨进程对象共享有三种方式,第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory方式,即通过共享内存共享对象;另外一种称之为server process,即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;最后一种在mp文档当中没有单独提出,但是在其中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance,即继承,对象在父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这些对象的操作都是反映到了同一个对象。

这三者共享方式各有特色,在这里进行一些简单的比较。

首先是共享方式所应对的对象类型,看这个表:

共享方式 支持的类型
Shared memory ctypes当中的类型,通过RawValue,RawArray等包装类提供
Inheritance 系统内核对象,以及基于这些对象实现的对象。包括Pipe, Queue, JoinableQueue, 同步对象(Semaphore, Lock, RLock, Condition, Event等等)
Server process 所有对象,可能需要自己手工提供代理对象(Proxy)

这个表总结了三种不同的共享方式所支持的类型,下面一个个展开讨论。

其中最单纯简单的就是shared memory这种方式,只有ctypes当中的数据类型可以通过这种方式共享。由于mp库本身缺少命名的机制,即在一个进程当中创建的对象,无法在另外一个进程当中通过名字来引用,因此,这种共享方式依赖于继承,对象应该由父进程创建,然后由子进程引用。关于这种机制的例子,可以参见Python文档当中的例子 Synchronization types like locks, conditions and queues,参考其中的test_sharedvalues函数。

然后是继承方式。首先关于继承方式需要有说明,继承本质上并不是一种对象共享的机制,对象共享只是其副作用。子进程从父进程继承来的对象并不一定是共享的。继承本质上是父进程fork出的子进程自动继承父进程的内存状态和对象描述符。因此,实际上子进程复制了一份父进程的对象,只不过,当这个对象包装了一些系统内核对象的描述符的时候,拷贝这个对象(及其包装的描述符)实现了对象的共享。因此,在上面的表当中,只有系统内核对象,和基于这些对象实现的对象,才能够通过继承来共享。通过继承共享的对象在linux平台上没有任何限制,但是在Windows上面由于没有fork的实现,因此有一些额外的限制条件,因此,在Windows上面,继承方式是几乎无法用的。

最后就是Server Process这种方式。这种方式可以支持的类型比另外两种都多,因为其模型是这样的:

server process模型

server process模型

在这个模型当中,有一个manager进程,负责管理实际的对象。真正的对象也是在manager进程的内存空间当中。所有需要访问该对象的进程都需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object),通常情况下,这个代理对象提供了实际对象的公共函数的代理,将函数参数进行pickle,然后通过连接传送到管理进程当中,管理进程将参数unpickle之后,转发给相应的实际对象的函数,返回值(或者异常)同样经过管理进程pickle之后,通过连接传回到客户进程,再由proxy对象进行unpickle,返回给调用者或者抛出异常。

很明显,这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这个实现对象共享。

manager和proxy之间的连接可以是基于socket的网络连接,也可以是unix pipe。如果是使用基于socket的连接方式,在使用proxy之前,需要调用manager对象的connect函数与远程的manager进程建立连接。由于manager进程会打开端口接收该连接,因此必要的身份验证是需要的,否则任何人都可以连上manager弄乱你的共享对象。mp库通过authkey的方式来进行身份验证。

在实现当中,manager进程通过multiprocessing.Manager类或者BaseManager的子类实现。BaseManager提供了函数register注册一个函数来获取共享对象的proxy。这个函数会被客户进程调用,然后在manager进程当中执行。这个函数可以返回一个共享的对象(对所有的调用返回同一个对象),或者可以为每一个调用创建一个新的对象,通过前者就可以实现多个进程共享一个对象。关于这个的用法可以参考Python文档当中的例子“Demonstration of how to create and use customized managers and proxies”。

典型的导出一个共享对象的代码是:

ObjectType object_
class ObjectManager(multiprocessing.managers.BaseManager): pass
ObjectManager.register("object", lambda: object_)

注意上面介绍proxy对象的时候,我提到的“公共函数”四个字。每个proxy对象只会导出实际对象的公共函数。这里面有两个含义,一个是“公共”,即所有非下划线开头的成员,另一个是“函数”,即所有callable的成员。这就带来一些限制,一是无法导出属性,二是无法导出一些公共的特殊函数,例如__get__, __next__等等。对于这个mp库有一套处理,即自定义proxy对象。首先是BaseManager的register可以提供一个proxy_type作为第三个参数,这个参数指定了哪些成员需要被导出。详细的使用方法可以参见文档当中的第一个例子。

另外manager还有一些细节的问题需要注意。由于Proxy对象不是线程安全的,因此如果需要在一个多线程程序当中使用proxy,mp库会为每个线程创建一个proxy对象,而每个proxy对象都会对server process创建一个连接,而manager那边对于每个连接都创建一个单独的线程来为其服务。这样带来的问题就是,如果客户进程有很多线程,很容易会导致manager进程的fd数目达到ulimit的限制,即使没有达到限制,也会因为manager进程当中有太多线程而严重影响manager的性能。解决方案可以是一个进程内cache,只有一个单独的线程可以创建proxy对象访问共享对象,其余线程只能访问该进程当中的cache。

一旦manager因为达到ulimit限制或者其他异常,manager会直接退出,遗憾的是,这时候已经建立的proxy会试图重新连接manager – 但是它已经不存在了。这个会导致客户进程hang在对proxy的函数调用上,这个时候,目前除了杀掉进程没有找到别的办法。

另外proxy使用socket的方式比较tricky,因此和内置的socket库有很多冲突,比如socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout调用了之后,进程当中所有通过socket模块建立的socket都是被设置为unblock模式的,但是mp库并不知道这一点,而且它总是假设socket都是block模式的,于是,一旦调用了setdefaulttimeout,所有对于proxy的函数调用都会抛出OSError,错误代码为11,错误原因是非常有误导性的“Resource temporarily unavailable”,实际上就是EAGAIN。这个错误可以通过我提供的一个patch来补救(这个patch当中还包含其他的一些修复,所以请自行查看并修改该patch)。

由于以上的一些原因,server process模式作为一个对象的共享模式,能够提供最为灵活的共享方式,但是也有最多的问题。这个在使用过程当中就靠自己去衡量了。目前我们的系统对于数据可靠性方面要求不高,丢失数据是可以接受的,但是也只用这种模式来维护统计值,不敢用来维护更多的东西。

关于跨进程共享对象的问题就写到这里,后面内容待续……

test_sharedvalues

Python multiprocessing 使用手记[1] – 进程模型

2009年11月10日 没有评论

首先从multiprocessing的进程模型开始看。

multiprocessing的目的是创建一个接口和python.threading类似接口的库,用多进程的方式来并发处理。因此创建一个新的进程的的方法也和python.threading很像:

import multiprocessing
 
def dosomething(a,b,c): pass
 
p = multiprocessing.Process(target=dosomething, args=(1,2,3))
p.start()
p.join()

创建的这个新的进程在*nix上面使用的是fork,这意味着子进程开始执行的时候具有与父进程相同的全部内容。请记住这点,这个将是下面我们讨论基于继承的对象共享的基础。所谓基于继承的对象共享,是说在创建子进程之前由父进程初始化的某些对象可以在子进程当中直接访问到。在Windows平台上,因为没有fork语义的系统调用,基于继承的共享对象比*nix有更多的限制,最主要就是体现在要求Process的__init__当中的参数必须可以Pickle。

但是,并不是所有的对象都是可以通过继承来共享,只有multiprocessing库当中的某些对象才可以。例如Queue,同步对象,共享变量,Manager等等。

在一个multiprocessing库的典型使用场景下,所有的子进程都是由一个父进程启动起来的,这个父进程称为master进程。这个父进程非常重要,它会管理一系列的对象状态,一旦这个进程退出,子进程很可能会处于一个很不稳定的状态,因为它们共享的状态也许已经被损坏掉了。因此,这个进程最好尽可能做最少的事情,以便保持其稳定性。

Python multiprocessing库使用手记(引子)

2009年11月9日 2 条评论

前段时间在做的一个Python项目,需要实现一个后台服务程序,程序流程比较复杂,而且可能经常变动,但是如果把整个流程切分成一些步骤,每个步骤有自己的输入输出和处理。只要将他们的输入输出接在一起,进行不同的组合就可以实现常见的流程变动。

使用多进程的原因是考虑到Python的全局解释器锁(Global Interceptor Lock, GIL)。由于GIL的存在,在CPU密集型的程序当中,使用多线程并不能有效地利用多核CPU的优势,因为一个解释器在同一时刻只会有一个线程在执行。要想尽可能利用多核CPU并发,多进程是必需的。

引入多进程就带来几个问题:首先是这种设计方案需要一个类似于UNIX Pipe的底层结构,说的更确切一点就是message queue,操作类似于Python的Queue.Queue。另一个问题就是需要一个进程间信息共享的基础设施。

先说Queue,这个Queue需要满足下面一些需求,优先级递减:

  • 轻量 – 不需要依赖数据库等比较重型的程序。代价就是丢点东西没关系;
  • 跨进程 – 读端和写端可以不在一个进程当中;
  • 支持多进多出 – 读端和写端都要支持多个进程;
  • 稳定和高效 – 这个就不用多说了吧;
  • 有Pythonçš„API,再不济要有Cçš„API可以让我写Pythonçš„binding。

另外是进程间共享的设施,系统shm是迫不得已使用的,因为要自己处理Pickling和Unpickling,以及一些杂七杂八的竞争条件问题。有建构在上面的现成的库最好。

最终出于轻量的考虑,选定了Python的multiprocessing库,而没有用IPC MQ 或 RabbitMQ。这个库在Python2.6被引入到标准库当中,有为2.5提供的backport。因为我们的系统使用的是2.5版本,因此我们使用的其实是2.5的backport。这个实现除了一些细枝末节的地方之外,核心实现和2.6标准库当中是完全一样的。

在使用过程当中,遇到了挺多的问题,在这里一一记录。

Follow up: 可用于OpenInkpot的Trac扩展:TranslatedPages

2009年1月26日 2 条评论

继续上一篇当中提到的扩展。

http://trac-hacks.org/wiki/TranslatedPagesMacro

扩展目前实现成Macro的形式,但是硬编码了支持的语言类型,很丑。

下一步继续扩展,需要提供一个Web UI来管理需要支持的语言列表,然后还要加上权限控制,估计要写成Plugin了。