kafka를 만진 메모

kafka란? 특징은?


  • 분산 이벤트 구동 플랫폼
  • PUB/SUB(Producer/Consumer)에서 이벤트 구동 개발이 가능
  • message 순서를 지킬 수 있다 1
  • 필요한 경우 메시지 보존 방법을 설정할 수 있습니다.
  • 시간이 지나면 자동 삭제
  • 로그 데이터가 커지면 자동 삭제
  • 데이터 덮어 쓰기로 최신 데이터 만 유지
  • 영원히 유지하고 time travel도 할 수 있다

  • Consumer 측에서 어디까지 데이터를 consume했는지를 관리할 수 있다 2
  • 자동
  • 수동

  • 데이터 손실을 제어할 수 있습니다 3
  • transaction 지원 4
  • atomicity, consistency를 실현하기 위해 데이터를 큰 json에 넣고 PUB하는 방법도있다.


  • kafka의 시스템 레벨 이벤트 구동 개발 이미지



    kafka의 logo가 나타내는 것처럼 다양한 서비스가 kafka를 통해 실시간으로 정보를 연계 할 수 있습니다.

    5

    시스템을 설정할 때의 유의점


  • message의 순서를 지켜야 하는가
  • 어느 정도의 데이터 loss는 용서해야 하는가
  • 중복 데이터를 consume 할 때, 처리를 어떻게 할까
  • schema less이므로 주제와 데이터 구조에 대한 문서를 작성해야합니다
  • 데이터 버전 관리
  • 데이터 자체에 버전을 넣을지, topic으로 버전 관리할지를 결정할 필요가 있다


  • docker로 Kafka server 만들기



    사용
    docker-compose -f docker-compose-single-broker.yml up
    

    rails + karafka로 메시지 교환


  • rails app 만들기
  • rails new karafka_example
    
  • Gemfile에 다음 gem을 추가하고 bundle install
  • gem 'karafka'
    
  • Karafka 설정 만들기
  • bundle exec karafka install
    

    다음 파일이 생성됨
    app/consumers/application_consumer.rb
    app/responders/application_responder.rb
    karafka.rb
    
  • karafka.rb 편집
  • ENV['RAILS_ENV'] ||= 'development'
    ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
    require ::File.expand_path('../config/environment', __FILE__)
    Rails.application.eager_load!
    
    class KarafkaApp < Karafka::App
      setup do |config|
        config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
        config.client_id = 'example_app'
        config.backend = :inline
        config.batch_fetching = true
        config.logger = Rails.logger
      end
    
      Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)
    
      consumer_groups.draw do
        consumer_group :bigger_group do
          batch_fetching false
    
          topic :users do
            consumer UsersConsumer
          end
        end
      end
    end
    
    KarafkaApp.boot!
    
  • consumer 만들기

  • app/consumers/users_consumers.rb
    class UsersConsumer < ApplicationConsumer
      def consume
        Karafka.logger.info "New [User] event: #{params}"
      end
    end
    
  • responder 만들기

  • app/responders/users_responder.rb
    class UsersResponder < ApplicationResponder
      topic :users
    
      def respond(event_payload)
        respond_to :users, event_payload
      end
    end
    
  • karafka server 시작
  • bundle exec karafka server
    
  • rails console에서 데이터 작성 시도
  • UsersResponder.call({ event_name: "user_created", payload: { id: 1 } }
    

    관련 자료





    wurstmeister/kafka-docker  message 순서

      로그 삭제

      commit-offset

      message guarantee

      transaction

    좋은 웹페이지 즐겨찾기