[Twisted-Python] UDP servers and socket queues

Phil Mayers p.mayers at imperial.ac.uk
Mon Apr 21 08:26:57 MDT 2003


All,

I'm porting over an app which had its own async job system, similar to
the most basic parts of Twisted. I now want it to run as a long-running
server with multiprotocol support, and Twisted is the logical choice.

The core of the app is a UDP-based RPC client (bespoke stuff) which does
a periodic flurry of activity (plus on-demand stuff - that's the plan
for later anyway).

The problem is that the flurry of activity fills the UDP socket queue.
I think some simple changes to SelectReactor (which I've made locally
via a nasty/messy subclass) would fix it, and could be useful to others.
The app looks roughly like this:

from twisted.internet import defer, protocol, reactor

class RPCProtocol(protocol.DatagramProtocol):
    def __init__(self):
        # setup some icky stuff for retries etc here
        self.outstanding = {}  # pdu.uuid -> Deferred awaiting the reply

    def datagramReceived(self, data, addr):
        # match the reply to its request and fire the waiting Deferred
        pdu = self._decode(data)
        dfurred = self.outstanding[pdu.uuid]
        del self.outstanding[pdu.uuid]
        dfurred.callback(pdu)

    def sendPDU(self, pdu, addr):
        self.outstanding[pdu.uuid] = defer.Deferred()
        # datagrams go out through the protocol's transport
        self.transport.write(pdu.encode(), addr)
        return self.outstanding[pdu.uuid]

class endpoint:
    def __init__(self, protocol, host):
        self.protocol = protocol
        self.addr = resolve(host)
        self.actions = load_from_config(host)

    def go(self):
        pdu = self.create_pdu(self.actions.pop(0))
        self.protocol.sendPDU(pdu, self.addr).addCallback(self.handle)

    def handle(self, pdu):
        func = self.actions.pop(0)
        func(pdu)
        pdu = self.create_pdu(self.actions.pop(0))
        self.protocol.sendPDU(pdu, self.addr).addCallback(self.handle)
        
Anyway, you get the idea. The code fires up like this:

proto = RPCProtocol()
for host in hosts:
    ep = endpoint(proto, host)
    ep.go()

...the problem is that the responses come back too fast - they overflow
the UDP socket buffer after a couple of hundred packets. So, try this:

proto = RPCProtocol()
for host in hosts:
    ep = endpoint(proto, host)
    reactor.callLater(0, ep.go)

...except that the default SelectReactor will always execute any
pendingTimedCalls before calling doIteration (where the select() call
actually happens).
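
To illustrate the ordering, here is a toy model of a single loop pass. It
is not Twisted code: ToyLoop, call_later_zero and run_once are made-up
names, and the only thing it shows is that every call which is already
due runs before the select() step gets a turn.

```python
# Toy model of one loop pass: due timed calls first, then one I/O poll.
# ToyLoop and its methods are hypothetical stand-ins, not Twisted APIs.

class ToyLoop:
    def __init__(self):
        self.pending = []   # calls scheduled with zero delay
        self.log = []       # order in which things happen

    def call_later_zero(self, func):
        self.pending.append(func)

    def do_iteration(self):
        # Stand-in for the select() step where reads would happen.
        self.log.append("select")

    def run_once(self):
        # Run every call that is already due before polling the socket.
        due, self.pending = self.pending, []
        for func in due:
            func()
        self.do_iteration()


loop = ToyLoop()
for host in ["a", "b", "c"]:
    loop.call_later_zero(lambda h=host: loop.log.append("send " + h))
loop.run_once()
print(loop.log)  # ['send a', 'send b', 'send c', 'select']
```

So scheduling each go() with a zero delay still sends to every host
before a single reply is read.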

Ok, so inherit from SelectReactor and override mainLoop - except that
doIteration will only execute one doRead for each ready socket. There's
only one socket, shared by potentially thousands of hosts; I don't want
a socket per host, because that would run into either select()'s
performance with a large number of fds or the fd limit.

My old code did this:

while running:
    while 1:
        rs, ws, xs = select.select([self.fd,], [], [], self.timeout())
        if self.fd in rs:
            self.doRead()
        else:
            break
    for p in self.pending:
        p()

...i.e. empty the socket queue before doing *any* callback activity. I
can do this with a suitable reactor - it's just messy. Would it be
worthwhile adding a flag to the core SelectReactor to change the
priority of network versus callback activity and the rate of network
activity?
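
For anyone wanting to try the idea outside a reactor, below is a minimal
standalone sketch using only the standard socket and select modules. The
drain() helper, its max_datagrams cap and the loopback demonstration are
assumptions made for illustration; none of this is Twisted API or the
code from my app.

```python
import select
import socket


def drain(sock, handle, max_datagrams=1000):
    """Read every datagram already queued on sock; return how many."""
    count = 0
    while count < max_datagrams:
        # Zero timeout: only ask whether something is queued right now.
        readable, _, _ = select.select([sock], [], [], 0)
        if not readable:
            break
        data, addr = sock.recvfrom(65535)
        handle(data, addr)
        count += 1
    return count


# Loopback demonstration: queue a few datagrams, then drain them in one go.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for i in range(5):
    sender.sendto(b"pdu %d" % i, receiver.getsockname())

received = []
select.select([receiver], [], [], 1.0)  # wait for the first arrival
n = drain(receiver, lambda data, addr: received.append(data))

# Only now run the queued callbacks, after the socket has been emptied.
pending = [lambda: print("callbacks run after", n, "datagrams")]
for p in pending:
    p()

sender.close()
receiver.close()
```

The cap on datagrams per pass is there so that a steady stream of
packets cannot starve the callbacks forever; the value 1000 is arbitrary.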

Just a thought.

Twisted is excellent work - keep it up!

-- 

Regards,
Phil

+------------------------------------------+
| Phil Mayers                              |
| Network & Infrastructure Group           |
| Information & Communication Technologies |
| Imperial College                         |
+------------------------------------------+



