Thursday, May 24, 2012

Doing actual non-blocking, incremental HTTP access with async-http-client

The async-http-client library (AHC), originally developed at Ning (by Jean-Francois, Tom, Brian and maybe others, and since then by quite a few others), has been around for a while now.
Its main selling point is the claim of better scalability compared to alternatives like Jakarta HTTP Client. This is not its only selling point: its API also seems more intuitive.

But although the library itself is capable of working well in non-blocking mode, most examples (and probably most users) use it in plain old blocking mode. At most, they use a Future to defer handling of responses, and still do not process content incrementally as it becomes available.

This lack of documentation is a bit unfortunate in itself. The bigger problem is that the usage shown in most sample code requires reading the whole response into memory.
This may not be a big deal for small responses, but it often becomes problematic when a response is several megabytes or larger.

1. Blocking, fully in-memory usage

The usual (and potentially problematic) usage pattern is something like:

AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
Future<Response> f = asyncHttpClient.prepareGet("http://www.ning.com/").execute();
Response r = f.get(); // blocks until the whole response has been received
byte[] contents = r.getResponseBodyAsBytes();

which gets the whole response as a byte array; no surprises there.

2. Use InputStream to avoid buffering the whole entity?

The first obvious workaround is to look at the Response object and notice that it has a method "getResponseBodyAsStream()". This would seemingly let one read the response piece by piece and process it incrementally, for example by writing it to a file.

Unfortunately, this method is just a facade, implemented like so:

public InputStream getResponseBodyAsStream() {
    return new ByteArrayInputStream(getResponseBodyAsBytes());
}

which actually is no more efficient than accessing the whole content as a byte array. :-/

(Why is it implemented that way? Mostly because the underlying non-blocking I/O library, like Netty or Grizzly, provides content through a "push" style interface. That makes it very hard to support "pull" style abstractions like java.io.InputStream. So it is not really AHC's fault, but rather a consequence of the NIO/async style of I/O processing.)

3. Go fully async

So what can we do to actually process large response payloads (or large PUT/POST request payloads, for that matter)?

To do that, it is necessary to use the following callback abstractions:

  1. To handle response payloads (for HTTP GETs), we need to implement the AsyncHandler interface.
  2. To handle PUT/POST request payloads, we need to implement BodyGenerator, which is used to create a Body instance (the abstraction for feeding content).

Let's have a look at what is needed for the first case.

(Note: there are existing default implementations for some of the pieces, but here I will show how to do it from the ground up.)

4. A simple download-a-file example

Let's start with the simple case of downloading a large resource into a local file, without keeping more than a small chunk in memory at any given time. This can be done as follows:


import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import com.ning.http.client.AsyncHandler;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;

public class SimpleFileHandler implements AsyncHandler<File>
{
    private final File file;
    private final FileOutputStream out;
    private boolean failed = false;

    public SimpleFileHandler(File f) throws IOException {
        file = f;
        out = new FileOutputStream(f);
    }

    public STATE onBodyPartReceived(HttpResponseBodyPart part) throws IOException
    {
        if (!failed) {
            // write this chunk out right away; nothing is accumulated in memory
            part.writeTo(out);
        }
        return STATE.CONTINUE;
    }

    public File onCompleted() throws IOException {
        out.close();
        if (failed) {
            file.delete();
            return null;
        }
        return file;
    }

    public STATE onHeadersReceived(HttpResponseHeaders h) {
        // nothing to check here as of yet
        return STATE.CONTINUE;
    }

    public STATE onStatusReceived(HttpResponseStatus status) {
        // anything other than "200 OK" is treated as failure
        failed = (status.getStatusCode() != 200);
        return failed ? STATE.ABORT : STATE.CONTINUE;
    }

    public void onThrowable(Throwable t) {
        failed = true;
        try {
            out.close(); // don't leak the file handle on errors
        } catch (IOException e) {
            // ignore: we are already failing
        }
        file.delete();
    }
}

Voilà. The code is not very brief (event-based code seldom is), and it could use some more handling for error cases.
But it should at least show the general processing flow -- nothing very complicated there, beyond basic state machine style operation.
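To make that flow concrete without pulling in AHC itself, here is a minimal, self-contained sketch. It mimics the callback order AHC's AsyncHandler follows: status, then headers, then any number of body parts, then completion. The names PushHandler, CopyingHandler and ResponseSimulator are hypothetical and are not AHC classes. A real AHC provider drives the callbacks from its own I/O threads. This simulator simply calls onCompleted() after an abort as well, which matches how the handler above checks `failed` there.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for AHC's AsyncHandler callbacks (not the real API).
interface PushHandler<T> {
    enum State { CONTINUE, ABORT }
    State onStatus(int statusCode);
    State onHeaders(Map<String, String> headers);
    State onBodyPart(byte[] chunk);
    T onCompleted();
    void onThrowable(Throwable t);
}

// Handler that hands each chunk to its sink as soon as it arrives.
class CopyingHandler implements PushHandler<byte[]> {
    // in-memory sink so the result can be checked; a real handler would use a FileOutputStream
    private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    private boolean failed = false;

    public State onStatus(int statusCode) {
        failed = (statusCode != 200);
        return failed ? State.ABORT : State.CONTINUE;
    }

    public State onHeaders(Map<String, String> headers) {
        return State.CONTINUE; // nothing to check in this sketch
    }

    public State onBodyPart(byte[] chunk) {
        if (!failed) {
            sink.write(chunk, 0, chunk.length);
        }
        return State.CONTINUE;
    }

    public byte[] onCompleted() {
        return failed ? null : sink.toByteArray();
    }

    public void onThrowable(Throwable t) {
        failed = true;
    }
}

// Simulated "network": pushes status, headers and body chunks to the handler
// in order, stopping early if the handler asks to abort.
class ResponseSimulator {
    static <T> T run(int status, Map<String, String> headers,
                     List<byte[]> chunks, PushHandler<T> h) {
        if (h.onStatus(status) == PushHandler.State.ABORT) {
            return h.onCompleted();
        }
        if (h.onHeaders(headers) == PushHandler.State.ABORT) {
            return h.onCompleted();
        }
        for (byte[] chunk : chunks) {
            if (h.onBodyPart(chunk) == PushHandler.State.ABORT) {
                break;
            }
        }
        return h.onCompleted();
    }
}
```

The handler never asks for data. It only reacts to whatever the driver pushes, which is exactly the inversion that makes InputStream-style "pull" access awkward here.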

5. Booooring. Anything more complicated?

Downloading a large file is useful, and it is not a contrived example, but it is rather plain. So let's consider the case where we not only want to download a piece of content, but also want to uncompress it, in one fell swoop. This serves as an example of additional processing we may want to do in an incremental/streaming fashion. The alternative would be to store an intermediate copy in a file and then uncompress it to another file.

Before showing the code, however, it is necessary to explain why this is a bit tricky.

First, remember that we can't really use InputStream-based processing here. All the content we get is "pushed" to us, without our code ever blocking on input. An InputStream, by contrast, expects its caller to pull content, possibly blocking the thread.

Second: most decompressors present either an InputStream-based abstraction or an uncompress-the-whole-thing interface. Neither works for us, since we are getting incremental chunks. To use either, we would first have to buffer the whole content, which is exactly what we are trying to avoid.

As luck would have it, however, the Ning Compress package (version 0.9.4, specifically) just happens to have a push-style uncompressor interface, aptly named "com.ning.compress.Uncompressor". It also has two implementations:

  1. com.ning.compress.lzf.LZFUncompressor
  2. com.ning.compress.gzip.GZIPUncompressor (uses JDK native zlib under the hood)

So why is that fortunate? Because the interface they expose is push-style:

public abstract class Uncompressor
{
    public abstract void feedCompressedData(byte[] comp, int offset, int len) throws IOException;
    public abstract void complete() throws IOException;
}

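and is therefore usable for our needs here. This is especially true when combined with the additional class "UncompressorOutputStream", which makes an OutputStream out of an Uncompressor. That is needed for efficient access to the content AHC exposes via HttpResponseBodyPart, which writes its bytes into an OutputStream. The uncompressed output is then passed to the DataHandler given to the Uncompressor.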
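Ning Compress is not part of the JDK. However, the same push-style idea can be shown with the JDK alone. java.util.zip.InflaterOutputStream (in the JDK since Java 6) is an OutputStream that decompresses whatever is written into it and forwards the result to a target stream, much like UncompressorOutputStream does. Note that it decodes zlib-wrapped deflate data, not the gzip format. The helper names deflate and inflateInChunks and the chunk size are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterOutputStream;

class PushInflateDemo {
    // Compresses data into zlib format, just to have some input for the demo.
    static byte[] deflate(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Feeds compressed bytes in small chunks, the way body parts would arrive
    // from a non-blocking client. Each write() decompresses what it can and
    // pushes the plain bytes to 'target' right away; nothing pulls data.
    static byte[] inflateInChunks(byte[] compressed, int chunkSize) {
        ByteArrayOutputStream target = new ByteArrayOutputStream(); // stands in for the output file
        try (InflaterOutputStream uncompressing = new InflaterOutputStream(target)) {
            for (int off = 0; off < compressed.length; off += chunkSize) {
                int len = Math.min(chunkSize, compressed.length - off);
                uncompressing.write(compressed, off, len);
            }
            // closing (via try-with-resources) calls finish(), flushing any remaining output
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.toByteArray();
    }
}
```

Because the decompressor sits on the write side, it fits the push model directly. Each incoming chunk is decoded on arrival instead of being buffered until the whole response is in.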

6. Show me the code

Here goes:


import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import com.ning.compress.DataHandler;
import com.ning.compress.UncompressorOutputStream;
import com.ning.compress.gzip.GZIPUncompressor;
import com.ning.compress.lzf.LZFUncompressor;
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;

public class UncompressingFileHandler implements AsyncHandler<File>,
    DataHandler // for Uncompressor
{
    private final File file;
    private final OutputStream out;
    private boolean failed = false;
    // assigned in onHeadersReceived() if the response is compressed; null otherwise
    private UncompressorOutputStream uncompressingStream;

    public UncompressingFileHandler(File f) throws IOException {
        file = f;
        out = new FileOutputStream(f);
    }

    public STATE onBodyPartReceived(HttpResponseBodyPart part) throws IOException
    {
        if (!failed) {
            // if compressed, pass through uncompressing stream
            if (uncompressingStream != null) {
                part.writeTo(uncompressingStream);
            } else { // otherwise write directly
                part.writeTo(out);
            }
        }
        return STATE.CONTINUE;
    }

    public File onCompleted() throws IOException {
        // close the uncompressing stream before 'out', since the Uncompressor
        // writes its output into 'out' via handleData()
        if (uncompressingStream != null) {
            uncompressingStream.close();
        }
        out.close();
        if (failed) {
            file.delete();
            return null;
        }
        return file;
    }

    public STATE onHeadersReceived(HttpResponseHeaders h) {
        // must check whether we are getting compressed stuff here:
        String compression = h.getHeaders().getFirstValue("Content-Encoding");
        if (compression != null) {
            if ("lzf".equals(compression)) {
                uncompressingStream = new UncompressorOutputStream(new LZFUncompressor(this));
            } else if ("gzip".equals(compression)) {
                uncompressingStream = new UncompressorOutputStream(new GZIPUncompressor(this));
            } else if (!"identity".equals(compression)) {
                // some other encoding we can not decode: give up
                failed = true;
                return STATE.ABORT;
            }
        }
        return STATE.CONTINUE;
    }

    public STATE onStatusReceived(HttpResponseStatus status) {
        failed = (status.getStatusCode() != 200);
        return failed ? STATE.ABORT : STATE.CONTINUE;
    }

    public void onThrowable(Throwable t) {
        failed = true;
        try {
            out.close(); // release the file handle; partial content is discarded below
        } catch (IOException e) {
            // ignore: we are already failing
        }
        file.delete();
    }

    // DataHandler implementation for Uncompressor; called with uncompressed content:
    public void handleData(byte[] buffer, int offset, int len) throws IOException {
        out.write(buffer, offset, len);
    }
}

Handling gets a bit more complicated here, since we have to handle both the case where content is compressed and the case where it is not. The server ultimately decides whether to apply compression.
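The decision made in onHeadersReceived() comes down to choosing where body parts should be written, based on the Content-Encoding header. The sketch below shows that choice using only the JDK. The JDK has no push-style gzip or LZF decoder, so this sketch substitutes "deflate" (decoded by java.util.zip.InflaterOutputStream) for the encodings used above. Uncompressed content (no header, or "identity") is passed straight through, and null is returned for anything else so the caller can abort. EncodingDispatch and streamFor are hypothetical names.

```java
import java.io.OutputStream;
import java.util.Locale;
import java.util.zip.InflaterOutputStream;

class EncodingDispatch {
    // Returns the stream that body parts should be written to, or null if the
    // encoding is not supported (the caller would then abort the request).
    static OutputStream streamFor(String contentEncoding, OutputStream target) {
        if (contentEncoding == null) {
            return target; // not compressed: write directly
        }
        String enc = contentEncoding.trim().toLowerCase(Locale.ROOT);
        if (enc.isEmpty() || enc.equals("identity")) {
            return target;
        }
        if (enc.equals("deflate")) {
            // decompresses on write, pushing plain bytes into 'target'
            return new InflaterOutputStream(target);
        }
        // any other encoding, including gzip, which this JDK-only sketch does not decode
        return null;
    }
}
```

Returning the target itself for uncompressed responses keeps the body-part callback uniform: it always writes to whatever stream was chosen, with no per-chunk branching.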

To make the call, you also need to indicate that you can accept compressed data, using the "Accept-Encoding" header. For example, we could define a helper method like:


public File download(String url) throws Exception
{
    AsyncHttpClient ahc = new AsyncHttpClient();
    try {
        Request req = ahc.prepareGet(url)
            .addHeader("Accept-Encoding", "lzf,gzip")
            .build();
        ListenableFuture<File> futurama = ahc.executeRequest(req,
            new UncompressingFileHandler(new File("download.txt")));
        try {
            // wait for up to 30 seconds for the download to complete
            return futurama.get(30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new IOException("Failed to download due to timeout");
        }
    } finally {
        ahc.close(); // release the client's threads and connections
    }
}

which uses the handler defined above.

7. Easy enough?

I hope the above shows that while incremental, "streaming" processing is a bit more work, it is not super difficult to do.

Not even when you have a bit of pipelining to do, like uncompressing (or compressing) data on the fly.

Thursday, May 03, 2012

Jackson Data-binding: Did I mention it can do YAML as well?

Note: for useful background, consider reading the earlier articles "Jackson 2.0: CSV-compatible as well" and "Jackson 2.0: now with XML, too!"

1. Inspiration

Before jumping into the actual beef (the new module), I want to mention my inspiration for this extension: the Greatest New Thing to hit the Java World Since JAX-RS, called DropWizard.

For those who have not yet tried it out and are unaware of its Kung-Fu Panda like Awesomeness, please go and check it out. You won't be disappointed.

DropWizard is a sort of mini-framework that combines great Java libraries (I may be biased, as it does use Jackson). It starts with the trusty JAX-RS/Jetty 8 combination and adds Jackson for JSON, jDBI for DB/JDBC/SQL, the Java Validation API (implementation from the Hibernate project) for data validation, and logback for logging. On top of that come a bit of Jersey client for building clients and an optional FreeMarker plug-in for UI, all bundled up in a nice, modular and easily understandable package.
Most importantly, it "Just Works" and comes with an intuitive configuration and bootstrapping system. It also builds easily into a single deployable jar file that contains all the code you need, with just a bit of Maven setup, all of which is well documented. Oh, and the documentation is very accessible, accurate and up-to-date. All in all, it is a very rare combination of things. It would give RoR and other "easier than Java" frameworks a good run for their money, if hipsters ever decided to check out the best that Java has to offer.

The most relevant part here is the configuration system. Configuration can use either basic JSON or full YAML. And as I mentioned earlier, I am beginning to appreciate YAML for configuring things.

1.1. The Specific inspirational nugget: YAML converter

DropWizard handles YAML by parsing it with the SnakeYAML library, converting the resulting document into a JSON tree, and then using Jackson for data binding. This is useful since it allows one to use the full power of Jackson configuration, including annotations and polymorphic type handling.
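That conversion step is simple because a YAML parser such as SnakeYAML typically hands back plain Java structures. Mappings become Maps, sequences become Lists, and scalars become Strings, Numbers, Booleans or null. The following minimal JDK-only sketch walks such a structure and emits JSON text. DropWizard builds a Jackson tree rather than a string, and the class name YamlTreeToJson is hypothetical.

```java
import java.util.List;
import java.util.Map;

class YamlTreeToJson {
    // Converts a Map/List/scalar structure (as a YAML parser might return) into JSON text.
    static String toJson(Object node) {
        StringBuilder sb = new StringBuilder();
        write(node, sb);
        return sb.toString();
    }

    private static void write(Object node, StringBuilder sb) {
        if (node == null) {
            sb.append("null");
        } else if (node instanceof Map) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                writeString(String.valueOf(e.getKey()), sb); // JSON keys must be strings
                sb.append(':');
                write(e.getValue(), sb);
            }
            sb.append('}');
        } else if (node instanceof List) {
            sb.append('[');
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) sb.append(',');
                write(list.get(i), sb);
            }
            sb.append(']');
        } else if (node instanceof Number || node instanceof Boolean) {
            sb.append(node); // note: NaN/Infinity would need special handling
        } else {
            writeString(node.toString(), sb); // strings (and anything else) as JSON strings
        }
    }

    private static void writeString(String s, StringBuilder sb) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }
}
```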

But this got me thinking. The whole converter implementation is about a dozen lines or so (working to the degree needed for configs). Given that, wouldn't it make sense to add "full support" for YAML to the Jackson family of plug-ins?

I thought it would.

2. And Then There Was One More Backend for Jackson

It turns out that the implementation was, indeed, quite easy. I was able to improve certain things. For example, the module can use a lower-level API to keep performance a bit better, and the output side works too, not just the reader. But in a way there isn't all that much to do, since all the module has to do is convert YAML events into JSON events, and maybe help with some conversions.

Some of the more advanced things include:

  • Format auto-detection works, thanks to the "---" document prefix (which the generator also produces by default)
  • Although YAML itself exposes all scalars as text (unless type hints are enabled, which adds more noise to the content), the module uses heuristics to make the parser implementation a bit more natural; so although data-binding can also coerce types, this should usually not be needed
  • Configuration includes settings to change the output style, to allow more aesthetically pleasing output (for those who prefer the "wiki look", for example)
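The scalar heuristics mentioned in the second bullet can be pictured roughly as follows. This is a minimal, hypothetical sketch of the idea and not the module's actual rule set. It classifies an untagged plain scalar as null, boolean, integer, floating-point or string by its text alone, and the accepted spellings are a deliberately small subset chosen for illustration.

```java
import java.util.regex.Pattern;

class ScalarGuesser {
    enum Kind { NULL, BOOLEAN, INTEGER, FLOAT, STRING }

    // optional sign followed by digits, e.g. "42", "-7"
    private static final Pattern INT = Pattern.compile("[-+]?[0-9]+");
    // digits with a fraction and/or exponent, e.g. "3.14", "-1e10", ".5"
    private static final Pattern FLOAT =
        Pattern.compile("[-+]?([0-9]+\\.[0-9]*|\\.[0-9]+|[0-9]+)([eE][-+]?[0-9]+)?");

    // Guesses the type of an untagged plain scalar from its text alone.
    static Kind guess(String text) {
        if (text.isEmpty() || text.equals("~") || text.equalsIgnoreCase("null")) {
            return Kind.NULL;
        }
        // case-insensitive for simplicity
        if (text.equalsIgnoreCase("true") || text.equalsIgnoreCase("false")) {
            return Kind.BOOLEAN;
        }
        if (INT.matcher(text).matches()) { // checked before FLOAT, which would also match "42"
            return Kind.INTEGER;
        }
        if (FLOAT.matcher(text).matches()) {
            return Kind.FLOAT;
        }
        return Kind.STRING;
    }
}
```

A parser using such a guess can report "42" as a number token rather than text. Data-binding then rarely has to coerce it, which is the point the bullet makes.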

At this point, functionality has been tested with a broad if shallow set of unit tests. But because the data-binding used is 100% the same as with JSON, that testing is actually sufficient to use the module for some work.

3. Usage? So boring I tell you

Oh. And you might be interested in knowing how to use the module. This is the boring part, since.... there isn't really much to it.

You just use "YAMLFactory" wherever you would normally use "JsonFactory", and under the hood you get "YAMLParser" and "YAMLGenerator" instances instead of their JSON equivalents. Then you either use the parser/generator directly or, more commonly, construct an "ObjectMapper" with a "YAMLFactory" like so (the snippet itself is from the test "SimpleParseTest.java"):


ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
User user = mapper.readValue("firstName: Billy\n"
    + "lastName: Baggins\n"
    + "gender: MALE\n"
    + "userImage: AQIDBAY=",
    User.class);


and to get the functionality itself, the Maven dependency is:

<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-yaml</artifactId>
  <version>2.0.0</version>
</dependency>

4. That's all Folks -- until you give us some Feedback!

That's it for now. I hope some of you will try out this new backend, and help us further make Jackson 2.0 the "Universal Java Data Processor".



About me

  • I am known as Cowtowncoder
  • Contact me at@yahoo.com