리액티브 프로그래밍 실행 - 파트 1
10684 단어 socketiotypescriptrxjskafka
리액티브 프로그래밍 실행 - 파트 1
이 게시물은 Funnel이라는 reactive programming의 중앙 소프트웨어 구성 요소 중 하나에서 DataBeacon이 어떻게 사용되는지 보여줍니다. 이 게시물은 rubber duck debugging method 에서 영감을 받았습니다.
기본 사항을 다루는 대신 훨씬 더 나은 리소스가 있으며 일부 아키텍처 결정에 대한 실제 예제와 설명이 포함된 프로덕션 준비 코드에 중점을 둘 것입니다.
Code snippets have been adapted to this blog post specifically, it is not a 1:1 copy of production code and some implementation details have hidden.
소개
Funnel 구성 요소는 Kafka 주제와 SPA 클라이언트 사이에 있습니다. 클라이언트 연결을 조정하고 Kafka 입력 스트림을 각 클라이언트 상태 및 기본 설정에 맞게 조정된 웹 소켓 연결(Socket.IO)로 변환합니다.
Kafka 주제 -> 퍼널 -> 클라이언트
Funnel은 TypeScript으로 작성되고 Node으로 번역되지만 현재 Go으로 마이그레이션되고 있습니다. 이 게시물은 노드 구현에 중점을 두지만 향후 게시물에서 Go로의 이론적 근거와 마이그레이션을 다룰 수 있습니다.
유입경로를 자세히 살펴보겠습니다.
클라이언트 설정
환경 세부 정보를 설정한 후 기본 코드는 다음과 같이 시작됩니다.
const connection$ = await socketIOServer();
여기서 우리는 rxjs-socket.io을 사용하여 connection$
유형의 fromSocketIO
관찰 가능 항목을 만듭니다. 각각의 새로운 http 요청에 대해 connection$
는 구독자에게 Connector
유형의 개체를 알립니다.
interface Connector<L extends EventsMap, S extends EventsMap> {
from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
id: string;
user?: string;
onDisconnect: (callback: () => void) => void;
}
처음 두 가지 방법에 유의하십시오. 둘 다 공장입니다.
Code snippets have been adapted to this blog post specifically, it is not a 1:1 copy of production code and some implementation details have hidden.
const connection$ = await socketIOServer();
interface Connector<L extends EventsMap, S extends EventsMap> {
from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
id: string;
user?: string;
onDisconnect: (callback: () => void) => void;
}
from
는 이벤트 이름을 매개변수로 사용하고 receive에서 Observable
를 생성합니다. to
는 이벤트 이름을 매개변수로 사용하여 Observer
~ emit을 생성합니다. 이를 통해 클라이언트 상태를 원격으로 관리하는 데 사용할 수 있는
from('action').subscribe(to('reducer'))
와 같은 것을 허용합니다.매개변수
id
및 user
는 자체 설명적이며 onDisconnect
는 클라이언트 연결 해제 시 실행될 콜백을 등록합니다.후드 아래에서 Socket.IO 서버는 auth0-socketio 미들웨어로 보호되어 Auth0 ID 공급자와의 인증을 관리합니다.
이 연결 개체는 연결 활동을 모니터링하는 데 사용할 수 있습니다.
connection$.subscribe(({ id, user, onDisconnect }) => {
log(`Connected ${user} through session ${id}`);
onDisconnect(() => log(`Disconnected ${user} from session ${id}`));
});
이 인터페이스는
client
의 관찰 가능 항목을 생성하기 위해 함수client$
에서 사용됩니다.const client$ = connection$.pipe(map((connector) => client(connector)));
각각
client
에는 React+Redux를 사용하여 구현된 클라이언트 상태로 업데이트되는 관찰 가능 항목state$
이 있습니다.client$.subscribe(({ state$ }) => state$.subscribe((state) => log(util.inspect(state, { depth: 4 }))));
다음으로 필요한 것은 데이터 소스를 클라이언트에 연결하는 것입니다. 다행히
state$
외에도 각 클라이언트는 attachDataSource
및 removeDataSource
를 구현합니다.한 번에 하나의 소스만 연결할 수 있으며,
attachDataSource
관찰 가능 항목이 필요하고 removeDataSource
는 소스 업데이트에서 클라이언트 구독을 취소하는 기능일 뿐입니다.이것이 지금 필요한 전부입니다. 향후 게시물에서
client
세대에 대해 설명하겠습니다. 이제 데이터 소스를 설정해 보겠습니다.소스에서 데이터 가져오기
Kafka 주제를 관찰 가능 항목으로 변환하기 위해 rxkfk 라이브러리를 사용합니다. 연결 세부 정보는
kafkaConnector
내부에 숨겨져 있지만 주어진 주제의 메시지와 함께 형식화된 관찰 가능 항목을 반환합니다.const msg$ = await kafkaConnector();
간단한 구독으로 입력 데이터를 모니터링할 수 있습니다.
msg$.subscribe(({ flights }) => log(`Got a new msg containing ${flights.length} flights`));
마지막으로 각 클라이언트는 개별적으로 구독해야 합니다. 모든 클라이언트를 동일한 소스에 연결하려는 경우 관찰자를
client$
에 연결하여 각 개인client
과 관찰 가능 항목msg$
간의 링크를 설정하면 됩니다.client$.subscribe((client) => client.attachDataSource(msg$));
데이터 소스를 첨부하는 것 외에도 클라이언트는
client.removeDataSource()
메서드를 호출하여 구독을 취소할 수 있습니다. 이를 통해 클라이언트는 데이터 소스를 동적으로 변경할 수 있습니다.다음에 온다
지금까지 우리는 코드의 기본 구조를 다루었습니다. 클라이언트와 서버 측에 대해 두 개의 관찰 가능 항목을 만들고 프로그래밍 방식으로 ✨ 둘 다 연결했습니다.
다음 장에서는 공백을 메우고 클라이언트와 데이터 소스가 연결되는 방법,
clients
에서 connections
를 생성하는 방법 및 projection and combination operator을 사용하여 데이터 소스를 필터링하는 방법을 설명합니다.
Reference
이 문제에 관하여(리액티브 프로그래밍 실행 - 파트 1), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/typesamuel/reactive-programming-in-action-part-1-h75텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)