您的位置:

深入了解ActiveMQ MQTT消息通信

一、MQTT协议简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模式的消息传输协议,具有开放、简单和易于实现的特点。

MQTT协议可以用于物联网、移动设备等多种场景,主要用于设备与设备之间的通讯,将设备产生的数据传输到云平台或控制其他设备。

MQTT协议采用基于TCP/IP的传输方式,支持Quality of Service(QoS)0,1,2等多种服务质量级别,可以支持百万级别的TCP/IP连接,是当前IoT领域最为流行的协议之一。

二、ActiveMQ简介

ActiveMQ是一个开源的、基于Java的消息中间件,支持多种消息传输协议,例如TCP/IP、STOMP、WebSocket等。

ActiveMQ采用JMS(Java Message Service)作为消息API,可以与Java应用程序无缝集成,拥有高可伸缩性、高可用性、容错性等优点。

三、ActiveMQ支持的MQTT版本

ActiveMQ从版本5.9.0开始支持MQTT 3.1.1版本。

在ActiveMQ中,MQTT消息协议是作为插件存在的,因此需要使用ActiveMQ的命令来进行启用和管理。

四、使用ActiveMQ MQTT传输消息

我们可以通过ActiveMQ MQTT插件来实现基于MQTT协议的消息传输,下面是一个简单的示例,展示如何使用ActiveMQ MQTT发送和接收消息。

1、启用ActiveMQ MQTT插件


  <transportConnectors>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.defaultKeepAlive=60000&transport.closeAsync=true"/>
  </transportConnectors>

在ActiveMQ的配置文件activemq.xml中,可以添加上述代码来启用MQTT插件。

2、使用ActiveMQ MQTT发送消息


  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
   
  public class MQTTSender {
   
    public static void main(String[] args) {
      String topic        = "MQTT Examples";
      String content      = "MQTT Test";
      int qos             = 2;
      String broker       = "tcp://localhost:1883";
      String clientId     = "JavaMQTTSender";
      MemoryPersistence persistence = new MemoryPersistence();
   
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("Connecting to broker: "+broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        System.out.println("Publishing message: "+content);
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        sampleClient.publish(topic, message);
        System.out.println("Message published");
        sampleClient.disconnect();
        System.out.println("Disconnected");
        System.exit(0);
      } catch(MqttException me) {
        System.out.println("reason "+me.getReasonCode());
        System.out.println("msg "+me.getMessage());
        System.out.println("loc "+me.getLocalizedMessage());
        System.out.println("cause "+me.getCause());
        System.out.println("excep "+me);
        me.printStackTrace();
      }
    }
   
  }

上述代码中,我们使用eclipse的paho.mqttv3包来实现消息发送。运行该代码后,可以在ActiveMQ控制台查看推送的消息。

3、使用ActiveMQ MQTT接收消息


  import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  import org.eclipse.paho.client.mqttv3.MqttCallback;
  import org.eclipse.paho.client.mqttv3.MqttClient;
  import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  import org.eclipse.paho.client.mqttv3.MqttException;
  import org.eclipse.paho.client.mqttv3.MqttMessage;
  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
   
  public class MQTTSubscriber implements MqttCallback {
   
    public static void main(String[] args) {
      String topic        = "MQTT Examples";
      int qos             = 2;
      String broker       = "tcp://localhost:1883";
      String clientId     = "JavaMQTTSubscriber";
      MemoryPersistence persistence = new MemoryPersistence();
   
      try {
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        System.out.println("Connecting to broker: "+broker);
        sampleClient.connect(connOpts);
        System.out.println("Connected");
        sampleClient.setCallback(new MQTTSubscriber());
        sampleClient.subscribe(topic, qos);
        System.out.println("Subscribing topic "+ topic +" with QoS "+qos);
      } catch(MqttException me) {
        System.out.println("reason "+me.getReasonCode());
        System.out.println("msg "+me.getMessage());
        System.out.println("loc "+me.getLocalizedMessage());
        System.out.println("cause "+me.getCause());
        System.out.println("excep "+me);
        me.printStackTrace();
      }
    }
   
    public void connectionLost(Throwable arg0) {
      System.out.println("Connection lost");
    }
   
    public void deliveryComplete(IMqttDeliveryToken arg0) {
      System.out.println("Delivery complete");
    }
   
    public void messageArrived(String topic, MqttMessage message) throws Exception {
      System.out.println("Message arrived:");
      System.out.println("  Topic: "+ topic);
      System.out.println("  Message: "+ new String(message.getPayload()));
    }
  }

上述代码中,我们实现了一个MQTT client来接收消息。

五、总结

本文简单介绍了MQTT协议和ActiveMQ,以及如何通过ActiveMQ实现MQTT消息传输。

通过学习本文,您可以了解到:

1. 什么是MQTT协议,以及MQTT协议适用的场景

2. ActiveMQ支持的MQTT版本和如何启用MQTT插件

3. 如何使用ActiveMQ MQTT发送和接收消息