RxRelay는 PublishRelay와 BehaviorRelay의 두 종류의 릴레이를 제공합니다. 기존 PublishSubject와 BehaviorSubject와는 두 가지 차이점을 가집니다. :
Relay는 .error 이벤트가 발생하지 않습니다. 즉 릴레이는 .next 이벤트 만 방출하고 절대로 .error / .complete 이벤트가 나오지 않습니다. 마지막에 .dispose만 발생합니다.
.onNext()
, .onError()
, .onComplete()
가 없습니다. 항목을 내보내기 위해서는 .onNext()
대신 .accept()
를 사용합니다.Subject의 경우 오류가 발생할 경우 sequence가 종료되지만, Relay는 dispose가 불리기 전까지 종료되지 않기 때문에 UI 이벤트를 받을 때 사용해볼 수 있을 것 같습니다.
func testForPublishRelay() {
let disposeBag = DisposeBag()
let publishRelay = PublishRelay<String>()
publishRelay.accept("Is Anyone Else?")
publishRelay.subscribe(onNext: { str in
print("observer 1 : \(str)")
}, onCompleted: {
print("observer 1 complete ")
}, onDisposed: {
print("observer 1 disposed ")
}).disposed(by: disposeBag)
publishRelay.accept("Hi")
publishRelay.accept("Observers")
publishRelay.subscribe(onNext: { str in
print("observer 2 : \(str)")
}, onCompleted: {
print("observer 2 complete ")
}, onDisposed: {
print("observer 2 disposed ")
}).disposed(by: disposeBag)
publishRelay.accept("nice to")
publishRelay.accept("meet you guys.")
}
observer 1 : Hi
observer 1 : Observers
observer 1 : nice to
observer 2 : nice to
observer 1 : meet you guys.
observer 2 : meet you guys.
observer 1 disposed
observer 2 disposed
value
property로 현재 값을 가져올 수 있습니다.
func testForBehaviorRelay() {
let disposeBag = DisposeBag()
let behaviorRelay = BehaviorRelay<String>(value: "Is Anyone Else?")
behaviorRelay.subscribe(onNext: { str in
print("observer 1 : \(str)")
}, onCompleted: {
print("observer 1 complete ")
}, onDisposed: {
print("observer 1 disposed ")
}).disposed(by: disposeBag)
behaviorRelay.accept("Hi")
behaviorRelay.accept("Observers")
behaviorRelay.subscribe(onNext: { str in
print("observer 2 : \(str)")
}, onCompleted: {
print("observer 2 complete ")
}, onDisposed: {
print("observer 2 disposed ")
}).disposed(by: disposeBag)
behaviorRelay.accept("nice to")
behaviorRelay.accept("meet you guys.")
}
observer 1 : Is Anyone Else?
observer 1 : Hi
observer 1 : Observers
observer 2 : Observers
observer 1 : nice to
observer 2 : nice to
observer 1 : meet you guys.
observer 2 : meet you guys.
observer 1 disposed
observer 2 disposed
Observable Sequence에 변형 클로적를 적용하고 이를 기반으로 새로운 Observable Sequence를 생성합니다.
func testMap() {
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
1
4
9
Observable에 의해 방출된 항목들을 원하는 형태의 Observable로 변환합니다. Observable의 항목을 다시 Observable
func testFlatMap() {
let disposeBag = DisposeBag()
struct Player {
init(score: Int) {
self.score = BehaviorSubject(value: score)
}
let score: BehaviorSubject<Int>
}
let 👦🏻 = Player(score: 80)
let 👧🏼 = Player(score: 90)
let player = BehaviorSubject(value: 👦🏻)
player.asObservable() // Observale<Player>
.flatMap { $0.score.asObservable() } // Observable<BehaviorSubject> -> Observable<Observable<Int>>
.subscribe(onNext: { print("flatmap : \($0)") })
.disposed(by: disposeBag)
👦🏻.score.onNext(85)
player.onNext(👧🏼)
👦🏻.score.onNext(95)
👧🏼.score.onNext(100)
}
👦🏻를 초기값으로 가지는 BehaviorSubject
를 구독하는 관찰자 player
가 있습니다.
plyaer에 대한 onNext()는 한 번만 불립니다.
하지만 BehaviorSubject
이기 때문에 기본값인 👦🏻와, onNext(👧🏼)
로 총 2개의 항목이 배출됩니다.
그리고 각 player의 score 또한 BehaviorSubject
로, Observable Stream입니다.
굳이 표현하자면, BehaviorSubject로 넘어오는 항목의 형태는 Observable<Player>
이며, score는 Observable<Observable<Item>>
으로 표현될 수 있습니다.
flatmap
을 통해서 Observable<Player>
의 score를 .asObservable()
로 Observable<Observable<Item>>
형태로 변환했습니다.
그 결과는 다음과 같습니다.
flatmap : 80
flatmap : 85
flatmap : 90
flatmap : 95
flatmap : 100
onNext(👧🏼)
를 통해 배출되는 항목이 달라졌지만, player의 score : BehaviorSubject
에 subscribe를 했기 때문에 계속 관찰자로서 변화를 받아들일 수 있습니다.
위에서 설명했던 .map
으로 변경하면 결과가 어떨까요? 연산자를 .flapMap
에서 .map
으로 변경하면 다음과 같은 결과를 가집니다.
flatmap : RxSwift.BehaviorSubject<Swift.Int>
flatmap : RxSwift.BehaviorSubject<Swift.Int>
.map
을 통해 Player의 score로 변환했기 때문에 넘어오는 항목은 BehaviorSubject<Int>
타입이 넘어옵니다.
flatMap은 Observable Sequence를 Observable처럼 평평하게 풀어내주지만, Map은 변환만 해줍니다.
그럼 flatMapLatest는 뭘까요? 마찬가지로 연산자를 .flatMapLatest
로 변경한 결과는 다음과 같습니다.
flatmap : 80
flatmap : 85
flatmap : 90
flatmap : 100
마지막 Observable Sequence만을 유지합니다. 이전과는 달리 👦🏻의 스코어가 95점으로 변경된 이벤트를 받지 못합니다.
자매품으로 .flatMapFirst
가 있습니다. 이 연산자는 처음으로 들어온 Observable Sequence만을 유지합니다.
flatmap : 80
flatmap : 85
flatmap : 95
onNext(👧🏼)
에 대한 이벤트를 전혀 받아오지 못합니다.
초기 시드값(Int)를 가지고 들어오는 Observable 항목에 대한 변경을 하고, 변경된 값을 다시 시드값으로 삼아서 다음 들어오는 항목에 사용합니다.
func testScan() {
let disposeBag = DisposeBag()
Observable.of("Hello", "Every", "body")
.scan("start") { aggregateValue, newValue -> String in
"\(aggregateValue) : \(newValue)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
start : Hello
start : Hello : Every
start : Hello : Every : body
이를 기반으로 Int의 1승부터 10승까지를 출력해주는 기능을 만들어봅시다~
func power(element: Int) {
Observable.repeatElement(element)
.scan(1) { aggregateValue, newValue -> Int in
aggregateValue * newValue
}
.take(10)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
power(2)
2
4
8
16
32
64
128
256
512
1024
앞의 값을 시드 삼아 만들기 때문에 제곱한 결과가 나오게 됩니다 XD
Buffer는 omit 되는 observable를 지정된 크기만큼 담고 있다가 그 버퍼 크기가 꽉 차면 그때 observable을 배열로 내보내는 연산자입니다.
func testBuffer() {
let disposeBag = DisposeBag()
Observable<Int>.range(start: 1, count: 10)
.buffer(timeSpan: .seconds(2), count: 3, scheduler: SerialDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]
GroupBy는 주어진 조건에 따라 Observable을 Grouping 해서 Observable<GroupedObservable<Key, Int>>
형태로 변경합니다.
저는 1부터 10까지 Observable을 발생시키고 짝수냐 홀수냐에 따라 2개의 그룹으로 나누었습니다.
func testGroupBy() {
let disposeBag = DisposeBag()
Observable<Int>.range(start: 1, count: 10)
.groupBy { $0 % 2 == 0 ? "even" : "odd" }
.subscribe(onNext: { print("\($0.key) : \($0.asObservable())")})
.disposed(by: disposeBag)
}
odd : RxSwift.(unknown context at $10d81fa88).GroupedObservableImpl<Swift.Int>
even : RxSwift.(unknown context at $10d81fa88).GroupedObservableImpl<Swift.Int>
key와 value를 출력하니, key로 함께 묶인 GroupedObservable 이 나오는 군요! 그리고 총 10개의 항목이 배출되었지만, 그룹화되어 2개의 항목으로 묶여서 나왔습니다.
이렇게 된 상태에서는 내부의 값을 알아보기가 어렵습니다. 그럼 어떻게 해야할까요?
key와 함께 해당하는 값을 함께 보고 싶다면~ 10번 발생했다면 10번 모두 대응하고 싶다면~~~~ 앞에서 썼던 .flatMap
을 활용합시다.
.flatMap
은 Observable<Observable<Int>>
로 되어있다면 얘를 Observable<Int>
로 평평~하게 한번 감싸진 것을 풀어내줍니다.
func testGroupBy() {
let disposeBag = DisposeBag()
Observable<Int>.range(start: 1, count: 10)
.groupBy { $0 % 2 == 0 ? "even" : "odd" }
.flatMap({ group -> Observable<String> in
return group.asObservable()
.map { "\(group.key) : \($0)" }
})
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
odd : 1
even : 2
odd : 3
even : 4
odd : 5
even : 6
odd : 7
even : 8
odd : 9
even : 10
우왕! ㅋ_ㅋ 원하는 결과가 나왔네요.
Observable에서 Observable window으로 항목을 주기적으로 세분하고 한 번에 하나씩 항목을 내보내는 대신 window를 내보냅니다. Window는 Buffer와 유사하지만 Observable 소스에서 항목의 패킷을 내보내는 대신 Observable Sequence로 변경하여 내보냅니다.
func testWindow() {
let disposeBag = DisposeBag()
Observable<Int>.range(start: 1, count: 10)
.window(timeSpan: .seconds(2), count: 3, scheduler: SerialDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
RxSwift.AddRef<Swift.Int> // 1, 2, 3
RxSwift.AddRef<Swift.Int> // 4, 5, 6
RxSwift.AddRef<Swift.Int> // 7, 8, 9
RxSwift.AddRef<Swift.Int> // 10, 11, 12
얘를 다시 풀어내고 싶다면? .flatMap
을 사용하면 되겠죠!
func testWindow() {
let disposeBag = DisposeBag()
Observable<Int>.range(start: 1, count: 10)
.window(timeSpan: .seconds(2), count: 3, scheduler: SerialDispatchQueueScheduler(qos: .background))
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
1
2
3
4
5
6
7
8
9
10
어떤 결과를 원하느냐에 따라 buffer
와 window
를 선택해서 사용해볼 수 있겠네요.
Subject는 관찰자와 Observable 모두에서 사용할 수 있는 ReactiveX의 일종의 브리지 또는 프록시입니다. 관찰자이기 때문에 하나 이상의 Observable을 구독 할 수 있으며 Observable이기 때문에 관찰하는 항목을 다시 내보낼 수 있고 새로운 항목을 내보낼 수도 있습니다.
Subject가 Observable을 구독하면 Observable이 항목을 내보내기 시작합니다.(Observable이 “차가운”경우 - 즉 항목을 보내기 전에 구독을 기다리는 경우). 이것은 결과 Subject가 원래의 “cold” Observable를 “hot” Observable로 만드는 효과를 가질 수 있습니다.
각각의 use case에 맞게 설계된 4가지 Subject가 있습니다. 이들 중 일부는 모든 구현에서 사용할 수 있는 것은 아니며 일부 구현은 다른 명명 규칙을 사용합니다.
AsyncSubject
는 complete 이벤트가 발생한 다음 마지막 한 개의 항목만을 배출합니다. 만약 항목이 들어오지 않았다면 항목을 배출하지 않고 종료됩니다.
만약 오류때문에 Observable이 종료된다면 어떠한 항목도 배출하지 않고 error 이벤트를 전달합니다.
func testForAsyncSubject() {
let disposeBag = DisposeBag()
let asynSubject = AsyncSubject<String>()
asynSubject.onNext("Is Anyone Else?")
asynSubject.subscribe(onNext: { str in
print("observer 1 : \(str)")
}, onError: { error in
print("observer 1 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
asynSubject.onNext("Hi")
asynSubject.onNext("Observers")
asynSubject.subscribe(onNext: { str in
print("observer 2 : \(str)")
}, onError: { error in
print("observer 2 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
asynSubject.onNext("nice to")
asynSubject.onNext("meet you guys.")
asynSubject.onError(TestError.test)
asynSubject.onCompleted()
}
observer 1 error : test
observer 2 error : test
BehaviorSubject
를 구독하면, 가장 최근 항목을 배출하며 계속해서 항목이 발생할 때마다 배출합니다.
마찬가지로 오류로 인해 Observable이 종료되면 오류를 전달합니다. 오류가 발생된 이후 이 Subject를 구독하면 기본값이 아니라 오류가 전달됩니다.
func testForBehaviorSubject() {
let disposeBag = DisposeBag()
let behaviorSubject = BehaviorSubject<String>(value: "Is Anyone Else?")
behaviorSubject.subscribe(onNext: { str in
print("observer 1 : \(str)")
}, onError: { error in
print("observer 1 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
behaviorSubject.onNext("Hi")
behaviorSubject.onNext("Observers")
behaviorSubject.subscribe(onNext: { str in
print("observer 2 : \(str)")
}, onError: { error in
print("observer 2 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
behaviorSubject.onNext("nice to")
behaviorSubject.onNext("meet you guys.")
behaviorSubject.onError(TestError.test)
behaviorSubject.onCompleted()
}
observer 1 : Is Anyone Else?
observer 1 : Hi
observer 1 : Observers
observer 2 : Observers
observer 1 : nice to
observer 2 : nice to
observer 1 : meet you guys.
observer 2 : meet you guys.
observer 1 error : test
observer 2 error : test
PublishSubject
는 subscribe를 시작한 시점을 기준부터 발생한 항목들을 전달합니다. PublicSubject
는 hot Observable로 생성 즉시 아이템을 방출하기 시작할 수 있으며, Subject가 생성되고 관찰자가 publish를 시작하는 사이에 하나 이상의 항목이 손실될 수도 있습니다. Observable에서 모든 항목을 전달하는 것이 보장되어야 할경우 “cold” Observable 동작을 수동으로 다시 시작할 수 있도록 Create
를 통해서 Observable을 구성하거나, ReplqySubject
를 사용하세요.
Observable이 에러로 종료되었으면, PublishSubject
또한 오류 이벤트를 전달합니다.
func testForPublishSubject() {
let disposeBag = DisposeBag()
let publishSubject = PublishSubject<String>()
publishSubject.onNext("Is Anyone Else?")
publishSubject.subscribe(onNext: { str in
print("observer 1 : \(str)")
}, onError: { error in
print("observer 1 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
publishSubject.onNext("Hi")
publishSubject.onNext("Observers")
publishSubject.subscribe(onNext: { str in
print("observer 2 : \(str)")
}, onError: { error in
print("observer 2 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
publishSubject.onNext("nice to")
publishSubject.onNext("meet you guys.")
publishSubject.onError(TestError.test)
publishSubject.onCompleted()
}
observer 1 : Hi
observer 1 : Observers
observer 1 : nice to
observer 2 : nice to
observer 1 : meet you guys.
observer 2 : meet you guys.
observer 1 error : test
observer 2 error : test
observer가 붙기 전에 발행된 “Is Anyone Else?”는 관찰자가 없기 때문에 아무도 처리하지 않았습니다. 이후에 observer 1과 observer 2 가 구독을 시작했고, 시작한 시점으로부터 발행된 이벤트들만 처리하게 됩니다.
ReplaySubject
는 관찰자가 구독하는 시기와 관계없이 Observable이 생성한 모든 항목을 관찰자에게 내보냅니다. 또한 ReplaySubject 버전은 재생 버퍼가 일정 크기 이상으로 커질 위험이 있거나 이전에 항목이 배출된 이후 지정된 시간이 경과하면 오래된 항목을 폐기합니다.
ReplaySubject
를 사용할경우, onNext(:)
를 여러 메서드에서 호출하지 않도록 주의하세요. 비순차적인 호출로 인해 Observable
이 손상되고 결과적으로 어떤 항목이나 notification이 먼저 재생되어야 하는지에 대해 모호함이 발생할 수 있습니다.
func testForReplaySubject() {
let disposeBag = DisposeBag()
let replaySubject = ReplaySubject<String>.create(bufferSize: 2)
replaySubject.onNext("Is Anyone Else?")
replaySubject.subscribe(onNext: { str in
print("observer 1 : \(str)")
}, onError: { error in
print("observer 1 error : \(error)")
}, onCompleted: {
print("observer 1 complete ")
}).disposed(by: disposeBag)
replaySubject.onNext("Hi")
replaySubject.onNext("Observers")
replaySubject.subscribe(onNext: { str in
print("observer 2 : \(str)")
}, onError: { error in
print("observer 2 error : \(error)")
}, onCompleted: {
print("observer 2 complete ")
}).disposed(by: disposeBag)
replaySubject.onNext("nice to")
replaySubject.onNext("meet you guys.")
replaySubject.onError(TestError.test)
replaySubject.subscribe(onNext: { str in
print("observer 3 : \(str)")
}, onError: { error in
print("observer 3 error : \(error)")
}, onCompleted: {
print("observer 3 complete ")
}).disposed(by: disposeBag)
replaySubject.dispose()
replaySubject.subscribe(onNext: { str in
print("observer 4 : \(str)")
}, onError: { error in
print("observer 4 error : \(error)")
}, onCompleted: {
print("observer 4 complete ")
}).disposed(by: disposeBag)
}
observer 1 : Is Anyone Else?
observer 1 : Hi
observer 1 : Observers
observer 2 : Hi
observer 2 : Observers
observer 1 : nice to
observer 2 : nice to
observer 1 : meet you guys.
observer 2 : meet you guys.
observer 1 error : test
observer 2 error : test
observer 3 : nice to
observer 3 : meet you guys.
observer 3 error : test
observer 4 error : Object `RxSwift.(unknown context at $10e4f9250).ReplayMany<Swift.String>` was already disposed.
버퍼가 있기 때문에 observer 1가 구독을 시작하기 전에 발생한 “Is Anyone Else” 또한 함께 전달됩니다. 마찬가지로 observer 2가 구독되었을 때 이전에 발생한 2개의 이벤트 또한 전달받습니다. observer 3는 에러가 발생한 다음 구독을 시작하는데요, 버퍼에 error는 포함되지 않은 채로 이전 이벤트 2개와 error 이벤트를 전달받습니다. observer 4는 replay subject가 dispose 된 다음 구독을 시작합니다. 이 때는 어떠한 이벤트도 전달받지 못하고 이미 disponse된 객체라고 알려줍니다.
onError
이벤트가 발생하면, 모든 Observable의 항목이 배출된 다음 마지막에 onError
이벤트를 배출합니다. RxSwift에는 없습니다!concat
은 여러개의 Observable을 이어 붙입니다. 여러 개의 Observable이 동시에 항목을 배출하더라도, 앞선 Observable의 항목이 모두 배출된 다음에 다음 Observable을 subscribe
하여 항목을 배출합니다. 그렇기 때문에 concat에 “Hot Observable”을 붙이게 되면 앞 선 Observable이 끝나는 시점에 따라 배출받는 항목이 달라질 수 있습니다.
앞서 사용했던 Player
구조체를 사용했습니다. 플레이어의 score
는 BehaviorSubject
이며 두 플레이어를 만들어서 score
를 merge
해보죠.
func testMerge() {
let disposeBag = DisposeBag()
let 👦🏻 = Player(score: 80)
let 👧🏼 = Player(score: 90)
Observable.of(👦🏻.score, 👧🏼.score)
.merge()
.subscribe(onNext: {
print("merge : \($0)")
}, onError: { error in
print("merge error")
})
.disposed(by: disposeBag)
👦🏻.score.onNext(85)
👦🏻.score.onNext(95)
👧🏼.score.onNext(100)
👦🏻.score.onError(TestError.test)
👧🏼.score.onNext(86)
👦🏻.score.onNext(88)
}
merge : 80 // 👦🏻 = Player(score: 80)
merge : 90 // 👧🏼 = Player(score: 90)
merge : 85 // 👦🏻.score.onNext(85)
merge : 95 // 👦🏻.score.onNext(95)
merge : 100 // 👧🏼.score.onNext(100)
merge error // 👦🏻.score.onError(TestError.test)
BehaviorSubject
이기 때문에 merge
되는 순간 기본값인 80 과 90 이 내보내졌고, 이후 어느 플레이어든 상관없이 배출 순서에 맞게 그대로 배출됩니다. onError
이벤트가 발생했을 경우 모든 Observable이 dispose 됩니다.
같은 코드를 사용해서, merge
를 concat
으로 변경해보겠습니다.
concat : 80 // 👦🏻 = Player(score: 80)
concat : 85 // 👦🏻.score.onNext(85)
concat : 95 // 👦🏻.score.onNext(95)
concat error // 👦🏻.score.onError(TestError.test)
concat
은 Observable이 끝나는 시점에 다음 Observable을 구독합니다. 하지만 첫 번째 Observable에서 onError
발생하고 dispose 되버려서, 두 번째 플레이어에 대한 스코어정보는 알수가 없군요. 에러 이벤트를 발생시키지 않는다면, 다음과 같은 결과가 나옵니다.
concat : 80 // 👦🏻 = Player(score: 80)
concat : 85 // 👦🏻.score.onNext(85)
concat : 95 // 👦🏻.score.onNext(95)
concat : 88 // 👦🏻.score.onNext(88)
concat : 90 // 👧🏼 = Player(score: 90)
concat : 100 // 👧🏼.score.onNext(100)
concat : 86 // 👧🏼.score.onNext(86)
한 번에 merge 할 수 있는 Observable의 개수를 제한합니다. 세 번째 player인 👦🏻👧🏼를 만들고, maxConcurrent를 2개로 제한한다면 어떻게 될까요?
func testMergeMaxConcurrent() {
let disposeBag = DisposeBag()
let 👦🏻 = Player(score: 80)
let 👧🏼 = Player(score: 90)
let 👦🏻👧🏼 = Player(score: 40)
Observable.of(👦🏻.score, 👧🏼.score, 👦🏻👧🏼.score)
.merge(maxConcurrent: 2)
.subscribe(onNext: {
print("merge : \($0)")
}, onError: { error in
print("merge error")
})
.disposed(by: disposeBag)
👦🏻.score.onNext(85)
👦🏻👧🏼.score.onNext(55)
👧🏼.score.onNext(100)
👦🏻.score.onCompleted()
👦🏻👧🏼.score.onNext(60)
👧🏼.score.onNext(86)
}
결과는 다음과 같습니다. 2개의 Observable만 merge하도록 허용했기 때문에, 👦🏻👧🏼.score
의 event는 👦🏻.score.onCompleted()
가 호출되어서 첫 번째 Observable이 종료된 다음에 배출됩니다.
merge : 80 // 👦🏻 = Player(score: 80)
merge : 90 // 👧🏼 = Player(score: 90)
merge : 85 // 👦🏻.score.onNext(85)
merge : 100 // 👧🏼.score.onNext(100)
merge : 40 // 👦🏻👧🏼 = Player(score: 40)
merge : 55 // 👦🏻👧🏼.score.onNext(55)
merge : 86 // 👧🏼.score.onNext(86)
여러가지 Observable의 항목들을 특정 함수로 묶어서 처리하고 그 결과를 내보냅니다.
Zip
메서드는 두 개의 혹은 그 이상의 다른 observable에 의해 차례대로 방출된 항목의 조합을 특정 함수를 거쳐 만들어진 Observable을 반환합니다. 이 함수는 Observable에서 방출된 순서를 정확하게 지켜서 처리됩니다.
각 Observables 마다 항목이 모두 방출되었을 때 그 항목들을 조합해서 새로운 항목을 만들어 내기 때문에, 일부 Observable들만이 항목을 방출했다고 해서 새 항목을 방출하지는 않습니다.
interval
을 사용해서 1초마다 하나씩 항목을 emit 하게 하는 Observable을 생성하고, PublishSubject
로 새로운 참가자를 발행하는 Subject를 함께 zip
해봅시다.
func testZip() {
let disposeBag = DisposeBag()
let scheduler = SerialDispatchQueueScheduler(qos: .background)
let interval = Observable<Int>.interval(.seconds(1), scheduler: scheduler)
let playerSubject = PublishSubject<String>()
interval.subscribe(onNext: { print("interval emit : \($0)")})
.disposed(by: disposeBag)
Thread.sleep(forTimeInterval: 1)
Observable.zip(interval, playerSubject) { index, player in
"\(index) 번째 참가자 \(player)"
}.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
playerSubject.onNext("김주희")
playerSubject.onNext("이하늘")
playerSubject.onNext("박민지")
Thread.sleep(forTimeInterval: 5)
}
interval emit : 0
interval emit : 1
0 번째 참가자 김주희
interval emit : 2
1 번째 참가자 이하늘
interval emit : 3
2 번째 참가자 박민지
interval emit : 4
이 함수의 쓰레드는 총 6초동안 살아있는데요, 6초가 되는 순간 종료되면서 disposeBag에 의해 dispose 되므로 interval
또한 종료됩니다.
그래서 interval
이 배출하는 항목은 0-4 까지 5개의 항목을 내보내게 됩니다.
zip
은 방출된 순서에 맞추어서 두 개의 Observable을 결합하는 것입니다.
그렇기 때문에 interval
이 1까지 나온 상태에서 playerSubject
와 zip
을 시작해도 interval
의 첫 번째 항목과 playerSubject
의 첫 번째 항목이 결합되어 나옵니다.
이후로 playerSubject
에서 새로운 항목이 배출되면 interval
의 다음 항목과 결합되어 나오고, interval
의 항목이 더 많이 배출되어도 playerSubject
와 매칭되는 항목이 없으면 새로운 항목을 배출하지 않습니다.
두 Observable 중 하나가 항목을 방출 할 때 지정된 함수를 통해 Observable이 방출했던 최신 항목을 결합하고 이 함수의 결과에 따라 변형된 항목을 내 보냅니다.
CombineLatest
연산자는 Zip
과 비슷한 방식으로 작동하지만 Zip
은 각 압축 된 소스 Observable이 이전에 압축 해제 된 항목을 내보낼 때만 항목을 내보내는 반면 CombineLatest는 소스 Observable 중 하나가 항목을 내보낼 때마다 항목을 내보냅니다. Observables의 출처가 적어도 하나의 항목을 방출했다). Source Observables가 항목을 방출 할 때, CombineLatest는 제공 한 함수를 사용하여 다른 소스 Observables 각각에서 가장 최근에 방출 된 항목을 결합하고 해당 함수의 반환 값을 방출합니다.
위의 zip을 조금 변형했습니다. 0.4초 주기의 interval
이 있고, 0.6초 마다 플레이어 정보를 내보내는 player
를 가지고 combineLatest
를 사용해봤습니다.
func testCombineLatest() {
let disposeBag = DisposeBag()
let scheduler = SerialDispatchQueueScheduler(qos: .default)
let interval = Observable<Int>.interval(.seconds(1), scheduler: scheduler)
let playerInterval = Observable<Int>.interval(.seconds(2), scheduler: scheduler)
let players = Observable<String>.from(["김주희","이하늘","박민지"])
let player = Observable.zip(playerInterval, players) { $1 }
Observable.combineLatest(interval, player) { index, player in
"interval > \(index) / player > \(player)"
}.subscribe(onNext: {
print(Date())
print($0) })
.disposed(by: disposeBag)
Thread.sleep(forTimeInterval: 6)
}
결과는 다음과 같네요.
interval > 1 / player > 김주희 // 0
interval > 2 / player > 김주희 // 1
interval > 3 / player > 김주희 // 2 동시에 들어와도 앞의 Observable 항목(interval 3)을 먼저 배출합니다.
interval > 3 / player > 이하늘 // 2
interval > 4 / player > 이하늘 // 3
interval > 5 / player > 이하늘 // 4
interval > 5 / player > 박민지 // 4
interval > 6 / player > 박민지 // 5
Observable에서 동시에 이벤트가 발생해도 Observable이 선언된 순서대로 이벤트를 발생시킵니다. 저는 6초동안 6개의 이벤트가 발생할거라고 생각했는데, 8개의 이벤트가 발생했네요! 왜 그럴까요?
2초에는 interval
에서도 이벤트가 발생하고 player
에서도 이벤트가 발생합니다. 하지만 combineLatest
에서 interval
이 player
의 앞에 선언되었기 때문에 interval
의 새로운 항목인 ‘3’과 기존 player
의 마지막 항목인 ‘김주희’를 결합시키고, 그 다음 interval
의 기존 항목인 ‘3’과 player
의 새로운 항목 이하늘
을 결합합니다.
withLatestFrom
은 combineLatest
와는 조금 다릅니다. combineLatest
는 다수의 Observable에서 이벤트가 발생될 때마다 새로운 이벤트를 만들어내지만, withLatestFrom
은 주요 Observable이 있습니다. 주 Observavble에 새로운 이벤트가 발생했을 때, 다른 Observable의 마지막 항목을 가져다 결합하는 형태입니다.
그래서 combineLatest
는 Observable의 static func
으로 다음과 같이 결합할 source Observable을 받지만,
static func combineLatest<O1, O2>(_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> String) -> Observable<String> where O1 : ObservableType, O2 : ObservableType
withLatestFrom
은 Observable의 instance func
으로 이 Observable에 결합할 second Observable을 받습니다.
func withLatestFrom<Source, ResultType>(_ second: Source, resultSelector: @escaping (Int, Source.Element) throws -> ResultType) -> Observable<ResultType> where Source : ObservableConvertibleType
예를 들어서, 한 식당에 웨이팅하는 사람들이 있다고 합시다. 1.8초마다 사람이 들어갈 수 있지만 그 때 사람이 없을 경우에는 지나가며, 사람들은 1초만 기다립니다.
func testWithLatestFrom() {
let disposeBag = DisposeBag()
let scheduler = SerialDispatchQueueScheduler(qos: .default)
let enterInterval = Observable<Int>.interval(.milliseconds(1800), scheduler: scheduler)
let waitingInterval = Observable<Int>
.interval(.seconds(1), scheduler: scheduler)
let waitingList = Observable<String>.from(["김주희","이하늘","박민지", "공주연", "이상민"])
let waitings = Observable.zip(waitingInterval, waitingList) { $1 }
enterInterval.withLatestFrom(waitings, resultSelector: { $1 })
.subscribe(onNext: {
print($0) })
.disposed(by: disposeBag)
Thread.sleep(forTimeInterval: 6)
}
이걸 좀 더 쉽게 도식화 해봅시다. (X 식당에 자리가 났을 때 발생되는 이벤트입니다.)
X X X
0———1———2———3———4———5——–6
김주희 이하늘 박민지 공주연 이상민
첫 번째로 X가 발생했을 때 기다리고 있던 사람은 김주희였고, 두 번째로 X가 발생했을 때 기다리고 있던 사람은 박민지였습니다. 이하늘은 1초만 기다리고 자리를 떠났기 때문에 다음 사람으로 넘어갔습니다.
그리고 마찬가지로 공주연 또한 지나가고, 마지막으로 이상민이 호출됩니다.
결과를 보시죠!
김주희
박민지
이상민
Observable이 새로운 항목을 내보내기 전에 미리 지정된 항목을 내보냅니다.
이전 operator 보다는 비교적 간단하네요. 새로운 항목을 내보낼 기 전 최초의 항목을 추가해주는 연산자입니다!
func testStartWith() {
let disposeBag = DisposeBag()
Observable.from([1, 2])
.startWith(0)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
0
1
2
Observable Sequence를 항목으로 받으면 이 Sequence의 이벤트들을 내보내고, 다음 Sequence가 항목으로 진입될 경우 이전 Sequence의 마지막 항목을 생략합니다.
func testSwitchLastest() {
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "⚽️")
let subject2 = BehaviorSubject(value: "🍎")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("🏈")
subject1.onNext("🏀")
subjectsSubject.onNext(subject2)
subject1.onNext("⚾️")
subject2.onNext("🍐")
}
⚽️
🏈
🏀
🍎
🍐
패턴과 Plan intermediaries를 통해 2개 이상의 Observable을 결합합니다.
And/Then/When 연산자는 .zip
연산자와 매우 유사하지만 중간 데이터 구조를 사용합니다. 두 개 이상의 Observable을 받아들이고 한 번에 한 set 씩 방출된 각가의 배출물을 Pattern
개체로 결합합니다. 그 다음 Pattern
객체를 조작하여 Plan
단계에서 변형합니다. 이후 이 Plan
객체를 Observable
로 변환합니다.
근데 RxSwift 에서 andThen 은 Completable
에만 사용될 수 있나봐요. 하지만 그 예가 많지 않아서, 좀 더 찾아봐야 할 것 같습니다.
참고하기
Swift는 응용 프로그램의 정확성과 안정성을 향상시키고 Rx를보다 직관적이고 직접적인 경험으로 사용하는 데 사용할 수있는 강력한 유형 시스템을 갖추고 있습니다.
특성은 모든 경계에서 사용할 수있는 원시 Observable과 비교할 때 인터페이스 경계에서 관찰 가능한 시퀀스 속성을 전달하고 보장하며 문맥상의 의미, 구문 설탕을 제공하고보다 구체적인 사용 사례를 타겟팅하는 데 도움이됩니다. 이러한 이유로 특성은 전적으로 선택 사항입니다. 모든 핵심 RxSwift / RxCocoa API가 원시 Observable 시퀀스를 프로그램에서 사용할 수 있습니다.
참고 :이 문서에서 설명하는 일부 특성 (예 : 드라이버)은 RxCocoa 프로젝트에만 국한되며, 일부는 일반 RxSwift 프로젝트의 일부입니다. 그러나 필요한 경우 다른 Rx 구현에서도 동일한 원칙을 쉽게 구현할 수 있습니다.
Traits는 단일 읽기 전용 Observable sequence 속성을 가진 래퍼 구조체입니다.
struct Single<Element> {
let source: Observable<Element>
}
struct Driver<Element> {
let source: Observable<Element>
}
...
Observable sequence에 대한 빌더 패턴 구현의 일종이라고 생각할 수도 있습니다. Trait이 만들어지면, .asObservable()
이 이를 vanila??? observable sequence로 변환합니다.
Single은 여러개의 항목 시리지를 배출하는 대신 항상 단일 항목 혹은 오류를 배출한다고 보장되는 Observable입니다.
Single은 Observable을 생성하는 것과 매우 유사합니다. Single<T>.create()
func getRepo(_ repo: String) -> Single<[String: Any]> {
return Single<[String: Any]>.create { single in
let task = URLSession.shared.dataTask(with: URL(string: "https://api.github.com/repos/\(repo)")!) { data, _, error in
if let error = error {
single(.error(error))
return
}
guard let data = data,
let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
let result = json as? [String: Any] else {
single(.error(DataError.cantParseJSON))
return
}
single(.success(result))
}
task.resume()
return Disposables.create { task.cancel() }
}
}
이렇게 만들어진 건 다음과 같이 구독하여 쓸 수 있습니다.
getRepo("ReactiveX/RxSwift")
.subscribe { event in
switch event {
case .success(let json):
print("JSON: ", json)
case .error(let error):
print("Error: ", error)
}
}
.disposed(by: disposeBag)
혹은 subscrie(onSuccess:onErrore:)
를 사용할 수 있습니다.
getRepo("ReactiveX/RxSwift")
.subscribe(onSuccess: { json in
print("JSON: ", json)
},
onError: { error in
print("Error: ", error)
})
.disposed(by: disposeBag)
구독을 하게 되면 Single 항목을 포함한 .sucess 혹은 .error 인 열거형 타입 SingleEvent
를 제공해줍니다. 첫 번째 이벤트가 배출되면 더 이상 이벤트가 발생되지 않습니다.
또한 기본 Observable sequence에서 .asSingle()
을 통해 Single로 변환시킬 수 있습니다.
Completable은 complete 나 error 만을 배출할 수 있는 다양한 Observable 들을 말합니다. 어떠한 항목도 배출되지 않는 다는 것을 보장합니다.
Observable<Void>
와 비교해볼 수 있습니다.
Completable을 Observable 처럼 생성할 수 있습니다.
func cacheLocally() -> Completable {
return Completable.create { completable in
// Store some data locally
...
guard success else {
completable(.error(CacheError.failedCaching))
return Disposables.create {}
}
completable(.completed)
return Disposables.create {}
}
}
그리고 다음과 같은 방식으로 사용할 수 있습니다.
cacheLocally()
.subscribe { completable in
switch completable {
case .completed:
print("Completed with no error")
case .error(let error):
print("Completed with an error: \(error.localizedDescription)")
}
}
.disposed(by: disposeBag)
혹은 subscribe(onCompleted:onError:)
을 사용할 수 있습니다.
cacheLocally()
.subscribe(onCompleted: {
print("Completed with no error")
},
onError: { error in
print("Completed with an error: \(error.localizedDescription)")
})
.disposed(by: disposeBag)
구독하게 되면 열거형 타입인 CompletableEvent
을 제공합니다. 이 열거형은 .compeleted 혹은 .error 입니다. 첫 번째 이벤트를 배출하고 나서 더 이상 이벤트가 발생하지 않습니다.
Maybe는 Single과 Completable 사이에 있는 Observable의 변형입니다. 단일 항목를 내보낼 수도 있고, 항목을 내보내지 않고 완료되거나 오류가 발생할 수도 있습니다. 세 가지 이벤트가 하나라도 발생한다면 그 이후에 어떠한 이벤트도 발생되지 않습니다.
func generateString() -> Maybe<String> {
return Maybe<String>.create { maybe in
maybe(.success("RxSwift"))
// OR
maybe(.completed)
// OR
maybe(.error(error))
return Disposables.create {}
}
}
사용하기
generateString()
.subscribe { maybe in
switch maybe {
case .success(let element):
print("Completed with element \(element)")
case .completed:
print("Completed with no element")
case .error(let error):
print("Completed with an error \(error.localizedDescription)")
}
}
.disposed(by: disposeBag)
subscribe(onSuccess:onError:onCompleted:)
사용하기
generateString()
.subscribe(onSuccess: { element in
print("Completed with element \(element)")
},
onError: { error in
print("Completed with an error \(error.localizedDescription)")
},
onCompleted: {
print("Completed with no element")
})
.disposed(by: disposeBag)
일반 Observable sequence에서 .asMaybe()
를 사용하여 Maybe
로 전환할 수 있습니다.
이래저래 파고들다 보니 SingleTrait
이란 걸 발견했습니다.
Single
의 정의를 타고 들어가 보면 Single.swift
파일에서 다음과 같은 걸 볼 수 있습니다.
/// Sequence containing exactly 1 element
public enum SingleTrait { }
/// Represents a push style sequence containing 1 element.
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
public enum SingleEvent<Element> {
/// One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)
case success(Element)
/// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
case error(Swift.Error)
}
extension PrimitiveSequenceType where Trait == SingleTrait {
public typealias SingleObserver = (SingleEvent<Element>) -> Void
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
let source = Observable<Element>.create { observer in
...
}
}
Single
은 PrimitiveSequence<SingleTrait, Element>
의 typealias
였군요! SingleTrait은 enum 입니다.
PrimitiveSequence
: Observable sequences containing 0 or 1 element.
0개 혹은 1개의 항목을 포함하는 Observable Sequence를 의미합니다. 앞서 나왔던 Single
, Completable
, Maybe
를 포괄하는 의미가 되겠네요. 마찬가지로 다른 Trait들의 정의를 살펴보다도 유사한 구조를 가지는 것을 알 수 있습니다.
/// Sequence containing 0 elements
public enum CompletableTrait { }
/// Represents a push style sequence containing 0 elements.
public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>
/// Sequence containing 0 or 1 elements
public enum MaybeTrait { }
/// Represents a push style sequence containing 0 or 1 element.
public typealias Maybe<Element> = PrimitiveSequence<MaybeTrait, Element>
왜 이런 구조를 가져가야했을까요? 잘 모르겠으니까 기억해둡시다!!! 추후에 더 찾아보는 걸로 Completable+AndThen
RxCocoa traits 여기 안봤음
구독 후 Observable Sequence 항목 생성을 취소하고 resource를 반환하려면, dispose
를 호출하세요.
Sequence가 유한 시간 내에 종료되면, dispose를 호출하지 않거나 disposeBag으로 disposed를 사용하지 않아도 resource leak이 발생하지 않습니다. 하지만 이 resource들은 요소의 생성이 완료되거나 오류를 반환하여 시퀀스가 완료될 때까지 사용됩니다.
만약 Sequence가 스스로 종료되지 않는다면 disposeBag
, takeUntil
연산자를 사용하거나 다른 방식으로 dispose
가 수동으로 호출되지 않는 한 리소스가 영구적으로 할당됩니다.
dispose bag이나 takeUntil
연산자를 사용하면 자원을 정리할 수 있습니다. Sequence가 유한시간 안에 종료되더라도 사용하는 게 좋습니다.
enum Event<Element> {
case next(Element) // next element of a sequence
case error(Error) // sequence failed with error
case completed // sequence terminated successfully
}
만약 error가 generic type이라면 어떨까요?
Observable<String, E1>
and Observable<String, E2>
가 있다면, 각각의 Observable이 만들어내는 에러 타입이 무엇인지 알아내야 합니다. 이는 composition 속성을 해치며, Rx는 왜 Sequence가 실패했는지에는 관심이 없으며(ㅋㅋ), 실패한 결과를 observable chain 하위로 전달하게 됩니다.
어음 느낌적인 느낌으로 굳이 Generic 타입의 Error를 사용할 필요가 없다-는 건 알겠는데, (이미 Swift에서 정의한 Error protocol을 사용하는 것으로 충분하기 때문에) 글의 의미를 이해를 못하겠음.
observable sequence를 종료시키는 방법입니다. dispose
를 구독에 호출하면 기존에 할당된 모든 항목과 앞으로의 항목들에 대한 resource가 모두 해제되며 sequence가 종료됩니다.
interval
연산자와 관련된 예가 있습니다. 300 milliseconds마다 항목이 발생되며, 2초간 이 쓰레드를 유지해봅시다.
let scheduler = SerialDispatchQueueScheduler(qos: .default)
let subscription = Observable<Int>.interval(.milliseconds(300), scheduler: scheduler)
.subscribe { event in
print(event)
}
Thread.sleep(forTimeInterval: 2.0)
subscription.dispose()
SerialDispatchQueueScheduler를 사용하여 현제 쓰레드와 별도의 쓰레드를 사용하는 Observable을 만들었습니다. 그렇기 때문에 현재 쓰레드가 종료되어 버리면 Observable Sequece의 쓰레드가 아무리 항목을 만들어도 받을 수 없습니다.(구독중인 쓰레드가 종료되었으니까) 그래서
Thread.sleep(forTimeInterval: 2.0)
를 통해 현재 쓰레드를 2초간 유지해줍니다. 그럼 300 milliseconds마다 항목이 배출되니까, 다음과 같은 결과가 나오겠죠!0 1 2 3 4 5
dispose
를 호출하고 싶지 않다면, 다른 방법이 있습니다!DisposeBag
을 사용하거나takeUntil
연산자를 사용하세요!
그래서 위의 코드에서는 dispose
가 호출된 다음 뭔가가 print 될까요? 정답은 : 때때로 다릅니다. XD
scheduler
가 serial scheduler라면 (MainScheduler
라던가) dispose
가 같은 serial scheduler에서 호출됩니다. 그래서 더 이상 print 되지 않죠. let serial = SerialDispatchQueueScheduler(qos: .default)
let subscription = Observable<Int>.interval(.milliseconds(100), scheduler: serial)
.subscribe { event in
print("serial: \(event)")
}
let concurrent: SchedulerType = ConcurrentDispatchQueueScheduler(qos: .default)
let subscription2 = Observable<Int>.interval(.milliseconds(100), scheduler: concurrent)
.subscribe { event in
print("concurrent: \(event)")
}
Thread.sleep(forTimeInterval: 3.0)
print("ended")
subscription.dispose()
subscription2.dispose()
serial: next(0)
concurrent: next(0)
serial: next(1)
concurrent: next(1)
...
serial: next(28)
concurrent: next(28)
serial: next(29)
ended
concurrent: next(29)
Dispose bag은 Rx에서 ARC와 같은 동작을 합니다. DisposeBag
이 dealloc되는 시점에, DisposeBag에 포함된 모든 disposable
들에 대해 dispose
가 호출됩니다.
그렇기 때문에 Dispose bag에는 dispose
메서드가 없습니다. 만약 Dispose bag의 모든 항목을 정리해야한다면, 새로운 DisposeBag을 만들면 됩니다.
self.disposeBag = DisposeBag()
그러면 이전 DisposeBag이 dealloc되면서 담겨있던 항목들이 dispose()
가 호출됩니다.
만약 명시적으로 dispose를 처리하고 싶다면 CompositeDisposable
을 사용하세요.
다른 방법으로는 dealloc될 때 takeUntil
을 사용해서 자동으로 dispose 되도록 하는 방법이 있습니다.
sequence
.takeUntil(self.rx.deallocated)
.subscribe {
print($0)
}