您的位置:

RabbitMQ发送消息详解

一、RabbitMQ发送消息的方法

RabbitMQ是一个强大的分布式消息队列系统,可以实现应用程序之间的异步消息传递。RabbitMQ发送消息的方法非常简单,只需要通过客户端向指定的队列发送一条消息即可。


import pika
import json

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 发送消息到队列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(exchange='', routing_key='queue_name', body=json.dumps(message))

# 关闭连接
connection.close()

以上代码展示了如何在Python中使用pika库连接RabbitMQ,将消息发送到名为“queue_name”的队列中。

二、RabbitMQ定时发送消息

有时候需要在指定的时间发送消息。这时可以使用RabbitMQ的插件 —— RabbitMQ Delayed Message Exchange。

首先需要安装该插件,方法如下:


# 启用rabbitmq_management插件
rabbitmq-plugins enable rabbitmq_management
# 下载并安装Delayed Message插件
wget https://dl.bintray.com/rabbitmq/community-plugins/3.8.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez
mv rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez /usr/lib/rabbitmq/lib/rabbitmq_server-${VERSION}/plugins/

安装完毕后,即可通过指定消息的expiration属性来实现消息的定时发送。消息的expiration属性表示消息过期的时间,单位为毫秒。


import pika
import json

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 发送延迟消息到队列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='delayed_queue_name',
    body=json.dumps(message),
    properties=pika.BasicProperties(expiration='10000')
)

# 关闭连接
connection.close()

以上代码展示了如何在指定的队列“delayed_queue_name”中发送一条10秒后才会被消费的消息。

三、RabbitMQ发送消息后MQ没收到

当我们发送消息时,如果MQ没有收到消息,我们需要检查以下几点:

1、队列是否存在

发送消息前需先确认队列是否存在,否则之后发送的消息会导致MQ无法正常接收消息。可以通过以下代码创建队列:


import pika

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name')

# 关闭连接
connection.close()

2、交换机是否绑定了队列

在RabbitMQ中,消息通过交换机分发到不同的队列。在发送消息之前,需要确保该交换机已经与消费者队列进行了绑定。可以通过以下代码进行交换机和队列的绑定:


import pika

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 设置队列名称
queue_name = 'queue_name'

# 绑定交换机和队列
channel.exchange_declare(exchange='exchange_name', exchange_type='direct')
channel.queue_declare(queue=queue_name)
channel.queue_bind(queue=queue_name, exchange='exchange_name', routing_key='routing_key')

# 关闭连接
connection.close()

四、RabbitMQ发送消息的API

RabbitMQ提供了多种消息传递的API,包括AMQP、STOMP、MQTT等。其中,AMQP(Advanced Message Queuing Protocol)是RabbitMQ的默认协议,具有更高的可移植性和更好的性能。

以下是用AMQP协议发送消息的示例代码:


import pika

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name')

# 发送消息到队列中
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body='Hello World')

# 关闭连接
connection.close()

五、RabbitMQ发送消息的参数

RabbitMQ发送消息时可以设置多个参数,比如消息的优先级,持久性等。下面是发送一个持久化的、有优先级的消息示例:


import pika

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name', durable=True)

# 发送有优先级的消息到队列中
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body='Hello World',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 持久化消息
                          priority=1  # 设置消息优先级为1
                      ))

# 关闭连接
connection.close()

六、RabbitMQ发送消息设置编码格式

RabbitMQ发送消息时,默认的编码格式是二进制,如果要发送其他编码格式的消息,需要在发送消息前将消息进行编码。下面是发送一个UTF-8编码的消息示例:


import pika

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name')

# 发送UTF-8编码的消息到队列中
text = '你好世界'
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body=text.encode('utf-8'))

# 关闭连接
connection.close()

七、RabbitMQ发送消息的调用方法

在RabbitMQ中,消息的发送默认是同步的,即消息发送完成后,程序会一直等待MQ的确认消息。

如果我们需要异步的方式发送消息,则我们可以通过以下方式实现:


import pika
import concurrent.futures

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name')

# 创建异步Executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

# 异步发送消息到队列中
executor.submit(channel.basic_publish,
                exchange='',
                routing_key='queue_name',
                body='Hello World')

# 关闭连接
connection.close()

八、RabbitMQ异步发送消息

如果需要异步的方式发送消息,可以使用RabbitMQ提供的异步API —— aio-pika。

以下是使用aio-pika发送异步消息的示例代码:


import aio_pika
import asyncio

async def main(loop):
    # 连接rabbitmq
    connection = await aio_pika.connect_robust(
        host='localhost',
        loop=loop
    )
    channel = await connection.channel()

    # 创建队列
    queue_name = 'queue_name'
    queue = await channel.declare_queue(queue_name)

    # 发送异步消息到队列中
    message = aio_pika.Message(body=b"Hello World")
    await channel.default_exchange.publish(message, routing_key=queue_name)

    # 关闭连接
    await connection.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

九、RabbitMQ接收消息方式

在RabbitMQ中,我们可以使用常规方法接收消息,也可以通过消费者进行接收。

常规方法接收消息示例代码:


import pika

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name')

# 定义回调函数
def callback(ch, method, properties, body):
    print("Received %r" % body)

# 接收消息
channel.basic_consume(queue='queue_name',
                      on_message_callback=callback,
                      auto_ack=True)

# 开始消费
channel.start_consuming()

# 关闭连接
connection.close()

我们还可以通过消费者的方式进行消息的接收,以下是使用消费者的方法接收消息的示例代码:


import pika
import json

# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='queue_name')

# 定义消费者
def consumer(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received message: {message}")

# 开始消费
channel.basic_consume(queue='queue_name', on_message_callback=consumer)

# 关闭连接
connection.close()
以上就是RabbitMQ发送消息的详细操作指南,包括了RabbitMQ发送消息的方法、RabbitMQ定时发送消息、RabbitMQ消息发送后MQ未收到、RabbitMQ发送消息的API、RabbitMQ发送消息的参数、RabbitMQ发送消息设置编码格式、RabbitMQ发送消息的调用方法、RabbitMQ异步发送消息、RabbitMQ接收消息方式等主题。通过这篇文章的学习,相信大家已经可以轻松地实现RabbitMQ消息的发送和接收了。