[Twisted-Python] Learning about IPushProducer

Jean-Paul Calderone exarkun at divmod.com
Mon Mar 12 15:09:56 MDT 2007


On Mon, 12 Mar 2007 15:44:41 -0400, "Rutt, Benjamin" <benjamin.rutt at gs.com> wrote:
>Hi.
>
>Anyone have any pointers as to how I can get some of my questions
>answered below?  I had hoped to get some response.  Did I not use the
>proper etiquitte?  Or there is some expert on the IPushProducer
>mechanism or the author of page

Sorry, your question was big and challenging to approach.

> [snip]
>
>	When running the following code (my 2nd twisted program!), it
>works as I had hoped - it doesn't starve any clients that want to
>receive data back, even with a simultaneously active really long
>streaming server-to-client communication (i.e. one piggy client asking
>for millions of bytes).  i.e. another client can get in and ask for just
>a few bytes while a large payload is being delivered to a different
>client.  Which is great!
>
>	Here's a sample interaction from the client side:
>
>	$ telnet localhost 8007
>	Trying 127.0.0.1...
>	Connected to localhost.
>	Escape character is '^]'.
>	1
>	x
>	2
>	xx
>	3
>	xxx
>	10
>	xxxxxxxxxx
>	99999
>
>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
>	[...lots of x's...]
>	xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>	bye
>	Connection closed by foreign host.
>	$
>
>	So I have 2 questions on my code:
>
>	1) am I doing anything wrong in setting up the plumbing?
>	2) does pauseProducing() get called by another thread whilst
>resumeProducing() is running?  (I believe it must, otherwise my
>resumeProducing() would only be entered once).  If so I should have an
>appropriate mutex around the read/write of self.pause, no?
>
>	Here is the code, and output from the server is at the end.
>Thanks -- Benjamin
>
>	#!/usr/bin/env python
>	import os, os.path, sys, re, commands, pickle, tempfile, getopt,
>datetime
>	import socket, string, random, time, traceback, shutil, popen2
>
>	from zope.interface import implements
>	from twisted.internet import protocol, defer, interfaces, error,
>reactor
>	from twisted.internet.protocol import Protocol, Factory
>	from twisted.protocols.basic import LineReceiver
>
>	class NonStarvingXGiver:
>	    implements(interfaces.IPushProducer)
>	    def __init__(self, howmany, consumer):
>	        self.howmany = howmany
>	        self.sent_already = 0
>	        self.paused = False
>	        self.consumer = consumer
>	    def beginSendingXs(self):
>	        self.deferred = deferred = defer.Deferred()
>	        self.consumer.registerProducer(self, False)
>	        return deferred
>	    def pauseProducing(self):
>	        print 'pauseProducing: invoked'
>	        self.paused = True
>	    def resumeProducing(self):
>	        print 'resumeProducing: invoked'
>	        self.paused = False
>	        maxchunksz = 1024

This loop:

>	        while not self.paused and self.howmany >
>self.sent_already:
>	            chunksz = min(maxchunksz, self.howmany -
>self.sent_already)
>	            self.consumer.write('x' * chunksz)
>	            self.sent_already += chunksz

is a bit atypical, I think.  The reason it is eventually stopping
is that your code is being invoked re-entrantly by the consumer as
soon as it decides its buffer is full.  I'm not sure the loop is
/wrong/, but it is a bit surprising.  You don't need a mutex here,
since it's single threaded, but you do need to be aware that your
code can be re-entered within a single thread.

Does that answer your questions?

> [snip]

Jean-Paul




More information about the Twisted-Python mailing list