Spring Cloud를 사용한 기능적 Kafka - 1부

지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다.

전제 조건:
  • Java 11+(Java 18 사용 중)
  • Linux 기반 OS(저는 macOS에 있습니다)
  • Httpie

  • 설정:
  • 스프링 클라우드 2021.0.1
  • Confluent 스키마 레지스트리 7.1.0
  • 아파치 카프카 2.13_3.1.0
  • 아파치 주키퍼 3.7.0

  • 이 기사는 먼저 Spring Cloud Stream을 사용하여 기능적 kafka 소비자가 있는 문자열로 Kafka에 웹 API 게시 이벤트를 설정하는 것으로 시작합니다.

    환경 설정



    여기에서 Apache ZooKeeper를 다운로드하십시오.

    https://zookeeper.apache.org/releases.html#download
    


    압축을 풀고 작업 폴더로 이동합니다($ZOOKEEPER_HOME 사용)

    여기에서 Kafka를 다운로드하십시오.

    https://kafka.apache.org/downloads
    


    다시 아카이브 압축을 풀고 작업 폴더로 이동합니다. 이번에는 작업 폴더를 $KAFKA_HOME로 참조하겠습니다.

    다음을 사용하여 스키마 레지스트리를 다운로드하고 압축을 풉니다.

    $ wget https://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
    $ tar -xf confluent-community-7.0.1.tar.gz
    $ cd confluent-7.0.1
    

    $CONFLUENT_HOME에서 이 애플리케이션 위치를 참조합니다.

    환경 구성



    먼저 ZooKeeper를 설정합시다!

    주키퍼 구성



    예제 구성을 복사하여 구성 파일을 만듭니다.

    ZOOKEEPER_HOME $> cp conf/zoo_sample.cfg conf/zoo.cfg
    


    편집할 수 있는 유일한 값은 메타데이터를 편안하게 저장할 수 있는 위치에 대한 dataDir 디렉터리입니다.

    ...
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=./data
    


    파일을 저장하면 Kafka 구성으로 이동합니다.

    카프카 구성



    여기서 우리가 관심 있는 파일은 다음과 같습니다.

    $KAFKA_HOME/config/server.properties
    


    변경 가능성이 가장 높은 두 가지 값이 있습니다.
  • log.dirs=./logs
  • zookeeper.connect=localhost:2181

  • 이렇게 하면 로그가 액세스 가능한 위치에 있고 Kafka가 ZooKeeper에 연결하여 데이터를 유지할 수 있습니다.

    스키마 레지스트리 구성



    여기에서 하나의 파일만 확인해야 합니다: $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
    모든 기본값은 괜찮을 것입니다.

    listeners=http://0.0.0.0:8081
    kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
    kafkastore.topic=_schemas
    debug=false
    


    ZooKeeper 및 Kafka 시작



    Kafka에서 String 값으로 작업하고 있으므로 먼저 Schema Registry를 사용하지 않을 것이므로 시작하겠습니다.

    $ZOOKEEPER_HOME > bin/zkServer.sh start conf/zoo.cfg
    ZooKeeper JMX enabled by default
    Using config: conf/zoo.cfg
    Starting zookeeper ... STARTED
    
    $KAFKA_HOME > bin/kafka-server-start.sh config/server.properties
    ...
    [2022-04-25 11:19:17,742] INFO Kafka version: 3.1.0 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-04-25 11:19:17,743] INFO Kafka commitId: 37edeed0777bacb3 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-04-25 11:19:17,743] INFO Kafka startTimeMs: 1650910757737 (org.apache.kafka.common.utils.AppInfoParser)
    [2022-04-25 11:19:17,745] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    


    좋아, 프로그래밍하자!!!

    직원 API



    이것은 kafka 메시지를 생성하기 위한 단일POST 끝점이 있는 간단한 API입니다.

    https://start.spring.io으로 이동하여 기본 웹 애플리케이션을 생성하십시오.



    다운로드를 클릭하고 demo.zip 파일을 작업공간으로 확장합니다.

    시작하려면 Kafka에 게시할 몇 가지 종속 항목이 필요합니다.
    pom.xml 파일에 다음 종속성을 추가합니다.

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.4</version>
    </dependency>
    


    다음으로 엔드포인트를 제공할 컨트롤러를 생성합니다.

    package com.example.demo;
    
    import com.example.demo.model.Employee;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.ResponseEntity;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.net.URI;
    import java.util.UUID;
    
    @RequestMapping("/employee")
    @RestController
    public class EmployeeController {
    
        @Autowired
        protected KafkaTemplate<String, String>  kafka;
    
        public ResponseEntity<Void> createEmployee(@RequestParam("firstname") String firstname,
                                                   @RequestParam("lastname") String lastname) {
    
            String id = UUID.randomUUID().toString();
            kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
    
            return ResponseEntity.created(URI.create(String.format("/employee/%s", id))).build();
        }
    
    }
    


    코드를 빠르게 살펴보겠습니다...

    먼저 KafkaTemplate<String, String> template 를 주입합니다. 멋진 작업을 수행하지 않기 때문에 메시지 키를 java.lang.String 로 보내고 메시지 본문도 java.lang.String 유형이 됩니다.

    우리@PostMapping는 단순히 2개의 쿼리 매개변수가 있는 엔드포인트가 될 것입니다.
  • 이름

  • 따라서 URL의 형식은 다음과 같습니다.http://localhost:8050/employee?firstname=Paula&lastname=Abdul
    API를 호출할 때 다음 구문과 함께 Httpie를 사용합니다.

    http POST http://localhost:8070/employee firstname==Paula lastname==Abdul
    


    다음으로 우리는 임의의 UUID를 메시지 ID로 생성하고 이를 성 및 이름의 연결된 문자열과 함께 employee 주제로 보냅니다.

    String id = UUID.randomUUID().toString();
    kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
    


    애플리케이션을 시작하기 전에 구성을 조정해 보겠습니다. src/main/resources/application.yml 파일에서 다음과 동일하게 만들어 보겠습니다.

    spring:
      kafka:
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        bootstrap-servers:
          - localhost:9092
    
    server:
      port: 8070
    


    이제 애플리케이션을 실행하고 엔드포인트에 도달할 수 있습니다.

    ./mvnw spring-boot:run
    ...
    : Tomcat initialized with port(s): 8070 (http)
    : Starting service [Tomcat]
    : Starting Servlet engine: [Apache Tomcat/9.0.54]
    : Initializing Spring embedded WebApplicationContext
    : Root WebApplicationContext: initialization completed in 754 ms
    : Tomcat started on port(s): 8070 (http) with context path ''
    : Started DemoApplication in 1.522 seconds (JVM running for 1.841)
    


    끝점을 호출합니다.

    $ http POST http://localhost:8070/employee firstname==Andrew lastname==Marshall
    HTTP/1.1 201 
    Connection: keep-alive
    Content-Length: 0
    Date: Wed, 27 Apr 2022 17:30:39 GMT
    Keep-Alive: timeout=60
    Location: /employee/830da346-38b9-4d5b-a051-a302c395333e
    

    kafka-console-consumer.sh 명령을 사용하여 Kafka에서 메시지를 추적할 수 있습니다.

    $ $KAFKA_HOME/bin/kafka-console-consumer  --topic employee --bootstrap-server localhost:9092 --from-beginning
    Marshall, Andrew
    


    다음 기사 Spring Cloud Stream을 사용하여 기능 소비자를 설정합니다...

    좋은 웹페이지 즐겨찾기