어떻게 k6를 사용하여 카프카 제작자와 소비자에 대해 부하 테스트를 진행합니까

최근 k6는 지원k6 extensions을 시작하여 k6능력을 지역사회에 필요한 다른 상황으로 확대하였다.지역사회가 이미 건설되었다plenty of extensions.K6 확장은 Go로 작성되었고, 그 중 많은 것들이 기존의 Go 라이브러리를 다시 사용하고 있습니다.
이로써 k6는 서로 다른 프로토콜을 테스트하고 다양한 상황에 적응하는 일반적인 도구가 되었다.이 글은 내가 k6를 사용하여 각종 시스템 시리즈를 테스트한 세 번째 부분이다.


  • 이 글에서 우리가 유행하는 카프카 프로젝트를 어떻게 테스트하는지 봅시다.Apache Kafka는 강력한 이벤트 스트리밍 플랫폼으로 다음과 같은 기능을 제공합니다.
  • 쓰기 및 읽기 이벤트 흐름
  • 필요에 따라 스토리지 이벤트 흐름
  • 평행 사건 흐름을 회고적으로 처리
  • 클라이언트 애플리케이션이 Kafka 서버에 이벤트를 기록할 수 있도록 합니다.우리는 이런 유형의 응용을 생산자라고 부른다.Kafka 서버에서 이벤트를 읽고 처리하는 클라이언트 프로그램을 사용자라고 합니다.
    카프카 자체는 간단한 설정에서 초당 수백 수백만 개의 사건을 틈새 없이 처리할 수 있다.하지만 온라인에 접속하기 전에 카프카 서비스의 행동을 테스트하고 관찰하려면 어떻게 해야 합니까?
    xk6-kafka extension 카프카 제작자와 소비자에게 편리한 상호작용 기능을 제공했다.이것은 생산자로서 초당 대량의 메시지를 보낼 수 있으며, 측정된 시스템 (SUT) 을 감시하고, 응용 프로그램이 부하를 어떻게 따라가는지 테스트할 수 있다.

    카프카
    이 문서를 작성할 때 xk6 kafka 확장은 다음과 같은 API를 제공합니다.
    기능
    묘사
    소모(카드 리더기, 제한)
    카프카 서버에서 온 메시지를 사용합니다.
    주소, 테마
    새 테마를 만듭니다.
    목록 테마(주소)
    독특한 주제를 되돌려줍니다.
    제작(작성자, 정보)
    카프카 서버에 메시지를 보냅니다.
    독자(매니저, 주제)
    새로운 카드 리더의 실례를 실례화하다.
    작가(매니저, 주제)
    새 Writer 인스턴스를 인스턴스화합니다.
    위에서 언급한 일부 API는 인증과 메시지 압축에 사용할 추가 옵션 매개 변수를 확실히 받아들인다.자세한 내용은 more examples를 참조하십시오.

    카프카가 확장한 k6 빌딩
    기본적으로 k6는 테스트 카프카드를 지원하지 않습니다.xk6-kafka extension를 사용하여 k6를 구축하면 k6버전을 만들 것입니다. 이 버전은 카프카 제작자와 소비자를 테스트하는 기능을 가지고 있습니다.
    계속하기 전에 다음이 설치되어 있는지 확인합니다.
  • 시작(>=1.7)
  • 지트
  • 그런 다음 터미널에서 다음 명령을 실행하여 xk6 모듈을 설치합니다.
    go install go.k6.io/xk6/cmd/xk6@latest
    
    명령이 성공적으로 완료되면 다음과 같이 카프카를 위한 사용자 정의 k6 바이너리 파일을 만들 수 있습니다.
    xk6 build --with github.com/mostafa/xk6-kafka@latest
    
    작업 디렉터리에 새 k6 바이너리 파일을 만드는 데 시간이 좀 걸립니다.

    Troubleshooting: If you experienced issues installing xk6 extension, kindly head over to the Github repository to download the pre-compiled binaries instead.



    카프카를 운영하다
    추천하는 방법은 docker를 사용하는 것입니다. 수동 설치가 매우 복잡하고 오류가 발생하기 쉽기 때문입니다.DockerHub에서 떼어낼 수 있습니다 following image by lensesio.그것은 완전한 카프카 개발 설정을 포함한다.
    docker pull lensesio/fast-data-dev:latest
    
    그런 다음 다음 다음 명령을 실행하여 분리 모드에서 docker를 시작합니다.
    sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
           -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092  \
           -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
    
    sudo docker logs -f -t lenseio
    
    액세스 http://localhost:3030 빠른 데이터 개발 환경 파악

    k6 시험

    수입하다
    이제 test script라는 새 JavaScript 파일을 만듭니다.js와 k6 바이너리 파일은 같은 디렉터리에 있습니다.그런 다음 파일 상단에 다음 가져오기 문을 추가합니다.
    import { check } from "k6";
    import { writer, produce, reader, consume, createTopic } from "k6/x/kafka";
    

    초기화
    다음 초기화 코드를 첨부하여 계속합니다.
    const bootstrapServers = ["localhost:9092"];
    const kafkaTopic = "xk6_kafka_json_topic";
    
    const producer = writer(bootstrapServers, kafkaTopic);
    const consumer = reader(bootstrapServers, kafkaTopic);
    
    코드는 지정한 설정에 따라 writer와reader 실례를 초기화합니다.Kafka 서버에 대해 다른 IP/호스트 주소와 포트를 사용하고 있다면 이에 따라 수정하십시오.
    다음에 createTopic 함수를 호출하여 새 테마를 만듭니다.테마가 이미 존재한다면 이 함수는 아무런 작용도 하지 않을 것입니다.
    createTopic(bootstrapServers[0], kafkaTopic);
    
    다음 메시지의 유일한 표식자로 랜덤 정수를 만드는 함수를 만듭니다.이것은 선택할 수 있는 것이지 부하 테스트를 하는 강제적인 요구가 아니라는 것을 주의하십시오.
    function getRandomInt(max=1000) {
      return Math.floor((Math.random() * max) + 1);
    }
    

    기본 함수
    기본 함수의 정의는 다음과 같습니다.
    export default function () {
        let messages = [{
          key: JSON.stringify({
              correlationId: "test-id-sql-" + getRandomInt(),
          }),
          value: JSON.stringify({
              title: "Load Testing SQL Databases with k6",
              url: "https://k6.io/blog/load-testing-sql-databases-with-k6/",
              locale: "en"
          }),
        },
        {
          key: JSON.stringify({
              correlationId: "test-id-redis-" + getRandomInt(),
          }),
          value: JSON.stringify({
              title: "Benchmarking Redis with k6",
              url: "https://k6.io/blog/benchmarking-redis-with-k6/",
              locale: "en"
          }),
      }];
    
        let error = produce(producer, messages);
        check(error, {
              "is sent": (err) => err == undefined,
        });
    }
    
    이러한 코드 블록은 다음과 같이 작동합니다.
  • 초기화 메시지 목록
  • 제품 함수를 호출하여 메시지를 발표합니다
  • 메시지가 성공적으로 발송되었는지 확인

  • 찢어지다
    완료되면 분리 기능을 생성하고 연결을 닫습니다.
    export function teardown(data) {
        producer.close();
        consumer.close();
    }
    

    실행 테스트
    파일을 저장하고 터미널에서 다음 명령을 실행합니다.
    ./k6 run --vus 50 --duration 5s test_script.js
    
    다음과 같은 출력이 표시되어야 합니다.
    running (05.0s), 00/50 VUs, 15136 complete and 0 interrupted iterations
    default ✓ [======================================] 50 VUs  5s
    
        ✓ is sent
    
        █ teardown
    
        checks.........................: 100.00% ✓ 15136  ✗ 0
        data_received..................: 0 B    0 B/s
        data_sent......................: 0 B    0 B/s
        iteration_duration.............: avg=16.49ms min=31.9µs med=13.52ms max=1.14s p(90)=28.55ms p(95)=36.46ms
        iterations.....................: 15136   3017.4609/s
        kafka.writer.dial.count........: 151    30.102841/s
        kafka.writer.error.count.......: 0      0/s
        kafka.writer.message.bytes.....: 5.2 MB  1.0 MB/s
        kafka.writer.message.count.....: 30272   6034.9218/s
        kafka.writer.rebalance.count...: 0      0/s
        kafka.writer.write.count.......: 30272   6034.9218/s
        vus............................: 5      min=5       max=50
        vus_max........................: 50     min=50      max=50
    

    부하 조절
    VU 수를 늘리면 부하를 쉽게 확장할 수 있습니다.예를 들어, 다음 명령은 1분간 500개의 VU 로드 테스트를 사용합니다.
    ./k6 run --vus 500 --duration 1m test_script.js
    

    If you are new to k6, check out how to configure the load options in the script or run a stress test with k6.



    테스트 시간 연장
    위의 스크립트는 Kafka 서버에 메시지를 생성하는 것에 관한 것입니다.사실상, 코드를 메시지를 생성하고 사용하는 테스트로 쉽게 수정할 수 있습니다.
    for 순환 코드 아래에 다음 코드만 추가하면 됩니다.
    let result = consume(consumer, 10);
    check(result, {
        "10 messages returned": (msgs) => msgs.length == 10,
    });
    
    코드는 매번 10개의 메시지를 읽을 것이다.더 많은 메시지를 사용하려면 값을 더 높은 값으로 수정하십시오.
    같은 명령을 사용하여 실행할 때 출력은 다음과 같습니다.
    running (05.0s), 00/50 VUs, 9778 complete and 0 interrupted iterations
    default ✓ [======================================] 50 VUs  5s
    
        ✓ is sent
        ✓ 10 messages returned
    
        █ teardown
    
        checks.........................: 100.00% ✓ 19556      ✗ 0
        data_received..................: 0 B    0 B/s
        data_sent......................: 0 B    0 B/s
        iteration_duration.............: avg=25.53ms min=41.4µs med=18ms max=1.41s p(90)=37.73ms p(95)=52.37ms
        iterations.....................: 9778   1946.80798/s
        kafka.reader.dial.count........: 50     9.955042/s
        kafka.reader.error.count.......: 0      0/s
        kafka.reader.fetches.count.....: 101    20.109184/s
        kafka.reader.message.bytes.....: 15 MB   2.9 MB/s
        kafka.reader.message.count.....: 97830   19478.034846/s
        kafka.reader.rebalance.count...: 0      0/s
        kafka.reader.timeouts.count....: 46     9.158638/s
        kafka.writer.dial.count........: 152    30.263327/s
        kafka.writer.error.count.......: 0      0/s
        kafka.writer.message.bytes.....: 3.4 MB  669 kB/s
        kafka.writer.message.count.....: 19556   3893.615961/s
        kafka.writer.rebalance.count...: 0      0/s
        kafka.writer.write.count.......: 19556   3893.615961/s
        vus............................: 50     min=50      max=50
        vus_max........................: 50     min=50      max=50
    

    k6의 카프카 도량
    기본적으로 k6는 자신의built-in metrics 자동 수집이 있습니다.이외에도 own custom metrics를 만들 수 있습니다.사용자 지정 지표는 다음 유형으로 나눌 수 있습니다.

  • Counter: 누적치 총화의 도량.

  • Gauge: 최소값, 최대값, 마지막 값에 추가된 메트릭을 저장합니다.

  • Rate: 제로 부가가치 비율을 추적하는 지표.

  • Trend: 부가값(최소값, 최대값, 평균값과 백분위수) 통계 데이터의 도량을 계산할 수 있습니다.
  • k6를 제외하고 k6 확장은 지표를 수집하여 k6 results output의 일부로 보고할 수 있다.이 예에서 xk6 카프카는 독자와 작가를 위해 개인 데이터를 수집한다.

    독자.
    reader의 지표를 봅시다.
    운율학
    타입
    묘사
    카프카.독자다이얼을 돌리다.헤아리다
    카운터
    독자가 카프카에 연결을 시도한 총 횟수.
    카프카.독자잘못 1헤아리다
    카운터
    카프카를 읽을 때 발생한 오류의 총 수입니다.
    카프카.독자끌어당기다헤아리다
    카운터
    리더가 카프카에서 대량의 정보를 얻는 총 횟수
    카프카.독자메모바이트
    카운터
    소모된 총 바이트 수.
    카프카.독자메모헤아리다
    카운터
    사용한 메일의 총 수입니다.
    카프카.독자다시 균형을 잡다.헤아리다
    카운터
    소비자 집단에서 어떤 주제의 재균형 총수 (폐기).
    카프카.독자타임.헤아리다
    카운터
    카프카에서 읽을 때 발생한 시간 초과 총수

    작가.
    writer에 대해 지표는 다음과 같다.
    운율학
    타입
    묘사
    카프카.작가다이얼을 돌리다.헤아리다
    카운터
    저자는 카프카에 연결된 총 횟수를 시도했다.
    카프카.작가잘못 1헤아리다
    카운터
    카프카에 쓸 때 발생한 오류의 총 수입니다.
    카프카.작가메모바이트
    카운터
    생성된 총 바이트 수입니다.
    카프카.작가메모헤아리다
    카운터
    생성된 메일의 총 수입니다.
    카프카.작가다시 균형을 잡다.헤아리다
    카운터
    테마의 재균형 총 수 (사용하지 않음).
    카프카.작가쓰다헤아리다
    카운터
    컴파일러는 카프카의 총 횟수에 메시지를 대량으로 기록합니다.
    더 많은 카프카 지표here를 찾을 수 있습니다.그러나 확장은 모든 지표를 수집하지 않았다.너는 이것GitHub issue에 따라 그들이 추가한 진도를 추적할 수 있다.

    더 많은 예
    그 밖에 xk6 카프카 메모리 라이브러리는 새로운 사용자에게 기존의 few test scripts를 제공했다.이 문서에는 다음과 같은 테스트가 포함되어 있습니다.

  • test_avro.js: 매번 200개의 Avro 메시지 테스트 카프카드를 교체한다.

  • test_avro_with_schema_registry.js: schema registry
  • 를 사용하여 매번 교체할 때마다 200개의 Avro 메시지 테스트 카프카드를 사용합니다

  • test_json.js: 매번 200개의 JSON 메시지 테스트 카프카드를 교체한다.

  • test_json_with_snappy_compression.js: snappy 압축
  • 을 사용하여 매번 교체할 때마다 200개의 JSON 메시지 테스트 카프카드를 사용

  • test_sasl_auth.js: SASL 인증을 바탕으로 매번 200개의 JSON 메시지 테스트 카프카드를 교체한다.

  • test_topics.js: 모든 카프카 구역의 테마를 표시하고 무작위 테마를 만듭니다.
  • 당신은 그것들을 마음대로 시험적으로 사용하고, 자신의 용례에 따라 코드를 수정할 수 있습니다.문제가 발생하면 보고on GitHub하십시오.

    결론
    한마디로 k6를 사용하여 아파치 카프카에 대한 부하 테스트를 하는 것이 훨씬 쉬워졌다.k6는 부하 테스트를 만들고 확장하는 데 기초를 제공했고 xk6 kafka는 kafka 서버와 상호작용하는 데 편리한 API를 제공했다.
    사용 가능한 k6 확장에 대한 더 많은 정보를 알고 싶으면 bundle builder page만 방문하십시오.이 페이지에서는 사용자 정의 k6 바이너리 파일을 구축하기 위해 명령을 생성할 수 있습니다.
    질문이 있거나 확장에 관심이 있으면 슬랙의 k6 커뮤니티에 가입하십시오.

    좋은 웹페이지 즐겨찾기