[Twisted-Python] Learning about IPushProducer
Jean-Paul Calderone
exarkun at divmod.com
Mon Mar 12 15:09:56 MDT 2007
On Mon, 12 Mar 2007 15:44:41 -0400, "Rutt, Benjamin" <benjamin.rutt at gs.com> wrote:
>Hi.
>
>Anyone have any pointers as to how I can get some of my questions
>answered below? I had hoped to get some response. Did I not use the
>proper etiquette? Or there is some expert on the IPushProducer
>mechanism or the author of page
Sorry, your question was big and challenging to approach.
> [snip]
>
> When running the following code (my 2nd twisted program!), it
>works as I had hoped - it doesn't starve any clients that want to
>receive data back, even with a simultaneously active really long
>streaming server-to-client communication (i.e. one piggy client asking
>for millions of bytes). i.e. another client can get in and ask for just
>a few bytes while a large payload is being delivered to a different
>client. Which is great!
>
> Here's a sample interaction from the client side:
>
> $ telnet localhost 8007
> Trying 127.0.0.1...
> Connected to localhost.
> Escape character is '^]'.
> 1
> x
> 2
> xx
> 3
> xxx
> 10
> xxxxxxxxxx
> 99999
>
>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>
> [...lots of x's...]
> xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
> bye
> Connection closed by foreign host.
> $
>
> So I have 2 questions on my code:
>
> 1) am I doing anything wrong in setting up the plumbing?
> 2) does pauseProducing() get called by another thread whilst
>resumeProducing() is running? (I believe it must, otherwise my
>resumeProducing() would only be entered once). If so I should have an
>appropriate mutex around the read/write of self.pause, no?
>
> Here is the code, and output from the server is at the end.
>Thanks -- Benjamin
>
> #!/usr/bin/env python
> import os, os.path, sys, re, commands, pickle, tempfile, getopt, datetime
> import socket, string, random, time, traceback, shutil, popen2
>
> from zope.interface import implements
> from twisted.internet import protocol, defer, interfaces, error, reactor
> from twisted.internet.protocol import Protocol, Factory
> from twisted.protocols.basic import LineReceiver
>
> class NonStarvingXGiver:
>     implements(interfaces.IPushProducer)
>     def __init__(self, howmany, consumer):
>         self.howmany = howmany
>         self.sent_already = 0
>         self.paused = False
>         self.consumer = consumer
>     def beginSendingXs(self):
>         self.deferred = deferred = defer.Deferred()
>         self.consumer.registerProducer(self, False)
>         return deferred
>     def pauseProducing(self):
>         print 'pauseProducing: invoked'
>         self.paused = True
>     def resumeProducing(self):
>         print 'resumeProducing: invoked'
>         self.paused = False
>         maxchunksz = 1024
This loop:
>         while not self.paused and self.howmany > self.sent_already:
>             chunksz = min(maxchunksz, self.howmany - self.sent_already)
>             self.consumer.write('x' * chunksz)
>             self.sent_already += chunksz
is a bit atypical, I think. It eventually stops because the consumer
re-enters your code: as soon as consumer.write() decides its buffer is
full, it calls pauseProducing() from inside that write() call, which
sets self.paused and ends the loop. I'm not sure the loop is /wrong/,
but it is a bit surprising. You don't need a mutex here, since
everything runs in a single thread, but you do need to be aware that
your code can be re-entered within that thread.
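To make the re-entrancy concrete, here is a minimal sketch that runs
without Twisted. ToyConsumer, its buffer_size argument, and its drain()
method are hypothetical stand-ins invented for illustration; they are
not Twisted's transport implementation, and the 4096-byte limit is an
arbitrary assumption. XGiver is a cut-down copy of the producer above
with a resume_calls counter added.

```python
class ToyConsumer:
    """Hypothetical stand-in for a transport; NOT Twisted's implementation."""

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size  # assumed limit, chosen arbitrarily
        self.buffered = 0
        self.producer = None

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def write(self, data):
        self.buffered += len(data)
        # Re-entrant call: the producer's resumeProducing() loop is still
        # on the stack, in the same thread, when this runs.
        if self.producer is not None and self.buffered > self.buffer_size:
            self.producer.pauseProducing()

    def drain(self):
        # Pretend the socket accepted everything, then ask for more data.
        self.buffered = 0
        if self.producer is not None:
            self.producer.resumeProducing()


class XGiver:
    """Cut-down copy of the producer from the original post."""

    def __init__(self, howmany, consumer):
        self.howmany = howmany
        self.sent_already = 0
        self.paused = False
        self.consumer = consumer
        self.resume_calls = 0  # added only to count invocations

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.resume_calls += 1
        self.paused = False
        maxchunksz = 1024
        while not self.paused and self.howmany > self.sent_already:
            chunksz = min(maxchunksz, self.howmany - self.sent_already)
            self.consumer.write('x' * chunksz)  # may call pauseProducing()
            self.sent_already += chunksz

    def stopProducing(self):
        pass


consumer = ToyConsumer(buffer_size=4096)
giver = XGiver(10000, consumer)
consumer.registerProducer(giver, True)

giver.resumeProducing()           # first burst ends when write() pauses us
first_burst = giver.sent_already

while giver.sent_already < giver.howmany:
    consumer.drain()              # each drain re-enters resumeProducing()

print("%d %d %d" % (first_burst, giver.sent_already, giver.resume_calls))
```

No second thread is involved at any point: the loop stops only because
write() calls pauseProducing() before it returns, so self.paused changes
underneath the running loop.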
Does that answer your questions?
> [snip]
Jean-Paul