[Twisted-Python] Re: rewrite of flow.py completed
Clark C. Evans
cce at clarkevans.com
Sat Apr 12 19:39:48 EDT 2003
On Sat, Apr 12, 2003 at 03:51:41PM +0200, Philippe Lafoucrière wrote:
| - will flow.py be included in Twisted 1.0.4 (rc) ?
It will probably be in the sandbox till it gets users other
than me, and in particular, till etrepum's feedback.
| - Did you write some unit tests ?
I started; see sandbox/test_flow.py
| - could you add the outpout of your exemple (at the beginning of flow.py)?
Done.
| - In QueryIterator :
|
| "
| self.curs = conn.cursor()
| "
| but :
|
| >>> 'cursor' in dir (adbapi.ConnectionPool)
| 0
If you look, the conn is created from self.pool.connect(), which
returns a standard DB-API 2.0 connection. I would rather have had
the constructor take a connection, but this won't work: the
Twisted ADBAPI uses the current thread to find the connection, so
pool.connect() must be run in the appropriate thread, which is
not the thread where the constructor for this class would be executed.
| I think you want to use a 'standard' pool corresponding to API DB 2.0
| twisted.entreprise.adbapi.ConnectionPool doesn't have a cursor method.
| NTL, it's just the right use of flow, because we don't want a deffered here
| (because of thread use).
QueryIterator uses the ThreadedIterator. ThreadedIterator basically
is two iterators that are 'linked' together, one in the main thread
and one in the thread which is allowed to block. The two iterators
communicate with a shared list which holds intermediate output, ie,
it is the message queue from the iterator in the secondary thread
to the primary thread. Anyway, the iterator in the primary thread
is a simple 'proxy' like iterator, which raises Cooperate if there
isn't any data available from the secondary iterator.
Here is a simple example of how to make a ThreadedIterator...
note that the sleep(.1) call is not executed in the main
thread -- that's the whole point of the ThreadedIterator.
class CountIterator(flow.ThreadedIterator):
    """ Counts down from the given number, one value per next() call

        Every call to next() sleeps briefly, then hands back the
        current count wrapped in a one-element list; once the count
        reaches zero the iteration stops.
    """
    def __init__(self, count):
        flow.ThreadedIterator.__init__(self)
        self.count = count

    def next(self):
        # this is run in a separate thread, so the blocking sleep
        # below does not hold up the main thread
        from time import sleep
        sleep(.1)
        current = self.count
        if not current:
            raise flow.StopIteration
        self.count = current - 1
        return [current]
# An assert placed inside a Deferred callback is caught by the Deferred
# and converted into a Failure, so a wrong result would go unnoticed.
# Record what the flow delivers and check it after the reactor stops.
delivered = []
def res(x):
    """ callback for the flow: remember the list of results it fired with """
    delivered.append(x)
from twisted.internet import reactor
f = flow.DeferredFlow(CountIterator(5))
f.addCallback(res)
# give the worker thread ample time (5 values * .1s each) before stopping
reactor.callLater(2,reactor.stop)
reactor.run()
assert [[5,4,3,2,1]] == delivered
Anyway, I made a bunch of checkins about a day ago to clean
up some of the documentation and to add test_flow.py.
Best,
Clark
-------------- next part --------------
# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from __future__ import nested_scopes
import flow
import unittest
class producer:
    """ hand-written iterator equivalent of the following generator...

        def producer():
            lst = flow.Generator([1,2,3])
            nam = flow.Generator(['one','two','three'])
            while 1:
                yield lst; yield nam
                if lst.stop or nam.stop:
                    return
                yield (lst.result, nam.result)

        Each yield point of the generator is one yield_* method, and
        self.state always refers to the method that the next call to
        next() will run.
    """
    def next(self):
        # dispatch to whichever step of the state machine is current
        return self.state()

    def __iter__(self):
        # (re)create both nested generators and start at the first step
        self.state = self.yield_lst
        self.lst = flow.Generator([1,2,3])
        self.nam = flow.Generator(['one','two','three'])
        return self

    def yield_lst(self):
        """ step 1: hand the number generator to the flow """
        self.state = self.yield_nam
        return self.lst

    def yield_nam(self):
        """ step 2: hand the name generator to the flow """
        self.state = self.yield_results
        return self.nam

    def yield_results(self):
        """ step 3: pair up both results, or stop when either ran dry """
        self.state = self.yield_lst
        exhausted = self.lst.stop or self.nam.stop
        if exhausted:
            raise flow.StopIteration
        return (self.lst.result, self.nam.result)
class consumer:
    """ hand-written iterator equivalent of the following generator...

        def consumer():
            title = flow.Generator(['Title'])
            lst = flow.Generator(producer())
            yield title
            yield title.getResult()
            try:
                while 1:
                    yield lst
                    yield lst.getResult()
            except flow.StopIteration: pass

        Each yield point of the generator is one yield_* method, and
        self.state always refers to the method that the next call to
        next() will run.
    """
    def next(self):
        # dispatch to whichever step of the state machine is current
        return self.state()

    def __iter__(self):
        # (re)create both nested generators and start with the title
        self.state = self.yield_title
        self.title = flow.Generator(['Title'])
        self.lst = flow.Generator(producer())
        return self

    def yield_title(self):
        """ hand the title generator to the flow """
        self.state = self.yield_title_result
        return self.title

    def yield_title_result(self):
        """ pass the title itself on as a result """
        self.state = self.yield_lst
        return self.title.getResult()

    def yield_lst(self):
        """ hand the producer generator to the flow """
        self.state = self.yield_result
        return self.lst

    def yield_result(self):
        """ pass on the producer's latest result

            getResult() is what ends the iteration: it raises
            StopIteration once the producer has stopped.
        """
        self.state = self.yield_lst
        return self.lst.getResult()
class FlowTest(unittest.TestCase):
    """ Tests for Flow, DeferredFlow and ThreadedIterator

        The Deferred based tests record what the flow fires with and
        make their assertion in the test body.  An assertion made
        inside a Deferred callback is caught by the Deferred and
        turned into a Failure, so it could never fail the test.
    """
    def testBasic(self):
        """ a plain list is passed straight through as the results """
        f = flow.Flow([1,2,3])
        f.execute()
        self.assertEqual([1,2,3],f.results)

    def testProducer(self):
        """ a stage combining two nested generators """
        f = flow.Flow(producer())
        f.execute()
        self.assertEqual([(1,'one'),(2,'two'),(3,'three')],f.results)

    def testConsumer(self):
        """ a consumer stage on top of the producer stage """
        f = flow.Flow(consumer())
        f.execute()
        self.assertEqual(['Title',(1,'one'),(2,'two'),(3,'three')],f.results)

    def testDeferred(self):
        """ the same consumer, driven by the reactor """
        from twisted.internet import reactor
        fired = []
        f = flow.DeferredFlow(consumer())
        f.addCallback(fired.append)
        # the flow was scheduled with callLater in the constructor;
        # one pass of the reactor is expected to run it to completion
        reactor.iterate()
        self.assertEqual([['Title', (1,'one'),(2,'two'),(3,'three')]], fired)

    def testThreaded(self):
        """ results produced in a secondary thread reach the Deferred """
        class CountIterator(flow.ThreadedIterator):
            def __init__(self, count):
                flow.ThreadedIterator.__init__(self)
                self.count = count
            def next(self): # this is run in a separate thread
                from time import sleep
                sleep(.1)
                val = self.count
                if not(val):
                    raise flow.StopIteration
                self.count -= 1
                return [val]
        from twisted.internet import reactor
        fired = []
        f = flow.DeferredFlow(CountIterator(5))
        f.addCallback(fired.append)
        # give the worker thread ample time (5 values * .1s each)
        reactor.callLater(2,reactor.stop)
        reactor.run()
        self.assertEqual([[5,4,3,2,1]], fired)

if '__main__' == __name__:
    unittest.main()
-------------- next part --------------
# Twisted, the Framework of Your Internet
# Copyright (C) 2003 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General
# Public License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
#
""" Flow ... async data flow

    This module provides a mechanism for using async data flows through
    the use of generators.  While this module does not use generators in
    its implementation, it isn't very usable without them.  A data flow
    is constructed with a top level generator, which can have three
    types of yield statements: flow.Cooperate, flow.Generator, or
    any other return value with exceptions wrapped using failure.Failure.

    An example program...

        from __future__ import generators
        import flow

        def producer():
            lst = flow.Generator([1,2,3])
            nam = flow.Generator(['one','two','three'])
            while 1:
                yield lst; yield nam
                if lst.stop or nam.stop:
                    return
                yield (lst.result, nam.result)

        def consumer():
            title = flow.Generator(['Title'])
            yield title
            print title.getResult()
            lst = flow.Generator(producer())
            try:
                while 1:
                    yield lst
                    print lst.getResult()
            except flow.StopIteration: pass

        flow.Flow(consumer()).execute()

    produces the output:

        Title
        (1, 'one')
        (2, 'two')
        (3, 'three')
"""
# The docstring has to be the first statement of the module to become
# its __doc__; a future statement is still allowed directly after it.
from __future__ import nested_scopes
from twisted.python import failure
from twisted.python.compat import StopIteration, iter
class FlowCommand:
    """ Base class for objects that have a special meaning to the flow
        machinery when they are returned from a yield
    """
class Cooperate(FlowCommand):
    """ A request to pause the flow so that other events can be processed

        A stage returns an object of this type when it would block or
        needs to sleep.  The flow machinery takes it as the signal to
        pause, which gives other delayed operations a chance to
        proceed.

            timeout   the delay asked for before the flow is resumed
    """
    def __init__(self, timeout = 0):
        self.timeout = timeout
class Generator(FlowCommand):
    """ Wraps a generator or other iterator for use in a flow

        Creates a nested generation stage (a producer) which can provide
        zero or more values to the current stage (the consumer).  After
        a yield of this object when control has returned to the caller,
        this object will have two attributes:

            stop    This is true if the underlying generator has not
                    been started (a yield is needed) or if the underlying
                    generator has raised StopIteration

            result  This is the result of the generator if it is active,
                    the result may be a failure.Failure object if an
                    exception was thrown in the nested generator.
    """
    def __init__(self, iterable):
        # keep only the bound next() of the iterator; it is all that
        # _generate needs to advance the nested stage
        self._next = iter(iterable).next
        self.result = None
        self.stop = 1

    def isFailure(self):
        """ return a boolean value if the result is a Failure

            Raises StopIteration when this stage is stopped (or has
            not been started), since there is no result to look at.
        """
        if self.stop: raise StopIteration()
        return isinstance(self.result, failure.Failure)

    def getResult(self):
        """ return the result, or re-throw an exception on Failure

            Raises StopIteration when this stage is stopped, see
            isFailure.
        """
        if self.isFailure():
            raise (self.result.value or self.result.type)
        return self.result

    def _generate(self):
        """ advance the wrapped iterator one step, updating the stop
            and result member variables
        """
        # The raised object is fetched with sys.exc_info() instead of
        # being bound in the except clause, which keeps the syntax
        # independent of the Python version.
        import sys
        try:
            self.result = self._next()
            self.stop = 0
        except StopIteration:
            self.stop = 1
            self.result = None
        except Cooperate:
            # a Cooperate may be raised as well as returned; either
            # way it becomes the result of this step
            self.stop = 0
            self.result = sys.exc_info()[1]
        except failure.Failure:
            # store the Failure instance that was raised -- this used
            # to store the 'failure' module itself by mistake
            # NOTE(review): stop is set here, so isFailure/getResult
            # raise StopIteration rather than expose this Failure --
            # confirm that this is intended
            self.stop = 1
            self.result = sys.exc_info()[1]
        except:
            # anything else is wrapped; Failure() with no arguments
            # picks up the exception currently being handled
            self.stop = 1
            self.result = failure.Failure()
class Flow:
    """ A flow construct, created with a top-level generator/iterator

        The iterable provided to this flow is the top-level consumer
        object.  From within the consumer, multiple 'yield' calls can
        be made returning either a Cooperate or a Generator.  If a
        Generator object is returned, then it becomes the current
        context and the process is continued.  Communication from the
        producer back to the consumer is done by a yield of anything
        that is not a FlowCommand.
    """
    def __init__(self, iterable):
        # the stack holds the nested stages, innermost stage last
        self._stack = [Generator(iterable)]
        self.results = []

    def _addResult(self, result):
        """ private, called as top-level results are added

            A true return value makes _execute stop early; this
            implementation returns nothing.
        """
        self.results.append(result)

    def _execute(self):
        """ private execute, execute flow till a Cooperate is found

            Returns the timeout of the Cooperate, or None when the
            flow has finished or _addResult asked to stop.
        """
        while self._stack:
            current = self._stack[-1]
            current._generate()
            if current.stop:
                # this stage is done, resume the stage beneath it
                self._stack.pop()
                continue
            value = current.result
            if isinstance(value, Cooperate):
                return value.timeout
            if isinstance(value, FlowCommand):
                # the only other command is a nested stage
                assert(isinstance(value, Generator))
                self._stack.append(value)
                continue
            if len(self._stack) > 1:
                # a nested stage produced a value: hand control back
                # to its consumer, which reads it from the Generator
                self._stack.pop()
                continue
            if self._addResult(value):
                return

    def execute(self):
        """ continually execute, using sleep for Cooperate """
        from time import sleep
        timeout = self._execute()
        while timeout is not None:
            sleep(timeout)
            timeout = self._execute()
from twisted.internet import defer
class DeferredFlow(Flow, defer.Deferred):
    """ a version of Flow using Twisted's reactor and Deferreds

        In this version, a call to execute isn't required.  Instead,
        the iterable is scheduled right away using the reactor.  And,
        the Cooperate is implemented through the reactor's callLater.

        Since more than one (possibly failing) result could be returned,
        this uses the same semantics as DeferredList
    """
    def __init__(self, iterable, delay = 0,
                 fireOnOneCallback=0, fireOnOneErrback=0):
        """initialize a DeferredFlow
        @param iterable:          top level iterator / generator
        @param delay:             delay when scheduling reactor.callLater
        @param fireOnOneCallback: a flag indicating that the first good
                                  yielded result should be sent via Callback
        @param fireOnOneErrback:  a flag indicating that the first failing
                                  yield result should be sent via Errback
        """
        from twisted.internet import reactor
        defer.Deferred.__init__(self)
        Flow.__init__(self,iterable)
        self.fireOnOneCallback = fireOnOneCallback
        self.fireOnOneErrback = fireOnOneErrback
        reactor.callLater(delay, self._execute)

    def execute(self):
        """ not supported: the flow is already scheduled by __init__ """
        raise TypeError("Deferred Flow is auto-executing")

    def _addResult(self, result):
        """ emulate DeferredList behavior, short circuit if event is fired

            Returns 1 when this result fired the Deferred, which tells
            Flow._execute to stop; otherwise the result is collected
            and nothing is returned.
        """
        if not self.called:
            if self.fireOnOneCallback:
                if not isinstance(result, failure.Failure):
                    self.callback((result,len(self.results)))
                    return 1
            if self.fireOnOneErrback:
                if isinstance(result, failure.Failure):
                    # this used to reference the undefined name 'fail',
                    # which raised a NameError instead of firing
                    self.errback(failure.Failure((result,len(self.results))))
                    return 1
        self.results.append(result)

    def _execute(self):
        """ run the flow; fire with the results when it is finished,
            or re-schedule after the timeout of a Cooperate
        """
        timeout = Flow._execute(self)
        if timeout is None:
            # may already have fired through one of the fireOnOne flags
            if not self.called:
                self.callback(self.results)
        else:
            from twisted.internet import reactor
            reactor.callLater(timeout, self._execute)
#
# The following is a thread package which really is orthogonal to
# Flow. Flow does not depend on it, and it does not depend on Flow.
# Although, if you are trying to bring the output of a thread into
# a Flow, it is exactly what you want. The QueryIterator is
# just an obvious application of the ThreadedIterator.
#
class ThreadedIterator:
    """
       This is an iterator which tunnels output from an iterator
       executed in a thread to the main thread.   Note, unlike
       regular iterators, this one throws a Cooperate exception
       which must be handled by calling reactor.callLater so that
       the producer threads can have a chance to send events to
       the main thread.

       Basically, the 'init' and 'next' method of subclasses are
       executed in this alternative thread.  The results of 'next'
       are marshalled back into the primary thread.  If the main
       thread asks for data when none is available yet, a
       Cooperate is raised.

       Subclasses override init() for one-time setup and next() to
       return a list of values; next() raises StopIteration when
       there is nothing more to produce.
    """
    def __init__(self):
        class _Tunnel:
            def __init__(self, source):
                """
                    This is the setup, the source argument is the iterator
                    being wrapped, which exists in another thread.
                """
                self.source     = source
                self.isFinished = 0
                self.failure    = None
                self.buff       = []
            def process(self):
                """
                    This is called in the 'source' thread, and
                    just basically sucks the iterator, appending
                    items back to the main thread.
                """
                try:
                    self.source.init()
                except:
                    # Setup failed: report it and do not go on to call
                    # next() on a source that was never initialized.
                    self.failure = failure.Failure()
                    self.source = None
                    return
                from twisted.internet.reactor import callFromThread
                try:
                    while 1:
                        val = self.source.next()
                        self.buff.extend(val) # lists are thread safe
                except StopIteration:
                    # the finished flag is flipped from the main thread
                    callFromThread(self.stop)
                except:
                    # keep a failure that was recorded earlier, for
                    # instance through setFailure
                    if not self.failure:
                        self.failure = failure.Failure()
                self.source = None
            def setFailure(self, failure):
                self.failure = failure
            def stop(self):
                self.isFinished = 1
            def next(self):
                """
                    This is called in the main thread.  Buffered items
                    are handed out first; after that the end of the
                    iteration or a recorded failure is reported, and
                    otherwise a Cooperate asks the caller to try later.
                """
                if self.buff:
                    return self.buff.pop(0)
                if self.isFinished:
                    raise StopIteration
                if self.failure:
                    raise self.failure
                raise Cooperate()
        tunnel = _Tunnel(self)
        self._tunnel = tunnel

    def __iter__(self):
        # start the producing side in a thread, and give the caller
        # the consuming side of the tunnel to iterate over
        from twisted.internet.reactor import callInThread
        callInThread(self._tunnel.process)
        return self._tunnel

    def init(self):
        """ one-time setup, run in the secondary thread; no-op here """
        pass

    def next(self):
        """ produce a list of values, run in the secondary thread;
            this default produces nothing at all
        """
        raise StopIteration
class QueryIterator(ThreadedIterator):
    """ Executes a SQL statement and iterates over what the cursor fetches

        pool      object whose connect() returns a connection providing
                  cursor() -- presumably an adbapi ConnectionPool,
                  verify against the caller
        sql       the statement handed to cursor.execute()
        fetchall  if true each step uses cursor.fetchall(), otherwise
                  cursor.fetchmany()
    """
    def __init__(self, pool, sql, fetchall=0):
        ThreadedIterator.__init__(self)
        self.pool = pool
        self.sql = sql
        self.fetchall = fetchall
        # the cursor is created later, by init()
        self.curs = None

    def init(self):
        """ connect, open a cursor and execute the statement """
        connection = self.pool.connect()
        self.curs = connection.cursor()
        self.curs.execute(self.sql)

    def next(self):
        """ return the next batch fetched from the cursor, raising
            StopIteration once a fetch comes back empty
        """
        if self.fetchall:
            fetch = self.curs.fetchall
        else:
            fetch = self.curs.fetchmany
        rows = fetch()
        if not rows:
            raise StopIteration
        return rows
More information about the Twisted-Python
mailing list