RxSwift - Mathematical & Aggregate & Connectable Observable Operators
concat
concat 은 RxSwift 에 CombineOperator 안에 속에 있는 Operator 입니다.
concat 은 두개의 Observable 에서 방출되는 Event 를 결합하여
하나의 Observable 로 만들어주는 기능을 합니다.
let firstObservable = Observable<Int>.of(1,2,3)
let secondObservable = Observable<Int>.of(4,5,6)
firstObservable
.concat(secondObservable)
.subscribe(onNext: {
print($0)
}).disposed(by: disposBag)
//print
1
2
3
4
5
6
두개의 Observable 의 값을 결합하여 방출합니다.
이 밖에 다양한 Operator를 사용하여 Event 의 값을 변형 할수도 있습니다.
let firstObservable = Observable<Int>.of(1,2,3)
let secondObservable = Observable<Int>.of(4,5,6)
firstObservable
.concat(secondObservable)
.filter { $0 % 2 == 0 }
.toArray()
.subscribe { singleEvent in
print(singleEvent)
}.disposed(by: disposBag)
//print
success([2, 4, 6])
하지만 위 예시는 가장 기본적인 동작을 테스트 하기 위한 예시입니다.
여기서 놓칠수 있는 부분이 있어 조금더 다양한 예시를 통해 살펴보겠습니다.
let first = Observable<Int>.create { observer in
observer.onNext(1)
return Disposables.create()
}
let second = Observable<Int>.of(2,3,4)
let concat = Observable.concat([first,second])
concat
.debug()
.subscribe(onNext : {
print($0)
}).disposed(by: disposBag)
creat 와 of 생성생 Int 타입의 Observable 2개가 있습니다.
이 두개의 Observable 을 concat을 통해 새로운 Observable 로 결합합니다.
그럼 출력되는 값은 어떻게 될까요?
저는 1 2 3 4 를 생각했습니다.
1
이유는 위 이미지 와 debug 를 통해 알수 있습니다.
2022-03-17 11:28:11.132: SetupVC.swift:54 (testConcat()) -> subscribed
2022-03-17 11:28:11.135: SetupVC.swift:54 (testConcat()) -> Event next(1)
debug 출력문 에서는 Dispose 에 대한 출력이 없습니다.
그렇다는건 구독행위 자체가 dispose 되지 않았고 그 이유는 방출에서 알수 있습니다.
concat 연산자는 첫번째 Observable 의 생명주기가 종료된후 두번째 Observable 를 방출하여 결합합니다.
그로 인해 첫번째 Observable 의 주기가 종료되지 않아 결합을 할수 없는 것입니다.
이 문제는 onCompleted Event 방출로 해결할수 있습니다.
let first = Observable<Int>.create { observer in
observer.onNext(1)
observer.onCompleted()
return Disposables.create()
}
onCompleted 방출
2022-03-17 11:30:01.756: SetupVC.swift:54 (testConcat()) -> subscribed
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(1)
1
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(2)
2
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(3)
3
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event next(4)
4
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> Event completed
2022-03-17 11:30:01.759: SetupVC.swift:54 (testConcat()) -> isDisposed
정상작동
let subject = PublishSubject<String>()
subject.onNext("FirstSubject 이벤트 요소 === 하나")
let secondSubject = BehaviorSubject<String>(value: "SecondSubject 이벤트 요소 === 하나")
secondSubject.onNext("SecondeSubject === 둘")
let concatObservable = Observable.concat([subject,secondSubject])
concatObservable
.debug()
.subscribe(onNext: {
print($0)
}).disposed(by: disposBag)
subject.onNext("FirstSubject 이벤트 요소 === 셋")
secondSubject.onNext("SecondSubject 이벤트 요소 === 셋")
subject 또한 비슷한 맥락을 가지고 있습니다.
다른 특성에 Publish / Behavior 두 개의 subject 를 concat 하였을때
2022-03-17 11:35:23.577: SetupVC.swift:75 (testConcat()) -> subscribed
2022-03-17 11:35:23.579: SetupVC.swift:75 (testConcat()) -> Event next(FirstSubject 이벤트 요소 === 셋)
FirstSubject 이벤트 요소 === 셋
이와 같은 출력문을 전달해 줍니다.
여기서 PublishSubject 의 특징은 맞게 나타나지만
BehaviorSubject 의 특징은 잘 나타나지 않습니다. 구독 이전의 Event요소를 가져오지 못합니다.
조금더 알기 쉬운 예시 코드를 작성하자면
let pSub = PublishSubject<String>()
pSub.onNext("p")
let bSub = BehaviorSubject<String>(value: "bA")
pSub
.concat(bSub)
.toArray()
.debug()
.subscribe()
.disposed(by: disposBag)
pSub.onNext("pA")
bSub.onNext("bB")
pSub.onNext("pB")
bSub.onNext("bC")
pSub.onNext("pC")
bSub.onNext("bD")
pSub.onNext("pD")
pSub.onCompleted()
bSub.onNext("bF")
bSub.onNext("bG")
bSub.onNext("bH")
bSub.onCompleted()
위와 같은 코드에서 concat이 작동되면
publishSubject 는
- 구독 이전 Event 방출을 무시한다
-> p 를 무시함
behaviorSubject 는
- 구독 이전 최신값 + 구독 이후 값을 관찰한다.
-> bA -> bH 까지
2022-03-17 12:17:46.180: SetupVC.swift:119 (testConcat()) -> subscribed
2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> Event next(["pA", "pB", "pC", "pD", "bD", "bF", "bG", "bH"])
2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> Event completed
2022-03-17 12:17:46.182: SetupVC.swift:119 (testConcat()) -> isDisposed
하지만 output 에서 보여지는 behaviorSubject는 publishSubject 의 방출이 끝나는 지점(onCompletd)의 전 의 방출된 자신의 Event 만 관찰할수 있습니다.
즉 구독의 기준이 아닌 모체로 실행되는 publishSubject의 생명주기 종료 시점에 기준하여 Event를 관찰할수 있습니다.
또한 두개의 subject 둘중 하나라도 onCompleted Event 가 실행되지 않는다면 observer 는 onNext Event를 관찰할수 없습니다.
reduce
Swift 고차함수 안에서도 속해있는 reduce와 동일합니다.
reduce 는 모든 Event에 대한 총합을 방출하게됩니다.
(scan 은 중간 과정을 모두 방출합니다.)
let observer = Observable.range(start: 1, count: 5)
observer.scan(0, accumulator: +)
.subscribe { print("scan === \($0)") }
.disposed(by: disposBag)
observer.reduce(0, accumulator: +)
.subscribe{ print("reduce === \($0)") }
.disposed(by: disposBag)
클로져를 통한 다양한 연산도 가능합니다.
Observable.from([1, 2, 3])
.reduce(2) { $0 * $1 }
.debug()
.subscribe()
.disposed(by: disposBag)
Connectable Observable Operators
connectable Operaotr 를 사용하게 되면 공유 할수 있는 (연결 가능한) Observable 로 변환 할수 있습니다.
Observable 은 unicast 로써 하나의 observer 만 연결될수 있습니다.
func buttonLabelConfigure(){
view.addSubview(bt)
bt.snp.makeConstraints {
$0.centerX.centerY.equalToSuperview()
$0.width.equalTo(200)
$0.height.equalTo(80)
}
view.addSubview(lb)
lb.snp.makeConstraints {
$0.top.equalTo(bt.snp.bottom).offset(20)
$0.centerX.equalToSuperview()
$0.width.equalTo(200)
$0.height.equalTo(80)
}
}
func testShare(){
// API Requst 를 통해 Responde 받는 Observable 이라고 가정
let requestAPI = Observable.just(100).debug("API - Request")
let tapResult = bt.rx.tap
.flatMap { requestAPI }
tapResult
.map { $0 > 3 }
// bind(to:)는subscribe()의 별칭(Alias)으로 Subscribe()를 호출한 것과 동일
.bind(to: bt.rx.isHidden )
.disposed(by: disposBag)
tapResult
.map { "Count : \($0)" }
.bind(to: lb.rx.text )
.disposed(by: disposBag)
}
버튼 tap 대한 Observable(시퀀스) 에 2번에 bind ( 구독 ) 가 있다고 가정하겠습니다.
이럴경우 API 콜에 대한 Observable 은 bind(2번) 를 통해 2개의 시퀀스가 생성 방출됩니다.
2022-03-17 13:42:20.669: API - Request -> subscribed
2022-03-17 13:42:20.671: API - Request -> Event next(100)
2022-03-17 13:42:20.671: API - Request -> Event completed
2022-03-17 13:42:20.671: API - Request -> isDisposed
2022-03-17 13:42:20.672: API - Request -> subscribed
2022-03-17 13:42:20.672: API - Request -> Event next(100)
2022-03-17 13:42:20.673: API - Request -> Event completed
2022-03-17 13:42:20.673: API - Request -> isDisposed
출력문에서 볼수 있듯 응답으로 들어오는 onNext 이벤트가 2번이 들어왔습니다.
이러한 문제점을 해결할수 있는 Connect Observable Operator 로는
share() operator 가 있습니다.
share
연산자를 사용하면 Subscribe()할때마다 새로운 Observable 시퀀스가 생성되지 않고, 하나의 시퀀스에서 방출되는 Event 요소를 공유할수 있습니다
unicast -> multicast ??
let tapResult = bt.rx.tap
.flatMap { requestAPI }
.share()
2022-03-17 13:59:34.499: API - Request -> subscribed
2022-03-17 13:59:34.501: API - Request -> Event next(100)
2022-03-17 13:59:34.502: API - Request -> Event completed
2022-03-17 13:59:34.502: API - Request -> isDisposed
하지만 그렇다고 Subject 의 기능을 완벽히 수행한다고 볼수는 없습니다.
이것을 확인할수 있는 코드를 먼저 보시면
let observable = Observable<Int>.create { observer in
observer.onNext(Int.random(in: 0..<10))
return Disposables.create()
}
let observer1 = observable
.subscribe { print("observer 1 === \($0)") }
observer1.disposed(by: disposBag)
let observer2 = observable
.subscribe {print("observer 2 === \($0)") }
observer2.disposed(by: disposBag)
//print
observer 1 === next(5)
observer 2 === next(2)
share 를 사용하지 않는 경우의 Observable 입니다.
Observable 은 앞써 설명드린대로 구독이 시작되면 생성되어 방출되며
구독의 지점에 따라 각기 다른 시퀀스가 생성됩니다.
하지만 share 를 사용하게 될경우
let observable = Observable<Int>.create { observer in
observer.onNext(Int.random(in: 0..<10))
return Disposables.create()
}.share()
observer 1 === next(6)
이러한 출력문 나오게 됩니다.
여기서 Subject 와의 차이점을 찾을수 있습니다.
Subject 는 observer 간에 Event를 공유 합니다.
하지만 share 를 사용하는 Observable 같은 경우에는
첫번째 Subscribe 가 이루어지는 시점에만 (ex_구독 카운트가 0 에서 1로 변했을때 ) subscribe 를 생성합니다. 이후 생성되는 구독 행위에 대해선 무시되고 첫 호출한 subscribe 를 공유하게 됩니다.
replay(default : 0 ) , scope(default : .whileConnected )
share 에는 파라미터를 넣을수 있습니다.share(replay: , scope :)
여기서 replay 는 새롭게 subscribe 되는 observer 에게 방출되는 Event 요소에 대해 설정할수 있습니다.
let observable = Observable<Int>.create { observer in
observer.onNext(Int.random(in: 0..<10))
observer.onNext(Int.random(in: 11..<20))
observer.onNext(Int.random(in: 21..<30))
observer.onNext(Int.random(in: 31..<40))
return Disposables.create()
}.share(replay: 2 , scope: .whileConnected )
let observer1 = observable
.subscribe { print("observer 1 === \($0)") }
observer1.disposed(by: disposBag)
let observer2 = observable
.subscribe {print("observer 2 === \($0)") }
observer2.disposed(by: disposBag)
let observer3 = observable
.subscribe { print("observer 3 === \($0)") }
observer3.disposed(by: disposBag)
let observer4 = observable
.subscribe { print("observer 4 === \($0)")}
observer4.disposed(by: disposBag)
observer 1 === next(9)
observer 1 === next(14)
observer 1 === next(22)
observer 1 === next(34)
observer 2 === next(22)
observer 2 === next(34)
observer 3 === next(22)
observer 3 === next(34)
observer 4 === next(22)
observer 4 === next(34)
replay 에서 설정되는 값은 buffer size 라고 생각할수 있습니다.
observer 1 에서 subscribe 0 -> 1 로 변하고
observer 2 부터는 새롭게 subscribe 된 observer 입니다.
그렇기에 observer2 부터는 share 를 통한 observer1 subscribe 공유 이기 때문에
replay 에 설정된 값에 맞는 방출을 하게 됩니다.
또한 Observable 에서 completed 이벤트가 방출되면
모든 share 는 해제 됩니다.
let observable = Observable<Int>.create { observer in
observer.onNext(Int.random(in: 0..<10))
observer.onNext(Int.random(in: 11..<20))
observer.onNext(Int.random(in: 21..<30))
observer.onNext(Int.random(in: 31..<40))
observer.onCompleted()
return Disposables.create()
}.share(replay: 2 , scope: .whileConnected )
2022-03-17 15:11:09.723: SetupVC.swift:95 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.725: SetupVC.swift:95 (testShareTwo()) -> Event next(3)
observer 1 === next(3)
2022-03-17 15:11:09.725: SetupVC.swift:95 (testShareTwo()) -> Event next(13)
observer 1 === next(13)
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event next(21)
observer 1 === next(21)
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event next(37)
observer 1 === next(37)
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> Event completed
observer 1 === completed
2022-03-17 15:11:09.726: SetupVC.swift:95 (testShareTwo()) -> isDisposed
2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> Event next(8)
observer 2 === next(8)
2022-03-17 15:11:09.726: SetupVC.swift:100 (testShareTwo()) -> Event next(18)
observer 2 === next(18)
2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event next(28)
observer 2 === next(28)
2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event next(38)
observer 2 === next(38)
2022-03-17 15:11:09.727: SetupVC.swift:100 (testShareTwo()) -> Event completed
observer 2 === completed
2022-03-17 15:11:09.728: SetupVC.swift:100 (testShareTwo()) -> isDisposed
2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> Event next(6)
observer 3 === next(6)
2022-03-17 15:11:09.728: SetupVC.swift:105 (testShareTwo()) -> Event next(15)
observer 3 === next(15)
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event next(22)
observer 3 === next(22)
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event next(35)
observer 3 === next(35)
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> Event completed
observer 3 === completed
2022-03-17 15:11:09.729: SetupVC.swift:105 (testShareTwo()) -> isDisposed
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> subscribed
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(0)
observer 4 === next(0)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(13)
observer 4 === next(13)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(21)
observer 4 === next(21)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event next(33)
observer 4 === next(33)
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> Event completed
observer 4 === completed
2022-03-17 15:11:09.729: SetupVC.swift:110 (testShareTwo()) -> isDisposed
scope 에는 2개의 설정값이 있습니다
-
.whileConnected(observer 가 한 개 이상 있으면 replay가 유지되지만, dispose되어 0개가 되면 replay 버퍼 초기화)
-
.forever (observer 가 없어도 버퍼 유지)
whileConnected는 dispose 와 disposeBag 을 통해서 첫번째 observer 를 선택할수 있고
새로운 observer 들에게도 영향을 줄수 있습니다.
let observer1 = observable
.subscribe { print("observer 1 === \($0)") }
observer1.dispose()//첫번째 observer 지만 dispose 됨
let observer2 = observable
.subscribe {print("observer 2 === \($0)") }
observer2.disposed(by: disposBag) // disposeBage
let observer3 = observable
.subscribe { print("observer 3 === \($0)") }
observer3.dispose()
let observer4 = observable
.subscribe { print("observer 4 === \($0)")}
observer4.dispose()
observer 1 === next(5)
observer 1 === next(11)
observer 1 === next(23)
observer 1 === next(39)
// observer 2 에 영향을 받음
observer 2 === next(2)
observer 2 === next(18)
observer 2 === next(28)
observer 2 === next(38)
observer 3 === next(28)
observer 3 === next(38)
observer 4 === next(28)
observer 4 === next(38)
하지만 여기서 forever 를 사용하게 되면
share(replay: 2 , scope: .forever )
observer 1 === next(5)
observer 1 === next(19)
observer 1 === next(22) // a
observer 1 === next(31) // b
observer 2 === next(22) // a
observer 2 === next(31) // b
observer 2 === next(2)
observer 2 === next(14)
observer 2 === next(22)
observer 2 === next(38)
observer 3 === next(22)
observer 3 === next(38)
observer 4 === next(22)
observer 4 === next(38)
observer1 에 3~4 번째 event 를 observer 2 의 1~2 번째의 추가 되어
observer2 는 Observable 의 4번에 방출이 아닌 6번의 방출 Event 를 관찰하고 있습니다.
이유는 forever 스트림의 내부캐시가 지워지지 않아서 입니다.
그래서 observer1 에 3~4번째 Event 를 받아오는 것입니다(bufferSize : 2)
- publish() : 이 연산자는 보통의 Observable을 ConnectableObservable로 변환해 줍니다.
let observable = Observable<Int>.create { observer in
observer.onNext(Int.random(in: 10..<30))
return Disposables.create()
}.publish()
let observer = observable
.debug()
.subscribe(onNext : { print($0)})
observer.disposed(by: disposBag)
let observer2 = observable
.debug()
.subscribe(onNext : { print($0)})
observer2.disposed(by: disposBag)
observable.connect().disposed(by: disposBag)
//print
2022-03-17 15:57:43.384: SetupVC.swift:54 (testPublish()) -> subscribed
2022-03-17 15:57:43.385: SetupVC.swift:59 (testPublish()) -> subscribed
2022-03-17 15:57:43.387: SetupVC.swift:54 (testPublish()) -> Event next(21)
21
2022-03-17 15:57:43.387: SetupVC.swift:59 (testPublish()) -> Event next(21)
21
- ConnectableObservable : ConnectableObservable은 Subscriber가 있어도 connect()를 호출하기 전까지는 아이템을 방출하지 않습니다. connect()를 호출하고 나서야 아이템을 방출하기 시작합니다.
- refcount() : refcount() 는 ConnectableObservable에 Connect와 Disconnect를 자동으로 담당하고, ConnectableObservable을 보통의 Observable처럼 사용할 수 있게 해줍니다. 다시말해 Subscription count를 계속 세고 있다가 Subscription의 개수가 0 -> 1 개가 되는 시점에 connect()를 수행하고 Subscription이 0이 되면 disconnect()를 수행합니다.
Author And Source
이 문제에 관하여(RxSwift - Mathematical & Aggregate & Connectable Observable Operators), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@leejinseong9410/RxSwift-Mathematical-Aggregate-Connectable-Observable-Operators저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)