Go, NATS, gRPC 및 PostgreSQL clean architecture 마이크로서비스(모니터링 및 추적 기능 포함)👋
78298 단어 microservicesgrpcgonats
NATS 메시징 에이전트
gRPC gRPC의 Go 구현
PostgreSQL 데이터베이스로
Jaeger 오픈 소스, 종단 간 분산tracing
Prometheus 모니터링 및 경고
Grafana 프로메테우스(Prometheus)의 모든 제품으로 관측 가능한 계기판을 구성하는 데 사용
MailHog 웹 및 API 기반 SMTP 테스트
Redis Golang의 유형 보안 Redis 클라이언트
Nginx HTTP 및 리버스 프록시 서버
swag 당당하게
마이그레이션용 migrate
GitHub repository 에서 원본 코드를 찾을 수 있습니다
이 문장에서 우리는 NATS Streaming를 중점적으로 강조하고자 한다. 이것은 Go로 쓴 것이다.🤩.
문서에서 보듯이 다음과 같은 몇 가지 좋은 특성이 있습니다.
메시지/이벤트 지속성NATS 스트림은 메모리, 평면 파일 또는 데이터베이스에서 구성 가능한 메시지 지속성을 제공합니다.
게시 서버와 서버 사이(게시 작업에 사용) 및 구독 서버와 서버 사이(메일 전달 확인에 사용) 최소한 한 번의 메일 확인을 전달한다.메시지는 서버가 메모리나 보조 메모리(또는 다른 외부 메모리)에 저장하고 필요에 따라 조건에 맞는 구독 클라이언트에게 다시 보냅니다.
Publisher rate limiting MaxPubAcksInFlight 옵션은 게시자가 주어진 시간에 전송할 수 있는 확인되지 않은 메시지의 수를 효과적으로 제한합니다.이 최대치에 도달했을 때, 확인되지 않은 메시지 수가 지정한 제한보다 적을 때까지 더 많은 비동기 발표 호출이 막힐 것입니다.
모든 구독자의 속도 일치/제한 맥스인플라이트 옵션은 E가 지정한 NATS 흐름에서 지정한 구독에 대한 최대 미완성 확인수(이미 발송되었지만 확인되지 않은 메시지)를 허용합니다.이 제한에 도달하면 NATS 흐름은 확인되지 않은 메시지 수가 지정한 제한보다 적을 때까지 이 구독에 메시지를 보내는 것을 중단합니다.
장기 구독은 클라이언트가 다시 시작한 후에도 '장기 이름' 을 지정합니다.
테마의 역사 메시지 재방송 새 구독은 구독 테마의 채널에 저장된 메시지 흐름에서 시작 위치를 지정할 수 있습니다.이 옵션을 사용하면 메시지 전송을 다음 시간부터 시작할 수 있습니다.
마이크로서비스는 전자메일을 보내고 PostgreSQL에 저장하며 NATS, gRPC, REST를 통해 통신할 수 있다.
NATS 구독자는 데이터를 조회하는 데 사용되는 전자 우편 이벤트, REST, gRPC를 처리합니다.
지역 발전:
make cert // generates tls certificates
make migrate_up // run sql migrations
make swagger // generate swagger documentation
make local or develop // for run docker compose files
docker의run all에 대해makedevelop을 실행할 수 있습니다. 이것은 열 리셋 기능을 가지고 있습니다.UI 인터페이스는 다음 포트에서 사용할 수 있습니다.
Jaeger 사용자 인터페이스:http://localhost:16686
프로메테우스 사용자 인터페이스:http://localhost:9090
Grafana 사용자 인터페이스:http://localhost:3000
NATS 사용자 인터페이스:http://localhost:8222/
MailHog:http://localhost:8025/
기본적으로 Swagger UI는 다음 장치에서 실행됩니다.https://localhost:5000/swagger/index.html
Grafana에서 플로메테우스를 도량원으로 선택한 다음 계기판을 만들어야 합니다.
Docker 작성로컬이 프로젝트의 yml:
version: "3.8"
services:
nginx:
container_name: nginx_microservice
ports:
- 8080:8080
- 443:443
build:
context: ./nginx
dockerfile: Dockerfile
networks:
- nats
nats-streaming:
container_name: nats-streaming
image: nats-streaming:latest
ports:
- "8222:8222"
- "4222:4222"
- "6222:6222"
networks: [ "nats" ]
restart: always
command: [
'-p',
'4222',
'-m',
'8222',
'-hbi',
'5s',
'-hbt',
'5s',
'-hbf',
'2',
'-SD',
'-cid',
'microservice',
]
mails_postgesql:
image: postgres:13-alpine
container_name: mails_postgesql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=mails_db
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./mails_pgdata:/var/lib/postgresql/data
networks: [ "nats" ]
mailhog:
container_name: mailhog
image: mailhog/mailhog:latest
ports:
- "1025:1025"
- "8025:8025"
restart: always
networks: [ "nats" ]
redis:
image: redis:6-alpine
restart: always
container_name: user_redis
ports:
- "6379:6379"
networks: [ "nats" ]
prometheus:
container_name: prometheus_container
restart: always
image: prom/prometheus
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:Z
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--storage.tsdb.retention=20d'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
ports:
- '9090:9090'
networks: [ "nats" ]
node_exporter:
container_name: node_exporter_container
restart: always
image: prom/node-exporter
ports:
- '9101:9100'
networks: [ "nats" ]
grafana:
container_name: grafana_container
restart: always
image: grafana/grafana
ports:
- '3000:3000'
networks: [ "nats" ]
jaeger:
container_name: jaeger_container
restart: always
image: jaegertracing/all-in-one:1.21
environment:
- COLLECTOR_ZIPKIN_HTTP_PORT=9411
ports:
- 5775:5775/udp
- 6831:6831/udp
- 6832:6832/udp
- 5778:5778
- 16686:16686
- 14268:14268
- 14250:14250
- 9411:9411
networks: [ "nats" ]
networks:
nats:
name: nats
응용 프로그램을 시작할 때, 우리는 viperyaml 설정을 불러와서 필요한 모든 내용을 초기화하고 프로그램을 실행합니다.// Run start application
func (s *server) Run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
smtpClient := smtp.NewSmtpClient(s.cfg)
publisher := nats.NewPublisher(s.natsConn)
emailPgRepo := repository.NewEmailPGRepository(s.pgxPool)
emailRedisRepo := repository.NewEmailRedisRepository(s.redis)
emailUC := usecase.NewEmailUseCase(s.log, emailPgRepo, publisher, smtpClient, emailRedisRepo)
im := interceptors.NewInterceptorManager(s.log, s.cfg)
mw := middlewares.NewMiddlewareManager(s.log, s.cfg)
validate := validator.New()
go func() {
emailSubscriber := nats.NewEmailSubscriber(s.natsConn, s.log, emailUC, validate)
emailSubscriber.Run(ctx)
}()
go func() {
s.log.Infof("Server is listening on PORT: %s", s.cfg.HTTP.Port)
s.runHttpServer()
}()
metricsServer := echo.New()
go func() {
metricsServer.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
s.log.Infof("Metrics server is running on port: %s", s.cfg.Metrics.Port)
if err := metricsServer.Start(s.cfg.Metrics.Port); err != nil {
s.log.Error(err)
cancel()
}
}()
v1 := s.echo.Group("/api/v1")
v1.Use(mw.Metrics)
emailHandlers := emailsV1.NewEmailHandlers(v1.Group("/email"), emailUC, s.log, validate)
emailHandlers.MapRoutes()
l, err := net.Listen("tcp", s.cfg.GRPC.Port)
if err != nil {
return errors.Wrap(err, "net.Listen")
}
defer l.Close()
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
s.log.Fatalf("failed to load key pair: %s", err)
}
grpcServer := grpc.NewServer(
grpc.Creds(credentials.NewServerTLSFromCert(&cert)),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: s.cfg.GRPC.MaxConnectionIdle * time.Minute,
Timeout: s.cfg.GRPC.Timeout * time.Second,
MaxConnectionAge: s.cfg.GRPC.MaxConnectionAge * time.Minute,
Time: s.cfg.GRPC.Timeout * time.Minute,
}),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_opentracing.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpcrecovery.UnaryServerInterceptor(),
im.Logger,
),
),
)
emailGRPCService := emailGrpc.NewEmailGRPCService(emailUC, s.log, validate)
emailService.RegisterEmailServiceServer(grpcServer, emailGRPCService)
grpc_prometheus.Register(grpcServer)
s.log.Infof("GRPC Server is listening on port: %s", s.cfg.GRPC.Port)
s.log.Fatal(grpcServer.Serve(l))
if s.cfg.HTTP.Development {
reflection.Register(grpcServer)
}
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
select {
case v := <-quit:
s.log.Errorf("signal.Notify: %v", v)
case done := <-ctx.Done():
s.log.Errorf("ctx.Done: %v", done)
}
if err := s.echo.Server.Shutdown(ctx); err != nil {
return errors.Wrap(err, "echo.Server.Shutdown")
}
if err := metricsServer.Shutdown(ctx); err != nil {
s.log.Errorf("metricsServer.Shutdown: %v", err)
}
grpcServer.GracefulStop()
s.log.Info("Server Exited Properly")
return nil
}
내가 사용하는 REST httpecho에 대해 나는 또 다른 인기 있는 선택이 gin라고 생각한다.및 swag는 RESTful API 문서를 생성하는 데 사용됩니다.
이메일 프로세서를 만들어 요청을 수락하고 범위를 추적하기 시작하며 validator로 입력을 검증하고 예시 방법을 호출합니다.
// Create Create
// @Tags Emails
// @Summary Create new email
// @Description Create new email and send it
// @Accept json
// @Produce json
// @Success 201 {object} models.Email
// @Router /email [post]
func (h *emailHandlers) Create() echo.HandlerFunc {
return func(c echo.Context) error {
span, ctx := opentracing.StartSpanFromContext(c.Request().Context(), "emailHandlers.Create")
defer span.Finish()
createRequests.Inc()
var mail models.Email
if err := c.Bind(&mail); err != nil {
errorRequests.Inc()
h.log.Errorf("c.Bind: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := h.validate.StructCtx(ctx, &mail); err != nil {
errorRequests.Inc()
h.log.Errorf("validate.StructCtx: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
if err := h.emailUC.PublishCreate(ctx, &mail); err != nil {
errorRequests.Inc()
h.log.Errorf("emailUC.PublishCreate: %v", err)
return httpErrors.ErrorCtxResponse(c, err)
}
successRequests.Inc()
return c.NoContent(http.StatusCreated)
}
}
gRPC에 대한 서비스 방법의 생각은 같지만 우선 원형 파일을 만들어야 한다.syntax = "proto3";
import "google/protobuf/timestamp.proto";
//protoc --go_out=plugins=grpc:. *.proto
package emailService;
option go_package = ".;emailService";
message Email {
string EmailID = 1;
string From = 2;
string To = 3;
string Subject = 4;
string Message = 5;
google.protobuf.Timestamp CreatedAt = 6;
}
message Empty {}
message CreateReq {
string From = 1;
string To = 2;
string Subject = 3;
string Message = 4;
}
message CreateRes {
string status = 1;
}
message GetByIDReq {
string EmailID = 1;
}
message GetByIDRes {
Email Email = 1;
}
message SearchReq {
string Search = 1;
int64 page = 2;
int64 size = 3;
}
message SearchRes {
int64 TotalCount = 1;
int64 TotalPages = 2;
int64 Page = 3;
int64 Size = 4;
bool HasMore = 5;
repeated Email Emails = 6;
}
service EmailService {
rpc Create(CreateReq) returns (CreateRes) {}
rpc GetByID(GetByIDReq) returns (GetByIDRes) {}
rpc Search(SearchReq) returns (SearchRes) {}
}
gRPC는 REST와 동일한 비즈니스 논리로 e-메일 프로세서를 만듭니다.// Create create email
func (e *emailGRPCService) Create(ctx context.Context, req *emailService.CreateReq) (*emailService.CreateRes, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "productService.Create")
defer span.Finish()
createRequests.Inc()
m := &models.Email{
From: req.GetFrom(),
To: req.GetTo(),
Subject: req.GetSubject(),
Message: req.GetMessage(),
}
if err := e.validator.StructCtx(ctx, m); err != nil {
errorRequests.Inc()
e.log.Errorf("validator.StructCtx: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
if err := e.emailUC.Create(ctx, m); err != nil {
errorRequests.Inc()
e.log.Errorf("emailUC.Create: %v", err)
return nil, grpcErrors.ErrorResponse(err, err.Error())
}
successRequests.Inc()
return &emailService.CreateRes{Status: "Ok"}, nil
}
e-메일 생성 예를 사용하여 데이터베이스에 레코드를 만들고 e-메일 보내기 이벤트를 게시합니다.// Create create new email saves in db
func (e *emailUseCase) Create(ctx context.Context, email *models.Email) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "emailUseCase.Create")
defer span.Finish()
created, err := e.emailPGRepo.Create(ctx, email)
if err != nil {
return errors.Wrap(err, "emailPGRepo.Create")
}
mailBytes, err := json.Marshal(created)
if err != nil {
return errors.Wrap(err, "json.Marshal")
}
return e.publisher.Publish(sendEmailSubject, mailBytes)
}
저장소 작성 방법은 데이터를 데이터베이스에 저장합니다.박사후와의 상호 작용pgx:
// Create create new email
func (e *emailPGRepository) Create(ctx context.Context, email *models.Email) (*models.Email, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "emailPGRepository.Create")
defer span.Finish()
var mail models.Email
if err := e.db.QueryRow(
ctx,
createEmailQuery,
&email.From,
&email.To,
&email.Subject,
&email.Message,
).Scan(&mail.EmailID, &mail.From, &mail.To, &mail.Subject, &mail.Message, &mail.CreatedAt); err != nil {
return nil, errors.Wrap(err, "Scan")
}
return &mail, nil
}
Jaeger는 매우 좋은 네트워크 인터페이스를 가지고 있어 http://localhost:16686에서 우리의 추적을 볼 수 있다MailHog e- 메일 테스트에 적합한 솔루션입니다.
여기에 사용된 go smtp 클라이언트go-simple-mail.
NATS는 간단한 웹 ui와 함께 제공됩니다.
구독자는 가장 재미있는 부분이다. 우선 우리는 worker pool 서브다차원 데이터 집합을 주제에 사용한다.
// Subscribe subscribe to subject and run workers with given callback for handling messages
func (s *emailSubscriber) Subscribe(subject, qgroup string, workersNum int, cb stan.MsgHandler) {
s.log.Infof("Subscribing to Subject: %v, group: %v", subject, qgroup)
wg := &sync.WaitGroup{}
for i := 0; i <= workersNum; i++ {
wg.Add(1)
go s.runWorker(
wg,
i,
s.stanConn,
subject,
qgroup,
cb,
stan.SetManualAckMode(),
stan.AckWait(ackWait),
stan.DurableName(durableName),
stan.MaxInflight(maxInflight),
stan.DeliverAllAvailable(),
)
}
wg.Wait()
}
작업자는conn.QueueSubscribe 방법을 실행하고 테마와 대기열 그룹 이름을 전달합니다.메시지 및 nats 옵션을 매개변수로 처리하는 콜백:
func (s *emailSubscriber) runWorker(
wg *sync.WaitGroup,
workerID int,
conn stan.Conn,
subject string,
qgroup string,
cb stan.MsgHandler,
opts ...stan.SubscriptionOption,
) {
s.log.Infof("Subscribing worker: %v, subject: %v, qgroup: %v", workerID, subject, qgroup)
defer wg.Done()
_, err := conn.QueueSubscribe(subject, qgroup, cb, opts...)
if err != nil {
s.log.Errorf("WorkerID: %v, QueueSubscribe: %v", workerID, err)
if err := conn.Close(); err != nil {
s.log.Errorf("WorkerID: %v, conn.Close error: %v", workerID, err)
}
}
}
processCreateEmail 처리 전자 우편 이벤트 만들기, 추적 범위 시작, 도량 계수기 증가,그리고 메시지 데이터를 풀고usecase create 방법을 호출합니다. 실패하면 retry-go를 사용하여 세 번 다시 시도합니다.
만약 여전히 실패한다면 우리는 현재 메시지가 재교부되었는지 검사할 것입니다. 재교부 계수 >maxRedeliveryCount (귀하의 업무 논리에 따라 3배의 제한이 있음) 는 오류 처리 사례가 매우 다를 수 있습니다. 이것은 귀하의 서비스 업무 논리에 달려 있습니다. 이 예에서 사용된 Dead Letter Queue 방법입니다.
func (s *emailSubscriber) processCreateEmail(ctx context.Context) stan.MsgHandler {
return func(msg *stan.Msg) {
span, ctx := opentracing.StartSpanFromContext(ctx, "emailSubscriber.processCreateEmail")
defer span.Finish()
s.log.Infof("subscriber process Create Email: %s", msg.String())
totalSubscribeMessages.Inc()
var m models.Email
if err := json.Unmarshal(msg.Data, &m); err != nil {
errorSubscribeMessages.Inc()
s.log.Errorf("json.Unmarshal : %v", err)
return
}
if err := retry.Do(func() error {
return s.emailUC.Create(ctx, &m)
},
retry.Attempts(retryAttempts),
retry.Delay(retryDelay),
retry.Context(ctx),
); err != nil {
errorSubscribeMessages.Inc()
s.log.Errorf("emailUC.Create : %v", err)
if msg.Redelivered && msg.RedeliveryCount > maxRedeliveryCount {
if err := s.publishErrorMessage(ctx, msg, err); err != nil {
s.log.Errorf("publishErrorMessage : %v", err)
return
}
if err := msg.Ack(); err != nil {
s.log.Errorf("msg.Ack: %v", err)
return
}
}
return
}
if err := msg.Ack(); err != nil {
s.log.Errorf("msg.Ack: %v", err)
}
successSubscribeMessages.Inc()
}
}
데이터를 조회하기 위해서 GetByID와 검색 프로세서가 있습니다.온전한 원본 코드와 당신이 찾을 수 있는 모든 도구 목록here👨💻 :)
나는 이 글이 유용하고 유익하기를 바란다. 나는 어떤 피드백이나 문제를 받아도 매우 기쁠 것이다.
Reference
이 문제에 관하여(Go, NATS, gRPC 및 PostgreSQL clean architecture 마이크로서비스(모니터링 및 추적 기능 포함)👋), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/aleksk1ng/go-nats-grpc-and-postgresql-clean-architecture-microservice-with-monitoring-and-tracing-2kka텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)