Monday, October 12, 2009

"Hooking" into Java App Engine

Last week, Guido van Rossum gave a lightning talk on RPC instrumentation in App Engine. He used API hooks to grab all sorts of useful information, then aggregated it in a web view. One of the questions asked after the talk was "how can this be done in Java?" I thought I'd give it a shot ;-). In this post, we are going to collect a very simple set of statistics. For each request, we are going to list all API calls that were made, with start and end time and whether they produced an error.

In order to get started, we need to find a way to tie into the RPC mechanism, similar to the API proxy in Python. As it turns out, there is also an ApiProxy class in Java, as described in the unit testing section of the documentation. In the documentation, there is a call

ApiProxy.setDelegate(new ApiProxyLocalImpl(new File(".")){});


where ApiProxyLocalImpl implements an interface com.google.apphosting.api.ApiProxy.Delegate. The delegate is not externally documented (as far as I know), but pulling up the class in Eclipse shows that there is a method makeSyncCall with a bunch of arguments, where arguments 1 and 2 (counting from zero) are strings. Let's try to intercept that method call!

Since Delegate is not documented, we have no guarantees how it behaves, or if its contract overall will stay the same. Thus, I am reluctant to provide a full implementation of each method. Instead, let's use Dynamic Proxies:

package com.appenginefan.instrumentation;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

import com.google.apphosting.api.ApiProxy;
import com.google.apphosting.api.ApiProxy.Delegate;

/**
* Catches calls to the ApiProxy and can perform
* measurements.
*/
public class Interceptor implements InvocationHandler {

private final Delegate<?> wrappedDelegate;
private final Delegate<?> wrapper;
private final ThreadLocal<List<String>> cache;

public Interceptor() {
this.wrappedDelegate = ApiProxy.getDelegate();
this.wrapper = (Delegate<?>) Proxy.newProxyInstance(
Interceptor.class.getClassLoader(),
new Class[]{Delegate.class},
this);
this.cache = new ThreadLocal<List<String>>();
}

/**
* Installs the interceptor in the ApiProxy.
*/
public void install() {
ApiProxy.setDelegate(wrapper);
}

/**
* Uninstalls the interceptor from the ApiProxy.
*/
public void uninstall() {
ApiProxy.setDelegate(wrappedDelegate);
}

/**
* Sets or removes a place where the interceptor can
* log method statistics for this call.
*/
public void setCache(List<String> localCache) {
cache.set(localCache);
}

@Override
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {

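// Delegate to the wrapped proxy for most method calls
if (!method.getName().equals("makeSyncCall") ||
cache.get() == null) {
try {
return method.invoke(wrappedDelegate, args);
} catch (java.lang.reflect.InvocationTargetException e) {
// Rethrow the delegate's own exception, not the reflection wrapper
throw e.getCause();
}
}

// For sync-calls, let's collect some statistics
long startTime = System.currentTimeMillis();
String arg1 = String.valueOf(args[1]);
String arg2 = String.valueOf(args[2]);
Throwable errorInDelegate = null;
Object result = null;
try {
result = method.invoke(wrappedDelegate, args);
} catch (java.lang.reflect.InvocationTargetException e) {
// Unwrap so that we log and rethrow the delegate's original exception
errorInDelegate = e.getCause();
} catch (Throwable t) {
errorInDelegate = t;
}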
long endTime = System.currentTimeMillis();

// Let's store the statistics somewhere
cache.get().add(String.format(
"%s.%s(), from %s, until %s, %s",
arg1, arg2, startTime, endTime,
(errorInDelegate == null) ?
"ok" : errorInDelegate.getMessage()));

// Return the proxied result, or rethrow exception
if (errorInDelegate != null) {
throw errorInDelegate;
} else {
return result;
}
}

}


This class will intercept only calls to makeSyncCall, and only if statistics collection is turned on for this particular request (by setting a statistics cache using setCache). It does not make many assumptions about the wrapped Delegate, except that makeSyncCall takes at least three arguments.
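Since the talk that inspired this post was about Python, here is the same wrap, time, record and rethrow idea as a minimal Python sketch. It does not touch App Engine at all: RecordingProxy, FakeDelegate and make_sync_call are hypothetical names made up for this illustration, and the code assumes a current Python interpreter.

```python
import time


class FakeDelegate(object):
    """Hypothetical stand-in for the real delegate (illustration only)."""

    def make_sync_call(self, package, method):
        if method == 'Boom':
            raise ValueError('simulated failure')
        return 'response'

    def other(self):
        return 'untouched'


class RecordingProxy(object):
    """Forwards everything to the wrapped object, timing make_sync_call."""

    def __init__(self, wrapped, log):
        self._wrapped = wrapped
        self._log = log

    def __getattr__(self, name):
        # Only called for attributes that are not found on the proxy itself
        target = getattr(self._wrapped, name)
        if name != 'make_sync_call':
            return target

        def timed(*args, **kwargs):
            start = time.time()
            status = 'ok'
            try:
                return target(*args, **kwargs)
            except Exception as e:
                status = str(e)
                raise  # the caller still sees the original exception
            finally:
                self._log.append('%s.%s(), from %s, until %s, %s' % (
                    args[0], args[1], start, time.time(), status))

        return timed


log = []
proxy = RecordingProxy(FakeDelegate(), log)
proxy.make_sync_call('datastore_v3', 'Get')
try:
    proxy.make_sync_call('memcache', 'Boom')
except ValueError:
    pass
untouched = proxy.other()  # passes straight through, nothing is logged
```

As in the Java version, a failing call is both recorded and rethrown, so the caller cannot tell that a proxy sits in between.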

Now, all we need is something to turn instrumentation on or off for specific requests. Assume we have the servlet we'd like to test mapped to the path /tst/, and this is the only servlet we'd like to turn statistics on for. What we can do is create a servlet filter that we only enable for specific paths. In our web.xml, this might look something like this:

 <filter>
<filter-name>Stats</filter-name>
<filter-class>com.appenginefan.instrumentation.StatsCollector</filter-class>
</filter>
<filter-mapping>
<filter-name>Stats</filter-name>
<url-pattern>/tst/</url-pattern>
</filter-mapping>


Let's look at the implementation. The filter's job is to


  • Install the proxy upon startup

  • Uninstall the proxy during shutdown

  • Turn on stats and write the results to the log file.



Here is what such a class could look like:

package com.appenginefan.instrumentation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

public class StatsCollector implements Filter {

private static final Logger LOG =
Logger.getLogger(StatsCollector.class.getName());
private Interceptor interceptor;

@Override
public void init(FilterConfig config)
throws ServletException {
interceptor = new Interceptor();
interceptor.install();
}

@Override
public void destroy() {
interceptor.uninstall();
}

@Override
public void doFilter(ServletRequest req,
ServletResponse resp, FilterChain chain)
throws IOException, ServletException {
List<String> collectedData = new ArrayList<String>();
try {
interceptor.setCache(collectedData);
chain.doFilter(req, resp);
} finally {
interceptor.setCache(null);
}
write(collectedData);
}

protected void write(List<String> stats) {
LOG.info("API calls: " + stats);
}
}


In summary, while not explicitly mentioned in the meetup, it is already possible to collect useful information with Java App Engine. Naturally, the example above is much more primitive and not as feature-rich as what was shown during the presentation. However, it shows that implementing things such as hooks is just as feasible in Java as in the Python version.

Sunday, October 11, 2009

Ouch

I cannot watch hospital shows on television: whenever I see somebody cut up, I cannot help but imagine the same thing happening to me -- and that makes me nauseous. Same thing for some reality tv, btw ;-)

It's been going around for a day or two: stories about what happened to T-Mobile Sidekick users. It's sad for the users, and I feel for them. I have an Android phone, and all my data is also "in the cloud". Ok, it's Google's cloud, and it's good to know that the Google File System and the other technologies these Google services are built on (including App Engine) have replication and distribution built in. Yet I am wondering: should I be backing up my data? Thanks to the Data Liberation Front, there are now solutions documented for pretty much any service I use, so I guess it would be possible. Yet, I somehow cannot see myself downloading all my data and storing it on some disk. Over the course of the years (before I switched to Gmail), I lost my email history several times, as I switched between email clients, my hard disk crashed, or I simply lost the CD-ROM with the backup. So far, through my last three years or so as a Google Apps user, they have not lost a single bit of information I put onto their servers. I could throw all my computers into the trash right now, and all I would have lost would be a couple of savegames from Titan Quest and Diablo 2. I find that kind of reassuring.

How do others handle their online accounts, be it Hotmail, Yahoo, Google, or anything else? Do you folks do manual backups?


Tuesday, October 6, 2009

Random musings of a slightly feverish mind

Last weekend, I was sitting at home with the common cold. I'm sure you know that situation: too miserable to go out or do anything useful, but not miserable enough to stay in bed all day. Anyhow, I was aimlessly surfing the web when I ran into the Brettspielewelt site. Try it out -- those guys do a lot of the things I'd like my JOGRE port to App Engine to do eventually, and more. (PS: Don't worry about the language, the site is in both English and German.) Personally, I am hooked on Stone Age, but I am getting off topic.

Brettspielewelt allows the user to either run its client directly in the browser, or download and install a local version that connects to their site. The client itself is written in Java, which again reminded me of the JOGRE project. When I first posted about my attempts to do a port, I was asked why I was not doing an ajaxy client instead. My answer was mostly about not wanting to reinvent the wheel, but that gaming site got me thinking: is there really such a hard distinction, a choice between web and desktop development, that one has to make?

While I am completely in the cloud for most of my office-style apps now, I am still using my desktop for quite a few things that are not browser based -- and a lot of them nowadays have an online component. My Eclipse for programming runs locally, but my revision control is hosted on code.google.com. I kill zombies in Left 4 Dead on my local machine, but my scores and game licenses are managed by Steam, enabling me to install and run the game from any machine in the world without having to carry install CDs or my savegames with me (not to mention online play). It seems to me that many successful desktop apps nowadays have a hosted component that adds value for the end user, and (if done right) also makes work much more profitable for the desktop developers out there. Of course, if not carefully fine-tuned, such an online component may backfire and alienate more users than it attracts.

Are these hybrid desktop apps like amphibians, a transitional step from the desktop into the cloud? It's possible, considering HTML5, nativeclient, and some of the other great projects that are out there to make the web more powerful. I'm still sneezing too much and don't want to get my crystal ball all icky, so I'd appreciate your comments. Personally, I am not coding much for the desktop any more -- is there anyone out there who still does desktop software and is using App Engine in cool and interesting ways? Please add your examples as comments to this thread.

Thursday, October 1, 2009

Non-sharded counters, part 2: using task queue

A few months ago, I posted an article on how to use cron jobs to increment non-sharded counters. I mentioned at that time that it would also make sense to try the task queue API for this, but I had not followed up yet. Let's take a look at this today.

As a brief refresher, let's remember the current implementation of increment_counter_later (if you are not familiar with the sample app, please revisit the original article).

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # Store the name of the counter in a "bucket" with increasing number
    bucket_id = change_memcache_counter(TODO_COUNTER, 1)
    memcache.set(str(bucket_id), model_id)


Basically, we store our delta in memcache and then simulate a task queue by storing the model_id in a "bucket". A cron job retrieves that bucket id and applies the count.

Let's try the naive replacement first: instead of simulating the task queue, let's just use the new API:

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # Add a task into the queue for that counter
    taskqueue.add(url='/task', params={'model_id': model_id})


This implementation assumes that there is going to be a url /task that can be invoked by the task queue mechanism. Such a script is easy to write -- it is a simpler version of our original cron job that

  • fetches the delta from memcache

  • if the delta is not 0, increments the count in the store

  • decreases the delta in memcache



The following script performs that task:

import counter
from google.appengine.api import memcache
from google.appengine.ext import db
import os
import urllib


def main():

    print 'Content-Type: text/html'
    print ''
    model_id = urllib.splitvalue(os.environ['QUERY_STRING'])[1]
    count = memcache.get(counter.COUNTER_PREFIX % model_id)
    if count:
        counter.change_memcache_counter(
            counter.COUNTER_PREFIX % model_id, -count)
        db.run_in_transaction(
            counter.increment_counter,
            model_id,
            count)
        print 'Incremented %s by %s' % (model_id, count)
    else:
        print 'Nothing to do for %s' % model_id

if __name__ == "__main__":
    main()


We can deploy the modified application and verify that it works. However, this is not code we would want to run in production! First of all, we would run out of quota very quickly. The current implementation of increment_counter_later creates a new task for every counter increase. Let's assume for the sake of this argument that this is the only task we need to run and that we have a quota of 100,000 task insertions per day. In that case, we can only increase counters 100,000 times per day. That's not much for a popular app, where popular counters may get increased several times per minute (or even second)!
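To put that number in perspective, here is the arithmetic as a small sketch. The 100,000 figure is just the assumption made above, not an official quota, and the function name is made up for this illustration.

```python
SECONDS_PER_DAY = 24 * 60 * 60
DAILY_TASK_QUOTA = 100000  # assumed figure from the paragraph above


def max_sustained_rate(quota, seconds=SECONDS_PER_DAY):
    """Average number of task insertions per second the quota allows."""
    return quota / float(seconds)


rate = max_sustained_rate(DAILY_TASK_QUOTA)
print('%.2f increments per second' % rate)         # 1.16
print('%.1f increments per minute' % (rate * 60))  # 69.4
```

In other words, with one task per increment, all counters of the app together could only sustain a little more than one increment per second over a whole day.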

What we want to do is bundle increments for the same counter together. The task queue has a countdown parameter that lets us defer task execution by a certain amount of time. The following sample code uses this to defer task execution by 60 seconds. It also puts a lock into memcache with the same expiration time, thus not creating any new tasks for this counter until the time expires:

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # If there's already a task in line, we are done
    if not memcache.add('LOCK_%s' % model_id, 1, time=60):
        print 'already locked'
        return

    # Add a task into the queue for that counter
    taskqueue.add(
        url='/task',
        params={'model_id': model_id},
        countdown=60)
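
To get a feeling for how much the lock saves, the following sketch replays the locking logic against an in-memory stand-in. FakeMemcache and simulate are hypothetical helpers written for this illustration; they only mimic the "add succeeds if the key is absent or expired" behaviour and are not the App Engine API.

```python
class FakeMemcache(object):
    """In-memory stand-in for memcache (hypothetical, illustration only)."""

    def __init__(self):
        self.now = 0     # fake clock, in seconds
        self._data = {}  # key -> (value, expiry time)

    def add(self, key, value, time):
        """Store value only if key is absent or expired; True if stored."""
        entry = self._data.get(key)
        if entry is not None and entry[1] > self.now:
            return False
        self._data[key] = (value, self.now + time)
        return True


def simulate(increment_times, lock_seconds=60):
    """Return the times at which a task would be enqueued for one counter."""
    cache = FakeMemcache()
    tasks = []
    for t in increment_times:
        cache.now = t
        if cache.add('LOCK_demo', 1, time=lock_seconds):
            tasks.append(t)  # stands in for taskqueue.add(..., countdown=60)
    return tasks


# One increment per second for three minutes: 180 increments in total
tasks = simulate(range(180))
print('%d increments, %d tasks' % (180, len(tasks)))  # 180 increments, 3 tasks
```

Under these assumptions, 180 increments cost three task insertions instead of 180. The real memcache may of course evict the lock early, in which case a few extra tasks get created, which is harmless.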


This implementation is slightly better, but it's still not ideal. Assume that, due to some other problem in the code, we run out of quota, or that anything else goes wrong while inserting the task. Right now, these problems would bubble up into the main handler, resulting in an error reported to the end user. In most apps, it is preferable to fail silently and just skip one counter increase. We should do the same in this implementation:

from google.appengine.runtime import apiproxy_errors

# ...

def increment_counter_later(model_id, delta):

    # Increment the particular counter
    change_memcache_counter(COUNTER_PREFIX % model_id, delta)

    # If there's already a task in line, we are done
    if not memcache.add('LOCK_%s' % model_id, 1, time=60):
        print 'already locked'
        return

    # Add a task into the queue for that counter
    try:
        taskqueue.add(
            url='/task',
            params={'model_id': model_id},
            countdown=60)
    except taskqueue.Error: # task-queue specific error
        # do something here, like logging the problem
        pass
    except apiproxy_errors.Error: # includes out of quota error
        # do something here, like logging the problem
        pass


This implementation of increment_counter_later should accomplish our goal of reduced-latency counts (because it does not happen while the browser waits for a response, and because we decrease contention issues by bundling increases for the same count) -- and it should do it without being a resource hog (if the app still uses up too many tasks, consider increasing the buffer time from 60 seconds to 5 minutes).
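As a rough guide for picking the buffer time, the sketch below estimates how many continuously busy counters fit into a daily task budget. It reuses the assumed 100,000 insertions per day from earlier in this article; the function names are made up for this illustration.

```python
SECONDS_PER_DAY = 24 * 60 * 60
DAILY_TASK_QUOTA = 100000  # assumed figure, see above


def tasks_per_counter_per_day(buffer_seconds):
    """Worst case: the counter is hit constantly, one task per buffer window."""
    return SECONDS_PER_DAY // buffer_seconds


def busy_counters_supported(quota, buffer_seconds):
    """How many constantly busy counters the daily quota can serve."""
    return quota // tasks_per_counter_per_day(buffer_seconds)


for buffer_seconds in (60, 300):
    print('%3d s buffer: %4d tasks per counter, %3d busy counters' % (
        buffer_seconds,
        tasks_per_counter_per_day(buffer_seconds),
        busy_counters_supported(DAILY_TASK_QUOTA, buffer_seconds)))
```

With a 60 second buffer, a constantly busy counter needs at most 1,440 tasks per day, so the assumed quota covers 69 such counters; with 5 minutes, it needs 288 tasks and the quota covers 347.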

As always, if you have any questions or find a bug, feel free to post them to this article.