Kafka 주제 관리를 자동화하는 방법
인프라가 코드를 통해 관리되고 자동화되는 DevOps 기반 팀에서 Kafka 항목 생성 또는 삭제를 어떻게 자동화합니까?
이번 포스트에서는 자바스크립트를 사용하여 어떻게 하는지 보여드리겠습니다. 서로 다른 인프라 설정에 대해 동일한 것을 달성하는 다양한 방법이 있습니다. 이 게시물에서는 VPC 내에서만 액세스할 수 있는 Kafka 클러스터를 실행 중이고 Kubernetes 클러스터도 있다고 가정해 보겠습니다. 솔루션은 주제를 생성하거나 삭제해야 할 때마다 Kubernetes 작업으로 실행되는 JavaScript 앱이 될 것입니다.
왜 자바스크립트인가? 음, Bash의 복잡성 없이 스크립트를 작성하는 쉬운 방법입니다. JavaScript 개발자가 있는 경우 다른 개발자도 기여할 수 있습니다. Python 매장이라면 Python을 사용하여 솔루션을 적용할 수도 있습니다.
애플리케이션 설정
솔루션은 Node.js 애플리케이션이며 이를 위해서는 Node.js 프로젝트가 필요합니다.
npm init
명령을 사용하여 새 프로젝트를 생성할 수 있습니다. Node.js 및 npm이 없는 경우 nodejs.org/en/download에서 필요한 바이너리를 다운로드하여 설치해야 합니다.앱을 만들려는 디렉터리로 터미널을 연 다음
npm init -y
명령을 실행합니다. npm install kafkajs
명령을 사용하여 Kafka JavaScript 클라이언트를 종속 항목으로 설치합니다.솔루션 구현
애플리케이션은 JSON 파일을 통해 생성/삭제할 주제 목록을 읽습니다. 여기서 달성하고자 하는 것은 누구나 GitHub 리포지토리에서 JSON 파일을 변경하고 변경 사항이 포함된 PR을 열 수 있는 워크플로우입니다. PR이 기본 브랜치에 병합되면 코드는 해당 파일에서 데이터를 읽은 다음 원하는 대로 주제 목록을 생성하거나 삭제합니다.
이를 달성하려면 다음 콘텐츠가 포함된 topic.json이라는 JSON 파일을 생성해야 합니다.
{
"create": [],
"delete": []
}
이 구조를 사용하면 만들거나 삭제할 항목의 이름이 포함된 문자열 배열을 가질 수 있습니다. 또한 소스 제어 시스템에서 해당 파일을 보면 Kafka에서 생성된 주제에 대한 아이디어를 얻을 수 있습니다.
다음으로 api.js 파일을 만듭니다. 다음 코드 조각을 복사하여 api.js에 붙여넣습니다.
async function createTopics(topics, kafkaAdmin) {
if (topics.length > 0) {
await kafkaAdmin.createTopics({
topics: topics.map((topic) => ({
topic,
numPartitions: 1,
replicationFactor: 3,
configEntries: [{ name: "min.insync.replicas", value: "2" }],
})),
});
}
}
async function deleteTopics(topics, kafkaAdmin) {
if (topics.length > 0) {
await kafkaAdmin.deleteTopics({ topics: topics });
}
}
module.exports = { createTopics, deleteTopics };
이 모듈은 Kafka 주제를 생성하고 삭제하는 기능을 내보냅니다.
createTopics
함수는 주제 배열과 Kafka 관리 클라이언트 인스턴스를 인수로 사용합니다. 그런 다음 kafkaAdmin.createTopics
를 호출하여 주제를 만듭니다. 지정된 파티션 및 구성 항목의 수는 단지 예일 뿐입니다. 설정과 일치하도록 구성해야 합니다.새 파일 index.js를 만들고 다음 코드를 붙여넣습니다.
const { Kafka } = require("kafkajs");
const { createTopics, deleteTopics } = require("./api");
const topics = require("../topics.json");
const username = process.env.KAFKA_USERNAME;
const password = process.env.KAFKA_PASSWORD;
const brokers = process.env.KAFKA_URL ? process.env.KAFKA_URL.split(",") : [];
if (!username && !password && brokers.length === 0) {
throw new Error("Missing Kafka Client Credential");
}
const kafka = new Kafka({
clientId: "admin-script",
brokers: brokers,
ssl: {
rejectUnauthorized: false,
},
sasl: {
mechanism: "scram-sha-512",
username,
password,
},
});
const admin = kafka.admin();
admin.connect().then(async () => {
const existingTopics = await admin.listTopics();
const newTopics = topics.create.filter((x) => !existingTopics.includes(x));
await createTopics(newTopics, admin);
const deletionTopics = topics.delete.filter((x) =>
existingTopics.includes(x)
);
await deleteTopics(deletionTopics, admin);
await admin.disconnect();
});
위의 코드는 Kafka 클라이언트를 생성하고 Kafka admin API에 연결합니다. 연결이 설정된 후 함수
createTopics
및 deleteTopics
를 각각 호출한 다음 종료합니다.GitHub 작업을 사용한 자동화
코드가 GitHub 리포지토리에 있다고 가정하고, topic.json 파일이 수정될 때마다 Node.js 앱을 실행하는 Kubernetes 작업을 실행하려고 합니다. 우리는 GitHub Actions를 사용하여 이를 수행할 것입니다.
kafka.yml 파일을 .github/workflows 디렉토리에 추가합니다.
name: Deploy Kafka Topics Job
on:
push:
branches: [main]
env:
JOB_NAME: kafka-topics
AWS_REGION: eu-west-1
KUBERNETES_CLUSTER: demo-cluster
KUBERNETES_NAMESPACE: default
jobs:
build-and-push:
name: Build & Push to ECR
runs-on: ubuntu-latest
steps:
- name: Git checkout
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-1
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1
- name: Add short commit hash
id: short-commit-hash
run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
- name: Build Docker container and push to ECR
uses: dfreilich/[email protected]
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
with:
args: "build ${{ env.ECR_REGISTRY }}/${{ env.JOB_NAME}}:${{ env.IMAGE_TAG}} --builder heroku/buildpacks --buildpack heroku/nodejs --publish"
deploy-job:
name: Deploy to Kubernetes
needs: [build-and-push]
runs-on: ubuntu-latest
steps:
- name: Git checkout
uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-west-1
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1
- name: Add short commit hash
id: short-commit-hash
run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"
- name: Set Image Name
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
run: 'echo "IMAGE_NAME=$(echo ${ECR_REGISTRY})/$(echo ${JOB_NAME}):$(echo ${IMAGE_TAG})" >> $GITHUB_ENV'
- name: Create Job
env:
SHA: ${{ steps.short-commit-hash.outputs.sha_short }}
run: |
aws eks update-kubeconfig \
--region ${AWS_REGION} \
--name ${KUBERNETES_CLUSTER}
cat <<EOF | kubectl apply -f -
apiVersion: batch/v1
kind: Job
metadata:
name: ${JOB_NAME}-${SHA}
namespace: ${KUBERNETES_NAMESPACE}
labels:
jobgroup: ${JOB_NAME}
spec:
ttlSecondsAfterFinished: 259200
template:
spec:
containers:
- name: ${JOB_NAME}-${SHA}
image: ${IMAGE_NAME}
envFrom:
- secretRef:
name: kafka-secrets
restartPolicy: Never
backoffLimit: 2
EOF
위의 워크플로에는 두 가지 작업이 있습니다.
build-and-push
작업은 Pack CLI 및 Paketo Buildpacks을 사용하여 컨테이너 이미지를 빌드하고 빌드된 이미지를 AWS 컨테이너 레지스트리로 푸시합니다. deploy-job
는 빌드된 이미지를 사용하여 Kubernetes 작업을 생성합니다.이 워크플로는 AWS 및 GitHub 작업을 사용한다고 가정하지만 다른 소스 제어 및 CI/CD 시스템에 대해 유사한 것을 복제할 수 있습니다. 여기서 요점은 JavaScript, JSON 파일 및 GitHub Actions를 사용하여 JSON 파일이 변경될 때 Kafka 주제 생성 또는 삭제를 자동화하는 것입니다. 이 개념은 사용 사례에 맞게 조정할 수 있습니다.
Reference
이 문제에 관하여(Kafka 주제 관리를 자동화하는 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/pmbanugo/how-to-automate-kafka-topic-management-3kj7텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)