kafka connect e elasticsearch를 관찰할 수 있습니다.
21144 단어 elasticsearchtodayilearned
카프카 연결
O kafka connect é um serviço do kafka que serve para pegar dados de um serviço e jogar para um outro.
예를 들어 pegar os dados do meu banco de dados e passar para um arquivo txt, ou para um elastic search ou algo desse tipo
O kafka connect é um cluster com diversas maquinas que realizam essas tarefas, essas maquinas são chamadas de worker sendo que cada worker pode lidar com mais de uma tarefa.
카프카를 위한 대시보드
Irei deixar o yml은 최종 게시물을 작성하지 않습니다. Mas uma coisa q ele faz é colocar o control center da confluent na porta 9021 e é isso q vamos abrir.
No menu lateral do dashboard tem a opção de connectors onde ele mostra todos os clusters do kafka connect conectados atualmente, de inicio não vai ter nenhum, ainda temos que fazer essa conexão, quando subir o docker-compose vai perceber que ele vai criar duas 파스타 no seu diretório atual
data
e es01
essas pastas vão ser usadas para manter as configurações. Porem além delas também vamos criar uma 파스타 커넥터 com um arquivo elasticsearch.properties
, nesse arquivo vamos definir algumas configurações como o nome do desse conector, a classe dele, os tópicos que esse conector vai ouvir, a url de conexão, o tipo dos documentos eo conversor de valores para que os dados cheguem como jsonname=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=route.new-direction,route.new-position
connection.url=http://es01:9200
type.name=_doc
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schema.ignore=true
key.ignore=true
transforms=InsertField
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertField.timestamp.field=timestamp
Com esse arquivo criado podemos voltar ao dashboard na parte de conectores e adicionar um novo conector fazendo o upload dele. Se tudo der certo (caso algo dê errado escrevi o que aconteceu de errado comigo e como solucionei no final do post) na parte de connectors ele vai exibir um connector como running
Se formos agora no Kibana, que está na porta 5601 podemos abrir o menu lateral e ir em Stack Management
Nessa 페이지 항목 o Index Management que vai nos mostrar já os dois tópicos que selecionamos pra ele escutar. E temos também o Index Patterns.
Onde vamos criar um novo index-patern com base nos nossos tópicos, se eles já tiverem recebido uma mensagem com o kibana rodando quando criarmos o pattern ele já vai vai vir com os campos desse tópico
Agora se formos no menu lateral do kibana e clickar na parte de analytics ele já vai passar a mostrar todas as mensagem que recebemos em cada tópico
추가 정보 os tipos dos campos
Para ficar com os index melhor formatados apaguei os que foram gerados automaticamente e fui na parte de devTools do menu do kibana, lá escrevi os seguintes comandos.
PUT route.new-position
{
"mappings": {
"properties": {
"clientId": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"routeId": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"timestamp": {
"type": "date"
},
"finished": {
"type": "boolean"
},
"position": {
"type": "geo_point"
}
}
}
}
PUT route.new-direction
{
"mappings": {
"properties": {
"clientId": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"routeId": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"timestamp": {
"type": "date"
}
}
}
}
Agora o position vai ser lido como uma localização ao invés de numero como estava antes e podemos visualizar ele no mapa
clickando em 시각화 vamos para o mapa onde podemos visualizar esses 위치 no mapa
Criando 시각화
Na opção de visualizações vamos criar uma nova visualização do tipo Lens
Dentro da parte de visualizações podemos adicionar os dados que queremos e exibir-los da forma desejada
예를 들어, coloquei o contador de records do topico new-direction, métrica e coloquei o nome de
corridas
및 então salvei를 참조하십시오.Criei também a visualização de mapa e por fim criei um dashboard com todas essas informações
도커 작성
Aqui o docker compose que usei para aprender, lembre de trocar o ip para o seu próprio, caso não saiba qual o ip pode rodar esse comando
docker run -it --rm alpine nslookup host.docker.internal
que o docker te informa qual o ipversion: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
extra_hosts:
- "host.docker.internal:192.168.65.2"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://host.docker.internal:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
extra_hosts:
- "host.docker.internal:192.168.65.2"
kafka-topics-generator:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
command: >
bash -c
"sleep 5s &&
kafka-topics --create --topic=route.new-direction --if-not-exists --bootstrap-server=kafka:9092 &&
kafka-topics --create --topic=route.new-position --if-not-exists --bootstrap-server=kafka:9092"
control-center:
image: confluentinc/cp-enterprise-control-center:6.0.1
hostname: control-center
depends_on:
- kafka
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_CONNECT_CLUSTER: http://kafka-connect:8083
PORT: 9021
extra_hosts:
- "host.docker.internal:192.168.65.2"
kafka-connect:
image: confluentinc/cp-kafka-connect-base:6.0.0
container_name: kafka-connect
depends_on:
- zookeeper
- kafka
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
# # Optional settings to include to support Confluent Control Center
# CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
# CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
# ---------------
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
# If you want to use the Confluent Hub installer to d/l component, but make them available
# when running this offline, spin up the stack once and then run :
# docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
volumes:
- $PWD/data:/data
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:10.0.1
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
extra_hosts:
- "host.docker.internal:192.168.65.2"
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- cluster.initial_master_nodes=es01
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- ./es01:/usr/share/elasticsearch/data
ports:
- 9200:9200
extra_hosts:
- "host.docker.internal:192.168.65.2"
kibana:
image: docker.elastic.co/kibana/kibana:7.11.2
container_name: kib01
ports:
- 5601:5601
environment:
ELASTICSEARCH_URL: http://es01:9200
ELASTICSEARCH_HOSTS: '["http://es01:9200"]'
extra_hosts:
- "host.docker.internal:192.168.65.2"
Encontrei 문제
sudo chown -R 1000:1000 es01
sysctl -w vm.max_map_count=262144
, porem isso só dura a sessão atual, se quiser uma solução mais duradoura nessa pagina encontrei tambemhttps://stackoverflow.com/questions/51445846/elasticsearch-max-virtual-memory-areas-vm-max-map-count-65530-is-too-low-inc
Reference
이 문제에 관하여(kafka connect e elasticsearch를 관찰할 수 있습니다.), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/yanpiing/observabilidade-com-kafka-connect-e-elasticsearch-30lb텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)