详解Fescar分布式事物处理流程

2019/01/20

FESCAR (Fast easy commit and rollback),是阿里GTS的社区开源版本,基于现有的分布式事物解决方案,要么像XA这种会有严重的性能问题,要么像业务补偿,TCC,SAGA,可靠消息保证等,这种需要根据业务场景在应用中进行定制,我们不希望引入微服务后给业务层带来额外的研发负担,另外一方面不希望引入分布式事物后拖慢业务,所以FesCar的初衷就是对业务0侵入,并且高性能.

下面先通过官方上的介绍,看下fescar的思想,后面在结合代码看下fescar的具体实现细节

我们先简单回顾下XA里面分布式事物的三个角色,TM事物管理器.负责发起一个全局事物,TC事物协调器,独立在应用层外,负责维护全局事物和分支事物的状态,来决策提交还是回滚,RM:资源管理器,存储实际状态的容器,像硬盘,数据库,消息中间件,内存等都可以称为RM,Fescar将RM作为一个二方包的形式迁入到了应用层,作为应用层和TC进行通讯的代理,并且协助本地事物处理.

  • 1.TM向TC申请开启一个全局事物,全局事物创建成功,并返回一个唯一的XID
  • 2.XID在微服务调用链路中进行传递,dubbo可以通过filter方式,spring cloud也很方便,RPC服务端收到XID后存放在本地线程局部变量threadlocal中
  • 3.分支事物RM向TC注册分支事物,将其纳入XID对应的全局事物管理中
  • 4.TM向TC发起针对XID的全局事物的提交或者回滚操作
  • 5.TC调用XID管辖下的分支事物完成提交和回滚

FESCAR取消了数据库层的prepare操作,而是直接进行commit操作,这样就不会带来昂贵的数据库锁开销,而每一个commit操作对应的都是数据库的本地事物,这个改变是Fescar性能高的主要原因,同时使他牺牲了隔离性,导致目前Fescar只能支持读未提交的隔离级别,如果要实现读已提交需要应用层做一些定制.

Fescar的RM插件,重新实现了一遍JDBC,从而拦截掉数据库的sql进行解析,并生成undolog,以及事物提交后的增强处理,这种设计使应用方完全无感,只需要开启一个全局事物

undolog是存储在业务方本地的数据库实例里,这样业务更新和插入undolog在一个本地事物内,可以保证事物回滚的时候一定有undolog.

目前fescar刚开源,在可靠性上还需要验证,目前社区也在计划完善一些新功能.

下面我们分析下Fescar的一些核心功能

事物管理器

我们先看下全局事物的三个核心接口,begin,commit,rollback,我在代码中都加了注释

public interface GlobalTransaction {

    /**
     * Begin a new global transaction with default timeout and name.
     *
     * 开启一个全局事物,像TC发起请求,返回一个XID,并将这个XID进行缓存到ThreadLocal
     *
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    void begin() throws TransactionException;

    /**
     * Commit the global transaction.
     *
     * 提交一个全局事物,将XID发给TC,并清除threadlocal里的缓存
     *
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    void commit() throws TransactionException;

    /**
     * Rollback the global transaction.
     *
     * 回滚一个全局事物,将XID发给TC,并清除threadlocal里的缓存
     *
     * @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
     * out.
     */
    void rollback() throws TransactionException;
}

然后我们在看下事物处理模板,也就是我们使用的入口,也是接入fescar唯一要关心的一个地方

public class TransactionalTemplate {

    /**
     * Execute object.
     *
     * @param business the business 只需要传人一个TransactionalExecutor就可以了,业务实现放在execute里面就可以了
     * @return the object
     * @throws ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. begin transaction
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }

        Object rs = null;
        try {

            // Do Your Business
            rs = business.execute();

        } catch (Throwable ex) {

            // 3. any business exception, rollback.
            try {
                tx.rollback();

                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);

            }

        }

        // 4. everything is fine, commit.
        try {
            tx.commit();

        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);

        }
        return rs;
    }

}

有没有发现跟我们平时使用的JTA用法一样,由于分支事物是RPC调用,所以存在网络超时的情况,所以分支事物如果超时了,即使分支事物的本地执行成功了,全局事物一样会进行回滚,因为这里会捕获这个超时异常,后面我们在分析为什么要这样设计.

xid怎么在rpc中传递

我们在做分布式链路监控的时候,也需要在rpc之间传递一个traceid,方法类似,如果是dubbo,我们可以写一个filter

@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID();
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) {
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) {
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation);

        } finally {
            if (bind) {
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

RootContext是fescar的threadlocal容器,RpcContext是dubbo得threadcontext容器,Attachment可以让dubbo在远程调用过程中携带更多地参数,服务调用方传递xid,服务提供方接收xid并保存,服务调用结束记得清空threadlocal以防止内存泄露.

分支在一阶段的处理流程

PreparedStatementProxy -> ExecuteTemplate -> UpdateExecutor ->ConnectionProxy

  • 0.sql进入jdbc的PreparedStatement中,然后这个jdbc对象被PreparedStatementProxy代理,进入他的execute方法
  • 1.查看自己是否在一个Fescar的全局事物中,根据线程本地变量threadlocal中是否存在全局事物id进行判断,具体代码在ExecuteTemplate中
  • 2.如果存在,就进入jdbc代理中,解析sql,根据类型选择不同的执行器

  • 3.使用 select for update 查询获取原始当前update的记录当前值,就是修改前的值
  • 4.执行原本的update修改记录的值
  • 5.使用 select for update 查询获取update后的值,就是修改后的值
  • 6.根据修改前和修改后的值,生成undolog,并根据主键的值生成lockkey放入context中

  • 5.向TC请求,注册分支事物并将lockkey传给TC加上全局锁
  • 6.如果注册并且获取锁成功,将把undolog插入数据库中
  • 7.提交本地事物,并向TC反馈执行结果 (本地事物中包含原本的执行语句和插入undolog)

我们发现分支事物上的RM操作是基于statement和connection,在原来的connection上做了增强,用的时同一个物理connection,所以分支应用上的分支的定义为一个本地事物,所以 在一个RPC实现中,一个方法中如果存在多个sql语句,那么将会注册多个分支,向TC注册多次,如果这个方法在一个本地事物中,那么即使多个sql最终一起提交,并且只会向TC注册一次,undolog也是一起插入数据库的,这个地方注意下,如果这个connection是自动提交的,为了让update语句和插入undolog放在一个本地事物中,所以会将connection改为非自动提交,开启一个事物,在用完只会在改为自动提交,不要影响应用程序.

我们回顾下分支一阶段的处理流程

二阶段的处理流程

  • 1.如果所有的分支都返回执行成功,TC将立即释放全局锁,并且TC异步通知分支删除undolog

  • 2.如果有一个分支事物返回执行失败,则TC发起请求执行undolog,所有undolog执行成功了,释放锁,异步删除undolog

如何实现读已提交

目前官方给出的方案,在需要进行读取全局事物已经提交的记录的话,需要将select 语句后面加上for update,fescar发现加上排他锁后,会去TC获取对应的锁,如果没有锁上就执行,锁上就自旋等待,为了避免读取未提交的记录,后面全局事物回滚了,就导致脏读,当然目前可能大部分应用都可以接受这种情况,例如在扣商品份额的时候都会最终在校验一次, 当然我们也可以借助undolog实现一个read-view,让这条sql语句读取到这个全局事物还没有执行之前的数据.

目前fescar的实现方式是在sql解析后如果发现是select for update语句,将会进入SelectForUpdateExecutor执行器,不管是不是在一个全局事物中,都需要去TC看下是否被锁住,这里将不会获取锁,只是校验锁是否会释放.

如果分支超时,实际已经执行成功,那么肯定是已经向TC注册成功的,那么如果TM发起回滚,分支可以正常回滚,没有毛病,如果超时后,分支本地事物还没有提交,那么回滚请求已经到达分支,那么将会回滚失败,但是TC会重试不停进行回滚.

实现HA的挑战

TC目前是一个单点,如果需要集群部署,则需要一个服务发现的系统,让TC可以自动扩展,应用不需要关心TC具体节点,而TC的全局锁就不能直接放内存了,可能需要借助第三方存储系统,mysql或者etcd

实现XA的方案

可能需要在分支事物中,当解析到在一个全局事物中,不会进行commit,等到所有分支都返回成功了,事物管理器发起commit请求给TC,然后TC在通知各个分支进行提交,和rollback流程差不多