Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Commit

Permalink
Fixing backpressure and adding controlled
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Mar 18, 2014
1 parent 0363218 commit c16e58b
Show file tree
Hide file tree
Showing 19 changed files with 1,262 additions and 34 deletions.
18 changes: 9 additions & 9 deletions Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,11 @@ var browsers = [{
// Backpressure operators
'src/core/backpressure/pausable.js',
'src/core/backpressure/pausablebuffered.js',
/* 'src/core/backpressure/controlled.js',
'src/core/backpressure/controlled.js',
'src/core/backpressure/controlledobservable.js',
'src/core/backpressure/controlledsubject.js',
'src/core/backpressure/windowed.js',
'src/core/backpressure/windowedobservable.js',*/
//'src/core/backpressure/windowed.js',
//'src/core/backpressure/windowedobservable.js',

'src/core/anonymousobservable.js',
'src/core/autodetachobserver.js',
Expand Down Expand Up @@ -581,11 +581,11 @@ var browsers = [{
// Backpressure operators
'src/core/backpressure/pausable.js',
'src/core/backpressure/pausablebuffered.js',
/* 'src/core/backpressure/controlled.js',
'src/core/backpressure/controlled.js',
'src/core/backpressure/controlledobservable.js',
'src/core/backpressure/controlledsubject.js',
'src/core/backpressure/windowed.js',
'src/core/backpressure/windowedobservable.js',*/
//'src/core/backpressure/windowed.js',
//'src/core/backpressure/windowedobservable.js',

'src/core/anonymousobservable.js',
'src/core/autodetachobserver.js',
Expand All @@ -610,11 +610,11 @@ var browsers = [{
// Backpressure operators
'src/core/backpressure/pausable.js',
'src/core/backpressure/pausablebuffered.js',
/* 'src/core/backpressure/controlled.js',
'src/core/backpressure/controlled.js',
'src/core/backpressure/controlledobservable.js',
'src/core/backpressure/controlledsubject.js',
'src/core/backpressure/windowed.js',
'src/core/backpressure/windowedobservable.js',*/
//'src/core/backpressure/windowed.js',
//'src/core/backpressure/windowedobservable.js',

'src/core/suboutro.js'
],
Expand Down
2 changes: 1 addition & 1 deletion bower.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rxjs",
"version": "2.2.14",
"version": "2.2.15",
"main": [
"rx.js",
"rx.compat.js",
Expand Down
2 changes: 1 addition & 1 deletion nuget/RxJS-BackPressure/RxJS-BackPressure.nuspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<package>
<metadata>
<id>RxJS-Async</id>
<id>RxJS-BackPressure</id>
<title>Reactive Extensions for JavaScript - BackPressure-Based Operations</title>
<!-- Automatically updated by build, keeping fixed dev build number here in case of local build -->
<version>2.2.7</version>
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "rx",
"title": "Reactive Extensions for JavaScript (RxJS)",
"description": "Library for composing asynchronous and event-based operations in JavaScript",
"version": "2.2.14",
"version": "2.2.15",
"homepage": "https://github.com/Reactive-Extensions/RxJS",
"author": {
"name": "Cloud Programmability Team",
Expand Down
265 changes: 259 additions & 6 deletions rx.backpressure.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
// References
var Observable = Rx.Observable,
observableProto = Observable.prototype,
AnonymousObservable = Rx.Internals.AnonymousObservable,
AnonymousObservable = Rx.AnonymousObservable,
CompositeDisposable = Rx.CompositeDisposable,
Subject = Rx.Subject,
Observer = Rx.Observer,
disposableEmpty = Rx.Disposable.empty,
disposableCreate = Rx.Disposable.create,
inherits = Rx.Internals.inherits,
addProperties = Rx.Internals.addProperties,
inherits = Rx.internals.inherits,
addProperties = Rx.internals.addProperties,
timeoutScheduler = Rx.Scheduler.timeout;

var objectDisposed = 'Object has been disposed';
function checkDisposed() {
if (this.isDisposed) { throw new Error(objectDisposed); }
}
function checkDisposed() { if (this.isDisposed) { throw new Error(objectDisposed); } }
function identity (x) { return x; }

/**
* Pauses the underlying observable sequence based upon the observable sequence which yields true/false.
Expand Down Expand Up @@ -77,5 +77,258 @@
return new CompositeDisposable(subscription, connection, pausable);
});
};
function combineLatestSource(source, subject, resultSelector) {
return new AnonymousObservable(function (observer) {
var n = 2,
hasValue = [false, false],
hasValueAll = false,
isDone = false,
values = new Array(n);

function next(x, i) {
values[i] = x
var res;
hasValue[i] = true;
if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
try {
res = resultSelector.apply(null, values);
} catch (ex) {
observer.onError(ex);
return;
}
observer.onNext(res);
} else if (isDone) {
observer.onCompleted();
}
}

return new CompositeDisposable(
source.subscribe(
function (x) {
next(x, 0);
},
observer.onError.bind(observer),
function () {
isDone = true;
observer.onCompleted();
}),
subject.subscribe(
function (x) {
next(x, 1);
},
observer.onError.bind(observer))
);
});
}

/**
* Pauses the underlying observable sequence based upon the observable sequence which yields true/false,
* and yields the values that were buffered while paused.
* @example
* var pauser = new Rx.Subject();
* var source = Rx.Observable.interval(100).pausableBuffered(pauser);
* @param {Observable} pauser The observable sequence used to pause the underlying sequence.
* @returns {Observable} The observable sequence which is paused based upon the pauser.
*/
observableProto.pausableBuffered = function (subject) {
var source = this;
return new AnonymousObservable(function (observer) {
var q = [], previous = true;

var subscription =
combineLatestSource(
source,
subject.distinctUntilChanged(),
function (data, shouldFire) {
return { data: data, shouldFire: shouldFire };
})
.subscribe(
function (results) {
if (results.shouldFire && previous) {
observer.onNext(results.data);
}
if (results.shouldFire && !previous) {
while (q.length > 0) {
observer.onNext(q.shift());
}
previous = true;
} else if (!results.shouldFire && !previous) {
q.push(results.data);
} else if (!results.shouldFire && previous) {
previous = false;
}

},
observer.onError.bind(observer),
observer.onCompleted.bind(observer)
);

subject.onNext(false);

return subscription;
});
};

/**
* Attaches a controller to the observable sequence with the ability to queue.
* @example
* var source = Rx.Observable.interval(100).controlled();
* source.request(3); // Reads 3 values
* @param {Observable} pauser The observable sequence used to pause the underlying sequence.
* @returns {Observable} The observable sequence which is paused based upon the pauser.
*/
observableProto.controlled = function (enableQueue) {
if (enableQueue == null) { enableQueue = true; }
return new ControlledObservable(this, enableQueue);
};
var ControlledObservable = (function (_super) {

inherits(ControlledObservable, _super);

function subscribe (observer) {
return this.source.subscribe(observer);
}

function ControlledObservable (source, enableQueue) {
_super.call(this, subscribe);
this.subject = new ControlledSubject(enableQueue);
this.source = source.multicast(this.subject).refCount();
}

ControlledObservable.prototype.request = function (numberOfItems) {
if (numberOfItems == null) { numberOfItems = -1; }
return this.subject.request(numberOfItems);
};

return ControlledObservable;

}(Observable));

var ControlledSubject = Rx.ControlledSubject = (function (_super) {

function subscribe (observer) {
return this.subject.subscribe(observer);
}

inherits(ControlledSubject, _super);

function ControlledSubject(enableQueue) {
if (enableQueue == null) {
enableQueue = true;
}

_super.call(this, subscribe);
this.subject = new Subject();
this.enableQueue = enableQueue;
this.queue = enableQueue ? [] : null;
this.requestedCount = 0;
this.requestedDisposable = disposableEmpty;
this.error = null;
this.hasFailed = false;
this.hasCompleted = false;
this.controlledDisposable = disposableEmpty;
}

addProperties(ControlledSubject.prototype, Observer, {
onCompleted: function () {
checkDisposed.call(this);
this.hasCompleted = true;

if (!this.enableQueue || this.queue.length === 0) {
this.subject.onCompleted();
}
},
onError: function (error) {
checkDisposed.call(this);
this.hasFailed = true;
this.error = error;

if (!this.enableQueue || this.queue.length === 0) {
this.subject.onError(error);
}
},
onNext: function (value) {
checkDisposed.call(this);
var hasRequested = false;

if (this.requestedCount === 0) {
if (this.enableQueue) {
this.queue.push(value);
}
} else {
if (this.requestedCount !== -1) {
if (this.requestedCount-- === 0) {
this.disposeCurrentRequest();
}
}
hasRequested = true;
}

if (hasRequested) {
this.subject.onNext(value);
}
},
_processRequest: function (numberOfItems) {
if (this.enableQueue) {
//console.log('queue length', this.queue.length);

while (this.queue.length >= numberOfItems && numberOfItems > 0) {
//console.log('number of items', numberOfItems);
this.subject.onNext(this.queue.shift());
numberOfItems--;
}

if (this.queue.length !== 0) {
return { numberOfItems: numberOfItems, returnValue: true };
} else {
return { numberOfItems: numberOfItems, returnValue: false };
}
}

if (this.hasFailed) {
this.subject.onError(this.error);
this.controlledDisposable.dispose();
this.controlledDisposable = disposableEmpty;
} else if (this.hasCompleted) {
this.subject.onCompleted();
this.controlledDisposable.dispose();
this.controlledDisposable = disposableEmpty;
}

return { numberOfItems: numberOfItems, returnValue: false };
},
request: function (number) {
checkDisposed.call(this);
this.disposeCurrentRequest();
var self = this,
r = this._processRequest(number);

number = r.numberOfItems;
if (!r.returnValue) {
this.requestedCount = number;
this.requestedDisposable = disposableCreate(function () {
self.requestedCount = 0;
});

return this.requestedDisposable
} else {
return disposableEmpty;
}
},
disposeCurrentRequest: function () {
this.requestedDisposable.dispose();
this.requestedDisposable = disposableEmpty;
},

dispose: function () {
this.isDisposed = true;
this.error = null;
this.subject.dispose();
this.requestedDisposable.dispose();
}
});

return ControlledSubject;
}(Observable));
return Rx;
}));
2 changes: 1 addition & 1 deletion rx.backpressure.min.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c16e58b

Please sign in to comment.