3장. 카프카와 스프링 부트
1장에서 실행한 카프카 컨테이너를 스프링 부트에서 활용해 보자
Spring boot with intelliJ IDE and Kafka setting
모든 자세한 사항은 여기에서 찾아볼 수 있다
스프링 부트와 카프카
스프링 부트 프로젝트 세팅
1. 사랑스런 spring initianlizr 로 아래와 같이 대충 세팅해서 스프링부트 프로젝트를 만들자.
- Dependencies에 우선 Kafka, Mysql, Lombok, JPA(없어도 무방은 할 것 같으나 추후 추가 테스트를 위해)를 추가했다.
- 빌드는 Gradle로 설정했다. 메이븐 싫다! 😀
2. Dependencies에 이것 저것 추가해둔 것들, 특히 DB (JPA포함) 관련 설정들 해주기
- 위 사진에서 참조할 것은 왼쪽 경로만 참조하면 된다 ^*^
- application properties에 하단 정보를 DB password부분만 변경해서 붙여넣자
server.address=localhost
server.port=8080
# API 호출시, SQL 문을 콘솔에 출력한다.
spring.jpa.show-sql=true
# DDL 정의시 데이터베이스의 고유 기능을 사용합니다.
# ex) 테이블 생성, 삭제 등
spring.jpa.generate-ddl=true
# MySQL 을 사용할 것.
spring.jpa.database=mysql
# MySQL 설정
spring.datasource.url=jdbc:mysql://localhost:3306/study?useSSL=false&characterEncoding=UTF-8&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=여길바꾸세요!
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# MySQL 상세 지정
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
############################## Kafka ##############################
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=kafka-demo
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.max-poll-records=1000
spring.kafka.template.default-topic=kafka-demo
-
하단에 kafka관련 설정들은 아래와 같다.
spring.kafka.bootstrap-servers
카프카서버 정보, 기본적으로 9092 포트를 사용한다.spring.kafka.consumer.group-id
컨슈머의 그룹idspring.kafka.consumer.enable-auto-commit
데이터를 어디까지 읽었다는 offset을 주기적으로 저장할지 여부spring.kafka.consumer.auto-offset-reset
offset에 오류가 있을 경우 어디서부터 다시 할지 여부
ealiest - 맨처음부터 다시 읽는다
latest - 이전꺼는 무시하고, 이제부터 들어오는 데이터부터 읽기 시작한다spring.kafka.producer.key-serializer
데이터를 kafka로 전달할때 사용하는 Key Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능spring.kafka.consumer.key-deserializer
데이터를 kafka에서 받아서 사용하는 Key Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능spring.kafka.producer.value-serializer
데이터를 kafka로 전달할때 사용하는 Value Encoder ClassStringSerializer는 문자열 형태의 데이터에만 사용 가능spring.kafka.consumer.value-deserializer
데이터를 kafka에서 받아서 사용하는 Value Decoder ClassStringDeserializer는 문자열 형태의 데이터에만 사용 가능spring.kafka.consumer.max-poll-records
consumer가 한번에 가져오는 message 갯수spring.kafka.template.default-topic
기본 설정 topic name
Controller / Service 만들어 주기
1. 프로젝트 구성도와 service
- 프로젝트 구성도는 위와 같고, restAPI 아키텍처를 따를 것이다. controller / service 패키지를 src > main > java > com.kafka.demo 하위에 만들어 주자.
package com.kafka.demo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final String TOPIC = "kafka-demo";
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
System.out.println(String.format("Produce message : %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
- KafkaProducer는 위와 같다. TOPIC은 properties에서 설정한 토픽으로 설정해줘야 한다.
- 아 물론 당연히 저 토픽 부분이 앞으로는 request에서 header, body(data), params 등 으로 가변적인 요소가 될 것이다.
- kafkaTemplate API를 살펴보는 것도 좋은 방법이다.
- 핵심은 producer는
this.kafkaTemplate.send(TOPIC, message);
를 통해서 TOPIC에 해당하는 message를 전달할 것이다.
package com.kafka.demo.service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "kafka-demo", groupId = "kafka-demo")
public void consume(String message) throws IOException {
System.out.println(String.format("Consumed message : %s", message));
}
}
- KafkaConsumer는 위와 같다. topics와 groupId는 우선 properties에서 설정한 것으로 해주자.
- 어노테이션이
@KafkaListener
와 같다. 이 부분은 데브원영 님이 kafka의 대가!
2. Controller 마저 완성하기.
package com.kafka.demo.controller;
import com.kafka.demo.service.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@PostMapping
public String sendMessage(@RequestParam("message") String message) {
this.producer.sendMessage(message);
return "success";
}
}
- 마지막으로 controller는 위와 같다.
- 특별한 것 전혀없이, POST 형식의 request를 message 부분을 data로 받는 것이고, success라는 string만 return한다.
Build와 Deploy후 실제 test하기
1. 일단 gradle build부터 해서 war파일을 만들자.
2. 톰켓 서버 세팅하고 러닝해보자!
- (2)에서 build한 war파일을 Deployment 할 것임을 명시해주자.
- context는 편하게 / 로 설정해 두자. 이게 뭔가 싶으면 검색하자 / 클릭
- 톰켓 서버 러닝을 성공한다면 다음과 같다 :) 왜냐면 아무 설정도 안했으니까 위와 같이 뜬다!
3. 이게 뭐에요,, 서버 에러 로그 계속 찍히잖아요!
2021-07-18 21:20:03.012 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-07-18 21:20:03.855 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-07-18 21:20:03.856 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-07-18 21:20:04.746 WARN 13819 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-foo-1, groupId=foo] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
- Kafka도 당연히 서버가 필요하고, '브로커'가 Producer되는 메신저를 다 받아서 전달한다. 조금 더 자세히 알아보고 싶으면 클릭
- 브로커 서버를 러닝해라! 1장에서 브로커 Dokcer Container를 기억할 것 이다!
- 이번엔 docker에서 핫하게 밀어주시는 App으로 조져보자 (mac기준이다)
- 그냥,, container 러닝만 하면된다 ^^,, container를 제거한 사람은 docker-compose up -d 를 통해 (물론 yml 파일이 있는 곳에서 ^^) 다시 러닝해주면 된다.
4. 다시 서버 실행, 그리고 실제 test
- 뭐 도구는 알아서 선택하고, 러닝한 서버에 위와 같이 request를 보내보자!
- 콘솔창에서 확인해보면,
Produce message : Hello Kafka World!
와 같이 우리가 설정한 로그가 찍혀있는 것을 확인할 수 있다. - 위에서 우리가 러닝한 컨테이너 (Kafka 요놈)에 접근해보자! 어떻게요? 라면 클릭
- container로 접근해서 consumer를 실행시키는 shell을 통해서 'kafka-demo'에 해당하는 토픽을 'from-beginning'으로 찍어보니 우리가 보낸 메시지, Hello Kafka World를 볼 수 있다.
- 실시간으로 직접 해보자! 생각보다 퍼포먼스가 굉장히 빠르다는 것을 바로 체감할 수 있다!!
마무리
다시 처음으로 돌아가서
- 하나씩 다시 정리하고 '어떻게 컨셉을 가져가서 도입을 해야할까?' 에 초점을 맞춰보자.
- 단순한 로그를 kafka를 통해서?
- ELK stack에서 kafka의 도입?
- 라인에서 kafka를 사용하는 방법?
- 우선 '로그를 kafka를 통해서' 라고 생각을 시작하고, 실제 프로젝트에 도입을 해보자.
Author And Source
이 문제에 관하여(3장. 카프카와 스프링 부트), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@qlgks1/3장.-카프카와-스프링-부트저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)