Wednesday, March 29, 2017

Future<IQ> - About the New Asynchronous API in Babbler

This topic is long overdue, but still pretty interesting: the new asynchronous, non-blocking API for IQ requests in Babbler (available since version 0.7).

The Problem

Until the previous release, every IQ-based API was synchronous: it blocked until the IQ response had been received.

Let's have a look at what that means. To illustrate the problem, here is how the Last Activity (XEP-0012) of an entity was retrieved:

LastActivityManager lastActivityManager = xmppSession.getManager(LastActivityManager.class);
LastActivity lastActivity = lastActivityManager.getLastActivity(Jid.of("juliet@example.com/balcony"));

The method getLastActivity() sent out the IQ request and then waited (blocked) for a few milliseconds or even seconds, until the response arrived or a timeout occurred.

Blocking operations are of course resource-consuming, because each one ties up a dedicated thread that spends most of its time idle, waiting for the operation to finish. It's the same issue as with blocking IO and the reason why NIO exists.

Doing multiple blocking IQ queries in parallel means you have to create a thread for each query.
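To make the thread-per-query cost concrete, here is a minimal sketch using only the JDK. The blockingQuery method is a hypothetical stand-in for a blocking IQ call (it just sleeps), not part of Babbler:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingQueriesSketch {

    // Hypothetical stand-in for a blocking IQ query: the calling thread
    // sits idle until the "response" arrives.
    static String blockingQuery(String jid) throws InterruptedException {
        Thread.sleep(200);
        return "last activity of " + jid;
    }

    // Runs one blocking query per JID in parallel. To overlap all the waits,
    // the pool needs as many threads as there are queries.
    // Assumes a non-empty list (a fixed pool needs at least one thread).
    static List<String> queryAll(List<String> jids) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(jids.size());
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String jid : jids) {
                tasks.add(() -> blockingQuery(jid));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task has completed and returns
            // the futures in the same order as the tasks.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

With a smaller pool the queries would simply queue up behind each other, so the threads are the price for parallelism here.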

If you wanted to do such an IQ query from a JavaFX application, you also had to write a lot of boilerplate code like this:

Task<LastActivity> task = new Task<LastActivity>() {
    @Override
    protected LastActivity call() throws Exception {
        LastActivityManager lastActivityManager = xmppSession.getManager(LastActivityManager.class);
        return lastActivityManager.getLastActivity(Jid.of("juliet@example.com/balcony"));
    }
};
task.stateProperty().addListener((observableValue, state, state1) -> {
    switch (state1) {
        case SUCCEEDED:
            updateUI(task.getValue());
            break;
        case FAILED:
            task.getException().printStackTrace();
            break;
        default:
            break;
    }
});
new Thread(task).start();

You don't want to block the UI thread and therefore need to run blocking operations in a background task.

Furthermore, the blocking API in Babbler was not interruptible, because it didn't throw InterruptedException. Of course, we could have solved the interruptibility issue easily, but you would still have the drawbacks of a blocking API.

Futures to the Rescue

Instead of waiting for the response and then returning the result, all IQ-based APIs now return a java.util.concurrent.Future:

Future<LastActivity> lastActivityFuture = lastActivityManager.getLastActivity(Jid.of("juliet@example.com/balcony"));

The method call is now asynchronous: it no longer blocks and passes control back to the caller immediately!

As with every Future, you can get the result with its get() method:

LastActivity lastActivity = lastActivityManager.getLastActivity(jid).get();

or with a timeout:

LastActivity lastActivity = lastActivityManager.getLastActivity(jid).get(5, TimeUnit.SECONDS);
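Both get() variants throw checked exceptions, which is where the different outcomes of a request show up. The following is a rough sketch with a plain CompletableFuture standing in for the future returned by the library; the await helper and the sample strings are made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureGetSketch {

    // Waits up to timeoutMillis for the result and maps each failure mode
    // of Future.get() to a short description.
    static String await(Future<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No result in time; the future itself may still complete later.
            return "timed out";
        } catch (ExecutionException e) {
            // The operation failed; the actual cause is wrapped.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(await(pending, 50)); // prints "timed out"
        pending.complete("idle since 10:42");
        System.out.println(await(pending, 50)); // prints "idle since 10:42"
    }
}
```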

You might ask what we've gained, because the get() method blocks again and you usually need the IQ result anyway.

Well, that's true. One part of the answer is that we gain interruptibility and the other part is that the returned result is not only a Future, but also a java.util.concurrent.CompletionStage (Java 8's new toy).

CompletionStage<LastActivity> lastActivityFuture = lastActivityManager.getLastActivity(Jid.of("juliet@example.com/balcony"));

It basically allows you to react asynchronously when the result is present, i.e. when the Future is done. Some frameworks like Guava already had such a concept, the "Listenable Future"; now it's part of the JDK.
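As a small illustration of reacting to a result instead of waiting for it, the sketch below registers a callback on a stage before the result exists. It uses a plain CompletableFuture as a stand-in for a pending IQ request; the describe helper and the Long result type are assumptions for the example only:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionCallbackSketch {

    // Registers a callback that turns the eventual result (or failure) into
    // a display string. Returns immediately; nothing blocks here.
    static CompletionStage<String> describe(CompletionStage<Long> idleSeconds) {
        return idleSeconds.handle((seconds, error) ->
                error != null
                        ? "query failed: " + error.getMessage()
                        : "idle for " + seconds + " s");
    }

    public static void main(String[] args) {
        // Stand-in for a request whose response has not arrived yet.
        CompletableFuture<Long> response = new CompletableFuture<>();
        CompletionStage<String> text = describe(response);
        text.thenAccept(System.out::println); // nothing is printed yet
        response.complete(42L);               // prints "idle for 42 s"
    }
}
```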

Taking the JavaFX example from above, updating the UI with the result now becomes a simple one-liner:

lastActivityManager.getLastActivity(jid).thenAcceptAsync(this::updateUI, Platform::runLater);

It sends the IQ request and, once the response is received, asynchronously executes the updateUI method on the JavaFX thread.
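Platform::runLater works as the second argument because java.util.concurrent.Executor is a functional interface with a single execute(Runnable) method. The following sketch shows the same hand-off without JavaFX: a single-threaded executor named "ui-thread" plays the role of the JavaFX thread, and all names in it are illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UiThreadHandoffSketch {

    // Completes a pending "response" and returns the name of the thread
    // on which the thenAcceptAsync callback actually ran.
    static String callbackThreadName() throws Exception {
        ExecutorService uiThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            // Plays the role of Platform::runLater in this sketch.
            Executor runLater = uiThread::execute;

            CompletableFuture<String> response = new CompletableFuture<>();
            CompletableFuture<String> seenOn = new CompletableFuture<>();

            // The callback is submitted to runLater once the response is there.
            response.thenAcceptAsync(
                    value -> seenOn.complete(Thread.currentThread().getName()),
                    runLater);

            response.complete("idle since 10:42"); // the "IQ response" arrives
            return seenOn.get();
        } finally {
            uiThread.shutdown();
        }
    }
}
```

Calling callbackThreadName() returns "ui-thread", no matter which thread completed the response.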

No more blocking, no more extra threads, everything is asynchronous!

Even better:

CompletionStages can be chained together. Some use cases, like Service Discovery or File Transfer, require multiple IQ queries. These can then be composed into one:

CompletionStage<Boolean> isSupported = xmppClient.isSupported(LastActivity.NAMESPACE, jid);
CompletionStage<LastActivity> lastActivityFuture = isSupported.thenCompose(result -> {
    if (result) {
        return lastActivityManager.getLastActivity(jid);
    } else {
        throw new RuntimeException("XEP-0012 not supported by " + jid);
    }
});

This code first checks if Last Activity is supported (using Service Discovery) and only if it is, queries the entity.
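An exception thrown inside the thenCompose function does not reach the caller directly. Instead, the composed stage completes exceptionally, typically with the exception wrapped in a CompletionException. Here is a minimal sketch of handling that, with plain JDK types and hypothetical names (queryIfSupported, unwrap) in place of the Babbler calls:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

class ComposedQuerySketch {

    // Runs the second query only if the first stage yields true;
    // otherwise the composed stage fails and is mapped to a fallback text.
    static CompletionStage<String> queryIfSupported(
            CompletionStage<Boolean> supported,
            Supplier<CompletionStage<String>> query) {
        return supported
                .thenCompose(result -> {
                    if (result) {
                        return query.get();
                    }
                    throw new IllegalStateException("feature not supported");
                })
                .exceptionally(error -> "unavailable: " + unwrap(error).getMessage());
    }

    // Strips one CompletionException wrapper, if present.
    static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
    }

    public static void main(String[] args) {
        CompletionStage<String> result = queryIfSupported(
                CompletableFuture.completedFuture(false),
                () -> CompletableFuture.completedFuture("idle for 5 s"));
        // prints "unavailable: feature not supported"
        result.thenAccept(System.out::println);
    }
}
```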

File Transfer is pretty complicated, with a lot of queries going on. This pseudo-code example illustrates the power of composing asynchronous calls (IBB fallback not shown here):

CompletionStage<ByteStreamSession> future =
initiateStream() // Initiate a file transfer stream with somebody
    .thenCompose(streamInitiation -> discoverStreamHosts() // When accepted, discover SOCKS5 stream hosts
        .thenCompose(streamHosts -> letReceiverChooseOne() // Query the receiver and let them choose a stream host
            .thenCompose(streamHostUsed -> activateStreamHost() // When the receiver responds with the chosen stream host, activate it
                .thenApply(result -> createByteStreamSession())))); // After activation, create a stream session.

For convenience there's also a class AsyncResult, which implements both interfaces.

I think asynchronous programming is the future :-) and this is a first, but huge step in the right direction.

Thursday, March 16, 2017

Babbler Version 0.7.4 released

Version 0.7.4 of the Java XMPP library has been released to Maven Central!

It turned out there was a deadlock when using Stream Management. It happened only rarely and was hard to spot, but when it did happen, it was of course a blocker.

Here's the full changelog:

  • Resolve rare deadlock when using Stream Management
  • Rework how WebSocket connections are closed
  • Don’t let a stream error close the stream immediately, but instead wait for the closing stream element and then close the connection.
  • Increase performance of IBB stream
  • Prevent rare, but possible NullPointerException after sending stanzas.
  • Fix error when using pages (XEP-0141) within data forms (XEP-0004)
  • Reset nick to null, if entering a chat room fails

Saturday, February 11, 2017

Babbler Version 0.7.3 released

I've released version 0.7.3 of the Java XMPP library. This is primarily a "bug fix and improvements" release and is compatible with previous 0.7.x releases. Here's the changelog:
  • Use single equals sign (“=”) for zero-length data in SASL, as per RFC 6120 § 6.4.2
  • Allow configuring a custom stream host and skip proxy discovery then for SI file transfer.
  • Implement WebSocket pings/pongs.
  • Fix WebSocket’s proxy URI construction.
  • Use connect timeout for WebSocket connections.
  • XEP-0198: Send an ack right before gracefully closing the stream (i.e. update to version 1.5.2).
  • MUC Room “enter” events should fire for oneself entering the room as well.
  • Use java.text.Collator for String-based default comparison.
  • XEP-0066: Use URI instead of URL.
  • Fix XMPP Ping in External Components, which broke the connection.
  • Jid.asBareJid returns this if it is already bare, reducing GC pressure.
  • connect() method should not throw CancellationException
  • Check if the connection has been secured (if configured) before starting to authenticate.

Maven coordinates

<dependency>
    <groupId>rocks.xmpp</groupId>
    <artifactId>xmpp-core-client</artifactId>
    <version>0.7.3</version>
</dependency>
<dependency>
    <groupId>rocks.xmpp</groupId>
    <artifactId>xmpp-extensions-client</artifactId>
    <version>0.7.3</version>
</dependency>