File: /var/www/tana/frontend/node_modules/concurrently/node_modules/rx/src/core/linq/observable/window.js
/**
 * Splits the source sequence into windows, choosing the windowing strategy
 * from the shape of the arguments:
 *
 *  - a single non-function argument is treated as a sequence of window boundaries;
 *  - a function as first argument is treated as a window closing selector
 *    (non-overlapping windows: a new window starts when the previous one closes),
 *    and any second argument is ignored;
 *  - otherwise the first argument is a sequence of window openings and the
 *    second argument is the selector that closes each opened window.
 *
 * @param {Mixed} windowOpeningsOrClosingSelector Observable sequence whose elements denote window boundaries or window openings, or a function invoked to define when the current window closes.
 * @param {Function} [windowClosingSelector] Function invoked to define the closing of each opened window. Ignored when the first argument is a function.
 * @returns {Observable} An observable sequence of windows.
 */
observableProto.window = function (windowOpeningsOrClosingSelector, windowClosingSelector) {
  // A function in first position always selects the closing-selector strategy.
  if (typeof windowOpeningsOrClosingSelector === 'function') {
    return observableWindowWithClosingSelector.call(this, windowOpeningsOrClosingSelector);
  }

  // A lone non-function argument is a boundaries sequence.
  if (arguments.length === 1) {
    return observableWindowWithBoundaries.call(this, windowOpeningsOrClosingSelector);
  }

  // Anything else: openings sequence plus a per-window closing selector.
  return observableWindowWithOpenings.call(this, windowOpeningsOrClosingSelector, windowClosingSelector);
};
/**
 * Builds the windows by delegating to `groupJoin` on the openings sequence.
 * Must be invoked with `this` bound to the source observable.
 *
 * The source is passed as the sequence to join against, `windowClosingSelector`
 * and `observableEmpty` are passed as the two duration selectors, and the
 * result selector keeps only the window (second argument), discarding the
 * opening value.
 *
 * @param {Observable} windowOpenings Sequence whose elements open new windows.
 * @param {Function} windowClosingSelector Selector passed to `groupJoin` as the duration of each opening.
 * @returns {Observable} Whatever `windowOpenings.groupJoin` returns (a sequence of windows).
 */
function observableWindowWithOpenings(windowOpenings, windowClosingSelector) {
  var source = this;

  function selectWindow(opening, win) {
    return win;
  }

  return windowOpenings.groupJoin(source, windowClosingSelector, observableEmpty, selectWindow);
}
/**
 * Produces non-overlapping windows whose edges are dictated by a boundaries
 * sequence. Must be invoked with `this` bound to the source observable.
 *
 * On subscription a first window is emitted immediately. Every source value is
 * pushed into the current window; every boundary value completes the current
 * window and emits a fresh one. An error or completion from either the source
 * or the boundaries sequence is forwarded to the current window first and then
 * to the outer observer.
 *
 * @param {Observable|Promise} windowBoundaries Sequence (or promise, converted via `observableFromPromise`) whose elements mark window boundaries.
 * @returns {Observable} An observable sequence of windows.
 */
function observableWindowWithBoundaries(windowBoundaries) {
  var source = this;

  return new AnonymousObservable(function (observer) {
    var currentWindow = new Subject();
    var subscriptions = new CompositeDisposable();
    var refCounted = new RefCountDisposable(subscriptions);

    // Hands the current window to the outer observer, wrapped by addRef with
    // the ref-counted disposable.
    function publishWindow() {
      observer.onNext(addRef(currentWindow, refCounted));
    }

    function forwardValue(x) {
      currentWindow.onNext(x);
    }

    // Closes the current window and replaces it with a new one.
    function rollWindow() {
      currentWindow.onCompleted();
      currentWindow = new Subject();
      publishWindow();
    }

    // Termination always reaches the open window before the outer observer.
    function failAll(err) {
      currentWindow.onError(err);
      observer.onError(err);
    }

    function completeAll() {
      currentWindow.onCompleted();
      observer.onCompleted();
    }

    publishWindow();

    var sourceSubscription = source.subscribe(forwardValue, failAll, completeAll);
    subscriptions.add(sourceSubscription);

    // The converted sequence is stored back so the conversion happens once.
    if (isPromise(windowBoundaries)) {
      windowBoundaries = observableFromPromise(windowBoundaries);
    }

    var boundarySubscription = windowBoundaries.subscribe(rollWindow, failAll, completeAll);
    subscriptions.add(boundarySubscription);

    return refCounted;
  }, source);
}
/**
 * Produces non-overlapping windows whose lifetime is decided by a closing
 * selector. Must be invoked with `this` bound to the source observable.
 *
 * On subscription a first window is emitted immediately and the closing
 * selector is invoked to obtain a "close" sequence for it. Every source value
 * is pushed into the current window. When `take(1)` of the close sequence
 * completes, the current window is completed, a new window is emitted and the
 * selector is invoked again for it.
 *
 * Every failure (source error, close-sequence error, or the selector itself
 * throwing) is delivered to the currently open window first and then to the
 * outer observer.
 *
 * @param {Function} windowClosingSelector Function invoked with no arguments; returns an observable (or promise, converted via `observableFromPromise`) signalling when the current window should close.
 * @returns {Observable} An observable sequence of windows.
 */
function observableWindowWithClosingSelector(windowClosingSelector) {
  var source = this;
  return new AnonymousObservable(function (observer) {
    var m = new SerialDisposable(),
      d = new CompositeDisposable(m),
      r = new RefCountDisposable(d),
      win = new Subject();

    observer.onNext(addRef(win, r));

    // Single failure path: terminate the open window before the outer
    // observer so window subscribers are never left without a termination.
    function fail(err) {
      win.onError(err);
      observer.onError(err);
    }

    d.add(source.subscribe(function (x) {
      win.onNext(x);
    }, fail, function () {
      win.onCompleted();
      observer.onCompleted();
    }));

    function createWindowClose() {
      var windowClose;
      try {
        windowClose = windowClosingSelector();
      } catch (e) {
        // Previously only the outer observer was notified here, leaving the
        // open window without any termination signal.
        fail(e);
        return;
      }

      if (isPromise(windowClose)) {
        windowClose = observableFromPromise(windowClose);
      }

      // m1 is installed into the serial disposable before subscribing, so a
      // close sequence that finishes synchronously (and re-enters
      // createWindowClose) replaces it with the next one.
      var m1 = new SingleAssignmentDisposable();
      m.setDisposable(m1);
      m1.setDisposable(windowClose.take(1).subscribe(noop, fail, function () {
        win.onCompleted();
        win = new Subject();
        observer.onNext(addRef(win, r));
        createWindowClose();
      }));
    }

    createWindowClose();
    return r;
  }, source);
}