[Twisted-Python] Lock class using Deferreds
Andy Gayton
andy at thecablelounge.com
Sun Mar 7 00:36:28 MST 2004
Christopher Armstrong wrote:
>> dnsRequest_Semaphore = Semaphore(20)
>> for (sink, input) in source :
>> dnsRequest_Semaphore.run( request, input ).addCallback( sink )
> OTOH, my
> thing didn't call .next() on the source until its result was actually
> required, but this snippet is obviously queuing up the (potentially
> thousands) of operations in the Semaphore.. Probably doesn't actually
> matter.
It might matter - if .next took even a small amount of time (say
fetching jobs out of a text file) - then my for loop would block for a
bit which is against the rules .. maybe these are different tasks after
all ..
It's a nice exercise for learning defers though :)
The problem with my for loop is that it's effectively copying the queue stored
in source and putting it in the Semaphore upfront, instead of as needed.
Perhaps something which is a cross between Lock and DeferredQueue would
be better - if there is a queue of known work to be processed, leave it
as an external queue instead of copying it ...
Ok - a Sunday afternoon spent with the Deferreds howto later - I came up
with this - which only does what your original does, Chris, but allows you
to add new jobs on the fly and is 5 times longer in code :) .. I was a
bit embarrassed by that, so I added the ability to prioritise jobs in the
queue to justify the whole experience ;)
--------------
from twisted.internet import reactor, defer
class Semaphore:
    """A semaphore for event driven systems.

    (This hasn't changed from the earlier email.)

    The semaphore starts with ``tokens`` free tokens. ``acquire()`` hands one
    out through a Deferred; when none are free the Deferred is parked in
    ``waiting`` and fired, in arrival order, by later ``release()`` calls.
    """

    def __init__(self, tokens):
        """Create a semaphore holding ``tokens`` tokens.

        @param tokens: number of tokens that may be held at the same time.
        """
        # Deferreds of callers still waiting for a token, oldest first.
        self.waiting = []
        # Tokens currently free.
        self.tokens = tokens
        # Upper bound used by release() to detect over-release.
        self.limit = tokens

    def acquire(self):
        """Attempt to acquire the token.

        @return Deferred which fires with this semaphore on token
        acquisition; it has already fired if a token was free.
        """
        assert self.tokens >= 0
        d = defer.Deferred()
        if not self.tokens:
            # Nothing free: park the caller until release() wakes it.
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.

        Raises AssertionError if every token is already free, i.e. there
        have been more release() calls than acquire() calls.
        """
        # Raised explicitly (not via ``assert``) so the check is still
        # performed when Python runs with -O.
        if self.tokens >= self.limit:
            raise AssertionError("release() called without a matching acquire()")
        if self.waiting:
            # someone is waiting to acquire token: hand the freed token
            # straight over, so the free count stays unchanged.
            d = self.waiting.pop(0)
            d.callback(self)
        else:
            self.tokens = self.tokens + 1

    def _releaseAndReturn(self, r):
        """Release the token and pass ``r`` through unchanged.

        Written to be used with addBoth(), so the token is released whether
        the job succeeded or failed while its result keeps flowing on.
        """
        self.release()
        return r

    def run(self, f, *args, **kwargs):
        """Acquire token, run function, release token.

        @return Deferred of function result.
        """
        def _runWithToken(_):
            # maybeDeferred lets f return either a plain value or a Deferred.
            job = defer.maybeDeferred(f, *args, **kwargs)
            return job.addBoth(self._releaseAndReturn)

        d = self.acquire()
        d.addCallback(_runWithToken)
        return d
class DeferredPriorityQueue:
    """An event driven priority queue.

    Jobs are ``(f, args, kwargs, cb)`` tuples pulled lazily from iterators
    registered under a priority; the numerically smallest priority is
    served first. An internal Semaphore created with ``max_concurrent``
    tokens limits how many jobs are started before earlier ones finish.

    NOTE: the next job is fetched from the queue before its token is
    acquired, so a job that has already been fetched still runs ahead of a
    more urgent job queued afterwards.
    """

    def __init__(self, max_concurrent):
        """
        @param max_concurrent: number of tokens given to the semaphore.
        """
        self.semaphore = Semaphore(max_concurrent)
        # Maps priority -> list of job iterators, in registration order.
        self.queue = {}
        # True while a _processQueue() chain is active.
        self.processing = False

    def next(self):
        """
        @return the next job in the queue
        raises StopIteration when out of jobs
        """
        # Loop (rather than recurse) past exhausted iterators.
        while self.queue:
            highest_priority = min(self.queue)
            pending = self.queue[highest_priority]
            try:
                return next(pending[0])
            except StopIteration:
                # The oldest iterator at this priority is spent: drop it,
                # and drop the priority level once it has no iterators left.
                del pending[0]
                if not pending:
                    del self.queue[highest_priority]
        raise StopIteration

    def _cbTokenAcquired(self, semaphore, job):
        """Runs queued up job once a token can be acquired

        Sets the job to release the token when completed and calls the
        jobs callback
        Then continues processing the queue
        """
        (f, args, kwargs, cb) = job
        d = defer.maybeDeferred(f, *args, **kwargs)
        # Release the token on success and on failure alike.
        d.addBoth(self.semaphore._releaseAndReturn)
        # cb is attached as a callback only; a failure stays on the Deferred.
        d.addCallback(cb)
        self._processQueue()

    def _processQueue(self):
        """Grabs the next job in queue if available and then waits for
        a token"""
        try:
            job = self.next()
        except StopIteration:
            # Out of jobs: let the next run()/runQueue() restart processing.
            self.processing = False
            return
        self.semaphore.acquire().addCallback(self._cbTokenAcquired, job)

    def _startQueue(self):
        """Start queue processing if its not already started"""
        if not self.processing:
            self.processing = True
            self._processQueue()

    def run(self, f, cb, *args, **kwargs):
        """Queue up a single job

        @param f: function to call with ``*args`` and ``**kwargs``.
        @param cb: callback added to the Deferred of f's result.
        @param priority: optional keyword (default 5); it is removed from
        kwargs before they are stored for f.
        """
        # not happy with this bit at all, but can't
        # think of another way to do it without making
        # it compulsory to pass in priority
        priority = kwargs.pop("priority", 5)
        self.runQueue(iter([(f, args, kwargs, cb)]), priority)

    def runQueue(self, queue, priority=5):
        """Queue up a (potentially external) queue of jobs

        @param queue: iterable yielding ``(f, args, kwargs, cb)`` jobs; it
        is consumed lazily, one job at a time.
        @param priority: numerically smaller priorities are served first.
        """
        # iter() hands an iterator back unchanged and also lets plain
        # iterables such as lists be queued.
        self.queue.setdefault(priority, []).append(iter(queue))
        self._startQueue()
#
# test things out
#
def triple(x):
    """Return a Deferred that fires with ``x * 3`` via reactor.callLater(2, ...)."""
    pending = defer.Deferred()
    # The product is computed now; only its delivery is delayed.
    reactor.callLater(2, pending.callback, x * 3)
    return pending
def cbPrintResult(result):
    """Print ``result`` to standard output.

    Uses the call form of print, which behaves the same for a single
    argument under both Python 2 and Python 3.
    """
    print(result)
def sources():
    """Yield eight demo jobs, each ``(triple, [3], {}, cbPrintResult)``."""
    remaining = 8
    while remaining:
        # A fresh tuple, argument list and kwargs dict for every job.
        yield (triple, [3], {}, cbPrintResult)
        remaining -= 1
#
# The jobs from sources() could be run directly off a Semaphore,
# but if retrieving jobs from the source takes a while,
# the loop below blocks until every job has been queued up.
#
#s = Semaphore(2)
#for (f, args, kwargs, cb) in sources() :
# s.run( f, *args, **kwargs ).addCallback( cb )
# Demo driver: the queue's semaphore is given two tokens.
q = DeferredPriorityQueue(2)
# Register the generator itself (default priority 5); jobs are pulled from
# it one at a time rather than copied into the queue up front.
q.runQueue( sources() )
# priority=3 is numerically smaller than the default 5, so this job is
# chosen ahead of any sources() jobs that have not been fetched yet.
q.run( triple, cbPrintResult, 4, priority=3 )
# Start the Twisted event loop; nothing in this listing stops it.
reactor.run()
------------
It might have its uses - perhaps to stop mass mailouts from holding up
ad hoc mails ...
I'm mainly trying to learn though by tweaking with Itamar's and Chris's
code - so feedback/criticism would be welcome.
Andy.
More information about the Twisted-Python
mailing list