📄 pubsub.py
字号:
self.__topicTree = _TopicTreeRoot()
#
# Public API
#
def getDeliveryCount(self):
"""How many listeners have received a message since beginning of run"""
return self.__deliveryCount
def getMessageCount(self):
"""How many times sendMessage() was called since beginning of run"""
return self.__messageCount
def subscribe(self, listener, topic = ALL_TOPICS):
"""
Subscribe listener for given topic. If topic is not specified,
listener will be subscribed for all topics (that listener will
receive a Message for any topic for which a message is generated).
This method may be called multiple times for one listener,
registering it with many topics. It can also be invoked many
times for a particular topic, each time with a different
listener. See the class doc for requirements on listener and
topic.
:note: The listener is held by Publisher() only by *weak*
reference. This means you must ensure you have at
least one strong reference to listener, otherwise it
will be DOA ("dead on arrival"). This is particularly
easy to forget when wrapping a listener method in a
proxy object (e.g. to bind some of its parameters),
e.g.::
class Foo:
def listener(self, event): pass
class Wrapper:
def __init__(self, fun): self.fun = fun
def __call__(self, *args): self.fun(*args)
foo = Foo()
Publisher().subscribe( Wrapper(foo.listener) ) # whoops: DOA!
wrapper = Wrapper(foo.listener)
Publisher().subscribe(wrapper) # good!
:note: Calling this method for the same listener, with two
topics in the same branch of the topic hierarchy, will
cause the listener to be notified twice when a message
for the deepest topic is sent. E.g.
subscribe(listener, 't1') and then subscribe(listener,
('t1','t2')) means that when calling sendMessage('t1'),
listener gets one message, but when calling
sendMessage(('t1','t2')), listener gets message twice.
"""
self.validate(listener)
if topic is None:
raise TypeError, 'Topic must be either a word, tuple of '\
'words, or getStrAllTopics()'
self.__topicTree.addTopic(_tupleize(topic), listener)
def isSubscribed(self, listener, topic=None):
"""Return true if listener has subscribed to topic specified.
If no topic specified, return true if subscribed to something.
Use topic=getStrAllTopics() to determine if a listener will receive
messages for all topics."""
return self.__topicTree.isSubscribed(listener, topic)
def validate(self, listener):
"""Similar to isValid(), but raises a TypeError exception if not valid"""
# check callable
if not callable(listener):
raise TypeError, 'Listener '+`listener`+' must be a '\
'function, bound method or instance.'
# ok, callable, but if method, is it bound:
elif ismethod(listener) and not _isbound(listener):
raise TypeError, 'Listener '+`listener`+\
' is a method but it is unbound!'
# check that it takes the right number of parameters
min, d = _paramMinCount(listener)
if min > 1:
raise TypeError, 'Listener '+`listener`+" can't"\
' require more than one parameter!'
if min <= 0 and d == 0:
raise TypeError, 'Listener '+`listener`+' lacking arguments!'
assert (min == 0 and d>0) or (min == 1)
def isValid(self, listener):
"""Return true only if listener will be able to subscribe to
Publisher."""
try:
self.validate(listener)
return True
except TypeError:
return False
def unsubAll(self, topics=None, onNoSuchTopic=None):
"""Unsubscribe all listeners subscribed for topics. Topics can
be a single topic (string or tuple) or a list of topics (ie
list containing strings and/or tuples). If topics is not
specified, all listeners for all topics will be unsubscribed,
ie. the Publisher singleton will have no topics and no listeners
left. If onNoSuchTopic is given, it will be called as
onNoSuchTopic(topic) for each topic that is unknown.
"""
if topics is None:
del self.__topicTree
self.__topicTree = _TopicTreeRoot()
return
# make sure every topics are in tuple form
if isinstance(topics, list):
topicList = [_tupleize(x) for x in topics]
else:
topicList = [_tupleize(topics)]
# unsub every listener of topics
self.__topicTree.unsubAll(topicList, onNoSuchTopic)
def unsubscribe(self, listener, topics=None):
"""Unsubscribe listener. If topics not specified, listener is
completely unsubscribed. Otherwise, it is unsubscribed only
for the topic (the usual tuple) or list of topics (ie a list
of tuples) specified. Nothing happens if listener is not actually
subscribed to any of the topics.
Note that if listener subscribed for two topics (a,b) and (a,c),
then unsubscribing for topic (a) will do nothing. You must
use getAssociatedTopics(listener) and give unsubscribe() the returned
list (or a subset thereof).
"""
self.validate(listener)
topicList = None
if topics is not None:
if isinstance(topics, list):
topicList = [_tupleize(x) for x in topics]
else:
topicList = [_tupleize(topics)]
self.__topicTree.unsubscribe(listener, topicList)
def getAssociatedTopics(self, listener):
"""Return a list of topics the given listener is registered with.
Returns [] if listener never subscribed.
:attention: when using the return of this method to compare to
expected list of topics, remember that topics that are
not in the form of a tuple appear as a one-tuple in
the return. E.g. if you have subscribed a listener to
'topic1' and ('topic2','subtopic2'), this method
returns::
associatedTopics = [('topic1',), ('topic2','subtopic2')]
"""
return self.__topicTree.getTopics(listener)
def sendMessage(self, topic=ALL_TOPICS, data=None, onTopicNeverCreated=None):
"""Send a message for given topic, with optional data, to
subscribed listeners. If topic is not specified, only the
listeners that are interested in all topics will receive message.
The onTopicNeverCreated is an optional callback of your choice that
will be called if the topic given was never created (i.e. it, or
one of its subtopics, was never subscribed to by any listener).
It will be called as onTopicNeverCreated(topic)."""
aTopic = _tupleize(topic)
message = Message(aTopic, data)
self.__messageCount += 1
# send to those who listen to all topics
self.__deliveryCount += \
self.__topicTree.sendMessage(aTopic, message, onTopicNeverCreated)
#
# Private methods
#
def __call__(self):
"""Allows for singleton"""
return self
def __str__(self):
return str(self.__topicTree)
# Create the Publisher singleton. We prevent users from (inadvertently)
# instantiating more than one object, by requiring a key that is
# accessible only to module. From
# this point forward any calls to Publisher() will invoke the __call__
# of this instance which just returns itself.
#
# The only flaw with this approach is that you can't derive a new
# class from Publisher without jumping through hoops. If this ever
# becomes an issue then a new Singleton implementaion will need to be
# employed.
_key = _SingletonKey()
Publisher = PublisherClass(_key)
#---------------------------------------------------------------------------
class Message:
"""
A simple container object for the two components of a message: the
topic and the user data. An instance of Message is given to your
listener when called by Publisher().sendMessage(topic) (if your
listener callback was registered for that topic).
"""
def __init__(self, topic, data):
self.topic = topic
self.data = data
def __str__(self):
return '[Topic: '+`self.topic`+', Data: '+`self.data`+']'
#---------------------------------------------------------------------------
#
# Code for a simple command-line test
#
def test():
def done(funcName):
print '----------- Done %s -----------' % funcName
def testParam():
def testFunc00(): pass
def testFunc21(a,b,c=1): pass
def testFuncA(*args): pass
def testFuncAK(*args,**kwds): pass
def testFuncK(**kwds): pass
class Foo:
def testMeth(self,a,b): pass
def __call__(self, a): pass
class Foo2:
def __call__(self, *args): pass
assert _paramMinCount(testFunc00)==(0,0)
assert _paramMinCount(testFunc21)==(2,1)
assert _paramMinCount(testFuncA) ==(1,0)
assert _paramMinCount(testFuncAK)==(1,0)
assert _paramMinCount(testFuncK) ==(0,0)
foo = Foo()
assert _paramMinCount(Foo.testMeth)==(2,0)
assert _paramMinCount(foo.testMeth)==(2,0)
assert _paramMinCount(foo)==(1,0)
assert _paramMinCount(Foo2())==(1,0)
done('testParam')
testParam()
#------------------------
_NodeCallback.notified = 0
def testPreNotifyNode(self, dead):
_NodeCallback.notified += 1
print 'testPreNotifyNODE heard notification of', `dead`
_NodeCallback.preNotify = testPreNotifyNode
def testTreeNode():
class WS:
def __init__(self, s):
self.s = s
def __call__(self, msg):
print 'WS#', self.s, ' received msg ', msg
def __str__(self):
return self.s
def testPreNotifyRoot(dead):
print 'testPreNotifyROOT heard notification of', `dead`
node = _TopicTreeNode((ALL_TOPICS,), WeakRef(testPreNotifyRoot))
boo, baz, bid = WS('boo'), WS('baz'), WS('bid')
node.addCallable(boo)
node.addCallable(baz)
node.addCallable(boo)
assert node.getCallables() == [boo,baz]
assert node.hasCallable(boo)
node.removeCallable(bid) # no-op
assert node.hasCallable(baz)
assert node.getCallables() == [boo,baz]
node.removeCallable(boo)
assert node.getCallables() == [baz]
assert node.hasCallable(baz)
assert not node.hasCallable(boo)
node.removeCallable(baz)
assert node.getCallables() == []
assert not node.hasCallable(baz)
node2 = node.createSubtopic('st1', ('st1',))
node3 = node.createSubtopic('st2', ('st2',))
cb1, cb2, cb = WS('st1_cb1'), WS('st1_cb2'), WS('st2_cb')
node2.addCallable(cb1)
node2.addCallable(cb2)
node3.addCallable(cb)
node2.createSubtopic('st3', ('st1','st3'))
node2.createSubtopic('st4', ('st1','st4'))
print str(node)
assert str(node) == ' (st1: st1_cb1 st1_cb2 (st4: ) (st3: )) (st2: st2_cb )'
# verify send message, and that a dead listener does not get sent one
delivered = node2.sendMessage('hello')
assert delivered == 2
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -