Go, NATS, gRPC 및 PostgreSQL clean architecture 마이크로서비스(모니터링 및 추적 기능 포함)👋

안녕하세요, 본문에서 저는 다음과 같은 방법으로 마이크로 서비스를 만들어 보려고 합니다.🚀
NATS 메시징 에이전트
gRPC gRPC의 Go 구현
PostgreSQL 데이터베이스로
Jaeger 오픈 소스, 종단 간 분산tracing
Prometheus 모니터링 및 경고
Grafana 프로메테우스(Prometheus)의 모든 제품으로 관측 가능한 계기판을 구성하는 데 사용
MailHog 웹 및 API 기반 SMTP 테스트
Redis Golang의 유형 보안 Redis 클라이언트
Nginx HTTP 및 리버스 프록시 서버
swag 당당하게
마이그레이션용 migrate
GitHub repository 에서 원본 코드를 찾을 수 있습니다
이 문장에서 우리는 NATS Streaming를 중점적으로 강조하고자 한다. 이것은 Go로 쓴 것이다.🤩.
문서에서 보듯이 다음과 같은 몇 가지 좋은 특성이 있습니다.

  • 메시지/이벤트 지속성NATS 스트림은 메모리, 평면 파일 또는 데이터베이스에서 구성 가능한 메시지 지속성을 제공합니다.

  • 게시 서버와 서버 사이(게시 작업에 사용) 및 구독 서버와 서버 사이(메일 전달 확인에 사용) 최소한 한 번의 메일 확인을 전달한다.메시지는 서버가 메모리나 보조 메모리(또는 다른 외부 메모리)에 저장하고 필요에 따라 조건에 맞는 구독 클라이언트에게 다시 보냅니다.

  • Publisher rate limiting MaxPubAcksInFlight 옵션은 게시자가 주어진 시간에 전송할 수 있는 확인되지 않은 메시지의 수를 효과적으로 제한합니다.이 최대치에 도달했을 때, 확인되지 않은 메시지 수가 지정한 제한보다 적을 때까지 더 많은 비동기 발표 호출이 막힐 것입니다.

  • 모든 구독자의 속도 일치/제한 맥스인플라이트 옵션은 E가 지정한 NATS 흐름에서 지정한 구독에 대한 최대 미완성 확인수(이미 발송되었지만 확인되지 않은 메시지)를 허용합니다.이 제한에 도달하면 NATS 흐름은 확인되지 않은 메시지 수가 지정한 제한보다 적을 때까지 이 구독에 메시지를 보내는 것을 중단합니다.

  • 장기 구독은 클라이언트가 다시 시작한 후에도 '장기 이름' 을 지정합니다.

  • 테마의 역사 메시지 재방송 새 구독은 구독 테마의 채널에 저장된 메시지 흐름에서 시작 위치를 지정할 수 있습니다.이 옵션을 사용하면 메시지 전송을 다음 시간부터 시작할 수 있습니다.
  • 항목에 가장 먼저 저장된 메시지
  • 현재 구독이 시작되기 전에 이 테마에 최근에 저장된 메시지입니다.이것은 일반적으로 '마지막 값' 이나 '초기 값' 캐시로 여겨진다.
  • 나초 단위의 특정 날짜/시간
  • 마지막 30초와 같은 현재 서버 날짜/시간의 역사적 오프셋 수입니다.
  • 특정 메시지 시퀀스 번호
  • 이 예에 대해 나는 어떤 재미있는 업무 논리도 실현하지 못했고 테스트도 언급하지 않았다.
    마이크로서비스는 전자메일을 보내고 PostgreSQL에 저장하며 NATS, gRPC, REST를 통해 통신할 수 있다.
    NATS 구독자는 데이터를 조회하는 데 사용되는 전자 우편 이벤트, REST, gRPC를 처리합니다.
    지역 발전:
    make cert // generates tls certificates
    make migrate_up // run sql migrations
    make swagger // generate swagger documentation
    make local or develop // for run docker compose files
    
    docker의run all에 대해makedevelop을 실행할 수 있습니다. 이것은 열 리셋 기능을 가지고 있습니다.
    UI 인터페이스는 다음 포트에서 사용할 수 있습니다.

    Jaeger 사용자 인터페이스:http://localhost:16686


    프로메테우스 사용자 인터페이스:http://localhost:9090


    Grafana 사용자 인터페이스:http://localhost:3000


    NATS 사용자 인터페이스:http://localhost:8222/


    MailHog:http://localhost:8025/


    기본적으로 Swagger UI는 다음 장치에서 실행됩니다.https://localhost:5000/swagger/index.html




    Grafana에서 플로메테우스를 도량원으로 선택한 다음 계기판을 만들어야 합니다.

    Docker 작성로컬이 프로젝트의 yml:
    version: "3.8"
    
    services:
      nginx:
        container_name: nginx_microservice
        ports:
          - 8080:8080
          - 443:443
        build:
          context: ./nginx
          dockerfile: Dockerfile
        networks:
          - nats
    
      nats-streaming:
        container_name: nats-streaming
        image: nats-streaming:latest
        ports:
          - "8222:8222"
          - "4222:4222"
          - "6222:6222"
        networks: [ "nats" ]
        restart: always
        command: [
            '-p',
            '4222',
            '-m',
            '8222',
            '-hbi',
            '5s',
            '-hbt',
            '5s',
            '-hbf',
            '2',
            '-SD',
            '-cid',
            'microservice',
        ]
    
      mails_postgesql:
        image: postgres:13-alpine
        container_name: mails_postgesql
        expose:
          - "5432"
        ports:
          - "5432:5432"
        restart: always
        environment:
          - POSTGRES_USER=postgres
          - POSTGRES_PASSWORD=postgres
          - POSTGRES_DB=mails_db
          - POSTGRES_HOST=5432
        command: -p 5432
        volumes:
          - ./mails_pgdata:/var/lib/postgresql/data
        networks: [ "nats" ]
    
      mailhog:
        container_name: mailhog
        image: mailhog/mailhog:latest
        ports:
          - "1025:1025"
          - "8025:8025"
        restart: always
        networks: [ "nats" ]
    
    
      redis:
        image: redis:6-alpine
        restart: always
        container_name: user_redis
        ports:
          - "6379:6379"
        networks: [ "nats" ]
    
      prometheus:
        container_name: prometheus_container
        restart: always
        image: prom/prometheus
        volumes:
          - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:Z
        command:
          - '--config.file=/etc/prometheus/prometheus.yml'
          - '--storage.tsdb.path=/prometheus'
          - '--storage.tsdb.retention=20d'
          - '--web.console.libraries=/usr/share/prometheus/console_libraries'
          - '--web.console.templates=/usr/share/prometheus/consoles'
        ports:
          - '9090:9090'
        networks: [ "nats" ]
    
      node_exporter:
        container_name: node_exporter_container
        restart: always
        image: prom/node-exporter
        ports:
          - '9101:9100'
        networks: [ "nats" ]
    
      grafana:
        container_name: grafana_container
        restart: always
        image: grafana/grafana
        ports:
          - '3000:3000'
        networks: [ "nats" ]
    
    
      jaeger:
        container_name: jaeger_container
        restart: always
        image: jaegertracing/all-in-one:1.21
        environment:
          - COLLECTOR_ZIPKIN_HTTP_PORT=9411
        ports:
          - 5775:5775/udp
          - 6831:6831/udp
          - 6832:6832/udp
          - 5778:5778
          - 16686:16686
          - 14268:14268
          - 14250:14250
          - 9411:9411
        networks: [ "nats" ]
    
    
    networks:
      nats:
        name: nats
    
    응용 프로그램을 시작할 때, 우리는 viperyaml 설정을 불러와서 필요한 모든 내용을 초기화하고 프로그램을 실행합니다.
    // Run start application
    func (s *server) Run() error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
    
        smtpClient := smtp.NewSmtpClient(s.cfg)
        publisher := nats.NewPublisher(s.natsConn)
        emailPgRepo := repository.NewEmailPGRepository(s.pgxPool)
        emailRedisRepo := repository.NewEmailRedisRepository(s.redis)
        emailUC := usecase.NewEmailUseCase(s.log, emailPgRepo, publisher, smtpClient, emailRedisRepo)
    
        im := interceptors.NewInterceptorManager(s.log, s.cfg)
        mw := middlewares.NewMiddlewareManager(s.log, s.cfg)
    
        validate := validator.New()
    
        go func() {
            emailSubscriber := nats.NewEmailSubscriber(s.natsConn, s.log, emailUC, validate)
            emailSubscriber.Run(ctx)
        }()
    
        go func() {
            s.log.Infof("Server is listening on PORT: %s", s.cfg.HTTP.Port)
            s.runHttpServer()
        }()
    
        metricsServer := echo.New()
        go func() {
            metricsServer.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
            s.log.Infof("Metrics server is running on port: %s", s.cfg.Metrics.Port)
            if err := metricsServer.Start(s.cfg.Metrics.Port); err != nil {
                s.log.Error(err)
                cancel()
            }
        }()
        v1 := s.echo.Group("/api/v1")
        v1.Use(mw.Metrics)
    
        emailHandlers := emailsV1.NewEmailHandlers(v1.Group("/email"), emailUC, s.log, validate)
        emailHandlers.MapRoutes()
    
        l, err := net.Listen("tcp", s.cfg.GRPC.Port)
        if err != nil {
            return errors.Wrap(err, "net.Listen")
        }
        defer l.Close()
    
        cert, err := tls.LoadX509KeyPair(certFile, keyFile)
        if err != nil {
            s.log.Fatalf("failed to load key pair: %s", err)
        }
    
        grpcServer := grpc.NewServer(
            grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
            grpc.KeepaliveParams(keepalive.ServerParameters{
                MaxConnectionIdle: s.cfg.GRPC.MaxConnectionIdle * time.Minute,
                Timeout:           s.cfg.GRPC.Timeout * time.Second,
                MaxConnectionAge:  s.cfg.GRPC.MaxConnectionAge * time.Minute,
                Time:              s.cfg.GRPC.Timeout * time.Minute,
            }),
            grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
                grpc_ctxtags.UnaryServerInterceptor(),
                grpc_opentracing.UnaryServerInterceptor(),
                grpc_prometheus.UnaryServerInterceptor,
                grpcrecovery.UnaryServerInterceptor(),
                im.Logger,
            ),
            ),
        )
    
        emailGRPCService := emailGrpc.NewEmailGRPCService(emailUC, s.log, validate)
        emailService.RegisterEmailServiceServer(grpcServer, emailGRPCService)
        grpc_prometheus.Register(grpcServer)
    
        s.log.Infof("GRPC Server is listening on port: %s", s.cfg.GRPC.Port)
        s.log.Fatal(grpcServer.Serve(l))
    
        if s.cfg.HTTP.Development {
            reflection.Register(grpcServer)
        }
    
        quit := make(chan os.Signal, 1)
        signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
    
        select {
        case v := <-quit:
            s.log.Errorf("signal.Notify: %v", v)
        case done := <-ctx.Done():
            s.log.Errorf("ctx.Done: %v", done)
        }
    
        if err := s.echo.Server.Shutdown(ctx); err != nil {
            return errors.Wrap(err, "echo.Server.Shutdown")
        }
    
        if err := metricsServer.Shutdown(ctx); err != nil {
            s.log.Errorf("metricsServer.Shutdown: %v", err)
        }
    
        grpcServer.GracefulStop()
        s.log.Info("Server Exited Properly")
    
        return nil
    }
    
    내가 사용하는 REST httpecho에 대해 나는 또 다른 인기 있는 선택이 gin라고 생각한다.
    swag는 RESTful API 문서를 생성하는 데 사용됩니다.
    이메일 프로세서를 만들어 요청을 수락하고 범위를 추적하기 시작하며 validator로 입력을 검증하고 예시 방법을 호출합니다.
    // Create Create
    // @Tags Emails
    // @Summary Create new email
    // @Description Create new email and send it
    // @Accept json
    // @Produce json
    // @Success 201 {object} models.Email
    // @Router /email [post]
    func (h *emailHandlers) Create() echo.HandlerFunc {
        return func(c echo.Context) error {
            span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "emailHandlers.Create")
            defer span.Finish()
            createRequests.Inc()
    
            var mail models.Email
            if err := c.Bind(&mail); err != nil {
                errorRequests.Inc()
                h.log.Errorf("c.Bind: %v", err)
                return httpErrors.ErrorCtxResponse(c, err)
            }
    
            if err := h.validate.StructCtx(ctx, &mail); err != nil {
                errorRequests.Inc()
                h.log.Errorf("validate.StructCtx: %v", err)
                return httpErrors.ErrorCtxResponse(c, err)
            }
    
            if err := h.emailUC.PublishCreate(ctx, &mail); err != nil {
                errorRequests.Inc()
                h.log.Errorf("emailUC.PublishCreate: %v", err)
                return httpErrors.ErrorCtxResponse(c, err)
            }
    
            successRequests.Inc()
            return c.NoContent(http.StatusCreated)
        }
    }
    
    gRPC에 대한 서비스 방법의 생각은 같지만 우선 원형 파일을 만들어야 한다.
    syntax = "proto3";
    
    import "google/protobuf/timestamp.proto";
    
    //protoc --go_out=plugins=grpc:. *.proto
    
    package emailService;
    option go_package = ".;emailService";
    
    message Email {
      string EmailID = 1;
      string From = 2;
      string To = 3;
      string Subject = 4;
      string Message = 5;
      google.protobuf.Timestamp CreatedAt = 6;
    }
    
    message Empty {}
    
    message CreateReq {
      string From = 1;
      string To = 2;
      string Subject = 3;
      string Message = 4;
    }
    
    message CreateRes {
      string status = 1;
    }
    
    message GetByIDReq {
      string EmailID = 1;
    }
    
    message GetByIDRes {
      Email Email = 1;
    }
    
    message SearchReq {
      string Search = 1;
      int64 page = 2;
      int64 size = 3;
    }
    
    message SearchRes {
      int64 TotalCount = 1;
      int64 TotalPages = 2;
      int64 Page = 3;
      int64 Size = 4;
      bool HasMore = 5;
      repeated Email Emails = 6;
    }
    
    service EmailService {
      rpc Create(CreateReq) returns (CreateRes) {}
      rpc GetByID(GetByIDReq) returns (GetByIDRes) {}
      rpc Search(SearchReq) returns (SearchRes) {}
    }
    
    gRPC는 REST와 동일한 비즈니스 논리로 e-메일 프로세서를 만듭니다.
    // Create create email
    func (e *emailGRPCService) Create(ctx context.Context, req *emailService.CreateReq) (*emailService.CreateRes, error) {
        span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
        defer span.Finish()
        createRequests.Inc()
    
        m := &models.Email{
            From:    req.GetFrom(),
            To:      req.GetTo(),
            Subject: req.GetSubject(),
            Message: req.GetMessage(),
        }
    
        if err := e.validator.StructCtx(ctx, m); err != nil {
            errorRequests.Inc()
            e.log.Errorf("validator.StructCtx: %v", err)
            return nil, grpcErrors.ErrorResponse(err, err.Error())
        }
    
        if err := e.emailUC.Create(ctx, m); err != nil {
            errorRequests.Inc()
            e.log.Errorf("emailUC.Create: %v", err)
            return nil, grpcErrors.ErrorResponse(err, err.Error())
        }
    
        successRequests.Inc()
        return &emailService.CreateRes{Status: "Ok"}, nil
    }
    
    e-메일 생성 예를 사용하여 데이터베이스에 레코드를 만들고 e-메일 보내기 이벤트를 게시합니다.
    // Create create new email saves in db
    func (e *emailUseCase) Create(ctx context.Context, email *models.Email) error {
        span, ctx := opentracing.StartSpanFromContext(ctx, "emailUseCase.Create")
        defer span.Finish()
    
        created, err := e.emailPGRepo.Create(ctx, email)
        if err != nil {
            return errors.Wrap(err, "emailPGRepo.Create")
        }
    
        mailBytes, err := json.Marshal(created)
        if err != nil {
            return errors.Wrap(err, "json.Marshal")
        }
    
        return e.publisher.Publish(sendEmailSubject, mailBytes)
    }
    
    저장소 작성 방법은 데이터를 데이터베이스에 저장합니다.
    박사후와의 상호 작용pgx:
    // Create create new email
    func (e *emailPGRepository) Create(ctx context.Context, email *models.Email) (*models.Email, error) {
        span, ctx := opentracing.StartSpanFromContext(ctx, "emailPGRepository.Create")
        defer span.Finish()
    
        var mail models.Email
        if err := e.db.QueryRow(
            ctx,
            createEmailQuery,
            &email.From,
            &email.To,
            &email.Subject,
            &email.Message,
        ).Scan(&mail.EmailID, &mail.From, &mail.To, &mail.Subject, &mail.Message, &mail.CreatedAt); err != nil {
            return nil, errors.Wrap(err, "Scan")
        }
    
        return &mail, nil
    }
    
    Jaeger는 매우 좋은 네트워크 인터페이스를 가지고 있어 http://localhost:16686에서 우리의 추적을 볼 수 있다

    MailHog e- 메일 테스트에 적합한 솔루션입니다.
    여기에 사용된 go smtp 클라이언트go-simple-mail.
    NATS는 간단한 웹 ui와 함께 제공됩니다.

    구독자는 가장 재미있는 부분이다. 우선 우리는 worker pool 서브다차원 데이터 집합을 주제에 사용한다.
    // Subscribe subscribe to subject and run workers with given callback for handling messages
    func (s *emailSubscriber) Subscribe(subject, qgroup string, workersNum int, cb stan.MsgHandler) {
        s.log.Infof("Subscribing to Subject: %v, group: %v", subject, qgroup)
        wg := &sync.WaitGroup{}
    
        for i := 0; i <= workersNum; i++ {
            wg.Add(1)
            go s.runWorker(
                wg,
                i,
                s.stanConn,
                subject,
                qgroup,
                cb,
                stan.SetManualAckMode(),
                stan.AckWait(ackWait),
                stan.DurableName(durableName),
                stan.MaxInflight(maxInflight),
                stan.DeliverAllAvailable(),
            )
        }
        wg.Wait()
    }
    
    작업자는conn.QueueSubscribe 방법을 실행하고 테마와 대기열 그룹 이름을 전달합니다.
    메시지 및 nats 옵션을 매개변수로 처리하는 콜백:
    func (s *emailSubscriber) runWorker(
        wg *sync.WaitGroup,
        workerID int,
        conn stan.Conn,
        subject string,
        qgroup string,
        cb stan.MsgHandler,
        opts ...stan.SubscriptionOption,
    ) {
        s.log.Infof("Subscribing worker: %v, subject: %v, qgroup: %v", workerID, subject, qgroup)
        defer wg.Done()
    
        _, err := conn.QueueSubscribe(subject, qgroup, cb, opts...)
        if err != nil {
            s.log.Errorf("WorkerID: %v, QueueSubscribe: %v", workerID, err)
            if err := conn.Close(); err != nil {
                s.log.Errorf("WorkerID: %v, conn.Close error: %v", workerID, err)
            }
        }
    }
    
    processCreateEmail 처리 전자 우편 이벤트 만들기, 추적 범위 시작, 도량 계수기 증가,
    그리고 메시지 데이터를 풀고usecase create 방법을 호출합니다. 실패하면 retry-go를 사용하여 세 번 다시 시도합니다.
    만약 여전히 실패한다면 우리는 현재 메시지가 재교부되었는지 검사할 것입니다. 재교부 계수 >maxRedeliveryCount (귀하의 업무 논리에 따라 3배의 제한이 있음) 는 오류 처리 사례가 매우 다를 수 있습니다. 이것은 귀하의 서비스 업무 논리에 달려 있습니다. 이 예에서 사용된 Dead Letter Queue 방법입니다.
    func (s *emailSubscriber) processCreateEmail(ctx context.Context) stan.MsgHandler {
        return func(msg *stan.Msg) {
            span, ctx := opentracing.StartSpanFromContext(ctx, "emailSubscriber.processCreateEmail")
            defer span.Finish()
    
            s.log.Infof("subscriber process Create Email: %s", msg.String())
            totalSubscribeMessages.Inc()
    
            var m models.Email
            if err := json.Unmarshal(msg.Data, &m); err != nil {
                errorSubscribeMessages.Inc()
                s.log.Errorf("json.Unmarshal : %v", err)
                return
            }
    
            if err := retry.Do(func() error {
                return s.emailUC.Create(ctx, &m)
            },
                retry.Attempts(retryAttempts),
                retry.Delay(retryDelay),
                retry.Context(ctx),
            ); err != nil {
                errorSubscribeMessages.Inc()
                s.log.Errorf("emailUC.Create : %v", err)
    
                if msg.Redelivered && msg.RedeliveryCount > maxRedeliveryCount {
                    if err := s.publishErrorMessage(ctx, msg, err); err != nil {
                        s.log.Errorf("publishErrorMessage : %v", err)
                        return
                    }
                    if err := msg.Ack(); err != nil {
                        s.log.Errorf("msg.Ack: %v", err)
                        return
                    }
                }
                return
            }
    
            if err := msg.Ack(); err != nil {
                s.log.Errorf("msg.Ack: %v", err)
            }
            successSubscribeMessages.Inc()
        }
    }
    
    데이터를 조회하기 위해서 GetByID와 검색 프로세서가 있습니다.
    온전한 원본 코드와 당신이 찾을 수 있는 모든 도구 목록here👨‍💻 :)
    나는 이 글이 유용하고 유익하기를 바란다. 나는 어떤 피드백이나 문제를 받아도 매우 기쁠 것이다.

    좋은 웹페이지 즐겨찾기