Python 응답 라 이브 러 리 RxPy

9569 단어 PythonRxPy응답 식
기본 개념
리 액 티 브 X 에는 몇 가지 핵심 개념 이 있 는데,먼저 간단하게 소개 해 드 리 겠 습 니 다.
1.1.Observable 과 Observer(관찰 대상 과 관찰자)
우선 Observable 과 Observer 는 각각 관찰 대상 과 관찰자 이다.Observable 은 비동기 데이터 원본 으로 이해 할 수 있 으 며 일련의 값 을 보 냅 니 다.Observer 는 소비자 와 유사 하 며 Observable 을 먼저 구독 한 다음 에 발사 값 을 받 을 수 있 습 니 다.이 개념 은 디자인 모델 중의 관찰자 모델 과 생산자-소비자 모델 의 종합 체 라 고 할 수 있다.
1.2、Operator(연산 자)
또 하나의 매우 중요 한 개념 은 바로 조작 부호 이다.조작 부 호 는 Observable 의 데이터 흐름 에 작용 하여 다양한 조작 을 할 수 있다.더 중요 한 것 은 조작 부 호 는 체인 으로 조합 할 수 있다 는 것 이다.이러한 체인 함수 호출 은 데이터 와 조작 을 분리 할 뿐만 아니 라 코드 도 더욱 선명 하 게 읽 을 수 있다.일단 숙련 된 후에 너 는 이런 느낌 을 사랑 하 게 될 것 이다.
1.3.Single(단일 사례)
RxJava 와 그 변체 중 에는 비교적 특수 한 개념 인 Single 이 라 고 하 는데 이것 은 같은 값 만 발사 하 는 Observable 로 말하자면 하나의 예 이다.물론 자바 등 언어 에 익숙 하 다 면 한 가지 예 로 도 잘 알 고 있 을 것 이다.
1.4.제목(주체)
주체 라 는 개념 은 매우 특수 하 다.그것 은 Observable 이자 Observer 이다.바로 이 특징 때문에 Subject 는 다른 Observable 을 구독 할 수도 있 고 발사 대상 을 다른 Observer 에 줄 수도 있다.어떤 장면 에 서 는 Subject 가 큰 역할 을 한다.
1.5.스케줄 러(스케줄 러)
기본적으로 Reactive X 는 현재 스 레 드 에서 만 실행 되 지만 필요 하 다 면 스케줄 러 로 Reactive X 를 다 중 스 레 드 환경 에서 실행 할 수도 있 습 니 다.많은 스케줄 러 와 대응 하 는 조작 부호 가 있어 다 중 스 레 드 장면 에서 의 각종 요 구 를 처리 할 수 있다.
1.6 Observer 와 Observable
먼저 가장 간단 한 예 를 들 어 실행 결 과 는 순서대로 이 숫자 들 을 인쇄 할 것 이다.이 곳 의 of 는 주어진 매개 변수 에 따라 새로운 Observable 을 만 들 수 있 는 연산 자 입 니 다.생 성 후 Observable 을 구독 할 수 있 습 니 다.세 가지 리 셋 방법 은 해당 하 는 시기 에 실 행 됩 니 다.Observer 가 Observable 을 구독 하면 후속 Observable 이 발사 하 는 각 값 을 받 습 니 다.

from rx import of

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
    on_next=lambda i: print(f'Received: {i}'),
    on_error=lambda e: print(f'Error: {e}'),
    on_completed=lambda: print('Completed')

)
이 예 는 보기 에는 매우 간단 하고 쓸모 가 없어 보인다.그러나 Rx 의 핵심 개념 을 알 게 되면 이것 이 얼마나 강력 한 도구 인지 알 게 될 것 이다.더 중요 한 것 은 Observable 이 데 이 터 를 만 들 고 구독 하 는 과정 이 비동기 적 이라는 점 이다.익숙 하 다 면 이 특성 을 이용 해 많은 일 을 할 수 있다.
1.7 연산 자
RxPy 에서 또 다른 중요 한 개념 은 바로 조작 부호 이다.심지어 조작 부호 가 가장 중요 한 개념 이 라 고 할 수 있다.거의 모든 기능 은 각 조작 부 호 를 조합 하여 실현 할 수 있다.조작 부 호 를 익히 는 것 이 RxPy 를 잘 배 우 는 관건 이다.연산 자 간 에 도 pipe 함수 로 연결 하여 복잡 한 조작 체인 을 구성 할 수 있다.

from rx import of, operators as op
import rx

ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
    op.map(lambda i: i ** 2),
    op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))
RxPy 에는 다양한 기능 을 수행 할 수 있 는 대량의 조작 부호 가 있다.우 리 는 그 중에서 자주 사용 하 는 조작 부 호 를 간단하게 보 자.자바 8 의 스 트림 라 이브 러 리 나 다른 함수 식 프로 그래 밍 라 이브 러 리 에 익숙 하 다 면 이 연산 자 들 에 게 친절 해 야 합 니 다.
1.8,창설 형 조작 부호
우선 Observable 을 만 드 는 조작 자 입 니 다.자주 사용 하 는 생 성 형 조작 자 를 열거 하 였 습 니 다.

1.9.여과 형 연산 자
필터 형 조작 부호 의 주요 역할 은 Observable 을 선별 하고 여과 하 는 것 이다.

1.10,변환 형 조작 부호

1.11.산술 연산 자

1.12、Subject
Subject 는 Observer 이자 Observable 인 특별한 대상 입 니 다.그러나 이 대상 은 일반적으로 자주 사용 되 지 않 지만,만약 어떤 용도 가 여전히 매우 유용 하 다 면.그 러 니까 소개 좀 해 줘 야 겠 다.아래 코드 는 구독 할 때 첫 번 째 값 이 발사 되 었 기 때문에 구독 후에 발 사 된 값 만 인쇄 합 니 다.

from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject

# Subject   Observer Observable

print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
그리고 몇 가지 특별한 Subject 가 있 습 니 다.소개 해 드 리 겠 습 니 다.
1.13、ReplaySubject
Replay Subject 는 발 사 된 모든 값 을 기록 하 는 특수 한 Subject 입 니 다.언제든지 구독 할 수 있 습 니 다.그래서 캐 시 로 사용 할 수 있 습 니 다.ReplaySubject 는 bufferSize 인 자 를 받 아들 일 수 있 습 니 다.캐 시 할 수 있 는 최근 데이터 수 를 지정 합 니 다.기본적으로 모든 것 입 니 다.
아래 코드 는 위의 코드 와 거의 같 지만 ReplaySubject 를 사 용 했 기 때문에 모든 값 이 인쇄 됩 니 다.물론 구독 문 구 를 다른 위치 에 두 고 출력 에 변화 가 있 는 지 살 펴 볼 수도 있다.

# ReplaySubject      ,                  
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4
1.14、BehaviorSubject
Behavior Subject 는 최근 에 발 사 된 값 만 기록 하 는 특수 한 Subject 입 니 다.그리고 이 를 만 들 때 초기 값 을 지정 해 야 합 니 다.모든 구독 대상 이 이 초기 값 을 받 을 수 있 습 니 다.물론 구독 이 늦 어 지면 이 초기 값 도 뒤에서 발 사 된 값 으로 덮어 씌 울 수 있다 는 점 에 주의해 야 한다.

# BehaviorSubject         ,  Observable    
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
1.15、AsyncSubject
AsyncSubject 는 특수 한 Subject 입 니 다.말 그대로 비동기 Subject 입 니 다.Observer 가 완 료 될 때 만 데 이 터 를 발사 하고 마지막 데이터 만 발사 합 니 다.따라서 아래 코드 는 출력 4.마지막 줄 을 주석 하면 cocompleted 호출,아무것도 출력 하지 않 습 니 다.

# AsyncSubject         ,     Observable       
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4
1.16、Scheduler
RxPy 는 비동기 적 인 프레임 워 크 라 고 할 수 있 지만 기본적으로 하나의 스 레 드 위 에서 실 행 됩 니 다.따라서 스 레 드 실행 을 방해 할 수 있 는 동작 을 사용 하면 프로그램 이 끊 깁 니 다.물론 이런 상황 에 대해 우 리 는 다른 Scheduler 를 사용 하여 임 무 를 배정 하여 프로그램 이 효율 적 으로 운행 할 수 있 도록 할 수 있다.
다음 예 는 스 레 드 풀 스케줄 러 를 기반 으로 한 Thread PoolScheduler 를 만 들 었 습 니 다.두 Observable 용 subscribeon 방법 은 스케줄 러 를 지정 하기 때문에 서로 다른 스 레 드 로 작업 합 니 다.

import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op

import multiprocessing
import time
import threading
import random


def long_work(value):
    time.sleep(random.randint(5, 20) / 10)
    return value


pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())

rx.range(5).pipe(
    op.map(lambda i: long_work(i + 1)),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))

rx.of(1, 2, 3, 4, 5).pipe(
    op.map(lambda i: i * 2),
    op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
각 연산 자의 API 를 관찰 한 적 이 있다 면 대부분의 연산 자 는 선택 할 수 있 는 Scheduler 인 자 를 지원 하고 연산 자 에 게 스케줄 러 를 지정 하 는 것 을 발견 할 수 있 습 니 다.연산 자 에 스케줄 러 가 지정 되 어 있 으 면 이 스케줄 러 를 우선 사용 합 니 다.그 다음 에 subscribe 방법 에서 지정 한 스케줄 러 를 사용 합 니 다.위 에서 지정 하지 않 으 면 기본 스케줄 러 를 사용 합 니 다.
2.응용 장면
자,Reactive X 에 대한 지식 을 소개 하고 Reactive X 를 어떻게 사용 하 는 지 알 아 보 겠 습 니 다.많은 응용 장면 에서 Reactive X 를 이용 하여 추상 적 인 데이터 처 리 를 하고 개념 을 단순화 할 수 있다.
2.1 중복 발송 방지
많은 상황 에서 우 리 는 사건 의 발생 간격 을 통제 해 야 한다.예 를 들 어 버튼 이 여러 번 눌 렀 는데 첫 번 째 버튼 이 효력 이 발생 하 기 를 바 랄 뿐이다.이 경우 debounce 연산 자 를 사용 할 수 있 습 니 다.Observable 을 걸 러 내 고 지정 한 시간 간격 보다 작은 데 이 터 를 걸 러 냅 니 다.debounce 조작 부 호 는 간격 이 지나 야 마지막 데 이 터 를 발사 할 수 있 습 니 다.뒤의 데 이 터 를 걸 러 내 려 면 첫 번 째 데 이 터 를 보 내 려 면 throttle 을 사용 해 야 합 니 다.first 연산 자.
아래 코드 는 이 조작 부 호 를 비교적 잘 보 여 주 며 리 턴 키 를 빠르게 눌 러 데 이 터 를 보 낼 수 있 습 니 다.버튼 과 데이터 표시 간 의 관 계 를 주의 깊 게 관찰 하고 throttlefirst 연산 자 를 debounce 연산 자로 바 꾼 다음 에 출력 이 어떻게 변 하 는 지,pipe 의 연산 자 를 완전히 설명 하고 출력 이 어떻게 변 하 는 지 볼 수 있 습 니 다.

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

# debounce   ,             

ob = Subject()
ob.pipe(
    op.throttle_first(3)
    # op.debounce(3)
).subscribe(
    on_next=lambda i: print(i),
    on_completed=lambda: print('Completed')
)

print('press enter to print, press other key to exit')
while True:
    s = input()
    if s == '':
        ob.on_next(datetime.datetime.now().time())
    else:
        ob.on_completed()
        break
2.2 조작 데이터 흐름
만약 일부 데 이 터 를 조작 해 야 한다 면,마찬가지 로 많은 조작 부호 가 수 요 를 만족 시 킬 수 있다.물론 이 부분의 기능 은 Reactive X 만 의 것 이 아니다.자바 8 의 라 이브 러 리 에 대해 알 고 있다 면 이 두 가지 기능 이 거의 똑 같다 는 것 을 알 수 있 을 것 이다.
다음은 간단 한 예 로 두 데이터 원본 을 결합 한 다음 에 그 중의 모든 짝 수 를 찾 아 보 자.

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

#      
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
    op.merge(some_data2),
    op.filter(lambda i: i % 2 == 0),
    # op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))
혹은 reduce 의 간단 한 예 를 이용 하여 1-100 의 정수 와.

import rx
from rx import operators as op
from rx.subject import Subject
import datetime

rx.range(1, 101).pipe(
    op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))
이상 은 Python 응답 식 라 이브 러 리 RxPy 의 상세 한 내용 입 니 다.Python 응답 식 라 이브 러 리 RxPy 에 관 한 자 료 는 다른 관련 글 을 주목 하 십시오!

좋은 웹페이지 즐겨찾기