RabbitMQ之TTL队列及死信队列

lz 1年前 ⋅ 877 阅读

TTL队列(Time To Live)

TTL队列:简单的说就是队列中的消息是有时间限制的,如果超时,那么这个消息将会被队列删除

业务场景

淘宝购物,提交订单之后,在一定的时间内(30分钟)、订单如果没有支付,那么订单就会自动关闭,如果想要购买商品,就需要重新下单。实现这个功能,就使用到了这个 TTL队列.

代码实现

编写生产者

// 生产者
public class Producer {
    private static final String QUEUE_NAME = "queue_ttl_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 设置队列的属性
        Map<String, Object> map = new HashMap<>(1);
        // 如果是 ttl 队列,需要设置键为:x-message-ttl,值为:过期时间(毫秒)
        map.put("x-message-ttl", 10000);
        channel.queueDeclare(QUEUE_NAME, false, false, false, map);
        channel.basicPublish("", QUEUE_NAME, null, "ttl队列...".getBytes());
        channel.close();
        connection.close();
    }
}

编写消费者

// 消费者
public class Consumer {
    private static final String QUEUE_NAME = "queue_ttl_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        Map<String, Object> map = new HashMap<>(1);
        // 如果是 ttl 队列,需要设置键为:x-message-ttl,值为:过期时间(毫秒)
        map.put("x-message-ttl", 10000);
        channel.queueDeclare(QUEUE_NAME, false, false, false, map);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者接收到消息:" + new String(body));
                System.out.println(new SimpleDateFormat("hh: mm: ss").format(new Date()));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        System.out.println(new SimpleDateFormat("hh: mm: ss").format(new Date()));
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

测试TTL

启动消费者,再启动生产者,然后重新启动消费者

可以看出,生产者发送消息时,消费者可以接收到,当重新启动消费者时,由于间隔时间大于 TTL 设置的过期时间10秒,所以第二次并没有得到值,也就证明了,队列中的消息已经被删除。

小结

  1. 设置TTL队列,需要在创建队列的时候,设置上一个属性 x-message-ttl
  2. 设置方式:map.put("x-message-ttl", 10000);

死信队列

什么是死信队列

当消息在队列中变成死信之后、可以定义它重新push 到另外一个交换机上、这个交换机 也有自己对应的队列 这个队列就称为死信队列。

消息变成死信的原因:

  1. 发送到队列中的消息被拒绝了
  2. 消息的 ttl 时间过期了
  3. 队列达到了最大长度,再往里面放信息,放不进去的消息

在满足死信的前提下,定义一个队列,这个队列专门用来存放死信,就是死信队列

死信队列也是一个正常的队列,和一般的队列没有什么区别

当这个队列中如果有这个死信的时候,rabbitmq 就会将这个消息自动发送到我们提前定义好的死信队列中去(简单的说就是路由到另外一个队列)

死信的流程:

生产者 --> 发送消息 --> TTL交换机 --> TTL队列 --> 变成死信队列 --> DLX交换机 --> DLX队列 --> 消费者

代码实现

这里使用 TTL 测试死信队列

编写生产者

// 生产者
public class Producer {
    // ttl队列的交换机
    private static final String EXCHANGE_NAME_TTL = "exchange_ttl";
    // 路由的key
    private static final String ROUTING_KEY = "dlx.km";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(EXCHANGE_NAME_TTL, ROUTING_KEY, null, ("TTL队列测试死信队列" + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

编写消费者

// 消费者
public class Consumer {
    // ttl队列的交换机
    private static final String EXCHANGE_NAME_TTL = "exchange_ttl";
    // ttl队列
    private static final String QUEUE_NAME_TTL = "queue_ttl";
    // dlx交换机
    private static final String EXCHANGE_NAME_DLX = "exchange_dlx";
    // dlx队列
    private static final String QUEUE_NAME_DLX = "queue_dlx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        // 创建topic模式交换机,开启持久化
        channel.exchangeDeclare(EXCHANGE_NAME_TTL, "topic", true);
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 10000);
        map.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
        channel.queueDeclare(QUEUE_NAME_TTL, true, false, false, map);
        // 将队列和交换机进行绑定
        channel.queueBind(QUEUE_NAME_TTL, EXCHANGE_NAME_TTL, "dlx.#");

        // 开始声明死信交换机和死信队列
        // 添加一个死信的属性  后面这个值就是死信队列交换机的名字
        channel.exchangeDeclare(EXCHANGE_NAME_DLX, "topic", true);
        channel.queueDeclare(QUEUE_NAME_DLX, true, false, false, null);
        channel.queueBind(QUEUE_NAME_DLX, EXCHANGE_NAME_DLX, "#");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("成功获取到数据:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME_TTL, true, defaultConsumer);
//        channel.basicConsume(QUEUE_NAME_DLX, true, defaultConsumer);
    }
}

测试结果

先启动消费者,再启动生产者

当设置 channel.basicConsume(QUEUE_NAME_TTL, true, defaultConsumer); 时,也就是监听 TTL 队列消息的时候,控制台马上打印出下图所示信息。

当设置 channel.basicConsume(QUEUE_NAME_DLX, true, defaultConsumer); 时,监听死信队列的消息的时候,控制台在消息发送了 10秒钟之后才打印出以下信息。

证明 TTL过期的信息,成功进入到死信队列中。动态效果也可以在 RabbitMQ控制台中看出来。

小结

  1. 核心代码:map.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX); 一定要设置 ttl 队列的死信属性
  2. 死信队列就是一个普通的队列
  3. 应用场景:消息的延迟发送,定时关闭订单等。

--end--

 

版权 本文为TIMO社区原创文章,转载无需和我联系,但请注明来自TIMO社区 http://timo.aikanmv.cn