어떻게 k6를 사용하여 카프카 제작자와 소비자에 대해 부하 테스트를 진행합니까
이로써 k6는 서로 다른 프로토콜을 테스트하고 다양한 상황에 적응하는 일반적인 도구가 되었다.이 글은 내가 k6를 사용하여 각종 시스템 시리즈를 테스트한 세 번째 부분이다.
카프카 자체는 간단한 설정에서 초당 수백 수백만 개의 사건을 틈새 없이 처리할 수 있다.하지만 온라인에 접속하기 전에 카프카 서비스의 행동을 테스트하고 관찰하려면 어떻게 해야 합니까?
xk6-kafka extension 카프카 제작자와 소비자에게 편리한 상호작용 기능을 제공했다.이것은 생산자로서 초당 대량의 메시지를 보낼 수 있으며, 측정된 시스템 (SUT) 을 감시하고, 응용 프로그램이 부하를 어떻게 따라가는지 테스트할 수 있다.
카프카
이 문서를 작성할 때 xk6 kafka 확장은 다음과 같은 API를 제공합니다.
기능
묘사
소모(카드 리더기, 제한)
카프카 서버에서 온 메시지를 사용합니다.
주소, 테마
새 테마를 만듭니다.
목록 테마(주소)
독특한 주제를 되돌려줍니다.
제작(작성자, 정보)
카프카 서버에 메시지를 보냅니다.
독자(매니저, 주제)
새로운 카드 리더의 실례를 실례화하다.
작가(매니저, 주제)
새 Writer 인스턴스를 인스턴스화합니다.
위에서 언급한 일부 API는 인증과 메시지 압축에 사용할 추가 옵션 매개 변수를 확실히 받아들인다.자세한 내용은 more examples를 참조하십시오.
카프카가 확장한 k6 빌딩
기본적으로 k6는 테스트 카프카드를 지원하지 않습니다.xk6-kafka extension를 사용하여 k6를 구축하면 k6버전을 만들 것입니다. 이 버전은 카프카 제작자와 소비자를 테스트하는 기능을 가지고 있습니다.
계속하기 전에 다음이 설치되어 있는지 확인합니다.
go install go.k6.io/xk6/cmd/xk6@latest
명령이 성공적으로 완료되면 다음과 같이 카프카를 위한 사용자 정의 k6 바이너리 파일을 만들 수 있습니다.xk6 build --with github.com/mostafa/xk6-kafka@latest
작업 디렉터리에 새 k6 바이너리 파일을 만드는 데 시간이 좀 걸립니다.Troubleshooting: If you experienced issues installing xk6 extension, kindly head over to the Github repository to download the pre-compiled binaries instead.
카프카를 운영하다
추천하는 방법은 docker를 사용하는 것입니다. 수동 설치가 매우 복잡하고 오류가 발생하기 쉽기 때문입니다.DockerHub에서 떼어낼 수 있습니다 following image by lensesio.그것은 완전한 카프카 개발 설정을 포함한다.
docker pull lensesio/fast-data-dev:latest
그런 다음 다음 다음 명령을 실행하여 분리 모드에서 docker를 시작합니다.sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
-p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 \
-e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
sudo docker logs -f -t lenseio
액세스 http://localhost:3030
빠른 데이터 개발 환경 파악k6 시험
수입하다
이제 test script라는 새 JavaScript 파일을 만듭니다.js와 k6 바이너리 파일은 같은 디렉터리에 있습니다.그런 다음 파일 상단에 다음 가져오기 문을 추가합니다.
import { check } from "k6";
import { writer, produce, reader, consume, createTopic } from "k6/x/kafka";
초기화
다음 초기화 코드를 첨부하여 계속합니다.
const bootstrapServers = ["localhost:9092"];
const kafkaTopic = "xk6_kafka_json_topic";
const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);
코드는 지정한 설정에 따라 writer와reader 실례를 초기화합니다.Kafka 서버에 대해 다른 IP/호스트 주소와 포트를 사용하고 있다면 이에 따라 수정하십시오.다음에 createTopic 함수를 호출하여 새 테마를 만듭니다.테마가 이미 존재한다면 이 함수는 아무런 작용도 하지 않을 것입니다.
createTopic(bootstrapServers[0], kafkaTopic);
다음 메시지의 유일한 표식자로 랜덤 정수를 만드는 함수를 만듭니다.이것은 선택할 수 있는 것이지 부하 테스트를 하는 강제적인 요구가 아니라는 것을 주의하십시오.function getRandomInt(max=1000) {
return Math.floor((Math.random() * max) + 1);
}
기본 함수
기본 함수의 정의는 다음과 같습니다.
export default function () {
let messages = [{
key: JSON.stringify({
correlationId: "test-id-sql-" + getRandomInt(),
}),
value: JSON.stringify({
title: "Load Testing SQL Databases with k6",
url: "https://k6.io/blog/load-testing-sql-databases-with-k6/",
locale: "en"
}),
},
{
key: JSON.stringify({
correlationId: "test-id-redis-" + getRandomInt(),
}),
value: JSON.stringify({
title: "Benchmarking Redis with k6",
url: "https://k6.io/blog/benchmarking-redis-with-k6/",
locale: "en"
}),
}];
let error = produce(producer, messages);
check(error, {
"is sent": (err) => err == undefined,
});
}
이러한 코드 블록은 다음과 같이 작동합니다.찢어지다
완료되면 분리 기능을 생성하고 연결을 닫습니다.
export function teardown(data) {
producer.close();
consumer.close();
}
실행 테스트
파일을 저장하고 터미널에서 다음 명령을 실행합니다.
./k6 run --vus 50 --duration 5s test_script.js
다음과 같은 출력이 표시되어야 합니다.running (05.0s), 00/50 VUs, 15136 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 5s
✓ is sent
█ teardown
checks.........................: 100.00% ✓ 15136 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=16.49ms min=31.9µs med=13.52ms max=1.14s p(90)=28.55ms p(95)=36.46ms
iterations.....................: 15136 3017.4609/s
kafka.writer.dial.count........: 151 30.102841/s
kafka.writer.error.count.......: 0 0/s
kafka.writer.message.bytes.....: 5.2 MB 1.0 MB/s
kafka.writer.message.count.....: 30272 6034.9218/s
kafka.writer.rebalance.count...: 0 0/s
kafka.writer.write.count.......: 30272 6034.9218/s
vus............................: 5 min=5 max=50
vus_max........................: 50 min=50 max=50
부하 조절
VU 수를 늘리면 부하를 쉽게 확장할 수 있습니다.예를 들어, 다음 명령은 1분간 500개의 VU 로드 테스트를 사용합니다.
./k6 run --vus 500 --duration 1m test_script.js
If you are new to k6, check out how to configure the load options in the script or run a stress test with k6.
테스트 시간 연장
위의 스크립트는 Kafka 서버에 메시지를 생성하는 것에 관한 것입니다.사실상, 코드를 메시지를 생성하고 사용하는 테스트로 쉽게 수정할 수 있습니다.
for 순환 코드 아래에 다음 코드만 추가하면 됩니다.
let result = consume(consumer, 10);
check(result, {
"10 messages returned": (msgs) => msgs.length == 10,
});
코드는 매번 10개의 메시지를 읽을 것이다.더 많은 메시지를 사용하려면 값을 더 높은 값으로 수정하십시오.같은 명령을 사용하여 실행할 때 출력은 다음과 같습니다.
running (05.0s), 00/50 VUs, 9778 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 5s
✓ is sent
✓ 10 messages returned
█ teardown
checks.........................: 100.00% ✓ 19556 ✗ 0
data_received..................: 0 B 0 B/s
data_sent......................: 0 B 0 B/s
iteration_duration.............: avg=25.53ms min=41.4µs med=18ms max=1.41s p(90)=37.73ms p(95)=52.37ms
iterations.....................: 9778 1946.80798/s
kafka.reader.dial.count........: 50 9.955042/s
kafka.reader.error.count.......: 0 0/s
kafka.reader.fetches.count.....: 101 20.109184/s
kafka.reader.message.bytes.....: 15 MB 2.9 MB/s
kafka.reader.message.count.....: 97830 19478.034846/s
kafka.reader.rebalance.count...: 0 0/s
kafka.reader.timeouts.count....: 46 9.158638/s
kafka.writer.dial.count........: 152 30.263327/s
kafka.writer.error.count.......: 0 0/s
kafka.writer.message.bytes.....: 3.4 MB 669 kB/s
kafka.writer.message.count.....: 19556 3893.615961/s
kafka.writer.rebalance.count...: 0 0/s
kafka.writer.write.count.......: 19556 3893.615961/s
vus............................: 50 min=50 max=50
vus_max........................: 50 min=50 max=50
k6의 카프카 도량
기본적으로 k6는 자신의built-in metrics 자동 수집이 있습니다.이외에도 own custom metrics를 만들 수 있습니다.사용자 지정 지표는 다음 유형으로 나눌 수 있습니다.
Counter: 누적치 총화의 도량.
Gauge: 최소값, 최대값, 마지막 값에 추가된 메트릭을 저장합니다.
Rate: 제로 부가가치 비율을 추적하는 지표.
Trend: 부가값(최소값, 최대값, 평균값과 백분위수) 통계 데이터의 도량을 계산할 수 있습니다.
독자.
reader의 지표를 봅시다.
운율학
타입
묘사
카프카.독자다이얼을 돌리다.헤아리다
카운터
독자가 카프카에 연결을 시도한 총 횟수.
카프카.독자잘못 1헤아리다
카운터
카프카를 읽을 때 발생한 오류의 총 수입니다.
카프카.독자끌어당기다헤아리다
카운터
리더가 카프카에서 대량의 정보를 얻는 총 횟수
카프카.독자메모바이트
카운터
소모된 총 바이트 수.
카프카.독자메모헤아리다
카운터
사용한 메일의 총 수입니다.
카프카.독자다시 균형을 잡다.헤아리다
카운터
소비자 집단에서 어떤 주제의 재균형 총수 (폐기).
카프카.독자타임.헤아리다
카운터
카프카에서 읽을 때 발생한 시간 초과 총수
작가.
writer에 대해 지표는 다음과 같다.
운율학
타입
묘사
카프카.작가다이얼을 돌리다.헤아리다
카운터
저자는 카프카에 연결된 총 횟수를 시도했다.
카프카.작가잘못 1헤아리다
카운터
카프카에 쓸 때 발생한 오류의 총 수입니다.
카프카.작가메모바이트
카운터
생성된 총 바이트 수입니다.
카프카.작가메모헤아리다
카운터
생성된 메일의 총 수입니다.
카프카.작가다시 균형을 잡다.헤아리다
카운터
테마의 재균형 총 수 (사용하지 않음).
카프카.작가쓰다헤아리다
카운터
컴파일러는 카프카의 총 횟수에 메시지를 대량으로 기록합니다.
더 많은 카프카 지표here를 찾을 수 있습니다.그러나 확장은 모든 지표를 수집하지 않았다.너는 이것GitHub issue에 따라 그들이 추가한 진도를 추적할 수 있다.
더 많은 예
그 밖에 xk6 카프카 메모리 라이브러리는 새로운 사용자에게 기존의 few test scripts를 제공했다.이 문서에는 다음과 같은 테스트가 포함되어 있습니다.
test_avro.js: 매번 200개의 Avro 메시지 테스트 카프카드를 교체한다.
test_avro_with_schema_registry.js: schema registry
test_json.js: 매번 200개의 JSON 메시지 테스트 카프카드를 교체한다.
test_json_with_snappy_compression.js: snappy 압축
test_sasl_auth.js: SASL 인증을 바탕으로 매번 200개의 JSON 메시지 테스트 카프카드를 교체한다.
test_topics.js: 모든 카프카 구역의 테마를 표시하고 무작위 테마를 만듭니다.
결론
한마디로 k6를 사용하여 아파치 카프카에 대한 부하 테스트를 하는 것이 훨씬 쉬워졌다.k6는 부하 테스트를 만들고 확장하는 데 기초를 제공했고 xk6 kafka는 kafka 서버와 상호작용하는 데 편리한 API를 제공했다.
사용 가능한 k6 확장에 대한 더 많은 정보를 알고 싶으면 bundle builder page만 방문하십시오.이 페이지에서는 사용자 정의 k6 바이너리 파일을 구축하기 위해 명령을 생성할 수 있습니다.
질문이 있거나 확장에 관심이 있으면 슬랙의 k6 커뮤니티에 가입하십시오.
Reference
이 문제에 관하여(어떻게 k6를 사용하여 카프카 제작자와 소비자에 대해 부하 테스트를 진행합니까), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/k6/how-to-load-test-your-kafka-producers-and-consumers-using-k6-2l8k텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)