[Twisted-Python] Deferred style question.

Anthony Baxter anthony at interlink.com.au
Fri Jan 7 06:51:18 EST 2005


On Friday 07 January 2005 21:24, Bob Ippolito wrote:
> It's funny that you are using a decorator, but not the __contains__
> operator ;)

Pah! Here's a final version for now, fixed to work for methods and 
also supporting errbacks, as well as kwargs and unhashable arguments. 
The previous version didn't work for methods, because I was trying to be 
too clever. 

This is checked into shtoom (on a branch at the moment, since that's where 
I need it). If people like, I can check it into the twisted sandbox somewhere 
as well...

If it's considered a good idea, I'm happy for this to go into twisted proper.
Now that I've got it, there's going to be a whole pile of places I can use
this.

-- 
Anthony Baxter     <anthony at interlink.com.au>
It's never too late to have a happy childhood.
-------------- next part --------------
from twisted.internet import defer

class _DeferredCache:
    """ Wraps a call that returns a deferred in a cache. Any subsequent
        calls with equal arguments do not invoke the wrapped call again;
        they are handed a fresh deferred that fires with the same result
        (or failure) as the first call, whether that call is still in
        progress or has already completed.

        Calls whose arguments are not hashable can't be cached. They are
        passed straight through to the wrapped call when
        allowUnhashableArgs is true, and rejected with TypeError otherwise.

        Entries are never evicted, and the cache keeps references to the
        arguments of every cached call.
    """

    allowUnhashableArgs = True

    def __init__(self, op, allowUnhashableArgs=None):
        """ op is the callable to wrap; it should return a deferred.
            allowUnhashableArgs, if not None, overrides the class default.
        """
        self.op = op
        # Maps the key built by _genCache to the deferred returned by op
        self.cache = {}
        if allowUnhashableArgs is not None:
            self.allowUnhashableArgs = allowUnhashableArgs

    def cb_triggerUserCallback(self, res, deferred):
        """ Fire the caller's deferred with res, and return res so that the
            cached deferred keeps its result for later callers.
        """
        deferred.callback(res)
        return res

    def cb_triggerUserErrback(self, failure, deferred):
        """ Fire the caller's deferred with failure, and return failure so
            that the cached deferred stays failed for later callers.
        """
        deferred.errback(failure)
        return failure

    def _genCache(self, args, kwargs):
        """ Return a hashable cache key for this call, or None if any of
            the arguments is unhashable.
        """
        # Key on the arguments themselves, not on their hashes: distinct
        # arguments can share a hash, and would then share a cache entry.
        # kwargs are sorted so that the key doesn't depend on their order.
        key = (args, tuple(sorted(kwargs.items())))
        try:
            hash(key)
        except TypeError:
            return None
        return key

    def call(self, *args, **kwargs):
        """ Invoke the wrapped call through the cache. Returns a new
            deferred on every call. Raises TypeError if the arguments are
            unhashable and allowUnhashableArgs is false.
        """
        cacheVal = self._genCache(args, kwargs)
        if cacheVal is None and not self.allowUnhashableArgs:
            # Not every callable has a name (and func_name is gone in
            # newer Pythons), so fall back to the repr.
            opname = getattr(self.op, '__name__', None) or repr(self.op)
            raise TypeError('DeferredCache(%s) arguments must be hashable'%(
                                opname,))

        opdef = None
        if cacheVal is not None:
            opdef = self.cache.get(cacheVal)
        if opdef is None:
            # Currently not in progress (or not cacheable) - start it
            # XXX check it returns a deferred?
            opdef = self.op(*args, **kwargs)

            if cacheVal is not None:
                self.cache[cacheVal] = opdef

        # Each caller gets a private deferred, so callbacks they add can't
        # alter the result seen by other callers of the cached deferred.
        userdef = defer.Deferred()
        opdef.addCallbacks(self.cb_triggerUserCallback,
                           self.cb_triggerUserErrback,
                           callbackArgs=(userdef,),
                           errbackArgs=(userdef,))
        return userdef

class _DeferredCacheHashableArgs(_DeferredCache):
    """ A _DeferredCache whose default is to refuse unhashable arguments
        rather than passing such calls through uncached.
    """

    # Only the class-level default differs from the parent
    allowUnhashableArgs = False

def DeferredCache(op, allowUnhashableArgs=None):
    """ Return a function that calls op through a new _DeferredCache.

        A plain function is returned (not the cache's bound method) so
        that the result can also be assigned as a class attribute and
        used as a method.
    """
    cache = _DeferredCache(op, allowUnhashableArgs)

    def func(*args, **kwargs):
        # All calls through this function share the one cache made above
        return cache.call(*args, **kwargs)

    return func

def DeferredCacheHashableArgs(op, allowUnhashableArgs=None):
    """ Return a function that calls op through a new
        _DeferredCacheHashableArgs, which by default raises TypeError
        for calls whose arguments are unhashable.
    """
    cache = _DeferredCacheHashableArgs(op, allowUnhashableArgs)

    def func(*args, **kwargs):
        # All calls through this function share the one cache made above
        return cache.call(*args, **kwargs)

    return func

-------------- next part --------------
# Copyright (C) 2004 Anthony Baxter

"""Tests for shtoom.defcache.
"""

from twisted.trial import unittest
from shtoom.defcache import DeferredCache, DeferredCacheHashableArgs
from twisted.internet import defer, reactor

class TestBlobby:
    """ Test fixture: one slow operation, exposed through both flavours
        of deferred cache, that records each time it really runs.
    """

    def __init__(self):
        # One (args, kwargs) entry per real invocation of _operation
        self.calls = []

    def _operation(self, *args, **kwargs):
        """ Stub pointless operation - returns a deferred that fires with
            the first positional value passed, after a small delay.
        """
        self.calls.append((args, kwargs))
        result = defer.Deferred()

        def fire():
            result.callback(args[0])

        reactor.callLater(0.2, fire)
        return result

    # Each wrapper builds its own cache around the same underlying call
    operation = DeferredCache(_operation)
    operation2 = DeferredCacheHashableArgs(_operation)

class Saver:
    """ Remembers the last value handed to save(), so a test can inspect
        what a deferred fired with.
    """

    def __init__(self):
        # None until save() has been called
        self.val = None

    def save(self, arg):
        """ Store arg as self.val. Returns nothing. """
        self.val = arg


class DefcacheTests(unittest.TestCase):
    """ Exercises DeferredCache and DeferredCacheHashableArgs through the
        TestBlobby fixture.
    """

    def test_defcache(self):
        """ Walks through the caching scenarios in order: a single call,
            concurrent and repeated calls, kwargs, rejected unhashable
            arguments, and uncached unhashable arguments.
        """
        # util.wait presumably runs the reactor until the deferred has
        # fired - TODO confirm against the trial version in use
        from twisted.trial import util
        ae = self.assertEquals
        ar = self.assertRaises

        # A single call delivers the operation's result
        t = TestBlobby()
        s = Saver()
        d = t.operation('foo')
        d.addCallback(s.save)
        util.wait(d)
        ae(s.val, 'foo')

        # Two calls with the same argument, made before either has fired
        t = TestBlobby()
        s1 = Saver()
        s2 = Saver()
        d1 = t.operation('foo')
        d2 = t.operation('foo')
        d1.addCallback(s1.save)
        d2.addCallback(s2.save)
        util.wait(d1)
        util.wait(d2)
        ae(s1.val, 'foo')
        ae(s2.val, 'foo')
        # Check it was only called once, as expected
        ae(t.calls, [(('foo',),{})])

        # Check for the cache of completed calls
        d3 = t.operation('foo')
        s3 = Saver()
        d3.addCallback(s3.save)
        util.wait(d3)
        ae(s3.val, 'foo')
        ae(t.calls, [(('foo',),{})])

        # Now test kwargs
        t = TestBlobby()
        s1 = Saver()
        s2 = Saver()
        s3 = Saver()
        d1 = t.operation('foo', kw=True)
        d2 = t.operation('foo', kw=True)
        d3 = t.operation('foo', kw=False)
        d1.addCallback(s1.save)
        d2.addCallback(s2.save)
        d3.addCallback(s3.save)
        util.wait(d1)
        util.wait(d2)
        util.wait(d3)
        ae(s1.val, 'foo')
        ae(s2.val, 'foo')
        ae(s3.val, 'foo')
        # Check it was called twice only
        ae(len(t.calls), 2)

        # operation2 refuses unhashable arguments (here a dict passed
        # positionally)
        t = TestBlobby()
        ar(TypeError, t.operation2, 'foo', {})

        # operation2 and operation keep separate caches, so one call
        # through each reaches the underlying operation both times.
        # NOTE(review): d2 uses t.operation rather than t.operation2 -
        # confirm this is intended and not a typo
        s1 = Saver()
        s2 = Saver()
        d1 = t.operation2('foo', kw=True)
        d2 = t.operation('foo', kw=True)
        d1.addCallback(s1.save)
        d2.addCallback(s2.save)
        util.wait(d1)
        util.wait(d2)
        ae(s1.val, 'foo')
        ae(s2.val, 'foo')
        # Check it was called twice only
        ae(len(t.calls), 2)

        # operation accepts unhashable arguments (a dict as a kwarg
        # value) but can't cache such calls
        t = TestBlobby()

        s1 = Saver()
        s2 = Saver()
        d1 = t.operation('foo', kw={})
        d2 = t.operation('foo', kw={})
        d1.addCallback(s1.save)
        d2.addCallback(s2.save)
        util.wait(d1)
        util.wait(d2)
        ae(s1.val, 'foo')
        ae(s2.val, 'foo')
        # Check it was called both times
        ae(len(t.calls), 2)





More information about the Twisted-Python mailing list