Subjects

Definition

A subject is both an observable and an observer, meaning a subject can both be subscribed to and subscribe to other things.

Let's define what is meant by observer. Recall that whenever you subscribe to something you are passing one to three functions: a next, error, and complete function.

const obs = Observable.of(1, 2, 3);
obs.subscribe(
    x => console.log(x), // next handler
    err => console.log(err), // error handler
    () => console.log('completed') // complete handler
);

In fact, when you use the create method, you are calling these functions directly.

const obs = Observable.create(observer => {
    try {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        observer.complete();
    } catch(err) {
        observer.error(err);
    }
});

obs.subscribe(
    x => console.log(x), // next handler
    err => console.log(err), // error handler
    () => console.log('completed') // complete handler
);

An observer is anything that implements these methods. The subscribe method can either take functions themselves as arguments, or an object that defines the methods. In the former case, it creates the object itself. So the first example could look like this:

const obs = Observable.of(1, 2, 3);
const observer = {
    next: x => console.log(x), // next handler
    error: err => console.log(err), // error handler
    complete: () => console.log('completed') // complete handler
};

obs.subscribe(observer);

Try Me

A subject is both an observable and an observer. Lets see what that looks like.

const obs = Observable.of(1, 2, 3);
const subject = new Subject();

subject.subscribe(x => console.log(x));
obs.subscribe(subject);

Try Me

So what did we just do. We created an observable, exactly like we have before, that emits just three things: 1, 2, and 3.

We subscribed to our subject. Since subjects are also observables, we can subscribe to them just like any other observable.

We subscribed to the observable, just like have before, but instead of giving it functions, we gave it a subject.

This is what is meant by a subject is an observer, it has its next, error, and complete methods already defined.

Basically our subject is acting as an intermediary between our original observable and our subscription.

Our source observable emits 1, the subject subscribes to it and receives that 1, we subscribe to the subject, so we receive the 1, and then we console log out the 1.

So why would anyone do this, what is the point? Well, recall that each subscription gets their own independent execution of an observable. Meaning each observer makes the observable emit everything to it independently.

Subjects are special. They are the real subscriber to the source observable, and they can share their independent execution with all of their subscribers.

const obs = Observable.interval(1000).take(10);
const subject = new Subject();

subject.subscribe(x => console.log(x));
obs.subscribe(subject);

setTimeout(() => {
    subject.subscribe(x => console.log(x));
}, 4500);

Try Me

In the case above, after 4.5 seconds, we are adding another subscription to the subject. You will see that instead of starting at 0, like the first subscription, it will start at the same value as the first subscription.

This is because the subject shares its execution with its subscribers (aka observers).

Another useful feature of subjects is, because they are also observers, they have their next, error, and complete methods defined and we can use them freely. This allows us to programmatically inject new values to our downstream subscribers.

const subject = new Subject();
subject.subscribe(x => console.log(x));

subject.next(1);
subject.next(2);
subject.next(3);

Try Me

This behaves just as before, but we got rid of our observable and are manually emitting values to our downstream subscribers.

HERE BE DRAGONS This is powerful, but it goes against what RxJS is for. Remember, RxJS is named after Reactive Programming. We want to as much as possible react to streams instead of messing around with their internals.

BehaviorSubject

There are different kinds of subjects that are made to suit specific use cases. The first of these is the BehaviorSubject.

A BehaviorSubject is a subject that remembers the last value emitted. Since subjects provide shared execution, it's possible that later subscribers will miss the items emitted. Consider this example:

const sub = new Subject();

console.log('First Subscription');
sub.subscribe(x => console.log(x));

sub.next(1);
sub.next(2);
sub.next(3);

setTimeout(() => {
    console.log('Second Subscription');
    sub.subscribe(x => console.log(x));
}, 1000);

Try Me

The first subscriber will see all the emissions from our subject, but the second will see nothing since it subscribes too late.

A BehaviorSubject will always emit the last value emitted to new subscribers. This means that it always has a value, and is created with an initial value.

This type of subject is perfect for representing values over time. In fact, the store in the Angular 2 library ngrx is backed by a BehaviorSubject.

const sub = new BehaviorSubject(0);

console.log('First Subscription');
sub.subscribe(x => console.log(x));

sub.next(1);
sub.next(2);
sub.next(3);

setTimeout(() => {
    console.log('Second Subscription');
    sub.subscribe(y => console.log(y));
}, 1000);

Try Me

Now, the second subscription will see 3, as it was the last value emitted.

ReplaySubject

A Replay Subject is similar to a behavior subject in that it remembers and replays past values, but it differs in some key ways.

It does not take an initial value as a parameter and therefore does not always have value. It does take one and optionally two parameters:

  1. A buffer size
  2. A window timeframe

The buffer size determines how many past values the subject will remember. For example a buffer size of 3 will always replay the last 3 values emitted to new subscribers.

const obs = Observable.of(1, 2, 3, 4, 5);
const sub = new ReplaySubject(3);

console.log('First Subscription');
sub.subscribe(x => console.log(x));
obs.subscribe(sub);

setTimeout(() => {
    console.log('Second Subscription');
    sub.subscribe(x => console.log(x));
}, 1000);

Try Me

The window is some window of time in milliseconds. Any emmision older than this window is dropped from memory.

const sub = new ReplaySubject(3, 250);

console.log('First Subscription');
sub.subscribe(x => console.log(x));

setTimeout(() => sub.next(1), 100);
setTimeout(() => sub.next(2), 200);
setTimeout(() => sub.next(3), 300);

setTimeout(() => {
    console.log('Second Subscription');
    sub.subscribe(x => console.log(x));
}, 400);

Try Me

Another difference is that a replay subject will always replay emissions even after the subject has completed. This is not true with a behavior subject.

AsyncSubject

The final type of subject is one that isn't often used, but ... hey it exists. It's called an AsyncSubject.

This type of subject only ever emits the last value emitted, and only after the subject has completed.

const sub = new AsyncSubject();

console.log('First Subscription');
sub.subscribe(x => console.log(x));

sub.next(1);
sub.next(2);
sub.next(3);

console.log('Second Subscription');
sub.subscribe(x => console.log(x));

setTimeout(() => sub.complete(), 1000);

Try Me

Both subscriptions only get 3, and only after we completed the subject.

results matching ""

    No results matching ""