[Twisted-Python] Re: How to receive a big stream data?
steven wang
steven.zdwang at gmail.com
Thu Jul 19 22:50:19 MDT 2007
Thank you very much! :)
On 7/12/07, David Bolen <db3l.net at gmail.com> wrote:
>
> "steven wang" <steven.zdwang at gmail.com> writes:
>
> > But I want to receive binary data in my protocol.
>
> Even if you start with a non-binary header, you can switch to
> receiving binary information at any time by using the raw mode of
> most of the basic protocols. And having some sort of ASCII header
> prior to the raw data is often a very simple way to handle things
> (it's something a tremendous number of standard TCP-based protocols
> have in common).
>
> Your original post had a fairly straightforward ASCII header that I
> think would probably be fine. What you're probably missing is the
> concept of switching to a raw binary receive mode, which switches
> your protocol from getting data in its lineReceived method to having
> rawDataReceived called.
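> To make that switch concrete, here is a minimal, framework-free sketch
> of the buffering logic involved (the class and method names are mine
> for illustration, not Twisted's internals): bytes are parsed
> line-by-line until the header line arrives, after which everything,
> including anything already buffered past the newline, is treated as
> raw data.

```python
class ModeSwitchParser:
    """Illustrative sketch of LineReceiver's line/raw mode switch."""

    def __init__(self):
        self.line_mode = True
        self.buffer = b''
        self.lines = []
        self.raw = b''

    def data_received(self, data):
        self.buffer += data
        # In line mode, peel off complete CRLF-terminated lines.
        while self.line_mode and b'\r\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\r\n', 1)
            self.line_received(line)
        # In raw mode, everything (including leftovers past the header's
        # newline) goes straight to the raw buffer.
        if not self.line_mode and self.buffer:
            self.raw += self.buffer
            self.buffer = b''

    def line_received(self, line):
        self.lines.append(line)
        self.line_mode = False  # switch to raw mode after the header

p = ModeSwitchParser()
p.data_received(b'HEADER\r\nbinary\x00data')  # header plus early raw bytes
p.data_received(b'\x01more')                  # later raw chunk
```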
>
> For example, here's a slightly stripped pair of protocols (server and
> client) that I'm currently using as part of a bigger project. Most of
> the communication is over a PB connection which the client uses to
> perform operations on the server, one of which is editing job
> information. But jobs contain attached files (often very large
> audio/video files), so committing changes to a job also involves
> transmitting up any newly added files. So after the client updates
> the server's meta data, it initiates a separate set of file transfers
> across a different port.
>
> In my case, the header for a file transfer includes a session key
> (which the protocol uses to reference the original PB-based job
> session the client was using) along with a file key used for storage
> (which uniquely references a specific file in the job). The final
> element is the total file size. That is, upon connecting, the client
> transmits a line such as:
>
> <session_uuid> <file_uuid> ########
>
> where the two uuids are specific to the transfer underway (and help
> with security, since a random client isn't going to know the right
> ids), and ######## is the overall file length. After sending that
> line (e.g., right after its final newline), the client just blasts up
> the raw data.
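> Parsing that header on the server amounts to a split plus uuid and int
> conversions; a small sketch (the function name and sample uuids are
> mine, not from the project):

```python
import uuid

def parse_header(line):
    # Raises ValueError on a malformed header (wrong field count, bad
    # uuid, or non-numeric size), which the server can treat as grounds
    # to drop the connection.
    sess_key, file_key, size = line.split()
    return uuid.UUID(sess_key), uuid.UUID(file_key), int(size)

session_uuid, file_uuid, total = parse_header(
    '6ba7b810-9dad-11d1-80b4-00c04fd430c8 '
    '6ba7b811-9dad-11d1-80b4-00c04fd430c8 1048576')
```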
>
> The protocol is a simple LineReceiver-based protocol that receives
> that information as an initial ASCII line, after which it switches
> to raw mode to receive the data. Although the data length could
> technically be inferred from when the client disconnects, having it
> up front ensures I can detect a transfer that gets interrupted.
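> The server-side accounting this enables is just a countdown from the
> declared size, folding each chunk into a running CRC as it goes (pure
> stdlib sketch; the variable names are mine):

```python
from zlib import crc32

declared_size = 10
remain, crc = declared_size, 0
# Each raw chunk decrements the remaining-byte counter and extends the
# running checksum; zlib.crc32's second argument chains checksums, so
# the incremental result matches a one-shot CRC of the whole payload.
for chunk in (b'hello', b'world'):
    remain -= len(chunk)
    crc = crc32(chunk, crc)

clean_transfer = (remain == 0)  # nonzero remain means an interrupted upload
```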
>
> So on the server side you have:
>
> - - - - - - - - - - - - - - - - - - - - - - - - -
>
> from datetime import datetime
> from zlib import crc32
> import logging, os, uuid
>
> from twisted.internet.protocol import ServerFactory
> from twisted.protocols.basic import LineReceiver
>
> logger = logging.getLogger(__name__)
>
>
> class FileIOProtocol(LineReceiver):
>
>     def __init__(self):
>         self.info = None
>         self.outfile = None
>         self.remain = 0
>         self.crc = 0
>
>     def lineReceived(self, line):
>         logger.debug('FileIOProtocol:lineReceived:%s', line)
>         try:
>             sess_key, file_key, size = line.split()
>             session_uuid = uuid.UUID(sess_key)
>             file_key = uuid.UUID(file_key)
>             self.size = int(size)
>         except ValueError:
>             logger.debug('FileIOProtocol:lineReceived Invalid header')
>             self.transport.loseConnection()
>             return
>
>         self.job_session = self.factory.sessions.get(session_uuid)
>         if not self.job_session:
>             logger.debug('FileIOProtocol:lineReceived Invalid session')
>             self.transport.loseConnection()
>             return
>
>         if not self.job_session.active:
>             logger.debug('FileIOProtocol:lineReceived Stale session')
>             self.transport.loseConnection()
>             return
>
>         # [db3l] The original code validates the individual file uuid
>         # here, resulting in self.job_file as the job file object from
>         # the session
>
>         if not self.job_file:
>             logger.debug('FileIOProtocol:lineReceived Invalid file key')
>             self.transport.loseConnection()
>             return
>
>         # Create the upload directory if not already present
>         if not os.path.isdir(self.job_session.upload_dir):
>             os.makedirs(self.job_session.upload_dir)
>
>         self.outfilename = os.path.join(self.job_session.upload_dir,
>                                         self.job_file['uuid'].hex)
>
>         logger.debug('FileIOProtocol:lineReceived Receiving into %s',
>                      self.outfilename)
>         try:
>             self.outfile = open(self.outfilename, 'wb')
>         except Exception, value:
>             logger.debug('FileIOProtocol:lineReceived Unable to open '
>                          'file %s (%s)', self.outfilename, value)
>             self.transport.loseConnection()
>             return
>
>         self.remain = self.size
>         logger.debug('FileIOProtocol:lineReceived Entering raw mode: '
>                      '%s %s', self.outfile, self.remain)
>         self.setRawMode()
>
>     def rawDataReceived(self, data):
>         self.remain -= len(data)
>         self.crc = crc32(data, self.crc)
>         self.outfile.write(data)
>
>     def connectionMade(self):
>         LineReceiver.connectionMade(self)
>         logger.debug('FileIOProtocol:connectionMade')
>
>     def connectionLost(self, reason):
>         LineReceiver.connectionLost(self, reason)
>         logger.debug('FileIOProtocol:connectionLost')
>         if self.outfile:
>             self.outfile.close()
>
>             if self.remain != 0:
>                 # Problem uploading - discard
>                 logger.debug('FileIOProtocol:connectionLost '
>                              'remain(%d)!=0', self.remain)
>                 os.remove(self.outfilename)
>             else:
>                 # Update job object with upload status
>                 self.job_file['uploaded'] = datetime.utcnow()
>                 self.job_file['size'] = self.size
>                 self.job_file['crc'] = self.crc
>
>
> class FileIOFactory(ServerFactory):
>     protocol = FileIOProtocol
>
>     def __init__(self, db, sessions, options):
>         self.db = db
>         self.options = options
>         self.sessions = sessions
>
> - - - - - - - - - - - - - - - - - - - - - - - - -
>
> which is bound to an appropriate port on the server however you'd like.
> I use code like:
>
> self.fileio = FileIOFactory(db, self.sessions, options)
> reactor.listenTCP(self.options['file_port'], self.fileio)
>
>
> On the client side, I have an equivalent protocol that transmits up
> the file. It's run beneath a GUI, so keeps a reference to the GUI
> controller object that might indicate it needs to cancel a transfer
> mid-stream, as well as updating the controller during the transfer so
> it can update a progress bar on screen.
>
> It is also a LineReceiver-based protocol, and uses the Twisted
> FileSender object to do the raw data transfer (which is implemented
> as a straightforward producer, with the TCP socket as the consumer).
> The connectionMade method is where it transmits the ASCII header and
> then initiates the raw data transfer.
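> In outline, what FileSender does as a producer is read the file in
> fixed-size chunks, pass each through an optional transform hook (the
> _monitor method below), and hand the result to the consumer. A
> framework-free sketch under those assumptions (the function name is
> mine; the consumer here is just a list, not a real transport):

```python
import io

CHUNK_SIZE = 2 ** 14  # FileSender also exposes a CHUNK_SIZE attribute

def send_file(infile, consumer, transform):
    """Read infile in CHUNK_SIZE pieces, transform each, feed consumer."""
    last = None
    while True:
        chunk = infile.read(CHUNK_SIZE)
        if not chunk:
            # The real FileSender fires its Deferred when the file is
            # exhausted; here we just return the last transformed chunk.
            return last
        last = transform(chunk)
        consumer.append(last)

sent = []
last_chunk = send_file(io.BytesIO(b'x' * 40000), sent, lambda d: d)
```

The transform hook is what makes progress reporting and cancellation
cheap on the client side: it sees every chunk on its way out.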
>
> - - - - - - - - - - - - - - - - - - - - - - - - -
>
>
> import os
>
> from twisted.internet.protocol import ClientFactory
> from twisted.protocols.basic import FileSender, LineReceiver
>
>
> class TransferCancelled(Exception):
>     """Exception for a user cancelling a transfer"""
>     pass
>
>
> class FileIOClient(LineReceiver):
>
>     def __init__(self, path, sess_key, file_key, controller):
>         self.path = path
>         self.sess_key = sess_key
>         self.file_key = file_key
>         self.controller = controller
>
>         self.infile = open(self.path, 'rb')
>         self.insize = os.stat(self.path).st_size
>
>         self.result = None
>         self.completed = False
>
>         self.controller.file_sent = 0
>         self.controller.file_size = self.insize
>
>     def _monitor(self, data):
>         self.controller.file_sent += len(data)
>         self.controller.total_sent += len(data)
>
>         # Check with controller to see if we've been cancelled and
>         # abort if so.
>         if self.controller.cancel:
>             print 'FileIOClient._monitor Cancelling'
>             # Need to unregister the producer with the transport, or it
>             # will wait for it to finish before breaking the connection
>             self.transport.unregisterProducer()
>             self.transport.loseConnection()
>             # Indicate a user cancelled result
>             self.result = TransferCancelled('User cancelled transfer')
>
>         return data
>
>     def cbTransferCompleted(self, lastsent):
>         self.completed = True
>         self.transport.loseConnection()
>
>     def connectionMade(self):
>         self.transport.write('%s %s %s\r\n' % (str(self.sess_key),
>                                                str(self.file_key),
>                                                self.insize))
>         sender = FileSender()
>         sender.CHUNK_SIZE = 2 ** 16
>         d = sender.beginFileTransfer(self.infile, self.transport,
>                                      self._monitor)
>         d.addCallback(self.cbTransferCompleted)
>
>     def connectionLost(self, reason):
>         LineReceiver.connectionLost(self, reason)
>         print 'FileIOClient:connectionLost'
>         self.infile.close()
>         if self.completed:
>             self.controller.completed.callback(self.result)
>         else:
>             self.controller.completed.errback(reason)
>
>
> class FileIOClientFactory(ClientFactory):
>
>     protocol = FileIOClient
>
>     def __init__(self, path, sess_key, file_key, controller):
>         self.path = path
>         self.sess_key = sess_key
>         self.file_key = file_key
>         self.controller = controller
>
>     def clientConnectionFailed(self, connector, reason):
>         ClientFactory.clientConnectionFailed(self, connector, reason)
>         self.controller.completed.errback(reason)
>
>     def buildProtocol(self, addr):
>         print 'buildProtocol'
>         p = self.protocol(self.path, self.sess_key, self.file_key,
>                           self.controller)
>         p.factory = self
>         return p
>
> - - - - - - - - - - - - - - - - - - - - - - - - -
>
>
> Within the presentation layer controller on the client, initiating a
> transfer is done with:
>
>     def _transmitOne(self, address, port, path, sess_key, file_key):
>         self.completed = defer.Deferred()
>         f = FileIOClientFactory(path, sess_key, file_key, self)
>         reactor.connectTCP(address, port, f)
>         return self.completed
>
> and the result is that self.completed fires (callback or errback) when
> the transfer is done (which the controller uses to then initiate the
> next transfer when there are a list of files to go up for a job).
>
> While probably not exactly what you're trying to do, perhaps it'll
> point you in the right direction.
>
> -- David
>
>
> _______________________________________________
> Twisted-Python mailing list
> Twisted-Python at twistedmatrix.com
> http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
>