Higher Order Observables
Now we are getting to the good stuff! You should have a pretty solid undertanding of what Observables are and how the common operators work before continuing. Still here? Good!
Definition
What the hell even are Higher Order Observables? A higher order observable is simply an observable that operates on and / or emits other observables. You may have seen operators like switchMap
, mergeMap
, and concatMap
hanging out in dark corners and shady streets, this section seeks to demystify these often intimidating or confusing pieces.
Switch
From the official docs, the switch
operator will "convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables"
Huh?
Let's just start from Observable that emits Observables
.
const obs = Observable.interval(1000).map(x => Observable.of(`Number: ${x}`)).take(10);
Alright, what is this doing? Every one second we are emitting a number. We take that number and we map it into another observable that emits a single value, Number: <our number here>
. We stop after we emit 10 of those observables.
So I'll end result is an observable with the type of Observable<Observable<string>>
, that is to say, it's an observable of observables. So if we subscribe to it...
const obs = Observable.interval(1000).map(x => Observable.of(`Number: ${x}`)).take(10);
obs.subscribe(x => x.subscribe(y => console.log(y)));
Since the value we get from our first subscribe is an observable, we have to subscribe to it to get to our Number: ...
string.
Now let's look at the second part of that definition from the docs, "... into a single Observable that emits the items emitted by the most-recently-emitted of those Observables".
Okay so it's going to take our Observable<Observable<string>>
and turn it into a normal observable. ... into a single Observable ...
.
That single observable will emit whatever is emitted from our inner observable, which is a Observable<string>
. ... that emits the items emitted by ...
Everytime a new observable is emitted out of the outer observable, it becomes the new source of what our single observable emits. ... by the most-recently-emitted of those Observables
Is your brain bleeding as much as mine?
Let's just use it and see what happens.
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.of(`Number: ${x}`))
.switch();
obs.subscribe(x => console.log(x));
What happened? We simply turned our Observable<Observable<string>>
into a Observable<string>
. We grab that inner observable, and treat it as our source from now on.
If we run that, we will see our Number: ...
strings being logged to the console.
That most-recently-emitted
part is interesting though. It says that anytime we emit a new Observable<string>
, that becomes the new source. We switch
it out.
That doesn't really cause anything different in this example, but lets change it up a bit.
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(4))
.switch();
obs.subscribe(x => console.log(x));
Running that, you will see Inner #0: 0
, Inner #0: 1
, Inner #0: 2
, Inner #0: 3
, Inner #1: 0
, Inner #1: 1
, Inner #1: 2
, Inner #1: 3
, .... and so on until you reach Inner #9: 3
.
So what happened? let's break it down:
- Each second, our outer observable ticks, returning a new inner observable.
- The inner observables it creates tick every 2 tenths of a second.
- This means the inner observable will tick four times for every one of our outer (fifth tick falls on same of outer).
- Switch grabs the current inner and emits whatever it emits.
- Each time the outer observable ticks, it will
switch
out the current inner observable for the new one.
When it switch
es our the old inner observable for the new inner observable, it actually unsubscribes form the old and subscribes to the new.
Keep playing around with these examples until you have a solid understanding of what is happening.
tl;dr: Given an observable of observables, switch
will convert it to be the inner observable. Observable<Observable<string>>
-> switch
-> Observable<string>
.
SwitchMap
It turns out that map
ping something to a new observable and then switch
ing it to the inner observable is such a common practice that there is a shorthand for it, switchMap
.
Let's take our previous simple switch
example:
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.of(`Number: ${x}`))
.switch();
obs.subscribe(x => console.log(x));
Instead of doing .map(...).switch()
, we can simply do this:
const obs = Observable
.interval(1000)
.take(10)
.switchMap(x => Observable.of(`Number: ${x}`));
obs.subscribe(x => console.log(x));
That is all the switchMap
does. It is simply syntactic sugar for .map(...).switch()
.
Merge
Compared to switch
, the merge
operator is much easier to understand. It will combine one observable with another, emitting everything from both.
const obsX = Observable.interval(1000).take(10).map(x => `X: ${x}`);
const obsY = Observable.interval(500).take(20).map(x => `Y: ${x}`);
const merged = obsX.merge(obsY);
merged.subscribe(x => console.log(x));
This will output everything from both obsX
and obsY
, as they will continue to run simultaneously with their emissions being interleaved together.
There is also a static method for merge
:
const obsX = Observable.interval(1000).take(10).map(x => `X: ${x}`);
const obsY = Observable.interval(500).take(20).map(x => `Y: ${x}`);
const merged = Observable.merge(obsX, obsY);
merged.subscribe(x => console.log(x));
That's all that merge
does, super simple.
MergeAll
The mergeAll
operator is like the merge operator but acts on inner observables. Lets go back to one of our switch
examples:
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`))
.switch();
obs.subscribe(x => console.log(x));
Recall before that what we see when we subscribe is the inner observables emissions until a new inner observable is created and then we switch
to that new one's emissions.
Let's swap out switch
for mergeAll
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10))
.mergeAll();
obs.subscribe(x => console.log(x));
Now what do we see? Well it starts off the same as before, we see the inner observables emissions; however, it changes when a second inner observable gets emitted. Now we continue seeing the first observable's emissions, but now also see the second's emissions interleaved with the first. Eventually, after 10 seconds, we will see all ten of the emitted inner observables printing to the console.
Instead of switch
ing them out, they are merge
d together.
The mergeAll
operator takes in one parameter, and that is to limit the number of concurrent subscriptions to inner observables. Once an inner observable completes, the next available inner observable will be subscribed to up until this max.
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`))
.mergeAll(2);
obs.subscribe(x => console.log(x));
Here we will only ever see the emissions from the first two inner observables emitted, because we removed take
from the inner observables and our max simultaneous inner observables is 2.
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10))
.mergeAll(2);
obs.subscribe(x => console.log(x));
Now we will see the other inner observables start as the first ones complete.
Keep playing around with these examples until you understand what is happening.
MergeMap
As was the case with map(...).switch()
, we have a short cut for map(...).mergeAll()
called mergeMap
.
If we take an example from mergeAll
:
const obs = Observable
.interval(1000)
.take(10)
.map(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10))
.mergeAll();
obs.subscribe(x => console.log(x));
and change it to be:
const obs = Observable
.interval(1000)
.take(10)
.mergeMap(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10));
obs.subscribe(x => console.log(x));
We would we be doing exactly the same thing as before.
MergeMap
even let's us specify our max simultaneous subscriptions:
const obs = Observable
.interval(1000)
.take(10)
.mergeMap(x => Observable.interval(200).map(y => `Inner #${x}: ${y}`).take(10), null, 2);
obs.subscribe(x => console.log(x));
As you can see, we've specified our limit as the third argument to mergeMap
, what is our second argument?
Well, mergeMap
also accepts a selector function (or projection), that allows us to format our output in someway.
The selector function takes in two parameters: the outer observable value, and the inner observable value. Whatever is returned from this function will be our new output.
You can think of it like a map
that is applied to every inner observable emitted. In fact, let's change our last example to do just that:
const obs = Observable
.interval(1000)
.take(10)
.mergeMap(x => Observable.interval(200).take(10), (x, y) => `Inner #${x}: ${y}`, 2);
obs.subscribe(x => console.log(x));
That's all there is to it. We have moved our map function from the inner observable map
to the second parameter of mergeMap
.
It's also worth noting that you can pull these functions out into named functions to make things cleaner. This example isn't bad, be we will go ahead do that anyway.
function outerMap(x) { return Observable.interval(200).take(10); }
function innerMap(x, y) { return `Inner #${x}: ${y}`; }
const obs = Observable.interval(1000).take(10).mergeMap(outerMap, innerMap, 2);
obs.subscribe(x => console.log(x));
Defining named functions can really make a difference in clarity.
Concat
The last of our combination operators is the concat
family. Let's summarize the last two family's will quick:
switch
will always swap to the new inner observable emittedmerge
will always interleave the new inner observable emittedconcat
will always add the the new inner observable to the end of the sequence of observables to emit from.
The concat
operator is like merge, but it always runs the concat
ted observables in order. That is to say, that it waits for the first inner observable to complete, before it starts running the next.
Let's look at an example.
const firstObs = Observable.of('one', 'one', 'one', 'one');
const secondObs = Observable.of('two', 'two', 'two', 'two');
const combined = firstObs.concat(secondObs);
combined.subscribe(x => console.log(x));
If we run this we will see all the emissions from firstObs
, followed by all the emissions from secondObs
.
If firstObs
never completed, we would never see anything from secondObs
.
Again, there is a static method for concat
as well:
const firstObs = Observable.of('one', 'one', 'one', 'one');
const secondObs = Observable.of('two', 'two', 'two', 'two');
const combined = Observable.concat(firstObs, secondObs);
combined.subscribe(x => console.log(x));
ConcatAll
Once again, concatAll
is like concat
but operates on inner observables. Example time:
const obs = Observable.interval(1000).take(10)
.map(x => Observable.of(`#${x}`, `#${x}`, `#${x}`, `#${x}`))
.concatAll();
obs.subscribe(x => console.log(x));
If we run this, we would see all emissions from the first inner observable generated, followed by the second, followed by the third, etc. all the way until #9
.
You can think of concatAll
like mergeAll
with a max concurrency of one (mergeAll(1)
).
ConcatMap
You probably already know this, but concatMap
is just shorthand for map(...).concatAll()
.
const obs = Observable.interval(1000).take(10)
.concatMap(x => Observable.of(`#${x}`, `#${x}`, `#${x}`, `#${x}`));
obs.subscribe(x => console.log(x));
Like mergeMap
, concatMap
takes in an optional selector function as a second parameter.
const obs = Observable.interval(1000).take(10)
.concatMap(
x => Observable.of(`#${x}`, `#${x}`, `#${x}`, `#${x}`),
(x, y) => `${x}, ${y}`
);
obs.subscribe(x => console.log(x));