합류 - MQTT <-> KAFKA
지원되는 Spark Streaming 소스
MQTT가 무엇인지 찾는 사람들을 위해?
https://mqtt.org/
따라서 스파크에서 mqtt 데이터를 스트림에 사용할 수 있도록 하려면 MQTT를 Kafka와 연결해야 합니다. 이를 달성하는 데 사용할 수 있는 많은 도구가 있습니다. 그 중 하나는 https://www.confluent.io/입니다.
MQTT 장치 데이터를 Kafka 대기열로 가져오려면 아래 단계를 따르십시오.
합류 설치
-> install confluent in local machine (https://www.confluent.io/installation/)
-> set the path for confluent cli
서비스 상태 확인
confluent local services kafka status
Confluent CLI를 사용하여 KAFKA 시작
confluent local services kafka start
KAFKA를 시작한 후 KAFKA CONNECT 서비스를 시작하십시오.
confluent local services connect start
모스키토 설치
데이터를 게시하고 구독할 수 있는 주제를 노출하는 MQTT 브로커를 실행하려면 mosquitto가 필요합니다.
https://mosquitto.org/
brew instal mosquitto
로컬에서 모기 브로커 시작
brew services start mosquitto
Mosquitto 주제에 게시할 수 있는지 확인
mosquitto_pub -h localhost -p 1883 -t temperature -m "sample-msg-1"
*주제를 구독하고 메시지를 보내보세요 *
mosquitto_sub -h localhost -p 1883 -t temperature
MQTT 커넥터 플러그인 설치
confluent-hub install confluentinc/kafka-connect-mqtt:latest
연결 서비스 다시 시작 및 시작
confluent local services connect stop && confluent local services connect start
아래 명령을 사용하여 MQTT 플러그인이 설치되어 있는지 확인하십시오.
curl -s "http://localhost:8083/connector-plugins"
아래 명령을 사용하여 MQTT 브로커와 KAFKA 연결
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "tcp://127.0.0.1:1883",
"mqtt.topics" : "temperature",
"kafka.topic" : "mqtt.temperature",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
커넥터의 상태가 실행 중인지 확인하십시오.
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/mqtt-source/status"
*KAFKA 토픽 만들기 *
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mqtt.temperature
최종적으로 두 명의 소비자를 생성하여 테스트
mosquitto_sub -h localhost -p 1883 -t temperature
kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --property print.key=true --from-beginning
이제 데이터를 mqtt 주제에 게시하면 위에서 생성한 두 소비자 모두 mqtt 주제에 푸시된 데이터를 수신하게 됩니다.
Reference
이 문제에 관하여(합류 - MQTT <-> KAFKA), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/hariraghupathy/confluent-mqtt-kafka-38ac텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)