[Twisted-Python] Event/Notifier Across Pb Boundaries
Moshe Zadka
m at moshez.org
Wed Sep 19 02:13:16 MDT 2001
I've written a small module for event notification. It's a fairly
small, nice example of how to write PB protocols, as well as being
something useful in its own right.
>----------- twisted/internet/event.py ---------------<
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from twisted.spread import pb
from twisted.internet import passport
from twisted.python import reflect
class EventPublishPerspective(pb.Perspective):
    """Perspective through which a client reaches the event service.

    Both methods simply forward to ``self.service``.
    """

    def perspective_subscribe(self, event, subscriber):
        """Register ``subscriber`` with the service for ``event``."""
        service = self.service
        service.subscribe(event, subscriber)

    def perspective_notifyEvent(self, event, *args, **kw):
        """Ask the service to notify the subscribers of ``event``.

        The extra positional and keyword arguments are handed to the
        service packed as one tuple and one dict, not re-expanded.
        """
        service = self.service
        service.notifyEvent(event, args, kw)
class EventPublishService(pb.Service):
    """Service that keeps a subscriber list per event and fans events out."""

    def __init__(self, *args, **kw):
        """Initialise the base service and start with no subscribers.

        All arguments are passed through unchanged to
        ``pb.Service.__init__``.
        """
        # Extended call syntax replaces the deprecated apply() builtin.
        pb.Service.__init__(self, *args, **kw)
        # Maps an event name to the list of its subscribers, in the
        # order they subscribed.
        self.eventSubscribers = {}

    def subscribe(self, event, subscriber):
        """Append ``subscriber`` to the list kept for ``event``.

        No de-duplication is done: subscribing the same object twice
        stores it twice.
        """
        # setdefault creates the list on first use; one lookup instead of
        # a has_key() test followed by an insert.
        self.eventSubscribers.setdefault(event, []).append(subscriber)

    def notifyEvent(self, event, args, kw):
        """Call ``notifyEvent`` on every subscriber of ``event``.

        ``args`` is the sequence of extra positional arguments and ``kw``
        the dict of keyword arguments to pass along after ``event``.
        Events nobody subscribed to are silently ignored.
        """
        for subscriber in self.eventSubscribers.get(event, ()):
            try:
                subscriber.notifyEvent(event, *args, **kw)
            except pb.ProtocolError:
                # Deliberately best-effort: a subscriber that raises
                # ProtocolError is skipped so the remaining ones are
                # still notified. It is not removed from the list.
                pass

    def getPerspectiveNamed(self, name):
        """Return a fresh ``EventPublishPerspective`` for this service.

        ``name`` is ignored: every call builds a new perspective that is
        always named "any".
        """
        return EventPublishPerspective("any", self)
class EventNotifier(pb.Referenceable):
    """Base class for objects that receive events via ``event_*`` methods."""

    def registerAll(self, perspective):
        """Subscribe this object to every event its class can handle.

        The event names are collected by
        ``reflect.addMethodNamesToDict`` using the "event_" prefix, and
        ``perspective.subscribe`` is called once per collected name.
        """
        # NOTE(review): presumably the helper stores the names with the
        # "event_" prefix stripped, since remote_notifyEvent() adds the
        # prefix back before the lookup -- confirm against reflect.
        dct = {}
        reflect.addMethodNamesToDict(self.__class__, dct, "event_")
        for name in dct.keys():
            perspective.subscribe(name, self)

    def remote_notifyEvent(self, event, *args, **kw):
        """Dispatch ``event`` to the matching ``event_<event>`` method.

        Events for which no such attribute exists are ignored. The
        handler's return value is discarded.
        """
        method = getattr(self, 'event_'+event, None)
        if method is not None:
            # Extended call syntax replaces the deprecated apply() builtin.
            method(*args, **kw)
if __name__ == '__main__':
    # NOTE(review): the service class is reached through a fresh import
    # of this module as ``event``, presumably so the saved application
    # refers to ``event.EventPublishService`` and not to a ``__main__``
    # class -- confirm.
    import event
    from twisted.internet.main import Application

    app = Application("event")

    # A "guest" identity with password "guest", known to the authorizer.
    guest = passport.Identity("guest", app)
    guest.setPassword("guest")
    app.authorizer.addIdentity(guest)

    factory = pb.BrokerFactory(app)
    service = event.EventPublishService("event", app)
    guest.addKeyForPerspective(service.getPerspectiveNamed('any'))

    app.listenOn(pb.portno, factory)
    app.save("start")
>-----------------------------------------------------------<
>----------- run this to get "event-start.tap" -------------<
from twisted.internet import event, passport, main
from twisted.spread import pb
# Build the "event" application, give it a guest identity with access to
# the event service, and save it.
app = main.Application("event")

guest = passport.Identity("guest", app)
guest.setPassword("guest")
app.authorizer.addIdentity(guest)

factory = pb.BrokerFactory(app)
service = event.EventPublishService("event", app)
guest.addKeyForPerspective(service.getPerspectiveNamed('any'))

app.listenOn(pb.portno, factory)
app.save("start")
>--------------------------------------------<
>------------ this connects and gets a notification -----------<
from twisted.spread import pb
from twisted.internet import tcp, main, event
class HelloEvent(event.EventNotifier):
    """Notifier that handles the ``hello`` event."""

    def event_hello(self, who):
        """Print a greeting for ``who`` and shut the main loop down."""
        # A single parenthesised argument prints the same "hello <who>"
        # text under both the Python 2 print statement and the Python 3
        # print function; the bare two-item statement is Python-2-only.
        print("hello %s" % (who,))
        main.shutDown()
def failure(error):
    """Report ``error`` on standard output and shut the main loop down."""
    # A single parenthesised argument prints the same "Failure... <error>"
    # text under both the Python 2 print statement and the Python 3 print
    # function; the bare two-item statement is Python-2-only.
    print("Failure... %s" % (error,))
    main.shutDown()
def connected(perspective):
    """Subscribe a ``HelloEvent`` and then publish a "hello" event.

    Called with the perspective once the attach request has succeeded.
    ``failure`` is supplied as the ``pberrback`` of the notification.
    """
    notifier = HelloEvent()
    notifier.registerAll(perspective)
    print("registered")
    perspective.notifyEvent("hello", "world", pberrback=failure)
    print("connected.")
def preConnected(identity):
    """Attach to the "any" perspective of the "event" service.

    Called with the identity once it has been obtained; ``connected``
    and ``failure`` are passed as the attach call's callbacks.
    """
    handlers = {"pbcallback": connected, "pberrback": failure}
    identity.attach("event", "any", None, **handlers)
# Request the guest identity, connect the broker to the local server on
# the default PB port, and run the main loop.
broker = pb.Broker()
broker.requestIdentity("guest", "guest",
                       callback=preConnected,
                       errback=failure)
tcp.Client("localhost", pb.portno, broker)
main.run()
>-------------------------------------------<
--
The Official Moshe Zadka FAQ: http://moshez.geek
The Official Moshe Zadka FAQ For Dummies: http://moshez.org
Read the FAQ
More information about the Twisted-Python
mailing list