Advanced Operators

These operators might be a little confusing if you haven't read the rest of this document, as they deal with shared execution and combine subjects and observables. Make sure you have a solid grasp of, at the very least, single vs shared execution, the basic properties of subjects and observables, and how the two relate to each other.

Multicast and Connect

Recall from before that each subscriber to an observable gets its own execution, and that execution runs separately from other subscriptions to the same observable. Also recall that subjects can share their execution with downstream observers.

const obs = Observable.interval(1000).take(10);
const subject = new Subject();

subject.subscribe(x => console.log(x));
obs.subscribe(subject);

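// A late subscriber joins the execution already in progress,
// so it only sees the values emitted after 4.5 seconds.
setTimeout(() => {
    subject.subscribe(x => console.log(x));
}, 4500);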
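
To make the sharing concrete without any timers, here is a minimal sketch of what a subject does with its observers. MiniSubject is a hypothetical, hand-rolled stand-in I made up for illustration. It is not the RxJS Subject class and leaves out error and completion handling.

```javascript
// Hypothetical, hand-rolled stand-in for a subject; not the RxJS class.
class MiniSubject {
    constructor() {
        this.observers = [];
    }
    // Register an observer and return a function that removes it again.
    subscribe(next) {
        this.observers.push(next);
        return () => {
            this.observers = this.observers.filter(o => o !== next);
        };
    }
    // Forward a value to every observer currently registered.
    next(value) {
        this.observers.forEach(o => o(value));
    }
}

const subject = new MiniSubject();
const early = [];
const late = [];

subject.subscribe(x => early.push(x));
subject.next(0);
subject.next(1);

// The late observer joins the same execution and never sees 0 or 1.
subject.subscribe(x => late.push(x));
subject.next(2);

console.log(early); // [ 0, 1, 2 ]
console.log(late);  // [ 2 ]
```

Both observers are fed by the same next calls, which is all that "shared execution" means here.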

Rather than creating a subject every single time, using it as the observer of the observable we want to share, and then subscribing to that subject, we can use the multicast and connect operators.

First let's look at multicast.

const obs = Observable
    .interval(1000)
    .take(10)
    .multicast(new Subject());

The multicast operator takes a subject as its argument. The above is (almost) equivalent to this:

const obs = Observable.interval(1000).take(10);
const sub = new Subject();

obs.subscribe(sub);

I say almost because the subscribe call hasn't actually happened yet. Calling multicast returns a special type of observable called a ConnectableObservable. A connectable observable is like a regular observable, with all the same methods and operators available, but it is backed by a subject. Execution doesn't begin until we call connect on the connectable observable.

const obs = Observable
    .interval(1000)
    .take(10)
    .multicast(new Subject());

obs.connect();

Now they are equivalent. Calling connect subscribes to the source observable, using multicast's subject as the observer. Any subscriptions made on our connectable observable are made on the backing subject.

const obs = Observable
    .interval(1000)
    .take(10)
    .multicast(new Subject());

obs.subscribe(x => console.log(x));
obs.connect();

setTimeout(() => {
    obs.subscribe(x => console.log(x));
}, 4500);

Execution begins as soon as connect is called, so any values emitted between connect and a later subscribe are missed by that later observer.
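
The split between subscribe and connect can be sketched by hand. Everything below is an assumption made for illustration: createSubject and miniMulticast are hypothetical helpers, and the source is modelled as a plain function that takes an observer callback. This is not how RxJS implements ConnectableObservable, only the same idea in miniature.

```javascript
// Hypothetical stand-in for a subject; not the RxJS class.
function createSubject() {
    const observers = [];
    return {
        subscribe: next => { observers.push(next); },
        next: value => observers.forEach(o => o(value)),
    };
}

// Hypothetical helper: subscribe goes to the subject, connect goes to the source.
function miniMulticast(subscribeToSource, subject) {
    return {
        subscribe: next => subject.subscribe(next),
        connect: () => subscribeToSource(value => subject.next(value)),
    };
}

// A hand-driven source: emit() stands in for the interval timer firing.
let emit = () => {};
let executions = 0;
const source = observer => {
    executions += 1;
    emit = observer;
};

const obs = miniMulticast(source, createSubject());
const first = [];
const second = [];

obs.subscribe(x => first.push(x));
const executionsBeforeConnect = executions; // still 0, nothing has started

obs.connect();
emit(0);
emit(1);

obs.subscribe(x => second.push(x)); // joins after 0 and 1 were emitted
emit(2);

console.log(executionsBeforeConnect, executions); // 0 1
console.log(first);  // [ 0, 1, 2 ]
console.log(second); // [ 2 ]
```

Subscribing alone never touches the source. Only connect starts the single execution, and the second observer picks up from wherever that execution happens to be.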

Also note that calling unsubscribe on the subscriptions returned from the subscribe calls on the connectable observable won't stop execution. This is because the inner subscription between the subject and the source observable still exists. Luckily, connect returns this subscription, so we can call unsubscribe on it to stop execution. To start execution again, we would need to call connect again.

const obs = Observable
    .interval(1000)
    .do(x => console.log('Execution Tick'))
    .multicast(new Subject());

const outerSub = obs.subscribe(x => console.log(x));
const innerSub = obs.connect();

setTimeout(() => outerSub.unsubscribe(), 2000); // Doesn't stop execution
setTimeout(() => innerSub.unsubscribe(), 4000); // Does stop execution
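
The same two subscriptions can be traced step by step in a timer-free sketch. The helpers here (createSubject, source, tick) are hypothetical names of my own, and the line that creates innerSub only imitates what connect does. It is not the RxJS implementation.

```javascript
// Hypothetical stand-in for a subject; not the RxJS class.
function createSubject() {
    let observers = [];
    return {
        subscribe(next) {
            observers.push(next);
            return {
                unsubscribe: () => {
                    observers = observers.filter(o => o !== next);
                },
            };
        },
        next(value) {
            observers.forEach(o => o(value));
        },
    };
}

// A hand-driven source: tick() stands in for the interval timer firing.
let active = null;
let produced = 0;
const source = observer => {
    active = observer;
    return { unsubscribe: () => { active = null; } };
};
const tick = () => {
    if (active) {
        active(produced);
        produced += 1;
    }
};

const subject = createSubject();
const received = [];

const outerSub = subject.subscribe(x => received.push(x));
const innerSub = source(x => subject.next(x)); // what connect does in this sketch

tick();                 // value 0 reaches the observer
outerSub.unsubscribe(); // the observer leaves...
tick();                 // ...but the source still produces value 1
innerSub.unsubscribe(); // tearing down the connection stops the source
tick();                 // nothing is produced any more

console.log(received); // [ 0 ]
console.log(produced); // 2
```

The observer stops receiving values after the first unsubscribe, but the source only stops producing them after the second.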

The multicast operator can take any kind of subject: Subject, BehaviorSubject, ReplaySubject, AsyncSubject.

Publish

So, multicast is nice, but there is a shorthand for that too, called publish.

The publish operator just does a multicast backed by a vanilla Subject.

const obs = Observable.interval(1000).publish();

obs.subscribe(x => console.log(x));
obs.connect();

is exactly equivalent to

const obs = Observable
    .interval(1000)
    .multicast(new Subject());

obs.subscribe(x => console.log(x));
obs.connect();

Since there are multiple flavors of subjects, there are multiple flavors of publish:

  • publish: multicast + Subject
  • publishBehavior: multicast + BehaviorSubject
  • publishReplay: multicast + ReplaySubject
  • publishLast: multicast + AsyncSubject

You can pass any arguments the subject type expects to the relevant publish operator.
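
The choice of backing subject mostly decides what a late subscriber gets to see. As a rough illustration, the sketch below compares a plain subject with a behavior-style subject that remembers its latest value. Both factories are hypothetical, hand-rolled stand-ins, not the RxJS Subject and BehaviorSubject classes, and the initial value 0 is an arbitrary choice.

```javascript
// Hypothetical stand-ins for Subject and BehaviorSubject; not the RxJS classes.
function createSubject() {
    const observers = [];
    return {
        subscribe: next => { observers.push(next); },
        next: value => observers.forEach(o => o(value)),
    };
}

function createBehaviorSubject(initial) {
    const inner = createSubject();
    let current = initial;
    return {
        // A new observer immediately receives the most recent value.
        subscribe: next => {
            next(current);
            inner.subscribe(next);
        },
        next: value => {
            current = value;
            inner.next(value);
        },
    };
}

// Feed the same values through a subject and add one late observer.
function run(subject) {
    const late = [];
    subject.next(1);
    subject.next(2);
    subject.subscribe(x => late.push(x));
    subject.next(3);
    return late;
}

const plain = run(createSubject());
const behavior = run(createBehaviorSubject(0));

console.log(plain);    // [ 3 ]
console.log(behavior); // [ 2, 3 ]
```

In this sketch the source side is identical in both runs. Only the subject differs, which is the same division of labour as between publish and publishBehavior.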

RefCount

Alright, so publish is nice, but we still have to do this connect nonsense. What if I want execution to start only when something subscribes, and to stop when no subscribers are left?

Well, you are in luck, because the refCount operator exists.

The refCount operator counts references. Specifically, it counts subscriptions. With refCount applied, a multicasted, connectable observable subscribes the backing subject to the source observable only when the first subscription is made on it. It also automatically unsubscribes the backing subject from the source observable when the last subscription is unsubscribed.

Let's look at an example.

const obs = Observable
    .interval(1000)
    .publish().refCount();

obs.subscribe(x => console.log(x));

We don't need connect any more. The connectable observable is automatically connected when the first subscription is made on it.
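
The counting itself is simple enough to sketch by hand. In the code below, createSubject and miniRefCount are hypothetical helpers, and the source is assumed to be a function that takes an observer callback and returns an object with an unsubscribe method. It shows the idea of reference counting only and is not the RxJS implementation.

```javascript
// Hypothetical stand-in for a subject; not the RxJS class.
function createSubject() {
    let observers = [];
    return {
        subscribe(next) {
            observers.push(next);
            return {
                unsubscribe: () => {
                    observers = observers.filter(o => o !== next);
                },
            };
        },
        next(value) {
            observers.forEach(o => o(value));
        },
    };
}

// Hypothetical helper sketching the reference-counting idea.
function miniRefCount(subscribeToSource, subject) {
    let count = 0;
    let connection = null;
    return {
        subscribe(next) {
            const sub = subject.subscribe(next);
            count += 1;
            if (count === 1) {
                // First subscriber: connect the subject to the source.
                connection = subscribeToSource(value => subject.next(value));
            }
            let closed = false;
            return {
                unsubscribe() {
                    if (closed) return; // ignore repeated unsubscribe calls
                    closed = true;
                    sub.unsubscribe();
                    count -= 1;
                    if (count === 0) {
                        // Last subscriber left: disconnect from the source.
                        connection.unsubscribe();
                        connection = null;
                    }
                },
            };
        },
    };
}

// A source that only records when it is started and stopped.
const log = [];
const source = observer => {
    log.push('start');
    return { unsubscribe: () => log.push('stop') };
};

const obs = miniRefCount(source, createSubject());
const a = obs.subscribe(() => {});
const b = obs.subscribe(() => {}); // second subscriber does not start a new execution
a.unsubscribe();                   // one subscriber left, execution keeps running
b.unsubscribe();                   // nobody left, execution stops

console.log(log); // [ 'start', 'stop' ]
```

Two subscribers come and go, yet the source is started once and stopped once.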

Share

Using publish + refCount is so common that the share operator was created to do just that automatically.

const obs = Observable.interval(1000).share();
obs.subscribe(x => console.log(x));

setTimeout(() => obs.subscribe(x => console.log(x)), 3000);

is exactly equivalent to

const obs = Observable
    .interval(1000)
    .publish().refCount();

obs.subscribe(x => console.log(x));

setTimeout(() => obs.subscribe(x => console.log(x)), 3000);

With share it is very easy to convert a cold observable, which runs a separate, independent execution for each subscriber, into a hot observable that shares a single execution between all subscribers.
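
The difference between the two can be counted. The sketch below uses a hypothetical createSource factory to model a cold source, and a hypothetical miniShare helper that subscribes to it once and fans values out. For brevity miniShare leaves out the unsubscribe half of refCount, so treat it as an illustration of the sharing only, not as what share actually does internally.

```javascript
// Hypothetical cold source: each subscribe call starts its own execution.
function createSource() {
    const running = [];
    return {
        subscribe: observer => { running.push(observer); },
        // Helper for this sketch: push a value through every running execution.
        emit: value => running.forEach(o => o(value)),
        executions: () => running.length,
    };
}

// Hypothetical helper: subscribe to the source once and fan values out.
function miniShare(subscribeToSource) {
    const observers = [];
    let connected = false;
    return next => {
        observers.push(next);
        if (!connected) {
            connected = true;
            subscribeToSource(value => observers.forEach(o => o(value)));
        }
    };
}

// Cold: two subscribers mean two executions.
const coldSource = createSource();
const a = [];
const b = [];
coldSource.subscribe(x => a.push(x));
coldSource.subscribe(x => b.push(x));
coldSource.emit('tick');
console.log(coldSource.executions()); // 2

// Shared: two subscribers, one execution.
const sharedSource = createSource();
const sharedSubscribe = miniShare(sharedSource.subscribe);
const c = [];
const d = [];
sharedSubscribe(x => c.push(x));
sharedSubscribe(x => d.push(x));
sharedSource.emit('tick');
console.log(sharedSource.executions()); // 1
console.log(c, d); // [ 'tick' ] [ 'tick' ]
```

Every observer still receives the value. What changes is how many times the source had to run to deliver it.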
