Spring事务tx源码分析(二)

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

最近想提升,苦逼程序猿,想了想还是拿最熟悉,之前也一直想看但没看的spring源码来看吧,正好最近在弄事务这部分的东西,就看了下,同时写下随笔记录下,以备后查。

spring tx源码分析

这里只分析简单事务也就是DataSourceTransactionManager

首先肯定找入口了,看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一个普通的切面类,只要符合AOP规则的调用都会进入此切面。

在invoke方法中最重要的一段代码:这里主要分析一个新的事务的开始过程

20191102100560\_1.png

        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        final TransactionAttribute txAttr =
                getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);//获取配置的TransactionAttribute信息
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(invocation.getMethod(), targetClass);
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//开启一个新的事务
            Object retVal = null;
            try {
                retVal = invocation.proceed();//原有逻辑执行
            }
            catch (Throwable ex) {
                completeTransactionAfterThrowing(txInfo, ex);//发生异常时候对异常的处理
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);//清理TransactionInfo信息
            }
            commitTransactionAfterReturning(txInfo);//提交事务
            return retVal;

20191102100560\_2.png

首先开启事务,也就是调用createTransactionIfNecessary方法:

20191102100560\_3.png

    protected TransactionInfo createTransactionIfNecessary(
                PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
            if (txAttr != null && txAttr.getName() == null) {
                txAttr = new DelegatingTransactionAttribute(txAttr) {
                    @Override
                    public String getName() {
                        return joinpointIdentification;
                    }
                };
            }
            TransactionStatus status = null;
            if (txAttr != null) {
                if (tm != null) {
                    status = tm.getTransaction(txAttr);
                }
                else {
                    }
                }
            }
            return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
        }

20191102100560\_4.png

这里其实主要就是调用PlatformTransactionManager的getTransactionf方法来获取TransactionStatus来开启一个事务:

20191102100560\_5.png

    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
            Object transaction = doGetTransaction();
            if (definition == null) {
                definition = new DefaultTransactionDefinition();
            }
            if (isExistingTransaction(transaction)) {//这个判断很重要,是否已经存在的一个transaction
                return handleExistingTransaction(definition, transaction, debugEnabled);//如果是存在的将进行一些处理
            }
            if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
            }
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            //如果是PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED这三种类型将开启一个新的事务
            else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                SuspendedResourcesHolder suspendedResources = suspend(null);
                try {
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    doBegin(transaction, definition);//开启新事物
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException ex) {
                    resume(null, suspendedResources);
                    throw ex;
                }
                catch (Error err) {
                    resume(null, suspendedResources);
                    throw err;
                }
            }
            else {
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
            }
        }

20191102100560\_6.png

这段代码比较长也是比较核心的一段代码,让我们来慢慢分析,首先这里将执行doGetTransaction方法来获取一个transaction

20191102100560\_7.png

    protected Object doGetTransaction() {
            DataSourceTransactionObject txObject = new DataSourceTransactionObject();
            txObject.setSavepointAllowed(isNestedTransactionAllowed());
            ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
            //这一行代码中TransactionSynchronizationManager很重要,是对connection的核心获取、持有、删除等
            txObject.setConnectionHolder(conHolder, false);
            //这里不论获取到或者获取不到都将此设置newConnectionHolder为false
            return txObject;
        }

20191102100560\_8.png

这段代码中主要是根据this.dataSource来获取ConnectionHolder,这个ConnectionHolder是放在TransactionSynchronizationManager的ThreadLocal中持有的,如果是第一次来获取,肯定得到是null。

接着代码往下将执行到isExistingTransaction(transaction),这里主要是依据下面代码判断:

    txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()

如果是第一次开启事务这里必然是false,否则将返回true。

我们这里先讨论第一次进入的情况,也就是false的时候将继续往下执行到了判断事务Propagation的时候了,如果Propagation为:ROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED中的一个将开启一个新事物,new一个新的DefaultTransactionStatus ,并且

newTransaction设置为true,这个状态很重要,因为后面的不论回滚、提交都是根据这个属性来判断是否在这个TransactionStatus上来进行。 接着将执行doBegin方法:
20191102100560\_9.png

    protected void doBegin(Object transaction, TransactionDefinition definition) {
            DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
            Connection con = null;
            try {
                if (txObject.getConnectionHolder() == null ||
                        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                    Connection newCon = this.dataSource.getConnection();//从dataSource中获取一个Connection
                    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);//为当前Transaction设置ConnectionHolder,并且设置newConnectionHolder为true
                }
                txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
                con = txObject.getConnectionHolder().getConnection();
                //这里主要是根据definition对connection进行一些设置
                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
                if (con.getAutoCommit()) {//开启事务,设置autoCommit为false
                    txObject.setMustRestoreAutoCommit(true);
                    con.setAutoCommit(false);
                }
                //这里设置transactionActive为true,还记得签名判断是否存在的transaction吧?就是根据这个
                txObject.getConnectionHolder().setTransactionActive(true);
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                    txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
                }
                if (txObject.isNewConnectionHolder()) {
                    //这里将当前的connection放入TransactionSynchronizationManager中持有,如果下次调用可以判断为已有的事务
                    TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
                }
            }
        }

20191102100560\_10.png

这里其实主要就是从dataSource中获取一个新的connection,形成一个ConnectionHolder,并且放入TransactionSynchronizationManager中持有,记得前面doGetTransaction方法吧,如果同一个线程,再此进入执行的话就会获取到同一个ConnectionHolder,在后面的isExistingTransaction方法也可以判定为是已有的transaction。

接下来将执行prepareSynchronization方法,主要是对TransactionSynchronizationManager的一系列设置。

然后将返回上层代码执行prepareTransactionInfo方法

20191102100560\_11.png

    protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
                TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {
            TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
            if (txAttr != null) {
                txInfo.newTransactionStatus(status);
            }
            txInfo.bindToThread();
            return txInfo;
        }

20191102100560\_12.png

这里其实比较简单主要生成一个TransactionInfo并绑定到当前线程的ThreadLocal

        private void bindToThread() {
                this.oldTransactionInfo = transactionInfoHolder.get();
                transactionInfoHolder.set(this);
            }

形成了一个链表,具体啥用我也暂时没看到,唯一看到的就是通过TransactionAspectSupport.currentTransactionStatus()可以获取当前的transaction状态。

然后再返回到上层代码,接着就是执行相应的逻辑代码了

    retVal = invocation.proceed();

执行过程的finally代码块将执行cleanupTransactionInfo(txInfo);

        private void restoreThreadLocalStatus() {
                transactionInfoHolder.set(this.oldTransactionInfo);
            }

这里就是将txInfo进行重置工作,让它恢复到前一个状态。

然后就是提交操作(commitTransactionAfterReturning)或者是回滚操作(completeTransactionAfterThrowing)了。

这里就拿提交操作来为例来说明,回滚操作类似:

    protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
            if (txInfo != null && txInfo.hasTransaction()) {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
        }

实际就是执行的processCommit方法

20191102100560\_13.png

    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
            try {
                boolean beforeCompletionInvoked = false;
                try {
                    prepareForCommit(status);
                    triggerBeforeCommit(status);
                    triggerBeforeCompletion(status);
                    beforeCompletionInvoked = true;
                    boolean globalRollbackOnly = false;
                    if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                        globalRollbackOnly = status.isGlobalRollbackOnly();
                    }
                    if (status.hasSavepoint()) {
                        status.releaseHeldSavepoint();
                    }
                    else if (status.isNewTransaction()) {
                        doCommit(status);
                    }
                    if (globalRollbackOnly) {
                        throw new UnexpectedRollbackException(
                                "Transaction silently rolled back because it has been marked as rollback-only");
                    }
                }
                catch (UnexpectedRollbackException ex) {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                    throw ex;
                }
                catch (TransactionException ex) {
                    if (isRollbackOnCommitFailure()) {
                        doRollbackOnCommitException(status, ex);
                    }
                    else {
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                    }
                    throw ex;
                }
                catch (RuntimeException ex) {
                    if (!beforeCompletionInvoked) {
                        triggerBeforeCompletion(status);
                    }
                    doRollbackOnCommitException(status, ex);
                    throw ex;
                }
                catch (Error err) {
                    if (!beforeCompletionInvoked) {
                        triggerBeforeCompletion(status);
                    }
                    doRollbackOnCommitException(status, err);
                    throw err;
                }
                try {
                    triggerAfterCommit(status);
                }
                finally {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
                }

            }
            finally {
                cleanupAfterCompletion(status);
            }
        }

20191102100560\_14.png

首先将执行一些提交前的准备工作,这里将进行是否有savepoint判断status.hasSavepoint(),如果有的话将进行释放savePoint,即getConnectionHolderForSavepoint().getConnection().releaseSavepoint((Savepoint) savepoint);

接着就判断是否是新的transaction:status.isNewTransaction(),如果是的话将执行 doCommit(status);

20191102100560\_15.png

    protected void doCommit(DefaultTransactionStatus status) {
            DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
            Connection con = txObject.getConnectionHolder().getConnection();
            try {
                con.commit();
            }
            catch (SQLException ex) {
                throw new TransactionSystemException("Could not commit JDBC transaction", ex);
            }
        }

20191102100560\_16.png

其实也就是调用了Connection的commit()方法。

最后无论成功与否都将调用finally块中的cleanupAfterCompletion(status)

20191102100560\_17.png

    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
            status.setCompleted();
            if (status.isNewSynchronization()) {
                TransactionSynchronizationManager.clear();//TransactionSynchronizationManager清理工作
            }
            if (status.isNewTransaction()) {
                doCleanupAfterCompletion(status.getTransaction());//这个比较重要
            }
            if (status.getSuspendedResources() != null) {
                resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
            }
        }

20191102100560\_18.png

首先对TransactionSynchronizationManager进行一系列清理工作,然后就将执行doCleanupAfterCompletion方法:

20191102100560\_19.png

        protected void doCleanupAfterCompletion(Object transaction) {
            DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
            if (txObject.isNewConnectionHolder()) {
                //从TransactionSynchronizationManager中解绑相应的connectionHolder
                TransactionSynchronizationManager.unbindResource(this.dataSource);
            }
            Connection con = txObject.getConnectionHolder().getConnection();
            try {
                //对获取到的Connection进行一些还原
                if (txObject.isMustRestoreAutoCommit()) {
                    con.setAutoCommit(true);
                }//对获取到的Connection进行一些还原
                DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
            }
            catch (Throwable ex) {
            }
            if (txObject.isNewConnectionHolder()) {
                //如果是newConnection将这个链接关闭,如果是连接池将还给连接池
                DataSourceUtils.releaseConnection(con, this.dataSource);
            }
            //这里将这只transactionActive为false
            txObject.getConnectionHolder().clear();
        }

20191102100560\_20.png

其实就是将TransactionSynchronizationManager中持有的connectionHolder释放,并且还原当前Connection 的状态,然后将对当前的transaction进行清理包括设置transactionActive为false等。

至此整个spring的事务过程也就结束了。

两篇比较好的关于事务的博客:

http://www.iteye.com/topic/78674

http://www.cnblogs.com/yangy608/archive/2011/06/29/2093478.html

关于事务的传播级别,这个比较准确:https://www.2cto.com/kf/201510/445660.html


来源:http://ddrv.cn

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring事务tx源码分析(二)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏