Quarkus 및 Angular 10이 있는 가장 빠른 WebSocket

Quarkus는 메시징과 WebSocket을 빠르게 사용할 수 있도록 도와줍니다.하지만 이 두 가지 기술을 결합시키고 싶을 때 어떤 일이 일어날까요?
우리가 시작하기 전에 본고에서 사용한 원본 코드는 여기서 찾을 수 있다.
https://github.com/cloudy-engineering/quarkus-chat-api
https://github.com/cloudy-engineering/angular-chat-ui
Quarkus WebSockets의 간단한 시작:
$ mvn io.quarkus:quarkus-maven-plugin:1.7.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=websockets-quickstart \
    -Dextensions="undertow-websockets"
$ cd websockets-quickstart
이것은 전형적인 마븐 기반 소스 코드 구조를 만들고 다음과 같은 하이라이트를 가진다.
  • quarkus-undertow-websockets 의존 항목 추가
  • quarkus-resteasy 종속성 기본 추가
  • 실제로 첫 번째 통합을 시작하고 실행하는 것은 매우 쉽다.
  • WebSocket 끝점을 나타내는 새 클래스 만들기
  • 표준 WebSocket 라이프 사이클 구현 방법(onOpen, onError, onMessage, onClose
  • WebSocket 노드와 통합된 UI 만들기
  • 기능 구현


    Quarkus WebSockets는 인터페이스나 확장 기본 클래스가 아닌 주기에 대한 설명을 사용합니다.
    @ServerEndpoint("/chat/{username}")
    public class SocketEndpoint {
    
        private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
    
        @OnOpen
        public void onOpen(Session session, @PathParam("username") String username) {
            log.debug("{} has just connected", username);
        }
    
        @OnError
        public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
            log.error("{} encountered an error", username);
        }
    
        @OnMessage
        public void onMessage(String message, @PathParam("username") String username) {
            log.debug("{} has just sent us a message: {}", username, message);
        }
    
        @OnClose
        public void onClose(Session session, @PathParam("username") String username) {
            log.debug("{} has now disconnected", username);
        }
    }
    
    서버 측 구성 요소가 기억해야 할 것은 Session 이다.이것은 최종 사용자와 통신하는 방식입니다.이 문서에서는 AsyncRemote를 사용하여 사용자에게 객체를 전송합니다.
    @ServerEndpoint("/chat/{username}")
    @ApplicationScoped
    public class SocketEndpoint {
    
        @OnOpen
        public void onOpen(Session session, @PathParam("username") String username) {
            log.debug("{} has just connected", username);
            session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
        }
    
    ...
    }
    
    프런트엔드 연결을 통과하면 onOpen 방법이 실례화됩니다.여기에서 사용자의 상호작용을 설정하고 모든 조작을 확인하는 메시지를 보낼 수 있습니다.여기서 우리는 단지 하나의 회답만 보낼 것이다.
    로그 및 CORS
    계속하기 전에, 디버그 메시지를 볼 수 있도록 로그 기록을 설정할 것입니다.src/main/resources/application.properties 파일에 다음 항목을 추가합니다.
    quarkus.log.category."com.brightfield.streams".level=ALL
    
    또한 CORS를 활성화해야 하므로 다음이 필요합니다.
    quarkus.http.cors.enabled=true
    quarkus.http.cors.origins=http://localhost:4200
    quarkus.http.cors.methods=get,post,put,head,options
    
    이렇게 많은 응용 프로그램이 포트8080에서 실행되기 때문에 포트를 8011로 변경합니다.
    quarkus.http.port=8011
    
    이 점을 테스트하기 위해 단원 테스트를 만듭니다.
    서버 노드 테스트.Java 언어
    package com.brightfield.streams;
    
    import io.quarkus.test.common.http.TestHTTPResource;
    import io.quarkus.test.junit.QuarkusTest;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.websocket.*;
    import java.net.URI;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;
    
    @QuarkusTest
    public class SocketEndpointTest {
    
        private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();
    
        @TestHTTPResource("/chat/testuser")
        URI uri;
    
        @Test
        public void testWebSocketChat() throws Exception {
            try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
                Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
                Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
            }
        }
    
        @ClientEndpoint
        public static class Client {
            private final Logger log = LoggerFactory.getLogger(Client.class);
    
            @OnOpen
            public void open(final Session session) {
                log.debug("Connecting to server");
                String toSend = "Connecting to central control...";
                session.getAsyncRemote().sendText(toSend);
            }
    
            @OnMessage
            void message(final String message) {
                log.debug("Incoming message: {}", message);
                MESSAGES.add(message);
            }
        }
    }
    
    그럼, 우리는 여기서 무엇을 했습니까?
    우선, 파이프를 통해 전달되는 메시지를 저장하기 위해 대기열을 설정합니다.클라이언트가 메시지를 보내거나 수신할 때, 우리는 메시지가 도착하는 순서를 검증하기 위해 줄을 서기를 희망합니다.
    이 경우 첫 번째 메시지는 Client.class 처음 연결할 때 보내는 메시지입니다. "방송 시청을 환영합니다".
    클라이언트가 연결될 때, 우리는 첫 번째 메시지를 보낼 것이다. "중앙 제어에 연결..."이것은 우리가 서열에 있는 두 번째 소식이 될 것이다.
    코드를 컴파일하고 실행하면 테스트를 디버깅과 함께 통과하는 것을 볼 수 있습니다. 다음과 같습니다.
    INFO  [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
    INFO  [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
    INFO  [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
    INFO  [io.quarkus] (main) Profile test activated.
    INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
    DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
    DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
    
    만약 우리가 사건의 순서를 고려한다면:
  • 서버에 클라이언트 연결
  • 서버에서 "show testuser에 오신 것을 환영합니다"
  • 클라이언트가 서버에 "중앙 제어에 연결..."
  • 클라이언트로부터 "show testuser 사용을 환영합니다"
  • 우리의 테스트는 서버 측의 상호작용과 클라이언트의 상호작용을 추적한다.

    UI 만들기


    더 좋은 그림을 얻기 위해 Angular 10에서 UI를 만드는 방법을 보여 드리겠습니다.
    Angular 응용 프로그램 만들기 시작:
    $ ng new chat-ui
    ? Would you like to add Angular routing? Yes
    ? Which stylesheet format would you like to use? CSS
    ... 
    Installing packages...
    ✔ Packages installed successfully.
        Successfully initialized git.
    
    다음으로 우리는 app.module.ts에서 반응 형태의 광맥을 확보해야 한다.
    @NgModule({
    ...
      imports: [
        BrowserModule,
        AppRoutingModule,
        BrowserAnimationsModule,
        ReactiveFormsModule,
      ],
    
    });
    
    우리는 두 가지 종류를 만들고 싶다.
  • WebSocket 상태를 관리하는 서비스
  • 상호 작용 표시
  • $ ng g s _services/socket
    CREATE src/app/_services/socket.service.spec.ts (357 bytes)
    CREATE src/app/_services/socket.service.ts (135 bytes)
    $ ng g c chat
    CREATE src/app/chat/chat.component.css (0 bytes)
    CREATE src/app/chat/chat.component.html (19 bytes)
    CREATE src/app/chat/chat.component.spec.ts (612 bytes)
    CREATE src/app/chat/chat.component.ts (267 bytes)
    UPDATE src/app/app.module.ts (388 bytes)
    
    모범 사례를 위해 Quarkus 애플리케이션 노드에 환경 변수를 먼저 구성합니다.
    src/environments/environment.송전 시스템
    export const environment = {
      production: false,
      socket_endpoint: 'ws://localhost:8011'
    };
    

    서비스 구현


    서버 측 구성 요소를 연결하고 상호작용을 하기 위해서, 우리는 내장된 rxjs 클래스를 사용할 것입니다.
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    
    RXJS 및 WebSockets
    RXJS는 ServerEndpoint에 연결하는 것이 ServerEndpoint를 구현하는 것만큼 쉽습니다.WebSocketSubject 클라이언트와 서버 간의 통신 상태를 나타냅니다.BehaviorSubject처럼, 우리는 메시지를 전송하고 WebSocketSubject 구독을 통해 응답할 것입니다.webSocket 클래스는 우리 공장이 만든 WebSocketSubject 서버에 대한 연결을 나타냅니다.우리는 URL을 우리의 서비스에 전달할 것이다. URL은 우리에게 전송하고 구독할 수 있도록 되돌아갈 것이다.
    우리는 생명주기의 세 부분을 실현해야 한다.
  • 연결(onOpen)
  • 닫기/제거(onClose)
  • 보내기(onMessage)
  • 잡담하다서비스송전 시스템
    import { Injectable } from '@angular/core';
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    import { Observable } from 'rxjs';
    import { environment as env } from '../../environments/environment';
    
    @Injectable({
      providedIn: 'root'
    })
    export class SocketService {
    
      connection$: WebSocketSubject<any>;
    
      constructor() { }
    
      connect(): Observable<any> {
        this.connection$ = webSocket({
          url: `${env.socket_endpoint}/chat/angularuser`,
          deserializer: ({data}) => data,
          serializer: ({data}) => data,
        });
        return this.connection$;
      }
    ...
    }
    
    RXJS를 사용하여 WebSocket 연결을 만들 때 기본 정렬화/반정렬화WebSocketSubject입니다.서버 측 구성 요소에서 일반 테스트를 사용하기 때문에, 데이터를 분석하지 않고 serde를 다시 쓸 것입니다.
    잠시 후에,connect () 방법을 호출하여 메시지를 보내고 받을 수 있도록 초기 연결을 만드는 방법을 볼 수 있습니다.
    메시지를 보내기 위해서는 사용자가 볼 수 있는 모든 정보와 같이 메시지를 줄을 서야 합니다.
    import { Injectable } from '@angular/core';
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    import { Observable } from 'rxjs';
    import { environment as env } from '../../environments/environment';
    
    @Injectable({
      providedIn: 'root'
    })
    export class SocketService {
    
      connection$: WebSocketSubject<any>;
    
      constructor() { }
    
      connect(): Observable<any> {
        this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
        return this.connection$;
      }
    
      send(data: any): void {
        if (this.connection$) {
          this.connection$.next(data);
        } else {
          console.log('Did not send data, unable to open connection');
        }
      }
    
    }
    
    JSON.parse 파이프가 열릴 때, 우리는 connection$ 방법으로 대상을 서버에 보냅니다.만약 우리가 연결을 잃었다면, 우리는 지금 단지 하나의 소식을 기록할 수 있을 뿐이다.
    마지막으로 서버와의 연결을 끊으면 연결을 닫고 백엔드 이벤트 next() 를 트리거하는 방법을 원하기 때문에 @OnClose 방법을 실현하고 closeConnection() 이벤트에서 호출합니다.
    잡담하다서비스송전 시스템
    import { Injectable } from '@angular/core';
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    import { Observable } from 'rxjs';
    import { environment as env } from '../../environments/environment';
    
    @Injectable({
      providedIn: 'root'
    })
    export class SocketService {
    
      connection$: WebSocketSubject<any>;
    
      constructor() { }
    
      connect(): Observable<any> {
        this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
        return this.connection$;
      }
    
      send(data: any): void {
        if (this.connection$) {
          this.connection$.next(data);
        } else {
          console.log('Did not send data, unable to open connection');
        }
      }
    
      closeConnection(): void {
        if (this.connection$) {
          this.connection$.complete();
          this.connection$= null;
        }
      }
    
      ngOnDestroy() {
        this.closeConnection();
      }
    
    }
    

    각도 컴포넌트 생성하기


    보시다시피 우리는 매우 직접적인 실현을 하고 표준을 사용하는 관찰 가능한 모델을 가지고 있습니다.이 서비스를 사용하려면 연결을 시작하고 웹소켓 연결을 통해 데이터를 보내는 구성 요소를 만들어야 합니다.
    import { Component, OnInit } from '@angular/core';
    import { SocketService } from '../_services/socket.service';
    import { Subject } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';
    import { FormControl } from '@angular/forms';
    
    @Component({
      selector: 'app-chat',
      templateUrl: './chat.component.html',
      styleUrls: ['./chat.component.css']
    })
    export class ChatComponent implements OnInit {
    
      messages: string[] = [];
      msgControl = new FormControl('');
      destroyed$ = new Subject();
    
      constructor(private chatService: SocketService) { }
    
      ngOnInit(): void {
        const chatSub$ = this.chatService.connect().pipe(
          takeUntil(this.destroyed$),
        );
    
        chatSub$.subscribe(message => this.messages.push(message));
      }
    
      sendMessage(): void {
        this.chatService.send(this.msgControl.value);
        this.msgControl.setValue('');
      }
    
      ngOnDestroy(): void {
        this.destroyed$.next();
      }
    
    }
    
    잡담하다구성 부분html
    <ul>
      <li *ngFor="let message of messages">{{ message }}</li>
    </ul>
    <input placeholder="Send a message..." [formControl]="msgControl">
    <button (click)="sendMessage()">Send</button>
    
    새 구성 요소에 대한 경로를 빠르게 추가합니다.
    응용 프로그램 라우팅.단원송전 시스템
    import { NgModule } from '@angular/core';
    import { Routes, RouterModule } from '@angular/router';
    import { ChatComponent } from './chat/chat.component';
    
    const routes: Routes = [
      { path: 'chat', component: ChatComponent }
    ];
    
    @NgModule({
      imports: [RouterModule.forRoot(routes)],
      exports: [RouterModule]
    })
    export class AppRoutingModule { }
    
    구성 요소에서 보듯이, 우리는 onDestroy() 방법을 호출하여 웹 소켓 연결의 생명 주기를 실행할 것입니다.사용자 인터페이스는 되돌아오는 메시지 목록을 포함하는 간단한 폼 컨트롤러입니다.
    서비스와angular 사용자 인터페이스를 시작합니다. http://localhost:4200/chat 설정된 루트에 접근할 수 있어야 합니다.

    페이지에 접근할 때, 우리의 초기 메시지인 "showangularuser에 오신 것을 환영합니다"와 입력 상자를 볼 수 있습니다.
    로그를 보면 초기 연결이 표시됩니다.
    __  ____  __  _____   ___  __ ____  ______
     --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
     -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
    --\___\_\____/_/ |_/_/|_/_/|_|\____/___/
    WARN  [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
    INFO  [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
    INFO  [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
    INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
    INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
    
    메시지를 입력하고 보내기를 누르면 서버에 기록된 메시지를 볼 수 있습니다.
    __  ____  __  _____   ___  __ ____  ______
     --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
     -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
    --\___\_\____/_/ |_/_/|_/_/|_|\____/___/
    WARN  [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
    INFO  [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
    INFO  [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
    INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
    INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just sent us a message: "Good morning"
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-8) angularuser has just connected
    
    지금까지는 괜찮았지만, 우리는 그것이 더욱 상호작용성을 가지기를 바란다.이 문서에서는 사용자가 보낸 내용을 회상합니다.
    @ServerEndpoint("/chat/{username}")
    public class SocketEndpoint {
    
        private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
        private Map<String, Session> socketSessions = new HashMap<>();
    
        @OnOpen
        public void onOpen(Session session, @PathParam("username") String username) {
            log.debug("{} has just connected", username);
            session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
            socketSessions.put(username, session);
        }
    
        @OnError
        public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
            log.error("{} encountered an error", username);
        }
    
        @OnMessage
        public void onMessage(String message, @PathParam("username") String username) {
            log.debug("{} has just sent us a message: {}", username, message);
            Session session = socketSessions.get(username);
            session.getAsyncRemote().sendText(message);
        }
    
        public void onClose(Session session, @PathParam("username") String username) {
            log.debug("{} has now disconnected", username);
        }
    }
    
    코드에 대한 업데이트에서 사용자가 연결될 때, 사용자 이름의 SocketService 인덱스에서 Session 인용을 보류합니다.메시지를 받았을 때, 우리는 세션을 찾은 후에 메시지를 돌려보낼 것이다.

    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just connected
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "Glad to be here"
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "What day is it?"
    
    다음 글에서는 Kafka를 웹 소켓 세션에 연결하는 방법을 설명하고 Kafka 대기열에서 온 메시지를 방송할 것입니다.

    좋은 웹페이지 즐겨찾기