spring事物–05源码分析-创建事务

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

接上篇

第4篇中(https://blog.csdn.net/convict_eva/article/details/83544563) invokeWithinTransaction() 方法中调用了创建事务的方法:

TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); 源码如下:

    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        // 如果没有指定名字,使用方法的名称作为事务名
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        //前面说过的 TransactionStatus 
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                /**
                * 使用定义的事务方法的配置信息
                * 事务由事务处理器来创建,同时返回TransactionStatus 记录当前的事务状态
                */
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        //准备TransactionInfo, TransactionInfo 对象封装了事务处理的配置信息以及TransactionStatus
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
    /**
    * 准备TransactionInfo
    */
    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                @Nullable TransactionStatus status) {

        //new 一个TransactionInfo 对象
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            // We need a transaction for this method...
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // The transaction manager will flag an error if an incompatible tx already exists.
            /** 
            * 为 TransactionInfo 设置 transactionStatus 
            * transactionStatu 持有事务处理需要的数据,如:transaction对象就是transactionStatus持有的
            */
            txInfo.newTransactionStatus(status);
        }
        else {
            //不需要创建事务
            // The TransactionInfo.hasTransaction() method will return false. We created it only
            // to preserve the integrity of the ThreadLocal stack maintained in this class.
            if (logger.isTraceEnabled())
                logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                        "]: This method isn't transactional.");
        }

        // We always bind the TransactionInfo to the thread, even if we didn't create
        // a new transaction here. This guarantees that the TransactionInfo stack
        // will be managed correctly even if no transaction was created by this aspect.
        /**
        * 把TransactionInfo 与线程绑定(ThreadLocal 实现的)
        * 同时在TransactionInfo 中由一个变量保存以前的TransactionInfo,这样就持有了一连串与事务处理相关的TransactionInfo
        * 虽然不一定需要创建新的事务,但是会在请求事务时创建TransactionInfo.
        *
        *
        txInfo.bindToThread();
        return txInfo;
    }

事务的创建是交给事务处理器来创建的,tm.getTransaction(txAttr) 方法封装了底层事务对象的创建
源码分析:
AbstractPlatformTransactionManager.getTransaction() 只是一个模板方法,这个模板会被具体的事务处理器所使用,如 DataSourceTransactionManager。
AbstractPlatformTransactionManager 会根据事务属性配置和当前进程绑定的事务信息,对事务是否需要创建及怎样创建进行一些通用的处理,然后把事务创建工作交给具体的事务处理器完成。不同的事务处理器创建事务过程是不同的,但是对事务属性和当前线程事务处理信息和处理是一样的,这就使用了模板方法。

    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        // 这是一个抽象方法,获取事务由具体事务处理器实现。如 DataSourceTransactionManager
        Object transaction = doGetTransaction();

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        /**
        * 如果没有配置事务属性,使用默认的事务属性 DefaultTransactionDefinition
        * 默认事务处理属性:
        * propagationBehavior= PROPAGATION_REQUIRED,isolationLevel=ISOLATION_DEFAULT,timeout=TIMEOUT_DEFAULT;readOnly=false
        */
        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }

        /**
        * 检查当前线程是否已经存在事务,
        * 如果已经存在,那么需要根据事务定义的传播属性配置来处理事务的产生
        */
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            // 对当前线程中已经存在事务进行处理,结果封装在TransactionStatus中
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        //检查timeout配置是否合法 TIMEOUT_DEFAULT=-1
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        /**
        * 当前线程没有事务存在,需要根据事务属性 来创建事务
        * 在if...else 中可以看到对事务传播属性设置的处理。
        */
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            //使用当前事务,但是当前事务是null,抛出异常
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

            //需要创建一个新的事务,当前线程中没有事务。

            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                // 是否是同步事务,默认为SYNCHRONIZATION_ALWAYS,需要同步事务。这时 newSynchronization=true
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

                //创建一个TransactionStatus,
                //在newTransactionStatus() 方法中,又对事务同步做了一次判断,判断当前线程中是否有事务同步器
                //最后调用 DefaultTransactionStatus 构造方法
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

                //创建事务的调用,由具体事务处理器完成。如DataSourceTransactionManager等
                //开启事务,通过数据库连接拿到隔离级别,设置autoCommit 为false,设置超时时间,把数据库连接放到当前线程中。(看的是DataSourceTransactionManager)
                doBegin(transaction, definition);
                //初始化事务同步器
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            //创建一个空的事务,transaction参数被设置为null, 所以TransatiuonStatus 中没有 transaction 对象
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

    /**
     * Create a TransactionStatus instance for the given arguments.
     */
    protected DefaultTransactionStatus newTransactionStatus(
            TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
            boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

        //真实的新的事务同步,如果不存在的话就要新创建一个
        boolean actualNewSynchronization = newSynchronization &&
                !TransactionSynchronizationManager.isSynchronizationActive();

        //调用构造方法,创建一个TransactionStatus
        return new DefaultTransactionStatus(
                transaction, newTransaction, actualNewSynchronization,
                definition.isReadOnly(), debug, suspendedResources);
    }

    /**
     * Initialize transaction synchronization as appropriate.

     初始化事务同步器

     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }

上面是创建一个全新的事务的过程,下面当前线程已经存在,创建事务的过程:

    /**
     * Create a TransactionStatus for an existing transaction.

     如果当前线程中已经有事务存在了,创建事务方法

     */
    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        //TransactionDefinition.PROPAGATION_NEVER 事务传播机制,以非事务方式执行,如果当前存在事务,则抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        //PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            //挂起当前事务
            Object suspendedResources = suspend(transaction);
            //新的事务同步标识
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            /**
            * 这里主要参数 transaction 为null,newTransaction 为false
            * 事务方法不需要在事务环境中执行,同时挂起事务的信息记录也保存在TransactionStatus 中
            * 这时包括了ThreadLoacl 对事务信息的记录
            */
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        /**
        * PROPAGATION_REQUIRES_NEW ,创建新的事务,同时把当前线程中存在的事务挂起。
        * 与创建上面说的全新事务过程类似,区别在于,在创建全新事务时不用考虑已有事务挂起
        * 在这里要考虑已有事务的挂起处理
        */
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            //挂起事务
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                //下面这些就是创建全新事务同样的方法
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }

        //嵌套事务的创建
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                //如果有保存点,在spring管理的事务中,创建保存点
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                // 嵌套新的事务,也要开启,提交回滚等操作
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        //通常两种支持:PROPAGATION_SUPPORTS:当前没能事务就在非事务中运行,
        //PROPAGATION_REQUIRED:如果当前有事务就加入到当前事务中。
        //也就是说这两种方式,要和之前的事务保持一致
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
            //验证隔离级别一致性
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            //验证只读一致性
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        //创建事务,注意newTransaction 值为false
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

    /**
    * 事务的挂起。
    * 返回的 SuspendedResourcesHolder  对象,被保存在了TransactionStatus 对象中。为以后的事务处理做准备
    * 
    *
    */
    @Nullable
    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
        //如果当前线程中有事务同步器
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            //所有的事务同步器都挂起,所谓挂起就是把数据库的连接设置为null
            List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
            try {
                Object suspendedResources = null;
                if (transaction != null) {
                    /**
                    * 挂起当前事务
                    * 这个挂起是由具体的事务处理器完成的。
                    * 如DataSourceTransactionManager,里面就是把当前连接设置为了null,把数据库资源解绑定
                    */
                    suspendedResources = doSuspend(transaction);
                }
                //在线程中保存事务处理相关信息,重置线程中相关的ThreadLoacl变量
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                TransactionSynchronizationManager.setCurrentTransactionName(null);
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);
                return new SuspendedResourcesHolder(
                        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            }
            catch (RuntimeException | Error ex) {
                // doSuspend failed - original transaction is still active...
                // doSuspend失败,重新启动当前事务
                doResumeSynchronization(suspendedSynchronizations);
                throw ex;
            }
        }
        else if (transaction != null) {
            // Transaction active but no synchronization active.
            //新创建事务同步器
            Object suspendedResources = doSuspend(transaction);
            return new SuspendedResourcesHolder(suspendedResources);
        }
        else {
            // Neither transaction nor synchronization active.
            return null;
        }
    }

总结:先通过依赖注入把配置的事物管理器,事务属性等注入到 TransactionInterceptor 等相关对象中。然后在TransactionInterceptor 这个拦截器的invoke() 方法中根据事务属性创建事务。

TransactionAspectSupport内部类TransactionInfo 保存了当前TransactionStatus 和上一个TransactionInfo(类似一个链表),猜测:在事物提交或者回滚时,处理多个事务的嵌套。待下一篇

TransactionSynchronizationManager 里面就是一堆ThreadLocal 保证当前线程的事务信息:

    /**
    *当前线程绑定的所有事务resource 信息。   
    *不知道怎么用,DataSourceUtils里面有根据DataSource 获取事务配置信息的。知道的请留言啊!!!
    */
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    /**
    *当前线程的事务同步器
    */
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    //当前事务名称
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    //当前事务是否只读
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");
    //当前事务隔离级别
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");
    //真实的事务
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");

还没有理解,望大神指点。(待更新)


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » spring事物–05源码分析-创建事务

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏