[Twisted-Python] Learning about IPushProducer
Rutt, Benjamin
Benjamin.Rutt at gs.com
Fri Mar 16 09:48:53 MDT 2007
BTW, how did my pauseProducing ever get invoked below? After all, I
registered my producer as a pull producer (erroneously, no doubt, since
it implements IPushProducer) by passing False as the second argument
below:
    self.consumer.registerProducer(self, False)
If it really is registered as a pull producer, I wouldn't expect its
pauseProducing to ever be called. Does Twisted actually use the type of
the class to decide what to call? Or getattr(class, 'pauseProducing') or
some such? If so, what's the purpose of True or False during registration?
From the docs:
registerProducer(producer, streaming)
So that a consumer can invoke methods on a producer, the
consumer needs to be told about the producer. This is done with the
registerProducer method. The first argument is either a IPullProducer or
IPushProducer provider; the second argument indicates which of these
interfaces is provided: True for push producers, False for pull
producers.
Thanks!
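To make the quoted description of the streaming flag concrete, here is a minimal sketch in plain Python with no Twisted imports. ToyConsumer, PullXs, PushXs, run, and the buffer_limit parameter are hypothetical names invented for illustration; the sketch models only the contract described in the docs and is not Twisted's transport code.

```python
class ToyConsumer(object):
    """Hypothetical consumer; a model of the documented contract only."""

    def __init__(self, buffer_limit):
        self.buffer_limit = buffer_limit  # chunks held before pausing a push producer
        self.buffer = []
        self.sent = ''
        self.producer = None
        self.streaming = False
        self.paused = False
        self.calls = []  # producer methods this consumer invoked

    def registerProducer(self, producer, streaming):
        self.producer, self.streaming = producer, streaming

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.buffer.append(data)
        full = len(self.buffer) >= self.buffer_limit
        # In this model only a push (streaming=True) producer is ever paused.
        if self.producer is not None and self.streaming and full:
            self.paused = True
            self.calls.append('pauseProducing')
            self.producer.pauseProducing()

    def drain(self):
        """Pretend the network accepted everything buffered so far."""
        self.sent += ''.join(self.buffer)
        self.buffer = []
        if self.producer is None:
            return
        if self.streaming and not self.paused:
            return
        # Pull: ask for the next chunk. Push: resume after a pause.
        self.paused = False
        self.calls.append('resumeProducing')
        self.producer.resumeProducing()


class PullXs(object):
    """Pull style: writes one chunk each time it is asked."""

    def __init__(self, total, consumer):
        self.left, self.consumer = total, consumer

    def resumeProducing(self):
        n = min(2, self.left)
        self.left -= n
        self.consumer.write('x' * n)
        if self.left == 0:
            self.consumer.unregisterProducer()


class PushXs(object):
    """Push style: keeps writing until told to pause."""

    def __init__(self, total, consumer):
        self.left, self.consumer, self.paused = total, consumer, False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        while not self.paused and self.left > 0:
            n = min(2, self.left)
            self.left -= n
            self.consumer.write('x' * n)
        if self.left == 0:
            self.consumer.unregisterProducer()


def run(producer_cls, streaming, total=10):
    consumer = ToyConsumer(buffer_limit=2)
    producer = producer_cls(total, consumer)
    consumer.registerProducer(producer, streaming)
    if streaming:
        producer.resumeProducing()  # in this toy, a push producer starts on its own
    while consumer.producer is not None:
        consumer.drain()
    consumer.drain()  # flush whatever the last call left behind
    return consumer.calls, consumer.sent
```

With these toy classes, run(PullXs, False) records five resumeProducing calls and no pauses, while run(PushXs, True) records alternating pauseProducing and resumeProducing calls.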
________________________________
From: twisted-python-bounces at twistedmatrix.com
[mailto:twisted-python-bounces at twistedmatrix.com] On Behalf Of Rutt,
Benjamin
Sent: Tuesday, March 06, 2007 12:04 PM
To: twisted-python at twistedmatrix.com
Subject: [Twisted-Python] Learning about IPushProducer
When I run the following code (my second Twisted program!), it works
as I had hoped: it doesn't starve any clients that want to receive data
back, even while a really long streaming server-to-client transfer is
active (i.e. one piggy client asking for millions of bytes). Another
client can get in and ask for just a few bytes while a large payload is
being delivered to a different client. Which is great!
Here's a sample interaction from the client side:
$ telnet localhost 8007
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
x
2
xx
3
xxx
10
xxxxxxxxxx
99999
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[...lots of x's...]
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
bye
Connection closed by foreign host.
$
So I have two questions about my code:
1) Am I doing anything wrong in setting up the plumbing?
2) Does pauseProducing() get called by another thread while
resumeProducing() is running? (I believe it must, otherwise my
resumeProducing() would only be entered once.) If so, I should have an
appropriate mutex around the reads and writes of self.paused, no?
Here is the code, and output from the server is at the end.
Thanks -- Benjamin
#!/usr/bin/env python
import os, os.path, sys, re, commands, pickle, tempfile, getopt, datetime
import socket, string, random, time, traceback, shutil, popen2
from zope.interface import implements
from twisted.internet import protocol, defer, interfaces, error, reactor
from twisted.internet.protocol import Protocol, Factory
from twisted.protocols.basic import LineReceiver
class NonStarvingXGiver:
    implements(interfaces.IPushProducer)

    def __init__(self, howmany, consumer):
        self.howmany = howmany
        self.sent_already = 0
        self.paused = False
        self.consumer = consumer

    def beginSendingXs(self):
        self.deferred = deferred = defer.Deferred()
        self.consumer.registerProducer(self, False)
        return deferred

    def pauseProducing(self):
        print 'pauseProducing: invoked'
        self.paused = True

    def resumeProducing(self):
        print 'resumeProducing: invoked'
        self.paused = False
        maxchunksz = 1024
        while not self.paused and self.howmany > self.sent_already:
            chunksz = min(maxchunksz, self.howmany - self.sent_already)
            self.consumer.write('x' * chunksz)
            self.sent_already += chunksz
        if self.howmany == self.sent_already:
            self.consumer.write('\n')
            self.consumer.unregisterProducer()
            print 'resumeProducing: exiting for the last time'

    def stopProducing(self):
        print 'stopProducing: invoked'
        self.consumer.unregisterProducer()
class xgiver(LineReceiver):
    def lineReceived(self, howmany):
        print 'got line [%s] from client [%s]' % (howmany,
                                                  self.transport.getPeer())
        if howmany == 'bye':
            print 'goodbye to', self.transport.getPeer()
            self.transport.loseConnection()
            return
        try:
            howmany = int(howmany)
            s = NonStarvingXGiver(howmany, self.transport)
            s.beginSendingXs()
        except Exception, ex:
            self.transport.write("invalid input " + howmany + "\n")
# Next lines are magic:
factory = Factory()
factory.protocol = xgiver
# 8007 is the port you want to run under. Choose something >1024
reactor.listenTCP(8007, factory)
reactor.run()
-------------------------------------------------------------------
Server output:
$ ./xgiver.py
got line [1] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [2] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [3] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [10] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [99999] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
pauseProducing: invoked
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [bye] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
goodbye to IPv4Address(TCP, '127.0.0.1', 51007)
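
The resume/pause/resume sequence in the 99999 trace above can be reproduced on a single thread. The sketch below is a toy, not Twisted: TinyBufferConsumer, its 65536-byte limit, flush(), and demo() are hypothetical, and the toy ignores the streaming flag purely so that the trace comes out the same. It shows one way pauseProducing can run while resumeProducing is still on the stack, namely when the consumer calls it from inside write().

```python
import threading


class TinyBufferConsumer(object):
    """Hypothetical consumer that pauses its producer from inside write()."""

    def __init__(self, limit):
        self.limit = limit  # assumed buffer limit in bytes, chosen for the toy
        self.buffered = 0
        self.producer = None

    def registerProducer(self, producer, streaming):
        # Assumption of this toy: the flag is ignored and production starts at once.
        self.producer = producer
        producer.resumeProducing()

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.buffered += len(data)
        if self.producer is not None and self.buffered > self.limit:
            # Synchronous call: resumeProducing() is still further up the stack.
            self.producer.pauseProducing()

    def flush(self):
        """Pretend the socket drained, then let the producer continue."""
        self.buffered = 0
        if self.producer is not None:
            self.producer.resumeProducing()


class XGiver(object):
    """Same loop as NonStarvingXGiver, recording events instead of printing."""

    def __init__(self, howmany, consumer):
        self.howmany = howmany
        self.sent_already = 0
        self.paused = False
        self.consumer = consumer
        self.trace = []

    def _note(self, event):
        self.trace.append((event, threading.current_thread().name))

    def pauseProducing(self):
        self._note('pause')
        self.paused = True

    def resumeProducing(self):
        self._note('resume')
        self.paused = False
        while not self.paused and self.howmany > self.sent_already:
            chunksz = min(1024, self.howmany - self.sent_already)
            self.consumer.write('x' * chunksz)
            self.sent_already += chunksz
        if self.howmany == self.sent_already:
            self.consumer.unregisterProducer()
            self._note('done')


def demo(howmany):
    consumer = TinyBufferConsumer(limit=65536)
    giver = XGiver(howmany, consumer)
    consumer.registerProducer(giver, False)
    while consumer.producer is not None:
        consumer.flush()
    return giver.trace
```

In this model every entry returned by demo(99999) carries the same thread name, so the reads and writes of self.paused never race and no mutex is involved. Whether that matches the real server depends on how Twisted's transport invokes the producer, which this sketch does not establish.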