[Twisted-Python] PB waitingForAnswers KeyError
William Waites
ww at groovy.net
Fri Jul 22 08:56:47 MDT 2005
This is with the Twisted Sumo tar, with threadedselectreactor dropped in:
  File "/usr/lib/python2.3/site-packages/twisted/internet/threadedselectreactor.py", line 252, in _doReadOrWrite
    why = getattr(selectable, method)()
  File "/usr/lib/python2.3/site-packages/twisted/internet/tcp.py", line 351, in doRead
    return self.protocol.dataReceived(data)
  File "/usr/lib/python2.3/site-packages/twisted/spread/banana.py", line 184, in dataReceived
    gotItem(item)
  File "/usr/lib/python2.3/site-packages/twisted/spread/banana.py", line 114, in gotItem
    self.callExpressionReceived(item)
  File "/usr/lib/python2.3/site-packages/twisted/spread/banana.py", line 81, in callExpressionReceived
    self.expressionReceived(obj)
  File "/usr/lib/python2.3/site-packages/twisted/spread/pb.py", line 561, in expressionReceived
    method(*sexp[1:])
  File "/usr/lib/python2.3/site-packages/twisted/spread/pb.py", line 919, in proto_answer
    d = self.waitingForAnswers[requestID]
exceptions.KeyError: 19
What am I doing to make this happen? I am using a class derived from the threadedselect
blocking demo. This implementation is attached, and has a blocking run() method and
a blocking getDeferred() method. It takes the 'interleaved' callbacks and puts them in a
queue. The run() method takes them out of the queue, blocking until one is available,
and calls them. getDeferred() makes a threading.Event(), causes a callback to be queued,
and then waits on the event before returning the result. This way there can be many
threads using the blocking
getDeferred(), and many threads that just use the normal twisted stuff.
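
The handoff described above can be modelled without Twisted. What follows is only a sketch using the standard library (Python 2.6+ or 3); `Dispatcher`, `call_blocking` and `demo` are made-up names for illustration, not part of the attachment or of Twisted. One loop thread drains a queue of callables while worker threads block on a threading.Event until their result has been stored.

```python
import threading
try:
    from queue import Queue      # Python 3
except ImportError:
    from Queue import Queue      # Python 2, as in the attachment


class Dispatcher(object):
    """Hypothetical stand-in: one loop thread, many blocked callers."""

    _STOP = object()             # marker that tells run() to return

    def __init__(self):
        self.calls = Queue()

    def run(self):
        # Drain the queue in the loop thread until the stop marker arrives.
        while True:
            item = self.calls.get()
            if item is self._STOP:
                return
            item()

    def stop(self):
        self.calls.put(self._STOP)

    def call_blocking(self, func, *args):
        # Queue func for the loop thread, then block until it has run there.
        done = threading.Event()
        box = {}

        def job():
            try:
                box['value'] = func(*args)
            except Exception as exc:
                box['error'] = exc
            finally:
                done.set()

        self.calls.put(job)
        done.wait()
        if 'error' in box:
            raise box['error']
        return box['value']


def demo(n_workers=8):
    dispatcher = Dispatcher()
    loop = threading.Thread(target=dispatcher.run)
    loop.start()
    results = {}                 # each worker writes only its own key

    def worker(i):
        results[i] = dispatcher.call_blocking(lambda x: x * x, i)

    workers = [threading.Thread(target=worker, args=(i,))
               for i in range(n_workers)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    dispatcher.stop()
    loop.join()
    return results
```

One difference from the attachment is worth keeping in mind: in this sketch the work itself runs in the loop thread, whereas getDeferred() in the attachment is handed a Deferred that already exists when it is called.
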
This is running inside Asterisk and implements a PB client. This error appears under
load (30-40 PB calls per second, 10-20 threads using getDeferred()).
The server works just fine; I have verified this with a dummy client that does not use
threadedselectreactor. A single-threaded client is able to get several hundred PB calls
per second through, and running more than one single-threaded client works as expected.
Looking at spread/pb.py, I can't see how this KeyError could happen.
waitingForAnswers[requestID] always gets set before the remote call, so I don't see
a race condition where we could get a response before the dictionary is populated.
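
To make the bookkeeping described here concrete, below is a toy model of a request/answer table. It is a sketch under assumptions, not the code from pb.py: `RequestTable`, `send_request` and `answer_received` are hypothetical names, and the `sent` list merely stands in for the wire. It registers the handler before "sending", and looks the handler up by ID when an answer comes back.

```python
class RequestTable(object):
    """Toy model of a request/answer table; NOT the actual pb.py code."""

    def __init__(self):
        self.waiting = {}        # request ID -> callable awaiting the answer
        self.next_id = 0
        self.sent = []           # stands in for the network connection

    def send_request(self, message, on_answer):
        self.next_id += 1
        request_id = self.next_id
        # Register first, transmit second, so an answer cannot arrive early.
        self.waiting[request_id] = on_answer
        self.sent.append((request_id, message))
        return request_id

    def answer_received(self, request_id, value):
        # Raises KeyError if the ID was never registered or was already answered.
        on_answer = self.waiting.pop(request_id)
        on_answer(value)


table = RequestTable()
got = []
rid = table.send_request("ping", got.append)
table.answer_received(rid, "pong")        # delivered normally
try:
    table.answer_received(rid, "pong")    # same ID again: the entry is gone
except KeyError:
    got.append("KeyError")
```

In this model, used from a single thread, a KeyError can only come from an answer whose ID was never registered or whose entry has already been removed.
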
Help?
-w
-------------- next part --------------
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.python.runtime import seconds
from itertools import count
from Queue import Queue, Empty
from threading import Event

class TwistedManager(object):
    def __init__(self):
        self.twistedQueue = Queue()
        self.key = count()
        self.results = {}
        self.events = {}
        self.running = False

    def getKey(self):
        # get a unique identifier
        return self.key.next()

    def run(self):
        # start the reactor
        if self.running:
            return
        self.running = True
        reactor.interleave(self.twistedQueue.put)
        while True:
            callback = self.twistedQueue.get()
            callback()
            if not self.running and self.twistedQueue.empty():
                return

    def _stopIterating(self, value, key):
        e = self.events[key]
        del self.events[key]
        self.results[key] = value
        e.set()

    def stop(self):
        # stop the reactor
        key = self.getKey()
        e = Event()
        self.events[key] = e
        reactor.addSystemEventTrigger('after', 'shutdown',
                                      self._stopIterating, True, key)
        self.running = False
        reactor.stop()
        e.wait()

    def getDeferred(self, d):
        # get the result of a deferred or raise if it failed
        key = self.getKey()
        e = Event()
        self.events[key] = e
        d.addBoth(self._stopIterating, key)
        e.wait()
        res = self.results.pop(key)
        if isinstance(res, Failure):
            res.raiseException()
        return res