[Twisted-Python] UDP servers and socket queues
Phil Mayers
p.mayers at imperial.ac.uk
Mon Apr 21 08:26:57 MDT 2003
All,
I'm porting an app over which had its own async job mechanism, similar to
the most basic parts of Twisted. I now want it to run as a long-running
server with multiprotocol support, and Twisted is the logical choice.
The core of the app is a UDP-based RPC client (bespoke stuff) which does
a periodic flurry of activity (plus on-demand stuff - that's the plan
for later anyway).
The problem is that the flurry of activity fills the UDP socket queue. I
think some simple changes to SelectReactor would fix it (I've made them
locally via a nasty/messy subclass), and they could be useful to others.
The code looks roughly like this:
    from twisted.internet import reactor
    from twisted.internet import defer, protocol

    class RPCProtocol(protocol.DatagramProtocol):
        def __init__(self):
            # setup some icky stuff for retries etc here
            self.outstanding = {}  # pdu.uuid -> Deferred awaiting the reply

        def datagramReceived(self, data, addr):
            pdu = self._decode(data)
            # look up and remove the Deferred waiting on this reply
            deferred = self.outstanding.pop(pdu.uuid)
            deferred.callback(pdu)

        def sendPDU(self, pdu, addr):
            d = self.outstanding[pdu.uuid] = defer.Deferred()
            self.writeMessage(pdu.encode(), addr)
            return d

    class endpoint:
        def __init__(self, protocol, host):
            self.protocol = protocol
            self.addr = resolve(host)
            self.actions = load_from_config(host)

        def go(self):
            # send the first request; handle() fires when the reply arrives
            pdu = self.create_pdu(self.actions.pop(0))
            self.protocol.sendPDU(pdu, self.addr).addCallback(self.handle)

        def handle(self, pdu):
            # process the reply, then send the next request
            func = self.actions.pop(0)
            func(pdu)
            pdu = self.create_pdu(self.actions.pop(0))
            self.protocol.sendPDU(pdu, self.addr).addCallback(self.handle)

Anyway, you get the idea. The code fires up, and you get:
    proto = RPCProtocol()
    for host in hosts:
        ep = endpoint(proto, host)
        ep.go()
...the problem is that the responses come back too fast - they overflow
the UDP socket buffer after a couple of hundred packets. So, try this:
    proto = RPCProtocol()
    for host in hosts:
        ep = endpoint(proto, host)
        reactor.callLater(0, ep.go)
...except that the default SelectReactor will always execute any
pendingTimedCalls before calling doIteration (where the select() call
actually happens).
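
To illustrate the ordering problem, here is a toy model of the loop. It is a simplified sketch, not Twisted's real code: ToyReactor, call_later_zero, run_until_current, do_iteration and demo are all made-up names. The assumption it encodes is that every call already due is run in one pass before the select() step gets a turn.

```python
class ToyReactor:
    """Simplified model of a reactor loop; hypothetical, not Twisted's code."""

    def __init__(self):
        self.pending = []  # zero-delay timed calls waiting to run
        self.log = []      # records the order in which things happen

    def call_later_zero(self, func, *args):
        # Stand-in for scheduling a call with a delay of 0.
        self.pending.append((func, args))

    def run_until_current(self):
        # Run every call that is already due, in scheduling order.
        due, self.pending = self.pending, []
        for func, args in due:
            func(*args)

    def do_iteration(self):
        # Stand-in for the select() call and the reads that follow it.
        self.log.append("select")

    def run_once(self):
        # One pass of the main loop: timed calls first, then the network.
        self.run_until_current()
        self.do_iteration()


def demo(hosts):
    toy = ToyReactor()
    for host in hosts:
        # The default argument binds the current host to each lambda.
        toy.call_later_zero(lambda h=host: toy.log.append("go:" + h))
    toy.run_once()
    return toy.log
```

In this model, demo(["a", "b", "c"]) logs all three "go" entries before the single "select" entry, so scheduling with a zero delay does not interleave sends with reads.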
Ok, so inherit from SelectReactor and override mainLoop - except that
doIteration will only execute one doRead for each ready socket. There's
only one socket, even though there are potentially thousands of hosts: I
don't want to run into either select()'s poor performance with large
numbers of fds or the fd limit.
My old code did this:
    while running:
        while 1:
            rs, ws, xs = select.select([self.fd,], [], [], self.timeout())
            if self.fd in rs:
                self.doRead()
            else:
                break
        for p in self.pending:
            p()
...i.e. empty the socket queue before doing *any* callback activity. I
can do this with a suitable reactor - it's just messy. Would it be
worthwhile to add a flag to the core SelectReactor to control the priority
of network versus callback activity, and the rate of network activity?
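
For anyone who wants to try the drain-first idea outside Twisted, here is a minimal sketch using only the standard socket and select modules. The names drain and demo are made up for illustration, and unlike my loop above it polls with a zero timeout rather than self.timeout(), so it never blocks once the queue is empty. It assumes loopback delivery on the local machine.

```python
import select
import socket
import time


def drain(sock, handle, bufsize=65535):
    """Read every datagram currently queued on sock; return how many."""
    count = 0
    while True:
        # Zero timeout: just poll, never block.
        readable, _, _ = select.select([sock], [], [], 0)
        if sock not in readable:
            return count
        data, addr = sock.recvfrom(bufsize)
        handle(data, addr)
        count += 1


def demo(n=20):
    """Send n datagrams over loopback, then drain them before anything else."""
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    receiver.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    received = []
    try:
        for i in range(n):
            sender.sendto(str(i).encode(), receiver.getsockname())
        # Keep draining until everything has arrived or two seconds pass.
        deadline = time.monotonic() + 2.0
        while len(received) < n and time.monotonic() < deadline:
            select.select([receiver], [], [], 0.1)
            drain(receiver, lambda data, addr: received.append(data))
    finally:
        sender.close()
        receiver.close()
    return received
```

Callbacks would then run only after drain() returns, which is the same priority my old loop gave to the network.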
Just a thought.
Twisted is excellent work - keep it up!
--
Regards,
Phil
+------------------------------------------+
| Phil Mayers |
| Network & Infrastructure Group |
| Information & Communication Technologies |
| Imperial College |
+------------------------------------------+