[Twisted-Python] Re: How to receive a big stream data?
David Bolen
db3l.net at gmail.com
Wed Jul 11 18:39:12 MDT 2007
"steven wang" <steven.zdwang at gmail.com> writes:
> But I want to receive binary data in my protocol.
Even if you start with a non-binary header, you can switch to
receiving binary information at any time by using the raw mode of
most of the basic protocols. Having some sort of ASCII header ahead
of the raw data is often a very simple way to handle things, and one
shared with a tremendous number of standard TCP-based protocols.
Your original post had a fairly straightforward ASCII header that I
think would probably be fine. What you're probably missing is the
concept of switching to a raw binary receive mode, which changes
your protocol from getting data in its lineReceived method to having
rawDataReceived called.
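Before the full example, it may help to see that switch in isolation.
The following framework-free sketch (the name HeaderThenRawParser and
its methods are my own, not Twisted's) mimics what LineReceiver does
when a protocol calls setRawMode(): buffer until a complete line
arrives, hand it to the line handler, then deliver every subsequent
byte - including any that arrived in the same chunk as the newline -
to the raw handler:

```python
class HeaderThenRawParser(object):
    """Consume one header line, then treat all further bytes as raw."""

    def __init__(self):
        self.raw_mode = False
        self.buffer = b''
        self.header = None
        self.raw_bytes = b''

    def data_received(self, data):
        # Network reads arrive in arbitrary chunks; buffer until we can
        # dispatch to the right handler.
        if self.raw_mode:
            self.raw_data_received(data)
            return
        self.buffer += data
        if b'\r\n' in self.buffer:
            line, rest = self.buffer.split(b'\r\n', 1)
            self.buffer = b''
            self.line_received(line)
            if rest:
                # Bytes after the newline already belong to raw mode
                self.raw_data_received(rest)

    def line_received(self, line):
        self.header = line
        self.raw_mode = True      # analogous to LineReceiver.setRawMode()

    def raw_data_received(self, data):
        self.raw_bytes += data
```

Feeding it `b'HEADER\r\nbin'` followed by `b'ary'` leaves the header
as `b'HEADER'` and the raw data as `b'binary'`, regardless of how the
chunks were split. (The real LineReceiver also handles multiple lines,
length limits, and switching back to line mode.)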
For example, here's a slightly stripped pair of protocols (server and
client) that I'm currently using as part of a bigger project. Most of
the communication is over a PB connection which the client uses to
perform operations on the server, one of which is editing job
information. But jobs contain attached files (often very large
audio/video files), so committing changes to a job also involves
transmitting up any newly added files. So after the client updates
the server's metadata, it initiates a separate set of file transfers
over a different port.
In my case, the header for a file transfer includes a session key
(which the protocol uses to reference the original PB-based job
session the client was using) along with a file key used for storage
(which uniquely references a specific file in the job). The final
element is the total file size. That is, upon connecting, the client
transmits a line such as:
<session_uuid> <file_uuid> ########
where the two uuids are specific to the transfer underway (and help
with security since a random client isn't going to know the right
ids), and ######## is the overall file length. After sending that
line (e.g., right after its final newline), the client just blasts up
the raw data.
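A minimal sketch of building and parsing that header line (the
function names here are illustrative, not part of the protocol code
below):

```python
import uuid

def build_header(sess_key, file_key, size):
    # One whitespace-separated ASCII line, terminated by CRLF
    return '%s %s %d\r\n' % (sess_key, file_key, size)

def parse_header(line):
    sess_key, file_key, size = line.split()
    # uuid.UUID() raises ValueError on a malformed id, which the server
    # treats as an invalid (possibly hostile) client
    return uuid.UUID(sess_key), uuid.UUID(file_key), int(size)
```

The round trip is lossless: parsing a built header yields the same two
uuids and size back.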
The protocol is a simple LineReceiver-based protocol that receives
that information as an initial ASCII line, after which it switches to
raw mode to receive the data. Although the data length could
technically be inferred from when the client disconnects, having it
up front ensures I can detect a transfer that gets interrupted.
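The bookkeeping behind that is just a countdown of the remaining
bytes; at connection close, a nonzero remainder means the upload was
truncated. A sketch (the chunk boundaries are arbitrary, as they would
be on a real socket):

```python
def receive(chunks, expected_size):
    """Accumulate chunks and report whether the full size arrived."""
    remain = expected_size
    received = b''
    for chunk in chunks:          # chunks as the transport delivers them
        remain -= len(chunk)
        received += chunk
    # At connectionLost time: remain != 0 means an interrupted transfer
    return received, remain == 0
```

Eight expected bytes delivered in full pass the check; a connection
dropped after three bytes fails it.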
So on the server side you have:
- - - - - - - - - - - - - - - - - - - - - - - - -
import logging
import os
import uuid
from datetime import datetime
from zlib import crc32

from twisted.protocols.basic import LineReceiver
from twisted.internet.protocol import ServerFactory

logger = logging.getLogger(__name__)


class FileIOProtocol(LineReceiver):

    def __init__(self):
        self.info = None
        self.outfile = None
        self.remain = 0
        self.crc = 0

    def lineReceived(self, line):
        logger.debug('FileIOProtocol:lineReceived:%s', line)
        try:
            sess_key, file_key, self.size = line.split()
            session_uuid = uuid.UUID(sess_key)
            file_key = uuid.UUID(file_key)
        except ValueError:
            logger.debug('FileIOProtocol:lineReceived Invalid header')
            self.transport.loseConnection()
            return

        self.job_session = self.factory.sessions.get(session_uuid)
        if not self.job_session:
            logger.debug('FileIOProtocol:lineReceived Invalid session')
            self.transport.loseConnection()
            return
        if not self.job_session.active:
            logger.debug('FileIOProtocol:lineReceived Stale session')
            self.transport.loseConnection()
            return

        # [db3l] The original code validates the individual file uuid here,
        # resulting in self.job_file as the job file object from the session
        if not self.job_file:
            logger.debug('FileIOProtocol:lineReceived Invalid file key')
            self.transport.loseConnection()
            return

        # Create the upload directory if not already present
        if not os.path.isdir(self.job_session.upload_dir):
            os.makedirs(self.job_session.upload_dir)
        self.outfilename = os.path.join(self.job_session.upload_dir,
                                        self.job_file['uuid'].hex)
        logger.debug('FileIOProtocol:lineReceived Receiving into %s',
                     self.outfilename)
        try:
            self.outfile = open(self.outfilename, 'wb')
        except Exception, value:
            logger.debug('FileIOProtocol:lineReceived Unable to open file %s '
                         '(%s)', self.outfilename, value)
            self.transport.loseConnection()
            return

        self.remain = int(self.size)
        logger.debug('FileIOProtocol:lineReceived Entering raw mode: %s %s',
                     self.outfile, self.remain)
        self.setRawMode()

    def rawDataReceived(self, data):
        self.remain -= len(data)
        self.crc = crc32(data, self.crc)
        self.outfile.write(data)

    def connectionMade(self):
        LineReceiver.connectionMade(self)
        logger.debug('FileIOProtocol:connectionMade')

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        logger.debug('FileIOProtocol:connectionLost')
        if self.outfile:
            self.outfile.close()
            if self.remain != 0:
                # Problem uploading - discard the partial file
                logger.debug('FileIOProtocol:connectionLost remain(%d)!=0',
                             self.remain)
                os.remove(self.outfilename)
            else:
                # Update job object with upload status
                self.job_file['uploaded'] = datetime.utcnow()
                self.job_file['size'] = self.size
                self.job_file['crc'] = self.crc


class FileIOFactory(ServerFactory):
    protocol = FileIOProtocol

    def __init__(self, db, sessions, options):
        self.db = db
        self.options = options
        self.sessions = sessions
- - - - - - - - - - - - - - - - - - - - - - - - -
which is bound to an appropriate port on the server however you'd like.
I use code like:
self.fileio = FileIOFactory(db, self.sessions, options)
reactor.listenTCP(self.options['file_port'], self.fileio)
On the client side, I have an equivalent protocol that transmits up
the file. It's run beneath a GUI, so keeps a reference to the GUI
controller object that might indicate it needs to cancel a transfer
mid-stream, as well as updating the controller during the transfer so
it can update a progress bar on screen.
It is also a LineReceiver-based protocol, and uses the Twisted
FileSender object to do the raw data transfer (which is implemented
as a straight producer, with the TCP socket being the consumer). The
connectionMade method is where it transmits the ASCII header and
then initiates the raw data transfer.
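For a rough mental model of what FileSender.beginFileTransfer does,
here is a simplified sketch of my own (not Twisted's code), with the
producer/consumer flow control and Deferred plumbing stripped out:
read the file a chunk at a time, pass each chunk through the optional
transform function (the _monitor progress hook below), and write it
to the consumer:

```python
import io

CHUNK_SIZE = 2 ** 16

def send_file(infile, consumer, transform=None):
    """Push a file to a consumer in CHUNK_SIZE pieces - roughly what
    FileSender.beginFileTransfer does, minus flow control."""
    last = b''
    while True:
        chunk = infile.read(CHUNK_SIZE)
        if not chunk:
            break
        if transform is not None:
            chunk = transform(chunk)   # e.g. a progress-monitoring hook
        consumer.write(chunk)
        last = chunk
    # FileSender fires its Deferred with the last data it sent
    return last
```

The transform hook is why _monitor below must return the data it was
given: whatever it returns is what actually gets written to the
transport.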
- - - - - - - - - - - - - - - - - - - - - - - - -
import os

from twisted.protocols.basic import LineReceiver, FileSender
from twisted.internet.protocol import ClientFactory


class TransferCancelled(Exception):
    """Exception for a user cancelling a transfer"""


class FileIOClient(LineReceiver):

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

        self.infile = open(self.path, 'rb')
        self.insize = os.stat(self.path).st_size

        self.result = None
        self.completed = False

        self.controller.file_sent = 0
        self.controller.file_size = self.insize

    def _monitor(self, data):
        self.controller.file_sent += len(data)
        self.controller.total_sent += len(data)

        # Check with controller to see if we've been cancelled and abort
        # if so.
        if self.controller.cancel:
            print 'FileIOClient._monitor Cancelling'

            # Need to unregister the producer with the transport or it will
            # wait for it to finish before breaking the connection
            self.transport.unregisterProducer()
            self.transport.loseConnection()

            # Indicate a user cancelled result
            self.result = TransferCancelled('User cancelled transfer')

        return data

    def cbTransferCompleted(self, lastsent):
        self.completed = True
        self.transport.loseConnection()

    def connectionMade(self):
        self.transport.write('%s %s %s\r\n' % (str(self.sess_key),
                                               str(self.file_key),
                                               self.insize))
        sender = FileSender()
        sender.CHUNK_SIZE = 2 ** 16
        d = sender.beginFileTransfer(self.infile, self.transport,
                                     self._monitor)
        d.addCallback(self.cbTransferCompleted)

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        print 'FileIOClient:connectionLost'
        self.infile.close()
        if self.completed:
            self.controller.completed.callback(self.result)
        else:
            self.controller.completed.errback(reason)


class FileIOClientFactory(ClientFactory):
    protocol = FileIOClient

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

    def clientConnectionFailed(self, connector, reason):
        ClientFactory.clientConnectionFailed(self, connector, reason)
        self.controller.completed.errback(reason)

    def buildProtocol(self, addr):
        print 'buildProtocol'
        p = self.protocol(self.path, self.sess_key, self.file_key,
                          self.controller)
        p.factory = self
        return p
- - - - - - - - - - - - - - - - - - - - - - - - -
Within the presentation layer controller on the client, initiating a
transfer is done with:
def _transmitOne(self, address, port, path, sess_key, file_key):
    self.completed = defer.Deferred()
    f = FileIOClientFactory(path, sess_key, file_key, self)
    reactor.connectTCP(address, port, f)
    return self.completed
and the result is that self.completed fires (callback or errback)
when the transfer is done (which the controller uses to initiate the
next transfer when a job has a list of files to go up).
While probably not exactly what you're trying to do, perhaps it'll
point you in the right direction.
-- David