[Twisted-Python] mutexes in twisted.enterprise?
Andrew Bennetts
andrew-twisted at puzzling.org
Thu Apr 18 20:14:48 MDT 2002
On Fri, Apr 19, 2002 at 11:32:50AM +1000, Andrew Bennetts wrote:
> .acquire(). I'd recommend that the network reading thread pushes the
> data onto a Queue.Queue that is being popped by a DB thread. If this is
> all you are doing with the DB, then perhaps you don't even need
> twisted.enterprise, which is more designed for getting data than
> inserting it.
>
> I actually have a module somewhere that basically does exactly that, and
> returns Deferreds that trigger when individual data items are inserted.
> I might see if I can find it, and if it is not too specific that I can
> post it.
And here is that module.
It has a few issues, it assumes you're using mx.ODBC.Windows to talk to
MS SQL Server using a DSN rather than an ODBC link, and it has the
beginnings of some code to ensure that packets don't get lost if the DB
goes down, but isn't being used yet. It could do with general tidying
up and documentation. On the other hand, it seems to work for the
simple uses I've put it to.
Oh, I forgot to add this comment: call .insertPacket(None) to terminate
the thread when it is done. That can be handy combined with the
daemonic flag you can pass to __init__, because you can use it to ensure
that your program doesn't exit until the DB done.
If there's sufficient interest, I'll look into fixing it up and checking
it in...
-Andrew.
----
# Twisted imports
from twisted.python import threadable, defer, failure
from twisted.internet import task
# Standard library imports
from Queue import Queue
from cStringIO import StringIO
import threading, traceback, time
# Twiddle the mx.ODBC.Windows module to, well, work.
import mx.ODBC.Windows
from mx.ODBC.Windows import DataError, IntegrityError, ProgrammingError, \
InternalError
# Required by DBAPI 2.0 -- it should already be there, but isn't :(
mx.ODBC.Windows.threadsafety = 2
# For some reason, the default connect function doesn't work with our DSNs
mx.ODBC.Windows.connect = mx.ODBC.Windows.DriverConnect
threadable.init(1)
class PacketInserter:
"""Class for inserting data from network packets into a DB
It creates a seperate thread for processing SQL statements.
"""
dsn = ("driver={SQL Server};provider=sqloledb;server=db02;"
"uid=sa;pwd=*****;database=Test;app=Twisted;")
def __init__(self, daemonic=1):
self.queue = Queue()
self.recoveryQueue = Queue()
self.dbThread = threading.Thread(target=self._processQueue,
name='DB-Thread-')
self.dbThread.setDaemon(daemonic)
self.successCount = self.errorCount = 0
self.active = 0
self._db = None
self.dbThread.start()
def insertPacket(self, packet):
"""Insert a packet into the db.
packet should be an instance of something with a .toSQL method.
It returns a defer.Deferred that is called with the packet that was
passed to this method. The Deferred will be called after the packet
has been inserted (or it will have its errback called).
"""
d = defer.Deferred()
self.queue.put((packet, d))
return d
def _getNext(self):
if self.queue.empty():
self.active = 0
packet, deferred = self.queue.get()
self.active = 1
return packet, deferred
def _getCursor(self):
if self._db is not None:
try:
self._db.close()
except:
pass
self._db = None
cursor = None
try:
self._db = mx.ODBC.Windows.connect(self.dsn)
cursor = self._db.cursor()
cursor.execute('set implicit_transactions off;')
print 'Connected to DB'
#self._doRecovery() FIXME
except Exception, e:
print 'Unable to create DB connection:', e
return cursor
def _doRecovery(self):
print 'Retrying missed packets...'
while not self.recoveryQueue.empty():
# FIXME
pass
def _processQueue(self):
cursor = self._getCursor()
packet, deferred = self._getNext()
start = time.time()
while 1:
if packet is None:
print 'End of queue marker found, DB-Thread terminating'
break
if cursor is None:
time.sleep(1)
cursor = self._getCursor()
continue
try:
cursor.execute(packet.toSQL())
except (ProgrammingError, DataError, IntegrityError,
InternalError), e:
# InternalErrors include
# "COMMIT TRAN has no corresponding BEGIN TRAN" errors, which
# should be a programming error...
print 'Error inserting SQL: "%s"' % (packet.toSQL(),)
traceback.print_exc()
print 'Discarding statement'
self.errorCount += 1
task.schedule(deferred.errback, Failure(e))
packet, deferred = self._getNext()
except Exception, e:
print 'Error inserting SQL: "%s"' % (packet.toSQL(),)
traceback.print_exc()
print 'Queuing statement'
# FIXME self.recoveryQueue.put(packet)
print 'Reconnecting to DB'
task.schedule(deferred.errback, Failure(e))
cursor = self._getCursor()
else:
# If the last statement executed ok, then get the next one
self.successCount += 1
task.schedule(deferred.callback, packet)
packet, deferred = self._getNext()
end = time.time()
print 'DB-Thread processed %d packets (and rejected %d) in %0.3f secs' \
% (self.successCount, self.errorCount, (end - start),)
print '%0.3f packets/sec' % ((end - start)/self.successCount,)
----
More information about the Twisted-Python
mailing list