RxJava2.x Flowable를 사용하여 backpressure 제어
입문
만약 짧은 시간 내에 대량의emit 흐름의 수신단에 대해 비교적 무거운 처리를 실행한다면, 처리는 넘쳐서 따라갈 수 없을 것이다.그때는 backpressure로
backpressure는 유량을 제어하는 메커니즘이다.
RxJava2.x에서 Observable 및 Flowable 분리, Observable=backpressure 없음, Flowable=backpressure 있음.
이 글에서 우리는 flowable를 사용하여 backpressure를 제어하려고 합니다.
예는 Kotlin에서 구현됩니다.Kotlin 경험이 없어도 Java에서 Rx를 처리했다면 어렵지 않았다
예제 응용 프로그램
컨텐트
컨텐트
실시
소스 코드 여기 있습니다.
일단 플로우블 만들기.
PublishProcessor를 사용하여 검색 표시줄 값의 변화를 emit합니다.
PublishProcessor는 Flowable을 계승하는 클래스입니다.io.reactivex.Flowable<T>
io.reactivex.processors.FlowableProcessor<T>
io.reactivex.processors.PublishProcessor<T>
val processor = PublishProcessor.create<Int>()
val seekBar = findViewById(R.id.seekBar) as SeekBar
seekBar.setOnSeekBarChangeListener(object : SeekBar.OnSeekBarChangeListener {
override fun onProgressChanged(bar: SeekBar?, progress: Int, fromUser: Boolean) {
// シークバーの値が変わった時にprocessorへonNextされる
processor.onNext(progress)
}
override fun onStartTrackingTouch(p0: SeekBar?) {}
override fun onStopTrackingTouch(p0: SeekBar?) {}
})
배경 압력 제어 subscription.request
를 사용하여 수신자가 처리를 마칠 때마다 값을 수신할 수 있도록 제어합니다.onBackpressureLatest()
운영자가 최신 값을 받아들일 수 있도록 합니다.val textView = findViewById(R.id.text) as TextView
processor
.onBackpressureLatest() // 最新の値を受け取る
.subscribe(object : FlowableSubscriber<Int> {
private var subscription: Subscription? = null
override fun onNext(t: Int?) {
// 実行に1000ミリ秒必要な処理を開始
Observable.just(0)
.delay(1000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
textView.text = additionalString() + textView.text
subscription?.request(1) // 処理が終わってから次の値を1つ受け取る
}
}
override fun onError(t: Throwable?) {
}
override fun onComplete() {
}
override fun onSubscribe(s: Subscription?) {
this.subscription = s // subscriptionを保持
s?.request(1) // 最初に受け取れる値は1つ
}
})
실제로 이런 코드를 쓰나요?나는 더욱 쉽게 쓰고 싶다...
io.reactivex.Flowable<T>
io.reactivex.processors.FlowableProcessor<T>
io.reactivex.processors.PublishProcessor<T>
val processor = PublishProcessor.create<Int>()
val seekBar = findViewById(R.id.seekBar) as SeekBar
seekBar.setOnSeekBarChangeListener(object : SeekBar.OnSeekBarChangeListener {
override fun onProgressChanged(bar: SeekBar?, progress: Int, fromUser: Boolean) {
// シークバーの値が変わった時にprocessorへonNextされる
processor.onNext(progress)
}
override fun onStartTrackingTouch(p0: SeekBar?) {}
override fun onStopTrackingTouch(p0: SeekBar?) {}
})
subscription.request
를 사용하여 수신자가 처리를 마칠 때마다 값을 수신할 수 있도록 제어합니다.onBackpressureLatest()
운영자가 최신 값을 받아들일 수 있도록 합니다.val textView = findViewById(R.id.text) as TextView
processor
.onBackpressureLatest() // 最新の値を受け取る
.subscribe(object : FlowableSubscriber<Int> {
private var subscription: Subscription? = null
override fun onNext(t: Int?) {
// 実行に1000ミリ秒必要な処理を開始
Observable.just(0)
.delay(1000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
textView.text = additionalString() + textView.text
subscription?.request(1) // 処理が終わってから次の値を1つ受け取る
}
}
override fun onError(t: Throwable?) {
}
override fun onComplete() {
}
override fun onSubscribe(s: Subscription?) {
this.subscription = s // subscriptionを保持
s?.request(1) // 最初に受け取れる値は1つ
}
})
실제로 이런 코드를 쓰나요?나는 더욱 쉽게 쓰고 싶다...
processor
.throttleLast(1000, TimeUnit.MILLISECONDS) // 1000ミリ秒間で最新の値を1つ流す
.subscribe{ /*省略*/ }
총결산
나는 검색창의 예를 들어 Flowable로 backpressure 제어를 해 보았다.
실제로throttle계 운영자를 사용하여 설치하는 경우가 많습니다, RxJava2.x 가져오는 기회에서 subscription.request()
제어를 사용하는 것이 좋다는 것을 알 수 있다.
참고 자료
Reference
이 문제에 관하여(RxJava2.x Flowable를 사용하여 backpressure 제어), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/jnikd/items/ad5ca824cf2856942dc8텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)