[Twisted-Python] inlineCallbacks cascading cancelling and more
Sergey Magafurov
smagafurov at naumen.ru
Mon Aug 16 02:34:35 MDT 2010
First of all, sorry for my bad English.
I will try to explain an idea about Deferreds and inlineCallbacks.
Wanted features:
1. Ability to delete callbacks (delCallbacks)
2. Automatic cancelling of Deferreds when they are no longer needed (no
callbacks are registered any more because all of them were deleted)
3. Ability to add/remove hooks that run when a Deferred finishes with an
errback or callback (addFinalizer/delFinalizer)
4. Automatic calling of the registered finalizers when a Deferred finishes with
an errback or callback (this covers cancellation too, because cancel calls errback)
With these features we can implement cascading cancellation of inlineCallbacks:
the whole stack tree of inlineCallbacks can be cancelled when it is no longer
needed (for example, when a shutdown occurs at the top level).
See the full code below for details (this is a runnable script):
from twisted.internet import defer
from twisted.python.failure import Failure
from sys import exc_info
import warnings
class InlineCallbacksManager(object):
    """
    Drives one generator on behalf of an inlineCallbacks-style decorator.

    Each instance owns C{self.deferred}, the Deferred handed back to the
    caller of the decorated function.  The small hook methods
    (C{send_result}, C{throw_exception}, C{stop_iteration}, C{return_value},
    C{exception}) exist so that subclasses can customise single steps of
    the generator loop without copying C{_inlineCallbacks}.

    NOTE: C{_inlineCallbacks} relies on the C{addFinalizer}, C{delFinalizer}
    and C{delBoth} methods that this file monkey-patches onto
    C{defer.Deferred}.
    """

    def __init__(self, *args, **kw):
        # The arguments are those of the decorated call; this base class
        # ignores them, subclasses may use them.
        self.deferred = defer.Deferred()

    def send_result(self, g, result):
        """Resume generator C{g} with C{result}; return what it yields next."""
        return g.send(result)

    def throw_exception(self, g, result):
        """Raise the Failure C{result} inside C{g}; return what it yields next."""
        return result.throwExceptionIntoGenerator(g)

    def stop_iteration(self):
        """The generator ended without returnValue(): fire with C{None}."""
        self.deferred.callback(None)
        return self.deferred

    def return_value(self, value):
        """The generator called returnValue(value): fire with C{value}."""
        self.deferred.callback(value)
        return self.deferred

    def exception(self):
        """
        The generator raised: errback with the exception currently being
        handled (C{errback()} without arguments builds the Failure itself).
        """
        self.deferred.errback()
        return self.deferred

    def _inlineCallbacks(self, result, g):
        """
        See L{inlineCallbacks}.

        @param result: value (or L{Failure}) to feed into the generator first.
        @param g: the generator being driven.
        @return: C{self.deferred}.
        """
        # This function is complicated by the need to prevent unbounded
        # recursion arising from repeatedly yielding immediately ready
        # deferreds.  This while loop and the waiting variable solve that by
        # manually unfolding the recursion.
        waiting = [True,  # waiting for result?
                   None]  # result

        while 1:
            try:
                # Send the last result back as the result of the yield
                # expression.
                isFailure = isinstance(result, Failure)
                if isFailure:
                    result = self.throw_exception(g, result)
                else:
                    result = self.send_result(g, result)
            except StopIteration:
                # fell off the end, or "return" statement
                return self.stop_iteration()
            except defer._DefGen_Return as e:
                # returnValue() was called; time to give a result to the
                # original Deferred.  First though, let's try to identify the
                # potentially confusing situation which results when
                # returnValue() is accidentally invoked from a different
                # function, one that wasn't decorated with @inlineCallbacks.

                # The traceback starts in this frame (the one for
                # _inlineCallbacks) and then passes through the
                # send_result/throw_exception hook, hence two tb_next steps
                # before the application code is reached.
                appCodeTrace = exc_info()[2].tb_next.tb_next
                if isFailure:
                    # If we invoked this generator frame by throwing an
                    # exception into it, then throwExceptionIntoGenerator
                    # will consume an additional stack frame itself, so we
                    # need to skip that too.
                    appCodeTrace = appCodeTrace.tb_next
                # Now that we've identified the frame being exited by the
                # exception, let's figure out if returnValue was called from
                # it directly.  returnValue itself consumes a stack frame, so
                # the application code will have a tb_next, but it will *not*
                # have a second tb_next.
                if appCodeTrace.tb_next.tb_next:
                    # If returnValue was invoked non-local to the frame which
                    # it is exiting, identify the frame that ultimately
                    # invoked returnValue so that we can warn the user, as
                    # this behavior is confusing.
                    ultimateTrace = appCodeTrace
                    while ultimateTrace.tb_next.tb_next:
                        ultimateTrace = ultimateTrace.tb_next
                    filename = ultimateTrace.tb_frame.f_code.co_filename
                    lineno = ultimateTrace.tb_lineno
                    warnings.warn_explicit(
                        "returnValue() in %r causing %r to exit: "
                        "returnValue should only be invoked by functions decorated "
                        "with inlineCallbacks" % (
                            ultimateTrace.tb_frame.f_code.co_name,
                            appCodeTrace.tb_frame.f_code.co_name),
                        DeprecationWarning, filename, lineno)
                return self.return_value(e.value)
            except:
                # Deliberately bare: any exception leaving the generator
                # (BaseException subclasses included) is routed to the
                # Deferred.
                return self.exception()

            if isinstance(result, tuple):  # yield tuple support!!!
                # Wrap plain values so that everything can be waited on
                # through a single DeferredList.
                non_deferreds_cnt = 0
                list_of_deferreds = []
                for r in result:
                    if not isinstance(r, defer.Deferred):
                        r = defer.succeed(r)
                        non_deferreds_cnt += 1
                    list_of_deferreds.append(r)
                # A tuple without any Deferred is sent back unchanged.
                if non_deferreds_cnt != len(result):
                    result = defer.DeferredList(list_of_deferreds,
                                                fireOnOneErrback=1)

            if isinstance(result, defer.Deferred):
                # a deferred was yielded, get the result.
                unwrapList = isinstance(result, defer.DeferredList)

                def gotResult(r):
                    if unwrapList:  # yield tuple support!!!
                        if isinstance(r, Failure):
                            # fireOnOneErrback wraps the real failure in a
                            # FirstError; any other failure is passed
                            # through untouched.
                            if r.check(defer.FirstError):
                                r = r.value.subFailure
                        else:
                            # Drop the (success, value) flags, keep values.
                            r = tuple(_r for _s, _r in r)
                    if waiting[0]:
                        # Fired synchronously, inside addBoth() below.
                        waiting[0] = False
                        waiting[1] = r
                    else:
                        # cascading cancelling support!!!  The result has
                        # arrived, so finishing self.deferred must no longer
                        # try to unhook us from `result`.
                        self.deferred.delFinalizer(result.delBoth, gotResult)
                        self._inlineCallbacks(r, g)

                result.addBoth(gotResult)
                if waiting[0]:
                    # Haven't called back yet, set flag so that we get
                    # reinvoked and return from the loop
                    waiting[0] = False
                    # cascading cancelling support!!!  If self.deferred
                    # finishes first, unhook gotResult from `result`.
                    self.deferred.addFinalizer(result.delBoth, gotResult)
                    return self.deferred

                result = waiting[1]
                # Reset waiting to initial values for next loop.  gotResult
                # uses waiting, but this isn't a problem because gotResult is
                # only executed once, and if it hasn't been executed yet, the
                # return branch above would have been taken.
                waiting[0] = True
                waiting[1] = None
from twisted.python.util import mergeFunctionMetadata
def create_inline_callbacks_decorator(manager_factory):
    """
    Build an inlineCallbacks-style decorator around C{manager_factory}.

    For every call of a decorated generator function, C{manager_factory} is
    invoked with the call's own arguments, and the manager it returns drives
    the freshly created generator through its C{_inlineCallbacks} method.

    @param manager_factory: callable returning a manager object.
    @return: a decorator for generator functions.
    """
    def inline_callbacks(f):
        def unwind_generator(*args, **kwargs):
            # The manager is created before the generator, as in the
            # original, so that its side effects happen first.
            manager = manager_factory(*args, **kwargs)
            generator = f(*args, **kwargs)
            return manager._inlineCallbacks(None, generator)

        # Make the wrapper carry f's name, docstring and other metadata.
        wrapped = mergeFunctionMetadata(f, unwind_generator)
        return wrapped

    return inline_callbacks
# Build the default decorator from the plain manager class and install it in
# place of defer.inlineCallbacks (a module-level monkey patch).
inlineCallbacks = create_inline_callbacks_decorator(InlineCallbacksManager)
defer.inlineCallbacks = inlineCallbacks
# # # #
# Deferred
#
# Fixes:
# 1. raise CancelledError with the current traceback (the original twisted
#    code raises it with an empty traceback)
# 2. ability to cancel with a given failure (parameter `failure`; if it is
#    callable, it must return the failure)
def deferred_cancel(self, failure=None):
    """
    Replacement for C{Deferred.cancel} that accepts an optional failure.

    @param failure: C{None} to errback with a freshly raised
        C{CancelledError}, or a value passed to C{errback}; if it is
        callable it is called first and its return value is used instead.
    """
    if self.called:
        if isinstance(self.result, defer.Deferred):
            # Waiting for another deferred -- cancel it instead.
            self.result.cancel()
        return

    canceller = self._canceller
    if canceller:
        canceller(self)
    else:
        # Arrange to eat the callback that will eventually be fired
        # since there was no real canceller.
        self._suppressAlreadyCalled = 1

    if self.called:
        # The canceller fired the deferred itself; nothing left to do.
        return

    # There was no canceller, or the canceller didn't call
    # callback or errback.
    if failure is None:
        try:
            raise defer.CancelledError()
        except defer.CancelledError:
            # Failure() without arguments captures the exception being
            # handled, traceback included.
            self.errback(Failure())
        return

    if callable(failure):
        failure = failure()
    self.errback(failure)


defer.Deferred.cancel = deferred_cancel
# Fixes: add `finalizers` member
original_deferred___init__ = defer.Deferred.__init__


def deferred___init__(self, *args, **kw):
    """
    Run the original C{Deferred.__init__}, then give the instance its own
    empty C{finalizers} list of C{(callback, args, kw)} entries.
    """
    original_deferred___init__(self, *args, **kw)
    self.finalizers = list()


defer.Deferred.__init__ = deferred___init__
def deferred_addFinalizer(self, callback, *args, **kw):
    """
    Register C{callback(*args, **kw)} to run once this deferred has finished.

    Finalizers do not receive the result and their return value is ignored.
    If the finalizers of this deferred have already been run, the new one is
    invoked immediately; previously it was appended but never called,
    because C{_runFinalizers} does nothing once C{_finalized} is set.

    @return: C{self}, to allow chaining.
    """
    assert callable(callback)
    self.finalizers.append((callback, args, kw))
    if self._finalized:
        # Too late for the regular pass: run only the newcomer, right now.
        callback(*args, **kw)
    elif self.called:
        # Already fired but not finalized yet: do the regular pass now.
        self._runFinalizers()
    return self


defer.Deferred.addFinalizer = deferred_addFinalizer
def deferred_delFinalizer(self, callback, *args, **kw):
    """
    Remove a finalizer previously registered with exactly the same
    C{callback}, C{args} and C{kw}.

    @raise defer.AlreadyCalledError: if this deferred has already fired
        (the original evaluated the exception class without raising it).
    @raise ValueError: if no matching finalizer is registered
        (raised by C{list.remove}).
    @return: C{self}, to allow chaining.
    """
    if self.called:
        raise defer.AlreadyCalledError
    assert callable(callback)
    self.finalizers.remove((callback, args, kw))
    return self


defer.Deferred.delFinalizer = deferred_delFinalizer
def deferred__runFinalizers(self):
    """
    Invoke every registered finalizer, in registration order, at most once
    per deferred; later calls do nothing.
    """
    if self._finalized:
        return
    # Set the flag first so that re-entrant calls made by a finalizer
    # return immediately.
    self._finalized = True
    for hook, hook_args, hook_kw in self.finalizers:
        hook(*hook_args, **hook_kw)


defer.Deferred._runFinalizers = deferred__runFinalizers
# Class-level default; instances override it once they are finalized.
defer.Deferred._finalized = False
# Fixes: run `finalizers` when done
original_deferred_callback = defer.Deferred.callback


def deferred_callback(self, result):
    """
    Fire the deferred through the original C{callback}, then run its
    finalizers.  If the original raises, the finalizers are not run.
    """
    original_deferred_callback(self, result)
    deferred__runFinalizers(self) if False else self._runFinalizers()


defer.Deferred.callback = deferred_callback
# Fixes: run `finalizers` when done
original_deferred_errback = defer.Deferred.errback


def deferred_errback(self, fail=None):
    """
    Fail the deferred through the original C{errback}, then run its
    finalizers.  If the original raises, the finalizers are not run.
    """
    original_deferred_errback(self, fail=fail)
    run_finalizers = self._runFinalizers
    run_finalizers()


defer.Deferred.errback = deferred_errback
def _skip_result(result):
pass
def deferred_delCallbacks(self, callback, errback=None,
                          callbackArgs=None, callbackKeywords=None,
                          errbackArgs=None, errbackKeywords=None):
    """
    Remove a callback/errback pair that was added with C{addCallbacks}
    using exactly the same arguments.

    When the last pair is removed nobody is interested in the result any
    more, so the deferred is cancelled.  C{_skip_result} is attached first
    so that the failure produced by the cancellation is consumed.

    @raise defer.AlreadyCalledError: if this deferred has already fired
        (the original evaluated the exception class without raising it).
    @raise ValueError: if no matching pair is registered
        (raised by C{list.remove}).
    @return: C{self}, to allow chaining.
    """
    if self.called:
        raise defer.AlreadyCalledError
    assert callable(callback)
    assert errback is None or callable(errback)
    # Rebuild the same tuple shape this code expects in self.callbacks, so
    # that list.remove() can find the entry by equality.
    cbs = ((callback, callbackArgs, callbackKeywords),
           (errback or (defer.passthru), errbackArgs, errbackKeywords))
    self.callbacks.remove(cbs)
    if not self.callbacks:
        self.addBoth(_skip_result)
        self.cancel()
    return self


defer.Deferred.delCallbacks = deferred_delCallbacks
def deferred_delCallback(self, callback, *args, **kw):
    """
    Remove a callback registered with C{callback}, C{args} and C{kw};
    delegates to C{delCallbacks} and returns its result.
    """
    removal = dict(callbackArgs=args, callbackKeywords=kw)
    return self.delCallbacks(callback, **removal)


defer.Deferred.delCallback = deferred_delCallback
def deferred_delErrback(self, errback, *args, **kw):
    """
    Remove an errback registered with C{errback}, C{args} and C{kw}.

    C{defer.passthru} is passed as the callback half of the pair;
    delegates to C{delCallbacks} and returns its result.
    """
    removal = dict(errbackArgs=args, errbackKeywords=kw)
    return self.delCallbacks(defer.passthru, errback, **removal)


defer.Deferred.delErrback = deferred_delErrback
def deferred_delBoth(self, callback, *args, **kw):
    """
    Remove a pair in which C{callback} serves as both callback and errback,
    with the same C{args} and C{kw} on both sides; delegates to
    C{delCallbacks} and returns its result.
    """
    removal = dict(callbackArgs=args, errbackArgs=args,
                   callbackKeywords=kw, errbackKeywords=kw)
    return self.delCallbacks(callback, callback, **removal)


defer.Deferred.delBoth = deferred_delBoth
def deferred_unchainDeferred(self, d):
    """
    Remove the pair made of C{d.callback} and C{d.errback}; delegates to
    C{delCallbacks} and returns its result.
    """
    on_success, on_failure = d.callback, d.errback
    return self.delCallbacks(on_success, on_failure)


defer.Deferred.unchainDeferred = deferred_unchainDeferred
def deferred_delCallbacksSafe(self, *args, **kw):
    """
    Like C{delCallbacks}, but does nothing and returns C{self} when the
    deferred has already fired.
    """
    return self if self.called else self.delCallbacks(*args, **kw)


defer.Deferred.delCallbacksSafe = deferred_delCallbacksSafe
def deferred_delCallbackSafe(self, *args, **kw):
    """
    Like C{delCallback}, but does nothing and returns C{self} when the
    deferred has already fired.
    """
    return self if self.called else self.delCallback(*args, **kw)


defer.Deferred.delCallbackSafe = deferred_delCallbackSafe
def deferred_delErrbackSafe(self, *args, **kw):
    """
    Like C{delErrback}, but does nothing and returns C{self} when the
    deferred has already fired.
    """
    return self if self.called else self.delErrback(*args, **kw)


defer.Deferred.delErrbackSafe = deferred_delErrbackSafe
def deferred_delBothSafe(self, *args, **kw):
    """
    Like C{delBoth}, but does nothing and returns C{self} when the
    deferred has already fired.
    """
    return self if self.called else self.delBoth(*args, **kw)


defer.Deferred.delBothSafe = deferred_delBothSafe
def deferred_unchainDeferredSafe(self, *args, **kw):
    """
    Like C{unchainDeferred}, but does nothing and returns C{self} when the
    deferred has already fired.
    """
    return self if self.called else self.unchainDeferred(*args, **kw)


defer.Deferred.unchainDeferredSafe = deferred_unchainDeferredSafe
# # # #
# DeferredList
#
# fixes: delCallbacks when finished
original_deferred_list___init__ = defer.DeferredList.__init__


def deferred_list___init__(self, deferredList, *args, **kw):
    """
    Run the original C{DeferredList.__init__}, then register one finalizer
    per member deferred.

    Each finalizer calls the member's C{delCallbacksSafe} with
    C{self._cbDeferred} as both callback and errback and the member's
    index paired with C{defer.SUCCESS} / C{defer.FAILURE} as arguments, so
    that the list's hooks are removed from still-pending members once the
    list itself has finished.
    """
    original_deferred_list___init__(self, deferredList, *args, **kw)
    for index, member in enumerate(deferredList):
        self.addFinalizer(
            member.delCallbacksSafe,
            self._cbDeferred,
            self._cbDeferred,
            callbackArgs=(index, defer.SUCCESS),
            errbackArgs=(index, defer.FAILURE),
        )


defer.DeferredList.__init__ = deferred_list___init__
# # # #
# Test
#
if __name__ == '__main__':
    # Demo: start two nested inlineCallbacks chains that wait on a slow
    # worker thread, then shut everything down after one second and watch
    # the cancellation cascade down to the worker.
    from twisted.internet import reactor
    import threading
    import time

    def log(s):
        """Print C{s} prefixed with the current wall-clock time."""
        # Single-argument print(): same output on Python 2 and Python 3.
        print('%s %s' % (time.strftime('%H:%M:%S'), s))

    # some long deferred work
    def deferred_work():
        """
        Start a worker thread that sleeps for up to 7 seconds and return a
        Deferred that fires with 7 unless it is cancelled first.
        """
        cancel_flag = [False]

        def my_canceller(d):
            # Only raise the flag; the worker polls it once per second.
            cancel_flag[0] = True

        res = defer.Deferred(canceller=my_canceller)

        def finish():
            # Runs in the reactor thread.  The deferred may have been
            # cancelled after the worker's last poll of cancel_flag.
            if not res.called:
                res.callback(7)

        def work():
            log('start work')
            cnt = 7
            while cnt:
                cnt -= 1
                time.sleep(1)
                if cancel_flag[0]:
                    log('work cancelled')
                    return
            log('finish work, work not cancelled!!!')
            # Deferreds are not thread-safe: fire it from the reactor
            # thread instead of from this worker thread.
            reactor.callFromThread(finish)

        threading.Thread(target=work).start()
        return res

    # # # #
    # Shuttingdown test
    # notes: register all inlineCallbacks deferreds and cancel them all
    # when shuttingdown occurs
    #

    class ShuttingdownError(BaseException):
        pass

    # base class for shuttingdown support
    class ShuttingdownSupport(object):
        """Keeps track of pending deferreds and fails them all on shutdown."""

        shuttingdown = 0

        def __init__(self):
            self._deferreds = {}

        def shutdown(self):
            self.shuttingdown = 1
            self.do_shutdown()

        def do_shutdown(self):
            log('SHUTDOWN')
            # Iterate over a snapshot: errback() runs finalizers, and those
            # unregister deferreds from self._deferreds while we loop.
            for deferred in list(self._deferreds):
                if deferred.called:
                    # Already finished by the cascade started from an
                    # earlier deferred of this very loop.
                    continue
                try:
                    raise ShuttingdownError
                except ShuttingdownError:
                    # errback() without arguments wraps the exception
                    # currently being handled.
                    deferred.errback()

        def _reg_deferred(self, deferred):
            self._deferreds[deferred] = 1

        def _unreg_deferred(self, deferred):
            del self._deferreds[deferred]

    # special inlineCallbacks manager for shuttingdown support
    class InlineCallbacksManagerWithShuttingdownSupport(InlineCallbacksManager):
        """
        Manager that registers its deferred with the ShuttingdownSupport
        instance the decorated method is called on (its first argument).
        """

        def __init__(self, instance, *args, **kw):
            self.deferred = defer.Deferred()
            self.instance = instance
            instance._reg_deferred(self.deferred)
            # Unregister automatically once the deferred has finished.
            self.deferred.addFinalizer(instance._unreg_deferred,
                                       self.deferred)

        def send_result(self, g, result):
            # if shuttingdown occurs while waiting result then raise
            # ShuttingdownError into generator
            if self.instance.shuttingdown:
                try:
                    raise ShuttingdownError
                except ShuttingdownError:
                    failure = Failure()
                    # Return whatever the generator yields in response; the
                    # original dropped this value and returned None.
                    return self.throw_exception(g, failure)
            else:
                return InlineCallbacksManager.send_result(self, g, result)

    # decorator for methods of ShuttingdownSupport objects
    inlineCallbacksShuttingdown = create_inline_callbacks_decorator(
        InlineCallbacksManagerWithShuttingdownSupport)

    class ShuttingdownTest(ShuttingdownSupport):

        @inlineCallbacksShuttingdown
        def test1(self):
            log('start ShuttingdownTest.test1')
            try:
                res = yield deferred_work()
            except defer.CancelledError:
                log('ShuttingdownTest.test1 cancelled')
                raise
            except ShuttingdownError:
                log('ShuttingdownTest.test1 shuttingdown detected')
                raise
            log('finish ShuttingdownTest.test1')
            defer.returnValue(res)

        @inlineCallbacksShuttingdown
        def test2(self):
            log('start ShuttingdownTest.test2')
            try:
                res = yield self.test1(), self.test1()  # test yield tuple!!!
            except defer.CancelledError:
                log('ShuttingdownTest.test2 cancelled')
                raise
            except ShuttingdownError:
                log('ShuttingdownTest.test2 shuttingdown detected')
                raise
            log('finish ShuttingdownTest.test2')
            defer.returnValue(res)

    # Alternative scenario (plain cancel instead of shutdown), kept disabled:
    # @defer.inlineCallbacks
    # def test1():
    #     log('start test1')
    #     try:
    #         res = yield deferred_work()
    #     except defer.CancelledError:
    #         log('test1 cancelled')
    #         raise
    #     log('finish test1, test1 not cancelled!!!')
    #     defer.returnValue(res)

    # @defer.inlineCallbacks
    # def test2():
    #     log('start test2')
    #     try:
    #         res = yield test1()
    #     except defer.CancelledError:
    #         log('test2 cancelled')
    #         raise
    #     log('finish test2, test2 not cancelled!!!')
    #     defer.returnValue(res)

    def ok(result):
        log('ok ' + repr(result))

    def error(result):
        log('error ' + repr(result))

    # def run_test1():
    #     deferred = test2()
    #     deferred.addCallbacks(ok, error)
    #     reactor.callLater(1, deferred.cancel)
    #     reactor.callLater(1, run_test1)
    # run_test1()

    def run_test2():
        obj = ShuttingdownTest()
        deferred = obj.test2()
        deferred.addCallbacks(ok, error)
        # Shut down after one second, long before the 7-second work ends.
        reactor.callLater(1, obj.shutdown)
        # reactor.callLater(3, run_test2)

    run_test2()
    reactor.callLater(10, reactor.stop)
    reactor.run()
More information about the Twisted-Python
mailing list