Mastering Asynchronous JavaScript: Part 5 – Observables with RxJS

So far in this series we’ve looked at a number of ways to handle asynchronous events in JavaScript:

  1. Callbacks
  2. Promises
  3. Generators
  4. Async / await

In this, the final instalment of the series, we’ll be looking at how observables can be used for asynchronous programming. The particular implementation of observables I’ll be using is RxJS (Reactive Extensions for JavaScript). There are others out there, but this is by far the most popular.

First you need to understand the basic concept, or abstraction, behind observables. Think of an observable as a stream of data. Each stream of data can have one or more sources. An example of a data source would be an event such as a mouse click. Each time there is a mouse click, the data generated by the event (i.e. the browser-generated MouseEvent) will flow down the stream. At the end of the stream, there will be a subscriber that is observing the stream. This observer will react to each piece of data that arrives at the end of the stream. In between the event and the observer, we can place various operators that change the stream (e.g. transform the data). Streams can even be joined, so that multiple sources can flow into one stream.

A code example is worth 1000 words, so here you go:

var clicks = Rx.Observable.fromEvent(document, 'click');
var keypresses = Rx.Observable.fromEvent(document, 'keypress');
var clicksAndKeypresses = clicks.merge(keypresses);

clicksAndKeypresses
  .map((e) => e.target)
  .subscribe((t) => console.log(t));

Here we create three streams. The first, ‘clicks’, is sourced from all clicks on the page, and the second, ‘keypresses’, is sourced from all keypress events. The third is then created by merging the first two streams into one: clicksAndKeypresses.

We transform the merged clicks and keypresses using the map function. The data starts off as the browser event generated by either of the event sources, but we transform it so the stream contains just the target property of this event. We then observe this stream using the ‘subscribe’ function, and log the data to the console.
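To make the merge, map and subscribe steps concrete without a browser, here is a minimal sketch that runs in plain Node. ToyObservable and fakeDocument are names I have made up for this illustration. The class is a toy stand-in and not how RxJS is implemented, and Node's EventEmitter plays the role of the document.

```javascript
const { EventEmitter } = require('events');

// Toy stand-in for an observable: NOT RxJS, just enough to show the idea.
class ToyObservable {
  // `producer` runs once per subscriber and pushes values via `next`.
  constructor(producer) {
    this.producer = producer;
  }

  // Start the stream and deliver every value to `next`.
  subscribe(next) {
    this.producer(next);
  }

  // New stream whose values are this stream's values passed through `transform`.
  map(transform) {
    return new ToyObservable((next) =>
      this.subscribe((value) => next(transform(value))));
  }

  // New stream that forwards values from both sources as they arrive.
  merge(other) {
    return new ToyObservable((next) => {
      this.subscribe(next);
      other.subscribe(next);
    });
  }

  // Stream sourced from a Node event emitter (assumed stand-in for DOM events).
  static fromEvent(emitter, eventName) {
    return new ToyObservable((next) => emitter.on(eventName, next));
  }
}

// Hypothetical replacement for `document`, so the example runs outside a browser.
const fakeDocument = new EventEmitter();
const clicks = ToyObservable.fromEvent(fakeDocument, 'click');
const keypresses = ToyObservable.fromEvent(fakeDocument, 'keypress');

const seenTargets = [];
clicks
  .merge(keypresses)
  .map((e) => e.target)
  .subscribe((t) => seenTargets.push(t));

// Simulate user activity; each event flows down the merged stream in order.
fakeDocument.emit('click', { target: 'body' });
fakeDocument.emit('keypress', { target: 'input' });
fakeDocument.emit('click', { target: 'h1' });

console.log(seenTargets); // [ 'body', 'input', 'h1' ]
```

Notice that nothing is attached to the emitter until subscribe is called. Building the chain only describes the stream; subscribing is what starts the data flowing.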

I used this code on the following page:

I then performed the following actions:

  1. Clicked on the webpage
  2. Clicked on the ‘Hello Plunker!’
  3. Pressed a key
  4. Clicked on the text input
  5. Pressed a key while focused on the text input

The console output was as follows:

As you can see, the event target from the event object generated by either of the sources was displayed in the console each time either of the events occurred.

Now that you have a brief overview of RxJS, we’re going to use it to rewrite the database code I first introduced in the callbacks post. Here’s the code:

var MongoClient = promisify("mongodb").MongoClient;
var Rx = require('rxjs/Rx');
var Observable = Rx.Observable;

var url = 'mongodb://localhost:27017/myproject';
function output (user, orders) {
    // output data in pretty format
}

function logError (error) {
    // Log the error (could be console, a service, a file etc)
}

let db = null;
const userId = process.argv[2];
let user = null;

Observable.fromPromise(MongoClient.connect(url))
    .switchMap(function (connection) {
        // Store the connection in the outer variable so later steps can use it
        db = connection;
        return Observable
            .fromPromise(db.collection('access_log')
            .insert({ userId, date: new Date() }));
    })
    .switchMap(function () {
        return Observable
            .fromPromise(db.collection('users')
            .find({ userId })
            .toArray());
    })
    .switchMap(function (users) {
        user = users[0];
        return Observable
            .fromPromise(db.collection('orders')
            .find({ userId })
            .toArray());
    })
    .catch(function (err) {
        logError(err);
        // Observable.of wraps a plain value; Observable.from expects an array, promise or iterable
        return Observable.of(false);
    })
    .subscribe(function (orders) {
        if (orders) {
            output(user, orders);
        }
    });

Just as with the generator and async/await versions of the code, we convert the callback-based functions to promises. This allows us to use the Observable.fromPromise function to create an observable whose source is a promise. We create an observable for the database connection call. The Rx function switchMap is then called to chain all of the asynchronous functions together. switchMap is a function which is called on an observable, and takes a function which must return an observable. Each time the source emits a value, the stream switches to the new observable returned from that function. It can be used to chain observables together in sequence, which is exactly what we need.

The call to catch catches all errors in the stream, and calls logError with the error. Since catch must return an observable, we use Observable.of, a function which creates an observable from static values, to return an observable with the value false. Finally, we subscribe to the stream. If the value from the stream isn’t falsy (if it is, that means there was an error and catch returned the false stream), we output the data.
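To see the chaining and error handling in isolation, here is a rough sketch that runs without MongoDB or RxJS. ToyStream, connect, findUser and findOrders are all hypothetical names invented for this example. The class only handles streams that emit a single value, so it mimics the shape of fromPromise, switchMap and catch rather than their full RxJS behaviour.

```javascript
// Toy single-value stream: NOT RxJS, only enough to mirror the chain above.
class ToyStream {
  // `start` receives two callbacks: one for a value, one for an error.
  constructor(start) {
    this.start = start;
  }

  static fromPromise(promise) {
    return new ToyStream((next, error) => promise.then(next, error));
  }

  static of(value) {
    return new ToyStream((next) => next(value));
  }

  // For each value, continue with the stream returned by `project`.
  // (The real switchMap also cancels the previous inner stream; with one
  // value per stream there is nothing to cancel, so that part is omitted.)
  switchMap(project) {
    return new ToyStream((next, error) =>
      this.start((value) => {
        try {
          project(value).start(next, error);
        } catch (err) {
          error(err);
        }
      }, error));
  }

  // If anything upstream fails, continue with the stream from `handler`.
  catch(handler) {
    return new ToyStream((next, error) =>
      this.start(next, (err) => handler(err).start(next, error)));
  }

  subscribe(next, error = (err) => { throw err; }) {
    this.start(next, error);
  }
}

// Hypothetical stand-ins for the database calls; the last one always fails.
const connect = () => Promise.resolve({ name: 'fake-db' });
const findUser = (db) => Promise.resolve({ id: 1, via: db.name });
const findOrders = (user) =>
  Promise.reject(new Error('orders unavailable for user ' + user.id));

const log = [];
const done = new Promise((resolve) => {
  ToyStream.fromPromise(connect())
    .switchMap((db) => ToyStream.fromPromise(findUser(db)))
    .switchMap((user) => ToyStream.fromPromise(findOrders(user)))
    .catch((err) => {
      log.push('caught: ' + err.message);
      return ToyStream.of(false); // sentinel value, as in the code above
    })
    .subscribe((orders) => {
      log.push('received: ' + orders);
      resolve(log);
    });
});

done.then((entries) => console.log(entries));
// [ 'caught: orders unavailable for user 1', 'received: false' ]
```

Each step only runs once the previous one has produced its value, and a failure anywhere in the chain skips straight to catch. The subscriber then receives false instead of the orders, which is why the original code checks the value before calling output.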

You may well think this is actually more complicated than the promise version of the code, and nowhere near as nice as the async/await or generator version. I would completely agree with you on this. The RxJS API is fairly large, and very powerful. It allows you to do things that would be hard to achieve in other ways (retrying failed streams, merging multiple event sources). The truth is that this functionality is rarely needed. Since streams are harder to get your head around than other methods of async programming, I would rarely recommend people use RxJS. I included it in this series because many companies use it, so awareness of it is useful.

The full series:

  1. Callbacks
  2. Promises
  3. Generators
  4. Async / await
  5. RxJS
