File: /var/www/tana/frontend/node_modules/concurrently/node_modules/rx/src/core/subjects/replaysubject.js
/**
* Represents an object that is both an observable sequence as well as an observer.
* Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
*/
var ReplaySubject = Rx.ReplaySubject = (function (__super__) {
function createRemovableDisposable(subject, observer) {
return disposableCreate(function () {
observer.dispose();
!subject.isDisposed && subject.observers.splice(subject.observers.indexOf(observer), 1);
});
}
function subscribe(observer) {
var so = new ScheduledObserver(this.scheduler, observer),
subscription = createRemovableDisposable(this, so);
checkDisposed.call(this);
this._trim(this.scheduler.now());
this.observers.push(so);
for (var i = 0, len = this.q.length; i < len; i++) {
so.onNext(this.q[i].value);
}
if (this.hasError) {
so.onError(this.error);
} else if (this.isStopped) {
so.onCompleted();
}
so.ensureActive();
return subscription;
}
inherits(ReplaySubject, __super__);
/**
* Initializes a new instance of the ReplaySubject class with the specified buffer size, window size and scheduler.
* @param {Number} [bufferSize] Maximum element count of the replay buffer.
* @param {Number} [windowSize] Maximum time length of the replay buffer.
* @param {Scheduler} [scheduler] Scheduler the observers are invoked on.
*/
function ReplaySubject(bufferSize, windowSize, scheduler) {
this.bufferSize = bufferSize == null ? Number.MAX_VALUE : bufferSize;
this.windowSize = windowSize == null ? Number.MAX_VALUE : windowSize;
this.scheduler = scheduler || currentThreadScheduler;
this.q = [];
this.observers = [];
this.isStopped = false;
this.isDisposed = false;
this.hasError = false;
this.error = null;
__super__.call(this, subscribe);
}
addProperties(ReplaySubject.prototype, Observer.prototype, {
/**
* Indicates whether the subject has observers subscribed to it.
* @returns {Boolean} Indicates whether the subject has observers subscribed to it.
*/
hasObservers: function () {
return this.observers.length > 0;
},
_trim: function (now) {
while (this.q.length > this.bufferSize) {
this.q.shift();
}
while (this.q.length > 0 && (now - this.q[0].interval) > this.windowSize) {
this.q.shift();
}
},
/**
* Notifies all subscribed observers about the arrival of the specified element in the sequence.
* @param {Mixed} value The value to send to all observers.
*/
onNext: function (value) {
checkDisposed.call(this);
if (this.isStopped) { return; }
var now = this.scheduler.now();
this.q.push({ interval: now, value: value });
this._trim(now);
var o = this.observers.slice(0);
for (var i = 0, len = o.length; i < len; i++) {
var observer = o[i];
observer.onNext(value);
observer.ensureActive();
}
},
/**
* Notifies all subscribed observers about the exception.
* @param {Mixed} error The exception to send to all observers.
*/
onError: function (error) {
checkDisposed.call(this);
if (this.isStopped) { return; }
this.isStopped = true;
this.error = error;
this.hasError = true;
var now = this.scheduler.now();
this._trim(now);
var o = this.observers.slice(0);
for (var i = 0, len = o.length; i < len; i++) {
var observer = o[i];
observer.onError(error);
observer.ensureActive();
}
this.observers = [];
},
/**
* Notifies all subscribed observers about the end of the sequence.
*/
onCompleted: function () {
checkDisposed.call(this);
if (this.isStopped) { return; }
this.isStopped = true;
var now = this.scheduler.now();
this._trim(now);
var o = this.observers.slice(0);
for (var i = 0, len = o.length; i < len; i++) {
var observer = o[i];
observer.onCompleted();
observer.ensureActive();
}
this.observers = [];
},
/**
* Unsubscribe all observers and release resources.
*/
dispose: function () {
this.isDisposed = true;
this.observers = null;
}
});
return ReplaySubject;
}(Observable));