마이크로서비스 프레임워크적인 것을 목표로(Apach Kafka편)

시작된 배경


  • Microservices 에 대해 공부한 결과, 뭔가 만들고 싶어졌다. 거기서 서비스간의 통신에 어떤 기술 선정을 할까라고 생각하고 있었는데, 몇개인가 만들어 보려고 생각해, 우선SpringKafka 입문해 볼까라고 생각했다.
  • 최종적으로는 SpirngBootLayer + BusinessLogicLayer + RepositoryLayer 라고 하는 구성의 서비스를 만들어, BusinessLogicLayer 이외는 application.yml 인가 무언가로 설정을 전환하면, 그 서비스의 통신·데이터 스토어를 전환할 수 있는 프레임워크 를 만들려고 생각한다.
  • 이것이 릴리스 용이성·테스타빌리티·변경 용이성을 담보한 구조가 되고 있는지 물으면 미묘한 생각이 든다. .
  • 이미 여러 가지 단점을 생각할 수 있지만 아무것도 만들어 보자. 그렇게 하는 것으로 뭔가 보일 수 있을 것이다. 우선 SpringKafka에 입문.


  • Apache Kafka



    공식 doc을 읽고 대부분의 구성과 개념을 이해하십시오.

    기능 개요


  • 비즈니스 응용 프로그램이 처리하는 데이터를 pub/sub 모델 또는 pull 모델에서 배포 할 수 있습니다 hub
  • 배포 된 데이터를 fault-tolerant 구성으로 관리 (클러스터 구성)
  • 데이터 발생시 동시에 배포 가능
  • 배달 데이터 발생시 로그 생성은 배달 메시지 순서를 보존합니다
  • 데이터의 한 레코드는 key, value, timestamp로 구성
  • topic 큐와 같은 것에 데이터를 저장한다
  • topic에 전송 된 데이터가 topic에 연결 된 ConsumerGroup Consumer가 데이터를 수신 할 수 있음

  • SpringKafka에서 메시지 보내기



    apache kafka 설치


  • 이 기사을 참조하여 로컬에 Apache Kafka를 설치하고 시작
  • 분명히 zookeeper라는 관리 서비스가있는 것 같습니다. 이 기사zookeeperkafka 의 구성이 알기 쉽게 기재되어 있었으므로 참고로 했습니다.

  • Spring Kafka에서 데이터 전송


  • SpringInitilizer에서 webSpringKafka를 종속적으로 포함하는 병아리 만들기.
  • 로컬 kafka 서버에 topic를 생성하는 설정을 Bean 등록

  • KafkaConfig.java
    package kafka.sample.springKafka.config;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class KafkaConfig {
        @Bean
        public NewTopic userTopic() {
            return new NewTopic("user", 10, (short) 1);
        }
    }
    
  • Apache Kafka에 데이터를 전송하는 구현 Bean 등록

  • KafkaServiceTemplate.java
    package kafka.sample.springKafka.template;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @Component
    public class KafkaServiceTemplate {
        @Autowired
        private KafkaTemplate<String, String> template;
    
        public void send(String key, String value) {
            template.send(key, value).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    System.out.println(result);
    
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println(ex.getMessage());
    
                }
    
            });
        }
    
    }
    
  • 마지막으로 RestAPI로 POST 된 사용자 이름으로 데이터를 등록하는 구현

  • UserController.java
    package kafka.sample.springKafka.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    
    import kafka.sample.springKafka.template.KafkaServiceTemplate;
    
    @Controller
    public class UserController {
        @Autowired
        private KafkaServiceTemplate template;
    
        @RequestMapping(path = "/register/user", method = RequestMethod.POST)
        public String register(@RequestBody String username) {
            template.send("user", username);
            return username;
        }
    }
    

    어플리케이션을 기동해, curl로 POST를 한다.
    그 후, 로컬로 인스톨 된 모듈에 포함되어 있는 CLI 툴로 어플리케이션으로 작성한 topic 를 처음부터 consume 한다. Apache Kafka 서버는 기본적으로 포트 9092에서 올라갑니다.
    $ kafka-console-consumer --bootstrap-server localhost:9092 --topic user --from-beginning
    testUser
    

    이것으로 데이터 전달은 구현 완료.
    쉽게 구현할 수 있었다.

    다음 번에는 좀 더 여러 서버에서 pub/sub 라든지 시도해 보자.

    좋은 웹페이지 즐겨찾기