Thursday, September 10, 2009

Using ExecutorCompletionService to synchronize multithreaded workflow

Today I ran into a problem where I needed to make sure that one multithreaded phase of processing had completely ended before starting another. Specifically, I was retrieving documents from S3 to load into a Lucene index, and wanted to retry all document requests that had failed due to S3 flakiness or connectivity issues, in other words the standard distributed computing error conditions.

In other situations requiring synchronization between threads, I've used a CountDownLatch. This works really well when you know the exact number of threads that you need to synchronize. You initialize the latch with the number of threads that you are synchronizing. When each thread finishes its work it decrements the latch, and when the latch count reaches 0 you continue processing.
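
A minimal sketch of the latch pattern described above. The class and method names (LatchDemo, runWorkers) are made up for illustration, and the "work" is just a counter increment:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: waits for a known number of worker threads.
class LatchDemo {

    // Starts workerCount threads and blocks until every one has counted down.
    static int runWorkers(int workerCount) {
        final CountDownLatch latch = new CountDownLatch(workerCount);
        final AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for real work
                } finally {
                    latch.countDown(); // always decrement, even if the work fails
                }
            }).start();
        }
        try {
            latch.await(); // returns once the count reaches 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4) + " workers finished");
    }
}
```

The count has to be fixed when the latch is created, which is exactly the limitation that comes up next.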

This time was different because instead of synchronizing threads, I was trying to halt processing until all asynchronously submitted tasks had completed. I was queuing up several hundred thousand tasks into a thread pool, and did not know when that thread pool would be finished with the work, or how many threads would be running when the entire job completed, or even exactly how many tasks I had to run -- that number depended on the number of documents being fetched, which is always growing.

Fortunately my situation was not a unique one. I figured that the first place to look was the Java concurrency library (java.util.concurrent), and when I did some research, I found that ExecutorCompletionService was exactly what I needed.

ExecutorCompletionService works with a supplied Executor using Runnable/Callable tasks. I decided to use Callable tasks, as they allowed me to return and inspect a value, and throw exceptions.  As those tasks complete, they are placed onto a queue that can be accessed via the poll() or take() methods. This approach simplified my life:
  1. It allowed me to use tasks and task status to control logic flow instead of threads and thread status. 
  2. It freed me up from having to know how many threads or tasks I was working with.
  3. It provided me with an access point to each task that I could use to analyze the results.
  4. Once I had taken every submitted task off the ExecutorCompletionService queue, I knew it was time to retry all failed results.
Point (1) above is the foundation for the points that follow. When I used Future and Callable to implement the work I needed to do, I was able to return the results I cared about and process them. Specifically it allowed the code that ran the original key fetch loop to not worry about tracking and storing exceptions, which made for much simpler looping logic.

ExecutorCompletionService is a great example of how picking the right primitives makes it easy to compose highly functional helper objects. In this case the primitives involved were a (thread pool) executor and a (linked blocking) queue. (side note: I don't mean to sound like such a concurrent lib fanboy, but I'm really happy I didn't have to write this code myself, and really happy that the choice of primitives that the authors used enabled creation of classes like the ExecutorCompletionService. This stuff used to take significant effort on my part, and significant bugs were usually introduced :)

Here is an example of the ExecutorCompletionService in action.

public void buildEntireIndex() throws Exception {

        boolean moreKeys = true;
        int submittedJobCt = 0;
        // tracking all failed keys for later retry
        Map failedKeys = new HashMap();
        // ErrorCapturingExecutor is subclassed from ThreadPoolExecutor.
        // I override ThreadPoolExecutor.afterExecute() to queue and later analyze exceptions.
        LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>();
        ErrorCapturingExecutor executor = new ErrorCapturingExecutor(threadCount,
            threadCount, 0L, TimeUnit.SECONDS, linkedBlockingQueue);
        // CompletionService works with the supplied executor and queues up tasks as they finish.
        executorCompletionService = new ExecutorCompletionService<BuildResults>(executor);
        String lastKey = null;
        while (moreKeys) {
            Set<String> keys = viewKeyRetriever.retrieveKeys(lastKey);
            if (keys.size() > 0) {
                // copy the keys into the array so the last one can seed the next retrieval
                String[] array = keys.toArray(new String[keys.size()]);
                lastKey = array[keys.size() - 1];
                // I need to keep the number of waiting tasks bounded.
                if (linkedBlockingQueue.size() > MAXQUEUESIZE) {
                    // the body of this check is missing from the original listing;
                    // pausing briefly before submitting more work is an assumption
                    Thread.sleep(1000);
                }
                // this is where I actually submit tasks
                processKeys(keys);
                // I only know how many jobs I need to wait for when I've retrieved all keys.
                submittedJobCt++;
            } else {
                moreKeys = false;
            }
        }

        // I use the ExecutorCompletionService queue to check on jobs as they complete.
        for (int i = 0; i < submittedJobCt; i++) {
            Future<BuildResults> finishedJob = executorCompletionService.take();
            // at this point, all I really care about is failures that I need to retry.
            BuildResults results = finishedJob.get();
            if (results.hasFailures()) {
                // record the failed keys in failedKeys (the original body is not shown)
            }
        }
        // I can use failedKeys to retry processing on keys that have failed.
        // Logic omitted for clarity.
}


The processKeys method is shown below. I broke it out because I needed to call it again when re-processing failed keys.

private void processKeys(Set<String> keys) {
        // the builder builds the index from the content retrieved using the passed in keys.
        final IndexBuilder builder = indexBuilderFactory.createBuilder(keys);
        executorCompletionService.submit(new Callable<BuildResults>() {
            public BuildResults call() throws Exception {
                BuildResults buildResults = null;
                try {
                    // buildResults contains all information I need to post process the task.
                    // NOTE: the right-hand side is missing from the original listing;
                    // builder.build() is an assumed method name.
                    buildResults = builder.build();
                } catch (Exception e) {
                    // meant for ErrorCapturingExecutor; it also surfaces from
                    // Future.get() wrapped in an ExecutionException
                    throw e;
                }
                return buildResults;
            }
        });
}
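
The listings above depend on classes that are not shown, so here is a self-contained sketch of the same submit / take / collect-failures flow. Everything in it is a stand-in: CompletionDemo, runAll and fetch are hypothetical names, and a key starting with "bad" simulates a failed S3 request. Unlike the original, it detects failure through the ExecutionException thrown by Future.get() rather than through a result object.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the fetch-and-index job.
class CompletionDemo {

    // Runs one task per key and returns the (sorted) keys whose task failed.
    static List<String> runAll(List<String> keys, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        // remember which key each Future belongs to, so failures can be retried
        Map<Future<Integer>, String> keyByFuture = new HashMap<>();
        List<String> failed = new ArrayList<>();
        try {
            for (String key : keys) {
                keyByFuture.put(completion.submit(() -> fetch(key)), key);
            }
            // take() blocks until the next task finishes, in completion order
            for (int i = 0; i < keyByFuture.size(); i++) {
                Future<Integer> done = completion.take();
                try {
                    done.get();
                } catch (ExecutionException e) {
                    failed.add(keyByFuture.get(done)); // the task threw
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        Collections.sort(failed);
        return failed;
    }

    // Simulated fetch: keys starting with "bad" fail.
    static Integer fetch(String key) throws IOException {
        if (key.startsWith("bad")) {
            throw new IOException("simulated failure for " + key);
        }
        return key.length();
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "bad1", "b", "bad2"), 3));
    }
}
```

The returned list plays the role of failedKeys: it can be fed straight back into runAll for a retry pass.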


Wednesday, September 9, 2009

Using ThreadLocal to pass in dummy components

Synchronization of data access across multiple threads is tricky.  While Java's threading primitives are fairly easy to understand and use, there can be unintended performance consequences of making an object thread safe. Depending on how objects synchronize other objects, you can also end up with deadlocks that are no fun to debug. For example if object1 tries to lock object2 while object2 is trying to lock object1, you're in for a long night.

In general, anything that reduces synchronization of data across threads reduces the potential for unintended consequences. An alternative to making an object thread-safe is to make it thread-local. A thread-local object provides a separate copy of itself for each thread. Each thread can only see its local instance of that object, and is free to modify it at will, without needing to synchronize.
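
A small sketch of that isolation, using made-up names (ThreadLocalDemo, BUFFER, appendInNewThread). Each thread appends to what looks like the same variable, yet neither sees the other's text:

```java
// Hypothetical demo: one ThreadLocal, two threads, two independent copies.
class ThreadLocalDemo {

    // Each thread lazily gets its own StringBuilder; no locking is needed to mutate it.
    static final ThreadLocal<StringBuilder> BUFFER = new ThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
            return new StringBuilder();
        }
    };

    // Appends text on a fresh thread and reports what that thread saw in its copy.
    static String appendInNewThread(final String text) {
        final String[] seen = new String[1];
        Thread worker = new Thread(() -> {
            BUFFER.get().append(text);
            seen[0] = BUFFER.get().toString();
        });
        worker.start();
        try {
            worker.join(); // join makes the worker's write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        BUFFER.get().append("main");
        System.out.println(appendInNewThread("worker")); // the worker's own copy
        System.out.println(BUFFER.get());                // the main thread's copy
    }
}
```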

Thread-local variables used to have significant performance issues, and there were bugs in versions prior to 1.6. Also, it is easy to run out of memory with large numbers of threads using large thread-local objects. But assuming you go in with your eyes open, reducing synchronization across the threads in your application is good for performance and can significantly reduce complexity.

Another benefit of thread-local variables (as if simplification and performance gains aren't enough) is that they make it easy to swap in stub components at unit test time. Why would you do this instead of passing in the component? I ended up using thread-local variables for my components when I had to instantiate an object via the Class.forName() method, and didn't know (or want to know) how to wire up dependent components. It's a trick I want to remember (so I'm writing it down :)

I implement the component as a thread-local variable via an anonymous inner class:

ThreadLocal<SequenceClient> sequenceClientLocal = new ThreadLocal<SequenceClient>() {
        @Override
        protected SequenceClient initialValue() {
            SequenceClient sequenceClient = null;
            try {
                sequenceClient = SequenceClientImpl.getInstance(hosts, hexIdNode);
            } catch (Exception e) {
                // creation failed: this thread gets no client, so callers see null
                sequenceClient = null;
            }
            return sequenceClient;
        }
};

In order to swap this default value out for a stub, I add a setter to override it:

public void setSequenceClientLocal(ThreadLocal<SequenceClient> sequenceClientLocal) {
        this.sequenceClientLocal = sequenceClientLocal;
}

At unit test time, I can stub in a dummy class by calling the setter:

public class TestCollapserAgent {

  // sequenceClient and collapserAgent are fields of the test class (declarations not shown)

  public void setUp() throws Exception {

    sequenceClient = new DummySequenceClientImpl(1);

    collapserAgent.setSequenceClientLocal(new ThreadLocal<SequenceClient>() {
            @Override
            protected SequenceClient initialValue() {
                return sequenceClient;
            }
    });
  }

  // unit tests follow....
}
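
Since the real SequenceClient and CollapserAgent are not shown, here is a self-contained sketch of the same swap. The nextId() and label() methods, the "doc-" prefix, and the StubSwapDemo class are all invented for illustration; only the ThreadLocal-plus-setter shape comes from the listings above.

```java
// Hypothetical shape of the dependency; the real interface is not shown in the post.
interface SequenceClient {
    long nextId();
}

// Hypothetical agent that reads its dependency from a replaceable ThreadLocal.
class CollapserAgent {

    private ThreadLocal<SequenceClient> sequenceClientLocal = new ThreadLocal<SequenceClient>() {
        @Override
        protected SequenceClient initialValue() {
            // stands in for the real, network-backed client
            throw new IllegalStateException("real client is unavailable in this sketch");
        }
    };

    // Test hook: replaces the default ThreadLocal with one that yields a stub.
    public void setSequenceClientLocal(ThreadLocal<SequenceClient> sequenceClientLocal) {
        this.sequenceClientLocal = sequenceClientLocal;
    }

    public String label() {
        return "doc-" + sequenceClientLocal.get().nextId();
    }
}

class StubSwapDemo {

    // Wires a stub that always returns fixedId, then exercises the agent.
    static String labelWithStub(final long fixedId) {
        final SequenceClient stub = new SequenceClient() {
            public long nextId() {
                return fixedId;
            }
        };
        CollapserAgent agent = new CollapserAgent();
        agent.setSequenceClientLocal(new ThreadLocal<SequenceClient>() {
            @Override
            protected SequenceClient initialValue() {
                return stub;
            }
        });
        return agent.label();
    }

    public static void main(String[] args) {
        System.out.println(labelWithStub(7L));
    }
}
```

Because the default initialValue() only runs on first get(), the stub can be installed without ever touching the real client.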


Thursday, September 3, 2009

The Ratio of Ceremony vs Essence, aka Framework Debt

This is going to be the kind of post I had sworn off of: a lot of opinion, with some rambling mixed in. I apologize in advance for that, but occasionally I need to vent about things that deeply disturb me, and venting tends to be nonlinear.

I just spent the last week trying to work with a legacy system component that was implemented using the Spring framework. This component read data from a database into a Lucene index wrapped by Compass. At the time of implementation, the lead engineer was using JPA to load database records into POJOs, which he then annotated so that they could be serialized via JAXB, which enabled Compass to read them in as Lucene Documents. Whew!

Because time was limited and the code was already in production, I decided to ignore my fundamental misgivings about frameworks and Java Acronyms, and make the minimal modifications to the existing source that would get it to take input from S3 instead of a database.

After a day of struggle, I had figured out what was going on, and was astounded by the amount of code required just to set up the relatively simple business logic. When I hit a 'schema not found' error trying to load the application.xml, I gave up, ripped out the business logic, and re-implemented the entire thing in a matter of hours. With a lot less code. I know that the original implementation of the Spring based code took a week or so to write.

The massive increase in efficiency is not because I'm a brilliant coder. I wish I was, but I've worked with brilliant coders and I'm not one of them. It's because the actual business logic was pretty minimal. The logic required to implement and maintain the Spring application required a lot of code that could only be described as Ceremonial, as opposed to Essential business logic. I first read about Ceremonial vs Essential code here, the night after I had exorcised Spring from the logic. The timing couldn't have been more appropriate.

What is Ceremonial code? It is code that has nothing to do with implementing a business requirement. In Spring, I define Ceremonial code as:
  1. Configuration as code
  2. Dependency Injection
  3. (Pedantic use of) Interfaces
The three examples above are not terribly bad, in fact they come from decent intentions ("the road to hell..."). But put together they have an exponentially bad effect. They are, when added to a developer's blind belief in the goodness of all things Frameworky, the Four Horsemen of the (Framework) Apocalypse.

Configuration As Code

Separating configuration into a data file is inherently a good idea. You don't want to hardcode variables that you would then have to rebuild the application to change. I'm not sure how this basically sound idea warped into "hey, let's put EVERYTHING into configuration", but the biggest problem with this approach is that now part of the logic is in code, the other part is in a massive XML file. You need both to understand the control flow of the application, so  you spend a lot of time toggling back and forth, saying "What class is being called? Oh, let me check in xml configuration. Oh, that's the class. Great. What was I doing?" Maybe some people see this kind of rapid mental stack management as interesting and novel brain training. I see it as wasting time, time that I could be spending either coding or testing a feature that someone is paying me to implement.

Dependency Injection

This, too, starts off as a great idea. One of the big issues people (OK, I'm talking about myself, but using the word 'people' to get some more legitimacy) had with EJB 2.0 code was that it was really hard to test. You had to have the whole stack up and running, and validating the integrity of different components in the stack was so hard we just didn't do it.

Dependency Injection/Inversion of Control allows you to parameterize the components of an object, making that object really easy to test. Just expose components with getters and setters, and you can dummy them up and test that object in true isolation! Again, there is still nothing really flawed at this point.

The major flaw in Dependency Injection comes at implementation.  Objects need all of their components in a known, initialized state, in order to function effectively. Dependency Injection as implemented in Spring is usually done in the configuration file. Objects that are created in the configuration file  have all of their components set in their configuration.

It is very easy to miss setting a component in the configuration file. This means that the object will initialize in a bad state that becomes apparent when you try to use it. People use constructors because they can specify components as parameters to the constructor, which is an explicit way of saying "This component needs components X, Y, and Z to run".

Using a constructor provides a foolproof way to successfully initialize an object without having to test for initialization success. If the constructor returns, you're good. If not, you know that the object is not usable.

In order to be configurable via Spring's setter injection, objects must (a) have a default (no argument) public constructor and (b) expose all of their required components via setters. There is no way to enforce that setup has been done correctly, so the developer has to spend time looking at the getters and setters of the object to determine what components they need to supply at configuration time. When I compare that effort to the effort of looking at the constructor parameters, it feels very inefficient.
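
The difference between the two wiring styles can be sketched in a few lines. The component names here (Fetcher, SetterIndexer, ConstructorIndexer, WiringDemo) are hypothetical and no Spring classes are involved; the point is only where a forgotten dependency gets noticed.

```java
import java.util.Objects;

// Hypothetical dependency.
class Fetcher {
    String fetch(String key) {
        return "content:" + key;
    }
}

// Setter style: the no-argument constructor succeeds even when wiring is forgotten.
class SetterIndexer {
    private Fetcher fetcher;

    public void setFetcher(Fetcher fetcher) {
        this.fetcher = fetcher;
    }

    public String index(String key) {
        return fetcher.fetch(key); // NullPointerException here if never wired
    }
}

// Constructor style: an instance cannot exist without its required component.
class ConstructorIndexer {
    private final Fetcher fetcher;

    public ConstructorIndexer(Fetcher fetcher) {
        this.fetcher = Objects.requireNonNull(fetcher, "fetcher is required");
    }

    public String index(String key) {
        return fetcher.fetch(key);
    }
}

class WiringDemo {

    // True when the missing dependency is only discovered on first use.
    static boolean failsAtUse() {
        SetterIndexer indexer = new SetterIndexer(); // construction succeeds
        try {
            indexer.index("k");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // True when the missing dependency is rejected at construction time.
    static boolean failsAtConstruction() {
        try {
            new ConstructorIndexer(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsAtUse() + " " + failsAtConstruction());
    }
}
```

With the setter version the broken object is handed around until something calls index(); with the constructor version the mistake cannot get past the line that creates the object.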

Pedantic Use of Interfaces

The goal of the Java Interface is to (a) separate functionality from initialization, and (b) provide a contract that a caller and callee can communicate across. This makes sense in the following two cases:
  1. You have a complex object and you only want to expose a part of it to a caller. For example you have a parent class and you want to expose a callback interface to the child class.
  2. You have multiple implementations of the same functionality and you don't want the caller to care about which object they are calling. 
What I see all over Java-Land, and especially in Spring, is interfaces being used because someone got pedantic about separating functionality from initialization. I fail to see the use of an interface when used to abstract the implementation of all of the methods of a single class. You're writing two structures when one could do the job just fine. Actually, you end up writing three structures: the interface, the implementation, and a factory object, which is more ceremonial code. Even if you need the interface, you could still have the implementation object return an instance of itself cast to the interface via a static initialization method:

public class S3AccessorImpl implements S3Accessor {

    private static final int DEFAULT_SET_SIZE = 1000;
    private S3Service service;
    private Logger logger;

    // static factory: callers only ever see the S3Accessor interface
    public static S3Accessor getInstance(AWSCredentials creds) throws S3ServiceException {
        return new S3AccessorImpl(creds);
    }

    protected S3AccessorImpl(AWSCredentials creds) throws S3ServiceException {
        logger = Logger.getLogger(this.getClass());
        service = new RestS3Service(creds);
    }
}


In spite of my comments above, I am a fan of using interfaces as the boundaries between components because it facilitates easier unit testing. But I'm not entirely sold on abstracting the creation of an object to a Factory that returns the interface that object implements -- not when the above method (a) hides creation from the caller and (b) doesn't require an extra class with a single 'createFoo' method.

Also, I don't understand always writing interfaces first, then implementation classes second. I tend to implement classes until I have a real need for an interface, i.e. during unit testing when I am going to submit a 'dummy' component in place of a real one. 


My recent experience with Spring has reminded me of the existence of 'Framework Debt'. Framework Debt is the Technical Debt required to implement a solution with a given Framework. In short, it is determined by the ratio of time spent writing and maintaining ceremonial code vs the amount of time spent writing and maintaining essential business code. The problem with most frameworks, Spring included, is that they do not distinguish between ceremonial and essential code, because to them, it's _all_ essential code. And, to work in that particular framework, ceremonial code is absolutely essential. Having to maintain and understand a bunch of logic that has nothing to do with application functionality seems inherently wrong to me.

I actually do like some frameworks I've run into. Rails is great because of its 'convention over configuration', but that is another kind of technical debt. Fortunately it is pretty low in Rails, and as a result applications can be rapidly developed in Rails without losing maintainability. But even Rails feels too heavy for me at times. I do write apps that don't need the overhead of MVC. For these apps, Sinatra allows me to quickly get path routing out of the way and concentrate on the underlying code.