Є оператори merge та concat, merge дозволяє мерджити потоки даних з двох джерел в одне.
Rx.Observable.interval(1000)
.take(3)
.map(v => `S1: ${v}`)
.merge(Rx.Observable.interval(1000)
.take(3)
.map(v => `S2: ${v}`)
)
.subscribe(console.log);
Ось ця фігня бере два потоки і зліплює їх в один, і на виході будемо мати
//чекаємо 1 сек
"S1: 0"
"S2: 0"
//чекаємо 1 сек
"S1: 1"
"S2: 1"
//чекаємо 1 сек
"S1: 2"
"S2: 2"
concat теж зліплює два джерела в одне, але воно зберігає чергу. Тому спочатку виведуться всі цифри для S1, а потім всі цифри для S2.
А от mergeMap і concatMap перемикаються з одного джерела, на інше.
Rx.Observable.interval(1000)
.take(3)
.map(v => `S1: ${v}`)
.mergeMap(v => Rx.Observable.interval(1000)
.take(3)
.map(v => `S2: ${v}`)
)
.subscribe(console.log);
Виведе ось це
"S2: 0"
"S2: 1"
"S2: 0"
"S2: 2"
"S2: 1"
"S2: 0"
"S2: 2"
"S2: 1"
"S2: 2"
тут 9 значень.
Працює воно ось так - коли перщий обс емітить значення, в mergeMap це значення перетворюється на новий Observable. В нашому випадку цей Observable емітить 0, 1, 2 з інтервалом в 1 секунду.
Тобто, виходить так, що коли s1 емітить 0, воно доходить до mergeMap, потім йде секунда паузи, і другий Observable емітить 0, в цей момент до mergeMap вже підходить 1 з першого обса, котрий в свою чергу перетворюється на ще один обс, котрий через 1 секунду емітить 0, але в цей самий час обс, котрий ми отримали при першому виклику mergeMap вже видасть 1, ну, ви зрозуміли...
При використанні mergeMap значення папєрєдніків перетворюються в обсервбли, і значення цих обсервлблів виводяться в одному потоці, в змішанному потоці.
В цей час concatMap працює так само, за виключенням того, що значення обсервблів будуть виходити по черзі.
Ось цей код
Rx.Observable.interval(1000)
.take(3)
.map(v => `S1: ${v}`)
.concatMap(v => Rx.Observable.interval(1000)
.take(3)
.map(v => `S2: ${v}`)
)
.subscribe(console.log);
поверне теж 9 значень, але всі обсервбли створені в concatMap будуть поставлені в чергу, і спочатку виведуться значення з найпершого обсервбла в черзі, потім другого, а потім третього, навіть якщо інтервал першого обсервбла буде 10, а другого 1, або взагалі не буде, то черга все одно буде зберігатись.
"S2: 0"
"S2: 1"
"S2: 2"
"S2: 0"
"S2: 1"
"S2: 2"
"S2: 0"
"S2: 1"
"S2: 2"