Wednesday, March 29, 2017

Future<IQ> - About the New Asynchronous API in Babbler

This is a topic which is long overdue, but still pretty interesting: The new asynchronous, non-blocking API for IQ requests in Babbler (since version 0.7).

The Problem

Until the previous release, all IQ-based API was synchronous and did block until the IQ response has been received.

Lets have a look at what that means and how Last Activity (XEP-0012) of an entity was retrieved to illustrate the problem:

LastActivityManager lastActivityManager = xmppSession.getManager(LastActivityManager.class);
LastActivity lastActivity = lastActivityManager.getLastActivity(Jid.of(""));

The method getLastActivity() sends out the IQ request and then waited (blocked) a few milliseconds or even seconds for the response or until a timeout happened.

Having blocking operations is of course resource consuming because you have to dedicate a thread for it, which however is blocking most of the time while waiting on the operation to finish. It's the same issue as with blocking IO and the reason why NIO exists.

Doing multiple blocking IQ queries in parallel means, you have to create a thread for each query.

If you want to do such an IQ query from a JavaFX application you also had to write a lot of boilerplate code like this:

Task<LastActivity> task = new Task<LastActivity>() {
    protected LastActivity call() throws Exception {
        LastActivityManager lastActivityManager = xmppSession.getManager(LastActivityManager.class);
        return lastActivityManager.getLastActivity(Jid.of(""));
task.stateProperty().addListener((observableValue, state, state1) -> {
    switch (state1) {
        case SUCCEEDED:
        case FAILED:
new Thread(task).start();

You don't want to block the UI thread and therefore need to run blocking operations in a background task.

Furthermore the blocking API in Babbler was not interruptible because it didn't throw InterruptedException. Of course we could have solved the interruptible issue easily, but you still would have the drawbacks of a blocking API.

Futures to the Rescue

Instead of waiting for the response and then returning the result, all IQ-based APIs now return a java.util.concurrent.Future:

Future<LastActivity> lastActivityFuture = lastActivityManager.getLastActivity(Jid.of(""));

The method call is now asynchronous, it no longer blocks and passes control immediately back to the caller!

As with every Future, you can get the result with its get() method:

LastActivity lastActivity = lastActivityManager.getLastActivity(jid).get();

or with a timeout:

LastActivity lastActivity = lastActivityManager.getLastActivity(jid).get(5, TimeUnit.SECONDS);

You might ask, what we've gained now, because the get() method is blocking again and usually you need the IQ result anyway.

Well, that's true. One part of the answer is that we gain interruptibility and the other part is that the returned result is not only a Future, but also a java.util.concurrent.CompletionStage (Java 8's new toy).

CompletionStage<LastActivity> lastActivityFuture = lastActivityManager.getLastActivity(Jid.of(""));

It basically allows you to react asynchronously when the result is present, i.e. when the Future is done. Some frameworks like Guava already have such a concept of a "Listenable Future", now it's part of the JDK.

Taking our JavaFX example from above, updating the UI with the result as in the above example can now become a simple one-liner:

lastActivityManager.getLastActivity(jid).thenAcceptAsync(this::updateUI, Platform::runLater);

It sends the IQ request and later when the response is received it asynchronously executes the updateUI method in the JavaFX thread.

No more blocking, no more extra threads, everything is asynchronous!

Even better:

CompletionStages can be chained together. There are use cases, which require multiple IQ queries like Service Discovery or File Transfer. They can then be composed together into one:

CompletionStage<Boolean> isSupported = xmppClient.isSupported(LastActivity.NAMESPACE, jid);
CompletionStage<LastActivity> lastActivityFuture = isSupported.thenCompose(result -> {
    if (result) {
        return lastActivityManager.getLastActivity(jid);
    } else {
        throw new RuntimeException("XEP-0012 not supported by" + jid);

This code first checks if Last Activity is supported (using Service Discovery) and only if it is, queries the entity.

File Transfer is pretty complicated, with a lot of queries going on. This pseudo-code example illustrates the power of composing asynchronous calls (IBB fallback not shown here):

CompletionStage<ByteStreamSession> future =
initiateStream() // Initiate a file transfer stream with somebody
    .thenCompose(streamInitiation -> discoverStreamHosts() // When accepted, discover SOCKS5 stream hosts
        .thenCompose(streamHosts -> letReceiverChoseOne() // Query the receiver and let him choose a stream host
            .thenCompose(streamHostUsed -> activateStreamHost() // When receiver responds with the chosen stream host, activate it 
                .thenApply(result -> createByteStreamSession())))); // After activation, create a stream session.

For convenience there's also a class AsyncResult, which implements both interfaces.

I think asynchronous programming is the future :-) and this is a first, but huge step in the right direction.

No comments:

Post a Comment