Sunday, September 28, 2008

GQL queries in SQL, part 2

I'm having a clean-up-your-code day today :-)
As I was thinking a bit more about how I was going to do the queries, how to translate the query protocol buffer into an SQL statement, I realized two things that I should really clean up:


  • The methods entityToDict and dictToEntity contained a lot of logic that translates protocol buffers into SQL rows and vice versa, but they were relatively specialized for the SQLite case. Since the overall goal is to make the classes work with other RDBMS as well, that is certainly not ideal.

  • The same methods might prove useful for my work on queries, but they do not hand out all the information I need. For example, entityToDict gives me a nice flat list of column keys and values, but it does not tell me which key corresponds to which original property.



As a result of this realization, I decided to wrap the original methods into a new class called PRMHelper. I also made entityToDict a bit more customizable (but in a backwards-compatible way):

class PRMHelper(object):
  """ProtocolBuffer-Relational-Mapping tool.

  Encapsulates helpers that deal with the mapping between
  protocol buffer and a relational database. Currently, this
  means SQLite, but it could be generalized in the future.
  """

  def entityToDict(self, pb,
                   populate_dict=None,
                   get_list=lambda x: x.property_list(),
                   unwrap_properties=lambda x: [x]):
    """Converts an entity protocol buffer to a dictionary.

    This method will only deal with the properties of the entity,
    not its primary key.

    Args:
      pb: the entity protocol buffer
      populate_dict: a method that takes a dictionary,
        a property object (or whatever else get_list returns to
        iterate on), a column key and a value as parameters
        and decides how to modify the dictionary. If not given, the
        default behavior is to ignore the property and put the
        value into the map, using the column key as key.
        Implementation detail: if a property value maps to more
        than one column (like a geo-coordinate that could be split
        up in latitude and longitude), the method might be called
        more than once with the same property object
      get_list: a way to extract a list of properties from pb
        (or anything else iterable that returns something that
        unwrap_properties can turn into properties;
        default behavior is to call "property_list")
      unwrap_properties: takes an element from the get_list
        result and turns it into a list of property pbs.
        Default is lambda x: [x].

    Returns:
      a dictionary with appropriate key/value pairs that can
      be stored in a SQLite database.
    """
    result = {}
    #TODO: what about raw properties?
    for item in get_list(pb):
      for property in unwrap_properties(item):
        #TODO: what about multiple properties?
        assert not (property.has_multiple() and property.multiple())
        if not property.has_value():
          continue
        property_name = property.name()
        value_pb = property.value()
        col_key = None
        property_value = None
        if value_pb.has_int64value():
          col_key = 'int64_' + property_name
          property_value = value_pb.int64value()
        elif value_pb.has_stringvalue():
          col_key = 'string_' + property_name
          property_value = value_pb.stringvalue()
        else:
          raise NotImplementedError('Not supported yet: %s' % value_pb)
        if not populate_dict:
          result[col_key] = property_value
        else:
          populate_dict(result, item, col_key, property_value)
    return result

  def dictToEntity(self, values, pb):
    """Transfers values from a dictionary into a protocol buffer.

    This method will only deal with the properties of the entity,
    not its primary key.

    Args:
      values: a dictionary of values
      pb: the entity protocol buffer
    """
    for key, value in values.items():

      # Split up the key (like int64_name) into segments
      i = key.find('_')
      if i < 1:
        continue
      p1 = key[0:i]
      p2 = key[i+1:]

      # Case: type integer
      if p1 == 'int64':
        prop = pb.add_property()
        prop.set_name(p2)
        prop.set_multiple(False)
        prop.mutable_value().set_int64value(long(value))

      # Case: type string
      elif p1 == 'string':
        prop = pb.add_property()
        prop.set_name(p2)
        prop.set_multiple(False)
        prop.mutable_value().set_stringvalue(value)



In the constructor of the datastore, a PRMHelper object is passed as a parameter and stored in a private field called self.prm. After a few modifications to previously written methods, I was able to make sure that all unit tests still passed.

So, how am I using the modifications that were introduced? Take a look at the following snippet, an incomplete first pass at _Dynamic_RunQuery:

  _Operator_NAMES = {
    1: "<",
    2: "<=",
    3: ">",
    4: ">=",
    5: "=",
    #6: "IN",
    #7: "EXISTS",
  }

  def _Dynamic_RunQuery(self, query, query_result):

    # Turn the filter-objects into a set of conditions
    def build_query_conditions(dictionary, filter, col_key, value):
      operator = filter.op()
      names = DatastoreSqliteStub._Operator_NAMES
      if operator in names:
        key = '%s %s' % (col_key, names[operator])
        assert key not in dictionary, (
            'Multiple conditions not supported yet: %s' % key)
        dictionary[key] = value
      elif operator == 7:
        key = '%s IS NOT NULL' % col_key
        assert key not in dictionary, (
            'Multiple conditions not supported yet: %s' % key)
        dictionary[key] = None
      else:
        assert False, 'Unsupported operator: %s' % operator

    conditions = self.prm.entityToDict(
        pb=query,
        populate_dict=build_query_conditions,
        get_list=lambda x: x.filter_list(),
        unwrap_properties=lambda x: x.property_list())

    # Concatenate the conditions in a statement
    # (kept in a separate variable so the query protocol buffer stays usable)
    sql = 'SELECT * FROM %s' % query.kind()
    params = {}
    count = 0
    for key, value in conditions.items():
      if count == 0:
        sql = '%s WHERE %s' % (sql, key)
      else:
        sql = '%s AND %s' % (sql, key)
      if value is not None:
        params[str(count)] = value
        sql = '%s :%s' % (sql, count)
      count += 1

    #TODO: consider sort order
    #TODO: execute query


By slightly tweaking the way property objects are extracted from the protocol buffer (using small lambda functions), I can use the pre-existing logic in entityToDict to drive a helper function called build_query_conditions. This function populates a dictionary with conditional snippets (like column_name < 'foo'). I can then concatenate these snippets to build an SQL query.
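The concatenation step can be tried in isolation. The following is only a sketch under a few assumptions: it is written for Python 3's sqlite3 module rather than pysqlite2, the function name build_select is made up for this example, and it uses parameter names like :p0 instead of the bare :0 from the snippet above.

```python
import sqlite3


def build_select(table, conditions):
    """Builds a parameterized SELECT from {'<column> <operator>': value} snippets.

    build_select is a hypothetical helper, not part of the datastore stub.
    """
    clauses = []
    params = {}
    for count, (snippet, value) in enumerate(conditions.items()):
        if value is None:
            # Snippets such as 'col IS NOT NULL' carry no parameter.
            clauses.append(snippet)
        else:
            name = 'p%d' % count
            clauses.append('%s :%s' % (snippet, name))
            params[name] = value
    # Table names cannot be bound as parameters, so the kind is interpolated.
    sql = 'SELECT * FROM %s' % table
    if clauses:
        sql += ' WHERE ' + ' AND '.join(clauses)
    return sql, params


connection = sqlite3.connect(':memory:')
connection.execute(
    'CREATE TABLE TestModel (string_text TEXT, int64_number INTEGER)')
connection.execute("INSERT INTO TestModel VALUES ('t1', 13)")

sql, params = build_select(
    'TestModel', {'string_text =': 't1', 'int64_number =': 13})
rows = connection.execute(sql, params).fetchall()
```

Keeping the values in a separate parameter dictionary means they never get pasted into the SQL string itself; only the table and column names are interpolated.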

What does the resulting query look like? Consider our unit test from the last post:

  def testSimpleQuery(self):
    helpers.create_tables([TestModel()], self.connection)
    model = TestModel(text='t1', number=13)
    model.put()
    TestModel.gql('WHERE text=:1 and number=:2 order by text desc',
                  't1', 13).fetch(5)


With our current code, this will result in the following query string:

SELECT * FROM TestModel WHERE string_text = :0 AND int64_number = :1


What's next?


Thanks to the reuse of our existing logic in entityToDict, we are one step closer to being able to execute arbitrary GQL queries on the database (well, assuming we restrict ourselves to the few types we know so far). Even better, since all logic outside the PRMHelper is agnostic with regard to the specific SQL dialect, new database types should (almost) be a matter of just writing a new PRMHelper.

So, how close are we to getting the queries done? I would say, probably about 40 per cent. There is still the small matter of actually executing against the datastore. Furthermore, we need to return results to the user -- possibly within multiple fetch operations. There is definitely still a lot to be done...

Saturday, September 27, 2008

GQL queries in SQL, part 1

Today, I want to start looking a little bit at queries. This is one of the more complicated subjects, so it will probably take a couple of posts to get it right. Here is the first one...

In this post, I will focus on getting a better understanding of how queries are expressed internally. Queries are one of the most difficult things to get right in this whole coding project -- not because they are evil black voodoo magic, but because what we use most of the time (GQL) and what the lower level APIs actually digest and return are so far apart. When we enter a GQL query on a model class, a parser translates this query into an object structure. This structure then gets pushed down a couple of layers on the stack and ends up looking something like the following (sources were heavily cut down to only show content relevant for this post!):

class Query_Filter(ProtocolBuffer.ProtocolMessage):

_Operator_NAMES = {
1: "LESS_THAN",
2: "LESS_THAN_OR_EQUAL",
3: "GREATER_THAN",
4: "GREATER_THAN_OR_EQUAL",
5: "EQUAL",
6: "IN",
7: "EXISTS",
}

def __init__(self, contents=None):
self.op_ = 0
self.property_ = []


class Query_Order(ProtocolBuffer.ProtocolMessage):

_Direction_NAMES = {
1: "ASCENDING",
2: "DESCENDING",
}

def __init__(self, contents=None):
self.property_ = ""
self.direction_ = 1


class Query(ProtocolBuffer.ProtocolMessage):

_Plan_NAMES = {
1: "ORDER_FIRST",
2: "ANCESTOR_FIRST",
3: "FILTER_FIRST",
}

def __init__(self, contents=None):
self.app_ = ""
self.kind_ = ""
self.ancestor_ = None
self.filter_ = []
self.search_query_ = ""
self.order_ = []
self.hint_ = 0
self.offset_ = 0
self.limit_ = 0
self.composite_index_ = []
self.require_perfect_plan_ = 0



So, what exactly does a generated query object tree look like? Well, let's create a simple unit test and set a break point in our datastore's _Dynamic_RunQuery:

  def testSimpleQuery(self):
    helpers.create_tables([TestModel()], self.connection)
    model = TestModel(text='t1', number=13)
    model.put()
    TestModel.gql('WHERE text=:1 and number=:2 order by text desc',
                  't1', 13).fetch(5)



As we look into the variables in Eclipse, we get confirmation that the incoming value really is a Query instance. Here is a rough approximation of the object structure:

kind:TestModel
limit:5
offset:0

order: list with one element:
Query_Order:
property: "text"
direction: 2

filter: list with 2 elements:
Query_Filter:
op: 5
property <
name: "text"
value <
stringValue: "t1"
>
multiple: false
>
Query_Filter:
op: 5
property <
name: "number"
value <
int64Value: 13
>
multiple: false
>



In summary:

  • Search conditions (the "WHERE" clause) are expressed as a chain of "filter" conditions.

  • Sorting conditions ("ORDER BY") are expressed as a chain of "order" conditions.

  • Table name (model kind), maximum number of search results and initial offset are fields in the top-level query object.
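To make the summary a bit more concrete, here is a rough sketch of how the "order", "limit" and "offset" fields could end up in the tail of an SQL statement. It is not part of the datastore code: build_order_clause and the columns mapping are assumptions made up for this example. The mapping is needed because a Query_Order only carries the property name ("text"), not the type that is baked into the column name ("string_text").

```python
# Direction codes as listed in Query_Order._Direction_NAMES.
DIRECTIONS = {1: 'ASC', 2: 'DESC'}


def build_order_clause(orders, columns, limit=0, offset=0):
    """Builds the 'ORDER BY ... LIMIT ... OFFSET ...' tail of a statement.

    orders:  list of (property_name, direction_code) tuples
    columns: hypothetical mapping from property name to column name
    """
    parts = []
    if orders:
        terms = ['%s %s' % (columns[prop], DIRECTIONS[direction])
                 for prop, direction in orders]
        parts.append('ORDER BY ' + ', '.join(terms))
    if limit or offset:
        # SQLite only accepts OFFSET together with LIMIT; a negative
        # LIMIT means "no upper bound" there.
        parts.append('LIMIT %d OFFSET %d' % (limit if limit else -1, offset))
    return ' '.join(parts)


# The values from the object structure above: order by text desc, fetch(5).
clause = build_order_clause(
    orders=[('text', 2)],
    columns={'text': 'string_text'},
    limit=5,
    offset=0)
```

For the test query this yields ORDER BY string_text DESC LIMIT 5 OFFSET 0, which would simply be appended to the SELECT statement.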



There are probably a few details I missed, but those will hopefully be discovered by unit tests eventually. The next step is going to be to learn more about what query results actually look like. Stay tuned...

Wednesday, September 24, 2008

Pooling connections

In my previous post, I mentioned that I got into a little trouble while implementing transactions:

AttributeError: 'pysqlite2.dbapi2.Cursor' object 
has no attribute 'commit'



I asked for advice, and I got it :-) A poster suggested using a connection pool, since cursors themselves do not control a transaction (thanks to arachnid btw!!!). I took that advice and ran with it -- at least a few steps. I am going to describe now what has happened since then; maybe someone can tell me more about in-memory SQLite?

The way I remember it from the good old Java days, a connection pool has (from my datastore's perspective) two relevant functions:


  • a checkout-method that gives me an unused connection from the pool

  • a release-method that puts the connection back into the pool when I'm done with it.
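Just to illustrate those two functions, a very small pool could look like the sketch below. This is not the pool I ended up using (that choice is still open); SimplePool and its method names are made up for this example, and it is written for Python 3's queue and sqlite3 modules.

```python
import queue
import sqlite3


class SimplePool(object):
    """A hypothetical, minimal fixed-size connection pool."""

    def __init__(self, factory, size):
        # Create all connections up front and park them in a queue.
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(factory())

    def checkout(self):
        """Hands out an unused connection; blocks while none is idle."""
        return self._idle.get()

    def release(self, connection):
        """Puts a connection back into the pool."""
        self._idle.put(connection)


# Caveat: every ':memory:' connection opens its own, separate database,
# so this factory is only good for demonstrating the pool mechanics.
pool = SimplePool(
    lambda: sqlite3.connect(':memory:', check_same_thread=False), size=2)

connection = pool.checkout()
idle_while_checked_out = pool._idle.qsize()
pool.release(connection)
idle_after_release = pool._idle.qsize()
```

The bound methods pool.checkout and pool.release can be handed to any code that expects a pair of get/release callables.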



I decided to rewrite my datastore's constructor accordingly:


  def __init__(self, get_connection, release_connection):
    """Constructor.

    Args:
      get_connection: a parameterless function that provides a new
        connection from the pool
      release_connection: a function that accepts a connection and puts
        it back into the pool
    """
    self._get_connection = get_connection
    self._release_connection = release_connection
    self.__next_tx_handle = 1
    self.__open_transactions = {}
    self.__tx_handle_lock = threading.Lock()



Using these new methods in our transaction implementations was easy:

  def _connect(self, transaction=None, may_update=False, delete=False):
    """Returns the connection for a transaction, fetching a new one if needed.

    Args:
      transaction: an optional transaction protocol buffer. If not set,
        a new, temporary connection is returned. If set to a transaction
        with a handle, the connection registered for that handle is
        returned. If set to a transaction without a handle, a new
        connection is fetched and (if may_update is True) the handle is set.
      may_update: if set to True (default is False), the newly fetched
        connection will be remembered as a new transaction and its handle
        updated in the transaction object
      delete: if set to True (default is False), remove the connection from
        the internal data structure if it exists

    Returns:
      a connection
    """
    if not transaction:
      return self._get_connection()
    self.__tx_handle_lock.acquire()
    try:
      if transaction.has_handle():
        handle = transaction.handle()
        if handle in self.__open_transactions:
          if delete:
            return self.__open_transactions.pop(handle)
          else:
            return self.__open_transactions.get(handle)
        else:
          raise apiproxy_errors.ApplicationError(
              datastore_pb.Error.BAD_REQUEST,
              'Transaction handle %d not found' % handle)
      else:
        if not may_update:
          return self._get_connection()
        handle = self.__next_tx_handle
        self.__next_tx_handle += 1
        transaction.set_handle(handle)
        connection = self._get_connection()
        self.__open_transactions[handle] = connection
        return connection
    finally:
      self.__tx_handle_lock.release()

  def _Dynamic_BeginTransaction(self, request, transaction):
    self._connect(transaction, True, False)


  def _Dynamic_Commit(self, transaction, transaction_response):
    connection = self._connect(transaction, False, True)
    connection.commit()
    self._release_connection(connection)


  def _Dynamic_Rollback(self, transaction, transaction_response):
    connection = self._connect(transaction, False, True)
    connection.rollback()
    self._release_connection(connection)



I also updated the get and put implementation accordingly (see snippet below):

  def _Dynamic_Put(self, put_request, put_response):

# Fetch a cursor from the connection
connection = self._connect(put_request.transaction(), False, False)
cursor = connection.cursor()

# Iterate through the instances
....

# Do a database commit
if not put_request.has_transaction():
connection.commit()
self._release_connection(connection)

# Populate the response
put_response.key_list().extend([c.key() for c in clones])



With these modifications, the unit tests finally passed. That's the good news. Unfortunately, this also leaves a couple of questions unanswered. First of all, I am still using a single in-memory connection for my unit tests:

def setup_sqlite(name=None):
  """Sets up an in-memory sqlite instance and connects the datastore to it.

  Args:
    name: the name of the instance to connect to,
      None or empty string for in-memory

  Returns:
    a sqlite connection object pointing to the database.
  """
  if name:
    raise NotImplementedError('Not implemented yet')
  else:
    connection = sqlite.connect(':memory:')
    name = 'memory'
  get_connection = lambda: connection
  release_connection = lambda x: None
  stub = DatastoreSqliteStub(get_connection, release_connection)
  apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)
  return connection


This is fine for the tests I have so far, but it prevents me from really writing anything that checks the store's behavior if data is concurrently modified. Does anyone have an idea how one can create more than one connection into the same in-memory database?
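One possible answer, which I have not tried with the pysqlite2 setup used here: newer SQLite versions (3.7.13 and later) support shared-cache in-memory databases that are addressed through a URI filename, and Python 3's sqlite3 module accepts such filenames when called with uri=True. The sketch below assumes both; the database name unittest_db is arbitrary.

```python
import sqlite3

# Both connections name the same shared-cache in-memory database.
# The database disappears once the last connection to it is closed.
URI = 'file:unittest_db?mode=memory&cache=shared'

first = sqlite3.connect(URI, uri=True)
second = sqlite3.connect(URI, uri=True)

first.execute(
    'CREATE TABLE TestModel (pk_int INTEGER PRIMARY KEY, int64_number INTEGER)')
first.execute('INSERT INTO TestModel (int64_number) VALUES (13)')
first.commit()

# The second connection sees what the first one committed.
visible = second.execute('SELECT int64_number FROM TestModel').fetchall()
```

With a plain ':memory:' filename, by contrast, every call to connect creates a fresh and completely separate database.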

Second, I kind of deferred the choice as to what connection pool to use. This is not a big deal for now, but I would like to figure it out as the implementation matures. Any suggestions? Feedback is as always appreciated.

Sunday, September 21, 2008

If you AssUMe...

Sorry if this is a relatively short post, but it's a busy weekend and I'm kind of stuck here. I'll describe the situation -- maybe someone out there can advise?

Last week, we had written a unit test that failed:

  def testGetOrInsert(self):
"""tests if get_or_insert works correctly"""
helpers.create_tables([TestModel()], self.connection)
model1 = TestModel(number=1)
model1.put()
model2 = TestModel.get_or_insert('foo', number=13, text='t')
self.assertEquals(13, model2.number)
fetched = TestModel.get_by_key_name('foo')
self.assertEquals(13, fetched.number)



The logs indicated that our lack of transaction support was to blame. This week, I wanted to go ahead and fix that!

My assumption was (spoiler alert: that was probably wrong!) that a sqlite cursor is the equivalent of a database transaction. So, I decided to store cursors of active transactions in a dictionary within my datastore implementation:

  def __init__(self, database_name, connection=None):
"""Constructor.

Initializes and loads the datastore from the backing files, if they exist.

Args:
database_name: the name of the sqlite instance
connection: a pre-initialized connection for unit tests, optional
"""
#TODO: initialize sqlite instance
if connection:
self.connection = connection
self.__next_tx_handle = 1
self.__open_transactions = {}
self.__tx_handle_lock = threading.Lock()


def _get_cursor(self, transaction=None, may_update=False, delete=False):
"""Opens a new or returns an opened transaction (cursor).

Args:
transaction: an optional transaction protocol buffer. If not set,
a new, temporary cursor is returned. If set to a transaction with a
handle, the cursor registered for that handle is returned. If the
transaction has no handle, a new cursor will be created and (if
may_update is True) the handle will be set.
may_update: if set to True (default is False), the newly created
cursor object will be remembered as a new transaction and its handle
updated in the transaction object
delete: if set to true (default is False), delete the cursor from
the internal data structure if it exists

Returns:
a cursor
"""
if not transaction:
return self.connection.cursor()
self.__tx_handle_lock.acquire()
try:
if transaction.has_handle():
handle = transaction.handle()
if handle in self.__open_transactions:
if delete:
return self.__open_transactions.pop(handle)
else:
return self.__open_transactions.get(handle)
else:
raise apiproxy_errors.ApplicationError(
datastore_pb.Error.BAD_REQUEST,
'Transaction handle %d not found' % handle)
else:
if not may_update:
return self.connection.cursor()
handle = self.__next_tx_handle
self.__next_tx_handle += 1
transaction.set_handle(handle)
cursor = self.connection.cursor()
self.__open_transactions[handle] = cursor
finally:
self.__tx_handle_lock.release()



With this helper-method, creating a transaction or doing a commit or rollback seemed easy:

  def _Dynamic_BeginTransaction(self, request, transaction):
self._get_cursor(transaction, True, False)


def _Dynamic_Commit(self, transaction, transaction_response):
cursor = self._get_cursor(transaction, False, True)
cursor.commit()
cursor.close()


def _Dynamic_Rollback(self, transaction, transaction_response):
cursor = self._get_cursor(transaction, False, True)
cursor.rollback()
cursor.close()


I also updated my get and put implementations to fetch the proper cursor and operate on it.

As so often, simply assuming does not cut it, though. I am getting an error:

AttributeError: 'pysqlite2.dbapi2.Cursor' object 
has no attribute 'commit'


Turns out that commit and rollback seem to live at the connection level. Before I go ahead and refactor anything though, I was hoping that one of you could point me to good documentation that answers the following questions (or just answer them directly):


  • How are parallel transactions expected to be handled? Open one connection object for each?

  • If that is the case, how do I open multiple connections on the same in-memory instance for unit tests?

  • What cleanup is required at the end of a transaction? Should I close the connection object? Do I need to close any cursors first, or are those automatically cleaned up?
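For reference, the connection-level behavior can be reproduced in a few lines. This is a sketch for Python 3's sqlite3 module (not pysqlite2) and only demonstrates where commit and rollback live; it does not settle the questions above.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE TestModel (int64_number INTEGER)')
cursor = connection.cursor()

# The first insert is made permanent through the connection.
cursor.execute('INSERT INTO TestModel VALUES (1)')
connection.commit()

# The second insert is undone, again through the connection.
cursor.execute('INSERT INTO TestModel VALUES (2)')
connection.rollback()

remaining = cursor.execute('SELECT int64_number FROM TestModel').fetchall()

# The cursor itself offers neither commit nor rollback.
cursor_has_commit = hasattr(cursor, 'commit')

cursor.close()
connection.close()
```

Since the transaction belongs to the connection, all cursors created from the same connection share it, which suggests that parallel transactions need one connection each.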



Feedback is appreciated.

Sunday, September 14, 2008

AppEngine and SQLite: getting values out of the database

Today, we are going to focus on implementing the _Dynamic_Get method of our SQLite-based datastore implementation (well, at least for the few data types we currently support). At the end of the article, I will also briefly address the what-happens-to-this-code-once-it's-done question I have been asked a few times.

How does a get work?


Let's take a look at the implementation from the file-based datastore:

  def _Dynamic_Get(self, get_request, get_response):
for key in get_request.key_list():
app = self.ResolveAppId(key.app())
key.set_app(app)
last_path = key.path().element_list()[-1]

group = get_response.add_entity()
try:
entity = self.__entities[app, last_path.type()][key]
except KeyError:
entity = None

if entity:
group.mutable_entity().CopyFrom(entity)



Apparently, the request-protocolbuffer contains a list of keys. The method iterates through these keys and does a lookup for each of them in the in-memory datastructure. If the key points to an existing protocol buffer, the result is copied into a list of entities contained in the response-protocolbuffer. Otherwise, no value (an empty group) is added. Since None does not contain any particular information about what key it belongs to, it has to be assumed that the order in which the elements are added to the response must correspond to the order of the keys.

Prep-work


This is test driven development, so let's go ahead and write a unit test first. Come to think of it, let's make it four:

  def testGetSingleElement(self):
"""Gets a single model from the datastore."""
helpers.create_tables([TestModel()], self.connection)
model1 = TestModel(number=1)
model2 = TestModel(number=2, text='#2')
model3 = TestModel(number=3)
key1 = model1.put()
key2 = model2.put()
key3 = model3.put()
fetched = TestModel.get(key2)
self.assertEquals(2, fetched.number)
self.assertEquals('#2', fetched.text)

def testGetSingleElementByCustomKey(self):
"""Gets a single model from the datastore with a string key."""
helpers.create_tables([TestModel()], self.connection)
model1 = TestModel(number=1)
model2 = TestModel(key_name='custom', number=2, text='#2')
model3 = TestModel(number=3)
key1 = model1.put()
key2 = model2.put()
key3 = model3.put()
fetched = TestModel.get_by_key_name('custom')
self.assertEquals(2, fetched.number)
self.assertEquals('#2', fetched.text)


def testGetMultipleElements(self):
"""Gets a several models from the datastore."""
helpers.create_tables([TestModel()], self.connection)
model1 = TestModel(number=1)
model2 = TestModel(number=2)
model3 = TestModel(number=3)
key1 = model1.put()
key2 = model2.put()
key3 = model3.put()
fetched = TestModel.get([key2, key1])
self.assertEquals(2, len(fetched))
self.assertEquals(2, fetched[0].number)
self.assertEquals(1, fetched[1].number)

def testGetOrInsert(self):
"""tests if get_or_insert works correctly"""
helpers.create_tables([TestModel()], self.connection)
model1 = TestModel(number=1)
model1.put()
model2 = TestModel.get_or_insert('foo', number=13, text='t')
self.assertEquals(13, model2.number)
fetched = TestModel.get_by_key_name('foo')
self.assertEquals(13, fetched.number)



Right now, all those tests fail miserably, but that was to be expected. However, by the end of this article, most of them should work!


Implementation


Let's take the original datastore method and convert it into SQLite.

Warning: this is a naive and non-performant approach that should be improved in a later version! (I will address this issue a little bit later in the article).

As you might remember from the last article, every model class (or entity kind to be precise) is represented by a separate table in the SQL database. The name of the table equals the entity's kind. Every property is represented by a column, its type being "baked" into its name (a string property named foo would thus be represented as string_foo). Primary keys are either stored in a column called pk_int (for integer primary keys) or pk_string (for custom keys). In other words, the equivalent to the implementation above would be to iterate through all the primary keys, load their values into memory and translate them into an entity protocol buffer. The query that we use to fetch the data depends on the primary key: if it is a "name" (custom string key), query on pk_string, otherwise on pk_int. The following code accomplishes that goal:
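Before looking at the real method, here is the naming convention boiled down to plain Python values. This is only an illustration: column_key, split_column_key and lookup_query are names made up for this sketch (written for Python 3), and the actual implementation works on protocol buffers instead.

```python
def column_key(name, value):
    """Bakes the value's type into the column name, e.g. string_foo."""
    if isinstance(value, bool):
        # bool is a subclass of int in Python; booleans are not covered yet.
        raise NotImplementedError('Not supported yet: %r' % (value,))
    if isinstance(value, int):
        return 'int64_' + name
    if isinstance(value, str):
        return 'string_' + name
    raise NotImplementedError('Not supported yet: %r' % (value,))


def split_column_key(key):
    """Splits a column name like int64_name into (type, property name)."""
    type_prefix, separator, name = key.partition('_')
    if not separator or not type_prefix:
        return None
    return type_prefix, name


def lookup_query(table, key_name=None, key_id=None):
    """Picks pk_string for custom string keys and pk_int otherwise."""
    if key_name is not None:
        return 'SELECT * FROM %s WHERE pk_string=?' % table, key_name
    return 'SELECT * FROM %s WHERE pk_int=?' % table, key_id


example_column = column_key('foo', 'bar')
example_split = split_column_key('int64_my_number')
example_query = lookup_query('TestModel', key_name='custom')
```

Note that only the first underscore separates the type from the property name, so property names may contain underscores themselves.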

  def _Dynamic_Get(self, get_request, get_response):
    # Fetch a cursor from the connection
    cursor = self.connection.cursor()

    for key in get_request.key_list():

      # Populate an entity with a clone of the key
      entity = entity_pb.EntityProto()
      result_key = entity.mutable_key()
      result_key.CopyFrom(key)
      result_key.set_app('sql-app')
      key = result_key
      group = get_response.add_entity()

      # Build and execute the SQL query
      tablename = key.path().element_list()[0].type()
      last_path = key.path().element_list()[-1]
      if last_path.has_name():
        query = 'SELECT * FROM %s WHERE pk_string=?' % tablename
        param = str(last_path.name())
      else:
        query = 'SELECT * FROM %s WHERE pk_int=?' % tablename
        param = long(last_path.id())
      cursor.execute(query, [param])
      data = cursor.fetchone()
      if not data: # nothing found
        continue
      keys = [metadata[0] for metadata in cursor.description]

      # Convert the data from the cursor into a map of values
      keyvals = dict([(keys[i], data[i]) for i in range(len(keys))])
      keyvals.pop('pk_string')
      keyvals.pop('pk_int')

      # Populate the entity and store it in the response
      dictToEntity(keyvals, entity)
      entity.mutable_entity_group().CopyFrom(key.path())
      group.mutable_entity().CopyFrom(entity)

    # TODO: do I need to close the cursor? Does anyone know?


Before I talk about the missing method dictToEntity, let me explain why this is not a very efficient implementation. Suppose you call this method with two hundred primary keys, each of them for the same entity kind. In this case, this implementation would fire two hundred queries to the database. However, the same result could easily be achieved with a single query like SELECT * FROM modelName where pk_string='name1' or pk_string='name2'.... A better performing implementation would batch the keys of one kind into such a query.
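A batched lookup could be sketched roughly as follows. This is not the implementation used in this article: fetch_by_names is a made-up helper, it only covers custom string keys, it assumes Python 3's sqlite3 module, and it ignores SQLite's upper limit on the number of bound parameters per statement. It returns the results in the order of the requested keys, with None for keys that were not found, because the response has to line up with the request.

```python
import sqlite3


def fetch_by_names(connection, table, names):
    """Fetches all rows for the given pk_string values with one query."""
    if not names:
        return []
    placeholders = ', '.join('?' for _ in names)
    sql = 'SELECT * FROM %s WHERE pk_string IN (%s)' % (table, placeholders)
    cursor = connection.execute(sql, list(names))
    columns = [metadata[0] for metadata in cursor.description]

    # Index the rows by primary key so the request order can be restored.
    found = {}
    for row in cursor:
        record = dict(zip(columns, row))
        found[record['pk_string']] = record
    return [found.get(name) for name in names]


connection = sqlite3.connect(':memory:')
connection.execute(
    'CREATE TABLE TestModel (pk_string TEXT, int64_number INTEGER)')
connection.executemany(
    'INSERT INTO TestModel VALUES (?, ?)', [('a', 1), ('b', 2)])

results = fetch_by_names(connection, 'TestModel', ['b', 'missing', 'a'])
```

The IN list is equivalent to the chain of OR conditions shown above, and the values are still passed as bound parameters rather than pasted into the statement.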

Now, without further ado, the code that maps the values from the database into the protocol buffer:

def dictToEntity(values, pb):
"""Helper, transfers values from a dictionary into a protocol buffer
This method will only deal with the properties of the entity,
not its primary key.

Args:
values: a dictionary of values
pb: the entity protocol buffer
"""
for key, value in values.items():

# Split up the key (like int64_name) into segments
i = key.find('_')
if i < 1:
continue
p1 = key[0:i]
p2 = key[i+1:]

# Case: type integer
if p1 == 'int64':
prop = pb.add_property()
prop.set_name(p2)
prop.set_multiple(False)
prop.mutable_value().set_int64value(long(value))

# Case: type string
elif p1 == 'string':
prop = pb.add_property()
prop.set_name(p2)
prop.set_multiple(False)
prop.mutable_value().set_stringvalue(value)


What's next


Well, now that we have implemented the get-method, let's take a look at the results of our unit tests:

Importing test modules ... done.

Checks if our TestModel can be converted into a valid SQL table. ... ok
Gets a several models from the datastore. ... ok
tests if get_or_insert works correctly ... FAIL
Gets a single model from the datastore. ... ok
Gets a single model from the datastore with a string key. ... ok
Writes a value into the database twice. ... ok
Writes a single model to the database retrieves it. ... ok

======================================================================
FAIL: tests if get_or_insert works correctly
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/jens/appengine/sqlite/src/unittests.py", line 102, in testGetOrInsert
model2 = TestModel.get_or_insert('foo', number=13, text='t')
File "/home/jens/appengine/google_appengine/google/appengine/ext/db/__init__.py", line 862, in get_or_insert
return run_in_transaction(txn)
File "/home/jens/appengine/google_appengine/google/appengine/api/datastore.py", line 1393, in RunInTransaction
result = function(*args, **kwargs)
File "/home/jens/appengine/google_appengine/google/appengine/ext/db/__init__.py", line 857, in txn
entity = cls.get_by_key_name(key_name, parent=kwds.get('parent'))
File "/home/jens/appengine/google_appengine/google/appengine/ext/db/__init__.py", line 779, in get_by_key_name
return get(*keys)
File "/home/jens/appengine/google_appengine/google/appengine/ext/db/__init__.py", line 974, in get
entities = datastore.Get(keys)
File "/home/jens/appengine/google_appengine/google/appengine/api/datastore.py", line 209, in Get
_MaybeSetupTransaction(req, keys[0])
File "/home/jens/appengine/google_appengine/google/appengine/api/datastore.py", line 1468, in _MaybeSetupTransaction
tx.handle)
File "/home/jens/appengine/google_appengine/google/appengine/api/apiproxy_stub_map.py", line 46, in MakeSyncCall
stub.MakeSyncCall(service, call, request, response)
File "/home/jens/appengine/sqlite/src/datastore_sqlite_stub.py", line 91, in MakeSyncCall
assert response.IsInitialized(explanation), explanation
AssertionError: ['Required field: handle not set.']

----------------------------------------------------------------------
Ran 7 tests in 0.045s

FAILED (failures=1)



Most of it looks great -- hurray :-) Only problem is that the get_or_insert method isn't quite doing its job yet. Turns out that this method requires transactions to work. Sounds like a fun project for our next article...

Parting words for the week.


I would like to thank Dirk for pointing me to Pygments for sourcecode highlighting. It looks like a good choice, and I will give it a try in the next blog post!

I have been asked if I am going to make the source of this project available in its entirety. The answer is yes, but only once it's done. The world is flooded with too many half-baked projects, so I'd rather wait until I am further along. Once that happens, I will do one of the following things:


  • I will dump the code somewhere for download under Apache license and never look back.

  • I will have found an open source project where this is a good fit and that has a license compatible with Apache and donate the code.

  • I will have found a couple of passionate developers who would like to make this code compatible with more databases and start it as a standalone project.



Either way, this is not going away -- don't worry. Just give me a few more weeks to actually implement something worth releasing :-)

Sunday, September 7, 2008

AppEngine and SQLite: storing values in the database (part 2)

Time to implement the put operation! By the end of this article, we will be able to write very simple models into the database. The code for the put-method will not be complete: we will only support integer values and strings for now, no transactions, no lists. Still, it will be a basic skeleton that we can build on.

Now that we know what we are going to do, how are we going to do it? First, let's take a look at how datastore_file_stub implements the put method:

def _Dynamic_Put(self, put_request, put_response):
clones = []
for entity in put_request.entity_list():
clone = entity_pb.EntityProto()
clone.CopyFrom(entity)
clones.append(clone)

assert clone.has_key()
assert clone.key().path().element_size() > 0

app = self.ResolveAppId(clone.key().app())
clone.mutable_key().set_app(app)

last_path = clone.key().path().element_list()[-1]
if last_path.id() == 0 and not last_path.has_name():
self.__id_lock.acquire()
last_path.set_id(self.__next_id)
self.__next_id += 1
self.__id_lock.release()

assert clone.entity_group().element_size() == 0
group = clone.mutable_entity_group()
root = clone.key().path().element(0)
group.add_element().CopyFrom(root)

else:
assert (clone.has_entity_group() and
clone.entity_group().element_size() > 0)

self.__entities_lock.acquire()

try:
for clone in clones:
last_path = clone.key().path().element_list()[-1]
kind_dict = self.__entities.setdefault((app, last_path.type()), {})
kind_dict[clone.key()] = clone
finally:
self.__entities_lock.release()

if not put_request.has_transaction():
self.__WriteDatastore()

put_response.key_list().extend([c.key() for c in clones])



Hmmm... looks mighty complicated, doesn't it? Actually, it's not so bad when you summarize it in pseudo-code. It seems that the file-based datastore keeps its entire content in memory, and just writes it to disk to provide persistence in case the program dies or gets killed and restarted. Let's look at it from a higher level:

def _Dynamic_Put(self, put_request, put_response):
  for each entity from the put_request:
    create a defensive copy called "clone"
    make sure that the clone has a valid key
    copy the app's name into the key of the clone

    if the entity has neither an existing id nor a custom key name
        (in other words last_path.id() == 0
        and not last_path.has_name()):
      create a new numeric primary key for the entity
      copy the entity-group from the key's root
    else:
      make some more data consistency checks

  for each of the clones:
    put the clone into an
    in-memory representation of the datastore

  if we are not in a transaction:
    write the in-memory representation to disk
  else: # (implicit, not in the original code)
    do nothing, something else will persist it later

  put all the keys from the clones into put_response

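The id-allocation step is the part that matters most for what follows, so here is a minimal, self-contained sketch of it. This is not the stub's code: the class InMemoryStore, its put signature and the (kind, id) tuple used as a key are all hypothetical simplifications, and it is written for a current Python rather than 2.5.

```python
import threading


class InMemoryStore(object):
    """Hypothetical stand-in for the file stub's in-memory state."""

    def __init__(self):
        self._next_id = 1
        self._id_lock = threading.Lock()
        self._entities = {}  # maps (kind, id) -> dict of properties

    def put(self, kind, properties, entity_id=0):
        """Stores a copy of properties and returns the (kind, id) key."""
        if entity_id == 0:
            # No key yet: hand out the next numeric id under the lock.
            with self._id_lock:
                entity_id = self._next_id
                self._next_id += 1
        # Defensive copy, so later changes by the caller do not leak in.
        self._entities[(kind, entity_id)] = dict(properties)
        return (kind, entity_id)


store = InMemoryStore()
first = store.put('TestModel', {'number': 42})
second = store.put('TestModel', {'number': 43})
# Putting with an existing id overwrites instead of allocating a new one.
again = store.put('TestModel', {'number': 44}, entity_id=first[1])
```

An id of 0 plays the same role here as in the protocol buffer: it means "no key assigned yet".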


While this gives us a basic framework on how to proceed, it also means we do not get a lot of help on how to model our data store. However, this is the first decision we have to make before we can actually write anything to the database.


Our relational schema


Generally speaking, there are two schools of thought on how the database could be set up. Option number one would be a one-table-fits-all approach:


  • We create one table, let's say "MODELS" that contains one row for each model put into the datastore (with a column describing its type and a primary key).

  • A second table, "PROPERTIES", contains all the properties of a model instance. Each potential data type (integer, string, blob...) would be a separate column in the schema. A separate column would contain the primary key of the model, possibly with a foreign key constraint towards the MODELS-table. When we load a model from the table, we would query for all rows with the same primary key.
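To make this generic layout a bit more concrete, here is a rough sketch using Python's standard sqlite3 module. The column names (pk, kind, model_pk, name, int_value, string_value) are assumptions made up for illustration, and only two value types are shown.

```python
import sqlite3

connection = sqlite3.connect(':memory:')
cursor = connection.cursor()

# One row per stored model instance.
cursor.execute(
    'CREATE TABLE MODELS (pk INTEGER PRIMARY KEY, kind TEXT NOT NULL)')
# One row per property; only the column matching the value's type is filled.
cursor.execute(
    'CREATE TABLE PROPERTIES ('
    'model_pk INTEGER NOT NULL REFERENCES MODELS(pk), '
    'name TEXT NOT NULL, '
    'int_value INTEGER, '
    'string_value TEXT)')

cursor.execute("INSERT INTO MODELS (kind) VALUES ('TestModel')")
model_pk = cursor.lastrowid
cursor.executemany(
    'INSERT INTO PROPERTIES (model_pk, name, int_value, string_value) '
    'VALUES (?, ?, ?, ?)',
    [(model_pk, 'number', 42, None), (model_pk, 'text', None, 'some text')])
connection.commit()

# Loading a model means collecting every property row with its primary key.
cursor.execute(
    'SELECT name, int_value, string_value FROM PROPERTIES '
    'WHERE model_pk = ? ORDER BY name', (model_pk,))
loaded = {}
for name, int_value, string_value in cursor.fetchall():
    loaded[name] = int_value if int_value is not None else string_value
```

Note that a new kind of model needs no new table here, only new rows.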



This approach has many advantages. It is simple, robust towards schema migration, and requires no changes to the datastore when a new model is introduced. There is only one problem: I don't like it and this is my project, so I'll do something else instead ;-).

I won't go too much into specifics here, but suffice it to say that in a previous life, long ago at another job in a galaxy far, far away, I had my share of trouble with a generic schema like this. Getting it to perform well with external tools, e.g. for reporting, was quite painful. I tend to think that if somebody goes through all the trouble of connecting App Engine to a relational database, he or she won't mind the little extra inconvenience of managing a schema.

So, here's the plan: for every different model, I will create a new table (with the same name that the Model has). For every field in the Model, I will create a new column of the proper database type (TEXT for string, INTEGER for numbers and so on). Since I want to make life easier for myself in the future, however, I would like to have a tool that creates the initial schema for me, similar to Active Record Migration in Rails. The following unit test will validate that our TestModel is converted into a nice little create-table statement:

class TestModel(db.Model):
  text = db.StringProperty(default='some text')
  number = db.IntegerProperty(default=42)

#...

  def testCreateTabledef(self):
    """Checks if our TestModel can be converted into a valid SQL table."""
    helpers.create_tables([TestModel()], self.connection)
    cursor = self.connection.cursor()
    cursor.execute(
        "INSERT INTO TestModel(string_text,int64_number) "
        "VALUES ('test', 13)")




So, what does our create_tables method look like? Well, while I was clicking through the debugger to make sense of the low-level APIs, I ran into some helpful methods. _populate_internal_entity() converts a Model-instance into an Entity object (not the protocol buffer, but a class of the same name in the lower-level API). That entity has a method _ToPb() that will finish the conversion into a protocol-buffer. Once we have this protocol buffer, the following code can transform it into a map of key/value pairs:

def entityToDict(pb):
  """Helper, converts an entity protocol buffer to a dictionary.
  This method will only deal with the properties of the entity,
  not its primary key.

  Args:
    pb: the entity protocol buffer

  Returns:
    a dictionary with appropriate key/value pairs that can
    be stored in a SQLite database.
  """
  result = {}
  #TODO: what about raw properties?
  for property in pb.property_list():
    #TODO: what about multiple properties?
    assert not (property.has_multiple() and property.multiple())
    if not property.has_value():
      continue
    key = property.name()
    value = property.value()
    if value.has_int64value():
      result['int64_' + key] = value.int64value()
    elif value.has_stringvalue():
      result['string_' + key] = value.stringvalue()
    else:
      # Raise a real exception; string exceptions are deprecated
      raise NotImplementedError('Not supported yet: %s' % value)
  return result


As the reader can see, I take the name of the property and prepend its type to create a column name. An integer-property named "foo" would thus be transformed to "int64_foo". As a result, I retrieve a dictionary with the column names as keys and the values for later insertion into the database. The following code takes this information and translates it into a SQL statement that creates a table:

def create_tabledef(model_instance):
  """Create a SQL statement to populate a table from a fully populated Model.

  Args:
    model_instance: an instance of a model, each field filled with a
      non-None value

  Returns:
    a SQL statement that could be used to create a new table.
  """

  # Convert the model into a map of property key/value pairs
  entity = model_instance._populate_internal_entity()
  pb = entity._ToPb()
  as_dict = datastore_sqlite_stub.entityToDict(pb)

  # Translate each key/value pair into a SQL-ish type definition
  types = {str: 'TEXT', long: 'INTEGER'}
  def convert(value):
    key = type(value)
    # The condition comes first, the message second
    assert key in types, 'Cannot convert type %s' % key
    return types[key]
  cols = ['%s %s' % (key, convert(val)) for key,val in as_dict.items()]

  # Add two more columns for the primary key
  cols.append('pk_int INTEGER PRIMARY KEY')
  cols.append('pk_string TEXT')

  # Concatenate all type definitions into one create statement
  return 'CREATE TABLE %s (%s);' % (model_instance.kind(), ','.join(cols))


Notice that I add two additional columns, pk_int and pk_string. Pk_int contains the primary key in the relational database, which is auto-populated if not given in an insert statement. In case a user chooses to use a string-based primary key instead, we store that entry in pk_string. Now that we have this method defined, it is only a matter of applying the statements to the database:

def create_tables(list_of_models, connection):
  """Creates one or more SQL tables from a list of prepopulated models.

  Args:
    list_of_models: a list of models, each of a different kind
    connection: the SQLite connection that should be used
  """
  cursor = connection.cursor()
  ok = False
  try:
    for model in list_of_models:
      tabledef = create_tabledef(model)
      cursor.execute(tabledef)
    ok = True
  finally:
    if ok:
      connection.commit()
    else:
      connection.rollback()

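To see the whole chain without the App Engine SDK, here is a small stand-alone sketch. It assumes a hand-written dictionary in place of the real entityToDict output, uses the standard sqlite3 module instead of pysqlite2, and maps int instead of long because it targets a current Python; the column order is sorted only to make the resulting statement predictable.

```python
import sqlite3

# Hypothetical output of entityToDict for the TestModel above.
as_dict = {'string_text': 'some text', 'int64_number': 42}

# Map Python value types to SQLite column types (Python 3 has no `long`).
types = {str: 'TEXT', int: 'INTEGER'}
cols = ['%s %s' % (key, types[type(val)])
        for key, val in sorted(as_dict.items())]
cols.append('pk_int INTEGER PRIMARY KEY')
cols.append('pk_string TEXT')
tabledef = 'CREATE TABLE %s (%s);' % ('TestModel', ','.join(cols))

connection = sqlite3.connect(':memory:')
cursor = connection.cursor()
cursor.execute(tabledef)

# pk_int is left out of the INSERT, so SQLite assigns it.
cursor.execute(
    "INSERT INTO TestModel(string_text,int64_number) VALUES ('test', 13)")
connection.commit()
cursor.execute('SELECT pk_int, string_text, int64_number FROM TestModel')
row = cursor.fetchone()
```

The row comes back with a pk_int that was never supplied, which is the behavior the put method relies on for new entities.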


Writing the data


Back to our put-statement. The following unit-tests should verify that the basic operation is successfully implemented:

  def testWriteSingle(self):
    """Writes a single model to the database and retrieves it."""
    helpers.create_tables([TestModel()], self.connection)
    model = TestModel()
    key = model.put()
    id = key._Key__reference.path().element_list()[-1].id()
    cursor = self.connection.cursor()
    cursor.execute(
        'SELECT string_text, int64_number FROM TestModel WHERE pk_int=%s' % id)
    result = cursor.fetchone()
    self.assertEquals('some text', result[0])
    self.assertEquals(42, result[1])

  def testWriteDouble(self):
    """Writes a value into the database twice."""
    helpers.create_tables([TestModel()], self.connection)
    model = TestModel()
    key = model.put()
    id = key._Key__reference.path().element_list()[-1].id()
    key = model.put()
    id2 = key._Key__reference.path().element_list()[-1].id()
    self.assertEquals(id, id2)
    cursor = self.connection.cursor()
    cursor.execute(
        'SELECT COUNT(*) FROM TestModel WHERE pk_int=%s' % id)
    result = cursor.fetchone()
    self.assertEquals(1, result[0])



Now it is time to implement the put. I have done my best to document what I am doing, so the code should be rather self-explanatory. Basically, this is what I do:


  • I iterate through all entities in the request.

  • If the entity has an existing primary key, I execute a DELETE-statement on that key in the database. I do not bother checking if the object already exists; if it doesn't, the delete is simply a NOP.

  • I convert the entity into a dictionary using the previously defined entityToDict.

  • If there was a pre-existing primary key, I add that key to the dictionary.

  • I create an INSERT statement from the dictionary and execute it.

  • If there was no pre-existing primary key, I fetch the newly created row-id from the cursor and consider that the new key.

  • I finish iterating and return the result.



Here is the code that does it all.


  def _Dynamic_Put(self, put_request, put_response):

    # Fetch a cursor from the connection
    cursor = self.connection.cursor()

    # Iterate through the instances
    clones = []
    for entity in put_request.entity_list():
      # create a defensive copy called "clone"
      clone = entity_pb.EntityProto()
      clone.CopyFrom(entity)
      clones.append(clone)

      # make sure that the clone has a valid key
      assert clone.has_key()
      assert clone.key().path().element_size() > 0
      tablename = clone.key().path().element_list()[0].type()

      # populate the key's app name with a value
      clone.mutable_key().set_app('sql-app')

      # Convert the protocol buffer into a dictionary
      values = entityToDict(clone)

      # If the instance has a primary key, delete the old row
      last_path = clone.key().path().element_list()[-1]
      if last_path.id() != 0:
        cursor.execute('DELETE FROM %s WHERE pk_int=%s' %
                       (tablename, last_path.id()))
        values['pk_int'] = last_path.id()
      if last_path.has_name():
        cursor.execute('DELETE FROM %s WHERE pk_string=?' %
                       tablename, [last_path.name()])
        values['pk_string'] = last_path.name()

      # Using the dictionary of values, create a SQL query
      # TODO: how to handle list elements
      keyval_list = values.items()
      key_list = ','.join([first for first, second in keyval_list])
      value_list = [second for first, second in keyval_list]
      questionmarks = ','.join(['?' for value in value_list])
      cursor.execute(
          'INSERT INTO %s (%s) VALUES (%s)' %
          (tablename, key_list, questionmarks),
          value_list)

      # Grab the primary key and update the result
      if last_path.id() == 0 and not last_path.has_name():
        last_path.set_id(cursor.lastrowid)

    # Do a database commit
    # TODO: how to handle transactions?
    if not put_request.has_transaction():
      self.connection.commit()

    # Populate the response
    put_response.key_list().extend([c.key() for c in clones])

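Stripped of the protocol buffers, the delete-then-insert idea can be tried out on its own. The following is only a sketch under a few assumptions: put_row is a hypothetical helper, not part of the stub; it handles just the numeric key; and it uses the standard sqlite3 module on a current Python. The table name is interpolated into the SQL text, so it must come from trusted code, never from user input.

```python
import sqlite3


def put_row(connection, tablename, values, pk_int=0):
    """Hypothetical helper: replaces or inserts one row, returns its pk_int."""
    cursor = connection.cursor()
    values = dict(values)
    if pk_int != 0:
        # Deleting a row that does not exist is simply a no-op.
        cursor.execute('DELETE FROM %s WHERE pk_int=?' % tablename, [pk_int])
        values['pk_int'] = pk_int
    keyval_list = list(values.items())
    key_list = ','.join([key for key, value in keyval_list])
    questionmarks = ','.join(['?' for key, value in keyval_list])
    cursor.execute(
        'INSERT INTO %s (%s) VALUES (%s)'
        % (tablename, key_list, questionmarks),
        [value for key, value in keyval_list])
    connection.commit()
    # For a fresh row, SQLite reports the generated key via lastrowid.
    return pk_int if pk_int != 0 else cursor.lastrowid


connection = sqlite3.connect(':memory:')
connection.execute(
    'CREATE TABLE TestModel (string_text TEXT, int64_number INTEGER, '
    'pk_int INTEGER PRIMARY KEY, pk_string TEXT)')
row = {'string_text': 'some text', 'int64_number': 42}
first_id = put_row(connection, 'TestModel', row)
second_id = put_row(connection, 'TestModel', row, pk_int=first_id)
count = connection.execute('SELECT COUNT(*) FROM TestModel').fetchone()[0]
```

Writing the same row twice keeps the key and leaves exactly one row behind, which is what testWriteDouble checks against the real stub.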

Next steps


Well, I've had enough for this weekend, but I might continue this in a few days from now. I'm still looking for a good syntax highlighter, so let me know if you find one. Beyond that, I'd also appreciate some feedback on what to do next. I could either extend the put method to deal with more data types, lists and such, or move on to things like get and query and worry about feature completeness later. What's more interesting? Vote by submitting a comment to this post!

Saturday, September 6, 2008

AppEngine and SQLite: storing values in the database (part 1)

All right, time for the first installment of "getting-things-to-run-in-sqlite" ;-)
Please be patient if I have to change things I'm posting today further down the line -- I am implementing this while typing, so assumptions on how things work in app engine might be wrong and may need to be revised.

Preparations:


All righty then, let's get started. I have already set up my machine by installing the following things:

  • python 2.5

  • sqlite and python-sqlite library (pysqlite2)

  • app engine

  • eclipse

  • pydev



For console-based execution, I am including all app engine libraries into my python-path (setting PYTHONPATH to :/home/jens/appengine/google_appengine:/home/jens/appengine/google_appengine/lib/yaml/lib:/home/jens/appengine/google_appengine/lib/webob:/home/jens/appengine/google_appengine/lib/django:).
Next, I create a python project in eclipse. I copy in the sources from the first post into the project (in other words, the file datastore_sqlite_stub.py).

Last but not least, since I have no clue about sqlite, I read a small tutorial.


First baby steps


In my first iteration, I would like to figure out how the low-level datastore put is supposed to work, but I have to start with some plumbing work. I create a new file with utility methods (helpers.py). The first method sets up a datastore with an in-memory sqlite instance:

from datastore_sqlite_stub import DatastoreSqliteStub
from google.appengine.api import apiproxy_stub_map
from pysqlite2 import dbapi2 as sqlite

def setup_sqlite(name=None):
  """Sets up an in-memory sqlite instance and connects the datastore to it.

  Args:
    name: the name of the instance to connect to,
      None or empty string for in-memory

  Returns:
    a sqlite connection object pointing to the database.
  """
  if name:
    connection = sqlite.connect(name)
  else:
    connection = sqlite.connect(':memory:')
    name = 'memory'
  stub = DatastoreSqliteStub(database_name=name, connection=connection)
  apiproxy_stub_map.apiproxy.RegisterStub('datastore_v3', stub)
  return connection



For this to work, I need to make a small adjustment to the constructor's signature in the stub:


  def __init__(self, database_name, connection=None):
    """Constructor.

    Initializes and loads the datastore from the backing files, if they exist.

    Args:
      database_name: the name of the sqlite instance
      connection: a pre-initialized connection for unit tests, optional
    """
    #TODO: initialize sqlite instance
    pass



Now, before I do anything else, I create a simple unit-test (file "unittests.py") that tries to write something to the database:

from google.appengine.ext import db
import helpers
import unittest


class TestModel(db.Model):
  text = db.StringProperty(default='some text')
  number = db.IntegerProperty(default=42)


class UnitTests(unittest.TestCase):

  def setUp(self):
    """Set up in-memory connection."""
    self.connection = helpers.setup_sqlite()

  def testWriteSingle(self):
    """Writes a single model to the database and retrieves it."""
    model = TestModel()
    model.put()


if __name__ == '__main__':
  unittest.main()



Now I put a breakpoint into the "_Dynamic_Put" method of my stub and run the test in the debugger. This will tell me what exactly the input and output values for a put are.

Digging through buffers


Turns out that the input into my method is a "PutRequest" object that looks something like this:

PutRequest: entity <
  key <
    app: ":self"
    path <
      Element {
        type: "TestModel"
        id: 0
      }
    >
  >
  entity_group <
  >
  property <
    name: "number"
    value <
...



The second parameter is a "PutResponse". A quick text search for PutRequest reveals that its source can be found in the SDK under google/appengine/datastore/datastore_pb.py. Turns out that both parameters are protocol buffers, Google's language-agnostic data structure (see http://code.google.com/p/protobuf/). Makes kind of sense -- if one aims to support more than one programming language and not reinvent the wheel, there has to be some point in the stack where all parts speak the same protocol. Unfortunately, that means the source code is machine generated, without a lot of documentation inside. Oh well, more fun for me :-)

The constructor of the request suggests that the object is mostly a collection of "entities" plus a transaction:

class PutRequest(ProtocolBuffer.ProtocolMessage):
  def __init__(self, contents=None):
    self.entity_ = []
    self.transaction_ = None
    self.composite_index_ = []
    self.has_transaction_ = 0
    self.lazy_init_lock_ = thread.allocate_lock()
    if contents is not None: self.MergeFromString(contents)



I'm not going to worry about the transaction for now (that can be part of a later post) and focus on the entities. The debugger tells me that an entity looks like this:

EntityProto: key <
  app: ":self"
  path <
    Element {
      type: "TestModel"
      id: 0
    }
  >
>
entity_group <
>
property <
  name: "number"
  value <
    int64Value: 42
  >
  multiple: false...



Another protocol buffer, eh? This one can be found in "entity_pb.py" in the same directory. Let's take a look at the constructor:

  def __init__(self, contents=None):
    self.key_ = Reference()
    self.entity_group_ = Path()
    self.owner_ = None
    self.kind_ = 0
    self.kind_uri_ = ""
    self.property_ = []
    self.raw_property_ = []
    self.has_key_ = 0
    self.has_entity_group_ = 0
    self.has_owner_ = 0
    self.has_kind_ = 0
    self.has_kind_uri_ = 0
    self.lazy_init_lock_ = thread.allocate_lock()
    if contents is not None: self.MergeFromString(contents)



Scary stuff. Well, let's make life a little bit easier here and assume that everything that starts with "has_" simply encodes if a particular parameter exists or not (has_key is 0 if no key exists and so on). In that case, our entity seems to have the following content:


  • a key object (a Reference-object, another protocol buffer)

  • an entity_group (Path-object, protocol buffer)

  • an owner (unknown)

  • a kind (numeric)

  • a kind_uri (string)

  • a property and a raw_property (both lists of some sort?)



In our concrete example (looking into the debugger), the object is populated as follows:

Key is
Reference: app: ":self"
path <
Element {
type: "TestModel"
id: 0
}


Entity group just seems to be the empty Path-object
from the constructor


owner is not set.
Kind is not set.
kind_uri is not set.


Property contains two more protocol buffers:

Property: name: "number"
value <
int64Value: 42
>
multiple: false

Property: name: "text"
value <
stringValue: "some text"
>
multiple: false


raw_property is an empty list



Ok, so in summary, it looks like each model is translated into an entity, and each property has one of the following types (I'm sparing the reader the digging through some more nested protocol buffers; check out class PropertyValue in entity_pb):


  • int64

  • boolean

  • string

  • double

  • point (don't know what that means yet)

  • user (don't know what that means yet)

  • reference (don't know what that means yet)




So what about the output parameter? To get to the proper output of the method, I could replace my stub with the in-memory implementation and debug again, but I'm way too lazy for now. Since I have the files open anyway, I might as well just peek at the PutResponse and take an educated guess ;-) Here's the constructor:

  def __init__(self, contents=None):
    self.key_ = []
    if contents is not None: self.MergeFromString(contents)



So, it seems that the output is simply a collection of keys -- the primary keys of the objects stored in the db, to be precise. A little further digging reveals that these keys are Reference-objects, the same protocol buffer we have already seen before. Makes sense, considering that the put-method of our Model class returns the Key of the stored entity...

Next steps:


So, now I basically know what I have to do to implement a put in the database. First, I have to decide on how to model the different data types in a relational schema (as a matter of fact, I probably have to do some more digging for point, user and reference). Next, I will write a couple of more in-depth unit tests that validate the different edge cases (note: I do not need to implement get to make this work, if I use the sql-queries directly to test what data was written into the store). Then, I will work on the put-method until all the tests pass. Wish me luck!

PS:
I know my current posts do not syntax-highlight python code, and I would like to change that for the next post. Can anyone recommend a tool to better format the code? Please, do not recommend a Windows-based blogging editor, since I am using a Linux system...

Monday, September 1, 2008

Doubts about App Engine?

It's still a couple of more days until my next post in the SQLite series, but I just ran into an interesting thread on the forum that I'd like to point people to -- it's called Having doubts about App Engine. Interesting discussion, refreshingly little flame for such a topic :-). Enjoy.