Skip to content

RabbitMQ 延迟队列 — 基于死信队列实现

1. 什么是延迟队列?

延迟队列是一种消息队列机制,允许消息在指定时间后才被消费者处理。在电商、定时任务、消息重试等场景中,延迟队列有广泛的应用。RabbitMQ 本身不支持直接的延迟队列,但可以通过 死信队列(Dead Letter Queue, DLQ) 结合 TTL(Time-To-Live) 机制实现。

2. 什么是死信队列(DLQ)?

死信队列(DLQ) 是 RabbitMQ 处理特殊消息的机制。当消息在原始队列(普通队列)中满足以下条件之一时,会被转发到 死信交换机(DLX, Dead Letter Exchange),然后再路由到 死信队列(DLQ),供后续处理:

  • 消息 TTL 过期:消息在队列中的存活时间到期后仍未被消费。
  • 队列长度超限:队列已满,新消息无法进入队列。
  • 消息被拒绝(NACK/Requeue=False):消费者显式拒绝消息且不重新入队。

我们可以利用 TTL + DLX 机制实现 RabbitMQ 的延迟队列功能。

3. 基于死信队列的延迟队列实现

3.1 实现思路

  1. 创建普通队列(延迟队列),该队列的消息会在 TTL 过期后进入 死信交换机(DLX)
  2. 配置死信交换机(DLX),当消息在普通队列中过期后,会自÷动路由到死信交换机。
  3. 创建死信队列(真正的消费队列),并绑定到死信交换机,消费者监听这个队列。
  4. 生产者发送消息,消息存活时间由 TTL 决定,达到延迟效果。

画板


3.2 代码实现(Java + Spring Boot)

1. 配置队列和交换机

java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    // 普通队列(延迟队列)
    public static final String DELAYED_QUEUE = "delayed_queue";
    // 死信交换机
    public static final String DLX_EXCHANGE = "dlx_exchange";
    // 死信队列(真正的消费队列)
    public static final String REAL_QUEUE = "real_queue";

    // 创建延迟队列(绑定死信交换机)
    @Bean
    public Queue delayedQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 10000); // 10 秒后消息过期
        args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 绑定死信交换机
        args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 绑定死信队列的路由键
        return new Queue(DELAYED_QUEUE, true, false, false, args);
    }

    // 创建死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE);
    }

    // 创建死信队列
    @Bean
    public Queue realQueue() {
        return new Queue(REAL_QUEUE, true);
    }

    // 绑定死信队列到死信交换机
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(realQueue()).to(dlxExchange()).with("dlx_routing_key");
    }
}

2. 生产者发送消息

java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final RabbitTemplate rabbitTemplate;

    public MessageProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    // 发送消息到延迟队列
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("delayed_queue", message);
        System.out.println(" [x] Sent '" + message + "' to delayed_queue");
    }
}

3. 消费者监听死信队列(延迟后消费)

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    @RabbitListener(queues = "real_queue")
    public void receiveMessage(String message) {
        System.out.println(" [x] Received '" + message + "' from real_queue");
    }
}

4. 测试流程

  1. 启动 RabbitMQ,确保 RabbitMQ 服务器正在运行。
  2. 启动 Spring Boot 项目,自动创建 RabbitMQ 交换机和队列。
  3. 调用 sendMessage() 方法,将消息发送到 delayed_queue
  4. 等待 10 秒,消息会进入 real_queue,消费者会消费该消息并打印到控制台。

5. 关键点解析

1. 为什么使用死信队列?

RabbitMQ 本身不支持 消息定时投递,而 TTL + DLX 机制可以间接实现延迟队列:

  • 通过 TTL 限制消息存活时间,达到 延迟 效果。
  • 通过 死信交换机,在消息过期后自动转发到真正的消费队列。

2. 如何控制不同的延迟时间?

RabbitMQ 不支持 单条消息的 TTL 配置,而是对整个队列生效。但可以通过 自定义 TTL 方式实现不同的延迟:

java
public void sendMessageWithTTL(String message, int ttl) {
    MessageProperties properties = new MessageProperties();
    properties.setExpiration(String.valueOf(ttl)); // 设置消息的过期时间(单位:毫秒)
    Message msg = new Message(message.getBytes(), properties);
    rabbitTemplate.send("delayed_queue", msg);
}

这样,每条消息都可以有不同的延迟时间,而不会影响整个队列。

6. 适用场景

  • 订单超时取消:电商订单未支付,在 30 分钟后自动取消。
  • 延迟任务调度:用户注册后 10 分钟推送欢迎消息。
  • 消息重试机制:如果消费失败,10 秒后重试消费。

7. 总结

本文介绍了 RabbitMQ 延迟队列 的实现方式,利用 TTL + DLX 机制,通过 Java + Spring Boot 编写了完整的 生产者、消费者和 RabbitMQ 配置。如果需要更加精准的延迟投递,可以考虑使用 RabbitMQ x-delayed-message 插件,但 TTL + DLX 方式更加通用。

🚀 希望这篇文章能帮助你理解 RabbitMQ 延迟队列的实现方式! 🎯

前端知识体系 · wcrane