Rust에서 MQTT를 사용하는 방법
39154 단어 tutorialrustmqttprogramming
MQTT은 게시/구독 모델을 기반으로 하는 일종의 경량 IoT 메시징 프로토콜입니다. 매우 적은 코드와 대역폭을 사용하여 네트워크 장비에 대한 실시간 신뢰할 수 있는 메시지 서비스를 제공할 수 있습니다. 또한 IoT, 모바일 인터넷, 스마트 하드웨어, IoV, 전력 및 에너지 산업에서 널리 사용됩니다.
이 글은 주로 Rust 프로젝트에서 paho-mqtt 클라이언트 라이브러리를 사용하는 방법과 클라이언트와 MQTT 브로커 사이에 연결, 구독, 메시징 및 구독 취소 등을 구현하는 방법을 소개합니다.
프로젝트 초기화
이 프로젝트는 개발 및 테스트를 위해 Rust 1.44.0을 사용하고 Cargo 1.44.0 패키지 관리 도구를 사용하여 관리하고 있으며 독자는 다음 명령을 사용하여 Rust의 현재 버전을 확인할 수 있습니다.
~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)
MQTT 클라이언트 라이브러리 선택
paho-mqtt는 현재 Rust에서 가장 다재다능하고 널리 사용되는 MQTT 클라이언트입니다. 최신 버전
0.7.1
은 MQTT v5, 3.1.1, 3.1을 지원하며 표준 TCP, SSL/TLS, WebSockets 및 QoS 지원 0, 1, 2 등을 통한 데이터 전송도 지원합니다.초기화 프로젝트
다음 명령을 실행하여
mqtt-example
라는 새 Rust 프로젝트를 만듭니다.~ cargo new mqtt-example
Created binary (application) `mqtt-example` package
프로젝트에서
Cargo.toml
파일을 편집하고 paho-mqtt
라이브러리의 주소를 dependencies
에 추가하고 구독, 게시 코드 파일에 해당하는 바이너리 파일을 지정합니다.[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }
[[bin]]
name = "sub"
path = "src/sub/main.rs"
[[bin]]
name = "pub"
path = "src/pub/main.rs"
러스트 MQTT의 사용
클라이언트 연결 만들기
이 글에서는 테스트 연결의 MQTT 브로커로 EMQX에서 제공하는 the free public MQTT broker을 사용합니다. 이 서비스는 EMQXMQTT IoT cloud platform를 기반으로 만들어집니다. 서버 접속 정보는 다음과 같습니다.
MQTT 브로커 연결 매개변수 구성
MQTT 브로커 연결 주소(포트 포함), 주제(여기서는 두 가지 주제를 구성했습니다) 및 클라이언트 ID를 구성합니다.
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
MQTT 연결 코드 작성
MQTT 연결 코드를 작성하고 사용자 경험을 개선하기 위해 이진 파일을 실행할 때 연결 주소를 명령줄 인수로 전달할 수 있습니다. 일반적으로 클라이언트를 생성한 다음
broker.emqx.io
에 연결해야 합니다.let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
메시지 게시
여기서 우리는 루프의 패리티에 따라 두 주제
rust/mqtt
및 rust/test
에 총 다섯 개의 메시지를 게시합니다.for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
구독하다
클라이언트가 연결되기 전에 소비자를 초기화해야 합니다. 여기서 우리는 소비자의 메시지 큐를 루프 처리하고 구독한 주제 이름과 수신된 메시지의 내용을 출력합니다.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
...
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
...
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
...
}
완전한 코드
메시지 게시를 위한 코드
use std::{
env,
process,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Create a message and publish it.
// Publish message to 'test' and 'hello' topics.
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);
if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}
// Disconnect from the broker.
let tok = cli.disconnect(None);
println!("Disconnect from the broker");
tok.unwrap();
}
구독 코드
사용자 경험을 개선하기 위해 메시지 구독의 연결이 끊어지고 연결이 다시 설정된 후 주제가 다시 구독됩니다.
use std::{
env,
process,
thread,
time::Duration
};
extern crate paho_mqtt as mqtt;
const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];
// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}
// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}
fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();
// Create a client.
let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});
// Initialize the consumer before connecting.
let rx = cli.start_consuming();
// Define the set of options for the connection.
let lwt = mqtt::MessageBuilder::new()
.topic("test")
.payload("Consumer lost connection")
.finalize();
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.will_message(lwt)
.finalize();
// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}
// Subscribe topics.
subscribe_topics(&cli);
println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}
// If still connected, then disconnect now.
if cli.is_connected() {
println!("Disconnecting");
cli.unsubscribe_many(DFLT_TOPICS).unwrap();
cli.disconnect(None).unwrap();
}
println!("Exiting");
}
실행 및 테스트
바이너리 파일 컴파일
다음 명령은
sub
디렉토리에 pub
, mqtt-example/target/debug
바이너리 파일을 생성합니다.cargo build
메시지 구독
sub
바이너리 파일을 실행하고 메시지가 게시될 때까지 기다립니다.메시지 게시
pub
바이너리 파일을 실행하면 각각 주제 rust/test
및 rust/mqtt
에 메시지가 게시된 것을 확인할 수 있습니다.한편 게시된 메시지는 메시지 구독에서도 볼 수 있습니다.
지금까지 paho-mqtt 클라이언트를 사용하여 public MQTT broker에 연결하고 테스트 클라이언트와 MQTT 브로커 간의 연결, 메시지 게시 및 구독을 구현했습니다.
https://en.wikipedia.org/wiki/Rust_(programming_language) ↩
Reference
이 문제에 관하여(Rust에서 MQTT를 사용하는 방법), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/emqx/how-to-use-mqtt-in-rust-5fne텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)