반응성 데이터 스트림 - 빠른 rxJava 요약

(Reactive movement - manifesto)[ https://www.reactivemanifesto.org/ ]에서 영감을 받은 rxJava는 Netflix에서 만든 Reactive Stream 사양을 구현한 것입니다. 반응형 스트림은 데이터 스트림을 비동기식으로 처리하기 위한 개념입니다.

네트워크 채팅을 효과적으로 줄일 수 있도록 Netflix에서 구축했습니다. rxJava의 목표는 클라이언트가 서버에서 병렬로 실행되는 단일 "무거운"클라이언트 요청을 호출할 수 있도록 하는 것입니다.

그 개념은 Observable/Iterable 유형과 Subscribing 유형을 기반으로 데이터 스트림을 비동기로 보냅니다.

다음 참고 사항은 자세히 설명되어 있지 않습니다. 이것을 작성하는 과정에서 Java 8이 필요한 Spring Boot 2로 업그레이드하고 rxJava는 Java 6만 실행하기 때문에 Reactor 라이브러리로 전환했습니다. 둘 다 유사한 개념을 기반으로 하지만 구조가 다릅니다.

관찰 가능/반복 가능


Observable 데이터 유형은 "풀"인 Iterable와 동일한 "푸시"로 생각할 수 있습니다.
Iterable 데이터 유형은 해당 값이 도착할 때까지 생산자 및 스레드 블록에서 값을 가져옵니다.

생산자는 값을 사용할 수 있을 때마다 소비자에게 값을 푸시합니다.

값이 동기식 또는 비동기식으로 도착할 수 있기 때문에 보다 유연한 접근 방식을 만듭니다.

관찰 가능한 유형


Iterable 유형에 있는 두 개의 누락된 semantiqcs를 추가합니다.
  • 생산자는 소비자에게 사용 가능한 데이터가 더 이상 없다는 신호를 보낼 수 있습니다.
  • 생산자는 소비자에게 오류가 발생했음을 알릴 수 있습니다.

  • 이렇게 하면 ObservableIterable가 통합됩니다.

    유일한 차이점은 데이터가 흐르는 방향입니다.

    항상 반환Observable , 항상 요청Iterable .

    기존 데이터 구조에서 Observable 만들기


  • 기존 데이터 구조에서:
  • Observablejust()from() 메서드를 사용하여 개체, 목록 또는 개체 배열을 해당 개체를 내보낼 수 있는 Observable로 변환합니다.

  • 
    Observable<String> o = Observable.from("a","b","c");
    
    // Inserting a list into an observable
    def list = [5,6,7,8]
    Observable<Integer> o = Observable.from(list);
    
    //
    Observable<String> o = Observable.just("one object");
    
    


    create() 메소드를 통해 Observable 생성


  • create() 메서드를 통해 자체 Observable을 설계하여 비동기 I/O, 계산 작업 또는 '무한' 데이터 스트림을 구현할 수 있습니다.


  • 동기 관찰 가능 예제:
  • 구독할 때 차단하는 사용자 정의 Observable(추가 스레드를 생성하지 않음)

  • def customObservableBlocking() {
        return Observable.create(
            { aSub ->
                for (int i=0; i<50; i++) {
                    if (false == aSub.isUnsubscribed()) {
                        aSub.onNext("value_" + i);
                    };
            }
            // after sending all values we complete the sequence
            if (false == aSub.isUnsubscribed()) {
                aSub.onCompleted();
            }
        });
    }
    
    // Output:
    customObservableBlocking().sub({ it -> println(it); });
    




    비동기 관찰 가능 예제:
  • 75개의 문자열을 내보내는 Observable입니다.
  • 별도의 스레드를 생성하므로 구독할 때 차단되지 않습니다.

  • def customObservableNonBlocking() {
        return Observable.create(
            { sub ->
                final Thread t = new Thread(new Runnable() {
                    void run() {
                        for (int i = 0; i < 75; i++) {
                            if (true == sub.isUnsubscribed()) {
                                return;
                            }
                            sub.onNext("value_" + i);
                        }
                        if (false == sub.isUnsubscribed())
                    }
                });
                t.start();
            }
        );
    }
    // Output:
    customObservableNonBlocking().sub({ println(it) })
    


  • Groovy에서 Wiki 기사 목록을 비동기식으로 가져옵니다.

  • def fetchWikiArticleAsync(String... wikiArticleNames) {
        return Observable.create({ sub ->
            Thread.start( {
                for (articleName in wikiArticleNames) {
                    if (true == sub.isUnsubscribed()) {
                        return;
                    }
                    sub.onNext(new URL("http://en.wikipedia.org/wiki/" + articleName).getText());
                }
                if (false == sub.isUnsubscribed()) {
                    sub.onCompleted();
                }
            });
            return(sub);
        });
    }
    
    // Output:
    fetchWikiArticleAsync("Tiger", "Elephant")
        .sub({println "--- Article ---\n" + it.substring(0, 125); });
    


    연산자로 Observable 변환


  • Observables
  • 를 변환하고 구성하기 위해 함께 체인operators
  • 체인을 사용한 비동기 호출customObservableNonBlock:

  • def simpleComposition() {
        customObservableNonBlocking().skip(10).take(5)
            .map({ stringValue -> return stringValue + "_xform" })
            .subscribe({ println "onNext => " + it })
    }
    


  • skip(10) - 10번째 값으로 이동합니다
  • .
  • take(5) - 5개의 다음 값을 가져옵니다
  • .
  • map(...) - 각 값을 매핑하고 stringValue_xform => $stringValue_xform로 연결합니다.
  • subscribe(...) - onNext =>가 연결된 매핑된 값을 반환합니다
  • .

    이것은 초기 rxJava 여름입니다. 대신 reactor 라이브러리로 전환하면 개념은 동일하지만 구조가 다르고 더 단순하며 Java 8을 허용합니다.

    rxJava에 대한 자세한 내용은 여기에서 확인하십시오. (여기)[ https://github.com/ReactiveX/RxJava ]

    좋은 웹페이지 즐겨찾기