# Translate the sort orders of the query into ORDER BY terms. The table
# schema is only looked up when the query actually carries sort orders.
if query.order_size() > 0:
    schema = self.prm.getSchema(connection, query.kind())
else:
    schema = None
order_conditions = []
for entry in query.order_list():
    # Sort orders on properties the schema does not know are ignored.
    if entry.property() not in schema:
        continue
    # One property may be stored in several columns; sort by each of them.
    for column in schema[entry.property()]:
        if entry.direction() == datastore_pb.Query_Order.ASCENDING:
            order = 'ASC'
        else:
            order = 'DESC'
        order_conditions.append('%s %s' % (column, order))
if order_conditions:
    sqlquery = '%s ORDER BY %s' % (sqlquery, ', '.join(order_conditions))
The main work here is done in the getSchema method, which was described in last week's article.
Now that our SQL query is complete, we can fetch data from our connection. We do this by executing the query and then skipping as many rows as the protocol buffer's offset value specifies. After that, we fetch up to 1000 entries from our store:
# Execute the query
cursor = connection.cursor()
cursor.execute(sqlquery, params)
# Skip the first `offset` rows. fetchone() is the standard DB-API call
# (cursor.next() is an optional extension that Python 3's sqlite3 cursor
# does not provide); it returns None once the result set is exhausted.
for _ in range(query.offset()):
    if cursor.fetchone() is None:
        break
# Fetch at most 1000 rows, fewer if the query asks for a smaller limit.
rows = cursor.fetchmany(max(0, min(1000, query.limit())))
The interesting question is -- now that we have successfully queried the database, what are we going to do with the results? Taking a closer look at the file-based implementation of the datastore reveals that our query method does not actually return any results -- it stores them in a local map, so that they can be retrieved by calls to _Dynamic_Next. By using the same approach, we can actually reuse the file-stub's implementation of next. All we have to do is convert the rows into protocol buffers (the logic needs to be factored out from the get-implementation) and cache them locally. The following is the completed code that makes our unit test pass:
class PRMHelper(object):
    """Miscellaneous tools for protocolbuffer-to-rdbms mapping.

    The methods below rely on a column naming scheme of the form
    ``<prefix>_<property>``. Columns whose prefix is ``pk`` (``pk_int``
    and ``pk_string``) hold the primary key rather than a property.
    """
    # ...

    def getSchema(self, connection, table_name):
        """For a given table name, return all property names defined.

        Args:
            connection: the sql connection that should be used to
                gather any metadata
            table_name: the name of the table/model to inspect

        Returns:
            a dictionary with the property names as keys and a list
            of column names (used to store them) as values. Columns
            without a ``<prefix>_`` part and the ``pk_*`` columns are
            left out. An unknown table yields an empty dictionary.
        """
        # Identifiers cannot be bound as SQL parameters, so the table name
        # is quoted instead (embedded double quotes are doubled). This keeps
        # names containing spaces or quotes from breaking the statement.
        quoted_name = '"%s"' % table_name.replace('"', '""')
        cursor = connection.cursor()
        try:
            cursor.execute('PRAGMA TABLE_INFO(%s)' % quoted_name)
            columns = cursor.fetchall()
        finally:
            cursor.close()

        result = {}
        for column in columns:
            # Index 1 of a TABLE_INFO row is the column name.
            key = column[1]
            prefix, separator, property_name = key.partition('_')
            # Skip names without an underscore or with an empty prefix,
            # as well as the primary key columns.
            if not separator or not prefix or prefix == 'pk':
                continue
            result.setdefault(property_name, []).append(key)
        return result

    def rowToDict(self, cursor, row, remove_metadata=True):
        """Convert a given row from a cursor into a dictionary,
        using the column names as keys.

        Args:
            cursor: a database cursor whose ``description`` provides the
                column names of the row
            row: the row to work on
            remove_metadata: if set to True (default), kick the entries
                out that are related to metadata (the primary key
                columns ``pk_string`` and ``pk_int``)

        Returns:
            a dictionary that maps column names to the values of the row
        """
        keys = [metadata[0] for metadata in cursor.description]
        keyvals = dict(zip(keys, row))
        if remove_metadata:
            # A default is passed so that result sets which do not select
            # the primary key columns do not raise a KeyError.
            keyvals.pop('pk_string', None)
            keyvals.pop('pk_int', None)
        return keyvals

    def pkFromRow(self, kind, keyvals):
        """Converts a given dictionary of values into a primary key.

        Args:
            kind: the kind (table/model name) the key belongs to
            keyvals: a dictionary as produced by rowToDict with the
                metadata left in place

        Returns:
            the key as a protocol buffer (the result of ``Key._ToPb()``),
            or None if the dictionary holds neither a string nor an
            integer primary key. A non-empty ``pk_string`` takes
            precedence over ``pk_int``.
        """
        string_pk = keyvals.get('pk_string')
        if string_pk:
            name_or_id = string_pk
        else:
            name_or_id = keyvals.get('pk_int')
        if name_or_id is None:
            return None
        # Both key flavours are converted to a protocol buffer; callers
        # copy the result into an entity and read its path.
        return datastore.Key.from_path(kind, name_or_id)._ToPb()
    # ...
class DatastoreSqliteStub(object):
    """Datastore stub implementation that uses a sqlite instance."""

    # Upper bound for the number of rows a single query fetches.
    _MAX_QUERY_RESULTS = 1000

    # Filter operator code that is translated into "<column> NOT NULL".
    # NOTE(review): presumably datastore_pb.Query_Filter.EXISTS -- confirm.
    _OPERATOR_NOT_NULL = 7

    def __init__(
            self, get_connection, release_connection, prm_helper=None):
        """Constructor.

        Args:
            get_connection: a parameterless function that provides a new
                connection from the pool
            release_connection: a function that accepts a connection
                and puts it back into the pool
            prm_helper:
                a class that provides miscellaneous tools for
                protocolbuffer-to-rdbms mapping; a PRMHelper is
                created when none is given
        """
        self._get_connection = get_connection
        self._release_connection = release_connection
        self.__next_tx_handle = 1
        self.__open_transactions = {}
        self.__tx_handle_lock = threading.Lock()
        self.__next_cursor = 1
        self.__cursor_lock = threading.Lock()
        # Maps a cursor id to a (remaining results, original count) tuple.
        self.__queries = {}
        if prm_helper:
            self.prm = prm_helper
        else:
            self.prm = PRMHelper()
    # ...

    def _Dynamic_RunQuery(self, query, query_result):
        """Run a query against the database and cache its results.

        No entities are written to query_result here. The matching rows
        are converted into entity protocol buffers and stored under a
        new cursor id, from where _Dynamic_Next hands them out.

        Args:
            query: the query protocol buffer; its kind, filters, sort
                orders, offset and limit are evaluated
            query_result: the result protocol buffer that receives the
                cursor id and the more_results flag

        Raises:
            AssertionError: if a filter uses an unsupported operator or
                if two filters produce the same condition
        """
        # Turn the filter-objects into a set of conditions
        conditions = self.prm.entityToDict(
            pb=query,
            populate_dict=self._add_filter_condition,
            get_list=lambda x: x.filter_list(),
            unwrap_properties=lambda x: x.property_list())
        # Concatenate the conditions in a statement
        sqlquery, params = self._build_select(query.kind(), conditions)

        # Open a connection and execute the rest of the
        # method in a try-finally block
        connection = self._connect(None, False, False)
        try:
            # Now, look at the sort order
            order_by = self._build_order_by(connection, query)
            if order_by:
                sqlquery = '%s ORDER BY %s' % (sqlquery, order_by)

            # Execute the query
            db_cursor = connection.cursor()
            try:
                db_cursor.execute(sqlquery, params)
                # Skip the first `offset` rows. fetchone() is the standard
                # DB-API call and returns None once the rows are used up.
                for _ in range(query.offset()):
                    if db_cursor.fetchone() is None:
                        break
                rows = db_cursor.fetchmany(
                    max(0, min(self._MAX_QUERY_RESULTS, query.limit())))
                # Convert the rows into PBs. This has to happen while the
                # cursor is open since its description is still needed.
                kind = query.kind()
                results = [self._row_to_entity(kind, db_cursor, row)
                           for row in rows]
            finally:
                # Do not hand a connection with an open cursor back
                # to the pool.
                db_cursor.close()

            self._register_results(results, query_result)
        # Clean up resources at the end
        finally:
            self._release_connection(connection)

    @staticmethod
    def _add_filter_condition(dictionary, filter, col_key, value):
        """Add the sql condition for one filter to the given dictionary.

        The parameter names (including `filter`, which shadows the
        builtin) are kept as they were, in case the PRM helper passes
        the arguments by keyword.

        Args:
            dictionary: maps a condition such as '<column> <operator>'
                to the value to bind, or to None for conditions that
                do not take a value
            filter: the filter object; only its op() is read here
            col_key: the name of the column the condition applies to
            value: the value the column is compared with

        Raises:
            AssertionError: for unsupported operators and for a second
                condition on the same column and operator. The errors are
                raised explicitly so that they survive `python -O`.
        """
        operator = filter.op()
        names = DatastoreSqliteStub._Operator_NAMES
        if operator in names:
            key = '%s %s' % (col_key, names[operator])
            bound_value = value
        elif operator == DatastoreSqliteStub._OPERATOR_NOT_NULL:
            key = '%s NOT NULL' % col_key
            bound_value = None
        else:
            raise AssertionError('Unsupported operator: %s' % operator)
        if key in dictionary:
            raise AssertionError(
                'Multiple conditions not supported yet: %s' % key)
        dictionary[key] = bound_value

    @staticmethod
    def _build_select(kind, conditions):
        """Build the SELECT statement and its named parameters.

        Args:
            kind: the kind to query, used as the table name
            conditions: the dictionary filled by _add_filter_condition

        Returns:
            a tuple (sqlquery, params). Every condition with a value
            refers to it through a placeholder named after the position
            of the condition (':0', ':1', ...).
        """
        # Identifiers cannot be bound as parameters; quote the table name
        # so that unusual kind names cannot alter the statement.
        sqlquery = 'SELECT * FROM "%s"' % kind.replace('"', '""')
        params = {}
        for count, (key, value) in enumerate(conditions.items()):
            if count == 0:
                keyword = 'WHERE'
            else:
                keyword = 'AND'
            sqlquery = '%s %s %s' % (sqlquery, keyword, key)
            if value is not None:
                params[str(count)] = value
                sqlquery = '%s :%s' % (sqlquery, count)
        return sqlquery, params

    def _build_order_by(self, connection, query):
        """Return the ORDER BY terms of the query as one string.

        The result is empty if the query has no sort orders or if none
        of the sorted properties is part of the schema.
        """
        if query.order_size() <= 0:
            return ''
        schema = self.prm.getSchema(connection, query.kind())
        order_conditions = []
        for entry in query.order_list():
            # Sort orders on properties the schema does not know
            # are ignored.
            if entry.property() not in schema:
                continue
            if entry.direction() == datastore_pb.Query_Order.ASCENDING:
                order = 'ASC'
            else:
                order = 'DESC'
            # A property may be stored in several columns.
            for column in schema[entry.property()]:
                order_conditions.append('%s %s' % (column, order))
        return ', '.join(order_conditions)

    def _row_to_entity(self, kind, db_cursor, row):
        """Convert one row of a result set into an entity PB."""
        # Extract data from row and convert to entity
        entity = entity_pb.EntityProto()
        keyvals = self.prm.rowToDict(db_cursor, row, False)
        self.prm.dictToEntity(keyvals, entity)
        # Extract primary key
        pk = self.prm.pkFromRow(kind, keyvals)
        entity.mutable_key().CopyFrom(pk)
        # TODO: better support for entity groups?
        group = entity.mutable_entity_group()
        root = pk.path().element(0)
        group.add_element().CopyFrom(root)
        return entity

    def _register_results(self, results, query_result):
        """Cache the results under a new cursor id and report the id.

        The approach is taken from datastore_file_stub.
        """
        self.__cursor_lock.acquire()
        try:
            cursor_id = self.__next_cursor
            self.__next_cursor += 1
        finally:
            # Release the lock even if something goes wrong in between.
            self.__cursor_lock.release()
        self.__queries[cursor_id] = (results, len(results))
        query_result.mutable_cursor().set_cursor(cursor_id)
        query_result.set_more_results(len(results) > 0)

    def _Dynamic_Next(self, next_request, query_result):
        """Hand out the next batch of cached results of a query.

        Taken verbatim from datastore_file_stub.py.

        Args:
            next_request: the request protocol buffer that carries the
                cursor id and the number of results to return
            query_result: the result protocol buffer that receives the
                entities and the more_results flag

        Raises:
            apiproxy_errors.ApplicationError: with BAD_REQUEST if the
                cursor id is unknown
        """
        cursor = next_request.cursor().cursor()
        try:
            results, orig_count = self.__queries[cursor]
        except KeyError:
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.BAD_REQUEST,
                'Cursor %d not found' % cursor)
        count = next_request.count()
        for r in results[:count]:
            query_result.add_result().CopyFrom(r)
        # The cached list is consumed in place, so that the next call
        # continues where this one stopped.
        del results[:count]
        query_result.set_more_results(len(results) > 0)
What's next?
Our SQL connector is in pretty good shape by now. We can insert and get data from the database and run GQL queries. So, what's keeping me from releasing it into the open source world?
Well, I do not necessarily think that the code needs to be perfect, but there are a couple of higher level items that I would like to address. For example, I do not worry too much about implementing the count feature, but deletion is a must. Also, the PRMHelper can currently only handle strings and integers, but there are a lot more data types to be supported. List values are also something important and potentially tricky; I'd like to make sure that there is at least a basic implementation for it.
I'll keep you posted on this blog.