Subjects
Definition
A subject is both an observable and an observer, meaning a subject can both be subscribed to and subscribe to other things.
Let's define what is meant by observer. Recall that whenever you subscribe to something, you pass one to three functions: a next, an error, and a complete function.
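// RxJS 5-style import, assumed from the API used throughout this section
import { Observable, Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs/Rx';
const obs = Observable.of(1, 2, 3);
obs.subscribe(
  x => console.log(x), // next handler
  err => console.log(err), // error handler
  () => console.log('completed') // complete handler
);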
In fact, when you use the create method, you are calling these functions directly.
const obs = Observable.create(observer => {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch(err) {
    observer.error(err);
  }
});
obs.subscribe(
  x => console.log(x), // next handler
  err => console.log(err), // error handler
  () => console.log('completed') // complete handler
);
An observer is anything that implements these methods. The subscribe method can take either the functions themselves as arguments or an object that defines the methods. In the former case, subscribe builds the observer object for you. So the first example could look like this:
const obs = Observable.of(1, 2, 3);
const observer = {
  next: x => console.log(x), // next handler
  error: err => console.log(err), // error handler
  complete: () => console.log('completed') // complete handler
};
obs.subscribe(observer);
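To make the "functions or object" idea concrete, here is a minimal sketch of how a subscribe-like method could normalize its arguments into an observer. The helper name toObserver is hypothetical and is not part of RxJS; the library's own handling is more involved.

```javascript
// Hypothetical helper (not an RxJS API): turn subscribe-style arguments
// into an object that always has next, error, and complete methods.
function toObserver(nextOrObserver, error, complete) {
  const noop = () => {};
  if (nextOrObserver && typeof nextOrObserver === 'object') {
    // Already an observer-like object: keep its methods, fill in the gaps.
    const source = nextOrObserver;
    return {
      next: typeof source.next === 'function' ? source.next.bind(source) : noop,
      error: typeof source.error === 'function' ? source.error.bind(source) : noop,
      complete: typeof source.complete === 'function' ? source.complete.bind(source) : noop
    };
  }
  // Plain functions: wrap them in a new observer object.
  return {
    next: nextOrObserver || noop,
    error: error || noop,
    complete: complete || noop
  };
}

const seenValues = [];
const fromFunctions = toObserver(x => seenValues.push(x));
fromFunctions.next(1);
fromFunctions.complete(); // safe to call: missing handlers become no-ops

const fromObject = toObserver({ next: x => seenValues.push(x * 10) });
fromObject.next(2);

console.log(seenValues); // [1, 20]
```

Either way, the code that produces values only ever has to deal with one shape: an object with all three methods.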
A subject is both an observable and an observer. Let's see what that looks like.
const obs = Observable.of(1, 2, 3);
const subject = new Subject();
subject.subscribe(x => console.log(x));
obs.subscribe(subject);
So what did we just do?
- We created an observable, exactly as before, that emits just three things: 1, 2, and 3.
- We subscribed to our subject. Since subjects are also observables, we can subscribe to them just like any other observable.
- We subscribed to the observable, just like we have before, but instead of giving it functions, we gave it a subject. This is what it means for a subject to be an observer: it already has its next, error, and complete methods defined.
Basically, our subject acts as an intermediary between our original observable and our subscription.
The source observable emits 1; the subject, which is subscribed to it, receives that 1; we are subscribed to the subject, so we receive the 1 and log it to the console.
So why would anyone do this? Recall that each subscription gets its own independent execution of an observable, meaning the observable emits everything to each observer independently.
Subjects are special. The subject is the only real subscriber to the source observable, and it shares that single execution with all of its own subscribers.
const obs = Observable.interval(1000).take(10);
const subject = new Subject();
subject.subscribe(x => console.log(x));
obs.subscribe(subject);
setTimeout(() => {
  subject.subscribe(x => console.log(x));
}, 4500);
In the example above, we add another subscription to the subject after 4.5 seconds. Instead of starting at 0, like the first subscription did, the new subscription picks up wherever the first one currently is: its first value is 4, the same value the first subscription receives at the five-second mark.
This is because the subject shares its execution with its subscribers (aka observers).
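The sharing behavior is easier to see in a stripped-down model. The following is a minimal sketch, not RxJS's actual implementation: MiniSubject is a hypothetical class that keeps a list of observers and forwards every value to whoever is subscribed at that moment.

```javascript
// Simplified stand-in for a subject (hypothetical; the real RxJS Subject
// also handles unsubscription, error states, and more).
class MiniSubject {
  constructor() {
    this.observers = [];
  }
  subscribe(nextOrObserver) {
    // Accept either a next function or an observer object.
    const observer = typeof nextOrObserver === 'function'
      ? { next: nextOrObserver }
      : nextOrObserver;
    this.observers.push(observer);
  }
  next(value) {
    // One incoming value is fanned out to every current observer.
    this.observers.forEach(o => o.next && o.next(value));
  }
  error(err) {
    this.observers.forEach(o => o.error && o.error(err));
  }
  complete() {
    this.observers.forEach(o => o.complete && o.complete());
  }
}

const firstSeen = [];
const secondSeen = [];
const shared = new MiniSubject();

shared.subscribe(x => firstSeen.push(x));
shared.next(0);
shared.next(1);
shared.subscribe(x => secondSeen.push(x)); // joins late
shared.next(2);

console.log(firstSeen);  // [0, 1, 2]
console.log(secondSeen); // [2]
```

The late subscriber does not restart anything; it simply receives whatever arrives after it joins, which mirrors the second subscription in the interval example.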
Another useful feature of subjects is that, because they are also observers, they have next, error, and complete methods that we can call freely. This lets us programmatically push new values to our downstream subscribers.
const subject = new Subject();
subject.subscribe(x => console.log(x));
subject.next(1);
subject.next(2);
subject.next(3);
This behaves just as before, but we got rid of our observable and are manually emitting values to our downstream subscribers.
HERE BE DRAGONS: This is powerful, but it goes against what RxJS is for. Remember, RxJS is named after reactive programming. As much as possible, we want to react to streams instead of messing around with their internals.
BehaviorSubject
There are different kinds of subjects that are made to suit specific use cases. The first of these is the BehaviorSubject.
A BehaviorSubject is a subject that remembers the last value emitted. Since subjects provide shared execution, later subscribers can miss items that were emitted before they subscribed. Consider this example:
const sub = new Subject();
console.log('First Subscription');
sub.subscribe(x => console.log(x));
sub.next(1);
sub.next(2);
sub.next(3);
setTimeout(() => {
  console.log('Second Subscription');
  sub.subscribe(x => console.log(x));
}, 1000);
The first subscriber will see all the emissions from our subject, but the second will see nothing since it subscribes too late.
A BehaviorSubject always emits its most recent value to new subscribers. This means that it always has a value, which is why it is created with an initial value.
This type of subject is perfect for representing values over time. In fact, the store in the Angular 2 library ngrx is backed by a BehaviorSubject.
const sub = new BehaviorSubject(0);
console.log('First Subscription');
sub.subscribe(x => console.log(x));
sub.next(1);
sub.next(2);
sub.next(3);
setTimeout(() => {
  console.log('Second Subscription');
  sub.subscribe(y => console.log(y));
}, 1000);
Now the second subscription will see 3, as it was the last value emitted.
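The "remembers the last value" behavior can be modeled in a few lines. This is a minimal sketch under simplifying assumptions (no error or complete handling, no unsubscription); MiniBehaviorSubject is a hypothetical name, not the RxJS class.

```javascript
// Simplified model of a behavior subject (hypothetical, not RxJS's code).
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.current = initialValue; // there is always a current value
    this.listeners = [];
  }
  subscribe(next) {
    this.listeners.push(next);
    next(this.current); // new subscribers immediately get the latest value
  }
  next(value) {
    this.current = value; // remember it for future subscribers
    this.listeners.forEach(fn => fn(value));
  }
}

const earlyValues = [];
const lateValues = [];
const counter = new MiniBehaviorSubject(0);

counter.subscribe(x => earlyValues.push(x));
counter.next(1);
counter.next(2);
counter.next(3);
counter.subscribe(x => lateValues.push(x)); // subscribes after all emissions

console.log(earlyValues); // [0, 1, 2, 3]
console.log(lateValues);  // [3]
```

Note that the first subscriber also receives the initial value 0, because at the moment it subscribes that is the most recent value.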
ReplaySubject
A ReplaySubject is similar to a BehaviorSubject in that it remembers and replays past values, but it differs in some key ways.
It does not take an initial value as a parameter and therefore does not always have a value. Instead, it can be given up to two parameters that control what is replayed:
- A buffer size
- A window timeframe
The buffer size determines how many past values the subject will remember. For example, a buffer size of 3 will always replay the last 3 values emitted to new subscribers. In the example below, the second subscription receives 3, 4, and 5.
const obs = Observable.of(1, 2, 3, 4, 5);
const sub = new ReplaySubject(3);
console.log('First Subscription');
sub.subscribe(x => console.log(x));
obs.subscribe(sub);
setTimeout(() => {
  console.log('Second Subscription');
  sub.subscribe(x => console.log(x));
}, 1000);
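The window is a span of time in milliseconds. Any emission older than this window is dropped from memory. In the example below, by the time the second subscription arrives at 400 ms, the value 1 (emitted at 100 ms) is more than 250 ms old and has been dropped, so only 2 and 3 are replayed.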
const sub = new ReplaySubject(3, 250);
console.log('First Subscription');
sub.subscribe(x => console.log(x));
setTimeout(() => sub.next(1), 100);
setTimeout(() => sub.next(2), 200);
setTimeout(() => sub.next(3), 300);
setTimeout(() => {
  console.log('Second Subscription');
  sub.subscribe(x => console.log(x));
}, 400);
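To show how the buffer size and the window interact, here is a minimal sketch of a replay buffer. MiniReplaySubject is a hypothetical class, and the injectable now function is an assumption added purely so the example can run with a fake clock instead of real timers; RxJS's own implementation differs.

```javascript
// Simplified model of a replay subject (hypothetical, not RxJS's code).
class MiniReplaySubject {
  constructor(bufferSize = Infinity, windowTime = Infinity, now = () => Date.now()) {
    this.bufferSize = bufferSize;
    this.windowTime = windowTime;
    this.now = now;       // injectable clock, for deterministic examples
    this.buffer = [];     // entries of the form { value, time }
    this.listeners = [];
  }
  trim() {
    // Drop entries older than the window, then keep only the newest bufferSize.
    const cutoff = this.now() - this.windowTime;
    this.buffer = this.buffer.filter(entry => entry.time > cutoff);
    if (this.buffer.length > this.bufferSize) {
      this.buffer = this.buffer.slice(this.buffer.length - this.bufferSize);
    }
  }
  subscribe(next) {
    this.trim();
    this.buffer.forEach(entry => next(entry.value)); // replay first
    this.listeners.push(next);
  }
  next(value) {
    this.buffer.push({ value, time: this.now() });
    this.trim();
    this.listeners.forEach(fn => fn(value));
  }
}

// Buffer size only: the last 3 of 5 values are replayed.
const bySize = new MiniReplaySubject(3);
[1, 2, 3, 4, 5].forEach(v => bySize.next(v));
const replayedBySize = [];
bySize.subscribe(x => replayedBySize.push(x));
console.log(replayedBySize); // [3, 4, 5]

// Buffer size plus a 250 ms window, driven by a fake clock.
let fakeTime = 0;
const byWindow = new MiniReplaySubject(3, 250, () => fakeTime);
fakeTime = 100; byWindow.next(1);
fakeTime = 200; byWindow.next(2);
fakeTime = 300; byWindow.next(3);
fakeTime = 400;
const replayedByWindow = [];
byWindow.subscribe(x => replayedByWindow.push(x));
console.log(replayedByWindow); // [2, 3]
```

The two limits apply together: a value is replayed only if it is both recent enough and among the newest bufferSize entries.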
Another difference is that a replay subject will always replay emissions even after the subject has completed. This is not true with a behavior subject.
AsyncSubject
The final type of subject is one that isn't often used, but ... hey, it exists. It's called an AsyncSubject.
This type of subject only ever emits the last value emitted, and only after the subject has completed.
const sub = new AsyncSubject();
console.log('First Subscription');
sub.subscribe(x => console.log(x));
sub.next(1);
sub.next(2);
sub.next(3);
console.log('Second Subscription');
sub.subscribe(x => console.log(x));
setTimeout(() => sub.complete(), 1000);
Both subscriptions only get 3, and only after we complete the subject.
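The "hold the last value until completion" rule can be sketched as follows. MiniAsyncSubject is a hypothetical, simplified class (no error handling or unsubscription), not the RxJS implementation.

```javascript
// Simplified model of an async subject (hypothetical, not RxJS's code).
class MiniAsyncSubject {
  constructor() {
    this.hasValue = false;
    this.last = undefined;
    this.completed = false;
    this.listeners = [];
  }
  subscribe(next) {
    if (this.completed) {
      // Already done: hand over the final value right away, if there is one.
      if (this.hasValue) next(this.last);
      return;
    }
    this.listeners.push(next);
  }
  next(value) {
    if (this.completed) return;
    // Nothing is emitted yet; the value is only remembered.
    this.last = value;
    this.hasValue = true;
  }
  complete() {
    if (this.completed) return;
    this.completed = true;
    if (this.hasValue) this.listeners.forEach(fn => fn(this.last));
    this.listeners = [];
  }
}

const firstResult = [];
const secondResult = [];
const pending = new MiniAsyncSubject();

pending.subscribe(x => firstResult.push(x));
pending.next(1);
pending.next(2);
pending.next(3);
pending.subscribe(x => secondResult.push(x));

const beforeComplete = firstResult.length + secondResult.length; // still 0
pending.complete();

console.log(beforeComplete); // 0
console.log(firstResult);    // [3]
console.log(secondResult);   // [3]
```

Neither subscriber sees 1 or 2, and neither sees anything at all until complete is called.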