Go, Kafka, gRPC 및 MongoDB 마이크로서비스, 메트릭 및 추적 기능👋
Kafka 메시징 에이전트
gRPC gRPC의 Go 구현
MongoDB 데이터베이스로
Jaeger 오픈 소스, 종단 간 분산tracing
Prometheus 모니터링 및 경고
Grafana 프로메테우스(Prometheus)의 모든 제품으로 관측 가능한 계기판을 구성하는 데 사용
GitHub repository 에서 원본 코드를 찾을 수 있습니다
나는 Kafka에 중점을 두고 싶다. 왜냐하면 나는 처음 시도하는 것이기 때문에 하면서 배우고 싶다👨💻
물론 모든 마이크로 서비스는 모니터링과 추적이 필요하기 때문에 여기에 포함된다⚡️
Kafka는 높은 통량의 분포식 메시지 전달 시스템이다.
언제든지 하나의 에이전트만이 구역을 정하는 지도자가 될 수 있고 그 지도자만이 구역의 서비스 데이터를 받을 수 있다.
다른 매니저들은 데이터를 동기화한다.
프로듀서:
생산자는 데이터를 테마에 기록하고 어떤 에이전트와 구역에 기록할지 자동으로 알 수 있습니다.
에이전트가 실패하면 프로덕션이 자동으로 복원됩니다.
운영 업체는 데이터 쓰기 확인을 받을 수 있습니다.
min.insync를 지정할 수 있습니다.복사 복사본은 1(acks==1 당량)보다 낮거나 복사 계수가 높거나 둘 사이에 있기 때문에 b/w의 가용성과 일치성 사이의 균형을 잘 조절할 수 있습니다.
여기 좋아요article
소비자는 하나의 주제에서 데이터를 읽고, 어느 에이전트에서 데이터를 읽는지 안다.
데이터는 각 구역에서 순서대로 읽힌다.
소비자:
소비자는 소비자 집단의 데이터를 읽는다.그룹의 모든 사용자는 독점 구역에서 데이터를 읽습니다.
따라서 사용자가 구역보다 많으면 일부 사용자는 비활성 상태가 됩니다.
소비자 상쇄:
카프카 메모리 소비층이 읽던 편이량
그룹의 소비자가 데이터를 처리했을 때, 편향량을 제출해야 합니다.
만약 소비자의 정서가 저조하다면, 그것은 멈춘 곳에서 다시 읽을 수 있을 것이다.
소비자는 보상을 언제 제출할 것인지를 선택한다.
총 3개의 제공 사례:
최대 한 번:
또 하나 중요한 기능은 Compression입니다.
압축은 생산자 단계에서 사용되며 에이전트나 사용자 중 어떠한 설정 변경도 필요 없습니다.
기본적으로, 이것은 none입니다. 왜냐하면 시작점은 snappy나lz4이기 때문입니다.
지역 발전:
make local // runs docker-compose.local.yml
make crate_topics // create kafka topics
make mongo // load js init script to mongo docker container
make make_cert // generate local SLL certificates
make swagger // generate swagger documentation
UI 인터페이스는 다음 포트에서 사용할 수 있습니다.Jaeger 사용자 인터페이스:http://localhost:16686
프로메테우스 사용자 인터페이스:http://localhost:9090
Grafana 사용자 인터페이스:http://localhost:3000
카프카:http://localhost:9000
기본적으로 Swagger UI는 다음 장치에서 실행됩니다.https://localhost:5007/swagger/index.html
Grafana에서 플로메테우스를 도량원으로 선택한 다음 계기판을 만들어야 합니다.
좋은 카프카드 docker 설정과 enclouded UI는 confluent 이지만, 전 세계 인터넷의 절반을 로컬 pc로 다운로드할 수 있는 커다란 이미지 크기를 가지고 있습니다.🤖 이 때문에 UI 클라이언트를 사용하고 있습니다kafdrop.
Docker 작성로컬yml:
version: "3.8"
services:
zookeeper:
container_name: zookeeper
restart: always
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./data/zookeeper/data:/data
- ./data/zookeeper/datalog:/datalog
networks:
- products_network
kafka1:
container_name: kafka1
image: confluentinc/cp-kafka:5.3.0
restart: always
hostname: kafka1
ports:
- "9091:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
- products_network
kafka2:
container_name: kafka2
restart: always
image: confluentinc/cp-kafka:5.3.0
hostname: kafka2
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 2
volumes:
- ./data/kafka2/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
- products_network
kafka3:
container_name: kafka3
image: confluentinc/cp-kafka:5.3.0
restart: always
hostname: kafka3
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka3/data:/var/lib/kafka/data
depends_on:
- zookeeper
networks:
- products_network
kafdrop:
container_name: kafdrop
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:19091"
depends_on:
- kafka1
- kafka2
- kafka3
networks:
- products_network
redis:
image: redis:6-alpine
container_name: user_redis
ports:
- "6379:6379"
restart: always
networks:
- products_network
prometheus:
container_name: prometheus_container
restart: always
image: prom/prometheus
volumes:
- ./monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--storage.tsdb.retention=20d'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
ports:
- '9090:9090'
networks:
- products_network
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks:
- products_network
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3000:3000'
networks:
- products_network
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- 5775:5775/udp
- 6831:6831/udp
- 6832:6832/udp
- 5778:5778
- 16686:16686
- 14268:14268
- 14250:14250
- 9411:9411
networks:
- products_network
mongodb:
image: mongo:latest
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin
MONGODB_DATABASE: products
ports:
- 27017:27017
volumes:
- mongodb_data_container:/data/db
volumes:
mongodb_data_container:
networks:
products_network:
driver: bridge
프로덕션에서 자주 사용하는 Go 고객segmentio 및 sarama의 경우둘 다 좋아요. 어느 것을 선택하느냐에 따라 결정됩니다. 이 프로젝트에 대해서는segmentio를 사용했습니다.
나는 이곳에서 어떤 재미있는 업무 논리도 실현하지 못했고, 테스트도 소개하지 않았다. 왜냐하면 이때 충분한 시간이 없기 때문이다.
우리의 마이크로 서비스는 카프카, gRPC, REST를 통해 통신할 수 있습니다.
Makefile에서 유용한 명령을 모두 찾을 수 있습니다.
docker에서 카프카드 테마 만들기
docker exec -it kafka1 kafka-topics --zookeeper zookeeper:2181 --create --topic create-product --partitions 3 --replication-factor 2
MongoDB에 대해 자바스크립트 파일을 불러올 수 있습니다. 이 파일은 집합과 인덱스를 만들 수 있습니다.mongo admin -u admin -p admin < init.js
Segmentio 라이브러리api는 독자와 작가를 제공합니다.독자를 먼저 만들려면 다음과 같이 하십시오.
func (pcg *ProductsConsumerGroup) getNewKafkaReader(kafkaURL []string, topic, groupID string) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: kafkaURL,
GroupID: groupID,
Topic: topic,
MinBytes: minBytes,
MaxBytes: maxBytes,
QueueCapacity: queueCapacity,
HeartbeatInterval: heartbeatInterval,
CommitInterval: commitInterval,
PartitionWatchInterval: partitionWatchInterval,
Logger: kafka.LoggerFunc(pcg.log.Debugf),
ErrorLogger: kafka.LoggerFunc(pcg.log.Errorf),
MaxAttempts: maxAttempts,
Dialer: &kafka.Dialer{
Timeout: dialTimeout,
},
})
}
작성자:func (pcg *ProductsConsumerGroup) getNewKafkaWriter(topic string) *kafka.Writer {
w := &kafka.Writer{
Addr: kafka.TCP(pcg.Brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
RequiredAcks: writerRequiredAcks,
MaxAttempts: writerMaxAttempts,
Logger: kafka.LoggerFunc(pcg.log.Debugf),
ErrorLogger: kafka.LoggerFunc(pcg.log.Errorf),
Compression: compress.Snappy,
ReadTimeout: writerReadTimeout,
WriteTimeout: writerWriteTimeout,
}
return w
}
그리고 Worker Pools로 소비자 만들기func (pcg *ProductsConsumerGroup) consumeCreateProduct(
ctx context.Context,
cancel context.CancelFunc,
groupID string,
topic string,
workersNum int,
) {
r := pcg.getNewKafkaReader(pcg.Brokers, topic, groupID)
defer cancel()
defer func() {
if err := r.Close(); err != nil {
pcg.log.Errorf("r.Close", err)
cancel()
}
}()
w := pcg.getNewKafkaWriter(deadLetterQueueTopic)
defer func() {
if err := w.Close(); err != nil {
pcg.log.Errorf("w.Close", err)
cancel()
}
}()
pcg.log.Infof("Starting consumer group: %v", r.Config().GroupID)
wg := &sync.WaitGroup{}
for i := 0; i <= workersNum; i++ {
wg.Add(1)
go pcg.createProductWorker(ctx, cancel, r, w, wg, i)
}
wg.Wait()
}
직원이 메시지체를 검증한 다음usecase를 호출합니다. 오류가 돌아오면 다시 시도해 보십시오. 좋은 재시도 라이브러리는 retry-go입니다.만약 다시 실패한다면 오류 메시지를 매우 간단Dead Letter Queue에 발표할 것이다. 내가 말한 바와 같이 여기는 아무런 흥미로운 업무 논리도 실현되지 않았기 때문에 실제 생산에서 우리는 더욱 좋은 방식으로 오류 사례를 처리해야 한다.
메시지가 성공적으로 처리된 후에 제출합니다.
func (pcg *ProductsConsumerGroup) createProductWorker(
ctx context.Context,
cancel context.CancelFunc,
r *kafka.Reader,
w *kafka.Writer,
wg *sync.WaitGroup,
workerID int,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "ProductsConsumerGroup.createProductWorker")
defer span.Finish()
span.LogFields(log.String("ConsumerGroup", r.Config().GroupID))
defer wg.Done()
defer cancel()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
pcg.log.Errorf("FetchMessage", err)
return
}
pcg.log.Infof(
"WORKER: %v, message at topic/partition/offset %v/%v/%v: %s = %s\n",
workerID,
m.Topic,
m.Partition,
m.Offset,
string(m.Key),
string(m.Value),
)
incomingMessages.Inc()
var prod models.Product
if err := json.Unmarshal(m.Value, &prod); err != nil {
errorMessages.Inc()
pcg.log.Errorf("json.Unmarshal", err)
continue
}
if err := pcg.validate.StructCtx(ctx, prod); err != nil {
errorMessages.Inc()
pcg.log.Errorf("validate.StructCtx", err)
continue
}
if err := retry.Do(func() error {
created, err := pcg.productsUC.Create(ctx, &prod)
if err != nil {
return err
}
pcg.log.Infof("created product: %v", created)
return nil
},
retry.Attempts(retryAttempts),
retry.Delay(retryDelay),
retry.Context(ctx),
); err != nil {
errorMessages.Inc()
if err := pcg.publishErrorMessage(ctx, w, m, err); err != nil {
pcg.log.Errorf("publishErrorMessage", err)
continue
}
pcg.log.Errorf("productsUC.Create.publishErrorMessage", err)
continue
}
if err := r.CommitMessages(ctx, m); err != nil {
errorMessages.Inc()
pcg.log.Errorf("CommitMessages", err)
continue
}
successMessages.Inc()
}
}
저장소 레이어mongo-go-driver에서 데이터베이스와 상호 작용// Create Create new product
func (p *productMongoRepo) Create(ctx context.Context, product *models.Product) (*models.Product, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productMongoRepo.Create")
defer span.Finish()
collection := p.mongoDB.Database(productsDB).Collection(productsCollection)
product.CreatedAt = time.Now().UTC()
product.UpdatedAt = time.Now().UTC()
result, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
if err != nil {
return nil, errors.Wrap(err, "InsertOne")
}
objectID, ok := result.InsertedID.(primitive.ObjectID)
if !ok {
return nil, errors.Wrap(productErrors.ErrObjectIDTypeConversion, "result.InsertedID")
}
product.ProductID = objectID
return product, nil
}
다음은 gRPC 서비스에서 os create handler를 구현하고 github 저장소에서 찾을 수 있는 전체 코드입니다.// Create create new product
func (p *productService) Create(ctx context.Context, req *productsService.CreateReq) (*productsService.CreateRes, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
defer span.Finish()
createMessages.Inc()
catID, err := primitive.ObjectIDFromHex(req.GetCategoryID())
if err != nil {
errorMessages.Inc()
p.log.Errorf("primitive.ObjectIDFromHex: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
prod := &models.Product{
CategoryID: catID,
Name: req.GetName(),
Description: req.GetDescription(),
Price: req.GetPrice(),
ImageURL: &req.ImageURL,
Photos: req.GetPhotos(),
Quantity: req.GetQuantity(),
Rating: int(req.GetRating()),
}
created, err := p.productUC.Create(ctx, prod)
if err != nil {
errorMessages.Inc()
p.log.Errorf("productUC.Create: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
successMessages.Inc()
return &productsService.CreateRes{Product: created.ToProto()}, nil
}
및 REST API 프로세서 사용echo:// CreateProduct Create product
// @Tags Products
// @Summary Create new product
// @Description Create new single product
// @Accept json
// @Produce json
// @Success 201 {object} models.Product
// @Router /products [post]
func (p *productHandlers) CreateProduct() echo.HandlerFunc {
return func(c echo.Context) error {
span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "productHandlers.Create")
defer span.Finish()
createRequests.Inc()
var prod models.Product
if err := c.Bind(&prod); err != nil {
p.log.Errorf("c.Bind: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := p.validate.StructCtx(ctx, &prod); err != nil {
p.log.Errorf("validate.StructCtx: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := p.productUC.PublishCreate(ctx, &prod); err != nil {
p.log.Errorf("productUC.PublishCreate: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
successRequests.Inc()
return c.NoContent(http.StatusCreated)
}
}
프로그램 처리의 맨 위에 프로메테우스의 오류와 과정 지표를 기록합니다.소스 코드와 사용 가능한 모든 도구 목록이 포함된 저장소here👨💻 :)
나는 이 글이 유용하고 유익하기를 바란다. 나는 어떤 피드백이나 문제를 받아도 매우 기쁠 것이다.
Reference
이 문제에 관하여(Go, Kafka, gRPC 및 MongoDB 마이크로서비스, 메트릭 및 추적 기능👋), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/aleksk1ng/go-kafka-grpc-and-mongodb-microservice-with-metrics-and-tracing-448d텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)