[Twisted-Python] Learning about IPushProducer
Rutt, Benjamin
Benjamin.Rutt at gs.com
Tue Mar 6 10:03:53 MST 2007
When running the following code (my 2nd twisted program!), it works as I
had hoped - it doesn't starve any clients that want to receive data
back, even with a simultaneously active really long streaming
server-to-client communication (i.e. one piggy client asking for
millions of bytes). i.e. another client can get in and ask for just a
few bytes while a large payload is being delivered to a different
client. Which is great!
Here's a sample interaction from the client side:
$ telnet localhost 8007
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
x
2
xx
3
xxx
10
xxxxxxxxxx
99999
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[...lots of x's...]
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
bye
Connection closed by foreign host.
$
So I have 2 questions on my code:
1) am I doing anything wrong in setting up the plumbing?
2) does pauseProducing() get called by another thread whilst
resumeProducing() is running? (I believe it must, otherwise my
resumeProducing() would only be entered once). If so I should have an
appropriate mutex around the read/write of self.pause, no?
Here is the code, and output from the server is at the end. Thanks --
Benjamin
#!/usr/bin/env python
import os, os.path, sys, re, commands, pickle, tempfile, getopt,
datetime
import socket, string, random, time, traceback, shutil, popen2
from zope.interface import implements
from twisted.internet import protocol, defer, interfaces, error, reactor
from twisted.internet.protocol import Protocol, Factory
from twisted.protocols.basic import LineReceiver
class NonStarvingXGiver:
implements(interfaces.IPushProducer)
def __init__(self, howmany, consumer):
self.howmany = howmany
self.sent_already = 0
self.paused = False
self.consumer = consumer
def beginSendingXs(self):
self.deferred = deferred = defer.Deferred()
self.consumer.registerProducer(self, False)
return deferred
def pauseProducing(self):
print 'pauseProducing: invoked'
self.paused = True
def resumeProducing(self):
print 'resumeProducing: invoked'
self.paused = False
maxchunksz = 1024
while not self.paused and self.howmany > self.sent_already:
chunksz = min(maxchunksz, self.howmany - self.sent_already)
self.consumer.write('x' * chunksz)
self.sent_already += chunksz
if self.howmany == self.sent_already:
self.consumer.write('\n')
self.consumer.unregisterProducer()
print 'resumeProducing: exiting for the last time'
def stopProducing(self):
print 'stopProducing: invoked'
self.consumer.unregisterProducer()
class xgiver(LineReceiver):
def lineReceived(self, howmany):
print 'got line [%s] from client [%s]' % (howmany,
self.transport.getPeer())
if howmany == 'bye':
print 'goodbye to', self.transport.getPeer()
self.transport.loseConnection()
return
try:
howmany = int(howmany)
s = NonStarvingXGiver(howmany, self.transport)
s.beginSendingXs()
except Exception, ex:
self.transport.write("invalid input " + howmany + "\n")
# Next lines are magic:
factory = Factory()
factory.protocol = xgiver
# 8007 is the port you want to run under. Choose something >1024
reactor.listenTCP(8007, factory)
reactor.run()
-------------------------------------------------------------------
Server output:
$ ./xgiver.py
got line [1] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [2] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [3] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [10] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [99999] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
resumeProducing: invoked
pauseProducing: invoked
resumeProducing: invoked
resumeProducing: exiting for the last time
got line [bye] from client [IPv4Address(TCP, '127.0.0.1', 51007)]
goodbye to IPv4Address(TCP, '127.0.0.1', 51007)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: </pipermail/twisted-python/attachments/20070306/f2bde4cb/attachment.html>
More information about the Twisted-Python
mailing list