[Twisted-Python] Lock class using Deferreds

Andy Gayton andy at thecablelounge.com
Sun Mar 7 00:36:28 MST 2004


Christopher Armstrong wrote:

>> dnsRequest_Semaphore = Semaphore(20)
>> for (sink, input) in source :
>>     dnsRequest_Semaphore.run( request, input ).addCallback( sink )

> OTOH, my 
> thing didn't call .next() on the source until its result was actually 
> required, but this snippet is obviously queuing up the (potentially 
> thousands) of operations in the Semaphore.. Probably doesn't actually 
> matter.

It might matter - if .next took even a small amount of time (say 
fetching jobs out of a text file) - then my for loop would block for a 
bit which is against the rules .. maybe these are different tasks after 
all ..

It's a nice exercise for learning defers though :)

The problem with my for loop is its effectively copying the queue stored
in source and putting it in Semaphore upfront, instead of as needed.

Perhaps something which is a cross between Lock and DeferredQueue would 
be better - if there is a queue of known work to be processed, leave it 
as an external queue instead of copying it ...

Ok - a sunday afternoon spent with the defers howto later - I came up 
with this - which only does what your original does Chris but allows you 
to add new jobs on the fly and is 5 times longer in code :) .. I was a 
bit embarrassed by that so added the ability to prioritise jobs in the 
queue to justify the whole experience ;)

--------------
from twisted.internet import reactor, defer

class Semaphore:
     """A semaphore for event driven systems."""

     """This hasn't changed from the earlier email."""

     def __init__(self, tokens):
         self.waiting = []
         self.tokens  = tokens
         self.limit   = tokens

     def acquire(self):
         """Attempt to acquire the token.

         @return Deferred which returns on token acquisition.
         """
         assert self.tokens >= 0
         d = defer.Deferred()
         if not self.tokens:
             self.waiting.append(d)
         else:
             self.tokens = self.tokens - 1
             d.callback(self)
         return d

     def release(self):
         """Release the token.

         Should be called by whoever did the acquire() when the shared
         resource is free.
         """
         assert self.tokens < self.limit
         self.tokens = self.tokens + 1
         if self.waiting:
             # someone is waiting to acquire token
             self.tokens = self.tokens - 1
             d = self.waiting.pop(0)
             d.callback(self)

     def _releaseAndReturn(self, r):
         self.release()
         return r

     def run(self, f, *args, **kwargs):
         """Acquire token, run function, release token.

         @return Deferred of function result.
         """
         d = self.acquire()
         d.addCallback(lambda r: defer.maybeDeferred(f, *args, 
**kwargs).addBoth(self._releaseAndReturn))
         return d


class DeferredPriorityQueue:
     """An event driven priority queue"""

     def __init__( self, max_concurrent ) :
         self.semaphore = Semaphore( max_concurrent )
         self.queue = {}
         self.processing = False

     def next(self) :
         """
         @return the next job in the queue
         raises StopIteration when out of jobs
         """
         if not len( self.queue ) : raise StopIteration

         highest_priority = min( self.queue.keys() )

         try:
             return self.queue[highest_priority][0].next()
         except StopIteration :
             del self.queue[highest_priority][0]
             if not len( self.queue[highest_priority] ) : del 
self.queue[highest_priority]
             return self.next()

     def _cbTokenAcquired(self, semaphore, job) :
         """Runs queued up job once a token can be acquired
         Sets the job to release the token when completed and calls the 
jobs callback
         Then continues processing the queue
         """
         (f, args, kwargs, cb) = job

         d = defer.maybeDeferred(f, *args, 
**kwargs).addBoth(self.semaphore._releaseAndReturn)
         d.addCallback(cb)

         self._processQueue()

     def _processQueue(self):
         """Grabs the next job in queue if available and then waits for 
a token"""
         try:
             job = self.next()
         except StopIteration :
             self.processing = False
             return

         self.semaphore.acquire().addCallback(self._cbTokenAcquired, job)

     def _startQueue(self) :
         """Start queue processing if its not already started"""
         if not self.processing:
             self.processing = True
             self._processQueue()

     def run( self, f, cb, *args, **kwargs ) :
         """Queue up a single job"""

         # not happy with this bit at all, but can't
         # think of another way to do it without making
         # it compulsory to pass in priority

         if kwargs.has_key("priority") :
             priority = kwargs["priority"]
             del kwargs["priority"]
         else :
             priority = 5

         self.runQueue( iter([(f, args, kwargs, cb)]), priority )

     def runQueue(self, queue, priority=5):
         """Queue up a (potentially external) queue of jobs"""

         if not self.queue.has_key( priority ) :
             self.queue[priority] = []
         self.queue[priority].append( queue )

         self._startQueue()


#
# test things out
#

def triple(x):
     d = defer.Deferred()
     reactor.callLater(2, d.callback, x * 3)
     return d

def cbPrintResult( result ) :
     print result

def sources() :
     for i in range(8) :
         yield ( triple, [3], {}, cbPrintResult )


#
# sources can be run directly off a semaphore,
# but if it takes a while to retrieve jobs
# from the source
# then the following will block until all
# jobs are queued up
#
#s = Semaphore(2)
#for (f, args, kwargs, cb) in sources() :
#    s.run( f, *args, **kwargs ).addCallback( cb )


q = DeferredPriorityQueue(2)
q.runQueue( sources() )
q.run( triple, cbPrintResult, 4, priority=3 )

reactor.run()
------------

It might have its uses - perhaps to stop mass mailouts from holding up 
hadhoc mails ...

I'm mainly trying to learn though by tweaking with Itamar's and Chris's 
code - so feedback/criticism would be welcome.

Andy.




More information about the Twisted-Python mailing list