RxJava2.x Flowable를 사용하여 backpressure 제어

입문



만약 짧은 시간 내에 대량의emit 흐름의 수신단에 대해 비교적 무거운 처리를 실행한다면, 처리는 넘쳐서 따라갈 수 없을 것이다.그때는 backpressure로
backpressure는 유량을 제어하는 메커니즘이다.
RxJava2.x에서 Observable 및 Flowable 분리, Observable=backpressure 없음, Flowable=backpressure 있음.
이 글에서 우리는 flowable를 사용하여 backpressure를 제어하려고 합니다.
예는 Kotlin에서 구현됩니다.Kotlin 경험이 없어도 Java에서 Rx를 처리했다면 어렵지 않았다

예제 응용 프로그램



컨텐트

  • 탐색기를 조작할 때 대량의 값 변경
  • 수신 포트에서 1000ms 처리
  • 실시

  • 화면 위: Observable
  • backpressure 제어 없음
  • 트랙터를 이동할 때마다 대량으로 처리
  • 화면 하단: Flowable
  • backpressure 제어를 진행하여 처리가 끝날 때마다 다음 값을 수신합니다
  • 트랙터를 움직여도 대량으로 처리하지 않는다
  • 다음은 이 견본의 원본 코드를 살펴보겠습니다.
    소스 코드 여기 있습니다.

    일단 플로우블 만들기.


    PublishProcessor를 사용하여 검색 표시줄 값의 변화를 emit합니다.
    PublishProcessor는 Flowable을 계승하는 클래스입니다.
    io.reactivex.Flowable<T>
      io.reactivex.processors.FlowableProcessor<T>
        io.reactivex.processors.PublishProcessor<T>
    
    val processor = PublishProcessor.create<Int>()
    val seekBar = findViewById(R.id.seekBar) as SeekBar
    
    seekBar.setOnSeekBarChangeListener(object : SeekBar.OnSeekBarChangeListener {
        override fun onProgressChanged(bar: SeekBar?, progress: Int, fromUser: Boolean) {
            // シークバーの値が変わった時にprocessorへonNextされる
            processor.onNext(progress)
        }
    
        override fun onStartTrackingTouch(p0: SeekBar?) {}
        override fun onStopTrackingTouch(p0: SeekBar?) {}
    })
    

    배경 압력 제어

    subscription.request를 사용하여 수신자가 처리를 마칠 때마다 값을 수신할 수 있도록 제어합니다.onBackpressureLatest() 운영자가 최신 값을 받아들일 수 있도록 합니다.
    val textView = findViewById(R.id.text) as TextView
    processor
        .onBackpressureLatest()  // 最新の値を受け取る
        .subscribe(object : FlowableSubscriber<Int> {
            private var subscription: Subscription? = null
            override fun onNext(t: Int?) {
                // 実行に1000ミリ秒必要な処理を開始
                Observable.just(0)
                        .delay(1000, TimeUnit.MILLISECONDS)
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe {
                            textView.text = additionalString() + textView.text
                            subscription?.request(1) // 処理が終わってから次の値を1つ受け取る
                        }
            }
    
            override fun onError(t: Throwable?) {
            }
    
            override fun onComplete() {
            }
    
            override fun onSubscribe(s: Subscription?) {
                this.subscription = s // subscriptionを保持
                s?.request(1) // 最初に受け取れる値は1つ
            }
        })
    

    실제로 이런 코드를 쓰나요?나는 더욱 쉽게 쓰고 싶다...

  • 실제로throttle계 조작원을 사용하여 간단하게 제작하는 경우가 많다.
  • processor
        .throttleLast(1000, TimeUnit.MILLISECONDS) // 1000ミリ秒間で最新の値を1つ流す
        .subscribe{ /*省略*/ }
    

    총결산


    나는 검색창의 예를 들어 Flowable로 backpressure 제어를 해 보았다.
    실제로throttle계 운영자를 사용하여 설치하는 경우가 많습니다, RxJava2.x 가져오는 기회에서 subscription.request() 제어를 사용하는 것이 좋다는 것을 알 수 있다.

    참고 자료

  • RxJava
  • https://github.com/ReactiveX/RxJava
  • 좋은 웹페이지 즐겨찾기