Java에서 MQTT를 사용하는 방법

26013 단어 javagithubmqtttutorial
MQTT은 사물 인터넷(IoT)용 OASIS 표준 메시징 프로토콜입니다. 작은 코드 풋프린트와 최소 네트워크 대역폭으로 원격 장치를 연결하는 데 이상적인 초경량 발행/구독 메시징 전송으로 설계되었습니다. 오늘날 MQTT는 자동차, 제조, 통신, 석유 및 가스 등과 같은 다양한 산업에서 사용됩니다.

이 글은 Java 프로젝트에서 MQTT를 사용하여 클라이언트와 브로커 간의 연결, 구독, 구독 취소, 게시 및 메시지 수신 기능을 구현하는 방법을 소개합니다.

종속성 추가



이 문서의 개발 환경은 다음과 같습니다.
  • 빌드 도구: Maven
  • IDE: IntelliJ IDEA
  • 자바: JDK 1.8.0

  • Java 언어에서 가장 널리 사용되는 MQTT 클라이언트 라이브러리인 Eclipse Paho Java Client을 클라이언트로 사용합니다.
    pom.xml 파일에 다음 종속성을 추가합니다.

    <dependencies>
       <dependency>
           <groupId>org.eclipse.paho</groupId>
           <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
           <version>1.2.5</version>
       </dependency>
    </dependencies>
    


    MQTT 연결 생성



    MQTT 브로커



    이 문서에서는 public MQTT broker을 기반으로 생성된 EMQX Cloud을 사용합니다. 서버 접속 정보는 다음과 같습니다.
  • 브로커: broker.emqx.io
  • TCP 포트: 1883
  • SSL/TLS 포트: 8883

  • 연결하다



    MQTT의 기본 연결 매개변수를 설정합니다. 사용자 이름과 암호는 선택 사항입니다.

    String broker = "tcp://broker.emqx.io:1883";
    // TLS/SSL
    // String broker = "ssl://broker.emqx.io:8883";
    String username = "emqx";
    String password = "public";
    String clientid = "publish_client";
    


    그런 다음 MQTT 클라이언트를 만들고 브로커에 연결합니다.

    MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    client.connect(options);
    


    지침
  • MqttClient: MqttClient는 MQTT 작업이 완료되면 제어를 차단하고 응용 프로그램으로 반환하는 일련의 메서드를 제공합니다.
  • MqttClientPersistence: 전송 중인 아웃바운드 및 인바운드 메시지를 저장하여 지정된 QoS로 전달할 수 있도록 하는 데 사용되는 영구 데이터 저장소를 나타냅니다.
  • MqttConnectOptions: 클라이언트가 서버에 연결하는 방법을 제어하는 ​​옵션 세트를 보유합니다. 다음은 몇 가지 일반적인 방법입니다.
  • setUserName: 연결에 사용할 사용자 이름을 설정합니다.
  • setPassword: 연결에 사용할 암호를 설정합니다.
  • setCleanSession: 클라이언트와 서버가 다시 시작하고 다시 연결할 때 상태를 기억해야 하는지 여부를 설정합니다.
  • setKeepAliveInterval: "연결 유지"간격을 설정합니다.
  • setConnectionTimeout: 연결 시간 초과 값을 설정합니다.
  • setAutomaticReconnect: 연결이 끊어진 경우 클라이언트가 자동으로 서버에 다시 연결을 시도할지 여부를 설정합니다.


  • TLS/SSL로 연결



    TLS/SSL 연결에 자체 서명된 인증서를 사용하려면 pom.xml 파일에 bcpkix-jdk15on을 추가합니다.

    <!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on -->
    <dependency>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcpkix-jdk15on</artifactId>
       <version>1.70</version>
    </dependency>
    
    


    그런 다음 다음 코드를 사용하여 SSLUtils.java 파일을 만듭니다.

    package io.emqx.mqtt;
    
    import org.bouncycastle.jce.provider.BouncyCastleProvider;
    import org.bouncycastle.openssl.PEMKeyPair;
    import org.bouncycastle.openssl.PEMParser;
    import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
    
    import javax.net.ssl.KeyManagerFactory;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSocketFactory;
    import javax.net.ssl.TrustManagerFactory;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.FileReader;
    import java.security.KeyPair;
    import java.security.KeyStore;
    import java.security.Security;
    import java.security.cert.CertificateFactory;
    import java.security.cert.X509Certificate;
    
    public class SSLUtils {
       public static SSLSocketFactory getSocketFactory(final String caCrtFile,
                                                       final String crtFile, final String keyFile, final String password)
               throws Exception {
           Security.addProvider(new BouncyCastleProvider());
    
           // load CA certificate
           X509Certificate caCert = null;
    
           FileInputStream fis = new FileInputStream(caCrtFile);
           BufferedInputStream bis = new BufferedInputStream(fis);
           CertificateFactory cf = CertificateFactory.getInstance("X.509");
    
           while (bis.available() > 0) {
               caCert = (X509Certificate) cf.generateCertificate(bis);
          }
    
           // load client certificate
           bis = new BufferedInputStream(new FileInputStream(crtFile));
           X509Certificate cert = null;
           while (bis.available() > 0) {
               cert = (X509Certificate) cf.generateCertificate(bis);
          }
    
           // load client private key
           PEMParser pemParser = new PEMParser(new FileReader(keyFile));
           Object object = pemParser.readObject();
           JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
           KeyPair key = converter.getKeyPair((PEMKeyPair) object);
           pemParser.close();
    
           // CA certificate is used to authenticate server
           KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
           caKs.load(null, null);
           caKs.setCertificateEntry("ca-certificate", caCert);
           TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");
           tmf.init(caKs);
    
           // client key and certificates are sent to server so it can authenticate
           KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
           ks.load(null, null);
           ks.setCertificateEntry("certificate", cert);
           ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
                   new java.security.cert.Certificate[]{cert});
           KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                  .getDefaultAlgorithm());
           kmf.init(ks, password.toCharArray());
    
           // finally, create SSL socket factory
           SSLContext context = SSLContext.getInstance("TLSv1.2");
           context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    
           return context.getSocketFactory();
      }
    }
    

    options를 다음과 같이 설정합니다.

    String broker = "ssl://broker.emqx.io:8883";
    // Set socket factory
    String caFilePath = "/cacert.pem";
    String clientCrtFilePath = "/client.pem";
    String clientKeyFilePath = "/client.key";
    SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");
    options.setSocketFactory(socketFactory);
    


    MQTT 메시지 게시



    주제 PublishSampleHello MQTT 메시지를 게시할 클래스mqtt/test를 만듭니다.

    package io.emqx.mqtt;
    
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class PublishSample {
    
       public static void main(String[] args) {
    
           String broker = "tcp://broker.emqx.io:1883";
           String topic = "mqtt/test";
           String username = "emqx";
           String password = "public";
           String clientid = "publish_client";
           String content = "Hello MQTT";
           int qos = 0;
    
           try {
               MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
               MqttConnectOptions options = new MqttConnectOptions();
               options.setUserName(username);
               options.setPassword(password.toCharArray());
               options.setConnectionTimeout(60);
          options.setKeepAliveInterval(60);
               // connect
               client.connect(options);
               // create message and setup QoS
               MqttMessage message = new MqttMessage(content.getBytes());
               message.setQos(qos);
               // publish message
               client.publish(topic, message);
               System.out.println("Message published");
               System.out.println("topic: " + topic);
               System.out.println("message content: " + content);
               // disconnect
               client.disconnect();
               // close client
               client.close();
          } catch (MqttException e) {
               throw new RuntimeException(e);
          }
      }
    }
    
    


    구독하다



    주제SubscribeSample를 구독할 클래스mqtt/test를 만듭니다.

    package io.emqx.mqtt;
    
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class SubscribeSample {
       public static void main(String[] args) {
           String broker = "tcp://broker.emqx.io:1883";
           String topic = "mqtt/test";
           String username = "emqx";
           String password = "public";
           String clientid = "subscribe_client";
           int qos = 0;
    
           try {
               MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
               // connect options
               MqttConnectOptions options = new MqttConnectOptions();
               options.setUserName(username);
               options.setPassword(password.toCharArray());
               options.setConnectionTimeout(60);
          options.setKeepAliveInterval(60);
               // setup callback
               client.setCallback(new MqttCallback() {
    
                   public void connectionLost(Throwable cause) {
                       System.out.println("connectionLost: " + cause.getMessage());
                  }
    
                   public void messageArrived(String topic, MqttMessage message) {
                       System.out.println("topic: " + topic);
                       System.out.println("Qos: " + message.getQos());
                       System.out.println("message content: " + new String(message.getPayload()));
    
                  }
    
                   public void deliveryComplete(IMqttDeliveryToken token) {
                       System.out.println("deliveryComplete---------" + token.isComplete());
                  }
    
              });
               client.connect(options);
               client.subscribe(topic, qos);
          } catch (Exception e) {
               e.printStackTrace();
          }
      }
    }
    
    


    Mqtt콜백:
  • connectionLost(Throwable cause): 이 메소드는 서버와의 연결이 끊어졌을 때 호출됩니다.
  • messageArrived(String topic, MqttMessage message): 서버에서 메시지가 도착하면 이 메소드가 호출됩니다.
  • deliveryComplete(IMqttDeliveryToken 토큰): 메시지 배달이 완료되고 모든 승인이 수신되면 호출됩니다.

  • 테스트



    그런 다음 SubscribeSample를 실행하여 mqtt/test 주제를 구독합니다. 그런 다음 PublishSample를 실행하여 mqtt/test 주제에 메시지를 게시합니다. 게시자가 메시지를 성공적으로 게시하고 구독자가 메시지를 받는 것을 볼 수 있습니다.



    이제 Paho Java 클라이언트를 MQTT 클라이언트로 사용하여 public MQTT server에 연결하고 메시지 게시 및 구독을 구현했습니다.

    전체 코드는 https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Java에서 확인할 수 있습니다.

    https://www.emqx.com에 원래 게시되었습니다.

    좋은 웹페이지 즐겨찾기