주희하세요!

Share

2019-07-14
Juhee Kim

여러 관찰자가 하나의 구독에서만 이벤트를 공유하기 원하면 어떻게 해야 할까요?

정의해야 할 2가지가 있습니다.

  • 새로운 구독자가 관찰을 시작하기 전에 수신된 과거의 이벤트를 처리할 것인가?
    • 최신것만 반복, 전체를 반복, 마지막 n개만 반복
  • 공유 구독을 시작할 시기는 언제인가?
    • refCount, 수동, 혹은 기타 알고리즘

보통은 replay(1).refCount() 을 조합한 share(replay:1)을 사용합니다.

let counter = myInterval(.milliseconds(100))
    .share(replay: 1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
        print("First \(n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
        print("Second \(n)")
    })

Thread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

Thread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

결과는 다음과 같습니다.

Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

SubscribedDisposed 이벤트가 한번씩만 발생하는 것을 볼 수 있습니다.

URL observable을 위해서도 똑같이 동작합니다.

다음은 Rx로 HTTP Request를 보내는 방법입니다. interval 연산자의 패턴과 비슷해보이네요.

extension Reactive where Base: URLSession {
    public func response(request: URLRequest) -> Observable<(response: HTTPURLResponse, data: Data)> {
        return Observable.create { observer in
            let task = self.base.dataTask(with: request) { (data, response, error) in

                guard let response = response, let data = data else {
                    observer.on(.error(error ?? RxCocoaURLError.unknown))
                    return
                }

                guard let httpResponse = response as? HTTPURLResponse else {
                    observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response)))
                    return
                }

                observer.on(.next((httpResponse, data)))
                observer.on(.completed)
            }

            task.resume()

            return Disposables.create {
                task.cancel()
            }
        }
    }
}

Similar Posts

Comments