[Twisted-Python] Synchronization techniques
Daniel Miller
daniel at keystonewood.com
Mon Apr 2 08:39:39 MDT 2007
Hello fellow twisters,
I am developing a twisted application on Mac OS X (using Perspective
Broker) in which the server component runs shell scripts to post
orders to an accounting system. The accounting system is developed
and maintained by a third party vendor, and is therefore beyond my
control. For a given command the scripts must be executed in order,
and all commands that affect the accounting system must be
synchronized. For example:
class AccountingService(pb.Root):
def remote_post_order(self, data):
self.lock.acquire()
try:
# put data in 'trans.dat' file
# ...
process = TwistedProcess()
process.check_call(["load_dat.sh", "trans.dat"])
process.check_call(["post_trx.sh"])
finally:
self.lock.release()
The process.check_call() method has (nearly) the same semantics as
the check_call() method in Python's built-in subprocess module, and
the lock has (nearly) the same semantics as Python's built-in
threading.Lock class. Unfortunately, I cannot use those built-in
methods/classes because they do not play nice with Twisted--I get
'Interrupted system call' errors from subprocess.check_call(), and (I
think) my server could deadlock if I used threading.Lock. So I
developed a few helper classes to hack around these Twisted mis-
features (questions follow the code):
from Queue import Queue, Empty
from twisted.internet.error import ProcessTerminated
from twisted.internet.protocol import ProcessProtocol
class ProcessError(Exception): pass
class ProcessErrorHandler(ProcessProtocol):
def __init__(self):
self.errbuf = []
self.errors = []
self.waiting = True
def errReceived(self, text):
self.errbuf.append(text)
def processEnded(self, status):
if self.errbuf:
self.errors.append("".join(self.errbuf))
if isinstance(status.value, ProcessTerminated):
self.errors.append(status.getErrorMessage())
self.waiting = False
def hasError(self):
return bool(self.errors)
def raiseError(self):
raise ProcessError("\n".join(self.errors))
class TwistedProcess(object):
def __init__(self, reactor=None):
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
def check_call(self, cmd):
status = ProcessErrorHandler()
self.reactor.spawnProcess(status, cmd[0], cmd, env=os.environ)
while status.waiting:
self.reactor.iterate(0.05)
if status.hasError():
status.raiseError()
class TwistedLock(object):
"""A lock object for Twisted
The lock is instantiated in a released state.
"""
def __init__(self, reactor=None):
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
self.queue = Queue(1)
self.release()
def acquire(self):
"""Acquire the lock
This method returns immediately when the lock is acquired.
It "blocks"
(i.e. iterates the reactor) indefinitely while it waits for
the lock to
be released.
"""
while True:
try:
self.queue.get_nowait()
except Empty:
# pseudo block (iterate the reactor) while waiting
for the lock to be released
self.reactor.iterate(0.05)
else:
break # the lock has been acquired
def release(self):
"""Release the lock
Raises Queue.Full if the lock was not acquired exactly once
since it was
last released.
"""
self.queue.put_nowait("TOKEN")
Questions:
1. Is reactor.iterate() meant to be used this way (see
TwistedLock.acquire() and TwistedProcess.check_call())? If not, what
is the right way to do those things? I've read everything I can find
on reactor.iterate() and while it seems to be the only way to do what
I need to do, its use is often strongly discouraged. Why is it so
difficult for twisted to handle non-async code?
2. Is my suspicion that the server could deadlock if I used
threading.Lock correct? Note: the server is not multi-threaded
(unless twisted is doing something with threads that I am not aware of).
Example scenario:
req 1: acquire lock
req 1: spawn sub-process
req 1: iterate reactor while waiting for process to complete
req 2: acquire lock (block and wait)
SERVER IS DEAD-LOCKED ?
3. Is there a better way to do what I'm trying to do here? (getting a
new accounting system is not an option :)
~ Daniel
More information about the Twisted-Python
mailing list