RabbitMQ高级特性之死信队列和延迟队列

RabbitMQ-----死信队列

消息成为死信的三种情况

  1. 队列长度达到限制
  2. 消费者拒绝消费消息,basicNack/basicReject,并且不把消息重回队列
  3. 源队列存在消息过期限制,消息超时未消费

总结了以下两种配置

第一种配置,所有参数都放在map里面,map里的key可以看RabbirMQ中queue的Arguments参数

1@Configuration 2public class RabbitMQConfig { 3 /** 4 * 正常交换机的名称 5 */ 6 public static final String TOPIC_EXCHANGE_BOOT = "direct_exchange-boot"; 7 /** 8 * s信交换机的名称 9 */ 10 public static final String DLX_EXCHANGE_BOOT = "dlx_exchange-boot"; 11 /** 12 * 正常队列的名称 13 */ 14 public static final String TOPIC_QUEUE1_BOOT = "direct_queue1"; 15 /** 16 * s信队列的名称 17 */ 18 public static final String DLX_QUEUE1_BOOT = "dlx_queue1"; 19 20 21 /** 22 * 声明队列 23 * 24 * @return 25 */ 26 @Bean("itemqueue") 27 public Queue queueDeclare() { 28 Map<String, Object> args = new HashMap<>(2); 29 // x-dead-letter-exchange 30 args.put("x-dead-letter-exchange", DLX_EXCHANGE_BOOT); 31 // x-dead-letter-routing-key 32 args.put("x-dead-letter-routing-key", DLX_QUEUE1_BOOT); 33 //x-message-ttl 34 args.put("x-message-ttl", 9000); 35 //x-max-length 36 args.put("x-max-length", 99); 37 return QueueBuilder.durable(TOPIC_QUEUE1_BOOT).withArguments(args).build(); 38 } 39 40 /** 41 * 声明死信队列 42 * 43 * @return 44 */ 45 @Bean("dlxqueue") 46 public Queue queueDlx() { 47 return QueueBuilder.durable(DLX_QUEUE1_BOOT).build(); 48 } 49 50 /** 51 * 声明s信交换机 52 * 53 * @return 54 */ 55 @Bean("dlxexchange") 56 public Exchange exchangeDlx() { 57 //return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_BOOT).durable(true).build(); 58 return ExchangeBuilder.directExchange(DLX_EXCHANGE_BOOT).durable(true).build(); 59 } 60 61 /** 62 * 声明正常的交换机 63 * 64 * @return 65 */ 66 @Bean("exchange") 67 public Exchange exchangetopic() { 68 //return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_BOOT).durable(true).build(); 69 return ExchangeBuilder.directExchange(TOPIC_EXCHANGE_BOOT).durable(true).build(); 70 } 71 72 /** 73 * 队列绑定交换机 74 * 75 * @param queue 76 * @param exchange 77 * @return 78 */ 79 @Bean 80 public Binding queueBindExchange(@Qualifier("itemqueue") Queue queue, @Qualifier("exchange") Exchange exchange) { 81 //return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); 82 return BindingBuilder.bind(queue).to(exchange).with("direct_queue1").noargs(); 83 } 84 85 86 /** 87 * s信队列绑定s信交换机 88 * 89 * @param queue 90 * @param exchange 91 * @return 92 */ 93 @Bean 94 public Binding dlxQueueBindDlxExchange(@Qualifier("dlxqueue") Queue queue, @Qualifier("dlxexchange") Exchange exchange) { 95 //return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); 96 return BindingBuilder.bind(queue).to(exchange).with("dlx_queue1").noargs(); 97 } 98 99 100} 101 102 103

第二种配置,ttl配置在生产者中

1MessagePostProcessor messagePostProcessor=message -> { 2 MessageProperties messageProperties = message.getMessageProperties(); 3 //设置编码 4 messageProperties.setContentEncoding("utf-8"); 5 //设置过期时间10*1000毫秒 6 messageProperties.setExpiration("5000"); 7 return message; 8 }; 9 10

流程
结构

生产者

1@RunWith(SpringRunner.class) 2@SpringBootTest 3public class RabbitMQTest { 4 5 @Autowired 6 private RabbitTemplate rabbitTemplate; 7 8 @Test 9 public void test(){ 10 for (int i = 0; i < 10000; i++) { 11 /*MessagePostProcessor messagePostProcessor=message -> { 12 MessageProperties messageProperties = message.getMessageProperties(); 13// 设置编码 14 messageProperties.setContentEncoding("utf-8"); 15// 设置过期时间10*1000毫秒 16 messageProperties.setExpiration("5000"); 17 return message; 18 };*/ 19 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_BOOT, 20 "direct_queue1", i+""); 21 22 } 23 } 24} 25 26 27

消费者

1@Component 2@RabbitListener(queues = "dlx_queue1") 3public class MyListen { 4 @RabbitHandler 5 public void getMessage(String msg, Channel channel, Message message) throws IOException { 6 System.out.println("接受到的消息:"+msg); 7 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 8 } 9 10} 11 12

延迟队列

延迟队列故名思议:即消息进去队列后并不是立即被消费,而是到达某一时间后,才会被消费,其实RabbitMQ中并没有延迟队列,其本质是借助ttl和死信队列结合构成与延迟队列一样的效果
延迟队列

代码交流 2021