반응성 데이터 스트림 - 빠른 rxJava 요약
13610 단어 reactivejavaprogramming
네트워크 채팅을 효과적으로 줄일 수 있도록 Netflix에서 구축했습니다. rxJava의 목표는 클라이언트가 서버에서 병렬로 실행되는 단일 "무거운"클라이언트 요청을 호출할 수 있도록 하는 것입니다.
그 개념은
Observable
/Iterable
유형과 Subscribing
유형을 기반으로 데이터 스트림을 비동기로 보냅니다.다음 참고 사항은 자세히 설명되어 있지 않습니다. 이것을 작성하는 과정에서 Java 8이 필요한 Spring Boot 2로 업그레이드하고 rxJava는 Java 6만 실행하기 때문에
Reactor
라이브러리로 전환했습니다. 둘 다 유사한 개념을 기반으로 하지만 구조가 다릅니다.관찰 가능/반복 가능
Observable
데이터 유형은 "풀"인 Iterable
와 동일한 "푸시"로 생각할 수 있습니다.Iterable
데이터 유형은 해당 값이 도착할 때까지 생산자 및 스레드 블록에서 값을 가져옵니다.생산자는 값을 사용할 수 있을 때마다 소비자에게 값을 푸시합니다.
값이 동기식 또는 비동기식으로 도착할 수 있기 때문에 보다 유연한 접근 방식을 만듭니다.
관찰 가능한 유형
Iterable
유형에 있는 두 개의 누락된 semantiqcs를 추가합니다.이렇게 하면
Observable
와 Iterable
가 통합됩니다.유일한 차이점은 데이터가 흐르는 방향입니다.
항상 반환
Observable
, 항상 요청Iterable
.기존 데이터 구조에서 Observable 만들기
just()
및 from()
메서드를 사용하여 개체, 목록 또는 개체 배열을 해당 개체를 내보낼 수 있는 Observable로 변환합니다.
Observable<String> o = Observable.from("a","b","c");
// Inserting a list into an observable
def list = [5,6,7,8]
Observable<Integer> o = Observable.from(list);
//
Observable<String> o = Observable.just("one object");
create() 메소드를 통해 Observable 생성
create()
메서드를 통해 자체 Observable을 설계하여 비동기 I/O, 계산 작업 또는 '무한' 데이터 스트림을 구현할 수 있습니다. 동기 관찰 가능 예제:
def customObservableBlocking() {
return Observable.create(
{ aSub ->
for (int i=0; i<50; i++) {
if (false == aSub.isUnsubscribed()) {
aSub.onNext("value_" + i);
};
}
// after sending all values we complete the sequence
if (false == aSub.isUnsubscribed()) {
aSub.onCompleted();
}
});
}
// Output:
customObservableBlocking().sub({ it -> println(it); });
비동기 관찰 가능 예제:
def customObservableNonBlocking() {
return Observable.create(
{ sub ->
final Thread t = new Thread(new Runnable() {
void run() {
for (int i = 0; i < 75; i++) {
if (true == sub.isUnsubscribed()) {
return;
}
sub.onNext("value_" + i);
}
if (false == sub.isUnsubscribed())
}
});
t.start();
}
);
}
// Output:
customObservableNonBlocking().sub({ println(it) })
def fetchWikiArticleAsync(String... wikiArticleNames) {
return Observable.create({ sub ->
Thread.start( {
for (articleName in wikiArticleNames) {
if (true == sub.isUnsubscribed()) {
return;
}
sub.onNext(new URL("http://en.wikipedia.org/wiki/" + articleName).getText());
}
if (false == sub.isUnsubscribed()) {
sub.onCompleted();
}
});
return(sub);
});
}
// Output:
fetchWikiArticleAsync("Tiger", "Elephant")
.sub({println "--- Article ---\n" + it.substring(0, 125); });
연산자로 Observable 변환
operators
customObservableNonBlock
:def simpleComposition() {
customObservableNonBlocking().skip(10).take(5)
.map({ stringValue -> return stringValue + "_xform" })
.subscribe({ println "onNext => " + it })
}
skip(10)
- 10번째 값으로 이동합니다take(5)
- 5개의 다음 값을 가져옵니다map(...)
- 각 값을 매핑하고 stringValue
를 _xform
=> $stringValue_xform
로 연결합니다.subscribe(...)
- onNext =>
가 연결된 매핑된 값을 반환합니다이것은 초기 rxJava 여름입니다. 대신
reactor
라이브러리로 전환하면 개념은 동일하지만 구조가 다르고 더 단순하며 Java 8을 허용합니다.rxJava에 대한 자세한 내용은 여기에서 확인하십시오. (여기)[ https://github.com/ReactiveX/RxJava ]
Reference
이 문제에 관하여(반응성 데이터 스트림 - 빠른 rxJava 요약), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/andrehatlo/reactive-data-streams-quick-rxjava-summary-2pci텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)