[Twisted-Python] Factory question
Gabriel Rossetti
mailing_lists at evotex.ch
Fri Feb 29 03:34:46 MST 2008
Gabriel Rossetti wrote:
> Drew Smathers wrote:
>> On Thu, Feb 28, 2008 at 9:42 AM, Gabriel Rossetti
>> <mailing_lists at evotex.ch> wrote:
>>
>>> Drew Smathers wrote:
>>> > On Wed, Feb 27, 2008 at 3:32 AM, Gabriel Rossetti
>>> > <mailing_lists at evotex.ch> wrote:
>>> >
>>> >> Hello everyone,
>>> >>
>>> >> I have a small question, I have a service which needs to
>>> sometimes send
>>> >> data (without having received any prior to sending) and sometimes
>>> >> receive data, which is better :
>>> >>
>>> >> 1) create a factory that inherits from ServerFactory and
>>> ClientFactory,
>>> >> thus it can listen and send data
>>> >>
>>> >> 2) create a factory that inherits from ServerFactory only and
>>> uses a
>>> >> single-use client (ClientCreator, as shown in the writing
>>> clients howto)
>>> >> when it needs to send data
>>> >>
>>> >>
>>> >
>>> > I'm not sure of there's a single right way to do it, but I wouldn't
>>> > bother inheriting from both ClientFactory and ServerFactory. I
>>> think
>>> > you're on the write track with 2, though.
>>> >
>>> >
>>> I had taken route 1 up until now (I'm thinking about switching...)
>>>
>>> Maybe there's something I haven't quite gotten, when ClientA initially
>>> connects to the server, the factory creates an instance of the
>>> protocol,
>>> correct?
>>>
>>
>> Yes.
>>
>>
> Ok
>>> Now ClientA sends some data to the server, which processes it
>>> and sends something back. After that, the TCP session ends, and the
>>> client disconnects, and the protocols instance dies. Is this
>>> correct or
>>> does it live on and get reused somehow?
>>>
>>
>> The protocol instance does not get reused.
>>
>>
> Ok, so every time there is data exchanged (new tcp/ip session) then a
> new protocol instance is created. Any persistence/state data must
> therefore be stored in the factory if I understand correctly.
>>> I ask this because since
>>> initially the clients send data to the server (registration), the
>>> server
>>> will there after send data to the clients. This makes the client
>>> have to
>>> connect to the server initially though a port using
>>> reactor.connectTCP()
>>> and listen to a port (that the server now knows since the client
>>> registered itself) using reactor.listenTCP().
>>>
>>
>> This is might be a bad idea - depending on the locality of your
>> servers and clients. Why not just use the established connection?
> The idea is to have services register on a central server then they
> transmit messages to the daemon which routes/relays them to the
> correct service, sort of like a micro-kernel. So, sometimes the
> services initiate the communication process and sometimes they don't,
> the central server does. This makes the services be servers and
> clients. Imagine this :
>
> service1 has some data that needs to be processed by service2 (which
> will in turn send it to another service), is sends it to the central
> server, which sends it to service2. Service2 does whatever it has to
> do, and then sends it to the central server to route to service_n.
>
> So the established connection is usually useless, except for sending
> some sort of ACK maybe.
>
>> If
>> the *client* is listening on a port then it isn't just a client - it's
>> a server, or a peer in a clustered system.
>>
>>
> I guess it's/they a sort of peer(s) in a clustered system, it/they may
> reside on the same machine as the central server or not.
>>> I think I have to use
>>> reactor.connectTCP() instead of ClientCreator since the connection
>>> has
>>> to happen at the beginning and a transport needs to exist before I can
>>> send anything. Well...now that I think about it, I could have the
>>> factory register the client...... that would make me not have to
>>> inherit
>>> from the Client factory.... (I've now switched to solution 2, see last
>>> part of this email).
>>>
>>>
>>
>> Ok.
>>
>>
>>>> In most use cases you shouldn't have to create custom factories.
>>>>
>>> I have to since I need a non-reconnecting client xmlstream, and the
>>> factory used with xmlstreams is a reconnecting client.
>>>
>>>
>>>> Just
>>>>
>>> > write the protocol to support bidirectional comm and to create the
>>> > server:
>>> >
>>> > f = Factory()
>>> > f.protocol = YourBidirectionalProtocol
>>> >
>>> > Regarding the client, how you implement it depends on whether or not
>>> > the server is establishing the connection vs. reusing the existing
>>> > connection.
>>> What exactly do you mean by reusing an existing connection?
>>>
>>>
>>>> If you're establishing the connection (like in a cluster
>>>>
>>> > app with known peers), just use ClientCreator. If you're reusing
>>> the
>>> > existing connection, then you might not have to anything, unless you
>>> > have some state to set up which could be done by overriding
>>> > connectionMade on your Protocol.
>>> >
>>> > Finally, take everything I've stated above with a grain of salt.
>>> >
>>> >
>>> Thanks, I find it fairly hard to get used to Twisted, I wanted to buy
>>> the book, but it was written in 2005 and I'm not sure if it's still
>>> valid with today's version.
>>>
>>>
>>
>> The book it is not up to date.
>>
>>
>>> BTW, any idea why I'm getting this type of behavior (one server, 3
>>> distinct connections from clients) :
>>>
>>>
>>
>> Without seeing your code, no.
>>
>>
Ok, I figured out what is wrong, when the factories are created, I add a
bootstrap to an event that will call a callback when that event is
received. In the central server, when a client/service connects, the
callback is called and xml event observers are added (and now I added a
method that removes them when the client disconnects). What happens is
when a second client connects, the event calls the callback for every
instance of the protocol, not just the one for the current connection. I
traced it down to the following code in :
self.addBootstrap(xmlstream.STREAM_START_EVENT, xs._connected)
self.addBootstrap(xmlstream.STREAM_END_EVENT, xs._disconnected)
for event, fn in self.bootstraps:
xs.addObserver(event, fn)
since the original XmlStreamFactory/XmlStreamFactoryMixin was designed
to be a XML client and not a server, it has no notion of differentiating
protocol instances (since there is only one)
Gabriel
> ok, it's a bit long.....
>
> -------------------------"Central server" aka
> Daemon---------------------------------------------------------------------------
>
>
> class MdfXmlStreamFactory(XmlStreamFactoryMixin):
> """
> The factory class used by the daemon and services to create
> protocol instances
> """
> def __init__(self, proto, *args, **kwargs):
> """
> Constructor
> @param proto: the protocol to use
> @type proto: a subclass of
> L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
> @param args: misc args
> @type args: C{tuple}
> @param kwargs: misc keyword args
> @type kwargs: C{dict}
> """
> XmlStreamFactoryMixin.__init__(self)
> self.args = args
> self.kwargs = kwargs
> self.protocol = proto
>
> def buildProtocol(self, addr):
> """
> Builds the protocol and
> @param addr: The address (protocol, IP, port) of the
> connection
> @type addr:
> L{IPv4Address<twisted.internet.address._ServerFactoryIPv4Address>}
> @return: an instance of the built protocol
> """
> #self.resetDelay()
> xs = self.protocol(*self.args, **self.kwargs)
> xs.factory = self
> self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
> xs.connected) # stream connect event or xml start event???
> for event, fn in self.bootstraps:
> xs.addObserver(event, fn)
>
> return xs
>
> class MdfXmlStreamServerFactory(MdfXmlStreamFactory, ServerFactory):
> """
> The factory class used by the daemon to create
> protocol instances
> """
> # The registered services
> _services = {}
> def __init__(self, proto, *args, **kwargs):
> """
> Constructor
> @param proto: the protocol to use
> @type proto: a subclass of
> L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
> @param args: misc args
> @type args: C{tuple}
> @param kwargs: misc keyword args
> @type kwargs: C{dict}
> """
> MdfXmlStreamFactory.__init__(self, proto, *args, **kwargs)
>
> class Daemon(xmlstream.XmlStream):
> """
> The daemon is the implementation of a microkernel type inter-service
> communication (ISC) routing daemon. Here is how it works :
> - Services announce their presence to the daemon by giving
> their name,
> version, ip, port and a list of message-types that they accept
> - The daemon listens for messages from the attached services,
> when one
> is received, it routes the message to the correct service
> @todo: add unique id generation/verification
> """
> # Holds the real method
> __dataReceived = xmlstream.XmlStream.dataReceived
> # The registered services
> #__services = {}
> cnt = 1
> def __init__(self, *args, **kwargs):
> """
> Constructor
> @param args: non-keyword args
> @type args: C{tuple}
> @param kwargs: keyword args
> @type kwargs: C{dict}
> """
> xmlstream.XmlStream.__init__(self)
> self.__routeTo = None
> self.__lastMsgType = None
> self.__lastMsgId = None
> self.inst = Daemon.cnt
> Daemon.cnt += 1
> print "daemon proto instance %d" % self.inst
> def connectionMade(self):
> xmlstream.XmlStream.connectionMade(self)
> def dataReceived(self, data):
> """
> Called everytime data is received
> @param data: the data received
> @type data: C{object} (anything)
> """
> self.__dataReceived(data)
> def connectionLost(self, reason):
> """
> Called when the connection is shut down, restores the
> dataReceived method
> @param reason: the reason why the connection was lost
> @type reason: C{str}
> """
> self.__dataReceived = xmlstream.XmlStream.dataReceived
> self.__routeTo = None
> xmlstream.XmlStream.connectionLost(self, reason)
> def __onHeader(self, element):
> """
> Analyse a header and set the data's recipiant
> @param element: the header element (XML)
> @type element: L{Element<twisted.words.xish.domish.Element>}
> """
> print "got header from %s:%s : %s" %
> (str(self.transport.getPeer().host),
> str(self.transport.getPeer().port), element.toXml())
> self.__lastMsgId = element.getAttribute("id")
> self.__lastMsgType = element.getAttribute("type")
> if(self.__lastMsgType != constants._REG_MSG_TYPE):
> self.__routeTo = self.factory._services[type]
> self.__dataReceived = __routeDataReceived
> def __onReg(self, element):
> """
> Register a service
> @param element: the registeration element (XML)
> @type element: L{Element<twisted.words.xish.domish.Element>}
> """
> print "got reg from %s:%s : %s" %
> (str(self.transport.getPeer().host),
> str(self.transport.getPeer().port), element.toXml())
>
> name =
> str(xpath.XPathQuery("/body/reg/name").queryForNodes(element)[0])
> version =
> str(xpath.XPathQuery("/body/reg/version").queryForNodes(element)[0])
> #address =
> str(xpath.XPathQuery("/body/reg/address").queryForNodes(element)[0])
> port =
> int(str(xpath.XPathQuery("/body/reg/port").queryForNodes(element)[0]))
> msgs = [ str(m) for m in
> xpath.XPathQuery("/body/reg/message_type").queryForNodes(element) ]
> address = self.transport.getPeer().host
> #port = self.transport.getPeer().port
> serv = ServiceReg(name, version, msgs, address, port)
> self.__registerService(serv)
> def connected(self, xs):
> """
> Called when a client connects using an XML stream
> @param xs: the current xml stream
> @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
> """
> print 'Connection from %s:%s!' %
> (str(self.transport.getPeer().host), str(self.transport.getPeer().port))
> xs.addObserver("/header", self.__onHeader)
> xs.addObserver("/body/reg", self.__onReg)
> def __routeDataReceived(self, data):
> """
> Pushes the messages to the correct service
> @param data: the data received
> @type data: C{object} (anything)
> """
> print "route '%s' to : %s" % (str(data), self.__routeTo)
> utils.sendMessage(self.__routeTo.ip, self.__routeTo.port,
> data)
> #self.send(data)
> def __registerService(self, service):
> """
> Register a service
> @param service: the service to register
> @type service: L{ServiceReg}
> @raise ServiceMessageConflictError: if another service
> already has a
> message registered that the
> current
> service is trying to register
> @todo: what is to be done with the exception once
> raised???? Finish status message...
> """
> def foundConflict(self, msgTypes):
> """
> Check if there is a conflict with message types to be
> registered
> by this service
> @param service: the service messages to check for
> conflicts
> @type service: C{str}
> @return: the conflicting service type or None if no
> conflict is found
> """
> for mt in msgTypes:
> if(self.factory._services.has_key(mt)):
> return mt
> return None
> print "Registering service : ", str(service)
> try:
> #
> # Check if another service already registered a message
> type that
> # the current service is trying to register
> #
> conflict = foundConflict(self, service.acceptedMsgs)
> if(conflict != None):
> raise ServiceMessageConflictError(conflict)
> #
> # Regrister the message types and this service
> #
> for msgType in service.acceptedMsgs:
> self.factory._services[msgType] = service
> except ServiceMessageConflictError, reason:
> status =
> utils.createConfirmationMsgBody(constants._MSG_FAILURE_TYPE,
> self.__lastMsgId,
> str(reason))
> else:
> status =
> utils.createConfirmationMsgBody(constants._MSG_SUCCESS_TYPE,
> self.__lastMsgId)
> #
> # Send registeration confirmation (succeeded or failed)
> #
> msgRoot = utils.createMessage(constants._REG_MSG_CONFIRM_TYPE,
> constants._CONF_MSG_ID,
> constants._DAEMON_SERVICE_NAME,
> constants._MSG_SPEC_VERSION,
> constants._STATUS_DATA_TYPE, status)
> print "Sending confirmation message to %s : %s" %
> (self.transport.getPeer().host, msgRoot.toXml())
> #self.send(msgRoot)
> utils.sendMessage(self.transport.getPeer().host, service.port,
> msgRoot)
> if(__name__ == "__main__"):
>
> reactor.listenTCP(4321, MdfXmlStreamServerFactory(Daemon))
> print "Listening for connections..."
> reactor.run()
>
> ----------------------------------------"Service"-----------------------------------------------------------------------------------------------------
>
>
> class MdfXmlStreamClientFactory(MdfXmlStreamServerFactory):
> """
> The factory class used by the services to create
> protocol instances
> @attention: this class might dissapear, I have to see if it's
> useful to keep it or not
> """
> __daemonAddrs = None
> __daemonPort = None
> def __init__(self, proto, *args, **kwargs):
> """
> Constructor
> @param proto: the protocol to use
> @type proto: a subclass of
> L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
> @param args: misc args
> @type args: C{tuple}
> @param kwargs: misc keyword args
> @type kwargs: C{dict}
> """
> MdfXmlStreamServerFactory.__init__(self, proto, *args, **kwargs)
> self._serviceInfo = ServiceReg(kwargs["name"],
> kwargs["version"], list(kwargs.get("messageTypes", [])))
> MdfXmlStreamClientFactory.__daemonAddrs = kwargs["address"]
> MdfXmlStreamClientFactory.__daemonPort = kwargs["port"]
> def register(self, port):
> """
> Register the service
> @param port: the service's port
> @type port: C{int}
> """
> self._serviceInfo.port = port
> msgBodyData = utils.createRegMsgBody(self._serviceInfo.name,
> self._serviceInfo.version,
> str(self._serviceInfo.port),
>
> self._serviceInfo.acceptedMsgs)
> msgRoot = utils.createMessage(constants._REG_MSG_TYPE,
> constants._REG_MSG_ID,
> self._serviceInfo.name,
> constants._MSG_SPEC_VERSION,
> constants._REG_DATA_TYPE,
> msgBodyData)
> utils.sendMessage(MdfXmlStreamClientFactory.__daemonAddrs,
> MdfXmlStreamClientFactory.__daemonPort,
> msgRoot)
>
> class BaseService(xmlstream.XmlStream):
> """
> The service is the implementation of a microkernel type inter-service
> communication (ISC) endpoint. Here is how it works :
> - Services announce their presence to the daemon by giving
> their name,
> version, ip, port and a list of message-types that they accept
> - The daemon listens for messages from the attached services,
> when one
> is received, it routes the message to the correct service
> @todo: add unique id generation/verification
> """
> def __init__(self, *args, **kwargs):
> """
> Constructor
> @param args: non-keyword args
> @type args: C{tuple}
> @param kwargs: keyword args
> @type kwargs: C{dict}
> """
> xmlstream.XmlStream.__init__(self)
> self._msgSrc = None
> self._msgDest = None
> self._msgBodyData = None
> self._registered = False
> def _onHeader(self, element):
> """
> Analyse a header and save the source and destination
> @param element: the header element (XML)
> @type element: L{Element<twisted.words.xish.domish.Element>}
> @todo: add msg spec version verification
> @todo: add id verification???
> """
> print "got header from %s:%s : %s" %
> (str(self.transport.getPeer().host),
> str(self.transport.getPeer().port), element.toXml())
> self._msgSrc =
> xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("source")
>
> self._msgDest =
> xpath.XPathQuery("/header").queryForNodes(element)[0].getAttribute("destination")
>
> def _onBody(self, element):
> """
> Get the body act accordingly
> @param element: the body element (XML)
> @type element: L{Element<twisted.words.xish.domish.Element>}
> @todo: add data type verification
> @todo: call data action callbacks
> """
> self._msgBodyData =
> xpath.XPathQuery("/body").queryForNodes(element)[0].toXml()
> print "_onbody : ", self._msgBodyData
> def _onConfirm(self, element):
> """
> Get the confirmation message and act accordingly
> @param element: the confirmation element (XML)
> @type element: L{Element<twisted.words.xish.domish.Element>}
> @todo: add data type verification
> @todo: call data action callbacks
> """
> status =
> str(xpath.XPathQuery("/body/conf/status").queryForNodes(element)[0])
> id =
> int(str(xpath.XPathQuery("/body/conf/id").queryForNodes(element)[0]))
> msg =
> str(xpath.XPathQuery("/body/conf/msg").queryForNodes(element)[0])
> if(id == constants._REG_MSG_ID):
> if(status == constants._MSG_SUCCESS_TYPE):
> self._registered = True
> def connected(self, xs):
> """
> Called when a client connects using an XML stream
> @param xs: the current xml stream
> @type xs: L{XmlStream<twisted.words.xish.xmlstream.XmlStream>}
> @todo: add data action callbacks
> """
> print 'Connection from %s:%s!' %
> (str(self.transport.getPeer().host), str(self.transport.getPeer().port))
> xs.addObserver("/header", self._onHeader)
> xs.addObserver("/body/conf", self._onConfirm)
> xs.addObserver("/body", self._onBody)
>
> def start(daemonAddrs, daemonPort, serviceRef, serviceName,
> serviceVersion, serviceMessageTypes):
> """
> Start the daemon
> @param daemonPort: the port the daemon listens on
> @type daemonPort: C{int}
> @param serviceRef: a reference to the service's class
> @type serviceRef: a subclass of L{BaseService<service.BaseService>}
> @param serviceName: the service's name
> @type serviceName: C{str}
> @param serviceVersion: the service's version
> @type serviceVersion: C{str}
> @param serviceMessageTypes: the list of messages the service registers
> @type serviceMessageTypes: C{str list}
> """
> f = MdfXmlStreamClientFactory(serviceRef, address=daemonAddrs,
> port=daemonPort, name=serviceName, version=serviceVersion,
> messageTypes=serviceMessageTypes)
> port = reactor.listenTCP(0, f).getHost().port
> f.register(port)
> print port
> reactor.run()
>
> if(__name__ == "__main__"):
>
> start("localhost", 4321, BaseService, "service_base", "1.0", ["all"])
>>> Daemon listening for connections...
>>>
>>> daemon proto instance 1
>>> Connection from 127.0.0.1:57821! <-- ok, Client1
>>> ....
>>> daemon proto instance 2
>>> Connection from 127.0.0.1:57821! <-- Client1 again????? why?
>>> Connection from 127.0.0.1:57823! <-- ok, Client2
>>> ....
>>> daemon proto instance 3
>>> Connection from 127.0.0.1:57821! <-- ok, Client1 again????? why?
>>> Connection from 127.0.0.1:57823! <-- ok, Client2 again????? why?
>>> Connection from 127.0.0.1:57824! <-- ok, Client3
>>>
>>> Oh, and by the time I finished writing this email, I've switched to
>>> solution 2, but I still get the behavior above.
>>>
>>>
>>>
>>> _______________________________________________
>>> Twisted-Python mailing list
>>> Twisted-Python at twistedmatrix.com
>>> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>>>
>>>
>>
>>
>>
>>
> Thank you,
> Gabriel
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>
More information about the Twisted-Python
mailing list