Rust에서 MQTT를 사용하는 방법

Rust는 성능과 안전성, 특히 안전한 동시성을 위해 설계된 다중 패러다임 프로그래밍 언어입니다. Rust는 문법적으로 C++와 유사하지만 참조를 검증하기 위해 빌림 검사기를 사용하여 메모리 안전을 보장할 수 있습니다. Rust는 가비지 컬렉션 없이 메모리 안전성을 달성하며 참조 카운팅은 선택 사항입니다. 1

MQTT은 게시/구독 모델을 기반으로 하는 일종의 경량 IoT 메시징 프로토콜입니다. 매우 적은 코드와 대역폭을 사용하여 네트워크 장비에 대한 실시간 신뢰할 수 있는 메시지 서비스를 제공할 수 있습니다. 또한 IoT, 모바일 인터넷, 스마트 하드웨어, IoV, 전력 및 에너지 산업에서 널리 사용됩니다.

이 글은 주로 Rust 프로젝트에서 paho-mqtt 클라이언트 라이브러리를 사용하는 방법과 클라이언트와 MQTT 브로커 사이에 연결, 구독, 메시징 및 구독 취소 등을 구현하는 방법을 소개합니다.

프로젝트 초기화



이 프로젝트는 개발 및 테스트를 위해 Rust 1.44.0을 사용하고 Cargo 1.44.0 패키지 관리 도구를 사용하여 관리하고 있으며 독자는 다음 명령을 사용하여 Rust의 현재 버전을 확인할 수 있습니다.

~ rustc --version
rustc 1.44.0 (49cae5576 2020-06-01)


MQTT 클라이언트 라이브러리 선택



paho-mqtt는 현재 Rust에서 가장 다재다능하고 널리 사용되는 MQTT 클라이언트입니다. 최신 버전0.7.1은 MQTT v5, 3.1.1, 3.1을 지원하며 표준 TCP, SSL/TLS, WebSockets 및 QoS 지원 0, 1, 2 등을 통한 데이터 전송도 지원합니다.

초기화 프로젝트



다음 명령을 실행하여 mqtt-example 라는 새 Rust 프로젝트를 만듭니다.

~ cargo new mqtt-example
    Created binary (application) `mqtt-example` package


프로젝트에서 Cargo.toml 파일을 편집하고 paho-mqtt 라이브러리의 주소를 dependencies에 추가하고 구독, 게시 코드 파일에 해당하는 바이너리 파일을 지정합니다.

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"


러스트 MQTT의 사용



클라이언트 연결 만들기



이 글에서는 테스트 연결의 MQTT 브로커로 EMQX에서 제공하는 the free public MQTT broker을 사용합니다. 이 서비스는 EMQXMQTT IoT cloud platform를 기반으로 만들어집니다. 서버 접속 정보는 다음과 같습니다.
  • 브로커: broker.emqx.io
  • TCP 포트: 1883
  • 웹 소켓 포트: 8083

  • MQTT 브로커 연결 매개변수 구성



    MQTT 브로커 연결 주소(포트 포함), 주제(여기서는 두 가지 주제를 구성했습니다) 및 클라이언트 ID를 구성합니다.

    const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
    const DFLT_CLIENT:&str = "rust_publish";
    const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
    


    MQTT 연결 코드 작성



    MQTT 연결 코드를 작성하고 사용자 경험을 개선하기 위해 이진 파일을 실행할 때 연결 주소를 명령줄 인수로 전달할 수 있습니다. 일반적으로 클라이언트를 생성한 다음 broker.emqx.io에 연결해야 합니다.

    let host = env::args().nth(1).unwrap_or_else(||
        DFLT_BROKER.to_string()
    );
    
    // Define the set of options for the create.
    // Use an ID for a persistent session.
    let create_opts = mqtt::CreateOptionsBuilder::new()
        .server_uri(host)
        .client_id(DFLT_CLIENT.to_string())
        .finalize();
    
    // Create a client.
    let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
        println!("Error creating the client: {:?}", err);
        process::exit(1);
    });
    
    // Define the set of options for the connection.
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(true)
        .finalize();
    
    // Connect and wait for it to complete or fail.
    if let Err(e) = cli.connect(conn_opts) {
        println!("Unable to connect:\n\t{:?}", e);
        process::exit(1);
    }
    


    메시지 게시



    여기서 우리는 루프의 패리티에 따라 두 주제rust/mqttrust/test에 총 다섯 개의 메시지를 게시합니다.

    for num in 0..5 {
        let content =  "Hello world! ".to_string() + &num.to_string();
        let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
        if num % 2 == 0 {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
            msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
        } else {
            println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
        }
        let tok = cli.publish(msg);
    
                if let Err(e) = tok {
                        println!("Error sending message: {:?}", e);
                        break;
                }
    }
    


    구독하다



    클라이언트가 연결되기 전에 소비자를 초기화해야 합니다. 여기서 우리는 소비자의 메시지 큐를 루프 처리하고 구독한 주제 이름과 수신된 메시지의 내용을 출력합니다.

    fn subscribe_topics(cli: &mqtt::Client) {
        if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
            println!("Error subscribes topics: {:?}", e);
            process::exit(1);
        }
    }
    
    fn main() {
        ...
        // Initialize the consumer before connecting.
        let rx = cli.start_consuming();
        ...
        // Subscribe topics.
        subscribe_topics(&cli);
    
        println!("Processing requests...");
        for msg in rx.iter() {
            if let Some(msg) = msg {
                println!("{}", msg);
            }
            else if !cli.is_connected() {
                if try_reconnect(&cli) {
                    println!("Resubscribe topics...");
                    subscribe_topics(&cli);
                } else {
                    break;
                }
            }
        }
        ...
    }
    


    완전한 코드



    메시지 게시를 위한 코드




    use std::{
        env,
        process,
        time::Duration
    };
    
    extern crate paho_mqtt as mqtt;
    
    const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
    const DFLT_CLIENT:&str = "rust_publish";
    const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
    // Define the qos.
    const QOS:i32 = 1;
    
    fn main() {
        let host = env::args().nth(1).unwrap_or_else(||
            DFLT_BROKER.to_string()
        );
    
        // Define the set of options for the create.
        // Use an ID for a persistent session.
        let create_opts = mqtt::CreateOptionsBuilder::new()
            .server_uri(host)
            .client_id(DFLT_CLIENT.to_string())
            .finalize();
    
        // Create a client.
        let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
            println!("Error creating the client: {:?}", err);
            process::exit(1);
        });
    
        // Define the set of options for the connection.
        let conn_opts = mqtt::ConnectOptionsBuilder::new()
            .keep_alive_interval(Duration::from_secs(20))
            .clean_session(true)
            .finalize();
    
        // Connect and wait for it to complete or fail.
        if let Err(e) = cli.connect(conn_opts) {
            println!("Unable to connect:\n\t{:?}", e);
            process::exit(1);
        }
    
        // Create a message and publish it.
        // Publish message to 'test' and 'hello' topics.
        for num in 0..5 {
            let content =  "Hello world! ".to_string() + &num.to_string();
            let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
            if num % 2 == 0 {
                println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
                msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
            } else {
                println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
            }
            let tok = cli.publish(msg);
    
                    if let Err(e) = tok {
                            println!("Error sending message: {:?}", e);
                            break;
                    }
        }
    
    
        // Disconnect from the broker.
        let tok = cli.disconnect(None);
        println!("Disconnect from the broker");
        tok.unwrap();
    }
    


    구독 코드



    사용자 경험을 개선하기 위해 메시지 구독의 연결이 끊어지고 연결이 다시 설정된 후 주제가 다시 구독됩니다.

    use std::{
        env,
        process,
        thread,
        time::Duration
    };
    
    extern crate paho_mqtt as mqtt;
    
    const DFLT_BROKER:&str = "tcp://broker.emqx.io:1883";
    const DFLT_CLIENT:&str = "rust_subscribe";
    const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
    // The qos list that match topics above.
    const DFLT_QOS:&[i32] = &[0, 1];
    
    // Reconnect to the broker when connection is lost.
    fn try_reconnect(cli: &mqtt::Client) -> bool
    {
        println!("Connection lost. Waiting to retry connection");
        for _ in 0..12 {
            thread::sleep(Duration::from_millis(5000));
            if cli.reconnect().is_ok() {
                println!("Successfully reconnected");
                return true;
            }
        }
        println!("Unable to reconnect after several attempts.");
        false
    }
    
    // Subscribes to multiple topics.
    fn subscribe_topics(cli: &mqtt::Client) {
        if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
            println!("Error subscribes topics: {:?}", e);
            process::exit(1);
        }
    }
    
    fn main() {
        let host = env::args().nth(1).unwrap_or_else(||
            DFLT_BROKER.to_string()
        );
    
        // Define the set of options for the create.
        // Use an ID for a persistent session.
        let create_opts = mqtt::CreateOptionsBuilder::new()
            .server_uri(host)
            .client_id(DFLT_CLIENT.to_string())
            .finalize();
    
        // Create a client.
        let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
            println!("Error creating the client: {:?}", err);
            process::exit(1);
        });
    
        // Initialize the consumer before connecting.
        let rx = cli.start_consuming();
    
        // Define the set of options for the connection.
        let lwt = mqtt::MessageBuilder::new()
            .topic("test")
            .payload("Consumer lost connection")
            .finalize();
        let conn_opts = mqtt::ConnectOptionsBuilder::new()
            .keep_alive_interval(Duration::from_secs(20))
            .clean_session(false)
            .will_message(lwt)
            .finalize();
    
        // Connect and wait for it to complete or fail.
        if let Err(e) = cli.connect(conn_opts) {
            println!("Unable to connect:\n\t{:?}", e);
            process::exit(1);
        }
    
        // Subscribe topics.
        subscribe_topics(&cli);
    
        println!("Processing requests...");
        for msg in rx.iter() {
            if let Some(msg) = msg {
                println!("{}", msg);
            }
            else if !cli.is_connected() {
                if try_reconnect(&cli) {
                    println!("Resubscribe topics...");
                    subscribe_topics(&cli);
                } else {
                    break;
                }
            }
        }
    
        // If still connected, then disconnect now.
        if cli.is_connected() {
            println!("Disconnecting");
            cli.unsubscribe_many(DFLT_TOPICS).unwrap();
            cli.disconnect(None).unwrap();
        }
        println!("Exiting");
    }
    


    실행 및 테스트



    바이너리 파일 컴파일



    다음 명령은 sub 디렉토리에 pub , mqtt-example/target/debug 바이너리 파일을 생성합니다.

    cargo build
    




    메시지 구독


    sub 바이너리 파일을 실행하고 메시지가 게시될 때까지 기다립니다.



    메시지 게시


    pub 바이너리 파일을 실행하면 각각 주제 rust/testrust/mqtt 에 메시지가 게시된 것을 확인할 수 있습니다.


    한편 게시된 메시지는 메시지 구독에서도 볼 수 있습니다.



    지금까지 paho-mqtt 클라이언트를 사용하여 public MQTT broker에 연결하고 테스트 클라이언트와 MQTT 브로커 간의 연결, 메시지 게시 및 구독을 구현했습니다.



    https://en.wikipedia.org/wiki/Rust_(programming_language)

    좋은 웹페이지 즐겨찾기