使用prepared消息和确认消息是一种常见的基于消息中间件实现分布式事务的方式。具体实现流程如下:
- 事务协调器向消息中间件发送Start消息,表示开始一个新的分布式事务。消息中间件返回Transaction ID(tid),表示该事务的唯一标识。
- 业务系统向消息中间件发送Prepare消息,并携带该事务的tid和需要执行的本地事务数据。同时,业务系统需要等待消息中间件回复Prepare-Ack消息。
- 消息中间件收到Prepare消息后,向所有参与者节点分别发送Prepare消息,并携带该事务的tid和参与者需要执行的操作数据。参与者节点如果能够成功执行操作,则向消息中间件返回Prepare-ok消息。
- 消息中间件依次接收每个参与者节点的Prepare-ok消息,如果所有参与者节点都返回了Prepare-ok消息,则向业务系统回复Prepare-ack消息,并进入提交状态。如果有任何一个参与者节点返回了Prepare-fail消息,则向业务系统回复Prepare-fail消息,并进入回滚状态。
- 如果消息中间件回复了Prepare-ack消息,则业务系统开始执行本地事务,并将结果存储到准备提交的消息数据结构中。同时,业务系统需要发起Confirm消息,携带该事务的tid和本地事务执行结果。
- 消息中间件收到Confirm消息后,向所有参与者节点分别发送Commit消息或Rollback消息。参与者节点需要根据消息携带的tid和本地事务执行结果,决定是执行Commit操作还是Rollback操作,并返回Commit-ok或Rollback-ok消息。
- 消息中间件依次接收每个参与者节点的Commit-ok或Rollback-ok消息,如果正常接收到了所有参与者节点的消息,则向业务系统回复Confirm-ack消息,并完成该事务的提交或回滚操作。
疑问:如果有节点没有commit-ok消息呢?
如果有参与者节点在接收了Commit消息后没有正常返回Commit-ok消息,那么消息中间件需要进行异常处理。
一种处理方式是等待所有参与者节点的反馈,如果在超时时间内收到反馈则继续往下执行,到达Commit-ok或Rollback-ok消息对应的阶段,而缺失的参与者节点则被认为是异常的,进入补偿阶段进行回滚操作。这需要利用消息中间件保证消息的幂等性。
- 手动补偿:由管理员手动对已经提交的事务进行回滚操作。这种方式需要手动介入,操作费用高,且容易出错。
- 基于本地事务回滚:由消息中间件向参与者发送回滚消息,让参与者执行本地事务回滚操作,保证事务的一致性。这种方式简单易行,但是需要保证参与者的本地事务支持回滚操作。
- 基于日志记录的回滚:在事务提交的过程中,记录所有涉及到的数据和操作,如果出现异常,则利用记录的日志进行回滚操作。这种方式需要保证日志的完整性和正确性,并且需要对没有回滚的日志进行处理,否则可能会导致数据不一致。
- 基于分布式事务管理器的回滚:利用分布式事务管理器(如TCC、Seata)提供的回滚机制来实现补偿操作,保证事务的一致性。这种方式依赖于分布式事务管理器的技术选型,并且需要进行适当的配置和使用。
- 基于消息队列的重试机制:将无法提交或回滚的事务信息重新发送至消息队列,等待下一次操作,如果多次尝试后仍然失败,则需要进行手动补偿操作。
- 基于定时任务的重试机制:利用定时任务对未能成功提交或回滚的事务进行自动重试,如果多次尝试后仍然失败,则需要进行手动补偿操作。
- 基于容错性框架的自动重试:依据具体业务需求选择适合的分布式容错框架,通过配置自动重试次数和时间间隔等参数来实现自动重试,减少手动干预。
怕说不清楚画张草图示意一下大致流程:
- 以上流程仅适用于最终一致性;
- 消息队列需要支持事务型消息;
- 注意节点1本地数据操作时保留消息标识,以便消息队列询问消息源。
消息事务是指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。在此用Kafka 为例做进一步解说吧!
一、基本概念
为了支持事务,Kafka 0.11.0版本引入以下概念:
1.事务协调者:类似于消费组负载均衡的协调者,每一个实现事务的生产端都被分配到一个事务协调者(Transaction Coordinator)。
2.引入一个内部Kafka Topic作为事务Log:类似于消费管理Offset的Topic,事务Topic本身也是持久化的,日志信息记录事务状态信息,由事务协调者写入。
3.引入控制消息(Control Messages):这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。
4.引入TransactionId:不同生产实例使用同一个TransactionId表示是同一个事务,可以跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作,避免事务僵死。
5.Producer ID:每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。主要是为提供幂等性时引入的。
6.Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。
7.每个生产者增加一个epoch:用于标识同一个事务Id在一次事务中的epoch,每次初始化事务时会递增,从而让服务端可以知道生产者请求是否旧的请求。
8.幂等性:保证发送单个分区的消息只会发送一次,不会出现重复消息。增加一个幂等性的开关enable.idempotence,可以独立与事务使用,即可以只开启幂等但不开启事务
二、事务流程
1、查找事务协调者
生产者会首先发起一个查找事务协调者的请求(FindCoordinatorRequest)。协调者会负责分配一个PID给生产者。类似于消费组的协调者。
2、获取produce ID
在知道事务协调者后,生产者需要往协调者发送初始化pid请求(initPidRequest)。这个请求分两种情况:
●不带transactionID
这种情况下直接生成一个新的produce ID即可,返回给客户端
●带transactionID
这种情况下,kafka根据transactionalId获取对应的PID,这个对应关系是保存在事务日志中(上图2a)。这样可以确保相同的TransactionId返回相同的PID,用于恢复或者终止之前未完成的事务。
3、启动事务
生产者通过调用beginTransaction接口启动事务,此时只是内部的状态记录为事务开始,但是事务协调者认为事务开始只有当生产者开始发送第一条消息才开始。
4、消费和生产配合过程
这一步是消费和生成互相配合完成事务的过程,其中涉及多个请求:
●增加分区到事务请求
当生产者有新分区要写入数据,则会发送AddPartitionToTxnRequest到事务协调者。协调者会处理请求,主要做的事情是更新事务元数据信息,并把信息写入到事务日志中(事务Topic)。
●生产请求
生产者通过调用send接口发送数据到分区,这些请求新增pid,epoch和sequence number字段。
●增加消费offset到事务
生产者通过新增的snedOffsets ToTransaction接口,会发送某个分区的Offset信息到事务协调者。协调者会把分区信息增加到事务中。
●事务提交offset请求
当生产者调用事务提交offset接口后,会发送一个TxnOffsetCommitRequest请求到消费组协调者,消费组协调者会把offset存储在__consumer-offsets Topic中。协调者会根据请求的PID和epoch验证生产者是否允许发起这个请求。 消费offset只有当事务提交后才对外可见。
5、提交或回滚事务
用户通过调用commitTransaction或abortTranssaction方法提交或回滚事务。
●EndTxnRequest
当生产者完成事务后,客户端需要显式调用结束事务或者回滚事务。前者会使得消息对消费者可见,后者会对生产数据标记为Abort状态,使得消息对消费者不可见。无论是提交或者回滚,都是发送一个EndTnxRequest请求到事务协调者,写入PREPARE_COMMIT或者PREPARE_ABORT信息到事务记录日志中(5.1a)。
●WriteTxnMarkerRequest
这个请求是事务协调者向事务中每个TopicPartition的Leader发送的。每个Broker收到请求后会写入COMMIT(PID)或者ABORT(PID)控制信息到数据日志中(5.2a)。
这个信息用于告知消费者当前消息是哪个事务,消息是否应该接受或者丢弃。而对于未提交消息,消费者会缓存该事务的消息直到提交或者回滚。
这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息(5.2a 左边)。
●写入最终提交或回滚信息
当提交和回滚信息写入数据日子后,事务协调者会往事务日志中写入最终的提交或者终止信息以表示事务已经完成(图5.3),此时大部分于事务有关系的消息都可以被删除(通过标记后面在日志压缩时会被移除),我们只需要保留事务ID以及其时间戳即可。