아파치 카프카 만졌어.

6434 단어 DockerKafkaPython
MDC Advent Calendar 2019의 15일째입니다.

Kafka 소개


LinkedIn이 제작한 OSS의 분산 메시지 송수신 시스템 (메시지 전달 대기열).
높은 처리량, 높은 확장성.
Java(Scara)로 작성되었습니다.
Producter, Broker, Consuumer 세 가지 구성 요소로 구성됩니다.
Producter 흐름에서 전송된 데이터를 Consummer로 트렁크합니다.옳고 그름도 데이터를 지속시킬 수 있기 때문이다.송달 보증도 이뤄졌다.
・Producter: 메시지 보내기
• 브로커: Producter에서 Consuumer로 메시지 전달
• Consuumer: 메시지 수신
※ 자세한 구조와 구조 등은 다음과 같은 내용을 참고할 수 있습니다.
Apache Kafka의 개요 및 아키텍처
Apache Kafka 시작

무엇


용례로 다음과 같은 내용을 열거할 수 있다.
  • 데이터 센터로 아키텍처에 통합하여 시스템 신경화(예: 마이크로서비스)를 방지합니다
  • .
  • 기록 수집을 위해 Flentd 등과 협력
  • 웹 사이트의 사용자의 페이지 이동 등을 수집하여 웹 활동 분석에 사용
  • IoT 장치의 센서 값을 종합하여 시각화, 분석, 기타 장치의 제어 등에 사용
  • 빅데이터, 머신러닝, etc
  • 구체적인 부분은 다음과 같다.
    LINE의 대규모 데이터 파이프라인
    실시간 검색
    대형 보건 IT 기업 씨어너의 카프카 활용 사례

    시험해 보다


    간단한 샘플을 만들어 보겠습니다.
    이번에 Kafka 호스트는 Kafka-docker를 사용하여 환경 구축을 진행했고 Producter와 Connsumer의 고객은 Kafka-pytohon을 사용했다.

    Kafka-docker 설치, 시작


    공식.에서 보듯이kafka-docker를 다운로드하고docker-composie를 다운로드합니다.ymlKAFKA_ADVERTISED_HOST_NAME에 docker host의 IP 주소를 쓴 후
    docker-compose up -d
    
    이 정도면 됐어.
    참조: kafka in docker 자습서

    Producter 설치


    사실 트위터 스트림 API와 같은 데이터를 얻어 IoT의 센서 값을 얻으려고 했는데 이번에는 준비가 안 돼서 적당한 수치를 먼저 얻어 마우스의 x 좌표를 얻어 1초 간격으로 카프카에게 보내는 스크립트를 썼다.
    KafkaProducter 매개 변수bootstrap서버에 전달된 값은 docker-compose입니다.yml에도 기록된 docker host의 IP 주소와 kafka에 할당된 컨테이너의 Port 번호를 지정합니다.
    지정한 Port 번호는 docker ps를 사용하여 확인할 수 있습니다.

    위의 상황은 32783이다.
    다음은 Producter 측의 소스 코드입니다.procuer.send(에서 현재 마우스의 x 좌표를 test라는 이름의 Topic에 투사합니다.
    from kafka import KafkaProducer
    import pyautogui
    import time
    
    def main():
        producer = KafkaProducer(bootstrap_servers='{Docker HostのIPアドレス}:{Port}')
        while True:
            result = producer.send('test', str(pyautogui.position().x).encode()).get(timeout=60)
            print(result)
            time.sleep(1)
    
    if __name__ == '__main__':
        main()
    
    

    Consuumer 설치


    다음은 Consummer 측 설치입니다.Kafka의 IP 주소와 Port를 똑같이 지정합니다.for message in consumer:로 카프카에서 순차적으로pull로 데이터를 작성합니다.
    from kafka import KafkaConsumer
    
    def main():
        consumer = KafkaConsumer(
                'test', 
                bootstrap_servers=['{Docker HostのIPアドレス}:{Port}'])
    
        for message in consumer:
            print("x = " + message.value.decode())
    
    if __name__ == '__main__':
        main()
    

    실행


    아래 왼쪽, Producter 옆, 오른쪽에서 Consumer를 실행합니다.
    Producter에서 Kafka로 보내는 값 (마우스의 x 좌표) 은Consuumer 측에서 Kafka에서 가져와 표시할 수 있습니다.

    앞으로 하고 싶은 거.


    ・ 실러캔스 파이, 아두노 등 센서가 장착된 값을 사용한다.
    ・ 획득한 데이터를 도표로 만들거나 해석한다.

    좋은 웹페이지 즐겨찾기