[Twisted-Python] process output modulation
David
dvkeeney at gmail.com
Tue Dec 21 15:51:04 EST 2010
Thank you.
I was thinking along those lines, but wasn't sure how the
producer/consumer methods related to the streams model. I guess I
still don't know, but I now have more confidence that a solution is
there to find.
I was considering a clunky workaround, where the db output is piped to
a throttling program which pipes to the web app. I still have the
problem of telling that filter how fast to go, but that is more of a
Unix-domain problem and less of a Twisted-domain problem. Doable, but
yuck.
I guess it's time to dig into the reactor.spawnProcess code and find
where the producer-related method calls are.
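For the archive, here is a minimal sketch of the pause/resume bookkeeping
described in the quoted reply below. It is not taken from Twisted or web2:
BufferThrottle, FakeProducer, and the high_water/low_water parameters are
hypothetical names of my own. The only assumption about the producer is that
it has pauseProducing() and resumeProducing() methods, as IProducer providers
do.

```python
class BufferThrottle(object):
    """Hypothetical helper: pause a producer while too many bytes are buffered.

    `producer` is any object with pauseProducing() and resumeProducing().
    """

    def __init__(self, producer, high_water=1 << 20, low_water=1 << 18):
        self.producer = producer
        self.high_water = high_water  # pause at or above this many bytes
        self.low_water = low_water    # resume at or below this many bytes
        self.buffered = 0
        self.paused = False

    def data_received(self, data):
        # Call when bytes arrive from the child process and are buffered.
        self.buffered += len(data)
        if not self.paused and self.buffered >= self.high_water:
            self.producer.pauseProducing()
            self.paused = True

    def data_consumed(self, nbytes):
        # Call when buffered bytes have been written out to the client.
        self.buffered = max(0, self.buffered - nbytes)
        if self.paused and self.buffered <= self.low_water:
            self.producer.resumeProducing()
            self.paused = False


class FakeProducer(object):
    """Stand-in for a real producer; it only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def pauseProducing(self):
        self.calls.append("pause")

    def resumeProducing(self):
        self.calls.append("resume")


if __name__ == "__main__":
    producer = FakeProducer()
    throttle = BufferThrottle(producer, high_water=10, low_water=4)
    throttle.data_received(b"x" * 12)  # crosses the high-water mark
    throttle.data_consumed(9)          # drops below the low-water mark
    print(producer.calls)
```

In the real app the producer would presumably be the stdout entry of the
process transport's `pipes` dictionary mentioned below. That the dictionary is
keyed by child file descriptor (1 for stdout) is my assumption and needs
checking against the Twisted source. Using two thresholds instead of one keeps
the reader from flapping between paused and resumed on every chunk.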
David
On Tue, Dec 21, 2010 at 11:51 AM, <exarkun at twistedmatrix.com> wrote:
> On 06:14 pm, dvkeeney at gmail.com wrote:
>>We are streaming a large amount of program output to the browser via a
>>twisted app, and we are seeing huge memory consumption.
>>
>>We have a database process that generates large amounts of data to
>>stdout, and we are streaming that to the browser through a twisted
>>web2 app. We are using web2 because it supports upload streaming as
>>well.
>>
>>Our code looks like:
>>
>>    env = {}
>>    input = stream.MemoryStream('')
>>    SQLDUMP = '/usr/bin/dump'
>>
>>    pstream = stream.ProcessStreamer(input, SQLDUMP,
>>                                     [SQLDUMP, '--format=p', '--clean',
>>                                      '--host=' + cfg.SOCKET,
>>                                      '--username=' + self.role, dbname],
>>                                     env)
>>    pstream.run()
>>
>>    outstream = WatchedStream(pstream.outStream)
>>
>>    response = http.Response(headers=headers, stream=outstream)
>>
>>
>>    class WatchedStream(object):
>>
>>        def __init__(self, stream):
>>            self.stream = stream
>>        def split(self, point):
>>            ... some implementation
>>        def close(self):
>>            ... some implementation
>>        def read(self):
>>            d = self.stream.read()
>>            bufSize = sum([len(b) for b in self.stream.buffer if b])
>>            log.msg('buffer size: %s' % bufSize)
>>            return d
>>
>>Watching the log shows us that the stream (a web2.ProducerStream)
>>buffer is growing continuously to hundreds of MB. Doesn't a stream
>>object have a bufferSize attribute and the ability to throttle the
>>flow of data based on buffer fullness? Does that throttling behavior
>>have to be triggered explicitly?
>>
>>Yes, I know that web2 is deprecated, but I don't know that the problem
>>is in the web2 components. The reactor.spawnProcess documentation
>>does not seem to address the matter of modulating the read speed. Any
>>assistance will be appreciated.
>
> You probably want to pause Twisted's child process reader when you
> notice your buffer is getting too large. Offhand, I can't say how you
> might translate this advice into specific stream API calls, but
> ultimately you want to call `pauseProducing` on something.
>
> It's also possible you'll have to go through some not-quite-documented
> interfaces(/implementations) to get there. IProcessTransport has no
> pauseProducing method, but the POSIX implementation of that interface
> has a `pipes` dictionary where the values are `IProducer` providers, so
> you can call `pauseProducing` on them (or just the one for, say, stdout,
> if that's where you're getting bytes from).
>
> Later, of course, you'll want to undo the pause with a call to
> `resumeProducing`.
>
> Jean-Paul
--
dkeeney at travelbyroad.net
Rdbhost -> SQL databases as a webservice [www.rdbhost.com]