[Twisted-Python] PB waitingForAnswers KeyError

William Waites ww at groovy.net
Fri Jul 22 08:56:47 MDT 2005


This is with the Twisted Sumo tar, with threadedselectreactor dropped in:

          File "/usr/lib/python2.3/site-packages/twisted/internet/threadedselectreactor.py", line 252, in _doReadOrWrite
            why = getattr(selectable, method)()
          File "/usr/lib/python2.3/site-packages/twisted/internet/tcp.py", line 351, in doRead
            return self.protocol.dataReceived(data)
          File "/usr/lib/python2.3/site-packages/twisted/spread/banana.py", line 184, in dataReceived
            gotItem(item)
          File "/usr/lib/python2.3/site-packages/twisted/spread/banana.py", line 114, in gotItem
            self.callExpressionReceived(item)
          File "/usr/lib/python2.3/site-packages/twisted/spread/banana.py", line 81, in callExpressionReceived
            self.expressionReceived(obj)
          File "/usr/lib/python2.3/site-packages/twisted/spread/pb.py", line 561, in expressionReceived
            method(*sexp[1:])
          File "/usr/lib/python2.3/site-packages/twisted/spread/pb.py", line 919, in proto_answer
            d = self.waitingForAnswers[requestID]
        exceptions.KeyError: 19

What am I doing to make this happen? I am using a class derived from the threadedselect
blocking demo. This implementation is attached, and has a blocking run() method and
a blocking getDeferred() method. It takes interleaved' callbacks and puts them in a
queue. The run() method takes them out of the queue in a blocking way and calls them.
getDeferred() makes a threading.Event(), causes a CB to be queued, and then waits
on the event to return the result. This way there can be many threads using the blocking
getDeferred(), and many threads that just use the normal twisted stuff.

This is running inside Asterisk and implements a PB client. This error appears under
load (30-40 PB calls per second, 10-20 threads using getDeferred()).

The server works just fine, and I have verified with a non-threadedselectreactor dummy
client. A single threaded client is able to get several hundred PB calls per second
through, and running more than one single threaded client works as expected.

Looking at the spread/pb.py, I can't see how this KeyError should happen. 
waitingForAnswers[requestID] always gets set before the remote call, so I don't see
a race condition where we could get a response before the dictionary is populated.

Help?

-w
-------------- next part --------------
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.python.runtime import seconds
from itertools import count
from Queue import Queue, Empty
from threading import Event

class TwistedManager(object):
    def __init__(self):
        self.twistedQueue = Queue()
        self.key = count()
        self.results = {}
        self.events = {}
        self.running = False
        
    def getKey(self):
        # get a unique identifier
        return self.key.next()

    def run(self):
        # start the reactor
        if self.running:
            return
        self.running = True

        reactor.interleave(self.twistedQueue.put)
        
        while True:
            callback = self.twistedQueue.get()
            callback()
            if not self.running and self.twistedQueue.empty():
                return
            
    def _stopIterating(self, value, key):
        e = self.events[key]
        del self.events[key]
        self.results[key] = value
        e.set()
            
    def stop(self):
        # stop the reactor
        key = self.getKey()
        e = Event()
        self.events[key] = e
        reactor.addSystemEventTrigger('after', 'shutdown',
            self._stopIterating, True, key)
        self.running = False
        reactor.stop()
        e.wait()
            
    def getDeferred(self, d):
        # get the result of a deferred or raise if it failed
        key = self.getKey()
        e = Event()
        self.events[key] = e
        d.addBoth(self._stopIterating, key)
        e.wait()
        res = self.results.pop(key)
        if isinstance(res, Failure):
            res.raiseException()
        return res


More information about the Twisted-Python mailing list