Wednesday, September 24, 2008

Pooling connections

In my previous post, I mentioned that I got into a little trouble while implementing transactions:

AttributeError: 'pysqlite2.dbapi2.Cursor' object 
has no attribute 'commit



I asked for advice, and I got it :-) A poster suggested to use a connection pool, since cursors themselves do not control a transaction (thanks to arachnid btw!!!). I took that advice and ran with it -- at least a few steps. I am going to describe now what has happened since then; maybe someone can tell me more about in-memory SQLite?

The way I remember it from the good old Java days, a connection pool has (from my datastore's perspective) two relevant functions:


  • a checkout-method that gives me an unused connection from the pool

  • a release-method that puts the connection back into the pool when I'm done with it.



I decided to rewrite the my datastore's constructor accordingly:


  def __init__(self, get_connection, release_connection):
"""Constructor.

Args:
get_connection: a parameterless function that provides a new
connection from the pool
release_connection: a function that accepts a connection and puts
it back into the pool
"""
self._get_connection = get_connection
self._release_connection = release_connection
self.__next_tx_handle = 1
self.__open_transactions = {}
self.__tx_handle_lock = threading.Lock()



Using these new methods in our transaction implementations was easy:

  def _connect(self, transaction=None, may_update=False, delete=False):
"""Opens a new or returns an opened connection.

Args:
transaction: an optional transaction protocol buffer. If not set,
a new, temporary object is returned. If set to a transaction without
a handle, a new object will be created. If set to a transaction, the
appropriate element. If the handle has no handle, that means a new
cursor will be created and the handle be set.
may_update: if set to True (default is False), the newly created
cursor object will be remembered as a new transaction and its handle
updated in the transaction object
delete: if set to true (default is False), delete the cursor from
the internal data structure if it exists

Returns:
a cursor
"""
if not transaction:
return self._get_connection()
self.__tx_handle_lock.acquire()
try:
if transaction.has_handle():
handle = transaction.handle()
if handle in self.__open_transactions:
if delete:
return self.__open_transactions.pop(handle)
else:
return self.__open_transactions.get(handle)
else:
raise apiproxy_errors.ApplicationError(
datastore_pb.Error.BAD_REQUEST,
'Transaction handle %d not found' % handle)
else:
if not may_update:
return self._get_connection()
handle = self.__next_tx_handle
self.__next_tx_handle += 1
transaction.set_handle(handle)
cursor = self._get_connection()
self.__open_transactions[handle] = cursor
finally:
self.__tx_handle_lock.release()

def _Dynamic_BeginTransaction(self, request, transaction):
self._connect(transaction, True, False)


def _Dynamic_Commit(self, transaction, transaction_response):
connection = self._connect(transaction, False, True)
connection.commit()
self._release_connection(connection)


def _Dynamic_Rollback(self, transaction, transaction_response):
connection = self._connect(transaction, False, True)
connection.rollback()
self._release_connection(connection)



I also updated the get and put implementation accordingly (see snippet below):

  def _Dynamic_Put(self, put_request, put_response):

# Fetch a cursor from the connection
connection = self._connect(put_request.transaction(), False, False)
cursor = connection.cursor()

# Iterate theough the instances
....

# Do a database commit
if not put_request.has_transaction():
connection.commit()
self._release_connection(connection)

# Populate the response
put_response.key_list().extend([c.key() for c in clones])



With these modifications, the unit tests finally passed. That's the good news. Unfortunately, this also leaves a couple of questions unanswered. First of all, I am still using a single in-memory connection for my unit tests:

def setup_sqlite(name=None):
"""Sets up an in-memory sqlite instance and connects the datastore to it.

Args:
name: the name of the instance to connect to,
None or empty string for in-memory

Returns:
a sqlite connection object pointing to the database.
"""
if name:
raise 'Not implemented yet'
else:
connection = sqlite.connect(':memory:')
name = 'memory'
get_connection = lambda: connection
release_connection = lambda x: None
stub = DatastoreSqliteStub(get_connection, release_connection)
apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)
return connection


This is fine for the tests I have so far, but it prevents me from really writing anything that check the store's behavior if data is concurrently modified. Does anyone have an idea how one can create more than one connection into the same in-memory database?

Second, I kindof deferred the choice as to what connection pool to use. This is not a big deal for now, but I would like to figure it out as the implementation matures. Any suggestions? Feedback is as always appreciated.

1 comments:

Arachnid said...

I think you'll find that if you want multiple connections to the same database, you'll need to create an on-disk DB, in a temporary directory. However, since I expect most of your tests won't have more than one active transaction at a time, why not create a dummy connection pool that has only one connection, and raises an exception if you try to check out more than one at a time?

I'm not familiar with any connection pool APIs I can recommend, unfortunately. If you can't find one, writing your own ought to be easy enough.

Also, I hope you're making it easy to choose any other DB-API compliant database in place of SQLite. :)