Bridging CompleteableFutures and RxJava

Asynchronous computations can be a tough to wrap your head around. We often think of programming as a series of steps, kind of like this:

In this example, each step is executed only after the previous line of code has completed. But suppose we don’t want to wait for long running tasks to complete. With asynchronous (hereafter called async) programming, we start the execution of a command and register a callback to be run when the command completes. For example, the call to fetchData may take some time to complete. In Java, we can use a CompleteableFuture to handle the result. This allows other code to be executed instead of waiting around.

Why write Asynchrous Code at all?

In today’s programming world, there’s a move towards distributed programming, microservices, and web API’s. Calling a web service somewhere on the internet and waiting for it to respond with the information you requested can take time. Sometimes millisecs. Sometimes much, much longer. It makes for a truly awful user experience to have an application wait, or hang, waiting for the service to respond. And many apps need to call multiple services, or the service they call may be calling multiple services. And bad things happen on the internet, servers go down, etc. Your users will trash your application in on-line reviews if it appears unresponsive. There’s already an excellent guide to using Java’s CompleteableFuture at Guide To CompletableFuture. Instead of rehashing that, I’ll talk about bridging futures with reactive observables.

Why rxJava?

I find using the Observer pattern easier to read and reason through than Java’s Futures. Especially when you start nesting or chaining them. Here’s an example, suppose I need to run a whole bunch of identical calls in parallel and collect the results of each? Using CompleteableFuture, I could create a method that runs all the futures and collects the results, like so:

Then I would call it like so:

But what happens if one of the calls throws an error? Also, this only returns the collection when all the futures complete. That might not be what you really want. This is where an Observer may be handy, it can pass on the results as each future completes rather than waiting for all to complete AND it can signal an error if one occurs.

CompleteableFuture to rxJava

The first thing we need is a method to convert a CompleteableFuture to an Observer. That’s pretty simple:

This takes a future and returns an Observable that will eventually (we think) emit a value. And we can subscribe to the observable:

Running Many Future’s … But Only one Observable

Now we can get fancy. What if we want to run a bunch of futures in parallel like our earlier example? Here’s a method that takes a collection of items and a function that will convert an item to a CompleteableFuture, then returns an Observable that will emit results as each item is processed asynchronously.

Now we can process multiple items asynchronously, handle the results as each future completes and get a signal if an error occurs. Here’s an example:

Polyglot coder. Deep-sea Researcher. Zazen aficionado. I think squids are pretty cool.