[Twisted-Python] XMLRPC server help needed
Brett Viren
bv at bnl.gov
Wed Apr 27 09:19:58 MDT 2005
Roland Hedberg <roland.hedberg at adm.umu.se> writes:
> If the packet is well formed and the server knows what to do with it,
> it should reply to the client and then perform the action.
>
> My problem is how I would go about doing this. Conceptually I could
> imagine having a workqueue where I would place the message and then
> from the point of view of the client-server communication just forget
> about it.
>
> Anyone who has done anything similar or has an idea on how to do this ?
I do this by subclassing twisted.web.xmlrpc.XMLRPC and handling the
query in a work queue (appended below) that runs in its own thread.
In my case the client doesn't care if it sent me well formed data or
not, so immediately after putting the query in the work queue I return
from the xmlrpc method and the client is free. In my case, I do some
sanity checking of the query inside the queue. In your case you'd do
the checking before stuffing the queue so you could inform the client.
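A minimal sketch of that ordering (check, queue, reply, then do the work later), using only the standard library rather than Twisted. The names `validate`, `handle_request`, `worker` and the "action" key are hypothetical and exist only for this illustration:

```python
import queue
import threading

work_queue = queue.Queue()
processed = []  # filled by the worker, only so the sketch is observable


def validate(message):
    # Hypothetical check: a well-formed message is a dict with an "action" key.
    return isinstance(message, dict) and "action" in message


def handle_request(message):
    """Runs in the request handler: check, enqueue, reply at once."""
    if not validate(message):
        return "rejected"
    work_queue.put(message)
    return "accepted"


def worker():
    """Runs in its own thread and performs the queued actions one by one."""
    while True:
        message = work_queue.get()
        if message is None:  # sentinel: stop the worker
            break
        processed.append(message["action"])


thread = threading.Thread(target=worker)
thread.start()
replies = [handle_request({"action": "reindex"}), handle_request("garbage")]
work_queue.put(None)
thread.join()
```

The handler never waits for the worker, so the reply only tells the client whether the message was accepted, not whether the action succeeded.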
Here is a sketch of the server chopped out from my code:
from twisted.python import log
from twisted.python.failure import Failure
from twisted.web import xmlrpc

class DataSource(xmlrpc.XMLRPC):
    "The XML-RPC listener"

    def __init__(self, services, sem):
        xmlrpc.XMLRPC.__init__(self)
        xmlrpc.addIntrospection(self)
        self.data_cq = CommandQueue()
        self.sem = sem
        self.services = services

    def _handle_sem_release(self, x):
        # Attached with addBoth, so the semaphore is released on success
        # and on failure alike.
        #print "DataSource._handle_sem_release(%s)"%str(x)
        self.sem.release()
        if isinstance(x, Failure):
            # Failures tagged "DEBUG" are passed along without being logged.
            if x.value.args[:1] == ("DEBUG",): return x
            log.err(x)
        return x

    def xmlrpc_method(self, idstr, values):
        "Accept callbacks from Export API"
        d = self.sem.acquire()
        # Queue the real work once the semaphore is held.
        d.addCallback(lambda x: self.data_cq(self.services.method, idstr, values))
        d.addBoth(self._handle_sem_release)
        # Reply at once; the client does not wait for the queued work.
        return 0
All the real work is done in the "services.method" method. You'll
note that I use a Semaphore class (appended below). This is to keep
other operations not shown here from being executed in the middle of
handling the query.
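To illustrate how such a semaphore serializes operations, here is a Twisted-free sketch with the same bookkeeping as the appended Semaphore class, except that acquire() takes a plain callable instead of returning a Deferred. `CallbackSemaphore` and the event strings are made up for this example:

```python
class CallbackSemaphore:
    """Counting semaphore that runs a callback once a slot is free."""

    def __init__(self, value=1):
        self.value = value   # free slots remaining
        self.waiting = []    # callbacks queued in arrival order

    def acquire(self, callback):
        if self.value:
            self.value -= 1
            callback()       # slot free: run immediately
        else:
            self.waiting.append(callback)

    def release(self):
        if self.waiting:
            # Hand the slot straight to the oldest waiter.
            self.waiting.pop(0)()
        else:
            self.value += 1


events = []
sem = CallbackSemaphore()
sem.acquire(lambda: events.append("query started"))
sem.acquire(lambda: events.append("other operation started"))  # has to wait
events.append("query finished")
sem.release()  # lets the waiting operation run
```

After the release, `events` lists the query start, the query finish, and only then the other operation, which is the ordering the semaphore is there to guarantee.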
BTW, the Semaphore and CommandQueue classes were developed with much
help from this list. Thanks again!
-Brett.
# ------------------------------------------------
import sys
try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2

from twisted.internet import defer
from twisted.python import failure

class Semaphore(object):
    """Asynchronous semaphore stolen from:
    http://twistedmatrix.com/pipermail/twisted-python/2003-August/005323.html
    """
    def __init__(self, value=1, verbose=None):
        self.queue = []     # Deferreds waiting for the semaphore
        self.value = value  # free slots remaining

    def acquire(self):
        d = defer.Deferred()
        if self.value:
            self.value -= 1
            d.callback(False)
        else:
            self.queue.append(d)
        return d

    def release(self):
        if self.queue:
            # Hand the slot straight to the oldest waiter.
            self.queue.pop(0).callback(True)
        else:
            self.value += 1

class CommandQueue:
    '''Queue up commands for serial calling.  The constructor starts
    the drain() method in a reactor thread, which reads the internal
    queue until the stop attribute is set to True.'''
    all_queues = []

    def __init__(self):
        "Create a CommandQueue"
        self.queue = Queue()
        self.stop = False
        CommandQueue.all_queues.append(self)
        from twisted.internet import reactor
        reactor.callInThread(self.drain)
        return

    def __call__(self, meth, *a, **k):
        '''Call meth(*a,**k) when it reaches the front of the queue.
        Returns a Deferred that will pass the return of meth.'''
        deferred = defer.Deferred()
        deferred.addErrback(self._error)
        self.queue.put((deferred, meth, a, k))
        return deferred

    def _error(self, a):
        try:
            a.printTraceback(sys.stderr)
        except Exception:
            print(str(a))
        return a

    def drain(self):
        'Drain the command queue until CommandQueue.stop is True'
        from twisted.internet import reactor
        while not self.stop:
            try:
                # Wait at most 1 second so the stop flag is re-checked.
                d, meth, a, k = self.queue.get(True, 1)
            except Empty:
                continue
            #print "calling %s(%s,%s)"%(meth.__name__,str(a),str(k))
            try:
                res = meth(*a, **k)
            except Exception:
                # Failure() with no arguments captures the current exception.
                res = failure.Failure()
            # Deferreds must be fired from the reactor thread.
            reactor.callFromThread(d.callback, res)
            #d.callback(meth(*a,**k))
            #print "callback done"
        #print "drain closing"
        return 0
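
The code above does not show where the stop flag gets set. Presumably the all_queues list is there so that shutdown code can flag every queue, since otherwise the drain threads keep polling. A rough standard-library sketch of that idea follows; `PollingWorker` and `stop_all` are hypothetical names, and the short timeout is only there to keep the example quick:

```python
import queue
import threading


class PollingWorker:
    """Worker thread that polls its queue and honours a stop flag."""

    all_workers = []

    def __init__(self):
        self.queue = queue.Queue()
        self.stop = False
        self.done = []  # results, kept only so the sketch is observable
        PollingWorker.all_workers.append(self)
        self.thread = threading.Thread(target=self.drain)
        self.thread.start()

    def drain(self):
        while not self.stop:
            try:
                job = self.queue.get(True, 0.05)
            except queue.Empty:
                continue  # nothing queued; re-check the stop flag
            self.done.append(job())
            self.queue.task_done()


def stop_all():
    """Flag every worker, then wait for their threads to exit."""
    for w in PollingWorker.all_workers:
        w.stop = True
    for w in PollingWorker.all_workers:
        w.thread.join()


worker = PollingWorker()
worker.queue.put(lambda: 6 * 7)
worker.queue.join()  # wait until the queued job has been handled
stop_all()
```

In a Twisted program the equivalent of `stop_all` would have to run as part of reactor shutdown; how best to hook that in is left open here.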