RabbitMQ Demo

发布时间:2023-05-22

一、RabbitMQ基础

RabbitMQ是一个消息中间件,用于在分布式应用程序中传递消息,提供了一种基于AMQP协议的可靠消息传递机制。 RabbitMQ的核心概念包括:生产者(Producer)、交换机(Exchange)、队列(Queue)和消费者(Consumer)。生产者将消息发布到交换机,交换机根据绑定的规则,将消息路由到一个或多个队列中,消费者从队列中接收消息。 RabbitMQ支持多种消息传递模式,包括点对点(Point-to-Point)、发布-订阅(Publish-Subscribe)、路由(Routing)和主题(Topics)等。

二、RabbitMQ Demo示例

1. Point-to-Point示例

在Point-to-Point模式下,生产者将消息发送给一个队列,消费者从该队列中接收消息。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    String message = "Hello RabbitMQ!";
    channel.basicPublish("", "hello", null, message.getBytes());
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}
// 消费者
public static void receive() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message);
        }
    };
    channel.basicConsume("hello", true, consumer);
    channel.close();
    connection.close();
}

2. Publish-Subscribe示例

在Publish-Subscribe模式下,生产者将消息发送给一个交换机,该交换机将消息路由到所有绑定了该交换机的队列中,每个消费者都从一个队列中接收消息。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("logs", "", null, message.getBytes());
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}
// 消费者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    channel.close();
    connection.close();
}
// 消费者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    channel.close();
    connection.close();
}

3. Routing示例

在Routing模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("direct_logs", "error", null, message.getBytes());
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}
// 消费者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "error");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    channel.close();
    connection.close();
}
// 消费者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "warning");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    channel.close();
    connection.close();
}

4. Topics示例

在Topics模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。Topic模式下的routing key可以使用通配符*#*表示匹配单个单词,#表示匹配任意个单词。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("topic_logs", "user.info", null, message.getBytes());
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}
// 消费者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.info");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    channel.close();
    connection.close();
}
// 消费者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.error");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    channel.close();
    connection.close();
}

三、总结

本文介绍了RabbitMQ的基础概念及其使用示例,希望可以对大家了解RabbitMQ有所帮助。