In the Beginning
Once upon a time, I needed to call a microservice that could return a lot of data. So much data, in fact, that the request would often time out. The usual solution to this is to request smaller chunks of data, also called pages. A page request takes two parameters, often called limit and offset. Limit is the number of items to be returned. Offset is the number of items to be skipped before beginning to return items.
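For concreteness, the offset for a given page follows directly from the limit. This tiny helper (a hypothetical illustration, not part of the service in question) shows the arithmetic:

```java
// Hypothetical helper: with `limit` items per page, page i (0-based)
// starts after skipping i * limit items.
public class Paging {
    public static int offsetForPage(int pageIndex, int limit) {
        return pageIndex * limit;
    }
}
```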
This being programming, solving one problem creates others. In my case, two problems appeared. First, this being the internet, an occasional page request would fail. Second, the simplest way to fetch pages is to submit requests sequentially; that is, one at a time. But for an IO-bound application, this can take a long, long time, resulting in unhappy customers. The next simplest alternative to sequential requests is to fire off all page requests simultaneously. This is a bit like using the Low Orbit Ion Cannon on my own services; it's easy to overwhelm them using that technique.
My solution to scheduling requests was to put them all into a queue, then allow a few at a time to be executed in parallel. When one of the requests is completed, the next pending request is pulled from the queue and run. This is repeated until the queue is empty.
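This queue-plus-workers pattern is what a fixed-size thread pool gives you for free: the executor's internal work queue holds the pending requests, and only a few run at once. Here's a minimal stdlib-only sketch (the class and method names are mine, not the author's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical sketch: run suppliers with at most `threadCount`
// executing at a time; the pool's internal queue holds the rest.
public class BoundedFetch {
    public static <T> List<T> fetchAll(List<Supplier<T>> pages, int threadCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            // Submit everything up front; the pool queues what it can't run yet.
            List<Future<T>> futures = new ArrayList<>();
            for (Supplier<T> page : pages) {
                futures.add(pool.submit(page::get));
            }
            // Collect results in submission order.
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                try {
                    results.add(f.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```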
Handling Request Failures
My first task was to deal with the case where a page request fails. My solution was to allow the request to be reattempted a few times if it fails. After the number of attempts I’ve defined is reached, the request should fail with an exception.
To allow this, I convert my remote service call to a Supplier; that is, a function that returns the values from my web request. I'll provide an example of this later. For now, you just need to know that, since Java 8, it's very easy to create a Supplier. Here's a simple one that just returns “Hello World”:
Supplier<String> fn = () -> "Hello World";
With a supplier in hand, I wrap it with a reactive Observable. I chose to do this because it allows one to compose functions to handle returned values as well as errors. Note the call to Observable.defer(...) on line 39. The request won't be executed until a subscriber subscribes to the observable. This is important! Otherwise, values may be emitted by the observable before a subscriber is attached to consume them. That's like going to the grocery store and then tossing everything in the trash can as soon as you get home.
Here's my class that retries web service calls. It's used by the RequestPager presented later on.
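The retry semantics themselves are simple enough to sketch without RxJava. Below is a hypothetical, stdlib-only illustration of the behavior described above (try the supplier up to a fixed number of times, then rethrow the last failure); the author's actual class wraps the supplier in Observable.defer so that nothing runs until subscription:

```java
import java.util.function.Supplier;

// Hypothetical sketch of the retry behavior: attempt the supplier up to
// `attempts` times (assumed >= 1); if every attempt fails, rethrow the
// last exception.
public class Retry {
    public static <T> T withRetries(Supplier<T> fn, int attempts) {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return fn.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure; try again if attempts remain
            }
        }
        throw last;
    }
}
```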
Scheduling Page Requests
In order to manage page requests, three things are needed:
- A queue of Suppliers; each supplier is for fetching a different page.
- An ExecutorService that will service the queue. This is only for servicing this queue. If a page request fails, the pager will call shutdownNow on it, so you don’t want to share this executor with any code outside this particular set of page requests.
- An Observable to emit the page results as they are retrieved.
In addition, here are a few things you will need to know a priori:
- expectedCount: The total number of objects that you are expecting to be returned.
- pageSize: The number of objects to be returned by each page request.
- retryAttempts: If a page request fails, how many times do you want to reattempt the request.
- threadCount: The number of simultaneous page requests that are allowed.
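From expectedCount and pageSize, the number of page requests is just a ceiling division. A hypothetical one-liner (not from the RequestPager itself) makes the arithmetic explicit:

```java
// Hypothetical helper: how many page requests are needed to cover
// expectedCount objects at pageSize objects per page (ceiling division).
public class PageMath {
    public static int pageCount(int expectedCount, int pageSize) {
        return (expectedCount + pageSize - 1) / pageSize;
    }
}
```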
It’s helpful to peek ahead to see how we would set up and use the RequestPager:
Below is the implementation of the request pager. Again, the general flow of usage is:
- Create a new request pager:
var pager = new RequestPager<List<MyObject>>(function, retryAttempts, threadCount);
- Build a request runner from the pager:
var runner = pager.build(expectedCount, pageSize);
- Subscribe to consume results:
- Start fetching pages:
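To make the whole flow concrete, here is a hedged, stdlib-only sketch that mirrors the steps above with plain types standing in for the RequestPager/runner pair: one supplier per page, at most threadCount running at once, each page retried up to retryAttempts times, and shutdownNow called if a page ultimately fails. The names here are mine; the real implementation emits results through an Observable instead of returning a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical end-to-end sketch of the pager's flow, without RxJava.
public class MiniPager {
    public static <T> List<T> run(List<Supplier<T>> pages,
                                  int retryAttempts, int threadCount) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            // Queue every page; the pool runs threadCount of them at a time.
            List<Future<T>> futures = new ArrayList<>();
            for (Supplier<T> page : pages) {
                futures.add(pool.submit(() -> attempt(page, retryAttempts)));
            }
            // Collect results; a page that exhausts its retries aborts the rest.
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                try {
                    results.add(f.get());
                } catch (InterruptedException | ExecutionException e) {
                    pool.shutdownNow(); // abandon remaining pages on failure
                    throw new RuntimeException(e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Try the page up to `attempts` times; rethrow the last failure.
    private static <T> T attempt(Supplier<T> page, int attempts) {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return page.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }
}
```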