一、RabbitMQ的基本概念
在学习RabbitMQ之前,必须理解RabbitMQ的基本概念及其作用。RabbitMQ是一个开源的消息队列系统,用于快速、可靠地处理存储和转发消息。它基于AMQP(高级消息队列协议)开发,提供了许多高级特性,如消息持久性、发布/订阅模式和自动健康监测等。
// RabbitMQ基本操作示例代码 using RabbitMQ.Client; using System.Text; // 生产者,发布消息 IModel channel = connection.CreateModel(); channel.BasicPublish(exchange: "", routingKey: "hello", body: Encoding.UTF8.GetBytes("Hello World!")); // 消费者,接收消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] messageBodyBytes = ea.Body.ToArray(); string message = Encoding.UTF8.GetString(messageBodyBytes); Console.WriteLine("Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
上面的代码示例展示了如何在生产者端发送消息,并在消费者端接收消息。其中,生产者通过BasicPublish()方法来发布消息,消费者则通过BasicConsume()方法来订阅消息。另外,RabbitMQ还支持交换机(exchange)机制,用于在队列之间进行消息路由。
二、交换机的类型
RabbitMQ支持四种类型的交换机:direct、fanout、topic和headers。它们分别对应着不同的消息路由策略。其中,direct模式是最简单的模式,它会将消息路由到与routing key完全匹配的队列中。而fanout模式则是广播模式,它会将消息路由到所有绑定到该交换机上的队列中。topic模式则是按照正则表达式匹配规则来路由消息,而headers则是通过消息头中的键值对来路由消息。
// Direct模式示例代码 // 生产者,发布消息 channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); // 消费者,接收消息 channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var queueName = channel.QueueDeclare().QueueName; foreach (var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
上面的代码示例演示了如何使用direct模式来发送和接收消息。其中,生产者通过ExchangeDeclare()方法来声明交换机,通过BasicPublish()方法来发布消息。消费者则通过QueueDeclare()方法来创建队列,通过QueueBind()方法来绑定队列和交换机,然后通过BasicConsume()方法来订阅消息。
三、消息的持久化
当RabbitMQ服务器关闭或者崩溃时,所有未被处理的消息都会丢失,这是非常危险的。为了避免这种情况的发生,我们可以将消息设置为持久化。当消息被设置为持久化时,RabbitMQ会将消息写入到硬盘中,这样即使服务器崩溃,消息也可以被恢复。
// 持久化示例代码 // 生产者,发布持久化消息 channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout); var properties = channel.CreateBasicProperties(); properties.Persistent = true; var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: properties, body: body); // 消费者,接收持久化消息 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };
上面的代码示例演示了如何将消息设置为持久化。在生产者端,我们通过设置properties的Persistent属性来将消息设置为持久化消息。而在消费者端,则需要在QueueDeclare()方法中设置durable为true来声明队列为持久化队列。此外,在处理完一条消息后,需要使用BasicAck()方法来确认消息已经被消费。
四、RabbitMQ的高可用性
RabbitMQ支持多个节点之间的集群,从而提高了消息队列的高可用性。在RabbitMQ集群中,每个节点都具有相同的角色,并且它们之间可以共享相同的队列和交换机。当其中一个节点崩溃时,其他节点会自动接管它的任务,从而保证了消息队列的可靠性。
// RabbitMQ集群示例代码 // 创建RabbitMQ连接 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672, VirtualHost = "/" }; var connection = factory.CreateConnection(); // 创建Model var channel = connection.CreateModel(); // 声明Exchange,Queue,并绑定。其中,Queue需要设置为exclusive=false,以支持多个消费者。 channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); // 创建消费者,用于消费消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
上面的代码示例展示了如何在RabbitMQ集群中创建Exchange、Queue并进行绑定。其中,生产者和消费者都应该连接到集群中的某一个节点进行消息传递。