【RabbitMQ】消息确认机制的“事务机制”和“confirm模式”

消息确认机制的“事务机制”和“confirm模式”<1>

  • AMQP事务机制

  • 代码实现

  • 成功提交Demo

  • P生产者 * C消费者 * 运行结果

    1 * 异常回滚Demo 2
  • P生产者 * C消费者 * 运行结果

    1 * 总结 2
  • Confirm模式

在RabbitMQ中,我们可以通过持久化数据,解决RabbitMQ服务器异常 的数据丢失问题。
这里说的持久化属于生产者已经把消息发送到RabbitMQ了的情况下才能讨论持久化。
但是在生产者给RabbitMQ发送消息的这个过程,我们还有一个问题就是不确定是否发送到RabbitMQ服务器

问题

  • 生产者将消息发送出去之后,消息到底有没有到达RabbitMQ服务器,默认的情况是

不知道的

解决方案

  • AMQP协议实现的事务机制
  • Confirm模式

AMQP事务机制

  • txSelect:用户将当前channel设置成transation模式
  • txCommit:用于提交事务
  • txRollback:回滚事务

代码实现

成功提交Demo

P生产者

TxSend.class

1package com.springrabbitmq.tx; 2 3import com.rabbitmq.client.Channel; 4import com.rabbitmq.client.Connection; 5import com.springrabbitmq.util.RabbitMQConnectionUtil; 6 7import java.io.IOException; 8import java.util.concurrent.TimeoutException; 9 10/** 11 * @Title: 12 * @author: fly 13 * @date: 2020-3-11 14 */ 15public class TxSend { 16 private static final String QUEUE_NAME="test_queue_tx"; 17 public static void main(String[] args) throws IOException, TimeoutException { 18 19 Connection connection = RabbitMQConnectionUtil.getConnection(); 20 Channel channel = connection.createChannel(); 21 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 22 String msg="hello tx massage"; 23 24 try { 25 channel.txSelect(); 26 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 27 channel.txCommit(); 28 System.out.println("Send"+msg); 29 } catch (Exception e) { 30// e.printStackTrace(); 31 channel.txRollback(); 32 System.out.println("Send massage txRollback!!!"); 33 } 34 channel.close(); 35 connection.close(); 36 } 37} 38 39 40

C消费者

TxRecv.class

1package com.springrabbitmq.tx; 2 3import com.rabbitmq.client.*; 4import com.springrabbitmq.util.RabbitMQConnectionUtil; 5 6import java.io.IOException; 7import java.util.concurrent.TimeoutException; 8 9/** 10 * @Title: 11 * @author: fly 12 * @date: 2020-3-11 13 */ 14public class TxRecv { 15 private static final String QUEUE_NAME="test_queue_tx"; 16 17 public static void main(String[] args) throws IOException, TimeoutException { 18 Connection connection = RabbitMQConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 21 //自动确认true 22 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 System.out.println("事务"+new String(body,"UTF-8")); 26 } 27 }); 28 } 29} 30 31

运行结果

事务提交成功!
在这里插入图片描述
在这里插入图片描述

异常回滚Demo

P生产者

TxSend.class

1package com.springrabbitmq.tx; 2 3import com.rabbitmq.client.Channel; 4import com.rabbitmq.client.Connection; 5import com.springrabbitmq.util.RabbitMQConnectionUtil; 6 7import java.io.IOException; 8import java.util.concurrent.TimeoutException; 9 10/** 11 * @Title: 12 * @author: fly 13 * @date: 2020-3-11 14 */ 15public class TxSend { 16 private static final String QUEUE_NAME="test_queue_tx"; 17 public static void main(String[] args) throws IOException, TimeoutException { 18 19 Connection connection = RabbitMQConnectionUtil.getConnection(); 20 Channel channel = connection.createChannel(); 21 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 22 String msg="hello tx massage"; 23 24 try { 25 channel.txSelect(); 26 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); 27 28 //提交前制造异常 29 int fly=1/0; 30 31 channel.txCommit(); 32 System.out.println("Send"+msg); 33 } catch (Exception e) { 34// e.printStackTrace(); 35 channel.txRollback(); 36 System.out.println("Send massage txRollback!!!"); 37 } 38 channel.close(); 39 connection.close(); 40 } 41} 42 43 44

C消费者

不改,与前面的消费者一致

运行结果

启动TxSend,事务回滚,消费者控制台没有接收到消息
在这里插入图片描述

总结

  • 这种模式通过txSelect、txCommit、txRollback发送AMQP协议确认消息,降低了消息的吞吐量,因为走的通信太多了,每一个txSelect、txCommit、txRollback都是一次请问,就会导致请求太多

Confirm模式

代码交流 2021