Tunko Development Diary

RxSwift) sharing Subscription Operator (share) 본문

Development/RxSwift

RxSwift) sharing Subscription Operator (share)

Tunko 2022. 6. 8. 16:37

share

이 연산자를 설명하기에 앞서 multicast, publish, replay, refCount 기능이 통합된 연산자라고 생각하시면 편하실것 같습니다. 

https://huniroom.tistory.com/entry/RxSwift-sharing-Subscription-Operator-multicast?category=1019140

 

RxSwift) sharing Subscription Operator (multicast, publish, replay)

multicast multicast 연산자는 하나의 옵저버블을 공유할떄 사용하는 연산자입니다. 하지만 서브젝트를 따로 생성하고 이벤트가 방출되길 원하는 시점에 .connect() 를 호출해 주어야 합니다. 함수 원형

huniroom.tistory.com

https://huniroom.tistory.com/entry/RxSwift-sharing-Subscription-Operator-refCount?category=1019140

 

RxSwift) sharing Subscription Operator (refCount)

RefCount extension ConnectableObservableType { public func refCount() -> Observable { RefCount(source: self) } } RefCount 는 ConnectableObservableType 형에만 따로 구현되어있습니다. 구현된것을 보면..

huniroom.tistory.com

이 글을 참고해주세요.

 

share연산자는 구독을 공유합니다. 하지만 전달하는 인자에 따라서 내부적 처리나 반환형태가 달라집니다.

함수 원형

public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch scope {
    case .forever:
        switch replay {
        case 0: return self.multicast(PublishSubject()).refCount()
        default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
        }
    case .whileConnected:
        switch replay {
        case 0: return ShareWhileConnected(source: self.asObservable())
        case 1: return ShareReplay1WhileConnected(source: self.asObservable())
        default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
        }
    }
}

share 연산자는 replay 파라미터에 전달하는 값에 따라 반환 형태가 달라집니다.

0일때는 PublishSubject를 리턴하고 0이 아닐때는 ReplaySubject를 리턴합니다.

 

ReplaySubject는 버퍼의 크기를 지정합니다. 두번쨰 인자로 scope 파라미터를 전달하는데 두가지 케이스가 있습니다.

scope 파라미터는 whileConnected, forever 가 들어갑니다.

 

case whileConnected :

    기본값 입니다.

    새로운 구독자 (즉, 새로운 시퀀스가 시작되면) 새로운 커넥션을시작합니다.

case forever :  

    모든 커넥션이 하나의 구독자를 공유합니다.

 

우선 share연산자를 사용하지 않은 예제 코드입니다.

let disposeBag = DisposeBag()
let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)

let observer1 = oneSecondObservable
    .subscribe { print("🍎 : ", $0) }

let observer2 = oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe{ print("🥝 : ", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = oneSecondObservable
        .subscribe { print("🍊 : ", $0) }
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}

출력

🍎 :  next(0)
🍎 :  next(1)
🍎 :  next(2)
🍎 :  next(3)
🥝 :  next(0)
🍎 :  next(4)
🍎 :  completed
🥝 :  next(1)
🍊 :  next(0)
🍊 :  next(1)
🍊 :  next(2)

출력 결과를 보면 🥝  이벤트가 0과 1까지만 출력된것을 유의해서 봐줍시다.

다음으로 share 연산자를 적용해보겠습니다.

let disposeBag = DisposeBag()
let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .share()

let observer1 = oneSecondObservable
    .subscribe { print("🍎 : ", $0) }

let observer2 = oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe{ print("🥝 : ", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = oneSecondObservable
        .subscribe { print("🍊 : ", $0) }
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}

출력 결과

🍎 :  next(0)
🍎 :  next(1)
🍎 :  next(2)
🍎 :  next(3)
🥝 :  next(3)
🍎 :  next(4)
🥝 :  next(4)
🍎 :  completed
🥝 :  completed
🍊 :  next(0)
🍊 :  next(1)
🍊 :  next(2)

share를 해서 나온 출력결과 키위 이벤트는 하나의 시퀀스로 멀티캐스트 되기때문에 delaySubscription(3) 의 영향으로 앞서 방출된 이벤트 0,1,2 생략되었습니다.

 

때문에 next(3)부터 이벤트가 방출되었습니다.

 

그리고 두 구독자가 5초후에 dispose됩니다.

 

이후 7초에 오렌지 구독자가재구독을 하게 됩니다.

 

이때는 시퀀스가 종료되고 다시 connect되는것이기에 이벤트가 0부터 방출됩니다.

 

다음으로 .share(replay: 5)를 추가해주었습니다.

let disposeBag = DisposeBag()
let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .share(replay: 5)

let observer1 = oneSecondObservable
    .subscribe { print("🍎 : ", $0) }

let observer2 = oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe{ print("🥝 : ", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = oneSecondObservable
        .subscribe { print("🍊 : ", $0) }
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}

출력결과

🍎 :  next(0)
🍎 :  next(1)
🍎 :  next(2)
🥝 :  next(0)
🥝 :  next(1)
🥝 :  next(2)
🍎 :  next(3)
🥝 :  next(3)
🍎 :  next(4)
🥝 :  next(4)
🍎 :  completed
🥝 :  completed
🍊 :  next(0)
🍊 :  next(1)
🍊 :  next(2)

새로운 구독자는 구독이 시작될때 버퍼에 저장된 next 이벤트를 함께 전달받게 됩니다.

따라서 앞서 .share() 때랑은 다르게 버퍼에 저장되어있던 🥝 0,1,2 이벤트가 방출되었습니다.

 

지금까지는 share의 scope 인자가 .whileConnected 이였습니다.

이번엔 .forever 로 설정하고 결과를 보겠습니다.

let disposeBag = DisposeBag()
let oneSecondObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .share(replay: 5, scope: .forever)

let observer1 = oneSecondObservable
    .subscribe { print("🍎 : ", $0) }

let observer2 = oneSecondObservable
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe{ print("🥝 : ", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = oneSecondObservable
        .subscribe { print("🍊 : ", $0) }
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}

출력

🍎 :  next(0)
🍎 :  next(1)
🍎 :  next(2)
🥝 :  next(0)
🥝 :  next(1)
🥝 :  next(2)
🍎 :  next(3)
🥝 :  next(3)
🍎 :  next(4)
🥝 :  next(4)
🍎 :  completed
🥝 :  completed
🍊 :  next(0)
🍊 :  next(1)
🍊 :  next(2)
🍊 :  next(3)
🍊 :  next(4)
🍊 :  completed

위쪽 출력을 보면 늘 3개씩만 이벤트를 방출하던 🍊  이벤트가 5번의 이벤트를 방출됩니다.

.forever 를 하므로서 시퀀스가 계속 유지되기에 버퍼안에 있던 이벤트가 계속 남아있고 그것이 구독과 동시에 한번에 방출되었습니다.

때문에 실제로 로그를 확인해보면 🍊  : next(0~4) 까지 한번에 로그에 나오는걸 확인할 수 있습니다.

반응형
Comments