Node.js로 MQTT 메시지 기록을 유지하는 방법

MQTT protocol은 IoT 애플리케이션에서 매우 인기가 있습니다. 서로 다른 데이터 소스를 연결하는 간단한 방법입니다.
게시/구독 모델을 사용하여 애플리케이션과 때로는 MQTT 데이터의 기록을 보관하고 싶을 수도 있습니다.
모델 교육, 진단 또는 지표에 사용하십시오. 데이터 소스가 다른 형식의 데이터를 제공하는 경우
부동 소수점의 시계열로 해석되지 않으면 Reduct Storage가 필요합니다.

어떻게 작동하는지 알아보기 위해 간단한 MQTT 애플리케이션을 만들어 보겠습니다.

전제 조건



이 사용 예에는 다음과 같은 요구 사항이 있습니다.
  • 리눅스 AMD64
  • Docker 및 Docker Compose
  • NodeJS >= 16

  • Ubuntu 사용자인 경우 다음 명령을 사용하여 종속성을 설치합니다.

    $ sudo apt-get update
    $ sudo apt-get install docker-compose nodejs
    


    MQTT 브로커 실행 및 Docker Compose로 저장소 축소



    브로커와 저장소를 실행하는 가장 쉬운 방법은 Docker Compose를 사용하는 것입니다. 따라서 서비스가 포함된 예제 폴더에 docker-compose.yml 파일을 생성해야 합니다.

    version: "3"
    services:
      reduct-storage:
        image: reductstorage/engine:latest
        volumes:
          - ./data:/data
        ports:
          - "8383:8383"
    
      mqtt-broker:
        image: eclipse-mosquitto:1.6
        ports:
          - "1883:1883"
    


    그런 다음 구성을 실행합니다.

    docker-compose up
    


    Docker Compose는 이미지를 다운로드하고 컨테이너를 실행했습니다. MQTT 프로토콜용으로 포트 1883을, Reduct HTTP API용으로 포트 8383을 게시했다는 점에 유의하십시오.

    NodeJS 스크립트 작성



    이제 코드로 손을 더럽힐 준비가 되었습니다. NPM 패키지를 초기화하고 설치MQTT Client하고
    JavaScript Client SDK .

    $ npm init
    $ npm install --save reduct-js async-mqtt 
    


    모든 종속 항목이 설치되면 스크립트를 작성할 수 있습니다.

    const MQTT = require('async-mqtt');
    const {Client} = require('reduct-js');
    
    MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
      await mqttClient.subscribe('mqtt_data');
    
      const reductClient = new Client('http://localhost:8383');
      const bucket = await reductClient.getOrCreateBucket('mqtt');
    
      mqttClient.on('message', async (topic, msg) => {
        const data = msg.toString();
        await bucket.write('mqtt_data', data);
        console.log('Received message "%s" from topic "%s" was written', data,
            topic);
      });
    
    }).catch(error => console.error(error));
    


    코드를 자세히 살펴보겠습니다. 먼저 MQTT 브로커에 연결하고 주제를 구독해야 합니다. 주제 이름은 생산자가 알아야 하는 임의의 문자열입니다. 우리의 경우 mqtt_data입니다.

    
    MQTT.connectAsync('tcp://localhost:1883').then(async (mqttClient) => {
      await mqttClient.subscribe('mqtt_data');
    
      // rest of code
    }).catch(error => console.error(error));
    


    MQTT 연결이 성공하면 Reduct Storage 처리를 시작할 수 있습니다. 거기에 데이터 쓰기를 시작하려면 버킷이 필요합니다. 이름이 mqtt인 버킷을 생성하거나 기존 버킷을 가져옵니다.

    const reductClient = new Client('http://localhost:8383');
    const bucket = await reductClient.getOrCreateBucket('mqtt');
    


    마지막 단계는 수신된 메시지를 저장소에 쓰는 것입니다. 콜백을 사용해야 합니다.
    for event message , 그것을 잡으려면. 그런 다음 항목mqtt_data에 메시지를 작성합니다.

    mqttClient.on('message', async (topic, msg) => {
      const data = msg.toString();
      await bucket.write('mqtt_data', data);
      console.log('Received message "%s" from topic "%s" was written', data,
          topic);
    });
    

    bucket.write를 호출할 때 항목이 아직 없으면 버킷에 항목을 만듭니다. 그런 다음 현재 타임스탬프로 항목에 데이터를 씁니다. 이제 우리의 MQTT 데이터는 스토리지에서 안전하고 건전하며 동일한 것을 사용하여 액세스할 수 있습니다SDK.

    MQTT 주제에 데이터 게시



    스크립트를 실행하면 MQTT의 데이터가 없기 때문에 아무 작업도 수행하지 않습니다. 주제에 무언가를 게시해야 합니다.mqtt_data . 나는 mosquitto_pub을 사용하는 것을 선호합니다. Ubuntu 사용자의 경우 mosquitto-clients 패키지의 일부입니다.

    $ sudo apt-get install mosquitto-clients
    $ mosuitto_pub -t mqtt_data -m "Hello, world!"
    


    Reduction Storage에서 데이터 가져오기



    이제 MQTT에서 데이터를 가져와서 Reduct 저장소에 쓰는 방법을 알았지만 저장소에서 데이터를 읽으려면 약간의 NodejS 스크립트가 필요합니다.

    const {Client} = require('reduct-js');
    
    const client = new Client('http://localhost:8383');
    
    client.getBucket('mqtt').then(async (bucket) => {
      let data = await bucket.read('mqtt_data');
      console.log('Last record: %s', data);
    
      // Get data for the last hour
      const stopTime = BigInt(Date.now() * 1000);
      const startTime = stopTime - 3_600_000_000n;
    
      const records = await bucket.list('mqtt_data', startTime, stopTime);
      for (const record of records) {
        data = await bucket.read('mqtt_data', record.timestamp);
        console.log('Found record "%s" with timestamp "%d"', data, record.timestamp);
      }
    
    }).catch(error => console.error(error));
    
    


    항목의 최신 레코드를 읽는 것은 매우 쉽습니다.

    let data = await bucket.read('mqtt_data');
    


    그러나 임의의 레코드를 가져오려면 타임스탬프를 알아야 합니다. 일반적인 사용 사례는 일부 데이터를 읽는 것입니다.
    시간 간격. 해당 간격에 대한 레코드의 타임스탬프를 가져오려면 메서드Bucket.list를 사용해야 합니다. 그런 다음 Bucket.read를 사용하여 읽을 수 있습니다.

    const stopTime = BigInt(Date.now() * 1000);
    const startTime = stopTime - 3_600_000_000n;
    
    for await (const record of bucket.query('mqtt_data', startTime, stopTime)) {
        data = await record.read();
        console.log('Found record "%s" with timestamp "%d"', data, record.time);
    }
    


    저장소는 마이크로초 정밀도의 타임스탬프를 사용하므로 Date 클래스 및 number 유형을 사용할 수 없습니다.
    우리가 BigInt를 사용하는 이유는 무엇입니까?

    결론



    보시다시피 MQTT 프로토콜과 Reduct Storage는 NodeJS에서 매우 쉽게 함께 사용할 수 있는 매우 간단한 기술입니다.
    예제here의 소스 코드를 찾을 수 있습니다. 실행에 문제가 있거나 질문이 있는 경우. 부담없이 만드세요 an issue .

    이 튜토리얼이 도움이 되었기를 바랍니다. 감사!

    연결


  • Reduct Storage
  • JavaScript Client SDK
  • Mosquitto MQTT Broker
  • Full example on GitHub
  • 좋은 웹페이지 즐겨찾기