- MongoDB CRUD 操作 >
- MongoDB CRUD 教程 >
- 执行两个阶段的提交
执行两个阶段的提交¶
概要¶
这个文档为执行多文档操作或者说 ” 多文档事务 ” 提供一个模型,这个模型使用两阶段提交方法把数据写到多个文档中。除此之外,你可以扩展这个过程以提供一个 rollback-like f功能。
背景¶
在MongoDB里一个 document 上的操作总是原子的;然而,常常被称作 “多文档事务” 的包含多个文档的操作并不是原子的。由于文档可能会相当复杂并且包含多个 “嵌套的” 文档,单文档原子性为许多使用情况提供了必要的支持。
尽管当文档原子操作很强大,但是仍然有需要多文档事务的情况。当执行一个由连续操作组成的事务时,某些问题出现了,比如:
原子性:如果一个操作失败,事务内的之前的操作必须 ” 回滚 “到之前的状态(就是 “all or nothing” 里面的 “nothing”)。
一致性:如果一个严重的故障(比如网络或者硬件)打断了事务,数据库必须可以恢复到一致的状态。
对于需要多文档事务的情景,你可以在你的应用里实现两阶段提交以提供这些多文档更新的支持。使用两阶段提交保证数据是一致的,并且在发生错误的情况下,执行事务之前的状态是 recoverable (可恢复的) 。然而,在执行过程中,文档可以展示未确定的(事务提交之前的)数据和状态。
注解
因为MongoDB数据库里仅仅单文档的操作是原子的,两阶段提交仅仅可以提供一个 类似 事务的语义。对于应用来说在两阶段提交期间在中间点返回中间的数据或者回滚是有可能的。
模型¶
概述¶
假设一个情景,你想从账户 A 转钱到账户 B 。在关系型数据库系统里,你可以在一个多语句事务内从 A 账户上减去钱并且为 B 账户添加上钱。在MongoDB里,你可以模仿两阶段提交以达到一个类似的结果。
这个教程里的例子使用下面的两个集合:
命名为 accounts 的集合存储账户信息。
命名为 transactions 的集合存储有关转账事务的信息。
初始化源账户和目的账户¶
在 accounts 集合里分别为账户 A 和账户 B 插入一个文档。
db.accounts.insert(
[
{ _id: "A", balance: 1000, pendingTransactions: [] },
{ _id: "B", balance: 1000, pendingTransactions: [] }
]
)
这个操作返回一个包含操作状态的 BulkWriteResult() 对象。成功插入之后, BulkWriteResult() 的 nInserted 字段设置为 2 。
初始化转账记录¶
对于每一次转账的完成,往 transactions 集合里插入一条包含转账信息的文档。这个文档包含如下字段:
source 和 destination 字段,与 accounts 集合里的 _id 字段相关联的。
value 字段,指定影响 source 账户和 destination 账户 balance 的传输量,
state 字段,反应传输的当前状态。state 字段可以具有值 initial , pending , applied , done , canceling 和 canceled 。
lastModified 字段,反映了最后修改的日期。
想要初始化从账户 A 到账户 B 的 100 的转账,在 transactions 集合里插入一个包含转账信息的文档,设置交易 state 为 "initial",并且 lastModified 字段设置为当前日期:
db.transactions.insert(
{ _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
)
这个操作返回一个包含操作状态的 WriteResult() 对象。成功插入后, WriteResult() 对象的 nInserted 被设置为 1 。
使用两阶段提交在账户之间转移资金¶
检索未开始的交易记录。¶
在 transactions 集合里,查找处于 initial 状态的交易记录。目前 transactions 即合理仅仅有一个文档,即在 初始化转账记录 一步中添加的那一个。如果集合中有更多的文档,除非你配置额外的查询条件,否则这个查询将返回任何 initial 状态的交易记录。
var t = db.transactions.findOne( { state: "initial" } )
在 mongo 命令行中输入变量 t 以打印这个变量的内容。除了 lastModified 字段应该显示你插入操作的日期外,这个操作应该打印一个类似下面的内容。
{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2014-07-11T20:39:26.345Z") }
更新交易记录状态为pending。¶
设置交易记录 state 从 initial 变更为 pending ,然后使用 $currentDate 操作符设置 lastModified 字段为当前日期。
db.transactions.update(
{ _id: t._id, state: "initial" },
{
$set: { state: "pending" },
$currentDate: { lastModified: true }
}
)
这个操作返回一个包含操作状态的 WriteResult() 对象。在成功更新后, nMatched 和 nModified 显示为 1 。
在更新声明中, state: "initial" 条件保证没有其他的进程已经更新了这条记录。如果 nMatched 和 nModified 是 0 ,返回到第一步获取一个不同的交易记录并且重新开始这个过程。
把交易记录应用到两个账户。¶
如果 交易记录还没有应用到这些账户上,那么就使用 update() 方法应用交易记录 t 到两个账户上。在更新条件中,要包含条件 pendingTransactions: { $ne: t._id } 以避免重复应用交易记录,如果不止一次运行步骤。
想要应用交易记录到账户上,要更新 balance 字段和 pendingTransactions 字段。
更新源账户,从它的 balance 中减去交易记录的 value ,并且把交易记录的 _id 添加到它的 pendingTransactions 数组。
db.accounts.update(
{ _id: t.source, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
更新目的账户,为它的 balance 添加交易记录的 value ,并且把交易记录的 _id 添加到它的 pendingTransactions 数组。
db.accounts.update(
{ _id: t.destination, pendingTransactions: { $ne: t._id } },
{ $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
更新交易状态为 applied。¶
使用下面的 update() 操作设置交易记录的 state 为 applied 并且更新 lastModified 字段:
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "applied" },
$currentDate: { lastModified: true }
}
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
更新两个账户的挂起(pending)交易记录列表。¶
从两个账户 pendingTransactions 数组中移除已经应用的交易记录 _id 。
更新源账户。
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
更新目的账户。
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{ $pull: { pendingTransactions: t._id } }
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
更新交易记录状态为完成。¶
设置交易记录的 state 为 done 并且更新 lastModified 字段,以完成这笔交易。
db.transactions.update(
{ _id: t._id, state: "applied" },
{
$set: { state: "done" },
$currentDate: { lastModified: true }
}
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
从失败场景中恢复¶
事务过程最重要的一部分不是上面典型的例子,而是当事务没有成功完成的时候从各种各样的失败场景中恢复的可能性。这一部分展现了可能的失败的概述并且提供从这些种事件中恢复的步骤。
恢复操作¶
两阶段提交模式允许应用运行序列以恢复事务,并且达到一致的状态。在应用启动的时候运行恢复操作,并且如果可能的话每隔一段时间,捕捉一些未完成的事务。
需要达到一致状态的时间取决于这个应用恢复每一个事务花费的时间。
下面的恢复程序使用 lastModified 日期作为 pending 的事务是否需要恢复的标识;特别地,如果 pending 或者 applied 事务在最后的30分钟内还没有被更新,程序将判定这些事务需要恢复。你可以使用不同的条件来做这个判断。
处于 Pending 状态的事务¶
要想从 “`Update transaction state to pending.`_” 步骤之后 “`Update transaction state to applied.`_” 步骤之前发生的错误恢复,请从 transactions 集合中检索出一个 pending 的事务来恢复:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );
从步骤 “`Apply the transaction to both accounts.`_” 重新开始执行
处于 Applied 状态的事务¶
要想从 “`Update transaction state to applied.`_” 步骤之后 “`Update transaction state to done.`_” 步骤之前发生的错误恢复,请从 transactions 集合中检索出一个 applied 的事务来恢复:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );
从步骤 “`Update both accounts’ list of pending transactions.`_“重新开始执行
回滚操作¶
有时候,你需要 “回滚” 或者撤销一个事务;比如,应用需要 “取消” 事务或者其中一个账户不存在或者停止在交易过程中存在(感觉不太贴切,贴出原文:or if one of the accounts does not exist or stops existing during the transaction.)。
处于 Applied 状态的事务¶
在 “`Update transaction state to applied.`_” 步骤之后,你 不 应该回滚事务。取而代之地,完成那个事务并且创建一个新的事务并通过切换源字段和目的字段的值来冲销交易记录。
处于 Pending 状态的事务¶
在 “`Update transaction state to pending.`_” 步骤之后并且 “`Update transaction state to applied.`_” 步骤之前,你可以使用下面的步骤回滚事务:
更新交易记录状态为 canceling。¶
更新交易记录的 state 字段从 pending 变为 canceling 。
db.transactions.update(
{ _id: t._id, state: "pending" },
{
$set: { state: "canceling" },
$currentDate: { lastModified: true }
}
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
撤销针对两个账户的交易。¶
若要撤消对这两个帐户的交易,如果该交易已经应用了(applied),则反向交易 t 。在更新条件里,要包括条件 pendingTransactions: t._id ,这是为了仅仅当挂起的(pending)交易完成(applied)以后才更新这个账户。
更新目标账户,从它的 balance 中减去交易 value 并且从 pendingTransactions 数组中移除交易 _id 。
db.accounts.update(
{ _id: t.destination, pendingTransactions: t._id },
{
$inc: { balance: -t.value },
$pull: { pendingTransactions: t._id }
}
)
成功更新后,这个方法返回一个包含被置为1的 nMatched 和 nModified 的 WriteResult() 对象。如果在这之前挂起的交易还没有应用到这个账户上,将没有文档匹配这些更新条件并且 nMatched 和 nModified 将被置为 0 。
更新源账户,给它的 balance 增加交易 value 并且从 pendingTransactions 数组中移除交易 _id 。
db.accounts.update(
{ _id: t.source, pendingTransactions: t._id },
{
$inc: { balance: t.value},
$pull: { pendingTransactions: t._id }
}
)
成功更新后,这个方法返回一个包含被置为1的 nMatched 和 nModified 的 WriteResult() 对象。如果在这之前挂起的交易还没有应用到这个账户上,将没有文档匹配这些更新条件并且 nMatched 和 nModified 将被置为 0 。
更新交易状态为取消(canceled)¶
要完成回滚,请把交易 state 从 canceling 更新为 cancelled 。
db.transactions.update(
{ _id: t._id, state: "canceling" },
{
$set: { state: "cancelled" },
$currentDate: { lastModified: true }
}
)
在成功更新后,这个方法返回一个 nMatched 和 nModified 值为 1 的 WriteResult() 对象。
多个应用¶
存在部分交易,他们以便多个应用创建并且一致地运行操作,并且不会造成数据不一致或者冲突。在我们的过程中,想要更新或者检索交易文档,更新条件则包含一个关于 state 字段的条件以避免被多个应用重复应用交易记录。
例如, App1 和 App2 都获取到了处于 initial 状态的同一交易记录。在 App2 之前 App1 完成了整个交易。当 App2 试图执行 “`Update transaction state to pending.`_” 步骤的时候,包含 state: "initial" 要求的的更新条件将不会匹配到任何文档,并且 nMatched 和 nModified 的值将是 0 。这是向 App2 发出一个信号:返回到第一步对不同的交易记录重新开始这个过程。
当多个应用在运行的时候,在同一时间点仅仅一个应用可以处理某个给定的交易记录是非常重要的。因此,除了在更新条件里包含交易记录的期许状态之外,你还可以在交易记录文档自身中创建一个标记,它能够辨认正在操作这个交易记录文档的应用。使用 findAndModify() 方法在一步内修改交易记录并且获取此记录。
t = db.transactions.findAndModify(
{
query: { state: "initial", application: { $exists: false } },
update:
{
$set: { state: "pending", application: "App1" },
$currentDate: { lastModified: true }
},
new: true
}
)
改进交易操作以保证仅仅与 application 字段中标识符相匹配的应用程序才能应用该交易记录。
如果在交易执行期间应用 App1 失败的话,你可以使用 recovery procedures ,但是,应用程序应该保证在它们应用该交易之前 “拥有” 这个交易记录。例如查找并且继续挂起的工作,使用一个类似于下面的查询:
var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
db.transactions.find(
{
application: "App1",
state: "pending",
lastModified: { $lt: dateThreshold }
}
)
在生产应用中使用两阶段提交¶
上面的交易记录是有意的简单。例如,假设总是对一个账户进行回滚操作,账户余额有可能为负值。
生产实现可能会更复杂。通常情况下,账户需要当前余额,待定积分(pending credit)和挂起的借方(pending debits)信息等。
对于所有的交易,确保为你的部署使用一个级别的合适的 write concern 。