Higher Order Observables

Now we are getting to the good stuff! You should have a pretty solid understanding of what Observables are and how the common operators work before continuing. Still here? Good!

Definition

What the hell even are Higher Order Observables? A higher order observable is simply an observable that operates on and/or emits other observables. You may have seen operators like switchMap, mergeMap, and concatMap hanging out in dark corners and shady streets. This section seeks to demystify these often intimidating or confusing pieces.

Switch

From the official docs, the switch operator will "convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables"

Huh?

Let's just start with the "Observable that emits Observables" part.

const obs = Observable.interval(1000).map(x => Observable.of(`Number: ${x}`)).take(10);

Alright, what is this doing? Every one second we are emitting a number. We take that number and we map it into another observable that emits a single value, Number: <our number here>. We stop after we emit 10 of those observables.

So our end result is an observable with the type Observable<Observable<string>>; that is to say, it's an observable of observables. So if we subscribe to it...

const obs = Observable.interval(1000).map(x => Observable.of(`Number: ${x}`)).take(10);
obs.subscribe(x => x.subscribe(y => console.log(y)));

Since the value we get from our first subscribe is an observable, we have to subscribe to it to get to our Number: ... string.

Now let's look at the second part of that definition from the docs, "... into a single Observable that emits the items emitted by the most-recently-emitted of those Observables".

Okay, so it's going to take our Observable<Observable<string>> and turn it into a normal observable ("... into a single Observable ...").

That single observable will emit whatever is emitted from our inner observable, which is an Observable<string> ("... that emits the items emitted by ...").

Every time a new observable is emitted out of the outer observable, it becomes the new source of what our single observable emits ("... by the most-recently-emitted of those Observables").

Is your brain bleeding as much as mine?

Let's just use it and see what happens.

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.of(`Number: ${x}`))
    .switch();

obs.subscribe(x => console.log(x));

What happened? We simply turned our Observable<Observable<string>> into an Observable<string>. We grab that inner observable, and treat it as our source from now on.

If we run that, we will see our Number: ... strings being logged to the console.

That most-recently-emitted part is interesting though. It says that anytime we emit a new Observable<string>, that becomes the new source. We switch it out.

That doesn't really change anything in this example, but let's change it up a bit.

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(4))
    .switch();

obs.subscribe(x => console.log(x));

Running that, you will see Inner #0: 0, Inner #0: 1, Inner #0: 2, Inner #0: 3, Inner #1: 0, Inner #1: 1, Inner #1: 2, Inner #1: 3, and so on until you reach Inner #9: 3.

So what happened? Let's break it down:

  1. Each second, our outer observable ticks, returning a new inner observable.
  2. The inner observables it creates tick every 200 milliseconds.
  3. This means each inner observable ticks four times before the outer ticks again (a fifth tick would land on the next outer tick, but take(4) has already completed the inner by then).
  4. Switch grabs the current inner and emits whatever it emits.
  5. Each time the outer observable ticks, it will switch out the current inner observable for the new one.

When it switches out the old inner observable for the new inner observable, it actually unsubscribes from the old and subscribes to the new.
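If the unsubscribe-and-resubscribe part is still fuzzy, here is a rough sketch of the idea in plain JavaScript. Everything in it (createSource, switchLatest) is made up for illustration and is not how RxJS implements switch. It ignores completion and errors entirely, and values are pushed by hand instead of by timers so the order is easy to follow.

```javascript
// Toy model only: a hand-pushed source, not the RxJS Subject.
function createSource() {
  const observers = new Set();
  return {
    // Push a value to everyone currently subscribed.
    next(value) {
      [...observers].forEach(observer => observer.next(value));
    },
    // Returns a function that unsubscribes this observer.
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer);
    },
  };
}

// Hypothetical stand-in for switch: follow only the latest inner source.
function switchLatest(outer) {
  return {
    subscribe(observer) {
      let unsubscribeInner = null;
      const unsubscribeOuter = outer.subscribe({
        next(inner) {
          if (unsubscribeInner) unsubscribeInner(); // drop the old inner
          unsubscribeInner = inner.subscribe({
            next: value => observer.next(value),
          });
        },
      });
      return () => {
        unsubscribeOuter();
        if (unsubscribeInner) unsubscribeInner();
      };
    },
  };
}

const outer = createSource();
const first = createSource();
const second = createSource();
const seen = [];

switchLatest(outer).subscribe({ next: value => seen.push(value) });

outer.next(first);         // first becomes the current inner
first.next('first: 0');    // passed through
outer.next(second);        // unsubscribe from first, subscribe to second
first.next('first: 1');    // ignored, nobody is listening to first anymore
second.next('second: 0');  // passed through

console.log(seen); // [ 'first: 0', 'second: 0' ]
```

Notice that 'first: 1' never shows up: by the time it is pushed, the sketch has already let go of the first inner source.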

Keep playing around with these examples until you have a solid understanding of what is happening.

tl;dr: Given an observable of observables, switch flattens it into a single observable that follows whichever inner observable was emitted most recently. Observable<Observable<string>> -> switch -> Observable<string>.

SwitchMap

It turns out that mapping something to a new observable and then switching it to the inner observable is such a common practice that there is a shorthand for it, switchMap.

Let's take our previous simple switch example:

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.of(`Number: ${x}`))
    .switch();

obs.subscribe(x => console.log(x));

Instead of doing .map(...).switch(), we can simply do this:

const obs = Observable
    .interval(1000)
    .take(10)
    .switchMap(x => Observable.of(`Number: ${x}`));

obs.subscribe(x => console.log(x));

That is all switchMap does. It is simply syntactic sugar for .map(...).switch().

Merge

Compared to switch, the merge operator is much easier to understand. It will combine one observable with another, emitting everything from both.

const obsX = Observable.interval(1000).take(10).map(x => `X: ${x}`);
const obsY = Observable.interval(500).take(20).map(x => `Y: ${x}`);
const merged = obsX.merge(obsY);

merged.subscribe(x => console.log(x));

This will output everything from both obsX and obsY, as they will continue to run simultaneously with their emissions being interleaved together.
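To see the interleaving without waiting on timers, here is a small sketch in plain JavaScript. The helpers createSource and mergeToy are hypothetical and only model the idea; they are not RxJS's merge, and completion and errors are left out. The values are pushed by hand in an arbitrary order.

```javascript
// Toy model only: a hand-pushed source, not the RxJS Subject.
function createSource() {
  const observers = new Set();
  return {
    next(value) {
      [...observers].forEach(observer => observer.next(value));
    },
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer);
    },
  };
}

// Hypothetical stand-in for merge: subscribe to every source at once
// and forward whatever any of them emits, in arrival order.
function mergeToy(...sources) {
  return {
    subscribe(observer) {
      const unsubscribers = sources.map(source =>
        source.subscribe({ next: value => observer.next(value) })
      );
      return () => unsubscribers.forEach(unsubscribe => unsubscribe());
    },
  };
}

const obsX = createSource();
const obsY = createSource();
const seen = [];

mergeToy(obsX, obsY).subscribe({ next: value => seen.push(value) });

obsY.next('Y: 0');
obsX.next('X: 0');
obsY.next('Y: 1');
obsY.next('Y: 2');
obsX.next('X: 1');

console.log(seen); // [ 'Y: 0', 'X: 0', 'Y: 1', 'Y: 2', 'X: 1' ]
```

The output order is simply the order the values arrived in, no matter which source they came from.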

There is also a static method for merge:

const obsX = Observable.interval(1000).take(10).map(x => `X: ${x}`);
const obsY = Observable.interval(500).take(20).map(x => `Y: ${x}`);
const merged = Observable.merge(obsX, obsY);

merged.subscribe(x => console.log(x));

That's all that merge does. Super simple.

MergeAll

The mergeAll operator is like the merge operator but acts on inner observables. Let's go back to one of our switch examples:

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`))
    .switch();

obs.subscribe(x => console.log(x));

Recall that when we subscribe, we see the inner observable's emissions until a new inner observable is emitted, and then we switch to that new one's emissions.

Let's swap out switch for mergeAll (and add take(10) to the inner observables so that they complete):

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10))
    .mergeAll();

obs.subscribe(x => console.log(x));

Now what do we see? Well, it starts off the same as before: we see the first inner observable's emissions. However, it changes when a second inner observable gets emitted. We continue seeing the first observable's emissions, but now the second's emissions are interleaved with them. Each inner observable runs for about two seconds (ten ticks, 200 milliseconds apart) and a new one starts every second, so roughly two are active at any moment. By the time everything completes, all ten inner observables will have printed to the console.

Instead of switching them out, they are merged together.

The mergeAll operator takes one optional parameter: the maximum number of concurrent subscriptions to inner observables. Inner observables that arrive while that limit is reached have to wait. Once an active inner observable completes, the next waiting one is subscribed to, so we never exceed the max.

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`))
    .mergeAll(2);

obs.subscribe(x => console.log(x));

Here we will only ever see the emissions from the first two inner observables. We removed take from the inner observables, so they never complete, and with a max of 2 simultaneous inner observables no slot ever frees up for the rest.

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10))
    .mergeAll(2);

obs.subscribe(x => console.log(x));

Now we will see the other inner observables start as the first ones complete.
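The waiting-for-a-free-slot behavior can be sketched in plain JavaScript. This is only a toy model under a few assumptions: createSource and mergeAllToy are hypothetical names, the sources are pushed by hand rather than driven by interval, and errors and completion of the result are ignored. It is not how RxJS implements mergeAll.

```javascript
// Toy model only: a hand-pushed source that can also complete.
function createSource() {
  const observers = new Set();
  return {
    next(value) {
      [...observers].forEach(observer => observer.next(value));
    },
    complete() {
      const current = [...observers];
      observers.clear();
      current.forEach(observer => observer.complete());
    },
    subscribe(observer) {
      observers.add(observer);
    },
  };
}

// Hypothetical stand-in for mergeAll(concurrent).
function mergeAllToy(outer, concurrent = Infinity) {
  return {
    subscribe(observer) {
      const waiting = []; // inner sources that arrived while all slots were busy
      let active = 0;     // inner sources we are currently subscribed to

      function start(inner) {
        active++;
        inner.subscribe({
          next: value => observer.next(value),
          complete() {
            active--;
            if (waiting.length > 0) start(waiting.shift()); // fill the freed slot
          },
        });
      }

      outer.subscribe({
        next(inner) {
          if (active < concurrent) start(inner);
          else waiting.push(inner);
        },
        complete() {}, // completion of the result is left out of this sketch
      });
    },
  };
}

const outer = createSource();
const a = createSource();
const b = createSource();
const c = createSource();
const seen = [];

mergeAllToy(outer, 2).subscribe({ next: value => seen.push(value) });

outer.next(a);   // takes slot 1
outer.next(b);   // takes slot 2
outer.next(c);   // both slots busy, c has to wait
a.next('a: 0');
b.next('b: 0');
a.complete();    // frees a slot, so c is subscribed to now
c.next('c: 0');
b.next('b: 1');

console.log(seen); // [ 'a: 0', 'b: 0', 'c: 0', 'b: 1' ]
```

If a never completed, c would sit in the waiting list forever, which is the situation from the example without take.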

Keep playing around with these examples until you understand what is happening.

MergeMap

As was the case with map(...).switch(), we have a shortcut for map(...).mergeAll() called mergeMap.

If we take an example from mergeAll:

const obs = Observable
    .interval(1000)
    .take(10)
    .map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10))
    .mergeAll();

obs.subscribe(x => console.log(x));

and change it to be:

const obs = Observable
    .interval(1000)
    .take(10)
    .mergeMap(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10));

obs.subscribe(x => console.log(x));

We would be doing exactly the same thing as before.

mergeMap even lets us specify our max simultaneous subscriptions:

const obs = Observable
    .interval(1000)
    .take(10)
    .mergeMap(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10), null, 2);

obs.subscribe(x => console.log(x));

As you can see, we've specified our limit as the third argument to mergeMap. So what is our second argument?

Well, mergeMap also accepts a selector function (or projection) that allows us to format our output in some way.

The selector function takes the outer observable value and the inner observable value as its first two parameters (it is also passed the outer and inner indexes after those). Whatever is returned from this function will be our new output.

You can think of it like a map that is applied to every value emitted by the inner observables. In fact, let's change our last example to do just that:

const obs = Observable
    .interval(1000)
    .take(10)
    .mergeMap(x => Observable.interval(200).take(10), (x, y) => `Inner #${x}: ${y}`, 2);

obs.subscribe(x => console.log(x));

That's all there is to it. We have moved our map function from the inner observable map to the second parameter of mergeMap.
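If it helps to see where the selector sits in the flow, here is a stripped-down sketch in plain JavaScript. The helpers of and mergeMapToy are hypothetical stand-ins written for this illustration (they are synchronous, and they skip concurrency limits, completion, and errors), so treat it as a picture of the idea rather than the RxJS implementation.

```javascript
// Toy synchronous source: emits its values as soon as someone subscribes.
function of(...values) {
  return {
    subscribe(observer) {
      values.forEach(value => observer.next(value));
    },
  };
}

// Hypothetical stand-in for mergeMap(project, resultSelector).
function mergeMapToy(source, project, resultSelector) {
  return {
    subscribe(observer) {
      source.subscribe({
        next(outerValue) {
          // Map the outer value to an inner source and subscribe to it.
          project(outerValue).subscribe({
            next(innerValue) {
              // The selector sees both values and decides what goes out.
              const output = resultSelector
                ? resultSelector(outerValue, innerValue)
                : innerValue;
              observer.next(output);
            },
          });
        },
      });
    },
  };
}

const seen = [];

mergeMapToy(
  of(0, 1),
  x => of('a', 'b'),
  (x, y) => `Inner #${x}: ${y}`
).subscribe({ next: value => seen.push(value) });

console.log(seen);
// [ 'Inner #0: a', 'Inner #0: b', 'Inner #1: a', 'Inner #1: b' ]
```

Without the third argument to mergeMapToy, the output would just be the raw inner values 'a' and 'b'.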

It's also worth noting that you can pull these functions out into named functions to make things cleaner. This example isn't bad, but we will go ahead and do that anyway.

function outerMap(x) { return Observable.interval(200).take(10); }
function innerMap(x, y) { return `Inner #${x}: ${y}`; }

const obs = Observable.interval(1000).take(10).mergeMap(outerMap, innerMap, 2);

obs.subscribe(x => console.log(x));

Defining named functions can really make a difference in clarity.

Concat

The last of our combination operators is the concat family. Let's quickly compare it with the last two families:

  • switch will always swap to the new inner observable emitted.
  • merge will always interleave the new inner observable emitted.
  • concat will always add the new inner observable to the end of the sequence of observables to emit from.

The concat operator is like merge, but it always runs the concatted observables in order. That is to say, it waits for the first observable to complete before it starts running the next.

Let's look at an example.

const firstObs = Observable.of('one', 'one', 'one', 'one');
const secondObs = Observable.of('two', 'two', 'two', 'two');

const combined = firstObs.concat(secondObs);

combined.subscribe(x => console.log(x));

If we run this we will see all the emissions from firstObs, followed by all the emissions from secondObs.

If firstObs never completed, we would never see anything from secondObs.
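Here is a rough plain JavaScript sketch of that waiting behavior, including the never-completes case. The names of, never, and concatToy are made up for this illustration and only handle two synchronous sources with no error handling; this is not RxJS's concat.

```javascript
// Toy synchronous source: emits its values on subscribe, then completes.
function of(...values) {
  return {
    subscribe(observer) {
      values.forEach(value => observer.next(value));
      observer.complete();
    },
  };
}

// A source that never emits and never completes.
const never = { subscribe() {} };

// Hypothetical stand-in for concat with two sources.
function concatToy(first, second) {
  return {
    subscribe(observer) {
      first.subscribe({
        next: value => observer.next(value),
        // Only once first is done do we even subscribe to second.
        complete() {
          second.subscribe(observer);
        },
      });
    },
  };
}

const seen = [];
concatToy(of('one', 'one'), of('two', 'two')).subscribe({
  next: value => seen.push(value),
  complete() {},
});
console.log(seen); // [ 'one', 'one', 'two', 'two' ]

const blocked = [];
concatToy(never, of('two', 'two')).subscribe({
  next: value => blocked.push(value),
  complete() {},
});
console.log(blocked); // []
```

In the second run the first source never calls complete, so the sketch never gets around to subscribing to the second one.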

Again, there is a static method for concat as well:

const firstObs = Observable.of('one', 'one', 'one', 'one');
const secondObs = Observable.of('two', 'two', 'two', 'two');

const combined = Observable.concat(firstObs, secondObs);

combined.subscribe(x => console.log(x));

ConcatAll

Once again, concatAll is like concat but operates on inner observables. Example time:

const obs = Observable.interval(1000).take(10)
    .map(x => Observable.of(`#${x}`, `#${x}`, `#${x}`, `#${x}`))
    .concatAll();

obs.subscribe(x => console.log(x));

If we run this, we will see all emissions from the first inner observable generated, followed by the second, followed by the third, and so on, all the way to #9.

You can think of concatAll like mergeAll with a max concurrency of one (mergeAll(1)).

ConcatMap

You probably already know this, but concatMap is just shorthand for map(...).concatAll().

const obs = Observable.interval(1000).take(10)
    .concatMap(x => Observable.of(`#${x}`, `#${x}`, `#${x}`, `#${x}`));

obs.subscribe(x => console.log(x));

Like mergeMap, concatMap takes in an optional selector function as a second parameter.

const obs = Observable.interval(1000).take(10)
    .concatMap(
        x => Observable.of(`#${x}`, `#${x}`, `#${x}`, `#${x}`), 
        (x, y) => `${x}, ${y}`
    );

obs.subscribe(x => console.log(x));
