Doing actual non-blocking, incremental HTTP access with async-http-client
The async-http-client library, originally developed at Ning (by Jean-Francois, Tom, Brian and maybe others, and maintained since then by quite a few contributors), has been around for a while now.
Its main selling point is its claimed better scalability
compared to alternatives like Jakarta
HTTP Client (and that is not its only selling point: its API also seems
more intuitive).
But although the library itself is capable of working well in non-blocking mode, most examples (and probably most users) use it in plain old blocking mode; or at most use a Future to simply defer handling of responses, without handling content incrementally as it becomes available.
While this lack of documentation is a bit unfortunate in itself, the
bigger problem is that the usage shown by most sample code requires
reading the whole response into memory.
This may not be a big deal for
small responses, but in cases where response size runs to megabytes, it
often becomes problematic.
1. Blocking, fully in-memory usage
The usual (and potentially problematic) usage pattern is something like:
AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
Future<Response> f = asyncHttpClient.prepareGet("http://www.ning.com/").execute();
Response r = f.get();
byte[] contents = r.getResponseBodyAsBytes();
which gets the whole response as a byte array; no surprises there.
2. Use InputStream to avoid buffering the whole entity?
The first obvious workaround is to have a look at the Response object, and notice that there is a method "getResponseBodyAsStream()". This would seemingly allow one to read the response piece by piece and process it incrementally, for example by writing it to a file.
Unfortunately, this method is just a facade, implemented like so:
public InputStream getResponseBodyAsStream() {
return new ByteArrayInputStream(getResponseBodyAsBytes());
}
which actually is no more efficient than accessing the whole content as a byte array. :-/
(Why is it implemented that way? Mostly because the underlying non-blocking I/O library, like Netty or Grizzly, provides content using a "push" style interface, which makes it very hard to support "pull" style abstractions like java.io.InputStream -- so it is not really AHC's fault, but rather a consequence of the NIO/async style of I/O processing.)
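To see the mismatch concretely, here is a small self-contained sketch (plain JDK types only, no AHC classes): the push side hands us chunks as they arrive, and the only simple way to bridge that to a pull-style InputStream, without blocking a thread, is to buffer everything first -- which is effectively what getResponseBodyAsStream() does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class PushToPull {
    // Push-style callback, analogous in shape to AHC's onBodyPartReceived():
    interface ChunkListener {
        void onChunk(byte[] data) throws IOException;
    }

    // Bridging push to pull by buffering: pull can only start once
    // every chunk has been pushed and stored.
    public static InputStream bridge(byte[][] arrivingChunks) throws IOException {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ChunkListener listener = new ChunkListener() {
            public void onChunk(byte[] data) throws IOException {
                buffer.write(data); // push side: content arrives, we must store it
            }
        };
        for (byte[] chunk : arrivingChunks) {
            listener.onChunk(chunk); // chunks get pushed to us as they arrive
        }
        // only now can the pull side start reading
        return new ByteArrayInputStream(buffer.toByteArray());
    }
}
```

The alternative bridge (a pipe plus a second thread that blocks on reads) defeats the point of non-blocking I/O, which is why AHC does not attempt it.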
3. Go fully async
So what can we do to actually process large response payloads (or large PUT/POST request payloads, for that matter)?
To do that, it is necessary to use the following callback abstractions:
- To handle response payloads (for HTTP GETs), we need to implement the AsyncHandler interface (or extend the convenience class AsyncCompletionHandler).
- To handle PUT/POST request payloads, we need to implement BodyGenerator (which is used for creating a Body instance, the abstraction for feeding content).
Let's have a look at what is needed for the first case.
(note: there are existing default implementations for some of these pieces -- but here I will show how to do it from the ground up)
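Since the rest of this post focuses on the response side, here is a quick sketch of what the request side could look like: a BodyGenerator that streams a file chunk by chunk instead of reading it fully into memory. This is a hedged sketch against the AHC 1.x interfaces (com.ning.http.client.BodyGenerator and Body); exact signatures may vary between versions, and AHC ships similar ready-made implementations, so treat it as illustrative only.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import com.ning.http.client.Body;
import com.ning.http.client.BodyGenerator;

public class StreamingFileBodyGenerator implements BodyGenerator {
    private final File file;

    public StreamingFileBodyGenerator(File file) {
        this.file = file;
    }

    public Body createBody() throws IOException {
        final FileChannel channel = new FileInputStream(file).getChannel();
        final long length = file.length();
        return new Body() {
            public long getContentLength() {
                return length;
            }
            // called whenever the transport is ready to send more data;
            // returns the number of bytes added, or negative at end of content
            public long read(ByteBuffer buffer) throws IOException {
                return channel.read(buffer);
            }
            public void close() throws IOException {
                channel.close();
            }
        };
    }
}
```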
4. A simple download-a-file example
Let's start with the simple case of downloading large content into a file, without keeping more than a small chunk in memory at any given time. This can be done as follows:
public class SimpleFileHandler implements AsyncHandler<File>
{
    private File file;
    private final FileOutputStream out;
    private boolean failed = false;

    public SimpleFileHandler(File f) throws IOException {
        file = f;
        out = new FileOutputStream(f);
    }

    public STATE onBodyPartReceived(HttpResponseBodyPart part) throws IOException {
        if (!failed) {
            part.writeTo(out);
        }
        return STATE.CONTINUE;
    }

    public File onCompleted() throws IOException {
        out.close();
        if (failed) {
            file.delete();
            return null;
        }
        return file;
    }

    public STATE onHeadersReceived(HttpResponseHeaders h) {
        // nothing to check here as of yet
        return STATE.CONTINUE;
    }

    public STATE onStatusReceived(HttpResponseStatus status) {
        failed = (status.getStatusCode() != 200);
        return failed ? STATE.ABORT : STATE.CONTINUE;
    }

    public void onThrowable(Throwable t) {
        failed = true;
    }
}
Voila. The code is not very brief (event-based code seldom is), and it could
use some more handling for error cases.
But it should at least show
the general processing flow -- nothing very complicated there, beyond
basic state-machine style operation.
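To actually run the handler, the invocation could look something like the sketch below (the URL and target file name are placeholders, and the 30-second timeout is an arbitrary choice):

```java
public File downloadTo(String url, File target) throws Exception {
    AsyncHttpClient ahc = new AsyncHttpClient();
    ListenableFuture<File> future = ahc.executeRequest(
        ahc.prepareGet(url).build(),
        new SimpleFileHandler(target));
    // get() blocks the calling thread, but the I/O itself stays non-blocking;
    // result is null if the handler flagged a failure
    return future.get(30, TimeUnit.SECONDS);
}
```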
5. Booooring. Anything more complicated?
Downloading a large file is useful, but while not a contrived example, it is rather plain. So let's consider a case where we not only want to download a piece of content, but also want to uncompress it, in one fell swoop. This serves as an example of additional processing we may want to do in incremental/streaming fashion -- as an alternative to having to store an intermediate copy in a file, then uncompress it to another file.
Before showing the code, however, it is necessary to explain why this is a bit tricky.
First, remember that we can't really use InputStream-based processing here: all content we get is "pushed" to us (without our code ever blocking on input); whereas an InputStream would want to pull content itself, possibly blocking the thread.
Second: most decompressors present either an InputStream-based abstraction or an uncompress-the-whole-thing interface. Neither works for us, since we are getting incremental chunks; to use either, we would first have to buffer the whole content -- which is exactly what we are trying to avoid.
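As an aside, for raw zlib streams the JDK's own java.util.zip.Inflater can in fact be driven in push style (presumably what the gzip support mentioned below builds on, with framing added on top). A minimal self-contained sketch, with the class and method names invented here for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

public class PushInflate {
    private final Inflater inflater = new Inflater();
    private final byte[] buf = new byte[4096];

    // Push-style: feed a chunk of compressed data as it arrives;
    // whatever can be uncompressed is written to the sink immediately.
    public void feedCompressedData(byte[] comp, int offset, int len,
            ByteArrayOutputStream sink) throws DataFormatException {
        inflater.setInput(comp, offset, len);
        while (!inflater.needsInput() && !inflater.finished()) {
            int n = inflater.inflate(buf);
            if (n == 0) {
                // would need a preset dictionary; not handled in this sketch
                break;
            }
            sink.write(buf, 0, n);
        }
    }
}
```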
As luck would have it, however, the Ning Compress package (version 0.9.4, specifically) happens to have a push-style uncompressor interface (aptly named "com.ning.compress.Uncompressor"), with two implementations:
- com.ning.compress.lzf.LZFUncompressor
- com.ning.compress.gzip.GZIPUncompressor (uses JDK native zlib under the hood)
So why is that fortunate? Because the interface they expose is push style:
public abstract class Uncompressor
{
    public abstract void feedCompressedData(byte[] comp, int offset, int len) throws IOException;
    public abstract void complete() throws IOException;
}
and is thereby usable for our needs here -- especially when combined with an additional class called "UncompressorOutputStream", which makes an OutputStream out of an Uncompressor and a target stream (needed for efficient access to the content AHC exposes via HttpResponseBodyPart).
6. Show me the code
Here goes:
public class UncompressingFileHandler implements AsyncHandler<File>,
    DataHandler // for Uncompressor
{
    private File file;
    private final OutputStream out;
    private boolean failed = false;
    private UncompressorOutputStream uncompressingStream;

    public UncompressingFileHandler(File f) throws IOException {
        file = f;
        out = new FileOutputStream(f);
    }

    public STATE onBodyPartReceived(HttpResponseBodyPart part) throws IOException {
        if (!failed) {
            if (uncompressingStream != null) {
                // compressed content: pass through uncompressing stream
                part.writeTo(uncompressingStream);
            } else {
                // uncompressed content: write directly
                part.writeTo(out);
            }
        }
        return STATE.CONTINUE;
    }

    public File onCompleted() throws IOException {
        // close uncompressing stream first, so it can flush remaining content
        if (uncompressingStream != null) {
            uncompressingStream.close();
        }
        out.close();
        if (failed) {
            file.delete();
            return null;
        }
        return file;
    }

    public STATE onHeadersReceived(HttpResponseHeaders h) {
        // must check whether we are getting compressed content:
        String compression = h.getHeaders().getFirstValue("Content-Encoding");
        if (compression != null) {
            if ("lzf".equals(compression)) {
                uncompressingStream = new UncompressorOutputStream(new LZFUncompressor(this));
            } else if ("gzip".equals(compression)) {
                uncompressingStream = new UncompressorOutputStream(new GZIPUncompressor(this));
            }
        }
        return STATE.CONTINUE;
    }

    public STATE onStatusReceived(HttpResponseStatus status) {
        failed = (status.getStatusCode() != 200);
        return failed ? STATE.ABORT : STATE.CONTINUE;
    }

    public void onThrowable(Throwable t) {
        failed = true;
    }

    // DataHandler implementation for Uncompressor; called with uncompressed content:
    public void handleData(byte[] buffer, int offset, int len) throws IOException {
        out.write(buffer, offset, len);
    }
}
Handling gets a bit more complicated here, since we have to handle both the case where content is compressed and the case where it is not (since the server ultimately decides whether to apply compression).
And to make the call, you also need to indicate the capability to accept compressed data. For example, we could define a helper method like:
public File download(String url) throws Exception {
    AsyncHttpClient ahc = new AsyncHttpClient();
    Request req = ahc.prepareGet(url)
        .addHeader("Accept-Encoding", "lzf,gzip")
        .build();
    ListenableFuture<File> futurama = ahc.executeRequest(req,
        new UncompressingFileHandler(new File("download.txt")));
    try {
        // wait at most 30 seconds for completion
        return futurama.get(30, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        throw new IOException("Failed to download due to timeout");
    }
}
which would use handler defined above.
7. Easy enough?
I hope the above shows that while doing incremental, "streaming" processing is a bit more work, it is not super difficult to do.
Not even when you have a bit of pipelining to do, like uncompressing (or compressing) data on the fly.