12.11 Implementing Publish/Subscribe Messaging
Problem
You have a program based on communicating threads and want them to implement publish/subscribe messaging.
Solution
To implement publish/subscribe messaging, you typically introduce a separate “exchange” or “gateway” object that acts as an intermediary for all messages. That is, instead of directly sending a message from one task to another, a message is sent to the exchange, which delivers it to one or more attached tasks. Here is one example of a very simple exchange implementation:
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]
An exchange is really nothing more than an object that keeps a set of active subscribers and provides methods for attaching, detaching, and sending messages. Each exchange is identified by a name, and the get_exchange() function simply returns the Exchange instance associated with a given name.
Here is a simple example that shows how to use an exchange:
# Example of a task. Any object with a send() method
class Task:
    ...
    def send(self, msg):
        ...

task_a = Task()
task_b = Task()

# Example of getting an exchange
exc = get_exchange('name')

# Examples of subscribing tasks to it
exc.attach(task_a)
exc.attach(task_b)

# Example of sending messages
exc.send('msg1')
exc.send('msg2')

# Example of unsubscribing
exc.detach(task_a)
exc.detach(task_b)
Although there are many different variations on this theme, the overall idea is the same. Messages are delivered to an exchange, and the exchange delivers them to attached subscribers.
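Since the recipe is about communicating threads, it may help to see one way a subscriber could hand messages to a worker thread. The following is only a minimal sketch: Exchange and get_exchange() are repeated from above so the block runs on its own, while QueueTask, its received list, its close() method, and the exchange name 'demo' are hypothetical names introduced for illustration.

```python
import queue
import threading
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

_exchanges = defaultdict(Exchange)

def get_exchange(name):
    return _exchanges[name]

class QueueTask:
    """Hypothetical subscriber: send() enqueues, a worker thread consumes."""
    _STOP = object()   # Sentinel used to shut the worker down

    def __init__(self):
        self._queue = queue.Queue()
        self.received = []
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, msg):
        # Called by the exchange; returns immediately
        self._queue.put(msg)

    def _run(self):
        # Runs in the worker thread and handles messages in FIFO order
        while True:
            msg = self._queue.get()
            if msg is self._STOP:
                break
            self.received.append(msg)

    def close(self):
        # Ask the worker to stop, then wait for it to drain the queue
        self._queue.put(self._STOP)
        self._thread.join()

exc = get_exchange('demo')
task_a = QueueTask()
task_b = QueueTask()
exc.attach(task_a)
exc.attach(task_b)

exc.send('msg1')
exc.send('msg2')

exc.detach(task_a)
exc.detach(task_b)
task_a.close()
task_b.close()
```

In this sketch the exchange never blocks on a slow subscriber, because send() only places the message on a queue. Note that the Exchange itself is not protected by a lock, so attaching or detaching from another thread while send() is iterating over the set would need extra synchronization.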
Discussion
The concept of tasks or threads sending messages to one another (often via queues) is easy to implement and quite popular. However, the benefits of using a publish/subscribe (pub/sub) model instead are often overlooked.
First, the use of an exchange can simplify much of the plumbing involved in setting up communicating threads. Instead of trying to wire threads together across multiple program modules, you only worry about connecting them to a known exchange. In some sense, this is similar to how the logging library works. In practice, it can make it easier to decouple various tasks in the program.
Second, the ability of the exchange to broadcast messages to multiple subscribers opens up new communication patterns. For example, you could implement systems with redundant tasks, broadcasting, or fan-out. You could also build debugging and diagnostic tools that attach themselves to exchanges as ordinary subscribers. For example, here is a simple diagnostic class that would display sent messages:
class DisplayMessages:
    def __init__(self):
        self.count = 0

    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()
exc.attach(d)
Last, but not least, a notable aspect of the implementation is that it works with a variety of task-like objects. For example, the receivers of a message could be actors (as described in Recipe 12.10), coroutines, network connections, or just about anything that implements a proper send() method.
One potentially problematic aspect of an exchange concerns the proper attachment and detachment of subscribers. In order to properly manage resources, every subscriber that attaches must eventually detach. This leads to a programming model similar to this:
exc = get_exchange('name')
exc.attach(some_task)
try:
    ...
finally:
    exc.detach(some_task)
In some sense, this is similar to the usage of files, locks, and similar objects. Experience has shown that it is quite easy to forget the final detach() step. To simplify this, you might consider the use of the context-management protocol. For example, you could add a subscribe() method to the exchange like this:
from contextlib import contextmanager
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# Return the Exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]

# Example of using the subscribe() method
exc = get_exchange('name')
with exc.subscribe(task_a, task_b):
    ...
    exc.send('msg1')
    exc.send('msg2')
    ...

# task_a and task_b detached here
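Because a generator object already has a send() method, a generator-based coroutine can be attached to an exchange directly, which illustrates the earlier point about task-like objects. The following is a rough sketch rather than part of the recipe: the Exchange class is repeated so the block is self-contained, and collector() and the received list are hypothetical names used only for this illustration.

```python
from contextlib import contextmanager

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    @contextmanager
    def subscribe(self, *tasks):
        for task in tasks:
            self.attach(task)
        try:
            yield
        finally:
            for task in tasks:
                self.detach(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

def collector(received):
    # Hypothetical coroutine: appends every message it is sent
    while True:
        msg = yield
        received.append(msg)

received = []
coro = collector(received)
next(coro)            # Prime the generator so it is waiting at the yield

exc = Exchange()
with exc.subscribe(coro):
    exc.send('msg1')
    exc.send('msg2')

# coro was detached on exit, so this message reaches no one
exc.send('msg3')
coro.close()
```

The generator must be primed with next() before it is attached; sending a non-None value to a generator that has not started raises a TypeError.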
Finally, there are numerous possible extensions to the exchange idea. For example, exchanges could implement an entire collection of message channels or apply pattern matching rules to exchange names. Exchanges can also be extended into distributed computing applications (e.g., routing messages to tasks on different machines).
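As one possible reading of the pattern matching idea, exchange names could be matched against shell-style wildcards. The sketch below is an assumption about how such an extension might look, not something the recipe defines: send_matching(), the Recorder class, and the dotted exchange names are hypothetical, and fnmatchcase() from the standard fnmatch module is used for the matching.

```python
from collections import defaultdict
from fnmatch import fnmatchcase

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)

    def detach(self, task):
        self._subscribers.remove(task)

    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)

_exchanges = defaultdict(Exchange)

def get_exchange(name):
    return _exchanges[name]

def send_matching(pattern, msg):
    """Hypothetical helper: send msg to every existing exchange whose
    name matches the shell-style pattern. Returns the matched names."""
    # Copy the keys first so the registry is not mutated while iterating
    matched = [name for name in list(_exchanges) if fnmatchcase(name, pattern)]
    for name in matched:
        _exchanges[name].send(msg)
    return sorted(matched)

class Recorder:
    # Hypothetical subscriber that just remembers what it was sent
    def __init__(self):
        self.messages = []

    def send(self, msg):
        self.messages.append(msg)

orders = Recorder()
billing = Recorder()
audit = Recorder()
get_exchange('app.orders').attach(orders)
get_exchange('app.billing').attach(billing)
get_exchange('sys.audit').attach(audit)

matched = send_matching('app.*', 'hello')
```

Here only the two exchanges whose names start with app. receive the message. The helper matches against exchanges that already exist; it does not create new ones.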