[Twisted-Python] How to debug an AMP connection?

Peter Westlake peter.westlake at pobox.com
Fri Jul 24 09:12:28 MDT 2009



On Fri, 24 Jul 2009 10:44 -0400, "Drew Smathers"
<drew.smathers at gmail.com> wrote:
> On Fri, Jul 24, 2009 at 9:35 AM, Peter Westlake<peter.westlake at pobox.com>
> wrote:
> > Hello,
> >
> > I'm having trouble with an AMP connection that doesn't fire
> > the Deferred returned by callRemote. The AMP command copies
> > files from the client (called a "worker" in the code below)
> > to a server ("controller"). It sends a chunk of text at a
> > time - 16K originally, but I tried smaller amounts too.
> > The server appends the text to a queue to be written;
> > it shouldn't take long, because it doesn't do the I/O
> > synchronously, just starts a thread.
> >
> 
> "just starts a thread"? Where are you starting a thread in the code
> example you've posted?
>
> Please post a complete example; I don't see methods on client code
> below such as send_next() or next_or_retry().  It's hard to help
> without a complete example.

Sorry - I was trying to avoid clogging everyone's inboxes with too
much code. So far I haven't been able to reproduce the bug in a
simple way. In the meantime, here's the code that starts the thread.
It's one of the very first bits of code I wrote with either Twisted
or Python, so it may do things in quite the wrong way. It serialises
writes to a file by storing requests in a queue, and it gets called
by the line

       ac.instances[number].logfile[stream].write(data)

in the command handler.


from twisted.internet import defer, threads
import os, errno

class FileMan(object):
    def __init__(self, filename):
        self.filename = filename
        self.queue = []
        self.started = False

    def check_file(self):
        self.started = os.path.exists(self.filename)
        return self.started

    def makedir(self):
        dirname = os.path.dirname(self.filename)
        if not os.path.isdir(dirname):
            os.makedirs(dirname)

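    def add(self, op, *a, **kw):
        """Queue op(result, *a, **kw); it runs once every earlier
        operation has finished."""
        def getnext(result):
            # This operation is done: drop it and start the next one.
            del self.queue[0]
            if self.queue:
                self.queue[0].callback('unused')
            return result

        d = defer.Deferred()
        d.addCallback(op, *a, **kw)
        d.addBoth(getnext)
        self.queue.append(d)
        if len(self.queue) == 1:
            # Nothing else is pending, so start immediately.
            d.callback('unused')
        return d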

    def write(self, data, mode='a'):
        """Queue a write of data to the file; the I/O runs in a thread."""
        def do_write(result, filename, data):
            def write_in_thread(filename, data):
                f = open(filename, mode)
                try:
                    f.write(data)
                finally:
                    f.close()
                self.started = True
            # Returning the Deferred makes the queue wait for the thread.
            return threads.deferToThread(write_in_thread, filename, data)
        return self.add(do_write, self.filename, data)


>  In general, though, if you're trying to
> debug Deferreds you might try setting debugging with
> defer.setDebugging(1).

What kind of output does that produce? I did try it,
but didn't see anything out of the ordinary.
 
Peter.


> > Client-side code:
> >
> > class ConnectionBuffer(object):
> >    """Class to store AMP commands and retry them after network outages.
> >    ...
> >    """
> >
> >    def send(self, adv_arg, *a, **kw):
> >        if self.protocol:
> >            # call_d is here purely to make the Deferred accessible
> >            # when debugging.
> >            self.call_d = self.protocol.callRemote(
> >                *a, **kw).addBoth(self.next_or_retry, adv_arg)
> >
> >    def connected(self, protocol):
> >        self.protocol = protocol
> >        self.send_next()
> >
> >    def disconnected(self):
> >        self.protocol = None
> >
> >
> > Server:
> >
> > class ControllerProtocol(amp.AMP):
> >    """Protocol used by the controller to communicate with a worker.
> >    ...
> >    """
> >
> >    def logoutput(self, product, branch, site, job, action, number,
> >                  data, stream):
> >        """Receive stdout or stderr from an action running on a worker."""
> >        actid = (product, branch, site, job, action)
> >        ac = self.factory.controller.actions.find(actid)
> >        ac.instances[number].logfile[stream].write(data)
> >        return {'status': True}
> >    commands.LogOutput.responder(logoutput)
> >
> > _______________________________________________
> > Twisted-Python mailing list
> > Twisted-Python at twistedmatrix.com
> > http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
> >
> 



