Quarkus 및 Angular 10이 있는 가장 빠른 WebSocket
46563 단어 angularquarkuswebdevwebsockets
우리가 시작하기 전에 본고에서 사용한 원본 코드는 여기서 찾을 수 있다.
https://github.com/cloudy-engineering/quarkus-chat-api
https://github.com/cloudy-engineering/angular-chat-ui
Quarkus WebSockets의 간단한 시작:
$ mvn io.quarkus:quarkus-maven-plugin:1.7.0.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=websockets-quickstart \
-Dextensions="undertow-websockets"
$ cd websockets-quickstart
이것은 전형적인 마븐 기반 소스 코드 구조를 만들고 다음과 같은 하이라이트를 가진다.quarkus-undertow-websockets
의존 항목 추가quarkus-resteasy
종속성 기본 추가onOpen
, onError
, onMessage
, onClose
기능 구현
Quarkus WebSockets는 인터페이스나 확장 기본 클래스가 아닌 주기에 대한 설명을 사용합니다.
@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {
private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
}
@OnError
public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
log.error("{} encountered an error", username);
}
@OnMessage
public void onMessage(String message, @PathParam("username") String username) {
log.debug("{} has just sent us a message: {}", username, message);
}
@OnClose
public void onClose(Session session, @PathParam("username") String username) {
log.debug("{} has now disconnected", username);
}
}
서버 측 구성 요소가 기억해야 할 것은 Session
이다.이것은 최종 사용자와 통신하는 방식입니다.이 문서에서는 AsyncRemote를 사용하여 사용자에게 객체를 전송합니다.
@ServerEndpoint("/chat/{username}")
@ApplicationScoped
public class SocketEndpoint {
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
}
...
}
프런트엔드 연결을 통과하면 onOpen
방법이 실례화됩니다.여기에서 사용자의 상호작용을 설정하고 모든 조작을 확인하는 메시지를 보낼 수 있습니다.여기서 우리는 단지 하나의 회답만 보낼 것이다.
로그 및 CORS
계속하기 전에, 디버그 메시지를 볼 수 있도록 로그 기록을 설정할 것입니다.src/main/resources/application.properties
파일에 다음 항목을 추가합니다.
quarkus.log.category."com.brightfield.streams".level=ALL
또한 CORS를 활성화해야 하므로 다음이 필요합니다.
quarkus.http.cors.enabled=true
quarkus.http.cors.origins=http://localhost:4200
quarkus.http.cors.methods=get,post,put,head,options
이렇게 많은 응용 프로그램이 포트8080
에서 실행되기 때문에 포트를 8011
로 변경합니다.
quarkus.http.port=8011
이 점을 테스트하기 위해 단원 테스트를 만듭니다.
서버 노드 테스트.Java 언어
package com.brightfield.streams;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@QuarkusTest
public class SocketEndpointTest {
private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();
@TestHTTPResource("/chat/testuser")
URI uri;
@Test
public void testWebSocketChat() throws Exception {
try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
}
}
@ClientEndpoint
public static class Client {
private final Logger log = LoggerFactory.getLogger(Client.class);
@OnOpen
public void open(final Session session) {
log.debug("Connecting to server");
String toSend = "Connecting to central control...";
session.getAsyncRemote().sendText(toSend);
}
@OnMessage
void message(final String message) {
log.debug("Incoming message: {}", message);
MESSAGES.add(message);
}
}
}
그럼, 우리는 여기서 무엇을 했습니까?
우선, 파이프를 통해 전달되는 메시지를 저장하기 위해 대기열을 설정합니다.클라이언트가 메시지를 보내거나 수신할 때, 우리는 메시지가 도착하는 순서를 검증하기 위해 줄을 서기를 희망합니다.
이 경우 첫 번째 메시지는 Client.class
처음 연결할 때 보내는 메시지입니다. "방송 시청을 환영합니다".
클라이언트가 연결될 때, 우리는 첫 번째 메시지를 보낼 것이다. "중앙 제어에 연결..."이것은 우리가 서열에 있는 두 번째 소식이 될 것이다.
코드를 컴파일하고 실행하면 테스트를 디버깅과 함께 통과하는 것을 볼 수 있습니다. 다음과 같습니다.
INFO [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
INFO [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
INFO [io.quarkus] (main) Profile test activated.
INFO [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
만약 우리가 사건의 순서를 고려한다면:
@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {
private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
}
@OnError
public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
log.error("{} encountered an error", username);
}
@OnMessage
public void onMessage(String message, @PathParam("username") String username) {
log.debug("{} has just sent us a message: {}", username, message);
}
@OnClose
public void onClose(Session session, @PathParam("username") String username) {
log.debug("{} has now disconnected", username);
}
}
@ServerEndpoint("/chat/{username}")
@ApplicationScoped
public class SocketEndpoint {
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
}
...
}
quarkus.log.category."com.brightfield.streams".level=ALL
quarkus.http.cors.enabled=true
quarkus.http.cors.origins=http://localhost:4200
quarkus.http.cors.methods=get,post,put,head,options
quarkus.http.port=8011
package com.brightfield.streams;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@QuarkusTest
public class SocketEndpointTest {
private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();
@TestHTTPResource("/chat/testuser")
URI uri;
@Test
public void testWebSocketChat() throws Exception {
try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
}
}
@ClientEndpoint
public static class Client {
private final Logger log = LoggerFactory.getLogger(Client.class);
@OnOpen
public void open(final Session session) {
log.debug("Connecting to server");
String toSend = "Connecting to central control...";
session.getAsyncRemote().sendText(toSend);
}
@OnMessage
void message(final String message) {
log.debug("Incoming message: {}", message);
MESSAGES.add(message);
}
}
}
INFO [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
INFO [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
INFO [io.quarkus] (main) Profile test activated.
INFO [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
UI 만들기
더 좋은 그림을 얻기 위해 Angular 10에서 UI를 만드는 방법을 보여 드리겠습니다.
Angular 응용 프로그램 만들기 시작:
$ ng new chat-ui
? Would you like to add Angular routing? Yes
? Which stylesheet format would you like to use? CSS
...
Installing packages...
✔ Packages installed successfully.
Successfully initialized git.
다음으로 우리는 app.module.ts
에서 반응 형태의 광맥을 확보해야 한다.
@NgModule({
...
imports: [
BrowserModule,
AppRoutingModule,
BrowserAnimationsModule,
ReactiveFormsModule,
],
});
우리는 두 가지 종류를 만들고 싶다.
$ ng new chat-ui
? Would you like to add Angular routing? Yes
? Which stylesheet format would you like to use? CSS
...
Installing packages...
✔ Packages installed successfully.
Successfully initialized git.
@NgModule({
...
imports: [
BrowserModule,
AppRoutingModule,
BrowserAnimationsModule,
ReactiveFormsModule,
],
});
$ ng g s _services/socket
CREATE src/app/_services/socket.service.spec.ts (357 bytes)
CREATE src/app/_services/socket.service.ts (135 bytes)
$ ng g c chat
CREATE src/app/chat/chat.component.css (0 bytes)
CREATE src/app/chat/chat.component.html (19 bytes)
CREATE src/app/chat/chat.component.spec.ts (612 bytes)
CREATE src/app/chat/chat.component.ts (267 bytes)
UPDATE src/app/app.module.ts (388 bytes)
모범 사례를 위해 Quarkus 애플리케이션 노드에 환경 변수를 먼저 구성합니다.src/environments/environment.송전 시스템
export const environment = {
production: false,
socket_endpoint: 'ws://localhost:8011'
};
서비스 구현
서버 측 구성 요소를 연결하고 상호작용을 하기 위해서, 우리는 내장된 rxjs 클래스를 사용할 것입니다.
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
RXJS 및 WebSocketsRXJS는 ServerEndpoint에 연결하는 것이 ServerEndpoint를 구현하는 것만큼 쉽습니다.
WebSocketSubject
클라이언트와 서버 간의 통신 상태를 나타냅니다.BehaviorSubject
처럼, 우리는 메시지를 전송하고 WebSocketSubject
구독을 통해 응답할 것입니다.webSocket
클래스는 우리 공장이 만든 WebSocketSubject
서버에 대한 연결을 나타냅니다.우리는 URL을 우리의 서비스에 전달할 것이다. URL은 우리에게 전송하고 구독할 수 있도록 되돌아갈 것이다.우리는 생명주기의 세 부분을 실현해야 한다.
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';
@Injectable({
providedIn: 'root'
})
export class SocketService {
connection$: WebSocketSubject<any>;
constructor() { }
connect(): Observable<any> {
this.connection$ = webSocket({
url: `${env.socket_endpoint}/chat/angularuser`,
deserializer: ({data}) => data,
serializer: ({data}) => data,
});
return this.connection$;
}
...
}
RXJS를 사용하여 WebSocket 연결을 만들 때 기본 정렬화/반정렬화WebSocketSubject
입니다.서버 측 구성 요소에서 일반 테스트를 사용하기 때문에, 데이터를 분석하지 않고 serde를 다시 쓸 것입니다.잠시 후에,connect () 방법을 호출하여 메시지를 보내고 받을 수 있도록 초기 연결을 만드는 방법을 볼 수 있습니다.
메시지를 보내기 위해서는 사용자가 볼 수 있는 모든 정보와 같이 메시지를 줄을 서야 합니다.
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';
@Injectable({
providedIn: 'root'
})
export class SocketService {
connection$: WebSocketSubject<any>;
constructor() { }
connect(): Observable<any> {
this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
return this.connection$;
}
send(data: any): void {
if (this.connection$) {
this.connection$.next(data);
} else {
console.log('Did not send data, unable to open connection');
}
}
}
JSON.parse
파이프가 열릴 때, 우리는 connection$
방법으로 대상을 서버에 보냅니다.만약 우리가 연결을 잃었다면, 우리는 지금 단지 하나의 소식을 기록할 수 있을 뿐이다.마지막으로 서버와의 연결을 끊으면 연결을 닫고 백엔드 이벤트
next()
를 트리거하는 방법을 원하기 때문에 @OnClose
방법을 실현하고 closeConnection()
이벤트에서 호출합니다.잡담하다서비스송전 시스템
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';
@Injectable({
providedIn: 'root'
})
export class SocketService {
connection$: WebSocketSubject<any>;
constructor() { }
connect(): Observable<any> {
this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
return this.connection$;
}
send(data: any): void {
if (this.connection$) {
this.connection$.next(data);
} else {
console.log('Did not send data, unable to open connection');
}
}
closeConnection(): void {
if (this.connection$) {
this.connection$.complete();
this.connection$= null;
}
}
ngOnDestroy() {
this.closeConnection();
}
}
각도 컴포넌트 생성하기
보시다시피 우리는 매우 직접적인 실현을 하고 표준을 사용하는 관찰 가능한 모델을 가지고 있습니다.이 서비스를 사용하려면 연결을 시작하고 웹소켓 연결을 통해 데이터를 보내는 구성 요소를 만들어야 합니다.
import { Component, OnInit } from '@angular/core';
import { SocketService } from '../_services/socket.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { FormControl } from '@angular/forms';
@Component({
selector: 'app-chat',
templateUrl: './chat.component.html',
styleUrls: ['./chat.component.css']
})
export class ChatComponent implements OnInit {
messages: string[] = [];
msgControl = new FormControl('');
destroyed$ = new Subject();
constructor(private chatService: SocketService) { }
ngOnInit(): void {
const chatSub$ = this.chatService.connect().pipe(
takeUntil(this.destroyed$),
);
chatSub$.subscribe(message => this.messages.push(message));
}
sendMessage(): void {
this.chatService.send(this.msgControl.value);
this.msgControl.setValue('');
}
ngOnDestroy(): void {
this.destroyed$.next();
}
}
잡담하다구성 부분html<ul>
<li *ngFor="let message of messages">{{ message }}</li>
</ul>
<input placeholder="Send a message..." [formControl]="msgControl">
<button (click)="sendMessage()">Send</button>
새 구성 요소에 대한 경로를 빠르게 추가합니다.응용 프로그램 라우팅.단원송전 시스템
import { NgModule } from '@angular/core';
import { Routes, RouterModule } from '@angular/router';
import { ChatComponent } from './chat/chat.component';
const routes: Routes = [
{ path: 'chat', component: ChatComponent }
];
@NgModule({
imports: [RouterModule.forRoot(routes)],
exports: [RouterModule]
})
export class AppRoutingModule { }
구성 요소에서 보듯이, 우리는 onDestroy()
방법을 호출하여 웹 소켓 연결의 생명 주기를 실행할 것입니다.사용자 인터페이스는 되돌아오는 메시지 목록을 포함하는 간단한 폼 컨트롤러입니다.서비스와angular 사용자 인터페이스를 시작합니다. http://localhost:4200/chat 설정된 루트에 접근할 수 있어야 합니다.
페이지에 접근할 때, 우리의 초기 메시지인 "showangularuser에 오신 것을 환영합니다"와 입력 상자를 볼 수 있습니다.
로그를 보면 초기 연결이 표시됩니다.
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
WARN [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
INFO [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
메시지를 입력하고 보내기를 누르면 서버에 기록된 메시지를 볼 수 있습니다.__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
WARN [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
INFO [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just sent us a message: "Good morning"
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-8) angularuser has just connected
지금까지는 괜찮았지만, 우리는 그것이 더욱 상호작용성을 가지기를 바란다.이 문서에서는 사용자가 보낸 내용을 회상합니다.@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {
private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
private Map<String, Session> socketSessions = new HashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
socketSessions.put(username, session);
}
@OnError
public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
log.error("{} encountered an error", username);
}
@OnMessage
public void onMessage(String message, @PathParam("username") String username) {
log.debug("{} has just sent us a message: {}", username, message);
Session session = socketSessions.get(username);
session.getAsyncRemote().sendText(message);
}
public void onClose(Session session, @PathParam("username") String username) {
log.debug("{} has now disconnected", username);
}
}
코드에 대한 업데이트에서 사용자가 연결될 때, 사용자 이름의 SocketService
인덱스에서 Session
인용을 보류합니다.메시지를 받았을 때, 우리는 세션을 찾은 후에 메시지를 돌려보낼 것이다.DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "Glad to be here"
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "What day is it?"
다음 글에서는 Kafka를 웹 소켓 세션에 연결하는 방법을 설명하고 Kafka 대기열에서 온 메시지를 방송할 것입니다.
Reference
이 문제에 관하여(Quarkus 및 Angular 10이 있는 가장 빠른 WebSocket), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anthonyikeda/quickest-websockets-with-quarkus-and-angular-10-1e2c텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)