Kafka 주제 관리를 자동화하는 방법

21542 단어 iaccicdkafka
Kafka 주제는 이벤트를 구성하는 데 사용되는 범주입니다. 다양한 종류의 이벤트를 보유하기 위해 다양한 주제를 생성하고, 동일한 종류의 이벤트에 대해 필터링 및 변환된 버전을 보유하기 위해 다양한 주제를 생성합니다. 개발자로서 이 주제는 시스템에서 이벤트가 흐르는 방식을 디자인할 때 가장 먼저 생각하는 것 중 하나입니다. 주제를 생성한 다음 주제에 대한 이벤트를 읽거나 씁니다.

인프라가 코드를 통해 관리되고 자동화되는 DevOps 기반 팀에서 Kafka 항목 생성 또는 삭제를 어떻게 자동화합니까?

이번 포스트에서는 자바스크립트를 사용하여 어떻게 하는지 보여드리겠습니다. 서로 다른 인프라 설정에 대해 동일한 것을 달성하는 다양한 방법이 있습니다. 이 게시물에서는 VPC 내에서만 액세스할 수 있는 Kafka 클러스터를 실행 중이고 Kubernetes 클러스터도 있다고 가정해 보겠습니다. 솔루션은 주제를 생성하거나 삭제해야 할 때마다 Kubernetes 작업으로 실행되는 JavaScript 앱이 될 것입니다.

왜 자바스크립트인가? 음, Bash의 복잡성 없이 스크립트를 작성하는 쉬운 방법입니다. JavaScript 개발자가 있는 경우 다른 개발자도 기여할 수 있습니다. Python 매장이라면 Python을 사용하여 솔루션을 적용할 수도 있습니다.

애플리케이션 설정



솔루션은 Node.js 애플리케이션이며 이를 위해서는 Node.js 프로젝트가 필요합니다. npm init 명령을 사용하여 새 프로젝트를 생성할 수 있습니다. Node.js 및 npm이 없는 경우 nodejs.org/en/download에서 필요한 바이너리를 다운로드하여 설치해야 합니다.

앱을 만들려는 디렉터리로 터미널을 연 다음 npm init -y 명령을 실행합니다. npm install kafkajs 명령을 사용하여 Kafka JavaScript 클라이언트를 종속 항목으로 설치합니다.

솔루션 구현



애플리케이션은 JSON 파일을 통해 생성/삭제할 주제 목록을 읽습니다. 여기서 달성하고자 하는 것은 누구나 GitHub 리포지토리에서 JSON 파일을 변경하고 변경 사항이 포함된 PR을 열 수 있는 워크플로우입니다. PR이 기본 브랜치에 병합되면 코드는 해당 파일에서 데이터를 읽은 다음 원하는 대로 주제 목록을 생성하거나 삭제합니다.

이를 달성하려면 다음 콘텐츠가 포함된 topic.json이라는 JSON 파일을 생성해야 합니다.

{
  "create": [],
  "delete": []
}


이 구조를 사용하면 만들거나 삭제할 항목의 이름이 포함된 문자열 배열을 가질 수 있습니다. 또한 소스 제어 시스템에서 해당 파일을 보면 Kafka에서 생성된 주제에 대한 아이디어를 얻을 수 있습니다.

다음으로 api.js 파일을 만듭니다. 다음 코드 조각을 복사하여 api.js에 붙여넣습니다.

async function createTopics(topics, kafkaAdmin) {
  if (topics.length > 0) {
    await kafkaAdmin.createTopics({
      topics: topics.map((topic) => ({
        topic,
        numPartitions: 1,
        replicationFactor: 3,
        configEntries: [{ name: "min.insync.replicas", value: "2" }],
      })),
    });
  }
}

async function deleteTopics(topics, kafkaAdmin) {
  if (topics.length > 0) {
    await kafkaAdmin.deleteTopics({ topics: topics });
  }
}

module.exports = { createTopics, deleteTopics };


이 모듈은 Kafka 주제를 생성하고 삭제하는 기능을 내보냅니다. createTopics 함수는 주제 배열과 Kafka 관리 클라이언트 인스턴스를 인수로 사용합니다. 그런 다음 kafkaAdmin.createTopics를 호출하여 주제를 만듭니다. 지정된 파티션 및 구성 항목의 수는 단지 예일 뿐입니다. 설정과 일치하도록 구성해야 합니다.

새 파일 index.js를 만들고 다음 코드를 붙여넣습니다.

const { Kafka } = require("kafkajs");
const { createTopics, deleteTopics } = require("./api");
const topics = require("../topics.json");

const username = process.env.KAFKA_USERNAME;
const password = process.env.KAFKA_PASSWORD;
const brokers = process.env.KAFKA_URL ? process.env.KAFKA_URL.split(",") : [];

if (!username && !password && brokers.length === 0) {
  throw new Error("Missing Kafka Client Credential");
}

const kafka = new Kafka({
  clientId: "admin-script",
  brokers: brokers,
  ssl: {
    rejectUnauthorized: false,
  },
  sasl: {
    mechanism: "scram-sha-512",
    username,
    password,
  },
});

const admin = kafka.admin();

admin.connect().then(async () => {
  const existingTopics = await admin.listTopics();

  const newTopics = topics.create.filter((x) => !existingTopics.includes(x));
  await createTopics(newTopics, admin);

  const deletionTopics = topics.delete.filter((x) =>
    existingTopics.includes(x)
  );
  await deleteTopics(deletionTopics, admin);

  await admin.disconnect();
});


위의 코드는 Kafka 클라이언트를 생성하고 Kafka admin API에 연결합니다. 연결이 설정된 후 함수 createTopicsdeleteTopics를 각각 호출한 다음 종료합니다.

GitHub 작업을 사용한 자동화



코드가 GitHub 리포지토리에 있다고 가정하고, topic.json 파일이 수정될 때마다 Node.js 앱을 실행하는 Kubernetes 작업을 실행하려고 합니다. 우리는 GitHub Actions를 사용하여 이를 수행할 것입니다.

kafka.yml 파일을 .github/workflows 디렉토리에 추가합니다.

name: Deploy Kafka Topics Job

on:
  push:
    branches: [main]

env:
  JOB_NAME: kafka-topics
  AWS_REGION: eu-west-1
  KUBERNETES_CLUSTER: demo-cluster
  KUBERNETES_NAMESPACE: default

jobs:
  build-and-push:
    name: Build & Push to ECR
    runs-on: ubuntu-latest
    steps:
      - name: Git checkout
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Add short commit hash
        id: short-commit-hash
        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"

      - name: Build Docker container and push to ECR
        uses: dfreilich/[email protected]
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
        with:
          args: "build ${{ env.ECR_REGISTRY }}/${{ env.JOB_NAME}}:${{ env.IMAGE_TAG}} --builder heroku/buildpacks --buildpack heroku/nodejs --publish"

  deploy-job:
    name: Deploy to Kubernetes
    needs: [build-and-push]
    runs-on: ubuntu-latest
    steps:
      - name: Git checkout
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Add short commit hash
        id: short-commit-hash
        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"

      - name: Set Image Name
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
        run: 'echo "IMAGE_NAME=$(echo ${ECR_REGISTRY})/$(echo ${JOB_NAME}):$(echo ${IMAGE_TAG})" >> $GITHUB_ENV'

      - name: Create Job
        env:
          SHA: ${{ steps.short-commit-hash.outputs.sha_short }}
        run: |
          aws eks update-kubeconfig \
            --region ${AWS_REGION} \
            --name ${KUBERNETES_CLUSTER}

          cat <<EOF | kubectl apply -f -
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: ${JOB_NAME}-${SHA}
            namespace: ${KUBERNETES_NAMESPACE}
            labels:
              jobgroup: ${JOB_NAME}
          spec:
            ttlSecondsAfterFinished: 259200
            template:
              spec:
                containers:
                - name: ${JOB_NAME}-${SHA}
                  image: ${IMAGE_NAME}
                  envFrom:
                  - secretRef:
                      name: kafka-secrets
                restartPolicy: Never
            backoffLimit: 2
          EOF


위의 워크플로에는 두 가지 작업이 있습니다. build-and-push 작업은 Pack CLIPaketo Buildpacks을 사용하여 컨테이너 이미지를 빌드하고 빌드된 이미지를 AWS 컨테이너 레지스트리로 푸시합니다. deploy-job는 빌드된 이미지를 사용하여 Kubernetes 작업을 생성합니다.

이 워크플로는 AWS 및 GitHub 작업을 사용한다고 가정하지만 다른 소스 제어 및 CI/CD 시스템에 대해 유사한 것을 복제할 수 있습니다. 여기서 요점은 JavaScript, JSON 파일 및 GitHub Actions를 사용하여 JSON 파일이 변경될 때 Kafka 주제 생성 또는 삭제를 자동화하는 것입니다. 이 개념은 사용 사례에 맞게 조정할 수 있습니다.

좋은 웹페이지 즐겨찾기