发件箱模式简介
一个微服务可能需要执行“存数据库”和“发送事件”两个步骤。例如发布一篇文章后,需要更新作者的发文统计信息。业务上要求两个操作同时失败,或者同时成功,而不能出现一个成功一个失败。假如最终文章发布了,更新发文统计失败了,就会导致数据不一致。
发件箱模式是解决这个问题的最常用模式,其原理为:
- 本地业务作为一个事务运行,在提交事务之前,将事件写入到消息表;提交事务时,会同时提交业务,以及事件
通过轮询消息表或者监听binlog方式,将事件发给消息队列
- 轮询方式:每隔1s或者0.2s取出消息表中事件,发给消息队列,然后删除事件
- 监听binlog方式:通过Debezium等数据库工具,监听数据库的binlog,获取事件,发送给消息队列
- 编写消费者,处理事件
由于1中,业务和事件的提交是在同一个事务,保证了两者会同时提交。
在步骤2,3中,都是不会失败的操作,如果中间发生宕机事件等,都会重试,并最终成功。
对于前述的发文后提交统计信息场景,上述方案保证了统计信息被最终更新,数据会达到最终一致
多数据库的问题
在当今流行的微服务架构下,通常一个微服务会采用一个单独的数据库。当多个服务需要使用发件箱模式时,那么传统的发件箱架构就比较难以维护。
- 采用轮询方式获取事件:需要在轮询任务中,编写多个数据库的轮询任务
- 采用监听binlog获取事件:需要监听多个数据库的binlog
上述两种获取事件的方式,在面对数量较多的数据库,可维护性差。而且该架构的弹性并不好,假如数据库多,而时间产生的事件少,也会导致该架构的负载高,浪费资源。最理想的架构负载是,只跟发送的事件数量相关,跟其他因素无关。
解决方案
开源分布式事务框架 https://github.com/dtm-labs/dtm 里面的二阶段消息,可以很好的处理这个问题。下面是一个跨行转账业务的使用示例:
msg := dtmcli.NewMsg(DtmServer, gid).
Add(busi.Busi+"/TransIn", &TransReq{Amount: 30})
err := msg.DoAndSubmitDB(busi.Busi+"/QueryPreparedB", db, func(tx *sql.Tx) error {
return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS")
})
这部分代码中
- 首先生成一个DTM的msg全局事务,传递dtm的服务器地址和全局事务id
- 给msg添加一个分支业务逻辑,这里的业务逻辑为余额转入操作TransIn,然后带上这个服务需要传递的数据,金额30元
然后调用msg的DoAndSubmitDB,这个函数保证业务成功执行和msg全局事务提交,要么同时成功,要么同时失败
- 第一个参数为回查URL,详细含义稍后说
- 第二个参数为sql.DB,是业务访问的数据库对象
- 第三个参数是业务函数,我们这个例子中的业务是给A扣减30元余额
成功流程
DoAndSubmitDB是如何保证业务成功执行与msg提交的原子性的呢?请看如下的时序图:
一般情况下,时序图中的5个步骤会正常完成,整个业务按照预期进行,全局事务完成。这里面有个新的内容需要解释一下,就是msg的提交是按照两个阶段发起的,第一阶段调用Prepare,第二阶段调用Commit,DTM收到Prepare调用后,不会调用分支事务,而是等待后续的Submit。只有收到了Submit,开始分支调用,最终完成全局事务。
异常情况
在分布式系统中,各类的宕机和网络异常都是需要考虑的,下面我们来看看可能发生的问题:
首先我们要达到的最重要目标是业务成功执行和msg事务是原子操作,那么假如前面时序图中,当Prepare
消息发送成功之后,Submit
消息发送成功之前,出现异常宕机会如何?这个时候dtm会检测到该事务超时,会进行回查。对于开发人员来说,该回查很简单,只需要粘贴如下代码即可:
app.GET(BusiAPI+"/QueryPreparedB", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).QueryPrepared(dbGet())
}))
如果您使用的不是go框架gin,那么您需要根据您的框架做一些小修改,但是该代码是通用的,适合您的每个业务。
回查的主要原理主要是通过消息表,但是dtm的回查经过仔细的论证,能够处理以下情况:
- 回查时,本地事务未开始
- 回查时,本地事务还在进行中
- 回查时,本地事务已回滚
- 回查时,本地事务已提交
详细的回查原理有些复杂,已申请了专利,这里不做详细介绍,详情可以参考https://dtm.pub/practice/msg.html
多数据库支持
该方案下,如果您需要处理多数据库,运维层面,只需要给相应的库创建好消息表;代码层面,只需要在回查的地方,传入不同的数据库连接即可。
对比于原有的轮询表,以及监听binlog方案,运维成本大大降低。该架构的负载仅仅与事件数量相关,跟数据库数量等其他因素无关,具备了更好的弹性。
更多存储引擎的支持
dtm的二阶段消息,不仅提供了数据库的支持DoAndSubmitDB
,还提供了NoSQL的支持
Mongo支持
下面这段代码,可以保证Mongo下的业务和消息两者同时提交
err := msg.DoAndSubmit(busi.Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {
return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -reqFrom(c).Amount, reqFrom(c).TransOutResult)
})
})
Redis支持
下面这段代码,可以保证Redis下的业务和消息两者同时提交
err := msg.DoAndSubmit(busi.Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400)
})
dtm的回查方案可以很容易的扩展到其他各种各样的支持事务的存储引擎
方案特点
二阶段消息下具备以下特点:
- 优雅的支持了多数据库
- 不仅支持SQL数据库,还支持了Mongo,Redis等NoSQL
- 代码简短,比通常的发件箱模式代码量大幅减少
- 整个架构和开发过程不涉及消息队列,只涉及api,更容易上手
- 负载仅仅与消息量有关,与涉及的数据库数量无关
对比RocketMQ事务消息
回查的这种形式最早是在RocketMQ的事务消息中提出的,但是作者全网查找了回查的例子,以及各种案例,都未找到能够把各种异常情况都处理好的回查方案。已找到的方案中,都未能够正确处理”本地事务还在进行中“的这种情况,都会存在极端情况导致数据不一致,详情参考https://dtm.pub/practice/msg.html。
另外dtm的二阶段消息,不需要引入队列,或者也可以结合其他的消息队列使用,因此使用范围更广
小结
本文介绍的dtm二阶段消息,更好的支持多数据库的情况。该架构方案,具备诸多优点,可以完美的替代发件箱模式,给开发者带来更简单易用的架构。
欢迎访问https://github.com/dtm-labs/dtm 并star支持我们