ZIO 기반 OpenTelemetry 분산 추적
소개하다.
이 글은 사용 ZIO 과 zio-telemetry Scala 응용 프로그램 구현 OpenTelemetry 분포식 추적에 대한 간략한 설명입니다.
소스 코드를 사용할 수 있습니다here.
이것은 이러한 기술에 대한 소개가 아니지만, 여기에는 몇 가지 좋은 독서가 있다.
초보적 실시
프레젠테이션을 위해 manual instrumentation 릴리즈modified에서 zio-grpc 및 HTTP 통신을 포함합니다helloworld example.
원본:
hello-client
송신대HelloRequest
xname
,귀환대hello-server
Hello,x수정: 원래 동작을 제외하고 클라이언트는 선택적인 정수 필드
HelloResponse
를 보내고 서버는 그 값에 따라 gRPC에 HTTP 요청을 실행한다.새 플래그 메시지 추가하기
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
google.protobuf.Int32Value guess = 2;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
zio grpc 의존 항목 추가
resolvers += Resolver.sonatypeRepo("snapshots")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")
val zioGrpcVersion = "0.5.1+12-93cdbe22-SNAPSHOT"
libraryDependencies ++= Seq(
"com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % zioGrpcVersion,
"com.thesamet.scalapb" %% "compilerplugin" % "0.11.5"
)
추측 설정
guess
Scala 코드를 생성합니다.val grpcVersion = "1.41.0"
val sttpVersion = "3.3.15"
val scalaPBRuntime = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
val grpcRuntimeDeps = Seq(
"io.grpc" % "grpc-netty" % grpcVersion,
scalaPBRuntime,
scalaPBRuntime % "protobuf"
)
val sttpZioDeps = Seq(
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion
)
lazy val root = Project("opentelemetry-distributed-tracing-zio", file(".")).aggregate(zio)
lazy val zio = commonProject("zio").settings(
Compile / PB.targets := Seq(
scalapb.gen(grpc = true) -> (Compile / sourceManaged).value,
scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value
),
libraryDependencies ++= grpcRuntimeDeps ++ sttpZioDeps
)
sttp 클라이언트 구현
build.sbt
를 보냅니다.helloworld.proto
.HelloRequest
을 보낸다.object ZClient extends zio.App {
private val clientLayer = GreeterClient.live(
ZManagedChannel(
ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
)
)
private val singleHello = GreeterClient.sayHello(HelloRequest("World"))
private val multipleHellos = ZIO.collectAllParN(5)(
List(
GreeterClient.sayHello(HelloRequest("1", Some(1))),
GreeterClient.sayHello(HelloRequest("2", Some(2))),
GreeterClient.sayHello(HelloRequest("3", Some(3))),
GreeterClient.sayHello(HelloRequest("4", Some(4))),
GreeterClient.sayHello(HelloRequest("5", Some(5)))
)
)
private val invalidHello = GreeterClient.sayHello(HelloRequest("Invalid", Some(-1))).ignore
private def myAppLogic =
singleHello *> multipleHellos *> invalidHello *> putStrLn("Done")
def run(args: List[String]): URIO[ZEnv, ExitCode] =
myAppLogic.provideCustomLayer(clientLayer).exitCode
}
서버 구현
HelloRequest
0보다 작으면 요청이 실패합니다.HelloRequest
값을 기준으로 일정 시간 지연한 후 HTTPBin에 요청을 보냅니다.type ZGreeterEnv = Clock with Random with SttpClient
object ZGreeterImpl extends RGreeter[ZGreeterEnv] {
def sayHello(request: HelloRequest): ZIO[ZGreeterEnv, Status, HelloReply] = {
val guess = request.guess.getOrElse(0)
for {
_ <- ZIO.fail(Status.INVALID_ARGUMENT).when(guess < 0)
code <- ???
delayMs = ???
_ <- httpRequest(code)
.delay(delayMs.millis)
.mapError(ex => Status.INTERNAL.withCause(ex))
} yield HelloReply(s"Hello, ${request.name}")
}
def httpRequest(code: Int): RIO[SttpClient, Unit] =
send(basicRequest.get(uri"https://httpbin.org/status/$code")).unit
}
그것을 운행하다
$ sbt "zio/runMain com.github.tuleism.ZServer"
[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
$ sbt "zio/runMain com.github.tuleism.ZClient"
[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s
이때 우리는 클라이언트의 초기화와 완성에 약 12초가 걸린다는 것만 알았다.분포식 추적을 추가해서 이 점에 대한 더 많은 견해를 얻을 수 있도록 하겠습니다.
공통 추적 요구 사항
새 종속성 추가
zio-telemetry's OpenTelemetry module .
val openTelemetryVersion = "1.6.0"
val zioConfigVersion = "1.0.10"
val zioMagicVersion = "0.3.9"
val zioTelemetryVersion = "0.8.2"
val openTelemetryDeps = Seq(
"io.opentelemetry" % "opentelemetry-exporter-jaeger" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-extension-noop-api" % s"$openTelemetryVersion-alpha"
)
val zioConfigDeps = Seq(
"dev.zio" %% "zio-config" % zioConfigVersion,
"dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
"dev.zio" %% "zio-config-typesafe" % zioConfigVersion
)
val zioMagicDeps = Seq(
"io.github.kitlangton" %% "zio-magic" % zioMagicVersion
)
val zioTelemetryDeps = Seq(
"dev.zio" %% "zio-opentelemetry" % zioTelemetryVersion,
"com.softwaremill.sttp.client3" %% "zio-telemetry-opentelemetry-backend" % sttpVersion
)
즈레야 구성 레이어 추가
tracing {
enable = false
enable = ${?TRACING_ENABLE}
endpoint = "http://127.0.0.1:14250"
endpoint = ${?JAEGER_ENDPOINT}
}
case class AppConfig(tracing: TracingConfig)
case class TracingConfig(enable: Boolean, endpoint: String)
object AppConfig {
private val configDescriptor = descriptor[AppConfig]
val live: Layer[ReadError[String], Has[AppConfig]] = TypesafeConfig.fromDefaultLoader(configDescriptor)
}
추측층 추가
guess
을 만들거나 Jaeger에 데이터를 보내는 noop을 만듭니다.Tracer
층을 구축할 수 있고 우리는 많은
를 방문할 수 있다.object ZTracer {
private val InstrumentationName = "com.github.tuleism"
private def managed(serviceName: String, endpoint: String) = {
val resource = Resource.builder().put(ResourceAttributes.SERVICE_NAME, serviceName).build()
for {
spanExporter <- ZManaged.fromAutoCloseable(
Task(JaegerGrpcSpanExporter.builder().setEndpoint(endpoint).build())
)
spanProcessor <- ZManaged.fromAutoCloseable(UIO(SimpleSpanProcessor.create(spanExporter)))
tracerProvider <- UIO(
SdkTracerProvider.builder().addSpanProcessor(spanProcessor).setResource(resource).build()
).toManaged_
openTelemetry <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()).toManaged_
tracer <- UIO(openTelemetry.getTracer(InstrumentationName)).toManaged_
} yield tracer
}
def live(serviceName: String): RLayer[Has[TracingConfig], Has[Tracer]] =
(
for {
config <- ZIO.service[TracingConfig].toManaged_
tracer <- if (!config.enable) {
Task(NoopOpenTelemetry.getInstance().getTracer(InstrumentationName)).toManaged_
} else {
managed(serviceName, config.endpoint)
}
} yield tracer
).toLayer
}
zio 원격 측정의 유용한 조작 새 서버
HTTP 클라이언트에 대한 명령 삽입
object SttpTracing {
private val wrapper = new ZioTelemetryOpenTelemetryTracer {
def before[T](request: Request[T, Nothing]): RIO[Tracing, Unit] =
Tracing.setAttribute(SemanticAttributes.HTTP_METHOD.getKey, request.method.method) *>
Tracing.setAttribute(SemanticAttributes.HTTP_URL.getKey, request.uri.toString()) *>
ZIO.unit
def after[T](response: Response[T]): RIO[Tracing, Unit] =
Tracing.setAttribute(SemanticAttributes.HTTP_STATUS_CODE.getKey, response.code.code) *>
ZIO.unit
}
val live = AsyncHttpClientZioBackend.layer().flatMap { hasBackend =>
ZIO
.service[Tracing.Service]
.map { tracing =>
ZioTelemetryOpenTelemetryBackend(hasBackend.get, tracing, wrapper)
}
.toLayer
}
}
약속 gRPC 서버에 기기 설치
우리는
Tracer
를 추가할 수 있으며
를 사용하여 서버를 변경할 필요가 없다.각 요청에 대해 다음을 수행합니다.Tracing
를 시작한다.Tracing
명칭에 사용된 완전한 방법명을 방문할 수 있다.object GrpcTracing {
private val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
private val metadataGetter: TextMapGetter[Metadata] = new TextMapGetter[Metadata] {
override def keys(carrier: Metadata): java.lang.Iterable[String] =
carrier.keys()
override def get(carrier: Metadata, key: String): String =
carrier.get(
Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
)
}
private def withSemanticAttributes[R, A](effect: ZIO[R, Status, A]): ZIO[Tracing with R, Status, A] =
Tracing.setAttribute(SemanticAttributes.RPC_SYSTEM.getKey, "grpc") *>
effect
.tapBoth(
status =>
Tracing.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey,
status.getCode.value()
),
_ =>
Tracing.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey, Status.OK.getCode.value())
)
def serverTracingTransform[R]: ZTransform[R, Status, R with Tracing with Has[RequestContext]] =
new ZTransform[R, Status, R with Tracing with Has[RequestContext]] {
def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing with Has[RequestContext], Status, A] =
for {
rc <- ZIO.service[RequestContext]
metadata <- rc.metadata.wrap(identity)
result <- withSemanticAttributes(io)
.spanFrom(
propagator,
metadata,
metadataGetter,
rc.methodDescriptor.getFullMethodName,
SpanKind.SERVER,
{ case _ => StatusCode.ERROR }
)
} yield result
def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing with Has[RequestContext], Status, A] =
???
}
}
약속 주 서버 업데이트
Span
로 변환.import zio.magic._
object ZServer extends ServerMain {
private val requirements =
ZLayer
.wire[ZEnv with ZGreeterEnv](
ZEnv.live,
AppConfig.live.narrow(_.tracing),
ZTracer.live("hello-server"),
Tracing.live,
SttpTracing.live
)
.orDie
def services: ServiceList[Any] =
ServiceList
.add(ZGreeterImpl.transform[ZGreeterEnv, Has[RequestContext]](GrpcTracing.serverTracingTransform))
.provideLayer(requirements)
}
신규 고객
현재 컨텍스트 주입 gRPC 메타데이터의 경우
object GrpcTracing {
...
private val metadataSetter: TextMapSetter[Metadata] = (carrier, key, value) =>
carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
val contextPropagationClientInterceptor: ZClientInterceptor[Tracing] = ZClientInterceptor.headersUpdater {
(_, _, metadata) =>
metadata.wrapM(Tracing.inject(propagator, _, metadataSetter))
}
...
}
object ZClient extends zio.App {
private val clientLayer = GreeterClient.live(
ZManagedChannel(
ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext(),
Seq(GrpcTracing.contextPropagationClientInterceptor)
)
)
...
}
컨텍스트 전파 요청마다 경계를 시작합니다
ZGreeterImpl
으로 관련 gRPC 속성을 기록합니다.object GrpcTracing {
...
def clientTracingTransform[R]: ZTransform[R, Status, R with Tracing] =
new ZTransform[R, Status, R with Tracing] {
def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing, Status, A] = withSemanticAttributes(io)
def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing, Status, A] = ???
}
}
Span
객체에 액세스할 수 없으므로 메소드 이름을 수동으로 설정해야 합니다.ZTransform
s도 가동했습니다.object ZClient extends zio.App {
...
private def errorToStatusCode[E]: PartialFunction[E, StatusCode] = { case _ => StatusCode.ERROR }
private def sayHello(request: HelloRequest) =
GreeterClient
.sayHello(request)
.span(
GreeterGrpc.METHOD_SAY_HELLO.getFullMethodName,
SpanKind.CLIENT,
errorToStatusCode
)
private val singleHello = sayHello(HelloRequest("World"))
.span("singleHello", toErrorStatus = errorToStatusCode)
private val multipleHellos = ZIO
.collectAllParN(5)(
List(
sayHello(HelloRequest("1", Some(1))),
sayHello(HelloRequest("2", Some(2))),
sayHello(HelloRequest("3", Some(3))),
sayHello(HelloRequest("4", Some(4))),
sayHello(HelloRequest("5", Some(5)))
)
)
.span("multipleHellos", toErrorStatus = errorToStatusCode)
private val invalidHello = sayHello(HelloRequest("Invalid", Some(-1))).ignore
.span("invalidHello", toErrorStatus = errorToStatusCode)
}
원하는 도면층 추가
object ZClient extends zio.App {
...
private val requirements = ZLayer
.wire[ZEnv with Tracing](
ZEnv.live,
AppConfig.live.narrow(_.tracing),
ZTracer.live("hello-client"),
Tracing.live
) >+> clientLayer
def run(args: List[String]): URIO[ZEnv, ExitCode] =
myAppLogic.provideCustomLayer(requirements).exitCode
}
공연 시간
Jaeger에게 Docker를 입히다
$ docker run --rm --name jaeger \
-p 16686:16686 \
-p 14250:14250 \
jaegertracing/all-in-one:1.25
http://localhost:16686 서버 시작
$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZServer"
[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
클라이언트 시작
$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZClient"
[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s
분산 추적이 시작됩니다!
RequestContext
에 대한 자세한 내용을 볼 수 있습니다.Span
.로그와의 통합
고이즈미 무대 로그 종속성 추가
val izumiVersion = "1.0.8"
val loggingDeps = Seq(
"io.7mind.izumi" %% "logstage-core" % izumiVersion,
"io.7mind.izumi" %% "logstage-adapter-slf4j" % izumiVersion
)
차이점 로그 설정
multipleHellos
, guess
를 로그 상하문에 추가합니다.object Logging {
private def baseLogger = IzLogger()
val live: ZLayer[Has[Tracing.Service], Nothing, Has[LogZIO.Service]] =
(
for {
tracing <- ZIO.service[Tracing.Service]
} yield LogZIO.withDynamicContext(baseLogger)(
Tracing.getCurrentSpanContext
.map(spanContext =>
if (spanContext.isValid)
CustomContext(
"trace_id" -> spanContext.getTraceId,
"span_id" -> spanContext.getSpanId,
"trace_flags" -> spanContext.getTraceFlags.asHex()
)
else
CustomContext.empty
)
.provide(Has(tracing))
)
).toLayer
}
로그 메시지 추가
trace_id
.object ZClient extends zio.App {
...
private val singleHello = (
for {
_ <- log.info("singleHello")
_ <- sayHello(HelloRequest("World"))
} yield ()
).span("singleHello", toErrorStatus = errorToStatusCode)
}
샘플 로그
[info] running (fork) com.github.tuleism.ZClient
[info] I 2021-11-01T22:59:10.881 (ZClient.scala:37) …tuleism.ZClient.singleHello [24:zio-default-async-11] trace_id=9c8a7ebb87381293bc8937a5f7673cb9, span_id=cb7c9a440472e1be, trace_flags=01 singleHello
[info] I 2021-11-01T22:59:14.064 (ZClient.scala:44) …eism.ZClient.multipleHellos [21:zio-default-async-8 ] trace_id=fe405246fbaa5f876c19f14fa649a99f, span_id=bef19494bef4106e, trace_flags=01 multipleHellos
[info] I 2021-11-01T22:59:18.171 (ZClient.scala:60) …uleism.ZClient.invalidHello [26:zio-default-async-13] trace_id=be5ccd425e0cfb01fd97274abd0c4d72, span_id=ea6499fb9a7c8d28, trace_flags=01 invalidHello
[info] I 2021-11-01T22:59:18.272 (ZClient.scala:66) ….tuleism.ZClient.myAppLogic [15:zio-default-async-2 ] Done
[success] Total time: 12 s
추가 설명
span_id
상태를 오류로 설정해야 합니다.그러나 그것은 현재
와singleHello
이다.Reference
이 문제에 관하여(ZIO 기반 OpenTelemetry 분산 추적), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/tuleism/opentelemetry-distributed-tracing-with-zio-45di텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)