File: /var/www/tana/frontend/node_modules/concurrently/node_modules/rx/src/core/linq/observable/expand.js
/**
* Expands an observable sequence by recursively invoking selector.
*
* @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again.
* @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler.
* @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
*/
observableProto.expand = function (selector, scheduler) {
isScheduler(scheduler) || (scheduler = immediateScheduler);
var source = this;
return new AnonymousObservable(function (observer) {
var q = [],
m = new SerialDisposable(),
d = new CompositeDisposable(m),
activeCount = 0,
isAcquired = false;
var ensureActive = function () {
var isOwner = false;
if (q.length > 0) {
isOwner = !isAcquired;
isAcquired = true;
}
if (isOwner) {
m.setDisposable(scheduler.scheduleRecursive(function (self) {
var work;
if (q.length > 0) {
work = q.shift();
} else {
isAcquired = false;
return;
}
var m1 = new SingleAssignmentDisposable();
d.add(m1);
m1.setDisposable(work.subscribe(function (x) {
observer.onNext(x);
var result = null;
try {
result = selector(x);
} catch (e) {
observer.onError(e);
}
q.push(result);
activeCount++;
ensureActive();
}, observer.onError.bind(observer), function () {
d.remove(m1);
activeCount--;
if (activeCount === 0) {
observer.onCompleted();
}
}));
self();
}));
}
};
q.push(source);
activeCount++;
ensureActive();
return d;
}, this);
};