[Twisted-Python] Synchronization techniques

Daniel Miller daniel at keystonewood.com
Mon Apr 2 08:39:39 MDT 2007


Hello fellow twisters,

I am developing a twisted application on Mac OS X (using Perspective  
Broker) in which the server component runs shell scripts to post  
orders to an accounting system. The accounting system is developed  
and maintained by a third-party vendor, and is therefore beyond my  
control. For a given command the scripts must be executed in order,  
and all commands that affect the accounting system must be  
synchronized. For example:


class AccountingService(pb.Root):

     def remote_post_order(self, data):
         self.lock.acquire()
         try:
             # put data in 'trans.dat' file
             # ...

             process = TwistedProcess()
             process.check_call(["load_dat.sh", "trans.dat"])
             process.check_call(["post_trx.sh"])
         finally:
             self.lock.release()


The process.check_call() method has (nearly) the same semantics as  
the check_call() method in Python's built-in subprocess module, and  
the lock has (nearly) the same semantics as Python's built-in  
threading.Lock class. Unfortunately, I cannot use those built-in  
methods/classes because they do not play nice with Twisted--I get  
'Interrupted system call' errors from subprocess.check_call(), and (I  
think) my server could deadlock if I used threading.Lock. So I  
developed a few helper classes to hack around these Twisted  
mis-features (questions follow the code):


import os  # needed for os.environ in TwistedProcess.check_call()
from Queue import Queue, Empty
from twisted.internet.error import ProcessTerminated
from twisted.internet.protocol import ProcessProtocol

class ProcessError(Exception): pass


class ProcessErrorHandler(ProcessProtocol):

     def __init__(self):
         self.errbuf = []
         self.errors = []
         self.waiting = True

     def errReceived(self, text):
         self.errbuf.append(text)

     def processEnded(self, status):
         if self.errbuf:
             self.errors.append("".join(self.errbuf))
         if isinstance(status.value, ProcessTerminated):
             self.errors.append(status.getErrorMessage())
         self.waiting = False

     def hasError(self):
         return bool(self.errors)

     def raiseError(self):
         raise ProcessError("\n".join(self.errors))


class TwistedProcess(object):

     def __init__(self, reactor=None):
         if reactor is None:
             from twisted.internet import reactor
         self.reactor = reactor

     def check_call(self, cmd):
         status = ProcessErrorHandler()
         self.reactor.spawnProcess(status, cmd[0], cmd, env=os.environ)
         while status.waiting:
             self.reactor.iterate(0.05)
         if status.hasError():
             status.raiseError()


class TwistedLock(object):
     """A lock object for Twisted

     The lock is instantiated in a released state.
     """

     def __init__(self, reactor=None):
         if reactor is None:
             from twisted.internet import reactor
         self.reactor = reactor
         self.queue = Queue(1)
         self.release()

     def acquire(self):
         """Acquire the lock

         This method returns immediately when the lock is acquired. It
         "blocks" (i.e. iterates the reactor) indefinitely while it waits
         for the lock to be released.
         """
         while True:
             try:
                 self.queue.get_nowait()
             except Empty:
                 # pseudo block (iterate the reactor) while waiting for
                 # the lock to be released
                 self.reactor.iterate(0.05)
             else:
                 break # the lock has been acquired

     def release(self):
         """Release the lock

         Raises Queue.Full if the lock was not acquired exactly once since
         it was last released.
         """
         self.queue.put_nowait("TOKEN")


Questions:
1. Is reactor.iterate() meant to be used this way (see  
TwistedLock.acquire() and TwistedProcess.check_call())? If not, what  
is the right way to do those things? I've read everything I can find  
on reactor.iterate() and while it seems to be the only way to do what  
I need to do, its use is often strongly discouraged. Why is it so  
difficult for twisted to handle non-async code?

2. Is my suspicion that the server could deadlock if I used  
threading.Lock correct? Note: the server is not multi-threaded  
(unless twisted is doing something with threads that I am not aware of).
Example scenario:
     req 1: acquire lock
     req 1: spawn sub-process
     req 1: iterate reactor while waiting for process to complete
     req 2: acquire lock (block and wait)
     SERVER IS DEAD-LOCKED ?
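
To make the scenario above concrete, here is a minimal sketch that uses only
the standard library. It assumes both requests are handled on the same thread
(as they would be if "req 2" is dispatched from inside reactor.iterate()), and
it uses a non-blocking acquire for the second attempt so that the example
itself does not hang. The variable names are illustrative only.

```python
import threading

lock = threading.Lock()

# "req 1" takes the lock and keeps holding it while its sub-process runs.
first = lock.acquire()

# "req 2" arrives on the same thread. threading.Lock is not reentrant, so a
# plain lock.acquire() here would block forever: the only thread that could
# release the lock is the one that is now waiting for it. A non-blocking
# attempt shows that the lock is unavailable without hanging this example.
second = lock.acquire(False)

# Clean up the lock held by "req 1".
lock.release()
```

Here `first` is true and `second` is false; with a blocking acquire in place
of the second call, the thread would never get as far as release().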

3. Is there a better way to do what I'm trying to do here? (getting a  
new accounting system is not an option :)

~ Daniel





