April 26, 2016

【RxSwift】concatとmergeの違いをサンプルを元に整理してみる

Categories: 技術 | Tags: #RxSwift #Swift #Tips #メモ


最近はRxSwiftにどっぷりハマっているのですが、いまいちconcatmergeの違いが掴めてなかったので、
再度調べたりサンプルコードを書きながら整理してみました。

まずは2つのマーブル図を見てみる

整理するために、ReactiveX公式のマーブル図を見てみます。

image

image

図をみると、どちらも2つのものを合成する関数ですが、

mergeとconcatの違いはmergeはObservableを合成する際に全てのストリームの順を考慮して合成のに対し、concatは引数で渡されたObservableの順で合成すること。

(参考: 非同期や並列処理にも役立つRxJavaの使い方)

という違いがあるようです。
mergeはストリームの流れを見て、うまく合成して順番を決めますが、
concatは最初に合成した時点での順番に依存するようですね。

実際に動かしてみる

以上をふまえて、2つのPublicSubjectを生成して、それぞれconcatmergeで合成したストリームを作って、そこに値を流してみます。

let ps1 = PublishSubject<Int>()
let ps2 = PublishSubject<Int>()

let disposeBag = DisposeBag()

Observable
    .of(ps1, ps2)
    .merge()
    .subscribeNext {
        print("★1", $0) // ★1
    }
    .addDisposableTo(disposeBag)

Observable
    .of(ps1, ps2)
    .concat()
    .subscribeNext {
        print("★2", $0) // ★2
    }
    .addDisposableTo(disposeBag)

こんな感じでIntの値を流すストリームを作ってみました。 以下のように、ps1ps2にそれぞれ交互に値を流してみます。

ps1.onNext(1)
ps2.onNext(2)
ps1.onNext(3)
ps2.onNext(4)
ps1.onNext(5)
ps1.onCompleted() // ここで、ps1をcompleteにしてみる
ps2.onNext(6)
ps1.onNext(7) // ちなみにcompleteした後なのでこの値は流れない!

途中でps1complete にもしてみました。
結果として、

★1(merge) : 1 → 2 → 3 → 4 → 5 → 6
★2(concat) : 1 → 35 → 6

という結果が得られました。
やはり、mergeは流れてきた順に値を subscribe していましたが、concatps1ps2と合成したので、ps1complete になるまでは、
ps2から流れてきた値は subscribe していなかったです。

今までなんとなくconcatmergeを使っていたので、これからは気をつけつつ使おうと思います。
(特に非同期周りで使う時に間違えると意図しない結果になりそうなので…)

余談

こちらの記事を参考に、RxSwiftを試せるPlaygroundを構築してみました。
ちなみに

import XCPlayground

XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

を加えてあげると、非同期(NSThreadのsleepとかうまく使って)でストリーム流して遊べるようになるのでおすすめです。
上記参考記事でPlaygroundのセットアップをして、下記のコードを貼り付ければ、この記事の内容で遊べます。

import XCPlayground
import UIKit
import RxSwift
import RxCocoa

XCPlaygroundPage.currentPage.needsIndefiniteExecution = true

let ps1 = PublishSubject<Int>()
let ps2 = PublishSubject<Int>()

let disposeBag = DisposeBag()

Observable
    .of(ps1, ps2)
    .merge()
    .subscribeNext {
        print($0)
    }
    .addDisposableTo(disposeBag)

Observable
    .of(ps1, ps2)
    .concat()
    .subscribeNext {
        print($0)
    }
    .addDisposableTo(disposeBag)

ps1.onNext(1)
NSThread.sleepForTimeInterval(0.5)
ps2.onNext(2)
NSThread.sleepForTimeInterval(0.5)
ps1.onNext(3)
NSThread.sleepForTimeInterval(0.5)
ps2.onNext(4)
NSThread.sleepForTimeInterval(0.5)
ps1.onNext(5)
NSThread.sleepForTimeInterval(0.5)
ps1.onCompleted()
NSThread.sleepForTimeInterval(0.5)
ps2.onNext(6)
NSThread.sleepForTimeInterval(0.5)
ps1.onNext(7)

0.5秒刻みで、ストリームに値が流れていきます。


written by sgr-ksmt