Quarkus, WebSockets 및 Kafka

따라서 Quarkus를 신속하게 시작하고 실행하며 클라이언트에서 서버로 메시지를 전송하는 기본 UI를 만드는 방법을 연구했습니다.본고에서 우리는 이를 다음 단계로 추진하고 Kafka를 하나의 메시지 전달 플랫폼으로 소개하여 주제에서 온 메시지를 사용자 인터페이스로 직접 전달할 것이다.
본고는 진정한 회화 관리가 없기 때문에 우리는 장래에 그것을 토론할 수 있지만, 이것은 일부 기본 사용자를 관리하고 모든 사용자에게 방송하는 것이 얼마나 쉬운지 확실히 증명한다.

카프카 시작 및 실행


이를 실현하기 위해서는 Kafka의 실례가 시작되고 실행되어야 하기 때문에 우리는 그것을 시작할 것이다.
이것은 본 조항의 요구사항입니다.
  • 자바 11
  • 아파치 동물원 관리자
  • 카프카 2.3.0
  • 소스 코드가 여기에 있습니다branch.
  • 우리는 당신이 카프카를 압축 해제한 위치를 KAFKA_HOME 라고 부른다

    ZooKeeper 시작


    zookeeper를 다운로드한 후 디렉터리로 압축을 풀고 Java11이 현재 JDK인지 확인하십시오.
    다음은 conf/zoo를 만듭니다.다음 속성이 있는 cfg 파일:
    동물원.cfg 회사
    tickTime=2000
    dataDir=/tmp/zookeeper
    clientPort=2181
    maxClientCnxns=60
    
    서버가 디렉터리에 쓸 수 있는 모든 위치에 데이터Dir를 설정할 수 있습니다.그런 다음 다음 ZooKeeper를 시작할 수 있습니다.
    $ bin/zkServer.sh start conf/zoo.cfg
    ZooKeeper JMX enabled by default
    Using config: conf/zoo.cfg
    Starting zookeeper ... STARTED
    
    이어서 우리는 카프카를 설치할 것이다.

    카프카가 길을 떠났어요.


    Kafka를 실행하려면 먼저 Java 11을 JDK로 설정해야 합니다.
    다음 시작 카프카:
    $ bin/kafka-server.sh start config/server.properties
    INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
    INFO starting (kafka.server.KafkaServer) [2020-09-08 19:04:53,486] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
    INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
    INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
    INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
    INFO Client environment:java.version=14.0.2 (org.apache.zookeeper.ZooKeeper)
    INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
    ...
    INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
    INFO Loading logs. (kafka.log.LogManager)
    INFO Logs loading complete in 10 ms. (kafka.log.LogManager)
    INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
    INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
    
    메세지가 한 무더기 있을 것이다. 그러나 더 중요한 것은 시작하는 탐지기이다. EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)이것은 우리가 포트9092의 비안전 연결을 사용하여 Kafka에 연결할 수 있음을 나타낸다

    테마 만들기


    우리는 수동으로 읽을 수 있는 주제를 만들어야 한다.터미널을 열고 KAFKA_HOME 디렉토리로 이동하여 다음 명령을 수행합니다.
    $ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic chat-messages --partitions 1 --replication-factor 1
    Created topic chat-messages.
    
    이것은 우리에게 새로운 주제를 만들 것이다 chat-messages.

    WebSocket API 업데이트


    계속하기 위해서는 Kafka에 연결하기 위해 WebSocket API에 더 많은 의존 항목이 필요합니다.
  • io.quarkus:quarkus kafka 흐름
  • 조직.testcontainers:testcontainers
  • 조직.테스트 용기: 카프카
  • pom을 업데이트합니다.xml 종속성:
    pom.xml
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-kafka-streams</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers</artifactId>
        <version>1.14.3</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.14.3</version>
        <scope>test</scope>
    </dependency>
    

    Kafka 연결 구성


    다음은 Kafka 서버에 연결할 수 있도록 프로그램을 설정해야 합니다.src/main/resources/application.properties를 열고 다음 사항을 변경합니다.
    quarkus.kafka-streams.application-server=localhost:8011
    quarkus.kafka-streams.application-id=${quarkus.application.name}
    quarkus.kafka-streams.bootstrap-servers=${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
    quarkus.kafka-streams.topics=chat-messages
    
    Kafka 호스트의 경우 KAFKA_HOST 환경 변수와 localhost 환경 변수로 되돌아오는 포트를 정의했습니다.또한 앞에서 만든 기본 테마를 KAFKA_PORT 로 설정합니다.

    ⚠️ NOTE
    If you want to run a quick compile from here there are a couple of things we need to add to our test configuration:


    src/test/resources/application.속성
    콕스.활용단어참조이름 = WebSocket 테스트
    콕스.로그카테고리“com.brightfield.streams”.등급 = 전부
    콕스.카프카 강.주제 = 채팅 메시지

    카프카 소비자 만들기


    이를 위해 클래스 업데이트 9092우선, 모든 연결된 사용자에게 방송하는 방법을 만듭니다.
    private void broadcast(String message) {
        socketSessions.values().forEach(s -> {
            s.getAsyncRemote().sendText(message, result -> {
                if (result.getException() != null) {
                    log.error("Unable to send message: {}", result.getException().getMessage(), result.getException());
                }
            });
        });
    }
    
    보시다시피, 사용자 이름 인덱스로 만들어진 다른 사용자 세션으로 구성된 맵을 훑어보고 있으며, 모든 사용자에게 텍스트 메시지를 보내는 비동기 원격 서버를 만들고 있습니다.
    다음은 소비자를 추가하고 chat-messages 클래스에서 다음 코드를 추가합니다.
    @Produces
    public Topology buildTopology() {
        log.info("Building the Topology...");
        StreamsBuilder builder = new StreamsBuilder();
    
        builder.stream("chat-messages", Consumed.with(Serdes.String(), Serdes.String()))
            .peek((id, message) -> {
                log.info("Incoming transaction: {}", message);
                broadcast(message);
            });
        return builder.build();
    }
    
    여기에서, 우리는 정탐할 흐름을 지정하고, 문자열 키 서열화기와 문자열 값 서열화기를 사용하여 테마에서 메시지를 읽습니다.그런 다음 WebSocket을 통해 연결된 모든 사용자에게 메시지를 기록하고 브로드캐스트합니다.

    업데이트 단위 테스트


    만약 우리가 이 서비스를 구축하려고 시도한다면, Kafka 서버를 실행하지 않으면, 테스트를 실행할 때 문제가 발생할 것입니다.만약 이렇게 한다면, 당신은 단원 테스트가 끊긴 것을 발견할 수 있습니다. 왜냐하면 테스트 중에 닫는 과정이 없기 때문입니다.이것이 바로 SocketEndpoint 작용을 발휘하는 곳이다.
    이전 글에서 작성한 단원 테스트에서 Kafka 서버에 새로운 생명주기를 사용할 수 있도록 증강할 것입니다.
    우선, 우리는 테스트 kafka 실례를 만들 것이다.
    src/test/java/com/brightfield/streams/InfrastructureTestResource.Java 언어
    package com.brightfield.streams;
    
    import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.testcontainers.containers.KafkaContainer;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    public class InfrastructureTestResource implements QuarkusTestResourceLifecycleManager {
    
        private final Logger log = LoggerFactory.getLogger(InfrastructureTestResource.class);
        private final KafkaContainer kafkaContainer = new KafkaContainer("5.5.1");
    
        @Override
        public int order() {
            return 1;
        }
    
        @Override
        public void init(Map<String, String> initArgs) {
            log.info("Initialising...");
        }
    
        @Override
        public Map<String, String> start() {
            log.info("Starting kafka test container...");
            this.kafkaContainer.start();
    
            log.info("Creating topic...");
            createTopics("chat-messages");
            return configurationParameters();
        }
    
        @Override
        public void stop() {
            this.kafkaContainer.close();
        }
    
        private void createTopics(String... topics) {
            var newTopics =
                Arrays.stream(topics)
                    .map(topic -> new NewTopic(topic, 1, (short) 1))
                    .collect(Collectors.toList());
            try (var admin = AdminClient.create(Map.of("bootstrap.servers", getKafkaBrokers()))) {
                admin.createTopics(newTopics);
            }
        }
    
        private String getKafkaBrokers() {
            this.kafkaContainer.getFirstMappedPort();
            return String.format("%s:%d", kafkaContainer.getContainerIpAddress(), kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT));
        }
    
        private Map<String, String> configurationParameters() {
            log.info("Returning configurationParameters...");
            final Map<String, String> conf = new HashMap<>();
            String bootstrapServers = getKafkaBrokers();
            log.info("Brokers: {}", bootstrapServers);
            conf.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
            conf.put("quarkus.kafka-streams.bootstrap-servers", bootstrapServers);
            conf.put("mp.messaging.outgoing.delivery.bootstrap.servers", bootstrapServers);
            return conf;
        }
    }
    
    다음으로, 우리는 우리의 테스트에서 이 자원을 사용하기를 희망한다.
    src/test/java/com/brightfield/streams/SocketEndpointTest.Java 언어
    @QuarkusTest
    @QuarkusTestResource(value = InfrastructureTestResource.class)
    public class SocketEndpointTest {
    ...
    }
    
    단원 테스트를 컴파일하고 실행할 때, 테스트가 웹socket 테스트를 실행하고 있으며, kafka 용기에 연결된 다음 연결을 끊고 끊지 않는 것을 보셔야 합니다.Infrastructure Test Resource를 생성함으로써 우리는 기본적으로 Kafka 용기의 관리 방식에 생명주기를 추가했다.
  • 우선, SocketEndpoint 방법을 호출한다.우리의 장면에서, 우리가 방금 취소한 testcontainers 방법은 이미 호출되었다.
  • 다음에 init() 방법을 호출합니다. 이 방법은 우리가 사용할 init() 테마를 만들고 카프카 용기의 설정을 되돌려줍니다.
  • 테스트가 완료되면 start() 방법으로 Kafka 용기를 정리하고 닫습니다.
  • ⚠️
    We won't be implementing full testing of the Kafka integration in this article, what we have for now will be sufficient to test from the command line.


    우리의 창고를 운행하다


    이제 모든 것이 다 준비되어야 한다.서비스와angular 클라이언트 응용 프로그램을 시작하여 효과가 있는지 봅시다!
    웹 인터페이스를 통해 테스트 메시지를 보내는 기능은 이전과 같아야 한다.

    우리의 방송 능력을 시험하기 위해서, 우리는 명령행으로 돌아가서 그곳에서 소식을 발표할 것이다.
    터미널 창의 testcontainer 디렉토리에 액세스하여 다음을 입력합니다.
    $ bin/kafka-console-producer.sh --broker-list=localhost:9092 --topic chat-messages
    >Below
    >People
    
    사용자 인터페이스에서 같은 값을 사용하여 업데이트하는 것을 보십시오.

    결론


    이를 바탕으로 간단하고 전면적인 메시지 전달 도구를 개발할 수 있다.사용자와 그 상태, 심지어 그룹 메시지를 표시합니다.
    미래의 글에서 우리는 카프카 구성 요소를 어떻게 테스트하는지 탐색하는 동시에 즐겁게 이야기할 것이다!

    좋은 웹페이지 즐겨찾기