The Observable API is extraordinarily useful for asynchronous programming. It enables us to decouple data sources from data processing targets, build sophisticated processing pipelines, and produce deterministic sequences. RxJS also provides a number of functions for creating Observable instances. This post will give you a brief overview of the history of the Reactive Extensions (RX), as well as an introduction to one of my favorite RX creation functions, fromEventPattern.

Reactive Extensions

The Reactive Extensions (RX) are, to my understanding, a project started at Microsoft. For context, there’s a fantastic Expert-to-Expert video with Erik Meijer that explains some of the history. What frequently surprises me is how obvious a fit the RX project is for software development, and how long it took for RX to become a standard library. Nonetheless, it’s a very well-documented project, and one that I enjoy using in places where I feel it makes sense. I’ve found that RX is particularly good for implementing the Publish/Subscribe Pattern, because it decouples producers from consumers and makes it easy to attach multiple consumers to a single producer. Such is the nature of the Observable.subscribe() method.

RxJS (an RX implementation) provides us with a whole list of functions that can create instances of the Observable class. Today, I’m going to talk about the fromEventPattern function, and provide an example of how you might use the function in real code. The gist of this function is that it enables us to define an Observable based on two functions: an addHandler, which attaches a handler to the underlying event source, and a removeHandler, which detaches that handler again.

It also allows us to define a resultSelector function, which enables us to manipulate the data that will be provided to consumers of the resulting Observable.
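
Here is a minimal sketch of how the add/remove pair and the resultSelector fit together. The TinyEmitter class is a hypothetical event source written only for this illustration (it is not part of RxJS), and the resultSelector simply folds the two emitted arguments into one string.

```typescript
import { fromEventPattern } from "rxjs";

type Listener = (name: string, value: number) => void;

// Hypothetical event source, used only for illustration.
class TinyEmitter {
    private listeners = new Set<Listener>();
    addListener(fn: Listener): void { this.listeners.add(fn); }
    removeListener(fn: Listener): void { this.listeners.delete(fn); }
    emit(name: string, value: number): void {
        this.listeners.forEach(fn => fn(name, value));
    }
    get listenerCount(): number { return this.listeners.size; }
}

const emitter = new TinyEmitter();
const received: string[] = [];

const readings$ = fromEventPattern<string>(
    handler => emitter.addListener(handler),      // called when someone subscribes
    handler => emitter.removeListener(handler),   // called when they unsubscribe
    (name: string, value: number) => `${name}=${value}` // shapes what consumers see
);

const subscription = readings$.subscribe(text => received.push(text));
emitter.emit("temp", 21);
emitter.emit("temp", 22);
subscription.unsubscribe();
emitter.emit("temp", 23); // nobody is listening any more, so this is dropped
```

Nothing is attached to the emitter until subscribe() is called, and unsubscribe() detaches the same handler that was attached.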

Why use fromEventPattern?

The fromEventPattern function can be used to bind all sorts of non-RX objects to RX. One example of this is DOM manipulation and data binding. You could also use it for wrapping browser storage and storage events. Recently, I used it for wrapping a redux store inside an Angular application so that I could pipe the storage events. The ability to create observables from any source enables us to create all sorts of publish/subscribe workflows, and perform really useful data transformations.

Example

So how do you use it? It’s actually very simple. First, consider the following function definition (assuming TypeScript syntax), which is derived from the fromEventPattern documentation:

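export declare function fromEventPattern<THandle, TData>(
    // Registers handlerFn with the source; may return a handle (a token, an unsubscribe function, ...)
    addHandler: (handlerFn: (...args: any[]) => void) => THandle | void,
    // Receives the same handlerFn, plus whatever addHandler returned
    removeHandler: (handlerFn: (...args: any[]) => void, handle: THandle | void) => void,
    resultSelector?: (...args: any[]) => TData
): Observable<TData>;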

Because I mentioned using a redux store in the original example, we’ll just continue using that as our use-case. The store API declares four methods:

  • getState()
  • dispatch(action)
  • subscribe(listener)
  • replaceReducer(nextReducer)

The subscribe method is the one we’re interested in, as it’s the method that notifies us that a change occurred. The listener argument doesn’t actually receive a copy of the state when a change occurs. Instead, listeners are called when a change occurs, notifying them that they need to refresh their view of the store state. In our case, we’ll create that hook using the resultSelector function, which will publish updates on the resulting Observable.

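import { Store } from "redux";
import { Observable, fromEventPattern } from "rxjs";

// AppState is the application's own state type, assumed to be defined elsewhere.
function createStoreObservable(store: Store<AppState>): Observable<AppState> {
    return fromEventPattern<AppState>(
        store.subscribe,                                     // returns the store's unsubscribe function
        (a, unsubscribeFunction) => unsubscribeFunction(),   // call it when the Observable is torn down
        () => store.getState()                               // listeners get no arguments, so read the state here
    );
}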
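
The second argument passed to the remove handler above is whatever the add handler returned, which for a store is its unsubscribe function. As a rough sketch of that teardown, the following uses a hypothetical fakeStore object (a stand-in that models only getState and subscribe, plus a made-up setUser helper) rather than a real redux store:

```typescript
import { fromEventPattern } from "rxjs";

// Hypothetical stand-in for a store; only the parts used here are modelled.
let state = { user: "ada" };
let activeListeners = 0;
let listener: (() => void) | undefined;

const fakeStore = {
    getState: () => state,
    subscribe(fn: () => void): () => void {
        listener = fn;
        activeListeners++;
        // Like a redux store, hand back a function that removes the listener.
        return () => { listener = undefined; activeListeners--; };
    },
    // Made-up helper that changes the state and notifies the listener (with no arguments).
    setUser(user: string): void {
        state = { user };
        listener?.();
    },
};

const users: string[] = [];
const state$ = fromEventPattern<{ user: string }>(
    handler => fakeStore.subscribe(handler),
    (_handler, unsubscribe) => unsubscribe(),
    () => fakeStore.getState()
);

const subscription = state$.subscribe(s => users.push(s.user));
fakeStore.setUser("grace");   // published to the subscriber
subscription.unsubscribe();   // runs the remove handler, which calls unsubscribe()
fakeStore.setUser("linus");   // no longer observed
```

One thing worth noticing is that nothing is emitted until the first change happens. If a consumer needs the current state right away, one option is to add startWith(store.getState()) to the pipe.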

Once we have this simple wrapper over the store lifecycle, we can do all sorts of other neat things. Take the example of us having a user property on our store state that we’re interested in exposing. After all, the store is really just a monolithic state bag that our app uses. Particular consumers, on the other hand, are usually only interested in specific properties (one notable exception is possibly a settings page). Here’s how we could leverage the power of RX to expose that property specifically. Let’s also assume that the next snippet is taken from an Angular component that displays properties about the user, and that it follows the familiar takeUntil unsubscribe pattern, where ngUnsubscribe is a Subject that emits when the component is destroyed:

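// map, filter and takeUntil are imported from "rxjs/operators".
createStoreObservable(this.store).pipe(
    map(appState => appState.user),             // keep only the user slice of the state
    filter(user => user.id !== this.userId),    // skip updates for the user already displayed
    takeUntil(this.ngUnsubscribe)               // stop listening when the component is destroyed
).subscribe(user => {
    this.userId = user.id;
    this.name = user.name;
    this.mail = user.email;
    this.phone = user.phone;
});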
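
For readers who haven’t seen the takeUntil pattern before, here is a small sketch of it outside of Angular. The UserPanel class and its destroy() method are hypothetical stand-ins for a component and its ngOnDestroy() hook.

```typescript
import { Observable, Subject } from "rxjs";
import { takeUntil } from "rxjs/operators";

// Hypothetical stand-in for a component; in Angular, destroy() would be ngOnDestroy().
class UserPanel {
    private ngUnsubscribe = new Subject<void>();
    names: string[] = [];

    constructor(source: Observable<string>) {
        source
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe(name => this.names.push(name));
    }

    destroy(): void {
        // Emitting once ends every pipe that uses takeUntil(this.ngUnsubscribe).
        this.ngUnsubscribe.next();
        this.ngUnsubscribe.complete();
    }
}

const names$ = new Subject<string>();
const panel = new UserPanel(names$);
names$.next("ada");
panel.destroy();
names$.next("grace"); // arrives after destroy(), so it is not recorded
```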

The map operator behaves the same way as the map operator you would find in other contexts, such as the Array.prototype.map method in JavaScript arrays. Similarly, the filter operator behaves the same way as the Array.prototype.filter method, or the .NET Where operator. The map operator applies a projection function to each value in a collection or event stream, the filter operator applies a predicate and drops the values that fail it, and the resulting values are published to the next consumer. This form of composition is much more powerful than trying to manage the equivalent code written “by hand” (with fewer abstractions). The use of these functions, like all abstractions, will incur a small degree of overhead, but your colleagues will nearly universally appreciate the readability gains they receive from the combination of RX operators with Observable pipes.
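
To make the comparison with arrays concrete, here is a small side-by-side sketch. The User interface and the sample data are made up for this example.

```typescript
import { from } from "rxjs";
import { filter, map } from "rxjs/operators";

// Made-up sample data for the comparison.
interface User { id: number; name: string; }
const users: User[] = [
    { id: 1, name: "ada" },
    { id: 2, name: "grace" },
    { id: 3, name: "linus" },
];

// Array version: filter with a predicate, then map with a projection.
const fromArray = users
    .filter(u => u.id !== 2)
    .map(u => u.name);

// Observable version: the same two steps, expressed as pipe operators.
const fromStream: string[] = [];
from(users).pipe(
    filter(u => u.id !== 2),
    map(u => u.name)
).subscribe(name => fromStream.push(name));
```

Both versions end up with the same names in the same order. The difference is that the Observable version keeps working when the values arrive over time rather than all at once.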

Summary

In this post, I introduced the RX fromEventPattern function. I also gave a brief history of the Reactive Extensions, showed how you can wrap a redux store as an Observable, obtain state changes, and select the portions of the application state that are most interesting to your code.

There are a huge number of benefits to using Observables, and RX clearly offers a number of useful hooks for managing your application flows. I’ve demonstrated one example of how you can wrap some of your other libraries with RX (via the redux example), but this is a concept that can be applied across many parts of your code.

Thanks for reading! I hope this post helps you with some aspect of a current or future project that you’re working on.