Tunko Development Diary

RxSwift) sharing Subscription Operator (multicast, publish, replay) 본문

Development/RxSwift

RxSwift) sharing Subscription Operator (multicast, publish, replay)

Tunko 2022. 6. 8. 04:02

multicast

multicast 연산자는 하나의 옵저버블을 공유할떄 사용하는 연산자입니다. 하지만 서브젝트를 따로 생성하고 이벤트가 방출되길 원하는 시점에 .connect() 를 호출해 주어야 합니다.

함수 원형

public func multicast<Subject: SubjectType>(_ subject: Subject)
    -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
    ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
}

멀티캐스트 연산자는 서브젝트를 전달받습니다.

원본 옵저버블에서 발생하는 이벤트는 구독자로 전달되는게 아니라 전달한 서브젝트로 이벤트가 전달됩니다.

그리고 서브젝트는 이벤트를 등록된 다수의 구독자에게 전달합니다.

multicast연산자는 특별한 옵저버블 형태인 ConnectableObservable를 리턴합니다.

일반적인 옵저버블은 구독하게 되면 새로운 시퀀스가 생성되면서 이벤트를 방출합니다.

하지만 ConnectableObservable 옵저버블은 구독자가 추가되어도 시퀀스가 시작되지 않습니다 즉, 이벤트 방출이 이루어지지 않습니다.

ConnectableObservable은 connect 되는 시점에 시퀀스가 시작됩니다.

따라서 전달한 서브젝트를 모두가 구독한다음에 동시에 이벤트를 보낼수 있습니다.

먼저 multicast 사용전의 코드와 출력결과를 보겠습니다.

let disposeBag = DisposeBag()
let subject = PublishSubject<Int>()

let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(3)

oneSecondObservable
    .subscribe { print("🍎 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable
    .delaySubscription(.seconds(1), scheduler: MainScheduler.instance)
    .subscribe { print("🥝 : ", $0) }
    .disposed(by: disposeBag)

출력

🍎 :  next(0)
🍎 :  next(1)
🥝 :  next(0)
🍎 :  next(2)
🍎 :  completed
🥝 :  next(1)
🥝 :  next(2)
🥝 :  completed

출력 결과를 보면 delaySubscription 연산자의 영향으로 두번째 구독은 1초뒤에 하게 됩니다.

따라서 🍎 보다 🥝  가 1 초 지연되서 이벤트가 방출된것을 확인할 수 있습니다.

다음으로 multicast 연산자를 사용해보겠습니다 .

let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(subject)

oneSecondObservable
    .subscribe { print("🍎 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🥝 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable.connect()

출력

🍎 :  next(0)
🍎 :  next(1)
🍎 :  next(2)
🥝 :  next(2)
🍎 :  next(3)
🥝 :  next(3)
🍎 :  next(4)
🥝 :  next(4)
🍎 :  completed
🥝 :  completed

출력결과를 확인해보면 구독한 직후가 아닌 connect() 와 동시에 방출된 것을 확인할 수 있습니다.

출력결과를 보면 🥝 구독자는 next이벤트를 2부터 받는것을 확인할 수 있습니다. .multicast를 통해서 이벤트가 시작되는 시퀀스가 동일해 졌기 때문입니다.

 

publish

publish 연선자는 multicast를 적용하고 전달한 PublishSubject를 그대로 리턴합니다. multicast에서 반환 하는 서브젝트가 PublishSubject라고 한다면 따로 Subject를 선언해서 multicast를 사용하는것보다 간편한 Publish 연산자를 사용하는게 합리적입니다.

결과는 위 multicast와 동일합니다.

let disposeBag = DisposeBag()
let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .publish()

oneSecondObservable
    .subscribe { print("🍎 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🥝 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable.connect()

 

replay

multicast에 전달하는 파라미터 타입이 RepaySubject 라면 replay를 통해 단순화 시킬수 있습니다.

함수원형

public func replay(_ bufferSize: Int)
    -> ConnectableObservable<Element> {
    self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
}

예제

let disposeBag = DisposeBag() 
let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .replay(5)

oneSecondObservable
    .subscribe { print("🍎 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🥝 : ", $0) }
    .disposed(by: disposeBag)

oneSecondObservable.connect()
반응형
Comments