[Twisted-Python] process output modulation
exarkun at twistedmatrix.com
Tue Dec 21 13:51:57 EST 2010
On 06:14 pm, dvkeeney at gmail.com wrote:
>We are streaming a large amount of program output to the browser via a
>twisted app, and we are seeing huge memory consumption.
>
>We have a database process that generates large amounts of data to
>stdout, and we are streaming that to the browser through a twisted
>web2 app. We are using web2 because it supports upload streaming as
>well.
>
>Our code looks like:
>
>    env = {}
>    input = stream.MemoryStream('')
>    SQLDUMP = '/usr/bin/dump'
>
>    pstream = stream.ProcessStreamer(input, SQLDUMP,
>                    [SQLDUMP,'--format=p','--clean',
>                     '--host='+cfg.SOCKET,
>                     '--username='+self.role,dbname],
>                    env)
>    pstream.run()
>
>    outstream = WatchedStream(pstream.outStream)
>
>    response = http.Response( headers=headers, stream=outstream)
>
>
>    class WatchedStream(object):
>
>        def __init__(self,stream):
>            self.stream = stream
>        def split(self, point):
>            ... some implementation
>        def close(self):
>            ... some implementation
>        def read(self):
>            d = self.stream.read()
>            bufSize = sum( [len(b) for b in self.stream.buffer if b])
>            log.msg('buffer size: %s'%bufSize)
>            return d
>
>Watching the log shows us that the stream (a web2.ProducerStream)
>buffer is growing continuously to hundreds of MB. Doesn't a stream
>object have a bufferSize attribute and the ability to throttle the
>flow of data based on buffer fullness? Does that throttling behavior
>have to be triggered explicitly?
>
>Yes, I know that web2 is deprecated, but I don't know that the problem
>is in the web2 components. The reactor.spawnProcess documentation
>does not seem to address the matter of modulating the read speed. Any
>assistance will be appreciated.

You probably want to pause Twisted's child process reader when you
notice your buffer is getting too large. Offhand, I can't say how you
might translate this advice into specific stream API calls, but
ultimately you want to call `pauseProducing` on something.

It's also possible you'll have to go through some not-quite-documented
interfaces (or implementations) to get there. IProcessTransport has no
`pauseProducing` method, but the POSIX implementation of that interface
has a `pipes` dictionary whose values are `IProducer` providers, so
you can call `pauseProducing` on them (or just the one for, say, stdout,
if that's where you're getting bytes from).

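A minimal sketch of that lookup, assuming `transport` is the object
returned by `reactor.spawnProcess` on a POSIX platform and that `pipes`
is keyed by child file descriptor (so 1 would be stdout). The helper
name `childStdoutProducer` is made up for illustration, and `pipes` is an
implementation detail that may differ between releases:

```python
def childStdoutProducer(transport, childFD=1):
    """
    Return the producer that reads the child's stdout, or None.

    Assumption: the POSIX process transport exposes a `pipes` dict keyed
    by child file descriptor. This is not part of IProcessTransport, so
    the lookup is defensive and returns None when it is unavailable.
    """
    pipes = getattr(transport, 'pipes', None)
    if not pipes:
        return None
    return pipes.get(childFD)
```

If this returns None you are probably not on the POSIX implementation,
and some other way of reaching the reader would be needed.
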
Later, of course, you'll want to undo the pause with a call to
`resumeProducing`.
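
Put together, the pause/resume logic might look something like the
following. This is only a sketch: `BufferThrottle`, its `high` and `low`
thresholds, and `bufferSizeChanged` are hypothetical names, and how you
learn the current buffer size (for example from the `read` method of a
wrapper like your WatchedStream) is left to the caller. The only calls
made on `producer` are `pauseProducing` and `resumeProducing`.

```python
class BufferThrottle(object):
    """
    Pause a producer when a buffer grows past `high` bytes and resume it
    once the buffer has drained to `low` bytes or fewer.
    """

    def __init__(self, producer, high=1024 * 1024, low=256 * 1024):
        self.producer = producer  # anything with pause/resumeProducing
        self.high = high
        self.low = low
        self.paused = False

    def bufferSizeChanged(self, size):
        # Call this whenever the buffered byte count is re-measured.
        if not self.paused and size >= self.high:
            self.producer.pauseProducing()
            self.paused = True
        elif self.paused and size <= self.low:
            self.producer.resumeProducing()
            self.paused = False
```

Using two thresholds rather than one avoids flipping between paused and
resumed on every read when the buffer hovers around the limit.
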
Jean-Paul