📄 pubsub.py
字号:
except ValueError:
return False
def sendMessage(self, message):
    """Send a message to our callables.

    Every entry in the list of callables is a weak reference: it is
    dereferenced and, when the listener is still alive, the listener
    is called with `message` as its only argument.

    Delivery walks a snapshot of the list taken before the first
    listener is called. A listener may therefore remove callables from
    this node while handling the message without causing another
    listener to be skipped. A callable removed during delivery still
    receives this message if it is alive and was in the snapshot.

    Returns the number of listeners that were called."""
    deliveryCount = 0
    # Copy first: removeCallable() edits the list in place, which would
    # shift the remaining items under a plain `for` over the live list.
    for cb in list(self.__callables):
        listener = cb()
        if listener is not None:
            listener(message)
            deliveryCount += 1
    return deliveryCount
def removeCallable(self, callable):
    """Take the weak reference for `callable` out of this node.

    Returns True when the callable was registered here and has been
    removed, False when it was not found (nothing changes then)."""
    try:
        self.__callables.remove(_getWeakRef(callable))
    except ValueError:
        # raised when the weak reference is not in our list
        removed = False
    else:
        removed = True
    return removed
def clearCallables(self):
    """Hand our callables over to the caller and forget them.

    Returns the weak callables whose listener is still alive; once
    this method returns, the node holds no callables at all."""
    handedOver = []
    for weakCB in self.__callables:
        if weakCB() is not None:
            handedOver.append(weakCB)
    self.__callables = []
    return handedOver
def __notifyDead(self, dead):
    """Callback invoked, via the weak reference machinery, when one
    of our listeners dies.

    Dead entries are purged from this node first; afterwards the
    death is forwarded to the on-dead-listener callback, provided one
    was configured and its own weak reference is still alive."""
    self.__cleanupDead()
    weakHook = self.__onDeadListenerWeakCB
    if weakHook is None:
        return
    hook = weakHook()
    if hook is not None:
        hook(dead)
def __cleanupDead(self):
    """Rebuild the list of callables, keeping only the weak
    references whose listener is still alive."""
    survivors = []
    for weakCB in self.__callables:
        if weakCB() is not None:
            survivors.append(weakCB)
    self.__callables = survivors
def __str__(self):
    """Describe this node for debugging: the names of its callables,
    followed by one ' (topic: node)' entry per subtopic. Terse rather
    than pretty."""
    parts = [_getCallableName(listener) for listener in self.getCallables()]
    parts.extend([' (%s: %s)' % (name, child)
                  for name, child in self.__subtopics.iteritems()])
    return ''.join(parts)
class _TopicTreeRoot(_TopicTreeNode):
    """
    The root of the tree knows how to access the other nodes of the
    tree and is the gateway of the tree user to the tree nodes.
    It can create topics, add and remove callbacks, etc.

    For efficiency, it stores a dictionary of listener-topics,
    so that unsubscribing a listener just requires finding the
    topics associated to a listener, and finding the corresponding
    nodes of the tree. Without it, unsubscribing would require
    that we search the whole tree for all nodes that contain
    given listener. Since Publisher is a singleton, it will
    contain all topics in the system so it is likely to be a large
    tree. However, it is possible that in some runs, unsubscribe()
    is called very little by the user, in which case most unsubscriptions
    are automatic, ie caused by the listeners dying. In this case,
    a counter of dead listeners is kept, and the dictionary is cleaned
    up once that counter exceeds callbackDeadLimit. This is not
    necessary, it is just an optimization.
    """

    def __init__(self):
        # maps weak listener -> list of weak references to the tree
        # nodes that listener is registered with
        self.__callbackDict = {}
        # number of listener deaths seen since the last dictionary cleanup
        self.__callbackDictCleanup = 0
        # all child nodes will call our __rootNotifyDead method
        # when one of their registered listeners dies
        _TopicTreeNode.__init__(self, (ALL_TOPICS,),
                                _getWeakRef(self.__rootNotifyDead))

    def addTopic(self, topic, listener):
        """Add topic to tree if it doesn't exist, and add listener to
        the topic node. `topic` must be a tuple. The node is also
        recorded in the listener's entry of the callback dictionary
        (at most once per node)."""
        assert isinstance(topic, tuple)
        topicNode = self.__getTreeNode(topic, make=True)
        weakCB = topicNode.addCallable(listener)
        assert topicNode.hasCallable(listener)

        theList = self.__callbackDict.setdefault(weakCB, [])
        assert weakCB in self.__callbackDict
        # add it only if we don't already have it
        weakTopicNode = WeakRef(topicNode)
        if weakTopicNode not in theList:
            theList.append(weakTopicNode)
        assert self.__callbackDict[weakCB].index(weakTopicNode) >= 0

    def getTopics(self, listener):
        """Return the list of topic pathnames for given listener.
        Nodes that no longer exist are left out; an unknown listener
        yields an empty list."""
        weakNodes = self.__callbackDict.get(_getWeakRef(listener), [])
        # dereference each weak node exactly once, so a node cannot
        # disappear between the liveness test and its use
        nodes = [weakNode() for weakNode in weakNodes]
        return [node.getPathname() for node in nodes if node is not None]

    def isSubscribed(self, listener, topic=None):
        """Return true if listener is registered for topic specified.
        If no topic specified, return true if subscribed to something.
        Use topic=getStrAllTopics() to determine if a listener will receive
        messages for all topics.

        A listener that is not known at all is reported as not
        subscribed (False), whatever the topic."""
        weakCB = _getWeakRef(listener)
        if topic is None:
            return weakCB in self.__callbackDict

        topicPath = _tupleize(topic)
        # .get(): an unknown listener has no entry and must not raise
        for weakNode in self.__callbackDict.get(weakCB, ()):
            node = weakNode()
            # skip nodes whose weak reference is already dead
            if node is not None and topicPath == node.getPathname():
                return True
        return False

    def unsubscribe(self, listener, topicList):
        """Remove listener from given list of topics. If topicList
        doesn't have any topics for which listener has subscribed,
        nothing happens. If topicList is None, the listener is removed
        from every node it is registered with.

        When the listener ends up with no node left, its entry is
        dropped from the callback dictionary."""
        weakCB = _getWeakRef(listener)
        if weakCB not in self.__callbackDict:
            return

        cbNodes = self.__callbackDict[weakCB]
        if topicList is None:
            for weakNode in cbNodes:
                node = weakNode()
                if node is not None:
                    node.removeCallable(listener)
            del self.__callbackDict[weakCB]
            return

        # Walk a snapshot: entries are removed from cbNodes inside the
        # loop, which would make a loop over cbNodes itself skip items.
        for weakNode in list(cbNodes):
            node = weakNode()
            if node is not None and node.getPathname() in topicList:
                success = node.removeCallable(listener)
                assert success == True
                cbNodes.remove(weakNode)
                assert not self.isSubscribed(listener, node.getPathname())

        # Same policy as unsubAll(): a listener without any node has no
        # entry, so isSubscribed(listener) stops reporting it.
        if not cbNodes:
            del self.__callbackDict[weakCB]

    def unsubAll(self, topicList, onNoSuchTopic):
        """Unsubscribe all listeners registered for any topic in
        topicList. If a topic in the list does not exist, and
        onNoSuchTopic is not None, a call
        to onNoSuchTopic(topic) is done for that topic."""
        for topic in topicList:
            node = self.__getTreeNode(topic)
            if node is not None:
                weakCallables = node.clearCallables()
                for weakCB in weakCallables:
                    weakNodes = self.__callbackDict[weakCB]
                    success = _removeItem(WeakRef(node), weakNodes)
                    assert success == True
                    if weakNodes == []:
                        del self.__callbackDict[weakCB]
            elif onNoSuchTopic is not None:
                onNoSuchTopic(topic)

    def sendMessage(self, topic, message, onTopicNeverCreated):
        """Send a message for given topic to all registered listeners. If
        topic doesn't exist and onTopicNeverCreated is not None, call
        onTopicNeverCreated(topic). Returns the number of deliveries."""
        # send to the all-topics listeners
        deliveryCount = _TopicTreeNode.sendMessage(self, message)
        # send to those who listen to given topic or any of its supertopics
        node = self
        for topicItem in topic:
            assert topicItem != ''
            if node.hasSubtopic(topicItem):
                node = node.getNode(topicItem)
                deliveryCount += node.sendMessage(message)
            else:  # topic never created, don't bother continuing
                if onTopicNeverCreated is not None:
                    onTopicNeverCreated(topic)
                break
        return deliveryCount

    def numListeners(self):
        """Return a pair (live, dead) with count of live and dead
        listeners in the callback dictionary"""
        dead, live = 0, 0
        for cb in self.__callbackDict:
            if cb() is None:
                dead += 1
            else:
                live += 1
        return live, dead

    # clean up the callback dictionary after how many dead listeners
    callbackDeadLimit = 10

    def __rootNotifyDead(self, dead):
        """Count one more dead listener; once the count exceeds
        callbackDeadLimit, reset it and rebuild the callback dictionary
        with only the listeners that are still alive."""
        self.__callbackDictCleanup += 1
        if self.__callbackDictCleanup > _TopicTreeRoot.callbackDeadLimit:
            self.__callbackDictCleanup = 0
            oldDict = self.__callbackDict
            self.__callbackDict = {}
            for weakCB, weakNodes in oldDict.items():
                if weakCB() is not None:
                    self.__callbackDict[weakCB] = weakNodes

    def __getTreeNode(self, topic, make=False):
        """Return the tree node for 'topic' from the topic tree. If it
        doesn't exist and make=True, create it first; if it doesn't
        exist and make=False, return None.

        Raises ValueError if an item of the topic tuple equals
        ALL_TOPICS (other than the all-topics tuple itself)."""
        # if the all-topics, give root;
        if topic == (ALL_TOPICS,):
            return self

        # not root, so traverse tree
        node = self
        path = ()
        for topicItem in topic:
            path += (topicItem,)
            if topicItem == ALL_TOPICS:
                raise ValueError('Topic tuple must not contain ""')
            if make:
                node = node.createSubtopic(topicItem, path)
            elif node.hasSubtopic(topicItem):
                node = node.getNode(topicItem)
            else:
                return None
        # done
        return node

    def printCallbacks(self):
        """Return (as a string, nothing is printed) one line per
        listener of the callback dictionary, with the live topic nodes
        it is registered with."""
        strVal = ['Callbacks:\n']
        for listener, weakTopicNodes in self.__callbackDict.items():
            topics = [topic() for topic in weakTopicNodes if topic() is not None]
            strVal.append('  %s: %s\n' % (_getCallableName(listener()), topics))
        return ''.join(strVal)

    def __str__(self):
        """Same as the node representation, prefixed with 'all: '."""
        return 'all: %s' % _TopicTreeNode.__str__(self)
# -----------------------------------------------------------------------------
# Marker type used as a construction token: PublisherClass.__init__
# refuses any singletonKey argument that is not an instance of this class.
class _SingletonKey: pass
class PublisherClass:
"""
The publish/subscribe manager. It keeps track of which listeners
are interested in which topics (see subscribe()), and sends a
Message for a given topic to listeners that have subscribed to
that topic, with optional user data (see sendMessage()).
The three important concepts for Publisher are:
- listener: a function, bound method or
callable object that can be called with one parameter
(not counting 'self' in the case of methods). The parameter
will be a reference to a Message object. E.g., these listeners
are ok::
class Foo:
def __call__(self, a, b=1): pass # can be called with only one arg
def meth(self, a): pass # takes only one arg
def meth2(self, a=2, b=''): pass # can be called with one arg
def func(a, b=''): pass
foo = Foo()
Publisher().subscribe(foo) # functor
Publisher().subscribe(foo.meth) # bound method
Publisher().subscribe(foo.meth2) # bound method
Publisher().subscribe(func) # function
The three types of callables all have arguments that allow a call
with only one argument. In every case, the parameter 'a' will contain
the message.
- topic: a single word, a tuple of words, or a string containing a
set of words separated by dots, for example: 'sports.baseball'.
A tuple or a dotted notation string denotes a hierarchy of
topics from most general to least. For example, a listener of
this topic::
('sports','baseball')
would receive messages for these topics::
('sports', 'baseball') # because same
('sports', 'baseball', 'highscores') # because more specific
but not these::
'sports' # because more general
('sports',) # because more general
() or ('') # because only for those listening to 'all' topics
('news') # because different topic
- message: this is an instance of Message, containing the topic for
which the message was sent, and any data the sender specified.
:note: This class is visible to importers of pubsub only as a
Singleton. I.e., every time you execute 'Publisher()', it's
actually the same instance of PublisherClass that is
returned. So to use, just do 'Publisher().method()'.
"""
__ALL_TOPICS_TPL = (ALL_TOPICS, )
def __init__(self, singletonKey):
"""Construct a Publisher. This can only be done by the pubsub
module. You just use pubsub.Publisher()."""
if not isinstance(singletonKey, _SingletonKey):
raise invalid_argument("Use Publisher() to get access to singleton")
self.__messageCount = 0
self.__deliveryCount = 0
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -