RabbitMQ发送消息详解

发布时间:2023-05-18

RabbitMQ发送消息的方法

RabbitMQ是一个强大的分布式消息队列系统,可以实现应用程序之间的异步消息传递。RabbitMQ发送消息的方法非常简单,只需要通过客户端向指定的队列发送一条消息即可。

import pika
import json
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 发送消息到队列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(exchange='', routing_key='queue_name', body=json.dumps(message))
# 关闭连接
connection.close()

以上代码展示了如何在Python中使用pika库连接RabbitMQ,将消息发送到名为“queue_name”的队列中。

RabbitMQ定时发送消息

有时候需要在指定的时间发送消息。这时可以使用RabbitMQ的插件 —— RabbitMQ Delayed Message Exchange。 首先需要安装该插件,方法如下:

# 启用rabbitmq_management插件
rabbitmq-plugins enable rabbitmq_management
# 下载并安装Delayed Message插件
wget https://dl.bintray.com/rabbitmq/community-plugins/3.8.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez
mv rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez /usr/lib/rabbitmq/lib/rabbitmq_server-${VERSION}/plugins/

安装完毕后,即可通过指定消息的expiration属性来实现消息的定时发送。消息的expiration属性表示消息过期的时间,单位为毫秒。

import pika
import json
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 发送延迟消息到队列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='delayed_queue_name',
    body=json.dumps(message),
    properties=pika.BasicProperties(expiration='10000')
)
# 关闭连接
connection.close()

以上代码展示了如何在指定的队列“delayed_queue_name”中发送一条10秒后才会被消费的消息。

RabbitMQ发送消息后MQ没收到

当我们发送消息时,如果MQ没有收到消息,我们需要检查以下几点:

1、队列是否存在

发送消息前需先确认队列是否存在,否则之后发送的消息会导致MQ无法正常接收消息。可以通过以下代码创建队列:

import pika
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name')
# 关闭连接
connection.close()

2、交换机是否绑定了队列

在RabbitMQ中,消息通过交换机分发到不同的队列。在发送消息之前,需要确保该交换机已经与消费者队列进行了绑定。可以通过以下代码进行交换机和队列的绑定:

import pika
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 设置队列名称
queue_name = 'queue_name'
# 绑定交换机和队列
channel.exchange_declare(exchange='exchange_name', exchange_type='direct')
channel.queue_declare(queue=queue_name)
channel.queue_bind(queue=queue_name, exchange='exchange_name', routing_key='routing_key')
# 关闭连接
connection.close()

RabbitMQ发送消息的API

RabbitMQ提供了多种消息传递的API,包括AMQP、STOMP、MQTT等。其中,AMQP(Advanced Message Queuing Protocol)是RabbitMQ的默认协议,具有更高的可移植性和更好的性能。 以下是用AMQP协议发送消息的示例代码:

import pika
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name')
# 发送消息到队列中
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body='Hello World')
# 关闭连接
connection.close()

RabbitMQ发送消息的参数

RabbitMQ发送消息时可以设置多个参数,比如消息的优先级,持久性等。下面是发送一个持久化的、有优先级的消息示例:

import pika
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name', durable=True)
# 发送有优先级的消息到队列中
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body='Hello World',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 持久化消息
                          priority=1  # 设置消息优先级为1
                      ))
# 关闭连接
connection.close()

RabbitMQ发送消息设置编码格式

RabbitMQ发送消息时,默认的编码格式是二进制,如果要发送其他编码格式的消息,需要在发送消息前将消息进行编码。下面是发送一个UTF-8编码的消息示例:

import pika
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name')
# 发送UTF-8编码的消息到队列中
text = '你好世界'
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body=text.encode('utf-8'))
# 关闭连接
connection.close()

RabbitMQ发送消息的调用方法

在RabbitMQ中,消息的发送默认是同步的,即消息发送完成后,程序会一直等待MQ的确认消息。 如果我们需要异步的方式发送消息,则我们可以通过以下方式实现:

import pika
import concurrent.futures
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name')
# 创建异步Executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# 异步发送消息到队列中
executor.submit(channel.basic_publish,
                exchange='',
                routing_key='queue_name',
                body='Hello World')
# 关闭连接
connection.close()

RabbitMQ异步发送消息

如果需要异步的方式发送消息,可以使用RabbitMQ提供的异步API —— aio-pika。 以下是使用aio-pika发送异步消息的示例代码:

import aio_pika
import asyncio
async def main(loop):
    # 连接rabbitmq
    connection = await aio_pika.connect_robust(
        host='localhost',
        loop=loop
    )
    channel = await connection.channel()
    # 创建队列
    queue_name = 'queue_name'
    queue = await channel.declare_queue(queue_name)
    # 发送异步消息到队列中
    message = aio_pika.Message(body=b"Hello World")
    await channel.default_exchange.publish(message, routing_key=queue_name)
    # 关闭连接
    await connection.close()
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

RabbitMQ接收消息方式

在RabbitMQ中,我们可以使用常规方法接收消息,也可以通过消费者进行接收。 常规方法接收消息示例代码:

import pika
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name')
# 定义回调函数
def callback(ch, method, properties, body):
    print("Received %r" % body)
# 接收消息
channel.basic_consume(queue='queue_name',
                      on_message_callback=callback,
                      auto_ack=True)
# 开始消费
channel.start_consuming()
# 关闭连接
connection.close()

我们还可以通过消费者的方式进行消息的接收,以下是使用消费者的方法接收消息的示例代码:

import pika
import json
# 连接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='queue_name')
# 定义消费者
def consumer(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received message: {message}")
# 开始消费
channel.basic_consume(queue='queue_name', on_message_callback=consumer)
# 关闭连接
connection.close()

以上就是RabbitMQ发送消息的详细操作指南,包括了RabbitMQ发送消息的方法、RabbitMQ定时发送消息、RabbitMQ消息发送后MQ未收到、RabbitMQ发送消息的API、RabbitMQ发送消息的参数、RabbitMQ发送消息设置编码格式、RabbitMQ发送消息的调用方法、RabbitMQ异步发送消息、RabbitMQ接收消息方式等主题。通过这篇文章的学习,相信大家已经可以轻松地实现RabbitMQ消息的发送和接收了。