여러 관찰자가 하나의 구독에서만 이벤트를 공유하기 원하면 어떻게 해야 할까요?
정의해야 할 2가지가 있습니다.
- 새로운 구독자가 관찰을 시작하기 전에 수신된 과거의 이벤트를 처리할 것인가?
- 최신것만 반복, 전체를 반복, 마지막 n개만 반복
- 공유 구독을 시작할 시기는 언제인가?
- refCount, 수동, 혹은 기타 알고리즘
보통은 replay(1).refCount()
을 조합한 share(replay:1)
을 사용합니다.
let counter = myInterval(.milliseconds(100))
.share(replay: 1)
print("Started ----")
let subscription1 = counter
.subscribe(onNext: { n in
print("First \(n)")
})
let subscription2 = counter
.subscribe(onNext: { n in
print("Second \(n)")
})
Thread.sleep(forTimeInterval: 0.5)
subscription1.dispose()
Thread.sleep(forTimeInterval: 0.5)
subscription2.dispose()
print("Ended ----")
결과는 다음과 같습니다.
Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----
Subscribed
와 Disposed
이벤트가 한번씩만 발생하는 것을 볼 수 있습니다.
URL observable을 위해서도 똑같이 동작합니다.
다음은 Rx로 HTTP Request를 보내는 방법입니다. interval
연산자의 패턴과 비슷해보이네요.
extension Reactive where Base: URLSession {
public func response(request: URLRequest) -> Observable<(response: HTTPURLResponse, data: Data)> {
return Observable.create { observer in
let task = self.base.dataTask(with: request) { (data, response, error) in
guard let response = response, let data = data else {
observer.on(.error(error ?? RxCocoaURLError.unknown))
return
}
guard let httpResponse = response as? HTTPURLResponse else {
observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response)))
return
}
observer.on(.next((httpResponse, data)))
observer.on(.completed)
}
task.resume()
return Disposables.create {
task.cancel()
}
}
}
}