합류 - MQTT <-> KAFKA

3968 단어
Spark 구조적 스트리밍에는 최신 버전에서 MQTT 데이터 소스를 수신하는 데 몇 가지 제한 사항이 있습니다. 사용 가능한 일부 확장이 있지만 Spark 버전 2.0만 지원합니다.

지원되는 Spark Streaming 소스
  • 플룸
  • 카프카
  • 파일 소스
  • TCP/IP 포트
  • 키네시스
  • 트위터

  • MQTT가 무엇인지 찾는 사람들을 위해?
    https://mqtt.org/

    따라서 스파크에서 mqtt 데이터를 스트림에 사용할 수 있도록 하려면 MQTT를 Kafka와 연결해야 합니다. 이를 달성하는 데 사용할 수 있는 많은 도구가 있습니다. 그 중 하나는 https://www.confluent.io/입니다.

    MQTT 장치 데이터를 Kafka 대기열로 가져오려면 아래 단계를 따르십시오.

    합류 설치

        -> install confluent in local machine (https://www.confluent.io/installation/)
        -> set the path for confluent cli
    


    서비스 상태 확인

        confluent local services kafka status
    


    Confluent CLI를 사용하여 KAFKA 시작

        confluent local services kafka start
    


    KAFKA를 시작한 후 KAFKA CONNECT 서비스를 시작하십시오.

        confluent local services connect start
    


    모스키토 설치

    데이터를 게시하고 구독할 수 있는 주제를 노출하는 MQTT 브로커를 실행하려면 mosquitto가 필요합니다.
    https://mosquitto.org/

        brew instal mosquitto
    


    로컬에서 모기 브로커 시작

        brew services start mosquitto
    


    Mosquitto 주제에 게시할 수 있는지 확인

        mosquitto_pub -h localhost -p 1883 -t temperature -m "sample-msg-1"
    


    *주제를 구독하고 메시지를 보내보세요 *

        mosquitto_sub -h localhost -p 1883 -t temperature 
    


    MQTT 커넥터 플러그인 설치

          confluent-hub install confluentinc/kafka-connect-mqtt:latest
    


    연결 서비스 다시 시작 및 시작

          confluent local services connect stop && confluent local services connect start
    


    아래 명령을 사용하여 MQTT 플러그인이 설치되어 있는지 확인하십시오.

          curl -s "http://localhost:8083/connector-plugins"
    


    아래 명령을 사용하여 MQTT 브로커와 KAFKA 연결

    curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
        "name" : "mqtt-source",
    "config" : {
        "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max" : "1",
        "mqtt.server.uri" : "tcp://127.0.0.1:1883",
        "mqtt.topics" : "temperature",
        "kafka.topic" : "mqtt.temperature",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "confluent.license":""
        }
    }'
    


    커넥터의 상태가 실행 중인지 확인하십시오.

          curl -s "http://localhost:8083/connectors"
          curl -s "http://localhost:8083/connectors/mqtt-source/status"
    


    *KAFKA 토픽 만들기 *

        kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mqtt.temperature
    


    최종적으로 두 명의 소비자를 생성하여 테스트

       mosquitto_sub -h localhost -p 1883 -t temperature
       kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --property print.key=true --from-beginning
    


    이제 데이터를 mqtt 주제에 게시하면 위에서 생성한 두 소비자 모두 mqtt 주제에 푸시된 데이터를 수신하게 됩니다.

    좋은 웹페이지 즐겨찾기