Sunday, October 12, 2008

GQL queries in SQL, part 4

Time to complete the query implementation! First, let's use last article's schema method to add a sort order to the query. We do this by iterating through the order objects in the query protocol buffer and looking up the corresponding columns in the schema definition:

    order_conditions = []
    schema = (self.prm.getSchema(connection, query.kind())
              if query.order_size() > 0 else None)
    for entry in query.order_list():
        if entry.property() in schema:
            if entry.direction() == datastore_pb.Query_Order.ASCENDING:
                order = 'ASC'
            else:
                order = 'DESC'
            for column in schema[entry.property()]:
                order_conditions.append('%s %s' % (column, order))
    if order_conditions:
        sqlquery = '%s ORDER BY %s' % (sqlquery, ', '.join(order_conditions))


The main work here is done in the getSchema method that was described in last week's article.
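To see the ORDER BY construction on its own, here is a minimal, self-contained sketch that runs against Python's built-in sqlite3 module. It assumes plain (property, direction) tuples in place of the protocol buffer's order objects. The function build_order_by and the ASCENDING/DESCENDING constants are hypothetical stand-ins, not part of the App Engine SDK.

```python
import sqlite3

# Hypothetical stand-ins for datastore_pb.Query_Order.ASCENDING / DESCENDING;
# only the comparison against ASCENDING matters here.
ASCENDING, DESCENDING = 1, 2


def build_order_by(orders, schema):
    """Build an ORDER BY clause from (property, direction) pairs.

    `schema` maps each property name to the columns that store it, in the
    shape getSchema returns. Properties missing from the schema are skipped,
    mirroring the `if entry.property() in schema` check in the stub.
    """
    clauses = []
    for prop, direction in orders:
        keyword = 'ASC' if direction == ASCENDING else 'DESC'
        for column in schema.get(prop, []):
            clauses.append('%s %s' % (column, keyword))
    return ' ORDER BY ' + ', '.join(clauses) if clauses else ''


# Usage: sort a small in-memory table by the 'number' property, descending.
schema = {'text': ['string_text'], 'number': ['int64_number']}
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE TestModel '
             '(pk_int INTEGER, string_text TEXT, int64_number INTEGER)')
conn.executemany('INSERT INTO TestModel VALUES (?, ?, ?)',
                 [(1, 'b', 2), (2, 'a', 3), (3, 'c', 1)])
sql = 'SELECT pk_int FROM TestModel' + build_order_by(
    [('number', DESCENDING)], schema)
print([row[0] for row in conn.execute(sql)])  # [2, 1, 3]
```

A property may be stored in more than one typed column. In that case, each of its columns is appended in turn, just like the inner loop in the stub.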

Now that our SQL query is complete, we can fetch data through our connection. We execute the query, skip as many rows as the protocol buffer's offset value specifies, and then fetch up to 1000 entries from our store:

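    # Execute the query
    cursor = connection.cursor()
    cursor.execute(sqlquery, params)

    # Skip the first query.offset() rows (stop early if we run out)
    try:
        for i in range(query.offset()):
            cursor.next()
    except StopIteration:
        pass

    # Fetch at most 1000 rows, and never ask for a negative count
    rows = cursor.fetchmany(max(0, min(1000, query.limit())))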
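Skipping rows with cursor.next() means SQLite still produces every skipped row and hands it to Python. An alternative, which is not what the stub above does, is to push both values into the statement with SQLite's LIMIT and OFFSET clauses. The sketch below assumes dict-style named parameters, as in the stub. The names fetch_page, MAX_RESULTS, page_limit and page_offset are hypothetical.

```python
import sqlite3

MAX_RESULTS = 1000  # the same cap the stub applies


def fetch_page(connection, sqlquery, params, offset, limit):
    """Return at most `limit` rows after skipping `offset` rows.

    Hypothetical helper: SQLite applies the offset itself, so skipped rows
    are never handed to Python. Assumes `params` is a dict of named
    parameters (as in the stub) and that `sqlquery` has no LIMIT clause.
    """
    bound = dict(params)
    bound['page_limit'] = max(0, min(MAX_RESULTS, limit))
    bound['page_offset'] = max(0, offset)
    paged = '%s LIMIT :page_limit OFFSET :page_offset' % sqlquery
    return connection.execute(paged, bound).fetchall()


# Usage: rows with n >= 2, skip 3 of them, return the next 4.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE T (n INTEGER)')
conn.executemany('INSERT INTO T VALUES (?)', [(i,) for i in range(10)])
rows = fetch_page(conn, 'SELECT n FROM T WHERE n >= :p0 ORDER BY n',
                  {'p0': 2}, offset=3, limit=4)
print(rows)  # [(5,), (6,), (7,), (8,)]
```

The clamping keeps the same max(0, min(1000, ...)) behaviour as the original line, so a limit of 0 still returns no rows.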



The interesting question is: now that we have successfully queried the database, what do we do with the results? A closer look at the file-based implementation of the datastore reveals that its query method does not actually return any results. Instead, it stores them in a local map so that subsequent calls to _Dynamic_Next can retrieve them. If we take the same approach, we can reuse the file stub's implementation of _Dynamic_Next as-is. All we have to do is convert the rows into protocol buffers (the logic needs to be factored out of the get implementation) and cache them locally. The following is the completed code that makes our unit test pass:

    class PRMHelper(object):

        # ...

        def getSchema(self, connection, table_name):
            """For a given table name, return all property names defined.

            Args:
              connection: the SQL connection that should be used to
                gather any metadata
              table_name: the name of the table/model to inspect

            Returns:
              a dictionary with the property names as keys and a list
              of column names (used to store them) as values
            """
            result = {}
            for column in connection.cursor().execute(
                    "PRAGMA TABLE_INFO(%s)" % table_name).fetchall():
                key = column[1]
                i = key.find('_')
                if i < 1:
                    continue
                p1 = key[0:i]
                if p1 == 'pk':
                    continue
                p2 = key[i+1:]
                if p2 not in result:
                    result[p2] = []
                result[p2].append(key)
            return result

        def rowToDict(self, cursor, row, remove_metadata=True):
            """Convert a given row from a cursor into a dictionary,
            using the column names as keys.

            Args:
              cursor: a database cursor that contains the metadata
              row: the row to work on
              remove_metadata: if set to True (default), drop the columns
                that hold metadata (like the primary keys)
            """
            keys = [metadata[0] for metadata in cursor.description]
            keyvals = dict(zip(keys, row))
            if remove_metadata:
                keyvals.pop('pk_string', None)
                keyvals.pop('pk_int', None)
            return keyvals

        def pkFromRow(self, kind, keyvals):
            """Convert a given dictionary of values into a primary key
            protocol buffer, or None if the row carries no key."""
            int_pk = keyvals.get('pk_int')
            string_pk = keyvals.get('pk_string')
            if int_pk is None and string_pk is None:
                return None
            if string_pk:
                # Return a PB in both branches: callers use CopyFrom()
                # and path() on the result
                return datastore.Key.from_path(kind, string_pk)._ToPb()
            return datastore.Key.from_path(kind, int_pk)._ToPb()

        # ...


    class DatastoreSqliteStub(object):
        """Datastore stub implementation that uses a sqlite instance."""

        def __init__(
                self, get_connection, release_connection, prm_helper=None):
            """Constructor.

            Args:
              get_connection: a parameterless function that provides a new
                connection from the pool
              release_connection: a function that accepts a connection
                and puts it back into the pool
              prm_helper: an object (by default a PRMHelper instance) that
                provides miscellaneous tools for protocol-buffer-to-RDBMS
                mapping
            """
            self._get_connection = get_connection
            self._release_connection = release_connection
            self.__next_tx_handle = 1
            self.__open_transactions = {}
            self.__tx_handle_lock = threading.Lock()
            self.__next_cursor = 1
            self.__cursor_lock = threading.Lock()
            self.__queries = {}
            if prm_helper:
                self.prm = prm_helper
            else:
                self.prm = PRMHelper()

        # ...


        def _Dynamic_RunQuery(self, query, query_result):

            # Turn the filter objects into a set of conditions
            def build_query_conditions(dictionary, filter, col_key, value):
                operator = filter.op()
                names = DatastoreSqliteStub._Operator_NAMES
                if operator in names:
                    key = '%s %s' % (col_key, names[operator])
                    assert key not in dictionary, \
                        'Multiple conditions not supported yet: %s' % key
                    dictionary[key] = value
                elif operator == 7:  # Query_Filter.EXISTS: property is set
                    key = '%s NOT NULL' % col_key
                    assert key not in dictionary, \
                        'Multiple conditions not supported yet: %s' % key
                    dictionary[key] = None
                else:
                    assert False, 'Unsupported operator: %s' % operator
            conditions = self.prm.entityToDict(
                pb=query,
                populate_dict=build_query_conditions,
                get_list=lambda x: x.filter_list(),
                unwrap_properties=lambda x: x.property_list())

            # Concatenate the conditions in a statement
            sqlquery = 'SELECT * FROM %s' % query.kind()
            params = {}
            count = 0
            for key, value in conditions.items():
                if count == 0:
                    sqlquery = '%s WHERE %s' % (sqlquery, key)
                else:
                    sqlquery = '%s AND %s' % (sqlquery, key)
                if value is not None:
                    params[str(count)] = value
                    sqlquery = '%s :%s' % (sqlquery, count)
                count += 1

            # Open a connection and execute the rest of the
            # method in a try-finally block
            connection = self._connect(None, False, False)
            try:

                # Now, look at the sort order
                order_conditions = []
                schema = (self.prm.getSchema(connection, query.kind())
                          if query.order_size() > 0 else None)
                for entry in query.order_list():
                    if entry.property() in schema:
                        if entry.direction() == \
                                datastore_pb.Query_Order.ASCENDING:
                            order = 'ASC'
                        else:
                            order = 'DESC'
                        for column in schema[entry.property()]:
                            order_conditions.append(
                                '%s %s' % (column, order))
                if order_conditions:
                    sqlquery = '%s ORDER BY %s' % (
                        sqlquery, ', '.join(order_conditions))

                # Execute the query, skip `offset` rows, fetch at most 1000
                cursor = connection.cursor()
                cursor.execute(sqlquery, params)
                try:
                    for i in range(query.offset()):
                        cursor.next()
                except StopIteration:
                    pass
                rows = cursor.fetchmany(max(0, min(1000, query.limit())))

                # Convert the rows into PBs
                results = []
                for data in rows:

                    # Extract data from row and convert to entity
                    entity = entity_pb.EntityProto()
                    results.append(entity)
                    keyvals = self.prm.rowToDict(cursor, data, False)
                    self.prm.dictToEntity(keyvals, entity)

                    # Extract primary key
                    pk = self.prm.pkFromRow(query.kind(), keyvals)
                    entity.mutable_key().CopyFrom(pk)

                    # TODO: better support for entity groups?
                    group = entity.mutable_entity_group()
                    root = pk.path().element(0)
                    group.add_element().CopyFrom(root)

                # The following code is taken from datastore_file_stub;
                # cursor_id is the handle that _Dynamic_Next looks up later
                self.__cursor_lock.acquire()
                try:
                    cursor_id = self.__next_cursor
                    self.__next_cursor += 1
                finally:
                    self.__cursor_lock.release()
                self.__queries[cursor_id] = (results, len(results))
                query_result.mutable_cursor().set_cursor(cursor_id)
                query_result.set_more_results(len(results) > 0)

            # Clean up resources at the end
            finally:
                self._release_connection(connection)


        def _Dynamic_Next(self, next_request, query_result):
            """Taken verbatim from datastore_file_stub.py."""
            cursor = next_request.cursor().cursor()

            try:
                results, orig_count = self.__queries[cursor]
            except KeyError:
                raise apiproxy_errors.ApplicationError(
                    datastore_pb.Error.BAD_REQUEST,
                    'Cursor %d not found' % cursor)

            count = next_request.count()
            for r in results[:count]:
                query_result.add_result().CopyFrom(r)
            del results[:count]

            query_result.set_more_results(len(results) > 0)
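The RunQuery/Next hand-off boils down to a small pattern. RunQuery materializes all results and stores them under a fresh integer id, and Next pops batches off that list. Below is a minimal sketch of that pattern using only the standard library. It uses plain Python objects in place of entity protocol buffers and a ValueError in place of apiproxy_errors.ApplicationError. The QueryCache class and its method names are hypothetical.

```python
import threading


class QueryCache(object):
    """Hypothetical stand-in for the stub's __queries map and cursor counter."""

    def __init__(self):
        self._lock = threading.Lock()
        self._next_cursor = 1
        self._queries = {}

    def store(self, results):
        """Cache a result list and return the cursor id that refers to it."""
        with self._lock:  # only the id counter needs to be guarded
            cursor_id = self._next_cursor
            self._next_cursor += 1
        self._queries[cursor_id] = list(results)
        return cursor_id

    def fetch_next(self, cursor_id, count):
        """Remove and return up to `count` results, plus a more-results flag."""
        try:
            results = self._queries[cursor_id]
        except KeyError:
            raise ValueError('Cursor %d not found' % cursor_id)
        batch = results[:count]
        del results[:count]
        return batch, len(results) > 0


cache = QueryCache()
cursor_id = cache.store(['a', 'b', 'c'])
print(cache.fetch_next(cursor_id, 2))  # (['a', 'b'], True)
print(cache.fetch_next(cursor_id, 2))  # (['c'], False)
```

As in the stub code, only the id counter is protected by the lock, and exhausted entries are never removed from the map. A long-running process would therefore eventually need some form of eviction.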



What's next?


Our SQL connector is in pretty good shape by now. We can insert and get data from the database and run GQL queries. So, what's keeping me from releasing it into the open source world?

Well, I do not think that the code needs to be perfect, but there are a couple of higher-level items I would like to address first. For example, I am not too worried about implementing the count feature, but deletion is a must. Also, the PRMHelper can currently handle only strings and integers, and there are many more data types to support. List values are also important and potentially tricky; I'd like to make sure that there is at least a basic implementation for them.

I'll keep you posted on this blog.

Sunday, October 5, 2008

GQL queries in SQL, part 3

I did not have a lot of time to play with my SQL connector this week, so there is only a brief update today. Hopefully there will be a bit more next week.

I started looking into expressing the sort order in SQL statements and ran into a little issue. Without any concrete type information in a query (remember, the properties in our model live a couple of layers further up), how would I know which column in our SQL table to sort by? I needed to map each property of an entity to one or more columns. For this reason, I added the following method to my PRM helper:

    def getSchema(self, connection, table_name):
        """For a given table name, return all property names defined.

        Args:
          connection: the SQL connection that should be used to gather
            any metadata
          table_name: the name of the table/model to inspect

        Returns:
          a dictionary with the property names as keys and a list
          of column names (used to store them) as values
        """
        result = {}
        for column in connection.cursor().execute(
                "PRAGMA TABLE_INFO(%s)" % table_name).fetchall():
            key = column[1]
            i = key.find('_')
            if i < 1:
                continue
            p1 = key[0:i]
            if p1 == 'pk':
                continue
            p2 = key[i+1:]
            if p2 not in result:
                result[p2] = []
            result[p2].append(key)
        return result


This little method iterates through a table's metadata and extracts the information I need. It uses the same naming conventions as the rest of the mapping logic: data columns are named <type>_<property>, and columns starting with pk_ hold the primary key. The following unit test makes sure that the method works:

    def testGetSchema(self):
        helpers.create_tables([TestModel()], self.connection)
        prm = datastore_sqlite_stub.PRMHelper()
        schema = prm.getSchema(self.connection, 'TestModel')
        assert schema
        # Dictionary key order is arbitrary, so compare sorted keys
        self.assertEquals(['number', 'text'], sorted(schema.keys()))
        self.assertEquals({
            'text': ['string_text'],
            'number': ['int64_number']}, schema)
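For readers without the App Engine SDK at hand, here is a standalone sketch of the same idea that uses only Python's sqlite3 module. It shows what PRAGMA table_info returns and how the column names are decoded. The get_schema function is a hypothetical re-implementation, not the PRMHelper method itself, and it uses str.partition in place of find and slicing. The string_number column is an invented example of one property stored in two typed columns.

```python
import sqlite3


def get_schema(connection, table_name):
    """Map property names to the columns that store them.

    Assumes the PRM helper's naming convention: <type>_<property> for data
    columns and pk_<...> for primary-key columns.
    """
    result = {}
    # Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk)
    for column in connection.execute('PRAGMA table_info(%s)' % table_name):
        name = column[1]
        prefix, sep, prop = name.partition('_')
        if not sep or not prefix or prefix == 'pk':
            continue  # no type prefix, or a primary-key column
        result.setdefault(prop, []).append(name)
    return result


conn = sqlite3.connect(':memory:')
# string_number is a hypothetical second column for the same property
conn.execute('CREATE TABLE TestModel (pk_int INTEGER, pk_string TEXT, '
             'string_text TEXT, int64_number INTEGER, string_number TEXT)')
print(get_schema(conn, 'TestModel'))
# {'text': ['string_text'], 'number': ['int64_number', 'string_number']}
```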



With this helper in place, I should be able to express sort order conditions in SQL during the next iteration.