Quarkus, WebSockets 및 Kafka
본고는 진정한 회화 관리가 없기 때문에 우리는 장래에 그것을 토론할 수 있지만, 이것은 일부 기본 사용자를 관리하고 모든 사용자에게 방송하는 것이 얼마나 쉬운지 확실히 증명한다.
카프카 시작 및 실행
이를 실현하기 위해서는 Kafka의 실례가 시작되고 실행되어야 하기 때문에 우리는 그것을 시작할 것이다.
이것은 본 조항의 요구사항입니다.
KAFKA_HOME
라고 부른다ZooKeeper 시작
zookeeper를 다운로드한 후 디렉터리로 압축을 풀고 Java11이 현재 JDK인지 확인하십시오.
다음은 conf/zoo를 만듭니다.다음 속성이 있는 cfg 파일:
동물원.cfg 회사
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=60
서버가 디렉터리에 쓸 수 있는 모든 위치에 데이터Dir를 설정할 수 있습니다.그런 다음 다음 ZooKeeper를 시작할 수 있습니다.$ bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
이어서 우리는 카프카를 설치할 것이다.카프카가 길을 떠났어요.
Kafka를 실행하려면 먼저 Java 11을 JDK로 설정해야 합니다.
다음 시작 카프카:
$ bin/kafka-server.sh start config/server.properties
INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
INFO starting (kafka.server.KafkaServer) [2020-09-08 19:04:53,486] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=14.0.2 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
...
INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
INFO Loading logs. (kafka.log.LogManager)
INFO Logs loading complete in 10 ms. (kafka.log.LogManager)
INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
메세지가 한 무더기 있을 것이다. 그러나 더 중요한 것은 시작하는 탐지기이다. EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
이것은 우리가 포트9092
의 비안전 연결을 사용하여 Kafka에 연결할 수 있음을 나타낸다테마 만들기
우리는 수동으로 읽을 수 있는 주제를 만들어야 한다.터미널을 열고
KAFKA_HOME
디렉토리로 이동하여 다음 명령을 수행합니다.$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic chat-messages --partitions 1 --replication-factor 1
Created topic chat-messages.
이것은 우리에게 새로운 주제를 만들 것이다 chat-messages
.WebSocket API 업데이트
계속하기 위해서는 Kafka에 연결하기 위해 WebSocket API에 더 많은 의존 항목이 필요합니다.
pom.xml
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
Kafka 연결 구성
다음은 Kafka 서버에 연결할 수 있도록 프로그램을 설정해야 합니다.
src/main/resources/application.properties
를 열고 다음 사항을 변경합니다.quarkus.kafka-streams.application-server=localhost:8011
quarkus.kafka-streams.application-id=${quarkus.application.name}
quarkus.kafka-streams.bootstrap-servers=${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
quarkus.kafka-streams.topics=chat-messages
Kafka 호스트의 경우 KAFKA_HOST
환경 변수와 localhost
환경 변수로 되돌아오는 포트를 정의했습니다.또한 앞에서 만든 기본 테마를 KAFKA_PORT
로 설정합니다.⚠️ NOTE
If you want to run a quick compile from here there are a couple of things we need to add to our test configuration:
src/test/resources/application.속성
콕스.활용단어참조이름 = WebSocket 테스트
콕스.로그카테고리“com.brightfield.streams”.등급 = 전부
콕스.카프카 강.주제 = 채팅 메시지
카프카 소비자 만들기
이를 위해 클래스 업데이트
9092
우선, 모든 연결된 사용자에게 방송하는 방법을 만듭니다.private void broadcast(String message) {
socketSessions.values().forEach(s -> {
s.getAsyncRemote().sendText(message, result -> {
if (result.getException() != null) {
log.error("Unable to send message: {}", result.getException().getMessage(), result.getException());
}
});
});
}
보시다시피, 사용자 이름 인덱스로 만들어진 다른 사용자 세션으로 구성된 맵을 훑어보고 있으며, 모든 사용자에게 텍스트 메시지를 보내는 비동기 원격 서버를 만들고 있습니다.다음은 소비자를 추가하고
chat-messages
클래스에서 다음 코드를 추가합니다.@Produces
public Topology buildTopology() {
log.info("Building the Topology...");
StreamsBuilder builder = new StreamsBuilder();
builder.stream("chat-messages", Consumed.with(Serdes.String(), Serdes.String()))
.peek((id, message) -> {
log.info("Incoming transaction: {}", message);
broadcast(message);
});
return builder.build();
}
여기에서, 우리는 정탐할 흐름을 지정하고, 문자열 키 서열화기와 문자열 값 서열화기를 사용하여 테마에서 메시지를 읽습니다.그런 다음 WebSocket을 통해 연결된 모든 사용자에게 메시지를 기록하고 브로드캐스트합니다.업데이트 단위 테스트
만약 우리가 이 서비스를 구축하려고 시도한다면, Kafka 서버를 실행하지 않으면, 테스트를 실행할 때 문제가 발생할 것입니다.만약 이렇게 한다면, 당신은 단원 테스트가 끊긴 것을 발견할 수 있습니다. 왜냐하면 테스트 중에 닫는 과정이 없기 때문입니다.이것이 바로 SocketEndpoint
작용을 발휘하는 곳이다.
이전 글에서 작성한 단원 테스트에서 Kafka 서버에 새로운 생명주기를 사용할 수 있도록 증강할 것입니다.
우선, 우리는 테스트 kafka 실례를 만들 것이다.
src/test/java/com/brightfield/streams/InfrastructureTestResource.Java 언어
package com.brightfield.streams;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public class InfrastructureTestResource implements QuarkusTestResourceLifecycleManager {
private final Logger log = LoggerFactory.getLogger(InfrastructureTestResource.class);
private final KafkaContainer kafkaContainer = new KafkaContainer("5.5.1");
@Override
public int order() {
return 1;
}
@Override
public void init(Map<String, String> initArgs) {
log.info("Initialising...");
}
@Override
public Map<String, String> start() {
log.info("Starting kafka test container...");
this.kafkaContainer.start();
log.info("Creating topic...");
createTopics("chat-messages");
return configurationParameters();
}
@Override
public void stop() {
this.kafkaContainer.close();
}
private void createTopics(String... topics) {
var newTopics =
Arrays.stream(topics)
.map(topic -> new NewTopic(topic, 1, (short) 1))
.collect(Collectors.toList());
try (var admin = AdminClient.create(Map.of("bootstrap.servers", getKafkaBrokers()))) {
admin.createTopics(newTopics);
}
}
private String getKafkaBrokers() {
this.kafkaContainer.getFirstMappedPort();
return String.format("%s:%d", kafkaContainer.getContainerIpAddress(), kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT));
}
private Map<String, String> configurationParameters() {
log.info("Returning configurationParameters...");
final Map<String, String> conf = new HashMap<>();
String bootstrapServers = getKafkaBrokers();
log.info("Brokers: {}", bootstrapServers);
conf.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
conf.put("quarkus.kafka-streams.bootstrap-servers", bootstrapServers);
conf.put("mp.messaging.outgoing.delivery.bootstrap.servers", bootstrapServers);
return conf;
}
}
다음으로, 우리는 우리의 테스트에서 이 자원을 사용하기를 희망한다.
src/test/java/com/brightfield/streams/SocketEndpointTest.Java 언어
@QuarkusTest
@QuarkusTestResource(value = InfrastructureTestResource.class)
public class SocketEndpointTest {
...
}
단원 테스트를 컴파일하고 실행할 때, 테스트가 웹socket 테스트를 실행하고 있으며, kafka 용기에 연결된 다음 연결을 끊고 끊지 않는 것을 보셔야 합니다.Infrastructure Test Resource를 생성함으로써 우리는 기본적으로 Kafka 용기의 관리 방식에 생명주기를 추가했다.
package com.brightfield.streams;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public class InfrastructureTestResource implements QuarkusTestResourceLifecycleManager {
private final Logger log = LoggerFactory.getLogger(InfrastructureTestResource.class);
private final KafkaContainer kafkaContainer = new KafkaContainer("5.5.1");
@Override
public int order() {
return 1;
}
@Override
public void init(Map<String, String> initArgs) {
log.info("Initialising...");
}
@Override
public Map<String, String> start() {
log.info("Starting kafka test container...");
this.kafkaContainer.start();
log.info("Creating topic...");
createTopics("chat-messages");
return configurationParameters();
}
@Override
public void stop() {
this.kafkaContainer.close();
}
private void createTopics(String... topics) {
var newTopics =
Arrays.stream(topics)
.map(topic -> new NewTopic(topic, 1, (short) 1))
.collect(Collectors.toList());
try (var admin = AdminClient.create(Map.of("bootstrap.servers", getKafkaBrokers()))) {
admin.createTopics(newTopics);
}
}
private String getKafkaBrokers() {
this.kafkaContainer.getFirstMappedPort();
return String.format("%s:%d", kafkaContainer.getContainerIpAddress(), kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT));
}
private Map<String, String> configurationParameters() {
log.info("Returning configurationParameters...");
final Map<String, String> conf = new HashMap<>();
String bootstrapServers = getKafkaBrokers();
log.info("Brokers: {}", bootstrapServers);
conf.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
conf.put("quarkus.kafka-streams.bootstrap-servers", bootstrapServers);
conf.put("mp.messaging.outgoing.delivery.bootstrap.servers", bootstrapServers);
return conf;
}
}
@QuarkusTest
@QuarkusTestResource(value = InfrastructureTestResource.class)
public class SocketEndpointTest {
...
}
SocketEndpoint
방법을 호출한다.우리의 장면에서, 우리가 방금 취소한 testcontainers
방법은 이미 호출되었다.init()
방법을 호출합니다. 이 방법은 우리가 사용할 init()
테마를 만들고 카프카 용기의 설정을 되돌려줍니다.start()
방법으로 Kafka 용기를 정리하고 닫습니다.⚠️
We won't be implementing full testing of the Kafka integration in this article, what we have for now will be sufficient to test from the command line.
우리의 창고를 운행하다
이제 모든 것이 다 준비되어야 한다.서비스와angular 클라이언트 응용 프로그램을 시작하여 효과가 있는지 봅시다!
웹 인터페이스를 통해 테스트 메시지를 보내는 기능은 이전과 같아야 한다.
우리의 방송 능력을 시험하기 위해서, 우리는 명령행으로 돌아가서 그곳에서 소식을 발표할 것이다.
터미널 창의 testcontainer
디렉토리에 액세스하여 다음을 입력합니다.
$ bin/kafka-console-producer.sh --broker-list=localhost:9092 --topic chat-messages
>Below
>People
사용자 인터페이스에서 같은 값을 사용하여 업데이트하는 것을 보십시오.
결론
이를 바탕으로 간단하고 전면적인 메시지 전달 도구를 개발할 수 있다.사용자와 그 상태, 심지어 그룹 메시지를 표시합니다.
미래의 글에서 우리는 카프카 구성 요소를 어떻게 테스트하는지 탐색하는 동시에 즐겁게 이야기할 것이다!
Reference
이 문제에 관하여(Quarkus, WebSockets 및 Kafka), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/anthonyikeda/quarkus-websockets-and-kafka-5e6b
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
$ bin/kafka-console-producer.sh --broker-list=localhost:9092 --topic chat-messages
>Below
>People
이를 바탕으로 간단하고 전면적인 메시지 전달 도구를 개발할 수 있다.사용자와 그 상태, 심지어 그룹 메시지를 표시합니다.
미래의 글에서 우리는 카프카 구성 요소를 어떻게 테스트하는지 탐색하는 동시에 즐겁게 이야기할 것이다!
Reference
이 문제에 관하여(Quarkus, WebSockets 및 Kafka), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anthonyikeda/quarkus-websockets-and-kafka-5e6b텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)