javascript - How are asynchronous streams transmitted in RXJS? -


i'm trying understand how stream transmitted through pipe in rxjs.
know should not concern because that's whole idea async streams - still there's want understand.

looking @ code :

var source = rx.observable     .range(1, 3)     .flatmaplatest(function (x) {  //`switch` these days...         return rx.observable.range(x*100, 2);     });    source.subscribe(value => console.log('i got value ', value)) 

result :

i got value 100 got value 200 got value 300 got value 301 

i believe (iiuc) diagram : (notice striked 101,201 unsubscribed)

----1---------2------------------3------------------------------|  ░░░░░░░░flatmaplatest(x=>rx.observable.range(x*100, 2))░░░░░░░░ -----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301------------- 

and here question:

question:

is guaranteed 2 arrive before (101) ? same 3 arriving before (201) ?

i mean - if i'm not suppose @ time line legal following diagram occur :

----1---------------2---------------3------------------------------|  ░░░░░░░░flatmaplatest(x=>rx.observable.range(x*100, 2))░░░░░░░░ -----100-------101------200---201-----300------301------------- 

where 2 arrived slight delay 101 emitted

what missing here? how pipe work here ?

for particular observable chain particular rxjs version order of emissions going same.

as mentioned, in rxjs 4 uses currentthread scheduler can see here: https://github.com/reactive-extensions/rxjs/blob/master/src/core/perf/operators/range.js#l39.
schedulers (except immediate rxjs 4) internally using type of queue order same.

the order of events similar showed in diagram (... or @ least think is):

  1. 1 scheduled , emitted because it's action in queue.
  2. 100 scheduled. @ point there no more action in scheduler's queue because 2 hasn't been scheduled yet. rangeobservable schedules emission recursively after calls onnext(). means 100 scheduled before 2.
  3. 2 scheduled.
  4. 100 emitted, 101 scheduled
  5. 2 emitted, 101 disposed.
  6. ... , on

note behavior different in rxjs 4 , rxjs 5.

in rxjs 5 observables , operators default don't use scheduler (an obvious exception observables/operator need work delays). in rxjs 5 rangeobservable won't schedule anything , start emitting values right away in loop.

the same example in rxjs 5 produce different result:

const source = observable   .range(1, 3)   .switchmap(function (x) {     return observable.range(x * 100, 2);   });  source.subscribe(value => console.log('i got value ', value)); 

this print following:

i got value  100 got value  101 got value  200 got value  201 got value  300 got value  301 

however, change if add example delay(0). common sense suggests shouldn't anything:

const source = observable   .range(1, 3)   .switchmap(function (x) {     return observable.range(x * 100, 2).delay(0);   });  source.subscribe(value => console.log('i got value ', value)); 

now inner rangeobservable scheduled , disposed on again several times makes emit values last rangeobservable:

i got value  300 got value  301 

Comments

Popular posts from this blog

php - Permission denied. Laravel linux server -

google bigquery - Delta between query execution time and Java query call to finish -

python - Pandas two dataframes multiplication? -