RxJS Release v2.2.14
This will be one of the last updates before we start making our way to version 2.3.
This release includes the following:
- Promises Integration
- BackPressure
- Bug Fixes
Promises Integration
A major highlight of this release is the integration with JavaScript Promises. With the upcoming ES6 standardization, and Promises showing up in browsers such as Chrome, now is the time to strike. In order for Promises integration to work with RxJS, the Promise implementation must be compatible with ES6, in that the constructor must look like the following:
var promise = new Promise(function (resolve, reject) {
resolve(42);
});
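One way to check up front whether a Promise library meets this requirement is to call its constructor with a resolver function and see whether a thenable comes back. The sketch below is only an illustration: `looksLikeES6Promise` is a hypothetical helper and not part of RxJS.

```javascript
// Hypothetical helper (not part of RxJS): returns true when `Ctor` can be
// called as `new Ctor(function (resolve, reject) { ... })` and yields a thenable.
function looksLikeES6Promise(Ctor) {
  if (typeof Ctor !== 'function') { return false; }
  try {
    var resolverCalled = false;
    var p = new Ctor(function (resolve, reject) {
      // An ES6-style constructor hands us two callable settle functions.
      resolverCalled = typeof resolve === 'function' && typeof reject === 'function';
      resolve(42);
    });
    return resolverCalled && !!p && typeof p.then === 'function';
  } catch (e) {
    return false;
  }
}

console.log(looksLikeES6Promise(Promise));                // => true
console.log(looksLikeES6Promise(function Deferred() {})); // => false
```

A check like this only probes the constructor shape; it says nothing about whether the rest of the library follows ES6 semantics.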
In previous releases, we've had the ability to convert a Promise to an Observable by using Rx.Observable.fromPromise, but now we also have the ability to create Promises from Observable sequences by using the toPromise method. You can either pass the Promise constructor to the toPromise method or set it once with Rx.config.Promise.
// Using the constructor
var promise = Rx.Observable.return(42).toPromise(RSVP.Promise);
promise.then(console.log.bind(console));
// Using configuration
Rx.config.Promise = RSVP.Promise;
var promise = Rx.Observable.return(42).toPromise();
promise.then(console.log.bind(console));
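To make the constructor lookup concrete, here is a minimal sketch of how a toPromise-style conversion could behave. It is not the RxJS implementation: `fakeObservable`, `config`, and `toPromiseSketch` are hypothetical stand-ins, and resolving with the last value on completion is an assumption made for this illustration.

```javascript
// Hypothetical stand-in for an observable: calls the observer synchronously.
function fakeObservable(values) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      values.forEach(function (v) { onNext(v); });
      onCompleted();
    }
  };
}

// Plays the role of Rx.config.Promise in this sketch.
var config = { Promise: undefined };

// Assumed lookup order: explicit constructor first, then the configured default.
function toPromiseSketch(source, PromiseCtor) {
  PromiseCtor = PromiseCtor || config.Promise;
  if (!PromiseCtor) { throw new Error('No Promise constructor provided or configured'); }
  return new PromiseCtor(function (resolve, reject) {
    var last, hasValue = false;
    source.subscribe(
      function (v) { last = v; hasValue = true; },   // remember the latest value
      reject,                                        // errors reject the promise
      function () { if (hasValue) { resolve(last); } } // completion resolves it
    );
  });
}

toPromiseSketch(fakeObservable([1, 2, 42]), Promise).then(function (v) {
  console.log(v); // => 42
});
```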
We have also implemented overloads of the following methods so that they accept Promises, or in some cases an Observable of Promises: flatMap/selectMany, concatAll/concatObservable, mergeAll/mergeObservable, switch/switchLatest, and finally startAsync.
// Using flatMap/selectMany with a promise
var observable = Rx.Observable.return(42)
.flatMap(RSVP.Promise.resolve(56));
observable.subscribe(console.log.bind(console));
// => 56
// Using flatMap/selectMany with a promise inside a function
var observable = Rx.Observable.return(42)
.flatMap(function (x, i) { return RSVP.Promise.resolve(x + 1); });
observable.subscribe(console.log.bind(console));
// => 43
// Using flatMap/selectMany with a promise inside a function with a result selector
var observable = Rx.Observable.return(42)
.flatMap(
function (x, i) { return RSVP.Promise.resolve(x + 1); },
function (x, y, i) { return { fst: x + i, snd: y }; });
observable.subscribe(console.log.bind(console));
// => { fst: 42, snd: 43 }
// Concat support
var sources = Rx.Observable
.fromArray([
RSVP.Promise.resolve(42),
RSVP.Promise.resolve(56),
RSVP.Promise.resolve(72)]
)
.concatAll();
sources.subscribe(console.log.bind(console));
// => 42
// => 56
// => 72
// Merge support
var sources = Rx.Observable
.fromArray([
RSVP.Promise.resolve(42),
RSVP.Promise.resolve(56),
RSVP.Promise.resolve(72)]
)
.mergeAll();
sources.subscribe(console.log.bind(console));
// => 42
// => 56
// => 72
// Switch support
var sources = Rx.Observable
.fromArray([
RSVP.Promise.resolve(42),
RSVP.Promise.resolve(56),
RSVP.Promise.resolve(72)]
)
.switch();
sources.subscribe(console.log.bind(console));
// => 72
// StartAsync Support
var source = Rx.Observable.startAsync(function () {
return RSVP.Promise.resolve(42);
});
source.subscribe(console.log.bind(console));
// => 42
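These overloads imply that an operator has to tell Promises, Observables, and plain values apart at runtime. A common way to do that is duck typing on `then`. The sketch below assumes that approach; `isThenable` and `classify` are hypothetical names and not the library's internals.

```javascript
// Duck-typing check: anything with a callable `then` is treated as a promise.
function isThenable(x) {
  return !!x && typeof x.then === 'function';
}

// Hypothetical dispatcher showing how a selector result might be classified.
function classify(x) {
  if (isThenable(x)) { return 'promise'; }
  if (x && typeof x.subscribe === 'function') { return 'observable'; }
  return 'value';
}

console.log(classify(Promise.resolve(56)));           // => promise
console.log(classify({ subscribe: function () {} })); // => observable
console.log(classify(42));                            // => value
```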
BackPressure
This is the first experimental release of backpressure. The idea is to pause and resume a particular observable if the observer cannot keep up for whatever reason. Doing this automatically seems naive to us; we should not punish the producer if the consumer cannot keep up, so we've set a pretty high bar for getting it right. You can find the new operators in their own file, rx.backpressure.js, or if you're using rx.lite.js, you're in luck because they are already included.
There are many ways around the problem of backpressure, including throttle if you're OK with losing some data, or the buffer methods (by time, count, etc.) if you'd like the results batched for a particular count or timeframe. In this release, we've added two methods, pausable and pausableBuffered.
With pausable, you have the ability to pause a hot observable, such as mouse movements, and then resume it, but you will lose any data produced between the pause and the resume. Below is an example of it in action.
var controller = new Rx.Subject();
var events = Rx.Observable.fromEvent(document, 'mousemove');
// Control the events by the controller
var controlled = events.pausable(controller);
var subscription = controlled.subscribe(function (e) {
// Do something with events
// Woops, too fast
// Pause the event stream
controller.onNext(false);
// When you want to start again, call controller.onNext(true);
});
// Start listening
controller.onNext(true);
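The gating behaviour can be illustrated without a browser or RxJS. The following is a minimal sketch under stated assumptions: `MiniSubject` and `pausableSketch` are hypothetical stand-ins, and the stream is assumed to start paused until the controller sends true.

```javascript
// Hypothetical minimal subject: just enough to push values to listeners.
function MiniSubject() { this.listeners = []; }
MiniSubject.prototype.subscribe = function (fn) { this.listeners.push(fn); };
MiniSubject.prototype.onNext = function (v) {
  this.listeners.forEach(function (fn) { fn(v); });
};

// Forward `source` values only while the last `pauser` signal was true.
function pausableSketch(source, pauser) {
  var out = new MiniSubject();
  var open = false; // assumed to start paused until the controller sends true
  pauser.subscribe(function (flag) { open = !!flag; });
  source.subscribe(function (v) { if (open) { out.onNext(v); } });
  return out;
}

var events = new MiniSubject();
var controller = new MiniSubject();
var seen = [];
pausableSketch(events, controller).subscribe(function (v) { seen.push(v); });

events.onNext('a');       // dropped: not started yet
controller.onNext(true);
events.onNext('b');       // delivered
controller.onNext(false);
events.onNext('c');       // dropped while paused
controller.onNext(true);
events.onNext('d');       // delivered
console.log(seen);        // => [ 'b', 'd' ]
```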
The other is pausableBuffered, where you will not lose data; instead, it is kept in a buffer until you are ready to start consuming again.
var controller = new Rx.Subject();
var interval = Rx.Observable.interval(1000).timeInterval();
// Control the events by the controller
var controlled = interval.pausableBuffered(controller);
var subscription = controlled.subscribe(function (x) {
console.log('x', x.value);
});
// Start it
var shouldRun = true;
controller.onNext(shouldRun);
// Make it pause every so often
setInterval(function () {
controller.onNext(shouldRun = !shouldRun);
}, 5000);
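For comparison with the lossy variant, the buffered behaviour can be sketched synchronously. This is an illustration only: `MiniSubject` and `pausableBufferedSketch` are hypothetical stand-ins, and the queue is assumed to be unbounded and flushed in arrival order on resume.

```javascript
// Hypothetical minimal subject: just enough to push values to listeners.
function MiniSubject() { this.listeners = []; }
MiniSubject.prototype.subscribe = function (fn) { this.listeners.push(fn); };
MiniSubject.prototype.onNext = function (v) {
  this.listeners.forEach(function (fn) { fn(v); });
};

// Queue values while paused and flush them, in order, when resumed.
function pausableBufferedSketch(source, pauser) {
  var out = new MiniSubject();
  var open = false;
  var queue = [];
  pauser.subscribe(function (flag) {
    open = !!flag;
    // Stop flushing if a listener pauses the stream again mid-flush.
    while (open && queue.length > 0) { out.onNext(queue.shift()); }
  });
  source.subscribe(function (v) {
    if (open) { out.onNext(v); } else { queue.push(v); }
  });
  return out;
}

var ticks = new MiniSubject();
var controller = new MiniSubject();
var seen = [];
pausableBufferedSketch(ticks, controller).subscribe(function (v) { seen.push(v); });

ticks.onNext(1);          // buffered: not started yet
controller.onNext(true);  // flushes 1
ticks.onNext(2);          // delivered immediately
controller.onNext(false);
ticks.onNext(3);          // buffered
ticks.onNext(4);          // buffered
controller.onNext(true);  // flushes 3 and 4
console.log(seen);        // => [ 1, 2, 3, 4 ]
```

An unbounded queue keeps the sketch short, but it also means a long pause on a fast producer grows memory without limit.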
In future releases, we will also cover ideas on how you can request a number of items each time, such as the following.
var source = Rx.Observable.range(0, 1000).controlled();
source.subscribe(function(x) {
console.log('x', x);
});
// Get 10 items
source.request(10);
// Maybe later get another
source.request(5);
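Since this API is still an idea, the following is only a sketch of the pull-based semantics it suggests. `controlledSketch` is a hypothetical wrapper over an array, not an RxJS operator, and returning the delivered count from `request` is an assumption of this example.

```javascript
// Hypothetical pull-based wrapper: values are queued and released on request(n).
function controlledSketch(values) {
  var queue = values.slice();
  var listeners = [];
  return {
    subscribe: function (fn) { listeners.push(fn); },
    request: function (n) {
      var batch = queue.splice(0, n); // take at most n pending items
      batch.forEach(function (v) {
        listeners.forEach(function (fn) { fn(v); });
      });
      return batch.length; // number of items actually delivered
    }
  };
}

var range = [];
for (var i = 0; i < 1000; i++) { range.push(i); }

var source = controlledSketch(range);
var received = [];
source.subscribe(function (x) { received.push(x); });

source.request(10);
source.request(5);
console.log(received.length); // => 15
```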
This is just a first exploration into what's possible, but we're pretty excited about this.
Bug Fixes
Closed the following bugs: