ZIO 기반 OpenTelemetry 분산 추적

소개하다.


이 글은 사용 ZIOzio-telemetry Scala 응용 프로그램 구현 OpenTelemetry 분포식 추적에 대한 간략한 설명입니다.
소스 코드를 사용할 수 있습니다here.
이것은 이러한 기술에 대한 소개가 아니지만, 여기에는 몇 가지 좋은 독서가 있다.
  • NewRelic 소개Distributed Tracing.
  • Lightstep 쌍OpenTelemetry에 대한 간략한 요약.
  • 초보적 실시


    프레젠테이션을 위해 manual instrumentation 릴리즈modified에서 zio-grpc 및 HTTP 통신을 포함합니다helloworld example.

  • 원본:hello-client송신대HelloRequestxname,귀환대hello-serverHello,x

  • 수정: 원래 동작을 제외하고 클라이언트는 선택적인 정수 필드HelloResponse를 보내고 서버는 그 값에 따라 gRPC에 HTTP 요청을 실행한다.
  • HTTPBin

    새 플래그 메시지 추가하기


    // The greeting service definition.
    service Greeter {
      // Sends a greeting
      rpc SayHello (HelloRequest) returns (HelloReply) {}
    }
    
    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
      google.protobuf.Int32Value guess = 2;
    }
    
    // The response message containing the greetings
    message HelloReply {
      string message = 1;
    }
    

    zio grpc 의존 항목 추가


    resolvers += Resolver.sonatypeRepo("snapshots")
    
    addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")
    
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")
    
    val zioGrpcVersion = "0.5.1+12-93cdbe22-SNAPSHOT"
    
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % zioGrpcVersion,
      "com.thesamet.scalapb"          %% "compilerplugin"   % "0.11.5"
    )
    

    추측 설정

  • 부터 guessScala 코드를 생성합니다.
  • HTTP 클라이언트에 의존 .
  • val grpcVersion = "1.41.0"
    val sttpVersion = "3.3.15"
    
    val scalaPBRuntime = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
    
    val grpcRuntimeDeps = Seq(
      "io.grpc"      % "grpc-netty" % grpcVersion,
      scalaPBRuntime,
      scalaPBRuntime % "protobuf"
    )
    
    val sttpZioDeps = Seq(
      "com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion
    )
    
    lazy val root = Project("opentelemetry-distributed-tracing-zio", file(".")).aggregate(zio)
    
    lazy val zio = commonProject("zio").settings(
      Compile / PB.targets := Seq(
        scalapb.gen(grpc = true)          -> (Compile / sourceManaged).value,
        scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value
      ),
      libraryDependencies ++= grpcRuntimeDeps ++ sttpZioDeps
    )
    

    sttp 클라이언트 구현

  • localhost:9000을 가리키는 gRPC 클라이언트를 만듭니다.
  • 1개build.sbt를 보냅니다.
  • 병행 발송 5helloworld.proto.
  • 잘못된 추측이 담긴 싱글HelloRequest을 보낸다.
  • 완료를 인쇄하고 종료합니다.
  • object ZClient extends zio.App {
      private val clientLayer = GreeterClient.live(
        ZManagedChannel(
          ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
        )
      )
    
      private val singleHello = GreeterClient.sayHello(HelloRequest("World"))
    
      private val multipleHellos = ZIO.collectAllParN(5)(
        List(
          GreeterClient.sayHello(HelloRequest("1", Some(1))),
          GreeterClient.sayHello(HelloRequest("2", Some(2))),
          GreeterClient.sayHello(HelloRequest("3", Some(3))),
          GreeterClient.sayHello(HelloRequest("4", Some(4))),
          GreeterClient.sayHello(HelloRequest("5", Some(5)))
        )
      )
    
      private val invalidHello = GreeterClient.sayHello(HelloRequest("Invalid", Some(-1))).ignore
    
      private def myAppLogic =
        singleHello *> multipleHellos *> invalidHello *> putStrLn("Done")
    
      def run(args: List[String]): URIO[ZEnv, ExitCode] =
        myAppLogic.provideCustomLayer(clientLayer).exitCode
    }
    

    서버 구현

  • HelloRequest 0보다 작으면 요청이 실패합니다.
  • HelloRequest 값을 기준으로 일정 시간 지연한 후 HTTPBin에 요청을 보냅니다.
  • type ZGreeterEnv = Clock with Random with SttpClient
    
    object ZGreeterImpl extends RGreeter[ZGreeterEnv] {
    
      def sayHello(request: HelloRequest): ZIO[ZGreeterEnv, Status, HelloReply] = {
        val guess = request.guess.getOrElse(0)
        for {
          _      <- ZIO.fail(Status.INVALID_ARGUMENT).when(guess < 0)
          code   <- ???
          delayMs = ???
          _      <- httpRequest(code)
                      .delay(delayMs.millis)
                      .mapError(ex => Status.INTERNAL.withCause(ex))
        } yield HelloReply(s"Hello, ${request.name}")
      }
    
      def httpRequest(code: Int): RIO[SttpClient, Unit] =
        send(basicRequest.get(uri"https://httpbin.org/status/$code")).unit
    }
    

    그것을 운행하다

  • 서버를 실행하려면:
  • $ sbt "zio/runMain com.github.tuleism.ZServer"
    
    [info] running (fork) com.github.tuleism.ZServer
    [info] Server is running. Press Ctrl-C to stop.
    
  • 클라이언트를 실행하려면:
  • $ sbt "zio/runMain com.github.tuleism.ZClient"
    
    [info] running (fork) com.github.tuleism.ZClient
    [info] Done
    [success] Total time: 12 s
    
    이때 우리는 클라이언트의 초기화와 완성에 약 12초가 걸린다는 것만 알았다.
    분포식 추적을 추가해서 이 점에 대한 더 많은 견해를 얻을 수 있도록 하겠습니다.

    공통 추적 요구 사항

  • 클라이언트와 서버에 대해 우리는 하나Tracer를 가져오고 창설과 관리를 담당하는 대상Spans을 가져야 한다.
  • 추적 데이터는 Jaeger에 전송되었고 후자는 독립적collector으로 충당되었다.

  • 새 종속성 추가


  • zio-telemetry's OpenTelemetry module .
  • 또한 zio-config 파일에서 추적 설정을 읽고 zio-magic 연결을 간소화하는 데 의존합니다.
  • val openTelemetryVersion = "1.6.0"
    val zioConfigVersion     = "1.0.10"
    val zioMagicVersion      = "0.3.9"
    val zioTelemetryVersion  = "0.8.2"
    
    val openTelemetryDeps = Seq(
      "io.opentelemetry" % "opentelemetry-exporter-jaeger"    % openTelemetryVersion,
      "io.opentelemetry" % "opentelemetry-sdk"                % openTelemetryVersion,
      "io.opentelemetry" % "opentelemetry-extension-noop-api" % s"$openTelemetryVersion-alpha"
    )
    
    val zioConfigDeps = Seq(
      "dev.zio" %% "zio-config"          % zioConfigVersion,
      "dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
      "dev.zio" %% "zio-config-typesafe" % zioConfigVersion
    )
    
    val zioMagicDeps = Seq(
      "io.github.kitlangton" %% "zio-magic" % zioMagicVersion
    )
    
    val zioTelemetryDeps = Seq(
      "dev.zio"                       %% "zio-opentelemetry"                   % zioTelemetryVersion,
      "com.softwaremill.sttp.client3" %% "zio-telemetry-opentelemetry-backend" % sttpVersion
    )
    

    즈레야 구성 레이어 추가


    tracing {
      enable = false
      enable = ${?TRACING_ENABLE}
      endpoint = "http://127.0.0.1:14250"
      endpoint = ${?JAEGER_ENDPOINT}
    }
    
    case class AppConfig(tracing: TracingConfig)
    
    case class TracingConfig(enable: Boolean, endpoint: String)
    
    object AppConfig {
      private val configDescriptor = descriptor[AppConfig]
    
      val live: Layer[ReadError[String], Has[AppConfig]] = TypesafeConfig.fromDefaultLoader(configDescriptor)
    }
    

    추측층 추가

  • 설정에 따라 noopguess을 만들거나 Jaeger에 데이터를 보내는 noop을 만듭니다.
  • 일단 우리가 그것을 가지게 되면 Tracer층을 구축할 수 있고 우리는 많은 를 방문할 수 있다.
  • object ZTracer {
      private val InstrumentationName = "com.github.tuleism"
    
      private def managed(serviceName: String, endpoint: String) = {
        val resource = Resource.builder().put(ResourceAttributes.SERVICE_NAME, serviceName).build()
        for {
          spanExporter   <- ZManaged.fromAutoCloseable(
                              Task(JaegerGrpcSpanExporter.builder().setEndpoint(endpoint).build())
                            )
          spanProcessor  <- ZManaged.fromAutoCloseable(UIO(SimpleSpanProcessor.create(spanExporter)))
          tracerProvider <- UIO(
                              SdkTracerProvider.builder().addSpanProcessor(spanProcessor).setResource(resource).build()
                            ).toManaged_
          openTelemetry  <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()).toManaged_
          tracer         <- UIO(openTelemetry.getTracer(InstrumentationName)).toManaged_
        } yield tracer
      }
    
      def live(serviceName: String): RLayer[Has[TracingConfig], Has[Tracer]] =
        (
          for {
            config <- ZIO.service[TracingConfig].toManaged_
            tracer <- if (!config.enable) {
                        Task(NoopOpenTelemetry.getInstance().getTracer(InstrumentationName)).toManaged_
                      } else {
                        managed(serviceName, config.endpoint)
                      }
          } yield tracer
        ).toLayer
    }
    

    zio 원격 측정의 유용한 조작 새 서버


    HTTP 클라이언트에 대한 명령 삽입

  • 개봉 즉시 사용 .
  • OpenTelemetrysttp backend에 따라 HTTP 특정 속성을 추가했습니다.
  • object SttpTracing {
      private val wrapper = new ZioTelemetryOpenTelemetryTracer {
        def before[T](request: Request[T, Nothing]): RIO[Tracing, Unit] =
          Tracing.setAttribute(SemanticAttributes.HTTP_METHOD.getKey, request.method.method) *>
            Tracing.setAttribute(SemanticAttributes.HTTP_URL.getKey, request.uri.toString()) *>
            ZIO.unit
    
        def after[T](response: Response[T]): RIO[Tracing, Unit] =
          Tracing.setAttribute(SemanticAttributes.HTTP_STATUS_CODE.getKey, response.code.code) *>
            ZIO.unit
      }
    
      val live = AsyncHttpClientZioBackend.layer().flatMap { hasBackend =>
        ZIO
          .service[Tracing.Service]
          .map { tracing =>
            ZioTelemetryOpenTelemetryBackend(hasBackend.get, tracing, wrapper)
          }
          .toLayer
      }
    }
    

    약속 gRPC 서버에 기기 설치


    우리는 Tracer를 추가할 수 있으며 를 사용하여 서버를 변경할 필요가 없다.각 요청에 대해 다음을 수행합니다.
  • 우리는 ZTransform를 사용한다. 전파된 상하문을 추출하고zio-telemetry's spanFrom 사용gRPC Metadata한 다음에 바로 새로운 하위Tracing를 시작한다.
  • 우리는 W3C Trace Context formatTracing 명칭에 사용된 완전한 방법명을 방문할 수 있다.
  • OpenTelemetryRequestContext에 따라 gRPC의 특정 속성을 추가했습니다.
  • object GrpcTracing {
      private val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
    
      private val metadataGetter: TextMapGetter[Metadata] = new TextMapGetter[Metadata] {
        override def keys(carrier: Metadata): java.lang.Iterable[String] =
          carrier.keys()
    
        override def get(carrier: Metadata, key: String): String =
          carrier.get(
            Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
          )
      }
    
      private def withSemanticAttributes[R, A](effect: ZIO[R, Status, A]): ZIO[Tracing with R, Status, A] =
        Tracing.setAttribute(SemanticAttributes.RPC_SYSTEM.getKey, "grpc") *>
          effect
            .tapBoth(
              status =>
                Tracing.setAttribute(
                  SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey,
                  status.getCode.value()
                ),
              _ =>
                Tracing.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey, Status.OK.getCode.value())
            )
    
      def serverTracingTransform[R]: ZTransform[R, Status, R with Tracing with Has[RequestContext]] =
        new ZTransform[R, Status, R with Tracing with Has[RequestContext]] {
    
          def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing with Has[RequestContext], Status, A] =
            for {
              rc       <- ZIO.service[RequestContext]
              metadata <- rc.metadata.wrap(identity)
              result   <- withSemanticAttributes(io)
                            .spanFrom(
                              propagator,
                              metadata,
                              metadataGetter,
                              rc.methodDescriptor.getFullMethodName,
                              SpanKind.SERVER,
                              { case _ => StatusCode.ERROR }
                            )
            } yield result
    
          def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing with Has[RequestContext], Status, A] =
            ???
        }
    }
    

    약속 주 서버 업데이트

  • 추적에 필요한 레이어를 추가합니다.
  • 원시Span로 변환.
  • import zio.magic._
    
    object ZServer extends ServerMain {
      private val requirements =
        ZLayer
          .wire[ZEnv with ZGreeterEnv](
            ZEnv.live,
            AppConfig.live.narrow(_.tracing),
            ZTracer.live("hello-server"),
            Tracing.live,
            SttpTracing.live
          )
          .orDie
    
      def services: ServiceList[Any] =
        ServiceList
          .add(ZGreeterImpl.transform[ZGreeterEnv, Has[RequestContext]](GrpcTracing.serverTracingTransform))
          .provideLayer(requirements)
    }
    

    신규 고객


    현재 컨텍스트 주입 gRPC 메타데이터의 경우


    object GrpcTracing {
    
      ...
    
      private val metadataSetter: TextMapSetter[Metadata] = (carrier, key, value) =>
        carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
    
      val contextPropagationClientInterceptor: ZClientInterceptor[Tracing] = ZClientInterceptor.headersUpdater {
        (_, _, metadata) =>
          metadata.wrapM(Tracing.inject(propagator, _, metadataSetter))
      }
    
      ...
    
    }
    
    object ZClient extends zio.App {
      private val clientLayer = GreeterClient.live(
        ZManagedChannel(
          ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext(),
          Seq(GrpcTracing.contextPropagationClientInterceptor)
        )
      )
    
      ...
    }
    

    컨텍스트 전파 요청마다 경계를 시작합니다

  • 사용ZGreeterImpl으로 관련 gRPC 속성을 기록합니다.
  • object GrpcTracing {
    
      ...
    
      def clientTracingTransform[R]: ZTransform[R, Status, R with Tracing] =
        new ZTransform[R, Status, R with Tracing] {
          def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing, Status, A] = withSemanticAttributes(io)
    
          def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing, Status, A] = ???
        }
    }
    
  • 서버와 달리 Span 객체에 액세스할 수 없으므로 메소드 이름을 수동으로 설정해야 합니다.
  • 추가 ZTransforms도 가동했습니다.
  • object ZClient extends zio.App {
    
      ...
    
    
      private def errorToStatusCode[E]: PartialFunction[E, StatusCode] = { case _ => StatusCode.ERROR }
    
      private def sayHello(request: HelloRequest) =
        GreeterClient
          .sayHello(request)
          .span(
            GreeterGrpc.METHOD_SAY_HELLO.getFullMethodName,
            SpanKind.CLIENT,
            errorToStatusCode
          )
    
      private val singleHello = sayHello(HelloRequest("World"))
        .span("singleHello", toErrorStatus = errorToStatusCode)
    
      private val multipleHellos = ZIO
        .collectAllParN(5)(
          List(
            sayHello(HelloRequest("1", Some(1))),
            sayHello(HelloRequest("2", Some(2))),
            sayHello(HelloRequest("3", Some(3))),
            sayHello(HelloRequest("4", Some(4))),
            sayHello(HelloRequest("5", Some(5)))
          )
        )
        .span("multipleHellos", toErrorStatus = errorToStatusCode)
    
      private val invalidHello = sayHello(HelloRequest("Invalid", Some(-1))).ignore
        .span("invalidHello", toErrorStatus = errorToStatusCode)
    }
    

    원하는 도면층 추가


    object ZClient extends zio.App {
    
      ...
    
      private val requirements = ZLayer
        .wire[ZEnv with Tracing](
          ZEnv.live,
          AppConfig.live.narrow(_.tracing),
          ZTracer.live("hello-client"),
          Tracing.live
        ) >+> clientLayer
    
      def run(args: List[String]): URIO[ZEnv, ExitCode] =
        myAppLogic.provideCustomLayer(requirements).exitCode
    }
    

    공연 시간


    Jaeger에게 Docker를 입히다

  • 추적 데이터는 포트 14250으로 전송됩니다.
  • 우리는 에서 볼 수 있다Jaeger UI.
  • $ docker run --rm --name jaeger \
      -p 16686:16686 \
      -p 14250:14250 \
      jaegertracing/all-in-one:1.25
    

    http://localhost:16686 서버 시작


    $ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZServer"
    
    [info] running (fork) com.github.tuleism.ZServer
    [info] Server is running. Press Ctrl-C to stop.
    

    클라이언트 시작


    $ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZClient"
    
    [info] running (fork) com.github.tuleism.ZClient
    [info] Done
    [success] Total time: 12 s
    

    분산 추적이 시작됩니다!

  • 이제 RequestContext에 대한 자세한 내용을 볼 수 있습니다.
  • 와 최장 지연Span.

  • 로그와의 통합

  • 다음 로그 메시지에 추적 상하문을 추가합니다.
  • 우리는 를 사용할 것이다. 이것은 우리가 가장 좋아하는 로그 라이브러리이다.
  • 참조specification.
  • 고이즈미 무대 로그 종속성 추가


    val izumiVersion         = "1.0.8"
    
    val loggingDeps = Seq(
      "io.7mind.izumi" %% "logstage-core"          % izumiVersion,
      "io.7mind.izumi" %% "logstage-adapter-slf4j" % izumiVersion
    )
    

    차이점 로그 설정

  • 현재 추적 상하문이 유효하면 multipleHellos, guess를 로그 상하문에 추가합니다.
  • object Logging {
      private def baseLogger = IzLogger()
    
      val live: ZLayer[Has[Tracing.Service], Nothing, Has[LogZIO.Service]] =
        (
          for {
            tracing <- ZIO.service[Tracing.Service]
          } yield LogZIO.withDynamicContext(baseLogger)(
            Tracing.getCurrentSpanContext
              .map(spanContext =>
                if (spanContext.isValid)
                  CustomContext(
                    "trace_id"    -> spanContext.getTraceId,
                    "span_id"     -> spanContext.getSpanId,
                    "trace_flags" -> spanContext.getTraceFlags.asHex()
                  )
                else
                  CustomContext.empty
              )
              .provide(Has(tracing))
          )
        ).toLayer
    }
    

    로그 메시지 추가

  • 예: trace_id.
  • object ZClient extends zio.App {
    
      ...
    
      private val singleHello = (
        for {
          _ <- log.info("singleHello")
          _ <- sayHello(HelloRequest("World"))
        } yield ()
      ).span("singleHello", toErrorStatus = errorToStatusCode)
    
    }
    

    샘플 로그


    [info] running (fork) com.github.tuleism.ZClient
    [info] I 2021-11-01T22:59:10.881 (ZClient.scala:37)  …tuleism.ZClient.singleHello [24:zio-default-async-11] trace_id=9c8a7ebb87381293bc8937a5f7673cb9, span_id=cb7c9a440472e1be, trace_flags=01 singleHello
    [info] I 2021-11-01T22:59:14.064 (ZClient.scala:44)  …eism.ZClient.multipleHellos [21:zio-default-async-8 ] trace_id=fe405246fbaa5f876c19f14fa649a99f, span_id=bef19494bef4106e, trace_flags=01 multipleHellos
    [info] I 2021-11-01T22:59:18.171 (ZClient.scala:60)  …uleism.ZClient.invalidHello [26:zio-default-async-13] trace_id=be5ccd425e0cfb01fd97274abd0c4d72, span_id=ea6499fb9a7c8d28, trace_flags=01 invalidHello
    [info] I 2021-11-01T22:59:18.272 (ZClient.scala:66)  ….tuleism.ZClient.myAppLogic [15:zio-default-async-2 ] Done
    [success] Total time: 12 s
    

    추가 설명

  • HTTP 5xx 응답을 받으면 에 따라 span_id 상태를 오류로 설정해야 합니다.그러나 그것은 현재 singleHello이다.
  • 우리는 semantic convention 클라이언트를 위해 추적을 실현해야 한다.
  • 좋은 웹페이지 즐겨찾기