一、RabbitMQ基础
RabbitMQ是一个消息中间件,用于在分布式应用程序中传递消息,提供了一种基于AMQP协议的可靠消息传递机制。 RabbitMQ的核心概念包括:生产者(Producer)、交换机(Exchange)、队列(Queue)和消费者(Consumer)。生产者将消息发布到交换机,交换机根据绑定的规则,将消息路由到一个或多个队列中,消费者从队列中接收消息。 RabbitMQ支持多种消息传递模式,包括点对点(Point-to-Point)、发布-订阅(Publish-Subscribe)、路由(Routing)和主题(Topics)等。
二、RabbitMQ Demo示例
1. Point-to-Point示例
在Point-to-Point模式下,生产者将消息发送给一个队列,消费者从该队列中接收消息。
// 生产者
public static void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
String message = "Hello RabbitMQ!";
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println("Sent message: " + message);
channel.close();
connection.close();
}
// 消费者
public static void receive() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume("hello", true, consumer);
channel.close();
connection.close();
}
2. Publish-Subscribe示例
在Publish-Subscribe模式下,生产者将消息发送给一个交换机,该交换机将消息路由到所有绑定了该交换机的队列中,每个消费者都从一个队列中接收消息。
// 生产者
public static void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", "fanout");
String message = "Hello RabbitMQ!";
channel.basicPublish("logs", "", null, message.getBytes());
System.out.println("Sent message: " + message);
channel.close();
connection.close();
}
// 消费者1
public static void receive1() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + " from queue1");
}
};
channel.basicConsume(queueName, true, consumer);
channel.close();
connection.close();
}
// 消费者2
public static void receive2() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + " from queue2");
}
};
channel.basicConsume(queueName, true, consumer);
channel.close();
connection.close();
}
3. Routing示例
在Routing模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。
// 生产者
public static void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", "direct");
String message = "Hello RabbitMQ!";
channel.basicPublish("direct_logs", "error", null, message.getBytes());
System.out.println("Sent message: " + message);
channel.close();
connection.close();
}
// 消费者1
public static void receive1() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "error");
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + " from queue1");
}
};
channel.basicConsume(queueName, true, consumer);
channel.close();
connection.close();
}
// 消费者2
public static void receive2() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct_logs", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "warning");
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + " from queue2");
}
};
channel.basicConsume(queueName, true, consumer);
channel.close();
connection.close();
}
4. Topics示例
在Topics模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。Topic模式下的routing key可以使用通配符*
和#
,*
表示匹配单个单词,#
表示匹配任意个单词。
// 生产者
public static void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_logs", "topic");
String message = "Hello RabbitMQ!";
channel.basicPublish("topic_logs", "user.info", null, message.getBytes());
System.out.println("Sent message: " + message);
channel.close();
connection.close();
}
// 消费者1
public static void receive1() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_logs", "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.info");
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + " from queue1");
}
};
channel.basicConsume(queueName, true, consumer);
channel.close();
connection.close();
}
// 消费者2
public static void receive2() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topic_logs", "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.error");
System.out.println("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message + " from queue2");
}
};
channel.basicConsume(queueName, true, consumer);
channel.close();
connection.close();
}
三、总结
本文介绍了RabbitMQ的基础概念及其使用示例,希望可以对大家了解RabbitMQ有所帮助。