Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "Kafka Connect MQTT"
Kafka 커넥터를 사용하기 전에 다음을 확인하십시오.
1. connect-standalone.properties 편집
Kafka에서 MQTT에 연결할 때 connect-standalone.properties를 사용합니다.
다음과 같이 connect-standalone.properties를 편집해야 합니다.
cd /usr/local/lib/kafka/config
vi connect-standalone.properties
# 編集
bootstrap.servers=192.168.0.97:9092,192.168.0.98:9092,192.168.0.99:9092
offset.storage.file.filename=/var/lib/connect.offsets
옵션 사용법
옵션
해설
bootstrap.servers
각 노드의 IP로 설정합니다. 포트는 변함없이.
offset.storage.file.filename
오프셋 정보 저장 파일입니다. 이 파일은 한 번 빈 파일로 만들어야 합니다. 쓰기 권한 부여도 필수입니다.
2. Kafka 커넥터 라이브러리 설정
Kafka 커넥터를 이용하여 MQTT 브로커나 MQTT의 센서 데이터를 Apache Kafka에 흘려 갑니다.
이 Kafka 커넥터는 Apache Kafka의 공식 커넥터가 아니라 커뮤니티에서 나온 것입니다.
Github에서 소스 코드를 볼 때 다음 링크를 사용하십시오.
kafka-connect-mqtt 리소스 다운로드 대상
Kafka Connect MQTT 라이브러리를 Kafka libs에 복사합니다.
대상 Jars:
kafka-connect-mqtt-1.0-SNAPSHOT.jar
org.eclipse.paho.client.mqttv3-1.0.2.jar
복사 대상:
/usr/local/lib/kafka/libs
Kafka Connect MQTT용 구성 파일을 Kafka config에 복사합니다.
대상 File:
mqtt.properties
복사 대상:
/usr/local/lib/kafka/config
mqtt.properties의 내용
##
# Basic
##
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
##
#Settings
##
# Where to put processed messages - default to mqtt
kafka.topic=topic-mqtt-kafka
# What client id to use - defaults to null
which means random client_id
mqtt.client_id=mqtt-kafka-99
# Use clean session in connection? - default true
mqtt.clean_session=true
# What mqtt connection timeout to use - defaults to 30
seconds
mqtt.connection_timeout=30
# What mqtt connection keep alive to use - defaults to 60
seconds
mqtt.keep_alive_interval=60
# Mqtt broker address to use - defaults to tcp://localhost:1883
# if using TLS then certs can be used
mqtt.server_uris=tcp://192.168.0.99:1883
# Mqtt topic to listen to - defaults to #
(wildcard - all)
mqtt.topic=topic-mqtt
# 인코딩이 없어지는 설정
key.converter.schemas.enable=false
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
Kafka 커넥터 라이브러리 설정은 이상으로 완료되었습니다.
이상의 설정이 끝나면 MQTT 브로커의 토픽 「topic-mqtt」->Apache Kafka의 토픽 「topic-mqtt-kafka」에 흘려 들여올 수 있었을 것입니다.
3. Kafka 커넥터 MQTT 센서 데이터 획득 절차
Kafka 커넥터가 설정되었으며 다음 절차에 따라
Kafka의 소비자 MQTT 브로커에서 흘러 들어온 메시지를 받으십시오.
Kafka 클러스터 시작
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
중지: zkServer.sh stop
클러스터의 경우 모든 노드의 Zookeeper를 시작해야 합니다.
Kafka 시작
#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties
topic-mqtt-topic 생성(기존의 경우 필요 없음)
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181
Kafka 커넥터 MQTT에 연결
bin/connect-standalone.sh config/connect-standalone.properties config.mqtt.properties
mosquitto에서 전송 (MQTT 전송 당)
cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Kafka!"
Kafka에서 수신
cd /usr/local/lib/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.99:9092 --topic topic-mqtt-karka --from-beginning
아래의 표시를 확인할 수 있으면 수신이 됩니다.
( 'Hello Kafka!')
☆★☆ 다음 문장으로 ☆★☆
Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "SparkStreaming으로 데이터 가져 오기"
Reference
이 문제에 관하여(Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "Kafka Connect MQTT"), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/gakuseikai/items/1cef77ac6eb960bb08bd
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
cd /usr/local/lib/kafka/config
vi connect-standalone.properties
# 編集
bootstrap.servers=192.168.0.97:9092,192.168.0.98:9092,192.168.0.99:9092
offset.storage.file.filename=/var/lib/connect.offsets
Kafka 커넥터를 이용하여 MQTT 브로커나 MQTT의 센서 데이터를 Apache Kafka에 흘려 갑니다.
이 Kafka 커넥터는 Apache Kafka의 공식 커넥터가 아니라 커뮤니티에서 나온 것입니다.
Github에서 소스 코드를 볼 때 다음 링크를 사용하십시오.
kafka-connect-mqtt 리소스 다운로드 대상
Kafka Connect MQTT 라이브러리를 Kafka libs에 복사합니다.
대상 Jars:
kafka-connect-mqtt-1.0-SNAPSHOT.jar
org.eclipse.paho.client.mqttv3-1.0.2.jar
복사 대상:
/usr/local/lib/kafka/libs
Kafka Connect MQTT용 구성 파일을 Kafka config에 복사합니다.
대상 File:
mqtt.properties
복사 대상:
/usr/local/lib/kafka/config
mqtt.properties의 내용
##
# Basic
##
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
##
#Settings
##
# Where to put processed messages - default to
mqtt
kafka.topic=topic-mqtt-kafka# What client id to use - defaults to
null
which means random client_idmqtt.client_id=mqtt-kafka-99
# Use clean session in connection? - default
true
mqtt.clean_session=true# What mqtt connection timeout to use - defaults to
30
secondsmqtt.connection_timeout=30
# What mqtt connection keep alive to use - defaults to
60
secondsmqtt.keep_alive_interval=60
# Mqtt broker address to use - defaults to
tcp://localhost:1883
# if using TLS then certs can be usedmqtt.server_uris=tcp://192.168.0.99:1883
# Mqtt topic to listen to - defaults to
#
(wildcard - all)mqtt.topic=topic-mqtt
# 인코딩이 없어지는 설정
key.converter.schemas.enable=false
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
Kafka 커넥터 라이브러리 설정은 이상으로 완료되었습니다.
이상의 설정이 끝나면 MQTT 브로커의 토픽 「topic-mqtt」->Apache Kafka의 토픽 「topic-mqtt-kafka」에 흘려 들여올 수 있었을 것입니다.
3. Kafka 커넥터 MQTT 센서 데이터 획득 절차
Kafka 커넥터가 설정되었으며 다음 절차에 따라
Kafka의 소비자 MQTT 브로커에서 흘러 들어온 메시지를 받으십시오.
Kafka 클러스터 시작
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
중지: zkServer.sh stop
클러스터의 경우 모든 노드의 Zookeeper를 시작해야 합니다.
Kafka 시작
#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties
topic-mqtt-topic 생성(기존의 경우 필요 없음)
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181
Kafka 커넥터 MQTT에 연결
bin/connect-standalone.sh config/connect-standalone.properties config.mqtt.properties
mosquitto에서 전송 (MQTT 전송 당)
cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Kafka!"
Kafka에서 수신
cd /usr/local/lib/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.99:9092 --topic topic-mqtt-karka --from-beginning
아래의 표시를 확인할 수 있으면 수신이 됩니다.
( 'Hello Kafka!')
☆★☆ 다음 문장으로 ☆★☆
Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "SparkStreaming으로 데이터 가져 오기"
Reference
이 문제에 관하여(Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "Kafka Connect MQTT"), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://qiita.com/gakuseikai/items/1cef77ac6eb960bb08bd
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Zookeeper起動
cd /usr/local/lib/zookeeper
bin/zkServer.sh start
#Kafkaの起動
cd /usr/local/lib/kafka
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --create --topic topic-mqtt-kafka --replication-factor 1 --partitions 1 --zookeeper 192.168.0.99:2181
bin/connect-standalone.sh config/connect-standalone.properties config.mqtt.properties
cd /usr/bin/
mosquitto -c /etc/mosquitto/mosquitto.conf -v
mosquitto_pub -h 192.168.0.99 -p 1883 -t topic-mqtt -m "Hello Kafka!"
cd /usr/local/lib/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.99:9092 --topic topic-mqtt-karka --from-beginning
Reference
이 문제에 관하여(Spark에서 Kafka를 통해 외부 데이터를 읽습니다. "Kafka Connect MQTT"), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/gakuseikai/items/1cef77ac6eb960bb08bd텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)