Python 응답 라 이브 러 리 RxPy
리 액 티 브 X 에는 몇 가지 핵심 개념 이 있 는데,먼저 간단하게 소개 해 드 리 겠 습 니 다.
1.1.Observable 과 Observer(관찰 대상 과 관찰자)
우선 Observable 과 Observer 는 각각 관찰 대상 과 관찰자 이다.Observable 은 비동기 데이터 원본 으로 이해 할 수 있 으 며 일련의 값 을 보 냅 니 다.Observer 는 소비자 와 유사 하 며 Observable 을 먼저 구독 한 다음 에 발사 값 을 받 을 수 있 습 니 다.이 개념 은 디자인 모델 중의 관찰자 모델 과 생산자-소비자 모델 의 종합 체 라 고 할 수 있다.
1.2、Operator(연산 자)
또 하나의 매우 중요 한 개념 은 바로 조작 부호 이다.조작 부 호 는 Observable 의 데이터 흐름 에 작용 하여 다양한 조작 을 할 수 있다.더 중요 한 것 은 조작 부 호 는 체인 으로 조합 할 수 있다 는 것 이다.이러한 체인 함수 호출 은 데이터 와 조작 을 분리 할 뿐만 아니 라 코드 도 더욱 선명 하 게 읽 을 수 있다.일단 숙련 된 후에 너 는 이런 느낌 을 사랑 하 게 될 것 이다.
1.3.Single(단일 사례)
RxJava 와 그 변체 중 에는 비교적 특수 한 개념 인 Single 이 라 고 하 는데 이것 은 같은 값 만 발사 하 는 Observable 로 말하자면 하나의 예 이다.물론 자바 등 언어 에 익숙 하 다 면 한 가지 예 로 도 잘 알 고 있 을 것 이다.
1.4.제목(주체)
주체 라 는 개념 은 매우 특수 하 다.그것 은 Observable 이자 Observer 이다.바로 이 특징 때문에 Subject 는 다른 Observable 을 구독 할 수도 있 고 발사 대상 을 다른 Observer 에 줄 수도 있다.어떤 장면 에 서 는 Subject 가 큰 역할 을 한다.
1.5.스케줄 러(스케줄 러)
기본적으로 Reactive X 는 현재 스 레 드 에서 만 실행 되 지만 필요 하 다 면 스케줄 러 로 Reactive X 를 다 중 스 레 드 환경 에서 실행 할 수도 있 습 니 다.많은 스케줄 러 와 대응 하 는 조작 부호 가 있어 다 중 스 레 드 장면 에서 의 각종 요 구 를 처리 할 수 있다.
1.6 Observer 와 Observable
먼저 가장 간단 한 예 를 들 어 실행 결 과 는 순서대로 이 숫자 들 을 인쇄 할 것 이다.이 곳 의 of 는 주어진 매개 변수 에 따라 새로운 Observable 을 만 들 수 있 는 연산 자 입 니 다.생 성 후 Observable 을 구독 할 수 있 습 니 다.세 가지 리 셋 방법 은 해당 하 는 시기 에 실 행 됩 니 다.Observer 가 Observable 을 구독 하면 후속 Observable 이 발사 하 는 각 값 을 받 습 니 다.
from rx import of
ob = of(1, 2, 34, 5, 6, 7, 7)
ob.subscribe(
on_next=lambda i: print(f'Received: {i}'),
on_error=lambda e: print(f'Error: {e}'),
on_completed=lambda: print('Completed')
)
이 예 는 보기 에는 매우 간단 하고 쓸모 가 없어 보인다.그러나 Rx 의 핵심 개념 을 알 게 되면 이것 이 얼마나 강력 한 도구 인지 알 게 될 것 이다.더 중요 한 것 은 Observable 이 데 이 터 를 만 들 고 구독 하 는 과정 이 비동기 적 이라는 점 이다.익숙 하 다 면 이 특성 을 이용 해 많은 일 을 할 수 있다.1.7 연산 자
RxPy 에서 또 다른 중요 한 개념 은 바로 조작 부호 이다.심지어 조작 부호 가 가장 중요 한 개념 이 라 고 할 수 있다.거의 모든 기능 은 각 조작 부 호 를 조합 하여 실현 할 수 있다.조작 부 호 를 익히 는 것 이 RxPy 를 잘 배 우 는 관건 이다.연산 자 간 에 도 pipe 함수 로 연결 하여 복잡 한 조작 체인 을 구성 할 수 있다.
from rx import of, operators as op
import rx
ob = of(1, 2, 34, 5, 6, 7, 7)
ob.pipe(
op.map(lambda i: i ** 2),
op.filter(lambda i: i >= 10)
).subscribe(lambda i: print(f'Received: {i}'))
RxPy 에는 다양한 기능 을 수행 할 수 있 는 대량의 조작 부호 가 있다.우 리 는 그 중에서 자주 사용 하 는 조작 부 호 를 간단하게 보 자.자바 8 의 스 트림 라 이브 러 리 나 다른 함수 식 프로 그래 밍 라 이브 러 리 에 익숙 하 다 면 이 연산 자 들 에 게 친절 해 야 합 니 다.1.8,창설 형 조작 부호
우선 Observable 을 만 드 는 조작 자 입 니 다.자주 사용 하 는 생 성 형 조작 자 를 열거 하 였 습 니 다.
1.9.여과 형 연산 자
필터 형 조작 부호 의 주요 역할 은 Observable 을 선별 하고 여과 하 는 것 이다.
1.10,변환 형 조작 부호
1.11.산술 연산 자
1.12、Subject
Subject 는 Observer 이자 Observable 인 특별한 대상 입 니 다.그러나 이 대상 은 일반적으로 자주 사용 되 지 않 지만,만약 어떤 용도 가 여전히 매우 유용 하 다 면.그 러 니까 소개 좀 해 줘 야 겠 다.아래 코드 는 구독 할 때 첫 번 째 값 이 발사 되 었 기 때문에 구독 후에 발 사 된 값 만 인쇄 합 니 다.
from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject
# Subject Observer Observable
print('--------Subject---------')
subject = Subject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
그리고 몇 가지 특별한 Subject 가 있 습 니 다.소개 해 드 리 겠 습 니 다.1.13、ReplaySubject
Replay Subject 는 발 사 된 모든 값 을 기록 하 는 특수 한 Subject 입 니 다.언제든지 구독 할 수 있 습 니 다.그래서 캐 시 로 사용 할 수 있 습 니 다.ReplaySubject 는 bufferSize 인 자 를 받 아들 일 수 있 습 니 다.캐 시 할 수 있 는 최근 데이터 수 를 지정 합 니 다.기본적으로 모든 것 입 니 다.
아래 코드 는 위의 코드 와 거의 같 지만 ReplaySubject 를 사 용 했 기 때문에 모든 값 이 인쇄 됩 니 다.물론 구독 문 구 를 다른 위치 에 두 고 출력 에 변화 가 있 는 지 살 펴 볼 수도 있다.
# ReplaySubject ,
print('--------ReplaySubject---------')
subject = ReplaySubject()
subject.on_next(1)
subject.subscribe(lambda i: print(i))
subject.on_next(2)
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 1 2 3 4
1.14、BehaviorSubjectBehavior Subject 는 최근 에 발 사 된 값 만 기록 하 는 특수 한 Subject 입 니 다.그리고 이 를 만 들 때 초기 값 을 지정 해 야 합 니 다.모든 구독 대상 이 이 초기 값 을 받 을 수 있 습 니 다.물론 구독 이 늦 어 지면 이 초기 값 도 뒤에서 발 사 된 값 으로 덮어 씌 울 수 있다 는 점 에 주의해 야 한다.
# BehaviorSubject , Observable
print('--------BehaviorSubject---------')
subject = BehaviorSubject(0)
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 2 3 4
1.15、AsyncSubjectAsyncSubject 는 특수 한 Subject 입 니 다.말 그대로 비동기 Subject 입 니 다.Observer 가 완 료 될 때 만 데 이 터 를 발사 하고 마지막 데이터 만 발사 합 니 다.따라서 아래 코드 는 출력 4.마지막 줄 을 주석 하면 cocompleted 호출,아무것도 출력 하지 않 습 니 다.
# AsyncSubject , Observable
print('--------AsyncSubject---------')
subject = AsyncSubject()
subject.on_next(1)
subject.on_next(2)
subject.subscribe(lambda i: print(i))
subject.on_next(3)
subject.on_next(4)
subject.on_completed()
# 4
1.16、SchedulerRxPy 는 비동기 적 인 프레임 워 크 라 고 할 수 있 지만 기본적으로 하나의 스 레 드 위 에서 실 행 됩 니 다.따라서 스 레 드 실행 을 방해 할 수 있 는 동작 을 사용 하면 프로그램 이 끊 깁 니 다.물론 이런 상황 에 대해 우 리 는 다른 Scheduler 를 사용 하여 임 무 를 배정 하여 프로그램 이 효율 적 으로 운행 할 수 있 도록 할 수 있다.
다음 예 는 스 레 드 풀 스케줄 러 를 기반 으로 한 Thread PoolScheduler 를 만 들 었 습 니 다.두 Observable 용 subscribeon 방법 은 스케줄 러 를 지정 하기 때문에 서로 다른 스 레 드 로 작업 합 니 다.
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as op
import multiprocessing
import time
import threading
import random
def long_work(value):
time.sleep(random.randint(5, 20) / 10)
return value
pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count())
rx.range(5).pipe(
op.map(lambda i: long_work(i + 1)),
op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}'))
rx.of(1, 2, 3, 4, 5).pipe(
op.map(lambda i: i * 2),
op.subscribe_on(pool_schedular)
).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
각 연산 자의 API 를 관찰 한 적 이 있다 면 대부분의 연산 자 는 선택 할 수 있 는 Scheduler 인 자 를 지원 하고 연산 자 에 게 스케줄 러 를 지정 하 는 것 을 발견 할 수 있 습 니 다.연산 자 에 스케줄 러 가 지정 되 어 있 으 면 이 스케줄 러 를 우선 사용 합 니 다.그 다음 에 subscribe 방법 에서 지정 한 스케줄 러 를 사용 합 니 다.위 에서 지정 하지 않 으 면 기본 스케줄 러 를 사용 합 니 다.2.응용 장면
자,Reactive X 에 대한 지식 을 소개 하고 Reactive X 를 어떻게 사용 하 는 지 알 아 보 겠 습 니 다.많은 응용 장면 에서 Reactive X 를 이용 하여 추상 적 인 데이터 처 리 를 하고 개념 을 단순화 할 수 있다.
2.1 중복 발송 방지
많은 상황 에서 우 리 는 사건 의 발생 간격 을 통제 해 야 한다.예 를 들 어 버튼 이 여러 번 눌 렀 는데 첫 번 째 버튼 이 효력 이 발생 하 기 를 바 랄 뿐이다.이 경우 debounce 연산 자 를 사용 할 수 있 습 니 다.Observable 을 걸 러 내 고 지정 한 시간 간격 보다 작은 데 이 터 를 걸 러 냅 니 다.debounce 조작 부 호 는 간격 이 지나 야 마지막 데 이 터 를 발사 할 수 있 습 니 다.뒤의 데 이 터 를 걸 러 내 려 면 첫 번 째 데 이 터 를 보 내 려 면 throttle 을 사용 해 야 합 니 다.first 연산 자.
아래 코드 는 이 조작 부 호 를 비교적 잘 보 여 주 며 리 턴 키 를 빠르게 눌 러 데 이 터 를 보 낼 수 있 습 니 다.버튼 과 데이터 표시 간 의 관 계 를 주의 깊 게 관찰 하고 throttlefirst 연산 자 를 debounce 연산 자로 바 꾼 다음 에 출력 이 어떻게 변 하 는 지,pipe 의 연산 자 를 완전히 설명 하고 출력 이 어떻게 변 하 는 지 볼 수 있 습 니 다.
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
# debounce ,
ob = Subject()
ob.pipe(
op.throttle_first(3)
# op.debounce(3)
).subscribe(
on_next=lambda i: print(i),
on_completed=lambda: print('Completed')
)
print('press enter to print, press other key to exit')
while True:
s = input()
if s == '':
ob.on_next(datetime.datetime.now().time())
else:
ob.on_completed()
break
2.2 조작 데이터 흐름만약 일부 데 이 터 를 조작 해 야 한다 면,마찬가지 로 많은 조작 부호 가 수 요 를 만족 시 킬 수 있다.물론 이 부분의 기능 은 Reactive X 만 의 것 이 아니다.자바 8 의 라 이브 러 리 에 대해 알 고 있다 면 이 두 가지 기능 이 거의 똑 같다 는 것 을 알 수 있 을 것 이다.
다음은 간단 한 예 로 두 데이터 원본 을 결합 한 다음 에 그 중의 모든 짝 수 를 찾 아 보 자.
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
#
some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8)
some_data2 = rx.from_iterable(range(10, 20))
some_data.pipe(
op.merge(some_data2),
op.filter(lambda i: i % 2 == 0),
# op.map(lambda i: i * 2)
).subscribe(lambda i: print(i))
혹은 reduce 의 간단 한 예 를 이용 하여 1-100 의 정수 와.
import rx
from rx import operators as op
from rx.subject import Subject
import datetime
rx.range(1, 101).pipe(
op.reduce(lambda acc, i: acc + i, 0)
).subscribe(lambda i: print(i))
이상 은 Python 응답 식 라 이브 러 리 RxPy 의 상세 한 내용 입 니 다.Python 응답 식 라 이브 러 리 RxPy 에 관 한 자 료 는 다른 관련 글 을 주목 하 십시오!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Python의 None과 NULL의 차이점 상세 정보그래서 대상 = 속성 + 방법 (사실 방법도 하나의 속성, 데이터 속성과 구별되는 호출 가능한 속성 같은 속성과 방법을 가진 대상을 클래스, 즉 Classl로 분류할 수 있다.클래스는 하나의 청사진과 같아서 하나의 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.