Docker 및 Python을 사용하여 카프카드 소개
객체 및 로그
데이터베이스 디자인에서 통상적인 방법은 사물의 각도에서 세계를 고려하는 것이다.이런 사고방식은 MySQL과 PostgreSQL 등 SQL 데이터베이스의 디자인을 묘사할 수 있다.개발자는 세계의 사물을 묘사하기 위해 대상을 제시했는데 이런 대상은 모델을 정의한 표에 저장되었다.예를 들어 전구를 설명하면 브랜드, 제조사, 밝기, 에너지 소모와 전구의 현재 상태(켜거나 끄기)를 생각할 수 있습니다.데이터를 저장하거나 실제 대상의 디지털 복사본을 만드는 데 중점을 두면 매우 유용하다.그러나 대상 데이터베이스는 흐르는 데이터를 처리하는 데 도전에 직면해 있다.전구의 예에서 전구를 모스 부호의 지시로 사용하면 전구의 상태가 계속 켜지고 닫히는 것을 나타낸다.끊임없이 변화하는 전구를 물체로 표시하기는 매우 어렵다.이 때 데이터를 이벤트 로그로 보는 것이 매우 유용해진다.
카프카가 뭐예요?
Kafka는 흐르는 데이터를 위해 설계된 분포식 시스템이다.그것은 이벤트 기반 시스템으로 전통적인 대상 기반 데이터베이스 저장소를 대체했다.카프카는 분포식 로그로 볼 수 있으며, 그 중의 정보는 이벤트로 저장되고 데이터를 추가할 수 있다.
카프카의 디자인은 현대 컨테이너화 추세와 모든 것을 조작하는 대형 소프트웨어에서 독립된 임무를 수행하는 소형 컨테이너로의 변화에 잘 적응했다.데이터를 로그로 보고, 응용 프로그램을 여러 개의 독립된 읽기와 쓰기 작업으로 나누면, 절차를 쉽게 풀 수 있다.카프카는 데이터 생산자에게 쓰기 접근권과 데이터 소비자에게 읽기 접근권을 제공함으로써 데이터 통신을 촉진한다.
카프카는 분포식 시스템을 통해 대량의 유량을 처리할 수 있다.Kafka 배치는 여러 개의 프록시 서버를 배치할 수 있으며, 이것은 가로 확장을 허용합니다.카프카의 이벤트 데이터는 테마에 따라 구성되며, 테마마다 여러 개의 구역으로 분할됩니다.이것은 구역 부하를 처리하고 다시 균형 있게 하기 위해 새로운 프록시 노드를 분배할 수 있기 때문에 Kafka의 수평 확장을 허용합니다.Kafka는 각 파티션을 여러 번 복제하여 데이터 내결함성을 보장합니다.
Docker에 카프 카드 배포
세 개의 프록시 노드가 있는 카프카 그룹을 docker로 배치할 것입니다.이 예에서 우리는 카프카 동물원 관리자를 사용하여 설치할 것이다.또 다른 Kafdrop 노드는 카프카 그룹을 감시하는 웹 사용자 인터페이스를 제공하는 데 사용될 것입니다.아키텍처는 다음과 같습니다.
환경 설정
Docker, Docker 설치에 대한 정보here
Docker Compose, Docker Compose 설치에 대한 정보here
Docker Compose 쓰기
Zookeeper 구성zookeeper docker 그림을 사용합니다. 포트 2181을 표시합니다. 이것은 zookeeper의 기본 포트입니다.
zookeeper:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
Kafka 프록시 노드 구성우리는 카프카드를 위해 융합된 docker 이미지를 사용하고 카프카 노드와 Zookeeper 노드의 통신을 설정합니다.다음 구성은 포트 9091, 9092 및 9093에서 호스트 이름 kafka1, kakfa2, kafka3을 사용하여 세 번 복제합니다.kafka1:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka1
ports:
- "9091:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
volumes:
- ./data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
Kafdrop 구성Kafdrop을 Kafka 에이전트에 연결하는 것으로 설정하면 Kafdrop은 메타데이터에서 다른 Kafka 에이전트에 대한 정보를 얻을 수 있습니다.kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:19091"
depends_on:
- kafka1
- kafka2
- kafka3
최종 설정은 docker-compose.yml 파일과 유사해야 합니다.실행
docker-compose up
을 통해 Kafka 그룹을 시작합니다. 이것은 5개의 docker 용기를 배치합니다.docker ps
를 사용하여 검사할 수 있습니다.localhost:9000로 이동하면 Kafdrop 페이지를 볼 수 있습니다. Kafka배치와 Kafka1, Kafka2, Kafka3이라는 프록시 노드가 세 개 있습니다.
카프카 데이터 흐름
로컬 카프카 서비스가 실행됨에 따라 카프카와 상호작용을 시작할 수 있습니다.이 부분에서, 우리는 처음에 언급한 전구 모스 코드 예시를 사용하여 출판사를 만들고, 전송의 '점' 과 '대시' 의 수량을 계산하는 소비자를 만들 것이다.
카프카 테마 만들기
출판사가 데이터를 보낼 수 있도록 카프카에 테마를 만들어야 한다.Kafdrop에서 페이지 아래쪽에 있는 New를 클릭하여 새 주제의 제목
light_bulb
을 지정하고 파티션 수를 3으로 설정합니다.복사 인자에 대해 기본 설정 3을 보존할 수 있습니다. 이것은 전구의 각 구역이 세 번 복사된다는 것을 의미합니다.홈 페이지로 돌아가면 주제 light bulb with 세 개의 파티션을 볼 수 있습니다.건축 출판사
전구의 주제에 데이터를 추가하기 위해서, 우리는 출판사가 카프카와 이야기를 나누어야 한다.간단한 방법은 파이톤 라이브러리를 사용하는 것이다.Confluent 사용
pip install confluent-kafka
으로 Kafka Python 커넥터를 설치하면 다음 명령을 사용하여 Kafka에 데이터를 보내기 시작할 수 있습니다.from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9091'})
p.produce('light_bulb', key='hello', value='world')
p.flush(30)
Producer
클래스는 설정 사전을 받아들입니다. Kafka 에이전트에 주소를 지정합니다.에이전트는 다른 에이전트와 연락할 수 있는 메타데이터를 포함하기 때문에 주소만 필요합니다.produce
함수는 확인을 기다리지 않고 데이터를 보내는데 세 가지 입력을 받아들인다. 카프카 테마 이름, 데이터가 어느 구역에 추가되었는지 확인하는 키, 로그 데이터의 값 문자열이다.프로세스가 끝날 때 flush
기능을 사용합니다.이 producer.py 파일은 모스 부호로 전구를 보내는 예시를 제공했다.우리는 카프카 집단에 켜기/끄기 상태 로그를 발표하여 전구에 모스 코드를 보낼 수 있다.
python3 producer.py --key="light-1" --string="XYZ"
일부 발표된 데이터를 통해 우리는 카프카 인터페이스에서 데이터에 관한 정보를 볼 수 있다.건축 소비자
다음은 파이톤 라이브러리를 사용하여 정보를 소비하는 간단한 예입니다.
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'localhost:9091',
'group.id': 'counting-group',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
})
c.subscribe(['light_bulb'])
while True:
msg = c.poll(0.1)
if msg is None:
continue
elif not msg.error():
print(msg.value())
elif msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached")
else:
print("Error")
Consumer
류는 설정 사전도 받아들인다.우리는 사용자 그룹의 이름을 지정하여 사용자 편이량의 자동 제출을 사용하고 시간 초과를 설정하며 편이량을 최소한의 요소부터 설정합니다.consumer 대상에서 구독 테마 목록을 사용할 수 있으며,while 순환은 구독 테마의 새로운 로그 데이터를 계속 검사할 수 있습니다.이 consumer.py 파일은 전구 예시에서 전송된 모스 부호의'점'과'파절호'수량을 계산하는 것을 제공했다.
고장 처리
단일 노드 장애
Kafka의 디자인이 잘못되었기 때문에, 우리는 용기를 정지해서 노드의 고장을 시뮬레이션할 수 있다.이것은 소프트웨어, 하드웨어 또는 네트워크에서 발생할 수 있는 고장으로 인해 서버가 정지되는 효과를 시뮬레이션할 것이다.3번 프록시 노드를 정지하려면
docker stop kafka_kafka3_1
Kafdrop 페이지를 새로 고칠 때 호스트 Kafka3이 분실된 것을 볼 수 있습니다. Kafka3이 제공하는 구역은 현재 Kafka1과 Kafka2 사이에 분배되어 있습니다.이 예에서 우리는 남은 두 개의 에이전트 노드 중 하나가 모두 두 개의 구역을 처리하는 것을 보았다. 이것은 모두 시스템의 네 개의 구역이다. 이것은 에이전트 노드인kafaka3의 고장으로 데이터가 분실되지 않았음을 나타낸다.테마light Bubble을 누르면partitions에서kafka3이 이끄는 구역이 다른 사용 가능한 노드로 업데이트되는 것을 볼 수 있습니다.또한 프록시 노드 3을 잃어버리면 모든 구역의 복제가 부족합니다. 설정은 세 번의 복제가 필요하지만 두 개의 프록시 노드만 사용할 수 있습니다.
다중 노드 장애
또한 실행
docker stop kafka_kafka2_1
을 통해 프록시 노드 2를 중지합니다.이 경우 복제 인자를 3으로 설정하기 때문에 두 노드가 고장나면 데이터가 분실되지 않습니다.모든 구역의 리더 노드가 나머지 브로커 노드 1로 전환된 것을 볼 수 있습니다.모든 노드 장애
나머지 마지막 노드를 중지하려면
docker stop kafka_kafka1_1
를 실행하십시오.이런 상황에서 우리는 데이터를 잃어버릴 것이다.모든 데이터 복제를 잃었기 때문이다.그러나 이것은 모든 프록시 노드가 고장난 것이 아니라 네트워크 문제로 인해 일어날 수 있는 불가능한 상황이다.따라서 네트워크가 복구될 때 데이터를 검색할 수 있을 가능성이 높다.그러나 프록시 노드가 없으면 Kafka 서버를 사용할 수 없습니다.요약
전체적으로 말하자면 카프카는 고도로 설정할 수 있는 분포식 시스템으로 현대 민첩한 응용 프로그램의 수요에 적합하다.그것은 응용 프로그램 데이터 발표자와 소비자 간의 데이터 서비스 층이다.
Reference
이 문제에 관하여(Docker 및 Python을 사용하여 카프카드 소개), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/boyu1997/intro-to-kafka-4hn2텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)