Posts Spring Batch - Job 개념
Post
Cancel

Spring Batch - Job 개념

개념

Job

배치 작업

img
(출처 : https://docs.spring.io/spring-batch/reference/domain.html)

  • 하나의 Job은 하나 이상의 Step을 가지며, 각각의 Step은 하나의 ItemReader, 하나의 ItemProcessor, 그리고 하나의 ItemWriter를 가진다.
  • 즉, Job은 Step 인스턴스들을 담는 컨테이너이다.
  • Job은 JobLauncher를 통해 실행되며, 현재 실행 중인 프로세스에 대한 메타데이터는 JobRepository에 저장된다.
    • JobRepository : Spring Batch에서 JobExecutionStepExecution 같은 다양한 영속화된 도메인 객체들에 대해 기본적인 CRUD(생성, 조회, 수정, 삭제) 작업을 수행하기 위한 저장소

JobInstance

배치 작업의 실행 단위

img
(출처 : https://docs.spring.io/spring-batch/reference/domain.html)

  • 예를 들어 (위 그림의) EndOfDay Job의 경우 하루마다 하나의 논리적 JobInstance가 존재한다.
    • 만약 1월 1일 실행이 처음에 실패하고 다음 날 다시 실행된다 하더라도, 그것은 여전히 1월 1일 실행이다.
    • 실행은 보통 처리되는 데이터와 대응된다. 즉, 1월 1일 실행은 1월 1일 데이터를 처리한다는 의미이다.
    • 즉, JobInstance는 여러 번의 실행(JobExecution)을 가질 수 있다.
  • 특정 Job과 그것을 식별하는 JobParameters에 대응되는 JobInstance은 동시에 하나만(또는 한번만) 실행될 수 있다.
    • JobInstance = Job + 식별용 JobParameters
    • allowStartIfComplete 속성을 활용하면 완료된 Job을 재실행할 수 있다.
  • BATCH_JOB_INSTANCEJOB_KEY가 파라미터의 해시값
    • 파라미터 없는 경우에도 해시값 만들어짐

img

  • 새로운 JobInstance를 사용하는 것은 “처음부터 시작”을 의미하며, 기존 인스턴스를 사용하는 것은 일반적으로(ExecutionContext을 활용하여) “이전 실행 지점부터 이어서 시작”을 의미한다.

JobExecution

‘실행 시도’를 나타내는 개념

  • 해당 실행과 연결된 JobInstance는 실행이 성공적으로 완료될 때까지 완료된 것으로 간주되지 않는다.
  • 예를 들어 앞서 설명한 EndOfDay Job을 기준으로, 2017년 1월 1일의 JobInstance가 첫 실행에서 실패했다고 가정해 보자.
  • 동일한 식별 JobParameters(2017-01-01)로 다시 실행하면 새로운 JobExecution이 생성된다.
  • 그러나 여전히 JobInstance는 하나뿐이다.

예시 상황

EndOfDayJob 2017-01-01 실행 (21:00 시작 → 21:30 실패)
다음날 같은 파라미터로 재실행 → 성공 (21:00 → 21:30)
이어서 2017-01-02 실행 → 성공 (21:31 → 22:29)

  • BATCH_JOB_INSTANCE 테이블
JOB_INST_IDJOB_NAME
1EndOfDayJob
2EndOfDayJob
  • BATCH_JOB_EXECUTION 테이블
JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112017-01-01 21:002017-01-01 21:30FAILED
212017-01-02 21:002017-01-02 21:30COMPLETED
322017-01-02 21:312017-01-02 22:29COMPLETED

코드 살짝 들여다보기

Job 계층구조

1
2
3
4
Job (Interface)
   └── AbstractJob (Abstract Class)
         ├── SimpleJob (Concrete Class)
         └── FlowJob (Concrete Class)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// org.springframework.batch.core.job.AbstractJob
abstract protected void doExecute(JobExecution execution) throws JobExecutionException;

@Override
public final void execute(JobExecution execution) {

	Assert.notNull(execution, "jobExecution must not be null");
	execution.getExecutionContext().put(SpringBatchVersion.BATCH_VERSION_KEY, SpringBatchVersion.getVersion());

  ...

	try (Observation.Scope scope = observation.openScope()) {

		jobParametersValidator.validate(execution.getJobParameters());

		if (execution.getStatus() != BatchStatus.STOPPING) {

			execution.setStartTime(LocalDateTime.now());
			updateStatus(execution, BatchStatus.STARTED);

			listener.beforeJob(execution);

			try {
				doExecute(execution);

        ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// org.springframework.batch.core.job.SimpleJob#doExecute
@Override
protected void doExecute(JobExecution execution)
  throws JobInterruptedException, JobRestartException, StartLimitExceededException {

  StepExecution stepExecution = null;
  for (Step step : steps) {
    stepExecution = handleStep(step, execution);
    if (stepExecution.getStatus() != BatchStatus.COMPLETED) {
      //
      // Terminate the job if a step fails
      //
      break;
    }
  }

  //
  // Update the job status to be the same as the last step
  //
  if (stepExecution != null) {
    if (logger.isDebugEnabled()) {
      logger.debug("Upgrading JobExecution status: " + stepExecution);
    }
    execution.upgradeStatus(stepExecution.getStatus());
    execution.setExitStatus(stepExecution.getExitStatus());
  }
}

JobInstance는 JobParameters로 식별

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// org.springframework.batch.core.repository.dao.JdbcJobInstanceDao#createJobInstance

@Override
public JobInstance createJobInstance(String jobName, JobParameters jobParameters) {

  Assert.notNull(jobName, "Job name must not be null.");
  Assert.notNull(jobParameters, "JobParameters must not be null.");

  Assert.state(getJobInstance(jobName, jobParameters) == null, "JobInstance must not already exist");

  Long jobInstanceId = jobInstanceIncrementer.nextLongValue();

  JobInstance jobInstance = new JobInstance(jobInstanceId, jobName);
  jobInstance.incrementVersion();

  Object[] parameters = new Object[] { jobInstanceId, jobName, jobKeyGenerator.generateKey(jobParameters),
    jobInstance.getVersion() };
  getJdbcTemplate().update(getQuery(CREATE_JOB_INSTANCE), parameters,
    new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.INTEGER });

  return jobInstance;
}

특정 Job과 그것을 식별하는 JobParameters에 대응되는 JobInstance은 한번만 실행 가능

  • 관련 에러 메세지
1
batch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=10, version=5, name=step1, status=COMPLETED, exitStatus=COMPLETED, ...
  • 호출 흐름

img

  • job이름 , 파라미터로 실행 내역을 찾는다.
1
2
3
4
5
6
7
8
9
10
// org.springframework.batch.core.launch.support.TaskExecutorJobLauncher#run
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

		try {
			taskExecutor.execute(new Runnable() {

				@Override
				public void run() {
            ...
						job.execute(jobExecution);
  • 찾은 실행 내역에서 JobInstance를 가져온다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// org.springframework.batch.core.job.SimpleStepHandler#handleStep
JobInstance jobInstance = execution.getJobInstance();

StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
	// If the last execution of this step was in the same job, it's
	// probably intentional so we want to run it again...
	if (logger.isInfoEnabled()) {
		logger.info(String.format(
				"Duplicate step [%s] detected in execution of job=[%s]. "
						+ "If either step fails, both will be executed again on restart.",
				step.getName(), jobInstance.getJobName()));
	}
	lastStepExecution = null;
}
StepExecution currentStepExecution = lastStepExecution;

if (shouldStart(lastStepExecution, execution, step)) {
  ...
}
  • jobRepository.getLastStepExecution 호출로 실행되는 쿼리
1
2
3
4
5
6
-- org.springframework.batch.core.repository.dao.JdbcStepExecutionDao#GET_LAST_STEP_EXECUTION
SELECT ...
FROM BATCH_JOB_EXECUTION JE
    JOIN BATCH_STEP_EXECUTION SE ON SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID
WHERE JE.JOB_INSTANCE_ID = ?
  AND SE.STEP_NAME = ?
  • StepExecution의 stepStatus 가 COMPLETED인데 allowStartIfComplete 속성이 false이거나 stepStatus가 ABANDONED 이면 특정 job을 동일한 파라미터로 재실행 불가능
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// org.springframework.batch.core.job.SimpleStepHandler#shouldStart
protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step)
		throws JobRestartException, StartLimitExceededException {

	BatchStatus stepStatus;
	if (lastStepExecution == null) {
		stepStatus = BatchStatus.STARTING;
	}
	else {
		stepStatus = lastStepExecution.getStatus();
	}

  ...

	if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete())
			|| stepStatus == BatchStatus.ABANDONED) {
		// step is complete, false should be returned, indicating that the
		// step should not be started
		if (logger.isInfoEnabled()) {
			logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution);
		}
		return false;
	}

  ...

}

특정 Job과 그것을 식별하는 JobParameters에 대응되는 JobInstance은 동시에 하나만 실행 가능

  • 관련 에러 메세지
1
2
Caused by: org.springframework.batch.core.repository.JobExecutionAlreadyRunningException: A job
execution for this job is already running: JobInstance: id=1, ...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// org.springframework.batch.core.repository.support.SimpleJobRepository#createJobExecution
@Override
public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
		throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

	Assert.notNull(jobName, "Job name must not be null.");
	Assert.notNull(jobParameters, "JobParameters must not be null.");

	JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
	ExecutionContext executionContext;

	if (jobInstance != null) {

		List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);

		if (executions.isEmpty()) {
			throw new IllegalStateException("Cannot find any job execution for job instance: " + jobInstance);
		}

		for (JobExecution execution : executions) {
			if (execution.isRunning()) {
				throw new JobExecutionAlreadyRunningException(
						"A job execution for this job is already running: " + jobInstance);
			}

      ...
		}
		executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
	}

  ...

	return jobExecution;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// org.springframework.batch.core.BatchStatus
public enum BatchStatus {

  /**
   * The batch job has successfully completed its execution.
   */
  COMPLETED,
  /**
   * Status of a batch job prior to its execution.
   */
  STARTING,
  /**
   * Status of a batch job that is running.
   */
  STARTED,
  /**
   * Status of batch job waiting for a step to complete before stopping the batch job.
   */
  STOPPING,
  /**
   * Status of a batch job that has been stopped by request.
   */
  STOPPED,
  /**
   * Status of a batch job that has failed during its execution.
   */
  FAILED,
  /**
   * Status of a batch job that did not stop properly and can not be restarted.
   */
  ABANDONED,
  /**
   * Status of a batch job that is in an uncertain state.
   */
  UNKNOWN;

  public boolean isRunning() {
    return this == STARTING || this == STARTED || this == STOPPING;
  }

  ...

}

참고 자료

This post is licensed under CC BY 4.0 by the author.