您的位置:

RabbitMQ消息队列详解:使用、优化和应用场景

一、什么是消息队列

在软件系统中,不同应用程序之间经常需要进行异步通信。通常情况下,这些通信实现起来非常复杂。使用消息队列可以解决这个问题。

消息队列就是一种分布式的消息传递系统。消息发送者把消息发送给队列,消息接收者从队列中获取消息来进行处理。消息传递过程中,队列就像一个中间代理,并且具有消息缓存的功能。这样,消息发送者和接收者就可以独立进行操作,并且不会出现因为通信故障导致的消息丢失情况。

二、为什么要使用消息队列

使用消息队列的主要目的是为了解耦合。异步通信可以让系统中的应用程序之间不需要进行直接交互,从而减少了它们之间的依赖关系。消息队列还可以减轻系统负载,针对高并发场景,消息队列可以帮助处理大量流量并发请求,从而避免系统崩溃。

三、使用场景

下面介绍一些常见的使用场景:

1、任务队列:MQ可以代替定时任务框架。定时任务的缺点在于无法对细微的更新进行感知,并且随着时间的推移,任务积累得越来越多。而使用MQ则可以进行及时的任务派发。

2、处理程序解耦合:服务A想要调用服务B,但是它们的依赖关系很紧密。如果他们之间使用消息队列,那么两个应用程序就没有直接的依赖关系。因此,可以在不改变代码的情况下,对服务A和服务B进行部署和更新。

3、流量削峰:应用程序处理流量的峰值是自然而然的,但是如果峰值超出了它的承载能力,这可能会导致应用程序崩溃。使用消息队列,程序可以在峰值时刻暂存请求,等到峰值结束后再慢慢处理这些请求。这就是流量削峰。

四、RabbitMQ使用教程

1、同步阻塞模式

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

上述代码演示了消费者向队列发送消息的过程。BlockingConnection方法返回一个Connection对象,Channel对象则用于发消息。

2、接收消息

import pika

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

上述代码演示了消费者从队列中接收消息的过程。basic_consume()方法表示从队列开始消费消息,当有消息到达时,就会触发回调函数callback()。这个回调函数打印出消息内容。

3、优化

下面介绍一些RabbitMQ的优化措施:

1、消息持久化:当消息被发送到队列后,如果在RabbitMQ关闭前都没有被消费,那么这条消息就会被删除。所以,为了防止消息在RabbitMQ宕机后丢失,需要将消息持久化。

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',
    properties=pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent 'Hello World!'")
connection.close()

将消息持久化的方法是将队列的durable参数和消息的delivery_mode参数均设置为2。

2、利用Publisher Confirms实现事务控制:所有发送到RabbitMQ服务中的消息都会由RabbitMQ作为生产者发送到交换器。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.confirm_delivery()  # 开启delivery_confirm模式

channel.basic_publish(exchange='',
                      routing_key='hello.pika.confirm',
                      body='hello world from pika confirm.',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 确认消息持久化
                      ))

confirmed = channel.wait_for_confirms()
if confirmed:
    print('消息发送成功')
else:
    print('消息发送失败')
    
connection.close()

在开启delivery_confirm模式后,每次发送一条消息,都要等待Broker的回应,只有Broker出现问题或者消息写入后收不到Broker回应才会触发confirm回调函数。

五、应用场景

消息队列适用于很多场景,如:

1、电商场景:解决订单流程中库存锁定、创建订单、支付等问题。MQ在这场景中主要用于订单下发和处理,并且在处理这些订单的时候,需要根据产品库存、用户余额等信息进行处理。

2、异步处理:在大数据处理或计算过程中,结果处理时间较长,需要将任务分到不同的节点进行异步处理,这样可以节省大量计算资源。

3、发布订阅模式:气象局、证券公司等需要实时发布信息的机构可以使用RabbitMQ实现信息的实时订阅。

六、结束语

消息队列已经成为了现代软件工程中不可或缺的重要组件。本文介绍了RabbitMQ的使用和优化,以及应用场景。希望通过本文的介绍,能够让读者更好地理解消息队列的工作原理并且应用到实际场景中。