方案1–业务方自己实现
假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?
解决方案如下:
(1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。
(2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。
(3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?
消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。
通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。
但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。
方案2 – RocketMQ 事务消息
为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。
具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。
具体来说,上面的2个步骤,被分解成3个步骤:
(1) 发送Prepared消息
(2) update DB
(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。
可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?
具体代码实现如下:
也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
public TransactionSendResult sendMessageInTransaction(.....) {
// 逻辑代码,非实际代码
// 1.发送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息发送成功,处理与消息关联的本地事务单元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.结束事务
this.endTransaction(sendResult, localTransactionState, localException);
}
总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。
至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。
人工介入
可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?
消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?
答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?
对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。
相关推荐
中间件作业,做有关事务处理的源代码和完整报告
中间件是一种处于系统软件(操作系统和网络软件)与应用软件之间的软件,它能使应用软件之间进行跨网络的透明访问和协同工作.ppt介绍了远程过程调用中间件、分布式对象中间件、事务处理中间件 以及消息中间件的基本...
提高数据库事务处理性能的中间件设计.pdf
它给程序员提供了一个事务处理的,程序员可以使用这个程序接口编写高速而且可靠的分布式应用程序———基于事务处理的应用程序。交易中间件向用户提供一系列的服务,如应用管理、管理控制、应用程序间的消息传递等。...
TongLINK/Q是和IBM MQ相近的消息中间件,其最主要的功能是保证消息的一致性,举一个例子,如果一个应用从socket中接收了银行A发来的数据,在处理数据过程中,应用程序崩溃,这时,银行A数据就会丢失,银行业务数据就...
消息队列之事务消息?RocketMQ和Kafka是怎么做的? 比 RocketMQ 更好的事务消息实现是什么? Kafka的索引设计有什么亮点? Kafka日志段如何读写解析? Kafka控制器事件处理全流程解析 Kafka请求处理全流程解析 Kafka...
作一个最简单的消息中间件(单机可达 180K TPS),支持顺序、事务、延时、过期、请求等消息特性。高吞吐量、低延迟,集群模式每秒能处理百万消息,最低延迟不到1毫秒。集群模式支持服务节点热扩展。流量高时随时加,...
第7章中间件中的事务处理 第8章coRBA高级技术 第9彦无线、移动中间件 第10章反射中间件 第1I孽网络即插即用中间件 第12章Web服务 第13章其他中间件技术 第14章 中间件的典型应用 附录1常见中间件平台比较 附录2名词...
系统介绍IBM大型机S/390事务处理中间件CICS,共分为11章
在Producer方面,分为普通发送者和事务消息发送者,其中普通发送者构建Netty客户端向Broker发送消息,而事务消息发送者还需要构建Netty服务端以供Broker回查本地事务状态。消息存储方面,探讨了Broker如何存储...
行业分类-设备装置-事务中间件机器环境中处理数据库状态通知的系统和方法
300M资源,微服务架构面试专题系列(MySQL,JVM,并发编程,RabbitMQ消息中间件,Spring)。 囊括的知识点非常多 1. Java基础包括了:集合,HashMap,JVM等常见考点, 说一下 JVM 的主要组成部分及其作用? 说一下 ...
中间件技术,联机事务处理
设计了一种Jboss中间件异常事务恢复功能,并实现了maven工程中交易服务的异常事务处理和恢复过程,该过程能在断电、宕机、程序故障等异常情况发生时,及时恢复最近一次正常工作的交易服务状态和数据库数据。...
最近想给我的框架加一种功能,就是比如给一个方法加一个事务的特性Attribute,那这个方法就会启用事务处理。给一个方法加一个缓存特性,那这个方法就会进行缓存。 这个也是网上说的面向切面编程AOP。 AOP的概念也很...
WebSphere® MQ (也称...IBM 消息中间件MQ以其独特的安全机制、简便快速的编程风格、卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通讯能力,成为业界市场占有率最高的消息中间件产品。
WebSphere® MQ (也称...IBM 消息中间件MQ以其独特的安全机制、简便快速的编程风格、卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通讯能力,成为业界市场占有率最高的消息中间件产品。
WebSphere® MQ (也称...IBM 消息中间件MQ以其独特的安全机制、简便快速的编程风格、卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通讯能力,成为业界市场占有率最高的消息中间件产品。
WebSphere® MQ (也称...IBM 消息中间件MQ以其独特的安全机制、简便快速的编程风格、卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通讯能力,成为业界市场占有率最高的消息中间件产品。
- 异常处理 - 多线程编程 2. 数据库: - 熟悉SQL语言 - 了解关系型数据库和非关系型数据库 - 数据库连接池 - 数据库事务 3. Spring框架: - Spring Boot - Spring MVC - Spring Data - Spring ...