TTL队列(Time To Live)
TTL队列:简单的说就是队列中的消息是有时间限制的,如果超时,那么这个消息将会被队列删除
业务场景
淘宝购物,提交订单之后,在一定的时间内(30分钟)、订单如果没有支付,那么订单就会自动关闭,如果想要购买商品,就需要重新下单。实现这个功能,就使用到了这个 TTL队列.
代码实现
编写生产者
// 生产者
public class Producer {
private static final String QUEUE_NAME = "queue_ttl_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 设置队列的属性
Map<String, Object> map = new HashMap<>(1);
// 如果是 ttl 队列,需要设置键为:x-message-ttl,值为:过期时间(毫秒)
map.put("x-message-ttl", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, map);
channel.basicPublish("", QUEUE_NAME, null, "ttl队列...".getBytes());
channel.close();
connection.close();
}
}
编写消费者
// 消费者
public class Consumer {
private static final String QUEUE_NAME = "queue_ttl_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
Map<String, Object> map = new HashMap<>(1);
// 如果是 ttl 队列,需要设置键为:x-message-ttl,值为:过期时间(毫秒)
map.put("x-message-ttl", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, map);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者接收到消息:" + new String(body));
System.out.println(new SimpleDateFormat("hh: mm: ss").format(new Date()));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
System.out.println(new SimpleDateFormat("hh: mm: ss").format(new Date()));
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
测试TTL
启动消费者,再启动生产者,然后重新启动消费者
可以看出,生产者发送消息时,消费者可以接收到,当重新启动消费者时,由于间隔时间大于 TTL 设置的过期时间10秒,所以第二次并没有得到值,也就证明了,队列中的消息已经被删除。
小结
- 设置TTL队列,需要在创建队列的时候,设置上一个属性
x-message-ttl
- 设置方式:
map.put("x-message-ttl", 10000);
死信队列
什么是死信队列
当消息在队列中变成死信之后、可以定义它重新push 到另外一个交换机上、这个交换机 也有自己对应的队列 这个队列就称为死信队列。
消息变成死信的原因:
- 发送到队列中的消息被拒绝了
- 消息的 ttl 时间过期了
- 队列达到了最大长度,再往里面放信息,放不进去的消息
在满足死信的前提下,定义一个队列,这个队列专门用来存放死信,就是死信队列
死信队列也是一个正常的队列,和一般的队列没有什么区别
当这个队列中如果有这个死信的时候,rabbitmq 就会将这个消息自动发送到我们提前定义好的死信队列中去(简单的说就是路由到另外一个队列)
死信的流程:
生产者 --> 发送消息 --> TTL交换机 --> TTL队列 --> 变成死信队列 --> DLX交换机 --> DLX队列 --> 消费者
代码实现
这里使用 TTL 测试死信队列
编写生产者
// 生产者
public class Producer {
// ttl队列的交换机
private static final String EXCHANGE_NAME_TTL = "exchange_ttl";
// 路由的key
private static final String ROUTING_KEY = "dlx.km";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE_NAME_TTL, ROUTING_KEY, null, ("TTL队列测试死信队列" + i).getBytes());
}
channel.close();
connection.close();
}
}
编写消费者
// 消费者
public class Consumer {
// ttl队列的交换机
private static final String EXCHANGE_NAME_TTL = "exchange_ttl";
// ttl队列
private static final String QUEUE_NAME_TTL = "queue_ttl";
// dlx交换机
private static final String EXCHANGE_NAME_DLX = "exchange_dlx";
// dlx队列
private static final String QUEUE_NAME_DLX = "queue_dlx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 创建topic模式交换机,开启持久化
channel.exchangeDeclare(EXCHANGE_NAME_TTL, "topic", true);
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 10000);
map.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
channel.queueDeclare(QUEUE_NAME_TTL, true, false, false, map);
// 将队列和交换机进行绑定
channel.queueBind(QUEUE_NAME_TTL, EXCHANGE_NAME_TTL, "dlx.#");
// 开始声明死信交换机和死信队列
// 添加一个死信的属性 后面这个值就是死信队列交换机的名字
channel.exchangeDeclare(EXCHANGE_NAME_DLX, "topic", true);
channel.queueDeclare(QUEUE_NAME_DLX, true, false, false, null);
channel.queueBind(QUEUE_NAME_DLX, EXCHANGE_NAME_DLX, "#");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("成功获取到数据:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME_TTL, true, defaultConsumer);
// channel.basicConsume(QUEUE_NAME_DLX, true, defaultConsumer);
}
}
测试结果
先启动消费者,再启动生产者
当设置 channel.basicConsume(QUEUE_NAME_TTL, true, defaultConsumer);
时,也就是监听 TTL 队列消息的时候,控制台马上打印出下图所示信息。
当设置 channel.basicConsume(QUEUE_NAME_DLX, true, defaultConsumer);
时,监听死信队列的消息的时候,控制台在消息发送了 10秒钟之后才打印出以下信息。
证明 TTL过期的信息,成功进入到死信队列中。动态效果也可以在 RabbitMQ控制台中看出来。
小结
- 核心代码:
map.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX);
一定要设置 ttl 队列的死信属性 - 死信队列就是一个普通的队列
- 应用场景:消息的延迟发送,定时关闭订单等。
--end--