Go, Kafka, gRPC 및 MongoDB 마이크로서비스, 메트릭 및 추적 기능👋

50103 단어 mongodbkafkagrpcgo
이 글은 다음과 같은 방법으로 청결 체계 구조의 마이크로 서비스를 실현하고자 한다.🚀
Kafka 메시징 에이전트
gRPC gRPC의 Go 구현
MongoDB 데이터베이스로
Jaeger 오픈 소스, 종단 간 분산tracing
Prometheus 모니터링 및 경고
Grafana 프로메테우스(Prometheus)의 모든 제품으로 관측 가능한 계기판을 구성하는 데 사용
GitHub repository 에서 원본 코드를 찾을 수 있습니다
나는 Kafka에 중점을 두고 싶다. 왜냐하면 나는 처음 시도하는 것이기 때문에 하면서 배우고 싶다👨‍💻
물론 모든 마이크로 서비스는 모니터링과 추적이 필요하기 때문에 여기에 포함된다⚡️
Kafka는 높은 통량의 분포식 메시지 전달 시스템이다.
언제든지 하나의 에이전트만이 구역을 정하는 지도자가 될 수 있고 그 지도자만이 구역의 서비스 데이터를 받을 수 있다.
다른 매니저들은 데이터를 동기화한다.
프로듀서:
생산자는 데이터를 테마에 기록하고 어떤 에이전트와 구역에 기록할지 자동으로 알 수 있습니다.
에이전트가 실패하면 프로덕션이 자동으로 복원됩니다.
운영 업체는 데이터 쓰기 확인을 받을 수 있습니다.
  • acks=0.프로듀서는 확인을 요구하지 않는다. 이는 데이터를 잃어버릴 수도 있다는 뜻이다.
  • acks=1.생산 업체들은 지도자의 확인을 기다릴 것이며, 어떤 경우에는 데이터 분실도 제한될 수 있다.
  • acks=all.Leader + 사본 확인, min.insync에서 지정한 프록시 수량의 확인이 필요합니다.던전 프록시 설정 키입니다. 현재 동기화된 던전이 적으면 생성에 실패합니다.
    min.insync를 지정할 수 있습니다.복사 복사본은 1(acks==1 당량)보다 낮거나 복사 계수가 높거나 둘 사이에 있기 때문에 b/w의 가용성과 일치성 사이의 균형을 잘 조절할 수 있습니다.
    여기 좋아요article
  • 프로듀서는 메시지가 있는 키를 보낼 수 있습니다.
  • 중요한 데이터를 순환을 통해 보내지 않으면
  • 키를 보내면 키의 모든 메시지가 항상 같은 구역으로 전송됩니다.
  • 소비자:
    소비자는 하나의 주제에서 데이터를 읽고, 어느 에이전트에서 데이터를 읽는지 안다.
    데이터는 각 구역에서 순서대로 읽힌다.
    소비자:
    소비자는 소비자 집단의 데이터를 읽는다.그룹의 모든 사용자는 독점 구역에서 데이터를 읽습니다.
    따라서 사용자가 구역보다 많으면 일부 사용자는 비활성 상태가 됩니다.
    소비자 상쇄:
    카프카 메모리 소비층이 읽던 편이량
    그룹의 소비자가 데이터를 처리했을 때, 편향량을 제출해야 합니다.
    만약 소비자의 정서가 저조하다면, 그것은 멈춘 곳에서 다시 읽을 수 있을 것이다.
    소비자는 보상을 언제 제출할 것인지를 선택한다.
    총 3개의 제공 사례:
    최대 한 번:
  • 메시지 수신 즉시 오프셋 제출
  • 프로세스가 실패하면 메시지를 잃어버리고 더 이상 읽지 않습니다.
  • 한 번 이상:
  • 메시지 처리 후 오프셋 제출
  • 프로세스가 실패하면 메시지를 다시 읽습니다.
  • 이것은 중복 처리 정보를 초래할 수 있기 때문에 처리가 幂 등이고 시스템에 영향을 주지 않도록 확보하는 것이 매우 중요하다.
  • 단 한 번만:
  • kafkastreamsapi를 통해kakfa에서kafka로의 통신을 실현할 수 있다.
  • 외부 시스템의 사용에 대해 우리는 幂 등 소비자를 사용해야 한다.
  • 압축
    또 하나 중요한 기능은 Compression입니다.
    압축은 생산자 단계에서 사용되며 에이전트나 사용자 중 어떠한 설정 변경도 필요 없습니다.
    기본적으로, 이것은 none입니다. 왜냐하면 시작점은 snappy나lz4이기 때문입니다.
    지역 발전:
    make local // runs docker-compose.local.yml
    make crate_topics // create kafka topics
    make mongo // load js init script to mongo docker container
    make make_cert // generate local SLL certificates
    make swagger // generate swagger documentation
    
    UI 인터페이스는 다음 포트에서 사용할 수 있습니다.

    Jaeger 사용자 인터페이스:http://localhost:16686


    프로메테우스 사용자 인터페이스:http://localhost:9090


    Grafana 사용자 인터페이스:http://localhost:3000


    카프카:http://localhost:9000


    기본적으로 Swagger UI는 다음 장치에서 실행됩니다.https://localhost:5007/swagger/index.html





    Grafana에서 플로메테우스를 도량원으로 선택한 다음 계기판을 만들어야 합니다.

    좋은 카프카드 docker 설정과 enclouded UI는 confluent 이지만, 전 세계 인터넷의 절반을 로컬 pc로 다운로드할 수 있는 커다란 이미지 크기를 가지고 있습니다.🤖 이 때문에 UI 클라이언트를 사용하고 있습니다kafdrop.
    Docker 작성로컬yml:
    version: "3.8"
    
    services:
      zookeeper:
        container_name: zookeeper
        restart: always
        image: zookeeper:3.4.9
        hostname: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOO_MY_ID: 1
          ZOO_PORT: 2181
          ZOO_SERVERS: server.1=zookeeper:2888:3888
        volumes:
          - ./data/zookeeper/data:/data
          - ./data/zookeeper/datalog:/datalog
        networks:
          - products_network
    
    
      kafka1:
        container_name: kafka1
        image: confluentinc/cp-kafka:5.3.0
        restart: always
        hostname: kafka1
        ports:
          - "9091:9091"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
          KAFKA_BROKER_ID: 1
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        volumes:
          - ./data/kafka1/data:/var/lib/kafka/data
        depends_on:
          - zookeeper
        networks:
          - products_network
    
      kafka2:
        container_name: kafka2
        restart: always
        image: confluentinc/cp-kafka:5.3.0
        hostname: kafka2
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_BROKER_ID: 2
        volumes:
          - ./data/kafka2/data:/var/lib/kafka/data
        depends_on:
          - zookeeper
        networks:
          - products_network
    
      kafka3:
        container_name: kafka3
        image: confluentinc/cp-kafka:5.3.0
        restart: always
        hostname: kafka3
        ports:
          - "9093:9093"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
          KAFKA_BROKER_ID: 3
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        volumes:
          - ./data/kafka3/data:/var/lib/kafka/data
        depends_on:
          - zookeeper
        networks:
          - products_network
    
      kafdrop:
        container_name: kafdrop
        image: obsidiandynamics/kafdrop
        restart: "no"
        ports:
          - "9000:9000"
        environment:
          KAFKA_BROKERCONNECT: "kafka1:19091"
        depends_on:
          - kafka1
          - kafka2
          - kafka3
        networks:
          - products_network
    
      redis:
        image: redis:6-alpine
        container_name: user_redis
        ports:
          - "6379:6379"
        restart: always
        networks:
          - products_network
    
      prometheus:
        container_name: prometheus_container
        restart: always
        image: prom/prometheus
        volumes:
          - ./monitoring/prometheus-local.yml:/etc/prometheus/prometheus.yml:Z
        command:
          - '--config.file=/etc/prometheus/prometheus.yml'
          - '--storage.tsdb.path=/prometheus'
          - '--storage.tsdb.retention=20d'
          - '--web.console.libraries=/usr/share/prometheus/console_libraries'
          - '--web.console.templates=/usr/share/prometheus/consoles'
        ports:
          - '9090:9090'
        networks:
          - products_network
    
    
      node_exporter:
        container_name: node_exporter_container
        restart: always
        image: prom/node-exporter
        ports:
          - '9101:9100'
        networks:
          - products_network
    
      grafana:
        container_name: grafana_container
        restart: always
        image: grafana/grafana
        ports:
          - '3000:3000'
        networks:
          - products_network
    
      jaeger:
        container_name: jaeger_container
        restart: always
        image: jaegertracing/all-in-one:1.21
        environment:
          - COLLECTOR_ZIPKIN_HTTP_PORT=9411
        ports:
          - 5775:5775/udp
          - 6831:6831/udp
          - 6832:6832/udp
          - 5778:5778
          - 16686:16686
          - 14268:14268
          - 14250:14250
          - 9411:9411
        networks:
          - products_network
    
      mongodb:
        image: mongo:latest
        restart: always
        environment:
          MONGO_INITDB_ROOT_USERNAME: admin
          MONGO_INITDB_ROOT_PASSWORD: admin
          MONGODB_DATABASE: products
        ports:
          - 27017:27017
        volumes:
          - mongodb_data_container:/data/db
    
    volumes:
      mongodb_data_container:
    
    networks:
      products_network:
        driver: bridge
    
    프로덕션에서 자주 사용하는 Go 고객segmentiosarama의 경우
    둘 다 좋아요. 어느 것을 선택하느냐에 따라 결정됩니다. 이 프로젝트에 대해서는segmentio를 사용했습니다.
    나는 이곳에서 어떤 재미있는 업무 논리도 실현하지 못했고, 테스트도 소개하지 않았다. 왜냐하면 이때 충분한 시간이 없기 때문이다.
    우리의 마이크로 서비스는 카프카, gRPC, REST를 통해 통신할 수 있습니다.
    Makefile에서 유용한 명령을 모두 찾을 수 있습니다.
    docker에서 카프카드 테마 만들기
    docker exec -it kafka1 kafka-topics --zookeeper zookeeper:2181 --create --topic create-product --partitions 3 --replication-factor 2
    
    MongoDB에 대해 자바스크립트 파일을 불러올 수 있습니다. 이 파일은 집합과 인덱스를 만들 수 있습니다.
    mongo admin -u admin -p admin < init.js
    
    Segmentio 라이브러리api는 독자와 작가를 제공합니다.
    독자를 먼저 만들려면 다음과 같이 하십시오.
    func (pcg *ProductsConsumerGroup) getNewKafkaReader(kafkaURL []string, topic, groupID string) *kafka.Reader {
        return kafka.NewReader(kafka.ReaderConfig{
            Brokers:                kafkaURL,
            GroupID:                groupID,
            Topic:                  topic,
            MinBytes:               minBytes,
            MaxBytes:               maxBytes,
            QueueCapacity:          queueCapacity,
            HeartbeatInterval:      heartbeatInterval,
            CommitInterval:         commitInterval,
            PartitionWatchInterval: partitionWatchInterval,
            Logger:                 kafka.LoggerFunc(pcg.log.Debugf),
            ErrorLogger:            kafka.LoggerFunc(pcg.log.Errorf),
            MaxAttempts:            maxAttempts,
            Dialer: &kafka.Dialer{
                Timeout: dialTimeout,
            },
        })
    }
    
    작성자:
    func (pcg *ProductsConsumerGroup) getNewKafkaWriter(topic string) *kafka.Writer {
        w := &kafka.Writer{
            Addr:         kafka.TCP(pcg.Brokers...),
            Topic:        topic,
            Balancer:     &kafka.LeastBytes{},
            RequiredAcks: writerRequiredAcks,
            MaxAttempts:  writerMaxAttempts,
            Logger:       kafka.LoggerFunc(pcg.log.Debugf),
            ErrorLogger:  kafka.LoggerFunc(pcg.log.Errorf),
            Compression:  compress.Snappy,
            ReadTimeout:  writerReadTimeout,
            WriteTimeout: writerWriteTimeout,
        }
        return w
    }
    
    그리고 Worker Pools로 소비자 만들기
    func (pcg *ProductsConsumerGroup) consumeCreateProduct(
        ctx context.Context,
        cancel context.CancelFunc,
        groupID string,
        topic string,
        workersNum int,
    ) {
        r := pcg.getNewKafkaReader(pcg.Brokers, topic, groupID)
        defer cancel()
        defer func() {
            if err := r.Close(); err != nil {
                pcg.log.Errorf("r.Close", err)
                cancel()
            }
        }()
    
        w := pcg.getNewKafkaWriter(deadLetterQueueTopic)
        defer func() {
            if err := w.Close(); err != nil {
                pcg.log.Errorf("w.Close", err)
                cancel()
            }
        }()
    
        pcg.log.Infof("Starting consumer group: %v", r.Config().GroupID)
    
        wg := &sync.WaitGroup{}
        for i := 0; i <= workersNum; i++ {
            wg.Add(1)
            go pcg.createProductWorker(ctx, cancel, r, w, wg, i)
        }
        wg.Wait()
    }
    
    직원이 메시지체를 검증한 다음usecase를 호출합니다. 오류가 돌아오면 다시 시도해 보십시오. 좋은 재시도 라이브러리는 retry-go입니다.
    만약 다시 실패한다면 오류 메시지를 매우 간단Dead Letter Queue에 발표할 것이다. 내가 말한 바와 같이 여기는 아무런 흥미로운 업무 논리도 실현되지 않았기 때문에 실제 생산에서 우리는 더욱 좋은 방식으로 오류 사례를 처리해야 한다.
    메시지가 성공적으로 처리된 후에 제출합니다.
    func (pcg *ProductsConsumerGroup) createProductWorker(
        ctx context.Context,
        cancel context.CancelFunc,
        r *kafka.Reader,
        w *kafka.Writer,
        wg *sync.WaitGroup,
        workerID int,
    ) {
        span, ctx := opentracing.StartSpanFromContext(ctx, "ProductsConsumerGroup.createProductWorker")
        defer span.Finish()
    
        span.LogFields(log.String("ConsumerGroup", r.Config().GroupID))
    
        defer wg.Done()
        defer cancel()
    
        for {
            m, err := r.FetchMessage(ctx)
            if err != nil {
                pcg.log.Errorf("FetchMessage", err)
                return
            }
    
            pcg.log.Infof(
                "WORKER: %v, message at topic/partition/offset %v/%v/%v: %s = %s\n",
                workerID,
                m.Topic,
                m.Partition,
                m.Offset,
                string(m.Key),
                string(m.Value),
            )
            incomingMessages.Inc()
    
            var prod models.Product
            if err := json.Unmarshal(m.Value, &prod); err != nil {
                errorMessages.Inc()
                pcg.log.Errorf("json.Unmarshal", err)
                continue
            }
    
            if err := pcg.validate.StructCtx(ctx, prod); err != nil {
                errorMessages.Inc()
                pcg.log.Errorf("validate.StructCtx", err)
                continue
            }
    
            if err := retry.Do(func() error {
                created, err := pcg.productsUC.Create(ctx, &prod)
                if err != nil {
                    return err
                }
                pcg.log.Infof("created product: %v", created)
                return nil
            },
                retry.Attempts(retryAttempts),
                retry.Delay(retryDelay),
                retry.Context(ctx),
            ); err != nil {
                errorMessages.Inc()
    
                if err := pcg.publishErrorMessage(ctx, w, m, err); err != nil {
                    pcg.log.Errorf("publishErrorMessage", err)
                    continue
                }
                pcg.log.Errorf("productsUC.Create.publishErrorMessage", err)
                continue
            }
    
            if err := r.CommitMessages(ctx, m); err != nil {
                errorMessages.Inc()
                pcg.log.Errorf("CommitMessages", err)
                continue
            }
    
            successMessages.Inc()
        }
    }
    
    저장소 레이어mongo-go-driver에서 데이터베이스와 상호 작용
    // Create Create new product
    func (p *productMongoRepo) Create(ctx context.Context, product *models.Product) (*models.Product, error) {
        span, ctx := opentracing.StartSpanFromContext(ctx, "productMongoRepo.Create")
        defer span.Finish()
    
        collection := p.mongoDB.Database(productsDB).Collection(productsCollection)
    
        product.CreatedAt = time.Now().UTC()
        product.UpdatedAt = time.Now().UTC()
    
        result, err := collection.InsertOne(ctx, product, &options.InsertOneOptions{})
        if err != nil {
            return nil, errors.Wrap(err, "InsertOne")
        }
    
        objectID, ok := result.InsertedID.(primitive.ObjectID)
        if !ok {
            return nil, errors.Wrap(productErrors.ErrObjectIDTypeConversion, "result.InsertedID")
        }
    
        product.ProductID = objectID
    
        return product, nil
    }
    
    다음은 gRPC 서비스에서 os create handler를 구현하고 github 저장소에서 찾을 수 있는 전체 코드입니다.
    // Create create new product
    func (p *productService) Create(ctx context.Context, req *productsService.CreateReq) (*productsService.CreateRes, error) {
        span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
        defer span.Finish()
        createMessages.Inc()
    
        catID, err := primitive.ObjectIDFromHex(req.GetCategoryID())
        if err != nil {
            errorMessages.Inc()
            p.log.Errorf("primitive.ObjectIDFromHex: %v", err)
            return nil, grpcErrors.ErrorResponse(err, err.Error())
        }
    
        prod := &models.Product{
            CategoryID:  catID,
            Name:        req.GetName(),
            Description: req.GetDescription(),
            Price:       req.GetPrice(),
            ImageURL:    &req.ImageURL,
            Photos:      req.GetPhotos(),
            Quantity:    req.GetQuantity(),
            Rating:      int(req.GetRating()),
        }
    
        created, err := p.productUC.Create(ctx, prod)
        if err != nil {
            errorMessages.Inc()
            p.log.Errorf("productUC.Create: %v", err)
            return nil, grpcErrors.ErrorResponse(err, err.Error())
        }
    
        successMessages.Inc()
        return &productsService.CreateRes{Product: created.ToProto()}, nil
    }
    
    및 REST API 프로세서 사용echo:
    // CreateProduct Create product
    // @Tags Products
    // @Summary Create new product
    // @Description Create new single product
    // @Accept json
    // @Produce json
    // @Success 201 {object} models.Product
    // @Router /products [post]
    func (p *productHandlers) CreateProduct() echo.HandlerFunc {
        return func(c echo.Context) error {
            span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "productHandlers.Create")
            defer span.Finish()
            createRequests.Inc()
    
            var prod models.Product
            if err := c.Bind(&prod); err != nil {
                p.log.Errorf("c.Bind: %v", err)
                return httpErrors.ErrorCtxResponse(c, err)
            }
    
            if err := p.validate.StructCtx(ctx, &prod); err != nil {
                p.log.Errorf("validate.StructCtx: %v", err)
                return httpErrors.ErrorCtxResponse(c, err)
            }
    
            if err := p.productUC.PublishCreate(ctx, &prod); err != nil {
                p.log.Errorf("productUC.PublishCreate: %v", err)
                return httpErrors.ErrorCtxResponse(c, err)
            }
    
            successRequests.Inc()
            return c.NoContent(http.StatusCreated)
        }
    }
    
    프로그램 처리의 맨 위에 프로메테우스의 오류와 과정 지표를 기록합니다.
    소스 코드와 사용 가능한 모든 도구 목록이 포함된 저장소here👨‍💻 :)
    나는 이 글이 유용하고 유익하기를 바란다. 나는 어떤 피드백이나 문제를 받아도 매우 기쁠 것이다.

    좋은 웹페이지 즐겨찾기