Jaeger 소스 확인 - All in One 모드
78824 단어 JaegerKubernetes
Jaeger의 All-in-one 모드는 주로 로컬 서비스를 빠르게 시작해서 테스트하는 데 사용되며, 그 중에서 Jaeger UI,collector,query,agent, 이런 구성 요소를 포함한다.이 모드에서 저장된 데이터는 메모리에 저장된 것이다.
All-in-one 모드를 시작하는 jaeger의 가장 간단한 방법은 Docker 렌즈를 사용하여 시작하는 것입니다.
$ docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 9411:9411 \
jaegertracing/all-in-one:1.8
다음 표는 Jaeger All-in-one 노출 포트 목록입니다.
포트
합의
구성 요소
기능
5775
UDP
agent
수락
zipkin.thrift
compact thrift 프로토콜(오래된 클라이언트만 사용)6831
UDP
agent
동의
jaeger.thrift
compact thrift 프로토콜6832
UDP
agent
동의
jaeger.thrift
binary thrift 프로토콜5778
HTTP
agent
서비스 구성
16686
HTTP
query
서비스 프런트엔드
14268
HTTP
collector
클라이언트로부터 직접 수신
jaeger.thrift
프로토콜9411
HTTP
collector
Zipkin 서비스와 호환(옵션)
코드 해석
이 블로그에서 사용하는 코드는 v190 버전입니다.
입구
All-in-one의 입구는
cmd/all-in-one/main.go
에 있습니다.실제로 모든 구성 요소의 입구는 cmd라는 가방 아래에 있습니다.All-in-one은 에이전트query와collector 세 개의 구성 요소를 시작해야 합니다. 모두 입구의 시작 함수에서 이루어집니다.다음은 구체적인 코드를 살펴보겠습니다.
준비 작업
var signalsChannel = make(chan os.Signal)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
먼저 시스템 인터럽트 (os.Interrupt) 와 시스템의kill 명령 (syscall.SIGTERM) 을 수신하는 채널을 만듭니다.
if os.Getenv(storage.SpanStorageTypeEnvVar) == "" {
os.Setenv(storage.SpanStorageTypeEnvVar, "memory") // other storage types default to SpanStorage
}
환경 변수를 설정합니다.
storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
strategyStoreFactory, err := ss.NewFactory(ss.FactoryConfigFromEnv())
위의 한 줄은 저장 공장을 초기화했고, 다음은 샘플링 전략의 공장이다.다음은 이 두 공장이 어떻게 초기화되었는지, 그리고 그들의 역할이 무엇인지 살펴보자.
storageFactory
storage.NewFactory
의 입참은storage.FactoryConfigFromEnvAndCLI
이다. 우선 이 방법을 살펴보자.func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
// span
spanStorageType := os.Getenv(SpanStorageTypeEnvVar)
if spanStorageType == "" {
// ,
spanStorageType = spanStorageTypeFromArgs(args, log)
}
if spanStorageType == "" {
// cassandraStorageType
spanStorageType = cassandraStorageType
}
//
spanWriterTypes := strings.Split(spanStorageType, ",")
// ,
if len(spanWriterTypes) > 1 {
fmt.Fprintf(log,
"WARNING: multiple span storage types have been specified. "+
"Only the first type (%s) will be used for reading and archiving.
",
spanWriterTypes[0],
)
}
// dependency
depStorageType := os.Getenv(DependencyStorageTypeEnvVar)
if depStorageType == "" {
depStorageType = spanWriterTypes[0]
}
return FactoryConfig{
SpanWriterTypes: spanWriterTypes,
SpanReaderType: spanWriterTypes[0],
DependenciesStorageType: depStorageType,
}
}
위의 코드를 통해 알 수 있듯이 이 방법의 역할은 환경 변수나 명령줄에서 저장 설정을 읽는 것이다. 현재는 다음과 같은 두 가지 유형의 저장이 있다.
// spans
SpanStorageTypeEnvVar = "SPAN_STORAGE_TYPE"
// dependencies
DependencyStorageTypeEnvVar = "DEPENDENCY_STORAGE_TYPE"
span에는 체인 추적 데이터가 저장되어 있고 dependency에는 관련 데이터가 저장되어 있습니다.Dependencies는 Jaeger UI 위 메뉴의 Dependencies 란입니다.
우리의 상황에서
FactoryConfigFromEnvAndCLI
방법의 반환값은 모두 memory
유형의 저장소이다.그리고 우리 다시
NewFactory
방법을 봅시다.func NewFactory(config FactoryConfig) (*Factory, error) {
f := &Factory{FactoryConfig: config}
uniqueTypes := map[string]struct{}{
f.SpanReaderType: {},
f.DependenciesStorageType: {},
}
//
for _, storageType := range f.SpanWriterTypes {
uniqueTypes[storageType] = struct{}{}
}
f.factories = make(map[string]storage.Factory)
//
for t := range uniqueTypes {
ff, err := f.getFactoryOfType(t)
if err != nil {
return nil, err
}
f.factories[t] = ff
}
return f, nil
}
위의 코드를 보면
NewFactory
방법은 어떤 저장 유형이 필요한지 판단하고 각각 실례화하는 과정이다.strategyStoreFactory
여기
NewFactory
방법도 마찬가지로 인삼FactoryConfigFromEnv
이 하나 있는데 우선 이 방법을 봅시다.func FactoryConfigFromEnv() FactoryConfig {
strategyStoreType := os.Getenv(SamplingTypeEnvVar)
if strategyStoreType == "" {
strategyStoreType = staticStrategyStoreType
}
return FactoryConfig{
StrategyStoreType: strategyStoreType,
}
}
위의 코드를 통해 알 수 있듯이
FactoryConfigFromEnv
방법의 역할은 환경 변수에서 샘플링 유형을 얻는 것이다.모두 두 종류의 샘플링 유형이 있다.staticStrategyStoreType = "static"
adaptiveStrategyStoreType = "adaptive"
기본 샘플링 형식은static입니다.
NewFactory
방법도 상기 방법과 유사하여 설정된 샘플의 유형을 얻어 실례화했다. 주의해야 할 것은 이곳의 샘플링 유형은static만 실현했고 adaptive 유형의 공장은 실현되지 않았다.func NewFactory(config FactoryConfig) (*Factory, error) {
f := &Factory{FactoryConfig: config}
uniqueTypes := map[string]struct{}{
f.StrategyStoreType: {},
}
f.factories = make(map[string]strategystore.Factory)
for t := range uniqueTypes {
ff, err := f.getFactoryOfType(t)
if err != nil {
return nil, err
}
f.factories[t] = ff
}
return f, nil
}
특히 이곳의 f는 진정한 공장이 아니라 f.factories에 존재한다.
구성 초기화
v := viper.New()
여기에 viper 실례를 새로 만듭니다. 이것은 저장 설정입니다.viper는 Go 응용 프로그램의 완전한 설정 솔루션입니다.
다음은command 설정에 대한 코드를 건너뛰고 뒷부분을 직접 보십시오.
flags.SetDefaultHealthCheckPort(collector.CollectorDefaultHealthCheckHTTPPort)
config.AddFlags(
v,
command,
flags.AddConfigFileFlag,
flags.AddFlags,
storageFactory.AddFlags,
agentApp.AddFlags,
agentRep.AddFlags,
agentTchanRep.AddFlags,
agentGrpcRep.AddFlags,
collector.AddFlags,
queryApp.AddFlags,
pMetrics.AddFlags,
strategyStoreFactory.AddFlags,
)
기본 건강 검사 포트를 설정하고
AddFlags
방법에서 기본 파라미터를 이전에 새로 만든viper에 기록합니다.설정 파라미터를 쓴 후에 준비 작업도 완성되었다.이제 시동 단계에 이르렀다.
부팅
시작 방식
cobra
에 설정된 명령줄을 사용했습니다. RunE
방법을 직접 보십시오.sFlags := new(flags.SharedFlags).InitFromViper(v)
logger, err := sFlags.NewLogger(zap.NewProductionConfig())
로그 설정을 초기화합니다. 기본 로그 단계는 info입니다.
hc, err := sFlags.NewHealthCheck(logger)
건강 검진 인터페이스를 감청하다.
mBldr := new(pMetrics.Builder).InitFromViper(v)
// Prometheus
rootMetricsFactory, err := mBldr.CreateMetricsFactory("")
// , Prometheus
metricsFactory := rootMetricsFactory.Namespace(metrics.NSOptions{Name: "jaeger", Tags: nil})
metrics는 관련 설정을 수집합니다. 기본적인metrics 서비스는Prometheus입니다.
storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
메모리 공장 설정을 초기화하는 것은 사실 초기화된 구조체로 돌아가는 것이다. 여기서 이 구조체는
memory
유형이고 이 구조체는span의Reader
,Writer
와dependency의Reader
인터페이스를 실현했다.spanReader, err := storageFactory.CreateSpanReader()
spanWriter, err := storageFactory.CreateSpanWriter()
dependencyReader, err := storageFactory.CreateDependencyReader()
여기는 실례화된span의
Reader
,Writer
와dependency의Reader
인터페이스 세 개의 인터페이스입니다.우리는 이 세 개의 인터페이스가 어떻게 실례화되었는지 구체적으로 보았다.(memory 기반 구현에만 해당)func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanReaderType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanReaderType)
}
return factory.CreateSpanReader()
}
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return f.store, nil
}
위의 코드에서 알 수 있듯이
CreateSpanReader
방법은 store
구조를 직접 되돌려주었다. 왜냐하면 이 구조에서reader 관련 인터페이스가 이미 실현되었기 때문이다.func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
var writers []spanstore.Writer
for _, storageType := range f.SpanWriterTypes {
factory, ok := f.factories[storageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", storageType)
}
writer, err := factory.CreateSpanWriter()
if err != nil {
return nil, err
}
writers = append(writers, writer)
}
if len(f.SpanWriterTypes) == 1 {
return writers[0], nil
}
return spanstore.NewCompositeWriter(writers...), nil
}
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return f.store, nil
}
SpanWriter의 논리와reader의 논리는 대체적으로 같다. 다른 것은 writer가 여러 개가 있을 수 있기 때문에 슬라이드에 넣어야 한다.
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
factory, ok := f.factories[f.DependenciesStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.DependenciesStorageType)
}
return factory.CreateDependencyReader()
}
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
}
DependencyReader와 SpanReader는 논리적으로 일치합니다.
다음은 샘플링 전략 공장의 실례화이다.
strategyStoreFactory.InitFromViper(v)
strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger)
우선
InitFromViper
방법을 봅시다.func (f *Factory) InitFromViper(v *viper.Viper) {
for _, factory := range f.factories {
if conf, ok := factory.(plugin.Configurable); ok {
conf.InitFromViper(v)
}
}
}
// static/factory.go
func (f *Factory) InitFromViper(v *viper.Viper) {
f.options.InitFromViper(v)
}
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
return opts
}
먼저 이전에 예치한 공장을 두루 훑어본 다음
InitFromViper
방법의 실현에 들어간다.여기는static/factory.go
이 실현 방법에 들어갑니다.그리고 samplingStrategiesFile
에 대응하는 값을 꺼내opts
변수에 넣는다.여기서 이 값은 빈 문자열입니다.다음은
initSamplingStrategyStore
방법입니다.func initSamplingStrategyStore(
samplingStrategyStoreFactory *ss.Factory,
metricsFactory metrics.Factory,
logger *zap.Logger,
) strategystore.StrategyStore {
if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
strategyStore, err := samplingStrategyStoreFactory.CreateStrategyStore()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
}
return strategyStore
}
위의 코드는 두 가지 일을 했다.
Initialize
방법을 호출했다. 이 방법은 여기서logger가 부여한factor 내 변수만 호출하고 깊이 있게 토론하지 않는다.또 다른 방법CreateStrategyStore
은 새로운 샘플링 정책 저장소입니다. 여기는 정적 정책을 사용한 다음에 이 정책을 저장합니다.다음은 몇 걸음을 생략하고 바로 'strategy_store.go
코드에 들어갑니다.func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
h := &strategyStore{
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
}
strategies, err := loadStrategies(options.StrategiesFile)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)
return h, nil
}
먼저
strategyStore
의 데이터 구조를 새로 만들었는데 그 중에서 serviceStrategies
는 맵 구조의 변수로 구체적인 서비스의 샘플링 전략을 저장한다.map의 키는 서비스의 이름입니다.loadStrategies
방법에서viper에 설정된 정책 경로에 따라 파일을 읽습니다. 이 방법은 비어 있기 때문에 이 방법은 바로 되돌아오고 되돌아오는 수치는 비어 있습니다.다음
parseStrategies
방법에서는 인삼이 비어 있기 때문에 샘플링 정책을 기본 매개 변수로 지정하고 되돌려줍니다.defaultStrategy = sampling.SamplingStrategyResponse{
StrategyType: sampling.SamplingStrategyType_PROBABILISTIC,
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: defaultSamplingProbability,
},
}
기본적인 샘플링 전략은 확률 샘플링이고 사용하는 샘플링 확률은 0.001이다.
다음은 viper에서 파라미터를 가져와 다음 초기화를 위한 준비입니다.
// agent
aOpts := new(agentApp.Builder).InitFromViper(v)
// agent query , grpc tchannel
repOpts := new(agentRep.Options).InitFromViper(v)
// tchannel
tchannelRepOpts := agentTchanRep.NewBuilder().InitFromViper(v, logger)
// grpc
grpcRepOpts := new(agentGrpcRep.Options).InitFromViper(v)
// collector
cOpts := new(collector.CollectorOptions).InitFromViper(v)
// query
qOpts := new(queryApp.QueryOptions).InitFromViper(v)
이어서 이 세 가지 서비스를 각각 시작합니다.
startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory)
grpcServer := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc)
startQuery(qOpts, spanReader, dependencyReader, logger, rootMetricsFactory, metricsFactory, mBldr, hc, archiveOptions(storageFactory, logger))
각각 이 몇 개의 가동 과정을 보아라.
Agent 시작
func startAgent(
b *agentApp.Builder,
repOpts *agentRep.Options,
tchanRep *agentTchanRep.Builder,
grpcRepOpts *agentGrpcRep.Options,
cOpts *collector.CollectorOptions,
logger *zap.Logger,
baseFactory metrics.Factory,
) {
// metrics
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
cp, err := createCollectorProxy(cOpts, repOpts, tchanRep, grpcRepOpts, logger, metricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
agent, err := b.CreateAgent(cp, logger, baseFactory)
if err != nil {
logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err))
}
logger.Info("Starting agent")
if err := agent.Run(); err != nil {
logger.Fatal("Failed to run the agent", zap.Error(err))
}
}
func createCollectorProxy(
cOpts *collector.CollectorOptions,
repOpts *agentRep.Options,
tchanRepOpts *agentTchanRep.Builder,
grpcRepOpts *agentGrpcRep.Options,
logger *zap.Logger,
mFactory metrics.Factory,
) (agentApp.CollectorProxy, error) {
switch repOpts.ReporterType {
case agentRep.GRPC:
grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
return agentGrpcRep.NewCollectorProxy(grpcRepOpts, mFactory, logger)
case agentRep.TCHANNEL:
tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort))
return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger)
default:
return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(repOpts.ReporterType)))
}
}
Agent 서비스를 시작하기 전에 먼저
createCollectorProxy
방법을 호출하여 Collector의 에이전트를 만들어서 Collector에 데이터를 보고하는 데 사용합니다.이 방법에서는 통신 프로토콜의 유형에 따라 다른 에이전트를 만듭니다.다음 호출
CreateAgent
방법으로 에이전트를 만듭니다. 인삼에 새로 만든 Collector 에이전트를 포함합니다.Agent 생성 인스턴스가 Agent 모듈 범위에 속하므로 이 메서드는 나중에 자세히 설명합니다.위의 절차에 오류가 없으면 에이전트를 시작합니다.
Collector 시작
Collector를 시작하는 코드가 비교적 길습니다. 다음에 단계별로 해체하십시오.
spanBuilder, err := collector.NewSpanHandlerBuilder(
cOpts,
spanWriter,
basic.Options.LoggerOption(logger),
basic.Options.MetricsFactoryOption(metricsFactory),
)
zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := spanBuilder.BuildHandlers()
spanBuilder를 새로 만들고 세 개의handler를 만듭니다.그중
zipkinSpansHandler
과jaegerBatchesHandler
는 TchanCollector 인터페이스를 실현하여 TchanRPC의 호출을 처리할 수 있다. {
ch, err := tchannel.NewChannel("jaeger-collector", &tchannel.ChannelOptions{})
server := thrift.NewServer(ch)
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
// handler
server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort))
// tchan
ch.Serve(listener)
}
위의 코드는 tchannel을 시작해서 처리할 수 있습니다.
zipkinSpansHandler
와 jaegerBatchesHandler
.그런 다음 GRPC 서비스를 시작합니다.
func startGRPCServer(
port int,
handler *collectorApp.GRPCHandler,
samplingStore strategystore.StrategyStore,
logger *zap.Logger,
) (*grpc.Server, error) {
server := grpc.NewServer()
// grpc handler
_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
return nil, err
}
return server, err
}
다음 코드는 Zipkin Http Api를 처리하는 데 사용되며, Zipkin 서비스는 데이터를 Collecotor에 직접 보고할 수 있으며, 여기서는 확장되지 않습니다.
여기까지 에이전트 구성 요소가 시작되었습니다. 총 세 개의 서비스가 시작되었습니다. Tchannel, GRPC, 그리고 Zipkin을 전문적으로 처리하는 Http 서비스입니다.
Query 시작
tracer, closer, err := jaegerClientConfig.Configuration{
Sampler: &jaegerClientConfig.SamplerConfig{
Type: "const",
Param: 1.0,
},
RPCMetrics: true,
}.New(
"jaeger-query",
jaegerClientConfig.Metrics(rootFactory),
jaegerClientConfig.Logger(jaegerClientZapLog.NewLogger(logger)),
)
opentracing.SetGlobalTracer(tracer)
위의 코드는 자신의 정보를 수집하고 에이전트 구성 요소에 보고하기 위해tracer를 먼저 만들었습니다.
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query", Tags: nil}))
다음에
spanReader
에 실행 중인 지표 정보를 수집하기 위해metrics와 관련된 방법을 추가했다.여기에 장식기의 디자인 모델을 사용했다.handlerOpts = append(handlerOpts, queryApp.HandlerOptions.Logger(logger), queryApp.HandlerOptions.Tracer(tracer))
apiHandler := queryApp.NewAPIHandler(
spanReader,
depReader,
handlerOpts...)
r := mux.NewRouter()
if qOpts.BasePath != "/" {
r = r.PathPrefix(qOpts.BasePath).Subrouter()
}
// url
apiHandler.RegisterRoutes(r)
queryApp.RegisterStaticHandler(r, logger, qOpts)
// metrics handler
if h := metricsBuilder.Handler(); h != nil {
logger.Info("Registering metrics handler with jaeger-query HTTP server", zap.String("route", metricsBuilder.HTTPRoute))
r.Handle(metricsBuilder.HTTPRoute, h)
}
위 코드에서
apiHandler
는 내장된 모든 API의 집합에 해당하며 전송량의 기능을 이어받는다.go func() {
defer closer.Close()
if err := http.ListenAndServe(portStr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch jaeger-query service", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()
마지막으로 협업을 시작해서 http 서비스를 실행합니다.
세 개의 구성 요소가 모두 시작된 후, jaeger all-in-one는 성공적으로 시작되었다.만약 데이터가 에이전트 구성 요소에 보고된다면, 웹 페이지를 열면 추적 상황을 볼 수 있을 것이다.
참조 문서
Golang Signal SIGKILL 및 SIGTERM, SIGINT
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
AKS (Azure Kubernetes Service) 입문 ~ 기초편 ~클라우드 엔지니어인데 Kubernetes를 제대로 만진 적이 없기 때문에 공부해 보았습니다. Azure × Kubernetes 기사가 많지 않으므로 정리해 둡니다. ↓AKS 포털 톱 화면 클러스터 가상화를 위한 설정...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.