RxJS Subject vs BehaviorSubject vs ReplaySubject vs AsyncSubject

Subject

  • Sends only upcoming values: an observer receives nothing that was emitted before it subscribed;
  • A Subject doesn't hold a value.

An RxJS Subject is a special kind of Observable that allows values to be multicast to many Observers.

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

/*
Console output:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
*/
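
The "upcoming values only" rule follows from how multicasting works: a Subject keeps a list of observers and forwards each value to whoever is in that list when next is called. The sketch below uses a hypothetical MiniSubject written in plain JavaScript. It is a simplified illustration, not the RxJS implementation, and it only shows why a late subscriber misses earlier values.

```javascript
// Hypothetical MiniSubject: a simplified illustration, not the RxJS source.
class MiniSubject {
  constructor() {
    this.observers = [];
  }

  // Register an observer; nothing is replayed to it.
  subscribe(observer) {
    this.observers.push(observer);
  }

  // Forward the value to every observer registered right now.
  next(value) {
    this.observers.forEach((observer) => observer.next(value));
  }
}

const log = [];
const subject = new MiniSubject();

subject.subscribe({ next: (v) => log.push('observerA: ' + v) });
subject.next(1);

// observerB subscribes late, so it never sees the value 1.
subject.subscribe({ next: (v) => log.push('observerB: ' + v) });
subject.next(2);

console.log(log.join('\n'));
```

Here observerA is notified of both values, while observerB is only notified of 2.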

Every Subject is also an Observer, so you can pass a Subject as the argument to the subscribe method of any Observable, as the example below shows:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // You can subscribe providing a Subject

/*
Console output:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
*/
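
Why this works can be sketched without RxJS: a source only needs to call next on whatever it is handed, and a Subject has a next method of its own. In the sketch below, MiniSubject and fromArray are hypothetical stand-ins for Rx.Subject and Rx.Observable.from, reduced to the bare minimum (no error, complete, or unsubscribe handling).

```javascript
// Hypothetical stand-in for a Subject: a simplified illustration only.
class MiniSubject {
  constructor() {
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  // Having a next method is what lets a subject act as an observer.
  next(value) {
    this.observers.forEach((observer) => observer.next(value));
  }
}

// Hypothetical stand-in for an Observable built from an array:
// it pushes every value to the observer it is given.
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach((value) => observer.next(value));
    },
  };
}

const log = [];
const subject = new MiniSubject();

subject.subscribe({ next: (v) => log.push('observerA: ' + v) });
subject.subscribe({ next: (v) => log.push('observerB: ' + v) });

// The subject is passed where an observer is expected.
fromArray([1, 2, 3]).subscribe(subject);

console.log(log.join('\n'));
```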

BehaviorSubject

  • Sends the most recent value to each new subscriber, followed by upcoming values;
  • A BehaviorSubject holds one value. When an observer subscribes, it receives that value immediately;
  • A BehaviorSubject is created with an initial value: new Rx.BehaviorSubject(1);
  • You can read the current value synchronously via subject.value;
  • BehaviorSubject is usually the best fit when you need to store a current value, compared with the other Subject types.

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);
console.log('Value sync:', subject.value); // Access subject value synchronously

/*
Console output:
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
Value sync: 3
*/
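
The behavior above can be sketched as a subject that stores the latest value and hands it to each observer at subscription time. MiniBehaviorSubject is a hypothetical name used only for this illustration. It is not the RxJS implementation and it leaves out error handling, completion, and unsubscription.

```javascript
// Hypothetical MiniBehaviorSubject: a simplified illustration, not the RxJS source.
class MiniBehaviorSubject {
  constructor(initialValue) {
    this.current = initialValue;
    this.observers = [];
  }

  // Synchronous access to the stored value.
  get value() {
    return this.current;
  }

  // A new observer immediately receives the stored value.
  subscribe(observer) {
    this.observers.push(observer);
    observer.next(this.current);
  }

  // Store the value first, then notify current observers.
  next(value) {
    this.current = value;
    this.observers.forEach((observer) => observer.next(value));
  }
}

const log = [];
const subject = new MiniBehaviorSubject(0);

subject.subscribe({ next: (v) => log.push('observerA: ' + v) });
subject.next(1);

// observerB receives the stored value 1 as soon as it subscribes.
subject.subscribe({ next: (v) => log.push('observerB: ' + v) });
subject.next(2);

console.log(log.join('\n'));
console.log('Current value:', subject.value);
```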

ReplaySubject

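  • Replays buffered previous values to each new subscriber (all of them by default, or only the last N when a buffer size is given), followed by upcoming values.

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers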

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

/*
Console output:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
*/
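
The buffering can be sketched as a bounded array that is replayed to each new observer before it joins the live list. MiniReplaySubject is a hypothetical name for this illustration. It is not the RxJS implementation, and it ignores the time window that the real ReplaySubject also supports.

```javascript
// Hypothetical MiniReplaySubject: a simplified illustration, not the RxJS source.
class MiniReplaySubject {
  constructor(bufferSize = Infinity) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }

  // Replay the buffered values first, then register for live values.
  subscribe(observer) {
    this.buffer.forEach((value) => observer.next(value));
    this.observers.push(observer);
  }

  // Keep at most bufferSize values, dropping the oldest one.
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    this.observers.forEach((observer) => observer.next(value));
  }
}

const log = [];
const subject = new MiniReplaySubject(2); // keep the last 2 values

subject.next(1);
subject.next(2);
subject.next(3);

// The late subscriber receives the buffered 2 and 3, but not 1.
subject.subscribe({ next: (v) => log.push('observerB: ' + v) });
subject.next(4);

console.log(log.join('\n'));
```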

AsyncSubject

  • Emits only the last value, and only when the stream completes.

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

/*
Console output:
observerA: 5
observerB: 5
*/
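
One way to picture this: next only records the value, and nothing is delivered until complete is called. MiniAsyncSubject below is a hypothetical, simplified illustration rather than the RxJS implementation. It omits error handling and unsubscription.

```javascript
// Hypothetical MiniAsyncSubject: a simplified illustration, not the RxJS source.
class MiniAsyncSubject {
  constructor() {
    this.hasValue = false;
    this.last = undefined;
    this.completed = false;
    this.observers = [];
  }

  // Deliver the last value (if any) and then signal completion.
  deliver(observer) {
    if (this.hasValue) {
      observer.next(this.last);
    }
    if (observer.complete) {
      observer.complete();
    }
  }

  subscribe(observer) {
    if (this.completed) {
      // Subscribers that arrive after completion still get the last value.
      this.deliver(observer);
      return;
    }
    this.observers.push(observer);
  }

  // Only remember the value; do not notify anyone yet.
  next(value) {
    if (this.completed) return;
    this.last = value;
    this.hasValue = true;
  }

  complete() {
    if (this.completed) return;
    this.completed = true;
    this.observers.forEach((observer) => this.deliver(observer));
  }
}

const log = [];
const subject = new MiniAsyncSubject();

subject.subscribe({ next: (v) => log.push('observerA: ' + v) });
subject.next(1);
subject.next(2);

const beforeComplete = log.length; // still 0: nothing has been delivered yet
subject.complete();

// A subscriber that arrives after completion also receives 2.
subject.subscribe({ next: (v) => log.push('observerC: ' + v) });

console.log(log.join('\n'));
```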
