javascript - How are asynchronous streams transmitted in RXJS? -
i'm trying understand how stream transmitted through pipe in rxjs.
know should not concern because that's whole idea async streams - still there's want understand.
looking @ code :
var source = rx.observable .range(1, 3) .flatmaplatest(function (x) { //`switch` these days... return rx.observable.range(x*100, 2); }); source.subscribe(value => console.log('i got value ', value))
result :
i got value 100 got value 200 got value 300 got value 301
i believe (iiuc) diagram : (notice striked 101,201 unsubscribed)
----1---------2------------------3------------------------------| ░░░░░░░░flatmaplatest(x=>rx.observable.range(x*100, 2))░░░░░░░░ -----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------
and here question:
question:
is guaranteed 2 arrive before (101) ? same 3 arriving before (201) ?
i mean - if i'm not suppose @ time line legal following diagram occur :
----1---------------2---------------3------------------------------| ░░░░░░░░flatmaplatest(x=>rx.observable.range(x*100, 2))░░░░░░░░ -----100-------101------200---201-----300------301-------------
where 2
arrived slight delay 101 emitted
what missing here? how pipe work here ?
for particular observable chain particular rxjs version order of emissions going same.
as mentioned, in rxjs 4 uses currentthread
scheduler can see here: https://github.com/reactive-extensions/rxjs/blob/master/src/core/perf/operators/range.js#l39.
schedulers (except immediate
rxjs 4) internally using type of queue order same.
the order of events similar showed in diagram (... or @ least think is):
1
scheduled , emitted because it's action in queue.100
scheduled. @ point there no more action in scheduler's queue because2
hasn't been scheduled yet.rangeobservable
schedules emission recursively after callsonnext()
. means100
scheduled before2
.2
scheduled.100
emitted,101
scheduled2
emitted,101
disposed.- ... , on
note behavior different in rxjs 4 , rxjs 5.
in rxjs 5 observables , operators default don't use scheduler (an obvious exception observables/operator need work delays). in rxjs 5 rangeobservable
won't schedule anything , start emitting values right away in loop.
the same example in rxjs 5 produce different result:
const source = observable .range(1, 3) .switchmap(function (x) { return observable.range(x * 100, 2); }); source.subscribe(value => console.log('i got value ', value));
this print following:
i got value 100 got value 101 got value 200 got value 201 got value 300 got value 301
however, change if add example delay(0)
. common sense suggests shouldn't anything:
const source = observable .range(1, 3) .switchmap(function (x) { return observable.range(x * 100, 2).delay(0); }); source.subscribe(value => console.log('i got value ', value));
now inner rangeobservable
scheduled , disposed on again several times makes emit values last rangeobservable
:
i got value 300 got value 301
Comments
Post a Comment