Advanced Operators
These operators might get a little confusing if you haven't read the rest of this document as they deal with shared execution and combine subjects and observables. Make sure you have a solid grasp on at the very least what is meant by single vs shared execution and the basic properties of subjects, observables, and how they relate to each other.
Multicast and Connect
Recall from before that a given subcriber to an observable only has one execution and that execution runs separately from other subscriptions to the same observable. Also recall that subjects can share their execution with down stream observers.
const obs = Observable.interval(1000).take(10);
const subject = new Subject();
subject.subscribe(x => console.log(x));
obs.subscribe(subject);
setTimeout(() => {
subject.subscribe(x => console.log(x));
}, 4500);
Rather than, every single time, creating a subject, using it as the observer to the observable we want to share, and then subscribing to that subject, we can use the multicast
and connect
operators.
First let's look at multicast
.
const obs = Observable
.interval(1000)
.take(10)
.multicast(new Subject());
The multicast
operator takes a subject as a parameter to its contructor. The above is (almost) equivalent to this:
const obs = Observable.interval(1000).take(10);
const sub = new Subject();
obs.subscribe(sub);
I say almost, because the subscribe call hasn't actually happened yet. Calling multicast
returns a special type of observable called a ConnectableObservable
. A connectable observable is like an observable, they have all the same methods and operators available, but they are backed by a subject. Execution doesn't actually begin until we call connect
on the connectable observable.
const obs = Observable
.interval(1000)
.take(10)
.multicast(new Subject());
obs.connect();
Now they are equivalent. Calling connect
causes a subcription to be called on the source observable using the multicast
's subject as the observer. Any subscriptions called on our connectable observable are made on the backing subject.
const obs = Observable
.interval(1000)
.take(10)
.multicast(new Subject());
obs.subscribe(x => console.log(x));
obs.connect();
setTimeout(() => {
obs.subscribe(x => console.log(x));
}, 4500);
As soon as connect
is called execution begins, so any emissions between connect and the next subscribe will be missed by observers.
Another thing to note, calling unsubscribe on the subscriptions returned from the subscribe
calls on the observable won't stop execution. This is because the inner subscription between the subject and the source observable still exists. Luckily, connect
returns this subscription, so we can call unsubscribe
on that subscription to stop execution. To start it again, we would need to call connect
again.
const obs = Observable
.interval(1000)
.do(x => console.log('Execution Tick'))
.multicast(new Subject());
const outerSub = obs.subscribe(x => console.log(x));
const innerSub = obs.connect();
setTimeout(() => outerSub.unsubscribe(), 2000); // Doesn't stop execution
setTimeout(() => innerSub.unsubscribe(), 4000); // Does stop execution
The multicast
operator can take any kind of subject: Subject
, BehaviorSubject
, ReplaySubject
, AsyncSubject
.
Publish
So, multicast
is nice, but there is a shorthand for that too, called publish
.
The publish
operator just does a multicast
backed by a vanilla Subject
.
const obs = Observable.interval(1000).publish();
obs.subscribe(x => console.log(x));
obs.connect();
is exactly equivalent to
const obs = Observable
.interval(1000)
.take(10)
.multicast(new Subject());
obs.subscribe(x => console.log(x));
obs.connect();
Since there are multiple flavors of subjects, there are multiple flavors of publish
:
publish
: multicast + SubjectpublishBehavior
: multicast + BehaviorSubjectpublishReplay
: multicast + ReplaySubjectpublishLast
: multicast + AsyncSubject
You can pass any arguments the subject type expects to the relevant publish
operator.
RefCount
Alright, so publish
is nice, but will still have to do this connect nonsense. What if I want execution to start only when something subscribes, and to stop when no subscribers are left?
Well, you are in luck, because the refCount
operator exists.
The refCount
operator counts references. Specifically it counts references of subscriptions. A multicasted, connectable observable, will subscribe the backing subject to the inner observable only when the first subscription is made on the multicasted, connectable observable. It will also automatically unsubscribe the backing subject from the inner observable when the last subscription to the connectable observable is unsubscribed.
Let's look at an example.
const obs = Observable
.interval(1000)
.publish().refCount();
obs.subscribe(x => console.log(x));
We don't need connect
any more, the connectable observable is automatically connected when the first subscription is called on it.
Share
Using publish
+ refCount
is so common, that the share
operator was created to automatically do just that.
const obs = Observable.interval(1000).share();
obs.subscribe(x => console.log(x));
setTimeout(() => obs.subscribe(x => console.log(x)), 3000);
is exactly equivalent to
const obs = Observable
.interval(1000)
.publish().refCount();
obs.subscribe(x => console.log(x));
With share
it is very easy to convert a cold
observable that runs a separate, independent execution for each subscriber, into a hot
observable that shares the same execution between all subscribers.