April 4, 2016

Using RxJS

ReactiveX is a library with support for many platforms and languages. It allows you to write reactive code using streams.

What is a stream?

A stream is a data source that will provide 0 – N messages at some undetermined time. Let’s unpack that.

An array can be viewed as a stream. If we have an array of 5 items, [1, 2, 3, 4, 5], this is a stream of data that will provide 5 messages immediately. An empty array will provide 0 messages immediately.

A promise can be viewed as a stream that will provide 1 message sometime in the future.

An input element like a button can be viewed as a stream that will provide an unlimited number of messages sometime in the future.

As you can see, the concept of a stream is very versatile and powerful. Streams can be used to represent any source of data.
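
To make the message counts concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS; arrayStream and collect are hypothetical helpers that only mimic the shape of a stream by pushing each array item to a listener and then signaling that the stream is done.

```javascript
// Hypothetical helper, not part of RxJS: treats an array as a stream that
// pushes every item to the listener immediately, then signals completion.
function arrayStream(items) {
  return {
    subscribe(onMessage, onDone) {
      items.forEach(item => onMessage(item));
      onDone();
    },
  };
}

// Collects everything a stream delivers so we can inspect it afterwards.
function collect(stream) {
  const result = { messages: [], done: false };
  stream.subscribe(
    message => result.messages.push(message),
    () => { result.done = true; }
  );
  return result;
}

const five = collect(arrayStream([1, 2, 3, 4, 5]));
const none = collect(arrayStream([]));

console.log(five.messages.length); // 5
console.log(none.messages.length); // 0
```

The empty array still counts as a stream: it delivers no messages but does report that it is finished.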

ReactiveX lets us build, consume, and compose streams. If you have ever used a library like underscore or lodash, ReactiveX has similar capabilities for combining, transforming, and aggregating streams of data.

RxJS is the name of the JavaScript implementation of ReactiveX.

Getting started

First, add Rx as a dependency to your project.

npm install rx --save

Some existing libraries return RxJS streams directly, but RxJS also has functions for converting data sources into streams. Let’s look at turning a mouse move event into an event stream. We’ll build our example in Angular on top of the application built in Let’s Learn Angular JS.

Event streams

Now we create a simple directive that wraps an Rx stream around the mousemove event on the associated element.

https://github.com/SonofNun15/explore-angular/commit/5194111176bf6cab73011ed0f1504a2b62cef5d8

Once we’ve required the Rx library, this is as simple as calling Rx.Observable.fromEvent() and passing in the host element and the event name. We then put this stream on the external scope so that it is available for consumption. In the next commit, we add a controller that picks this stream up from the scope (once we wire it up) and subscribes to its messages.

https://github.com/SonofNun15/explore-angular/commit/60564be0f001a7634ffb8092ab369d151ca32bde
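
The idea behind wrapping an event in a stream can be sketched without Angular or RxJS. The fromEvent function below is a hypothetical stand-in written for this illustration, not the library's implementation, and a Node EventEmitter plays the role of the DOM element. The clientX property is an assumed field on the event object.

```javascript
const EventEmitter = require('events');

// Hypothetical stand-in for Rx.Observable.fromEvent(): wraps a named event
// on an emitter in an object with a subscribe() method.
function fromEvent(emitter, eventName) {
  return {
    subscribe(onNext) {
      emitter.on(eventName, onNext);
      // Return a handle that stops listening when disposed.
      return { dispose: () => emitter.removeListener(eventName, onNext) };
    },
  };
}

// A Node EventEmitter plays the role of the DOM element here.
const element = new EventEmitter();
const mouseMoves = fromEvent(element, 'mousemove');

const seen = [];
const subscription = mouseMoves.subscribe(event => seen.push(event.clientX));

element.emit('mousemove', { clientX: 10 });
element.emit('mousemove', { clientX: 20 });
subscription.dispose();
element.emit('mousemove', { clientX: 30 }); // no longer delivered

console.log(seen); // [ 10, 20 ]
```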

Listening for messages is very similar to awaiting promises with then(), except that we call subscribe(). The first argument is a callback for new messages, the second is a callback for errors, and the third is a callback for completion. Completion is a new concept with streams. Streams have the option of signaling when they are done sending messages. Some streams, like DOM event streams, never signal completion. Others, like an array, have very clear endings. Schedulers give further control over how messages are delivered; we’ll touch on this briefly later.
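
The three callbacks are easier to see in a small, self-contained sketch. The numbersFrom helper below is hypothetical and written in plain JavaScript rather than with RxJS; it only follows the same calling convention, assuming that an error ends the stream and that completion is signaled only after a clean run.

```javascript
// Hypothetical helper, not RxJS itself: parses each string as a number and
// follows the three-callback convention used by subscribe().
function numbersFrom(strings) {
  return {
    subscribe(onNext, onError, onCompleted) {
      for (const text of strings) {
        const value = Number(text);
        if (Number.isNaN(value)) {
          // An error ends the stream: nothing is delivered after it.
          onError(new Error('Not a number: ' + text));
          return;
        }
        onNext(value);
      }
      onCompleted();
    },
  };
}

// Subscribes with all three callbacks and records what was called.
function run(strings) {
  const log = [];
  numbersFrom(strings).subscribe(
    value => log.push('next ' + value),
    error => log.push('error ' + error.message),
    () => log.push('completed')
  );
  return log;
}

const goodRun = run(['1', '2']);
const badRun = run(['1', 'oops', '3']);
console.log(goodRun); // [ 'next 1', 'next 2', 'completed' ]
console.log(badRun);  // [ 'next 1', 'error Not a number: oops' ]
```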

Operators

Things get much more interesting when we start to use operators on the event stream. Up to this point, wrapping the event handler in a stream seemed like a lot of extra work without a lot of benefit.

https://github.com/SonofNun15/explore-angular/commit/f0c096c20d09ff532d103733fe85fa36b25c5520

Now that we have a stream of mousemove events, we can use common Rx operators on this stream. In this example we first map / transform the stream to pull out the x coordinate of the event. Next we call sample to dilute the frequency of the events we receive to at most 1 event every second.
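
To show what these two operators do to the data, here is a rough simulation over recorded events instead of a live stream. Everything in it is an assumption made for illustration: the timestamps, the clientX field, and the mapMessages and sampleEvery helpers, which imitate the behavior described above rather than reproduce the RxJS operators.

```javascript
// Hypothetical recorded mousemove events; t is milliseconds since the start.
const events = [
  { t: 100, clientX: 10 },
  { t: 400, clientX: 25 },
  { t: 900, clientX: 40 },
  { t: 1500, clientX: 55 },
  { t: 3200, clientX: 70 },
];

// map: transform every message, keeping its timestamp.
function mapMessages(messages, transform) {
  return messages.map(m => ({ t: m.t, value: transform(m) }));
}

// sample: at each tick, emit the most recent value that arrived since the
// previous tick; a tick with no new value emits nothing.
function sampleEvery(messages, periodMs) {
  const sampled = [];
  const lastTime = messages.length ? messages[messages.length - 1].t : 0;
  let index = 0;
  for (let tick = periodMs; tick - periodMs < lastTime; tick += periodMs) {
    let latest;
    let hasValue = false;
    while (index < messages.length && messages[index].t <= tick) {
      latest = messages[index].value;
      hasValue = true;
      index += 1;
    }
    if (hasValue) sampled.push({ t: tick, value: latest });
  }
  return sampled;
}

const xCoordinates = mapMessages(events, e => e.clientX);
const oncePerSecond = sampleEvery(xCoordinates, 1000);
console.log(oncePerSecond.map(m => m.value)); // [ 40, 55, 70 ]
```

Five events become three messages: the first second contains three moves but only the latest one survives, and the quiet third second produces nothing.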

This illustrates the power and flexibility of reactive streams. Not only do we have a flexible and powerful way to process and manipulate streams through Rx operators, but we have a standard way to interact with all data sources. We also have a very efficient push based mechanism for dealing with asynchronous data sources.

Custom streams

Next, let’s examine how we can make our own custom stream that emits data at regular intervals.

https://github.com/SonofNun15/explore-angular/commit/daabbaba2c26646d214503cd3fc6607f7ded6a3c

Creating custom streams is as simple as creating a subject (new Rx.Subject()) and then emitting messages from it. Messages can be emitted using onNext(), onError(), and onCompleted() to signal a message, error, or completion respectively. Sometimes it is helpful to use a custom stream to wrap data sources yourself, but most of the time you can use Rx.Observable.from* to build an observable from data sources automatically.
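
A subject is both something you can subscribe to and something you can push messages into. The SimpleSubject class below is a hypothetical, stripped-down version written for this post to show that dual role; it borrows the method names mentioned above but is not the RxJS implementation, and it assumes that nothing is delivered after an error or completion.

```javascript
// Hypothetical minimal subject, not the RxJS implementation: it keeps a list
// of observers and forwards each notification to all of them.
class SimpleSubject {
  constructor() {
    this.observers = [];
    this.stopped = false;
  }
  subscribe(onNext, onError, onCompleted) {
    this.observers.push({ onNext, onError, onCompleted });
  }
  onNext(value) {
    if (this.stopped) return;
    this.observers.forEach(o => o.onNext(value));
  }
  onError(error) {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach(o => o.onError(error));
  }
  onCompleted() {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach(o => o.onCompleted());
  }
}

const ticks = new SimpleSubject();
const log = [];
ticks.subscribe(
  value => log.push('next ' + value),
  error => log.push('error ' + error.message),
  () => log.push('completed')
);

ticks.onNext(1);
ticks.onNext(2);
ticks.onCompleted();
ticks.onNext(3); // ignored: the stream has already completed

console.log(log); // [ 'next 1', 'next 2', 'completed' ]
```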

Array streams

Wrapping arrays (or other simple data structures) into streams is simple. Call Rx.Observable.from() and pass in an array.

https://github.com/SonofNun15/explore-angular/commit/acdbd88f2ee34c08114ab645547c0400f60bfa70

Array streams by default emit all of their data immediately upon request since all of the data is known in advance. In this example, we are calling controlled() in order to convert our stream into a pulled stream instead of a pushed stream. In a pushed stream, data is pushed to us whenever the stream has additional data, which occurs immediately for an array since all of the data is available. When a stream is converted to a pull stream, the data is buffered and notifications are not emitted until we ask for them. In our example we set up a timer and pull values from the array stream one at a time at regular intervals (lines 97 – 98 of the linked commit). We request values on a controlled stream using the request() call, passing in the number of values we want to receive.
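
The difference between pushing and pulling can be sketched in a few lines. The controlledFrom helper below is hypothetical and written in plain JavaScript; it imitates the buffering behavior described above without using RxJS, and it calls request() directly instead of from a timer.

```javascript
// Hypothetical stand-in for a controlled stream, not the RxJS implementation:
// values are buffered and only delivered when request(count) is called.
function controlledFrom(items) {
  const buffer = items.slice();
  let listener = null;
  return {
    subscribe(onNext) {
      listener = onNext;
    },
    request(count) {
      if (!listener) return; // keep values buffered until someone listens
      // Deliver at most `count` buffered values to the subscriber.
      buffer.splice(0, count).forEach(value => listener(value));
    },
  };
}

const pulled = [];
const source = controlledFrom([1, 2, 3, 4, 5]);
source.subscribe(value => pulled.push(value));

console.log(pulled.length); // 0, nothing arrives until we ask
source.request(1);
source.request(1);
console.log(pulled); // [ 1, 2 ]
source.request(2);
console.log(pulled); // [ 1, 2, 3, 4 ]
```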

Hot and cold streams

Streams in RxJS can either be "hot" or "cold."

Hot streams are always emitting messages regardless of who is subscribed. They are asynchronous and real time. If no one subscribes to a hot stream, messages are lost. A good example of a hot stream is the mousemove event stream we created earlier. When a mousemove event occurs, a data message is emitted whether or not anyone is subscribed to the stream.

Cold streams only emit messages when someone subscribes to listen to the stream. In most cases, they give the same set of messages to each subscriber. The array stream is a classic example of this kind of stream.

The custom stream that we built is hot. Subjects are always hot, although we can buffer messages to make a custom stream (subject) appear cold.
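
The distinction shows up most clearly when a second subscriber arrives late. The sketch below uses two hypothetical helpers, coldFrom and hotSource, written in plain JavaScript to imitate the two behaviors; neither is an RxJS API.

```javascript
// Cold (hypothetical helper): every subscriber gets its own run through the data.
function coldFrom(items) {
  return {
    subscribe(onNext) {
      items.forEach(item => onNext(item));
    },
  };
}

// Hot (hypothetical helper): messages go to whoever is subscribed at the
// moment they are emitted.
function hotSource() {
  const listeners = [];
  return {
    subscribe(onNext) { listeners.push(onNext); },
    emit(value) { listeners.forEach(listener => listener(value)); },
  };
}

const cold = coldFrom([1, 2, 3]);
const coldA = [];
const coldB = [];
cold.subscribe(v => coldA.push(v));
cold.subscribe(v => coldB.push(v));

const hot = hotSource();
const hotA = [];
const hotB = [];
hot.emit(1); // nobody is subscribed yet, so this message is lost
hot.subscribe(v => hotA.push(v));
hot.emit(2);
hot.subscribe(v => hotB.push(v));
hot.emit(3);

console.log(coldA, coldB); // [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(hotA, hotB);   // [ 2, 3 ] [ 3 ]
```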

Schedulers

Schedulers control when and where your listener code executes when a stream emits a message. In RxJS a scheduler is usually supplied to the operator that creates the stream, or applied with observeOn(), rather than passed to subscribe(). Different operators default to different schedulers: array streams deliver their values synchronously, while Rx.Scheduler.default runs work asynchronously using the best mechanism the platform offers, falling back to setTimeout. Schedulers can run listeners immediately in the calling code, defer them to a later turn of the event loop, or implement other custom execution behavior. The full details of schedulers are beyond the scope of this post.
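
As a rough illustration of the idea only, the sketch below defines two hypothetical schedulers in plain JavaScript. Neither is an RxJS scheduler: the first runs each action right away, and the second queues actions until flush() is called, which stands in for a later turn of the event loop.

```javascript
// Hypothetical schedulers, not the RxJS ones: each decides when an action runs.
const immediateScheduler = {
  schedule(action) { action(); },
};

function createQueuedScheduler() {
  const queue = [];
  return {
    schedule(action) { queue.push(action); },
    // Runs everything queued so far, in order.
    flush() { while (queue.length) queue.shift()(); },
  };
}

// Delivers each item to the listener through the given scheduler.
function deliver(items, scheduler, listener) {
  items.forEach(item => scheduler.schedule(() => listener(item)));
}

const order = [];
const queued = createQueuedScheduler();

deliver(['a'], immediateScheduler, v => order.push('immediate ' + v));
deliver(['b'], queued, v => order.push('queued ' + v));
order.push('subscribed');
queued.flush();

console.log(order); // [ 'immediate a', 'subscribed', 'queued b' ]
```

The same data and the same listener produce a different ordering depending on the scheduler, which is the behavior a scheduler exists to control.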

Following along

GitHub repo: https://github.com/SonofNun15/explore-angular

To follow along with this post, clone the repository from the application tag. This tag is the conclusion of the Let’s Learn Angular JS post. We'll build our examples on top of this simple application.

To see the finished product, clone the repository from the rxjs branch.

Post progresses from:

application -> rxjs