Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "Kafka Connect MQTT"

이 장에서는 Kafka 커넥터 라이브러리를 사용하여 MQTT 주제에 액세스하고 MQTT에 생성된 메시지를 Kafka 주제에 기록하는 프로세스에 대해 설명합니다.
Kafka 커넥터를 사용하기 전에 다음을 확인하십시오.
  • Kafka 클러스터의 운영 환경이 설정되어 있는지 확인하십시오.
  • MQTT 브로커를 준비해야 합니다.

  • 1. connect-standalone.properties 편집



    Kafka에서 MQTT에 연결할 때 connect-standalone.properties를 사용합니다.
    다음과 같이 connect-standalone.properties를 편집해야 합니다.
    cd /usr/local/lib/kafka/config
    vi connect-standalone.properties
    # 編集
    bootstrap.servers=192.168.0.97:9092,192.168.0.98:9092,192.168.0.99:9092
    offset.storage.file.filename=/var/lib/connect.offsets
    

    옵션 사용법


    옵션
    해설


    bootstrap.servers

    각 노드의 IP로 설정합니다. 포트는 변함없이.

    offset.storage.file.filename
    오프셋 정보 저장 파일입니다. 이 파일은 한 번 빈 파일로 만들어야 합니다. 쓰기 권한 부여도 필수입니다.


    2. Kafka 커넥터 라이브러리 설정



    Kafka 커넥터를 이용하여 MQTT 브로커나 MQTT의 센서 데이터를 Apache Kafka에 흘려 갑니다.
    이 Kafka 커넥터는 Apache Kafka의 공식 커넥터가 아니라 커뮤니티에서 나온 것입니다.
    Github에서 소스 코드를 볼 때 다음 링크를 사용하십시오.
    kafka-connect-mqtt 리소스 다운로드 대상

    Kafka Connect MQTT 라이브러리를 Kafka libs에 복사합니다.
    대상 Jars:

    kafka-connect-mqtt-1.0-SNAPSHOT.jar
    org.eclipse.paho.client.mqttv3-1.0.2.jar

    복사 대상:

    /usr/local/lib/kafka/libs

    Kafka Connect MQTT용 구성 파일을 Kafka config에 복사합니다.
    대상 File:

    mqtt.properties

    복사 대상:

    /usr/local/lib/kafka/config

    mqtt.properties의 내용

    ##
    # Basic
    ##
    name=mqtt
    connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
    tasks.max=1
    ##
    #Settings
    ##
    # Where to put processed messages - default to mqttkafka.topic=topic-mqtt-kafka
    # What client id to use - defaults to null which means random client_id
    mqtt.client_id=mqtt-kafka-99
    # Use clean session in connection? - default truemqtt.clean_session=true
    # What mqtt connection timeout to use - defaults to 30 seconds
    mqtt.connection_timeout=30
    # What mqtt connection keep alive to use - defaults to 60 seconds
    mqtt.keep_alive_interval=60
    # Mqtt broker address to use - defaults to tcp://localhost:1883# if using TLS then certs can be used
    mqtt.server_uris=tcp://192.168.0.99:1883
    # Mqtt topic to listen to - defaults to # (wildcard - all)
    mqtt.topic=topic-mqtt
    # 인코딩이 없어지는 설정
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    key.converter=org.apache.kafka.connect.json.JsonConverter

    Kafka 커넥터 라이브러리 설정은 이상으로 완료되었습니다.
    이상의 설정이 끝나면 MQTT 브로커의 토픽 「topic-mqtt」->Apache Kafka의 토픽 「topic-mqtt-kafka」에 흘려 들여올 수 있었을 것입니다.

    3. Kafka 커넥터 MQTT 센서 데이터 획득 절차



    Kafka 커넥터가 설정되었으며 다음 절차에 따라
    Kafka의 소비자 MQTT 브로커에서 흘러 들어온 메시지를 받으십시오.
    Kafka 클러스터 시작
    cd /usr/local/lib/zookeeper
    bin/zkServer.sh start
    

    중지: zkServer.sh stop
    클러스터의 경우 모든 노드의 Zookeeper를 시작해야 합니다.
    Kafka 시작
    #Zookeeper起動
    cd /usr/local/lib/zookeeper
    bin/zkServer.sh start
    #Kafkaの起動
    cd /usr/local/lib/kafka
    bin/kafka-server-start.sh -daemon config/server.properties
    

    topic-mqtt-topic 생성(기존의 경우 필요 없음)
    bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181
    

    Kafka 커넥터 MQTT에 연결
    bin/connect-standalone.sh config/connect-standalone.properties config.mqtt.properties
    

    mosquitto에서 전송 (MQTT 전송 당)
    cd /usr/bin/
    mosquitto -c /etc/mosquitto/mosquitto.conf -v
    mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Kafka!"
    

    Kafka에서 수신
    cd /usr/local/lib/kafka
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.99:9092 --topic topic-mqtt-karka --from-beginning
    

    아래의 표시를 확인할 수 있으면 수신이 됩니다.
    ( 'Hello Kafka!')

    ☆★☆ 다음 문장으로 ☆★☆
    Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "SparkStreaming으로 데이터 가져 오기"

    좋은 웹페이지 즐겨찾기