Spring Batch Source Code Analysis


I. When a batch job is launched, job.execute(jobExecution) is called. The job is a FlowJob, and the jobExecution is created by the jobRepository's createJobExecution method.
FlowJob extends org.springframework.batch.core.job.AbstractJob, so execution goes through AbstractJob#execute. This method runs the job, handles all listener and repository calls, and delegates the actual processing to the subclass's doExecute method.

  1. Register the job context for the current thread
    JobSynchronizationManager.register(execution);

In org.springframework.batch.core.scope.context.JobSynchronizationManager#register, the work is delegated to the register method of JobSynchronizationManager's manager field.

The manager field of JobSynchronizationManager defaults to

    new SynchronizationManagerSupport<JobExecution, JobContext>()

SynchronizationManagerSupport stores, per thread, the current execution together with a map from each execution to its context (new JobContext(execution)).

  2. Validate the jobParameters
    jobParametersValidator.validate(execution.getJobParameters());

The jobParameters must not be null; if they are, a JobParametersInvalidException is thrown (a custom validator sketch follows this list).

  3. Invoke the listeners' beforeJob callbacks
    listener.beforeJob(execution);
  4. Call doExecute to run the job
    doExecute(execution);
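
As referenced in step 2 above, here is a minimal sketch of a custom JobParametersValidator that enforces the non-null rule; the class name and the "inputFile" parameter are made up for illustration and are not part of the analyzed source:

    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersInvalidException;
    import org.springframework.batch.core.JobParametersValidator;

    public class InputFileParametersValidator implements JobParametersValidator {

        // Reject null parameters (mirroring the check in AbstractJob#execute) and
        // require the hypothetical "inputFile" key.
        @Override
        public void validate(JobParameters parameters) throws JobParametersInvalidException {
            if (parameters == null || parameters.getString("inputFile") == null) {
                throw new JobParametersInvalidException("The 'inputFile' job parameter is required");
            }
        }
    }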

II. org.springframework.batch.core.job.flow.FlowJob#doExecute executes the job

  1. Create a JobFlowExecutor; a JobFlowExecutor is used by components that need to execute a flow related to a JobExecution.
    JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
          new SimpleStepHandler(getJobRepository()), execution);

SimpleStepHandler manages the repository and restart concerns; when a SimpleStepHandler is constructed, a new ExecutionContext is created:

    new ExecutionContext()

This class wraps a ConcurrentHashMap and provides type-safe read operations.
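
A minimal usage sketch of those type-safe accessors (the keys and values here are made up for illustration):

    import org.springframework.batch.item.ExecutionContext;

    public class ExecutionContextDemo {
        public static void main(String[] args) {
            ExecutionContext ctx = new ExecutionContext();
            // Writes go into the backing map
            ctx.putString("last.file", "orders.csv");
            ctx.putLong("read.count", 42L);
            // Typed reads; the getLong overload with a default avoids surprises on missing keys
            String file = ctx.getString("last.file");
            long count = ctx.getLong("read.count", 0L);
            System.out.println(file + " -> " + count);
        }
    }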

  2. The actual execution of the job is driven by flow.start(executor); the flow is a SimpleFlow generated from the job defined in the configuration file.

    executor.updateJobExecutionStatus(flow.start(executor).getStatus());

SimpleFlow's startState field is a StepState wrapping the job's first step definition; its name is "<job id>.<first step id>" (a Java-config sketch of such a job follows the excerpt below).

In the start method, a local state variable is assigned startState, the stateName ("<job id>.<first step id>") is read from it, and finally resume is called:

    if (startState == null) {
       initializeTransitions();
    }
    State state = startState;
    String stateName = state.getName();
    return resume(stateName, executor);
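
For reference, a hedged Java-config equivalent of such a job definition (bean names are illustrative and not from the analyzed source). Declaring the job as a flow produces a FlowJob backed by a SimpleFlow, which is the code path analyzed in this article:

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class ImportJobConfig {

        // flow(...).end() builds a FlowJob backed by a SimpleFlow; start(step) alone
        // would build a SimpleJob instead. With XML configuration the first state is
        // named "<job id>.<step id>", as described above.
        @Bean
        public Job importJob(JobBuilderFactory jobs, Step step1, Step step2) {
            return jobs.get("importJob")
                    .flow(step1)
                    .next(step2)
                    .end()
                    .build();
        }
    }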

III. The org.springframework.batch.core.job.flow.support.SimpleFlow#resume method

The core of resume is the call to the state's handle method:

    while (isFlowContinued(state, status, stepExecution)) {
       stateName = state.getName();
       try {
          status = state.handle(executor);
          stepExecution = executor.getStepExecution();
       }
       catch (Exception e) {
          executor.close(new FlowExecution(stateName, status));
          throw new FlowExecutionException("Ended flow=" + name + " at state=" + stateName + " with exception", e);
       }
       state = nextState(stateName, status, stepExecution);
    }
    FlowExecution result = new FlowExecution(stateName, status);

IV. org.springframework.batch.core.job.flow.support.state.StepState#handle contains only two statements:

    executor.abandonStepExecution();
    return new FlowExecutionStatus(executor.executeStep(step));

Before starting a new step, the previous execution is updated so that, if it failed, it can be abandoned on this launch.

    executor.executeStep(step)

org.springframework.batch.core.job.flow.JobFlowExecutor implements the FlowExecutor interface, which provides the context and execution strategy a FlowJob needs to execute step by step.

The core of org.springframework.batch.core.job.flow.JobFlowExecutor#executeStep:

    StepExecution stepExecution = stepHandler.handleStep(step, execution);

org.springframework.batch.core.job.SimpleStepHandler#handleStep

    currentStepExecution = execution.createStepExecution(step.getName());
    currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
    step.execute(currentStepExecution);
    currentStepExecution.getExecutionContext().put("batch.executed", true);

org.springframework.batch.core.step.AbstractStep#execute

    doExecute(stepExecution);

org.springframework.batch.core.step.tasklet.TaskletStep#doExecute

This method creates a Semaphore so that the step can execute concurrently without holding a lock (a condensed sketch of the pattern follows the excerpt below).

    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

       @Override
       public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
             throws Exception {

          StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

          // Before starting a new transaction, check for
          // interruption.
          interruptionPolicy.checkInterrupted(stepExecution);

          RepeatStatus result;
          try {
             result = new TransactionTemplate(transactionManager, transactionAttribute)
             .execute(new ChunkTransactionCallback(chunkContext, semaphore));
          }
          catch (UncheckedTransactionException e) {
             // Allow checked exceptions to be thrown inside callback
             throw (Exception) e.getCause();
          }

          chunkListener.afterChunk(chunkContext);

          // Check for interruption after transaction as well, so that
          // the interrupted exception is correctly propagated up to
          // caller
          interruptionPolicy.checkInterrupted(stepExecution);

          return result;
       }

    });
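
A condensed sketch of the pattern (not the literal TaskletStep source): chunks running on different threads proceed concurrently, while the semaphore serializes updates to the shared StepExecution state. The class and method names below are illustrative only.

    import java.util.concurrent.Semaphore;

    public class SemaphoreGuardedUpdate {

        // TaskletStep#createSemaphore() returns new Semaphore(1); the permit is held
        // only while the StepExecution is updated with a chunk's contribution.
        private final Semaphore semaphore = new Semaphore(1);

        public void applyContribution(Runnable updateStepExecution) throws InterruptedException {
            semaphore.acquire();
            try {
                updateStepExecution.run();
            }
            finally {
                semaphore.release();
            }
        }
    }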

org.springframework.batch.repeat.support.RepeatTemplate#iterate executes the batch callback until the completion policy decides the task is complete, and waits for the whole batch to finish before returning.

    result = executeInternal(callback);

org.springframework.batch.repeat.support.RepeatTemplate#executeInternal is the internal method that loops over the interceptors and the batch callback:

    RepeatInternalState state = createInternalState(context);

    while (running) {
       // before-interceptors run here ...
       if (running) {
          result = getNextResult(context, callback, state);
          executeAfterInterceptors(context, result);
          // ... then the completion policy is checked
       }
    }

    result = result.and(waitForResults(state));

Calling createInternalState creates a RepeatInternalStateSupport object whose results field is a ResultHolderResultQueue:

    new RepeatInternalStateSupport();
    this.results = new ResultHolderResultQueue(throttleLimit);

A ResultHolderResultQueue builds its queue and semaphore from the throttleLimit configured on the job; these are what allow the step to run on multiple threads, and throttleLimit is the number of outstanding results expected (a configuration sketch follows the snippet below).

    results = new PriorityBlockingQueue<ResultHolder>(throttleLimit, new ResultHolderComparator());
    waits = new Semaphore(throttleLimit);
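
A hedged configuration sketch (bean names and the chunk size are illustrative) showing where that throttleLimit comes from: setting a TaskExecutor on the step switches stepOperations to a TaskExecutorRepeatTemplate, and throttleLimit is the value handed to it.

    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;

    @Configuration
    public class MultiThreadedStepConfig {

        @Bean
        public Step multiThreadedStep(StepBuilderFactory steps,
                                      ItemReader<String> reader,
                                      ItemWriter<String> writer) {
            return steps.get("multiThreadedStep")
                    .<String, String>chunk(100)
                    .reader(reader)
                    .writer(writer)
                    // chunk executions are submitted to this executor ...
                    .taskExecutor(new SimpleAsyncTaskExecutor("chunk-"))
                    // ... but at most 4 of them run concurrently
                    .throttleLimit(4)
                    .build();
        }
    }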

V. org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate#getNextResult

This method uses the TaskExecutor to run on multiple threads and produce a result. The internal state is a queue of result holders that have not finished executing; when this method exits, a holder that already has a return value should no longer be in the queue. The queue is scoped to the calling method, so access to it does not need to be synchronized.

When the ExecutingRunnable instances are created, the same context is passed to all of them, so the results from multiple threads are placed into the same context.

    ExecutingRunnable runnable = null;
    ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
    do {
       /*
        * Wrap the callback in a runnable that will add its result to the
        * queue when it is ready.
        */
       runnable = new ExecutingRunnable(callback, context, queue);
       /*
        * Tell the runnable that it can expect a result. This could have
        * been in-lined with the constructor, but it might block, so it's
        * better to do it here, since we have the option (it's a private
        * class).
        */
       runnable.expect();
       taskExecutor.execute(runnable);
       /*
        * Allow termination policy to update its state. This must happen
        * immediately before or after the call to the task executor.
        */
       update(context);
       /*
        * Keep going until we get a result that is finished, or early
        * termination...
        */
    } while (queue.isEmpty() && !isComplete(context));
    /*
     * N.B. If the queue is empty then take() blocks until a result appears,
     * and there must be at least one because we just submitted one to the
     * task executor.
     */
    ResultHolder result = queue.take();
    return result.getResult();

In ExecutingRunnable's run method, the callback that TaskletStep#doExecute passed to stepOperations.iterate is invoked:

    result = callback.doInIteration(context);

In the doInIteration method of that callback, the logic that produces the result is executed:

    result = new TransactionTemplate(transactionManager, transactionAttribute)
    .execute(new ChunkTransactionCallback(chunkContext, semaphore));

The execute method eventually calls:

    result = action.doInTransaction(status);

ChunkTransactionCallback is an inner class of TaskletStep.

The core of org.springframework.batch.core.step.tasklet.TaskletStep.ChunkTransactionCallback#doInTransaction:

    result = tasklet.execute(contribution, chunkContext);

org.springframework.batch.core.step.item.ChunkOrientedTasklet#execute

The concrete type of chunkProvider in the execute method depends on whether a skip-policy is configured for the chunk in the configuration file (a hedged configuration sketch follows the excerpt below).

If none is configured, it is org.springframework.batch.core.step.item.SimpleChunkProvider;

if one is configured, the provider is org.springframework.batch.core.step.item.FaultTolerantChunkProvider (a subclass of SimpleChunkProvider) and the processor is FaultTolerantChunkProcessor (a subclass of SimpleChunkProcessor), so provide, process, and postProcess still run through the methods defined in the Simple* parent classes.

    Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
    if (inputs == null) {
       inputs = chunkProvider.provide(contribution);
       if (buffering) {
          chunkContext.setAttribute(INPUTS_KEY, inputs);
       }
    }
    chunkProcessor.process(contribution, inputs);
    chunkProvider.postProcess(contribution, inputs);

    // Allow a message coming back from the processor to say that we
    // are not done yet
    if (inputs.isBusy()) {
       logger.debug("Inputs still busy");
       return RepeatStatus.CONTINUABLE;
    }

    chunkContext.removeAttribute(INPUTS_KEY);
    chunkContext.setComplete();
    return RepeatStatus.continueIf(!inputs.isEnd());
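
As mentioned above, the fault-tolerant variants are selected by configuration. A hedged Java-config sketch (bean names, chunk size and the skipped exception type are illustrative, not from the analyzed source):

    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.file.FlatFileParseException;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class FaultTolerantStepConfig {

        // faultTolerant() plus skip()/skipLimit() is what makes the step use the
        // FaultTolerant* provider/processor instead of the Simple* variants above.
        @Bean
        public Step faultTolerantStep(StepBuilderFactory steps,
                                      ItemReader<String> reader,
                                      ItemWriter<String> writer) {
            return steps.get("faultTolerantStep")
                    .<String, String>chunk(10)
                    .reader(reader)
                    .writer(writer)
                    .faultTolerant()
                    .skip(FlatFileParseException.class)
                    .skipLimit(5)
                    .build();
        }
    }
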
  1. org.springframework.batch.core.step.item.SimpleChunkProvider#provide
    final Chunk<I> inputs = new Chunk<I>();
    repeatOperations.iterate(new RepeatCallback() {

       @Override
       public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
          I item = null;
          try {
             item = read(contribution, inputs);
          }
          catch (SkipOverflowException e) {
             // read() tells us about an excess of skips by throwing an
             // exception
             return RepeatStatus.FINISHED;
          }
          if (item == null) {
             inputs.setEnd();
             return RepeatStatus.FINISHED;
          }
          inputs.add(item);
          contribution.incrementReadCount();
          return RepeatStatus.CONTINUABLE;
       }

    });

    return inputs;

repeatOperations is of the same type as the stepOperations seen in TaskletStep#doExecute above, org.springframework.batch.repeat.support.RepeatTemplate; only the doInIteration implementation of the callback passed in differs.
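
A standalone sketch of that RepeatTemplate/RepeatCallback contract (the completion policy of 3 iterations is arbitrary, purely for illustration):

    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
    import org.springframework.batch.repeat.support.RepeatTemplate;

    public class RepeatTemplateDemo {
        public static void main(String[] args) {
            RepeatTemplate template = new RepeatTemplate();
            // Complete after 3 iterations
            template.setCompletionPolicy(new SimpleCompletionPolicy(3));
            template.iterate(context -> {
                System.out.println("one iteration");
                return RepeatStatus.CONTINUABLE; // keep iterating until the policy is satisfied
            });
        }
    }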

  2. org.springframework.batch.core.step.item.SimpleChunkProcessor#process
    // Allow temporary state to be stored in the user data field
    initializeUserData(inputs);
    // If there is no input we don't have to do anything more
    if (isComplete(inputs)) {
       return;
    }
    // Make the transformation, calling remove() on the inputs iterator if
    // any items are filtered. Might throw exception and cause rollback.
    Chunk<O> outputs = transform(contribution, inputs);
    // Adjust the filter count based on available data
    contribution.incrementFilterCount(getFilterCount(inputs, outputs));
    // Adjust the outputs if necessary for housekeeping purposes, and then
    // write them out...
    write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

In org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#transform, the processor configured in the configuration file is invoked from the doWithRetry method of the RetryCallback defined in this method (an ItemProcessor sketch follows the excerpt):

    Chunk<O> outputs = new Chunk<O>();
    @SuppressWarnings("unchecked")
    final UserData<O> data = (UserData<O>) inputs.getUserData();
    final Chunk<O> cache = data.getOutputs();
    final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
    final AtomicInteger count = new AtomicInteger(0);

    // final int scanLimit = processorTransactional && data.scanning() ? 1 :
    // 0;

    for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {

       final I item = iterator.next();

       RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {

          @Override
          public O doWithRetry(RetryContext context) throws Exception {
             O output = null;
             try {
                count.incrementAndGet();
                O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
                if (cached != null && !processorTransactional) {
                   output = cached;
                }
                else {
                   output = doProcess(item);
                   if (output == null) {
                      data.incrementFilterCount();
                   } else if (!processorTransactional && !data.scanning()) {
                      cache.add(output);
                   }
                }
             }
             catch (Exception e) {
                if (rollbackClassifier.classify(e)) {
                   // Default is to rollback unless the classifier
                   // allows us to continue
                   throw e;
                }
                else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
                   // If we are not re-throwing then we should check if
                   // this is skippable
                   contribution.incrementProcessSkipCount();
                   logger.debug("Skipping after failed process with no rollback", e);
                   // If not re-throwing then the listener will not be
                   // called in next chunk.
                   callProcessSkipListener(item, e);
                }
                else {
                   // If it's not skippable that's an error in
                   // configuration - it doesn't make sense to not roll
                   // back if we are also not allowed to skip
                   throw new NonSkippableProcessException(
                         "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
                         e);
                }
             }
             if (output == null) {
                // No need to re-process filtered items
                iterator.remove();
             }
             return output;
          }

       };

       RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {

          @Override
          public O recover(RetryContext context) throws Exception {
             Throwable e = context.getLastThrowable();
             if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
                iterator.remove(e);
                contribution.incrementProcessSkipCount();
                logger.debug("Skipping after failed process", e);
                return null;
             }
             else {
                if (rollbackClassifier.classify(e)) {
                   // Default is to rollback unless the classifier
                   // allows us to continue
                   throw new RetryException("Non-skippable exception in recoverer while processing", e);
                }
                iterator.remove(e);
                return null;
             }
          }

       };

       O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
             getInputKey(item), rollbackClassifier));
       if (output != null) {
          outputs.add(output);
       }

       /*
        * We only want to process the first item if there is a scan for a
        * failed item.
        */
       if (data.scanning()) {
          while (cacheIterator != null && cacheIterator.hasNext()) {
             outputs.add(cacheIterator.next());
          }
          // Only process the first item if scanning
          break;
       }
    }

    return outputs;
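
doProcess(item) in the loop above ultimately delegates to the step's configured ItemProcessor. A trivial sketch (the filtering rule is made up for illustration); returning null filters the item, which is what drives data.incrementFilterCount() and iterator.remove() above:

    import org.springframework.batch.item.ItemProcessor;

    public class UpperCaseProcessor implements ItemProcessor<String, String> {

        @Override
        public String process(String item) throws Exception {
            if (item == null || item.trim().isEmpty()) {
                return null; // filtered out: counted as a filter, removed from the chunk
            }
            return item.toUpperCase(); // transformed output goes into the outputs chunk
        }
    }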

org.springframework.batch.core.step.item.BatchRetryTemplate#execute(org.springframework.retry.RetryCallback<T,E>, org.springframework.retry.RecoveryCallback, org.springframework.retry.RetryState)

    O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
             getInputKey(item), rollbackClassifier));

This eventually calls org.springframework.retry.support.RetryTemplate#doExecute; the core logic, simplified from decompiled output, is as follows:

    RetryContext context = this.open(retryPolicy, state);
    RetrySynchronizationManager.register(context);
    try {
        boolean running = this.doOpenInterceptors(retryCallback, context);
        while (this.canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
                lastException = null;
                return retryCallback.doWithRetry(context);
            } catch (Throwable t) {
                // record the failure; the retry policy decides whether to loop again
            }
        }
        return this.handleRetryExhausted(recoveryCallback, context, state);
    } catch (Throwable t) {
        throw wrapIfNecessary(t);
    } finally {
        this.close(retryPolicy, context, state, lastException == null);
        this.doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
  3. org.springframework.batch.core.step.item.FaultTolerantChunkProcessor#write
    final UserData<O> data = (UserData<O>) inputs.getUserData();
    final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
    RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
       @Override
       public Object doWithRetry(RetryContext context) throws Exception {
          contextHolder.set(context);

          if (!data.scanning()) {
             chunkMonitor.setChunkSize(inputs.size());
             try {
                doWrite(outputs.getItems());
             }
             catch (Exception e) {
                if (rollbackClassifier.classify(e)) {
                   throw e;
                }
                /*
                 * If the exception is marked as no-rollback, we need to
                 * override that, otherwise there's no way to write the
                 * rest of the chunk or to honour the skip listener
                 * contract.
                 */
                throw new ForceRollbackForWriteSkipException(
                      "Force rollback on skippable exception so that skipped item can be located.", e);
             }
             contribution.incrementWriteCount(outputs.size());
          }
          else {
             scan(contribution, inputs, outputs, chunkMonitor, false);
          }
          return null;

       }
    };

    if (!buffering) {

       RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {

          @Override
          public Object recover(RetryContext context) throws Exception {

             Throwable e = context.getLastThrowable();
             if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
                throw new RetryException("Invalid retry state during write caused by "
                      + "exception that does not classify for rollback: ", e);
             }

             Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
             for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {

                inputIterator.next();
                outputIterator.next();

                checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
                if (!rollbackClassifier.classify(e)) {
                   throw new RetryException(
                         "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
                         e);
                }

             }

             return null;

          }

       };

       batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
             BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));

    }
    else {

       RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

          @Override
          public Object recover(RetryContext context) throws Exception {
             /*
              * If the last exception was not skippable we don't need to
              * do any scanning. We can just bomb out with a retry
              * exhausted.
              */
             if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
                throw new ExhaustedRetryException(
                      "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
                      context.getLastThrowable());
             }

             inputs.setBusy(true);
             data.scanning(true);
             scan(contribution, inputs, outputs, chunkMonitor, true);
             return null;
          }

       };

       try {
          batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
                rollbackClassifier));
       }
       catch (Exception e) {
          RetryContext context = contextHolder.get();
          if (!batchRetryTemplate.canRetry(context)) {
             /*
              * BATCH-1761: we need advance warning of the scan about to
              * start in the next transaction, so we can change the
              * processing behaviour.
              */
             data.scanning(true);
          }
          throw e;
       }

    }

    callSkipListeners(inputs, outputs);
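
Similarly, doWrite(outputs.getItems()) in the callback above delegates to the step's configured ItemWriter. A trivial sketch, assuming the pre-5.0 List-based writer signature that this article's excerpts reflect; an exception thrown from write is what triggers the rollback/scan/skip handling shown above:

    import java.util.List;

    import org.springframework.batch.item.ItemWriter;

    public class LoggingItemWriter implements ItemWriter<String> {

        @Override
        public void write(List<? extends String> items) throws Exception {
            for (String item : items) {
                System.out.println("writing: " + item); // a real writer would persist the item
            }
        }
    }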

Open questions:

  1. When a step's chunk executes, do the reader, processor, and writer run in the same thread?

  2. If the reader, processor, and writer can run in different threads, where in the source code is that scheduled? No such code was found.


