spring boot+kafka 사용 상세 절차

1.Kafka 의 설치 및 설정
1.파일 다운로드
wget http://mirror.bit.edu.cn/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

2.설치
tar xzvf kafka_2.11-0.11.0.0.tgz -C /usr/local/

3.설정(서버 가 아 리 클 라 우 드 ECS 에 있 음)
vi %kafka_home%/server.properties

############################# Server Basics #############################
broker.id=0
port=9092
host.name=     ip
advertised.host.name=     ip
delete.topic.enable=true

....       

둘째,Spring boot 조작 Kafka
1.Spring 부팅 pom 의존

    org.springframework.boot
    spring-boot-starter-parent
    1.5.6.RELEASE
     


2.kafka 의존
 

    org.springframework.kafka
    spring-kafka


3.kafka 설정
#kafka    
spring.kafka.bootstrap-servers=     ip:9092
#       
spring.kafka.consumer.group-id=defaultGroup
#key-value       
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

4.kafka 송신 코드
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     *      kafka,   test
     */
    public void sendTest(){
        kafkaTemplate.send("test","hello,kafka  "  + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
    }
}

5.Kafka 소비 코드
public class KafkaConsumer {

    /**
     *   test  ,      
     * @param message
     */
    @KafkaListener(topics = {"test"})
    public void consumer(String message){
        log.info("test topic message : {}", message);
    }
}

6,테스트 코드 시작
@SpringBootApplication
@EnableScheduling
public class ServerApplication {
    @Autowired
    private KafkaSender kafkaSender;


    public static void main(String[] args) {
        SpringApplication.run(ServerApplication.class, args);
    }

    //    1      
    @Scheduled(fixedRate = 1000 * 60)
    public void testKafka() throws Exception {
        kafkaSender.sendTest();
    }
}

3.이상 은 완전한 spring boot 가 Kafka 메시지 큐 를 통합 하 는 절차 입 니 다.

좋은 웹페이지 즐겨찾기