Thursday, October 1, 2009

Non-sharded counters, part 2: using task queue

A few months ago, I posted an article on how to use cron jobs to increment non-sharded counters. I mentioned at that time that it would also make sense to try the task queue API for this but have not followed up yet. Let's take a look at this today.

As a brief refresher, let's remember the current implementation of increment_counter_later (if you are not familiar with the sample app, please revisit the original article).

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # Store the name of the counter in a "bucket" with increasing number
    bucket_id = change_memcache_counter(TODO_COUNTER, 1)
    memcache.set(str(bucket_id), model_id)


Basically, we store our delta in memcache and then simulate a task queue by storing the model_id in a "bucket". A cron job retrieves that bucket id and applies the count.

Let's try the naive replacement first: instead of simulating the task queue, let's just use the new API:

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # Add a task into the queue for that counter
    taskqueue.add(url='/task', params={'model_id': model_id})


This implementation assumes that there is a url /task that can be invoked by the task queue mechanism. Such a script is easy to write: it is a simpler version of our original cron job that

  • fetches the delta from memcache

  • if the delta is not 0, increments the count in the store

  • decreases the delta in memcache



The following script performs that task:

import counter
from google.appengine.api import memcache
from google.appengine.ext import db
import os
import urllib


def main():

    print 'Content-Type: text/html'
    print ''
    model_id = urllib.splitvalue(os.environ['QUERY_STRING'])[1]
    count = memcache.get(counter.COUNTER_PREFIX % model_id)
    if count:
        counter.change_memcache_counter(
            counter.COUNTER_PREFIX % model_id, -count)
        db.run_in_transaction(
            counter.increment_counter,
            model_id,
            count)
        print 'Incremented %s by %s' % (model_id, count)
    else:
        print 'Nothing to do for %s' % model_id

if __name__ == "__main__":
    main()


We can deploy the modified application and verify that it works. However, this is not code we would want to run in production! First of all, we would run out of quota very quickly. The current implementation of increment_counter_later creates a new task for every counter increase. Let's assume for the sake of argument that this is the only task we need to run and that we have a quota of 100,000 task insertions per day. In that case, we can only increase counters 100,000 times per day. That's not much for a popular app, where popular counters may be incremented several times per minute (or even per second)!
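To put that number in perspective, here is a quick back-of-the-envelope sketch. The 100,000 figure is just the illustrative quota from the paragraph above, and the function names are made up for this example:

```python
# Back-of-the-envelope check of the quota argument.
# DAILY_TASK_QUOTA is the illustrative figure from the text, not an official limit.
DAILY_TASK_QUOTA = 100000
SECONDS_PER_DAY = 24 * 60 * 60


def sustainable_rate(quota=DAILY_TASK_QUOTA):
    """Average increments per second if every increment inserts one task."""
    return float(quota) / SECONDS_PER_DAY


def seconds_until_exhausted(increments_per_second, quota=DAILY_TASK_QUOTA):
    """How long the quota lasts at a steady rate of increments."""
    return quota / float(increments_per_second)


if __name__ == "__main__":
    print("%.2f increments/second sustainable" % sustainable_rate())
    print("%.1f hours at 10 increments/second"
          % (seconds_until_exhausted(10) / 3600.0))
```

In other words, with one task per increment the whole app can sustain only a little more than one increment per second on average, and a steady ten increments per second would use up the assumed quota in under three hours.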

What we want to do is bundle increments for the same counter together. The task queue has a countdown parameter that lets us defer task execution by a certain amount of time. The following sample code uses this to defer task execution by 60 seconds. It also puts a lock into memcache with the same expiration time, thus not creating any new tasks for this counter until the time expires:

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # If there's already a task in line, we are done
    if not memcache.add('LOCK_%s' % model_id, 1, time=60):
        print 'already locked'
        return

    # Add a task into the queue for that counter
    taskqueue.add(
        url='/task',
        params={'model_id': model_id},
        countdown=60)
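
To get a feel for how much the lock saves, here is a small simulation that runs outside of App Engine. FakeMemcache and count_tasks are hypothetical helpers written for this illustration only; the fake mimics just the "add succeeds only if the key is absent or expired" behaviour, with the clock passed in explicitly so the run is repeatable:

```python
class FakeMemcache(object):
    """Tiny in-memory stand-in for memcache with expiring keys (illustration only)."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at)

    def add(self, key, value, time, now):
        """Store the key only if it is absent or expired; return True if stored."""
        entry = self._data.get(key)
        if entry is not None and entry[1] > now:
            return False  # lock still held, nothing stored
        self._data[key] = (value, now + time)
        return True


def count_tasks(increment_times, buffer_seconds=60):
    """Return how many tasks would be enqueued for a single counter.

    increment_times is a sequence of timestamps (in seconds) at which
    increment_counter_later would have been called.
    """
    cache = FakeMemcache()
    tasks = 0
    for now in increment_times:
        # Same pattern as above: only the call that wins the lock adds a task.
        if cache.add('LOCK_example', 1, time=buffer_seconds, now=now):
            tasks += 1
    return tasks


if __name__ == "__main__":
    # One increment per second for ten minutes: 600 increments in total.
    print(count_tasks(range(600)))
```

In this simulated run, 600 increments result in only 10 task insertions, one per 60-second window, instead of 600.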


This implementation is slightly better, but it's still not ideal. Suppose that, due to some other problem in the code, we run out of quota, or that something else goes wrong while inserting the task. Right now, these problems would bubble up into the main handler, resulting in an error reported to the end user. In most apps, it is preferable to fail silently and just skip one counter increase. We should do the same in this implementation:

from google.appengine.runtime import apiproxy_errors

# ...

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # If there's already a task in line, we are done
    if not memcache.add('LOCK_%s' % model_id, 1, time=60):
        print 'already locked'
        return

    # Add a task into the queue for that counter
    try:
        taskqueue.add(
            url='/task',
            params={'model_id': model_id},
            countdown=60)
    except taskqueue.Error: # task-queue specific error
        # do something here, like logging the problem
        pass
    except apiproxy_errors.Error: # includes out of quota error
        # do something here, like logging the problem
        pass


This implementation of increment_counter_later should accomplish our goal of low-latency counting: the datastore write does not happen while the browser waits for a response, and bundling increases for the same counter reduces contention. It should also do so without being a resource hog. If the app still uses up too many tasks, consider increasing the buffer time from 60 seconds to 5 minutes.
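
As a rough guide for picking the buffer time, the sketch below computes the worst-case number of task insertions per counter per day, and how many continuously busy counters would fit into a given quota. The function names are invented for this example, and the 100,000 quota is again only the illustrative figure used earlier:

```python
SECONDS_PER_DAY = 24 * 60 * 60


def max_tasks_per_counter_per_day(buffer_seconds):
    """Upper bound on task insertions for one counter: one per buffer window."""
    return SECONDS_PER_DAY // buffer_seconds


def busy_counters_within_quota(daily_quota, buffer_seconds):
    """How many counters can be hit in every window without exceeding the quota."""
    return daily_quota // max_tasks_per_counter_per_day(buffer_seconds)


if __name__ == "__main__":
    for buffer_seconds in (60, 300):
        print("%d s buffer: at most %d tasks per counter per day, "
              "room for %d busy counters" % (
                  buffer_seconds,
                  max_tasks_per_counter_per_day(buffer_seconds),
                  busy_counters_within_quota(100000, buffer_seconds)))
```

With a 60-second buffer, a counter that is incremented around the clock costs at most 1,440 insertions per day; at 5 minutes, that drops to 288. The trade-off is that the stored count lags further behind and the delta sits in memcache longer before it is written.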

As always, if you have any questions or find a bug, feel free to post them to this article.

5 comments:

Arachnid said...

You could use the Task Queue's named task support to avoid the need for a memcache lock: Just name each task after the 60-second-period it covers.

The App Engine Fan said...

> Just name each task after the 60-second-period it covers.

That would still count against the quota and thus defeat the purpose. The moment you do an insert, whether it results in a successful insertion or not, your task quota decreases. Therefore, memcache is preferable.

Arachnid said...

Hm, you're right. That's unfortunate.

Ian Lewis said...

Memcached really shouldn't be used for persistent data like counters. The memcached api might just throw away the counter as there is no contract that guarantees that the data will be there when the task runs. That could leave you with some woefully inaccurate counters.

The App Engine Fan said...

> Memcached really shouldn't be
> used for persistent data
> like counters

Nothing is always absolutely so. If you browse the blog, you'll see that there are at least half a dozen articles about counters with several different implementations (like sharding, for example). Each implementation has its distinct advantages and disadvantages. I describe some of them in the predecessor to this article, but it's certainly not an exhaustive debate.

You mention that memcache is not guaranteed to retain the data long enough. That is correct. In practice, however, my experience has been that memcache is usually good enough for these tasks, as long as one does not push the timeout too far (which admittedly also depends on your application). It depends on how accurate the count needs to be. This solution is for high-volume counts (think page visits as the prototypical example) that cannot be sharded for one reason or another (like queries on entities), but that do not need to be 100% exact. If you need exact counters under similar circumstances, you might want to combine this technique with a different approach, such as storing the deltas in shards. That will give you the best of both worlds, but also the additional headache of cleaning up the shards after the fact. Think of a datastore where every entity has its own counter; then consider the additional load that shards pose for them (and that counts against your quota). One would want to provide a means of eliminating unused shards. This can all be done, but might be a little too much for a single blog post. If you have a favorite implementation, feel free to add a link to this discussion thread :-)