一、MQTT协议简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布/订阅模式的消息传输协议,具有开放、简单和易于实现的特点。
MQTT协议可以用于物联网、移动设备等多种场景,主要用于设备与设备之间的通讯,将设备产生的数据传输到云平台或控制其他设备。
MQTT协议采用基于TCP/IP的传输方式,支持Quality of Service(QoS)0,1,2等多种服务质量级别,可以支持百万级别的TCP/IP连接,是当前IoT领域最为流行的协议之一。
二、ActiveMQ简介
ActiveMQ是一个开源的、基于Java的消息中间件,支持多种消息传输协议,例如TCP/IP、STOMP、WebSocket等。
ActiveMQ采用JMS(Java Message Service)作为消息API,可以与Java应用程序无缝集成,拥有高可伸缩性、高可用性、容错性等优点。
三、ActiveMQ支持的MQTT版本
ActiveMQ从版本5.9.0开始支持MQTT 3.1.1版本。
在ActiveMQ中,MQTT消息协议是作为插件存在的,因此需要使用ActiveMQ的命令来进行启用和管理。
四、使用ActiveMQ MQTT传输消息
我们可以通过ActiveMQ MQTT插件来实现基于MQTT协议的消息传输,下面是一个简单的示例,展示如何使用ActiveMQ MQTT发送和接收消息。
1、启用ActiveMQ MQTT插件
<transportConnectors>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.defaultKeepAlive=60000&transport.closeAsync=true"/>
</transportConnectors>
在ActiveMQ的配置文件activemq.xml中,可以添加上述代码来启用MQTT插件。
2、使用ActiveMQ MQTT发送消息
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MQTTSender {
public static void main(String[] args) {
String topic = "MQTT Examples";
String content = "MQTT Test";
int qos = 2;
String broker = "tcp://localhost:1883";
String clientId = "JavaMQTTSender";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: "+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
}
上述代码中,我们使用eclipse的paho.mqttv3包来实现消息发送。运行该代码后,可以在ActiveMQ控制台查看推送的消息。
3、使用ActiveMQ MQTT接收消息
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQTTSubscriber implements MqttCallback {
public static void main(String[] args) {
String topic = "MQTT Examples";
int qos = 2;
String broker = "tcp://localhost:1883";
String clientId = "JavaMQTTSubscriber";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
sampleClient.setCallback(new MQTTSubscriber());
sampleClient.subscribe(topic, qos);
System.out.println("Subscribing topic "+ topic +" with QoS "+qos);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
}
public void connectionLost(Throwable arg0) {
System.out.println("Connection lost");
}
public void deliveryComplete(IMqttDeliveryToken arg0) {
System.out.println("Delivery complete");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message arrived:");
System.out.println(" Topic: "+ topic);
System.out.println(" Message: "+ new String(message.getPayload()));
}
}
上述代码中,我们实现了一个MQTT client来接收消息。
五、总结
本文简单介绍了MQTT协议和ActiveMQ,以及如何通过ActiveMQ实现MQTT消息传输。
通过学习本文,您可以了解到:
1. 什么是MQTT协议,以及MQTT协议适用的场景
2. ActiveMQ支持的MQTT版本和如何启用MQTT插件
3. 如何使用ActiveMQ MQTT发送和接收消息