The Complete Guide to Spring Batch
Spring Batch is a lightweight batch framework for processing large volumes of data. It provides the features essential to enterprise batch processing: logging/tracing, transaction management, job restart, skipping, and resource management.
What Is Spring Batch?
Characteristics of Batch Processing
Batch processing:
- Processes large volumes of data in bulk
- Runs automatically without user intervention
- Runs at scheduled times (scheduling)
- Supports restart and recovery on failure
Batch Application Requirements
| Requirement | Description |
|---|---|
| High volume | Process millions of records efficiently |
| Automation | Run without user intervention |
| Robustness | Avoid aborting on bad data |
| Reliability | Traceability through logging and notifications |
| Performance | Finish within the allotted time window |
Spring Batch vs Quartz
Spring Batch:
- Defines "what" to process
- Logic for processing large data volumes
- Restart, skip, chunk-oriented processing
Quartz:
- Defines "when" to run
- Scheduling (cron expressions)
- Used together with Spring Batch (see the sketch below)
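A minimal sketch of that pairing, assuming Quartz is on the classpath (e.g., spring-boot-starter-quartz); the jobLauncher/dailyJob wiring and names here are assumptions, not part of the original configuration:
public class LaunchBatchQuartzJob extends QuartzJobBean {
    private JobLauncher jobLauncher; // injected by Spring
    private Job dailyJob;            // the Spring Batch Job to launch

    public void setJobLauncher(JobLauncher jobLauncher) { this.jobLauncher = jobLauncher; }
    public void setDailyJob(Job dailyJob) { this.dailyJob = dailyJob; }

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        try {
            // a unique parameter per trigger so every firing creates a new JobInstance
            JobParameters params = new JobParametersBuilder()
                    .addLocalDateTime("runTime", LocalDateTime.now())
                    .toJobParameters();
            jobLauncher.run(dailyJob, params);
        } catch (Exception e) {
            throw new JobExecutionException(e);
        }
    }
}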
Architecture
Layered Structure
┌─────────────────────────────────────────────┐
│             Application Layer               │
│      (the Jobs and Steps you write)         │
├─────────────────────────────────────────────┤
│                Core Layer                   │
│     (JobLauncher, Job, Step, Flow, ...)     │
├─────────────────────────────────────────────┤
│            Infrastructure Layer             │
│  (ItemReader, ItemWriter, JobRepository)    │
└─────────────────────────────────────────────┘
Core Component Relationships
JobLauncher
    │
    ▼
   Job ──────────────────────────────┐
    │                                │
    ├── Step 1                       │
    │    ├── ItemReader              │
    │    ├── ItemProcessor           │
    │    └── ItemWriter              ▼
    │                          JobRepository
    ├── Step 2               (stores metadata)
    │    └── Tasklet                 │
    │                                │
    └── Step 3 ──────────────────────┘
Core Concepts
Job
A Job is the top-level unit of batch processing and consists of one or more Steps.
@Configuration
public class JobConfig {
@Bean
public Job sampleJob(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("sampleJob", jobRepository)
.start(step1)
.next(step2)
.build();
}
}
JobInstance
A logical run of a Job.
Example:
- Job: daily settlement batch
- JobInstance (Jan 1): processes Jan 1 data
- JobInstance (Jan 2): processes Jan 2 data
→ A JobInstance that completed successfully cannot be re-run with the same JobParameters
→ A failed JobInstance can be re-run
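A minimal sketch of that rule (jobLauncher and dailyJob are assumed to be wired elsewhere):
JobParameters params = new JobParametersBuilder()
    .addString("date", "2024-01-01")
    .toJobParameters();
jobLauncher.run(dailyJob, params); // first run: a new JobInstance for 2024-01-01
// If the first run COMPLETED, this second call throws JobInstanceAlreadyCompleteException;
// if it FAILED, the same JobInstance is restarted instead.
jobLauncher.run(dailyJob, params);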
JobParameters
// JobParameters identify the JobInstance
JobParameters params = new JobParametersBuilder()
.addString("date", "2024-01-01")
.addLong("seq", 1L)
.addLocalDateTime("runTime", LocalDateTime.now())
.toJobParameters();
jobLauncher.run(job, params);
JobExecution
A single attempt to run a JobInstance.
JobInstance (Jan 1)
├── JobExecution #1 (failed)
├── JobExecution #2 (failed)
└── JobExecution #3 (succeeded)
A JobExecution holds (see the sketch below):
- Status (COMPLETED, FAILED, STOPPED, ...)
- Start time, end time
- ExitStatus
- ExecutionContext
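A hedged sketch of inspecting this information through JobExplorer (jobExplorer is an injected JobExplorer bean; the job name is illustrative):
JobInstance lastInstance = jobExplorer.getLastJobInstance("dailyJob");
for (JobExecution execution : jobExplorer.getJobExecutions(lastInstance)) {
    log.info("execution #{}: status={}, exit={}, start={}, end={}",
        execution.getId(),
        execution.getStatus(),
        execution.getExitStatus().getExitCode(),
        execution.getStartTime(),
        execution.getEndTime());
}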
Step
A Step is an independent unit of processing within a Job.
@Bean
public Step sampleStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
ItemReader<String> reader,
ItemProcessor<String, String> processor,
ItemWriter<String> writer) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(100, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
StepExecution
A single attempt to run a Step.
A StepExecution tracks (see the sketch below):
- readCount: number of items read
- writeCount: number of items written
- commitCount: number of commits
- rollbackCount: number of rollbacks
- skipCount: number of items skipped
- filterCount: number of items filtered out
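These counters can also be read from the JobExecution the launcher returns; a small sketch (job, params, and the logger are assumed):
JobExecution execution = jobLauncher.run(job, params);
for (StepExecution step : execution.getStepExecutions()) {
    log.info("{}: read={}, write={}, commit={}, rollback={}, skip={}, filter={}",
        step.getStepName(), step.getReadCount(), step.getWriteCount(),
        step.getCommitCount(), step.getRollbackCount(),
        step.getSkipCount(), step.getFilterCount());
}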
ExecutionContext
A key-value store for sharing data within a Step or across a Job.
// StepExecutionContext - Step scope
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
ExecutionContext context = stepExecution.getExecutionContext();
context.put("key", "value");
}
// JobExecutionContext - Job scope (shared across Steps)
@BeforeStep
public void saveToJobContext(StepExecution stepExecution) {
ExecutionContext jobContext = stepExecution.getJobExecution().getExecutionContext();
jobContext.put("sharedData", data); // 'data' is assumed to be produced earlier in the step
}
Chunk-Oriented Processing
What Is a Chunk?
Chunk: a batch of items processed together.
Processing flow:
1. The ItemReader reads items up to the chunk size
2. Each item is passed through the ItemProcessor
3. The processed items are written in one go by the ItemWriter
4. The transaction is committed
Example (chunk size = 10):
┌─────────────────────────────────────────────┐
│  Read 10 items → Process each → Write all   │
│                     ↓                       │
│                   COMMIT                    │
├─────────────────────────────────────────────┤
│  Read 10 items → Process each → Write all   │
│                     ↓                       │
│                   COMMIT                    │
└─────────────────────────────────────────────┘
Chunk Processing Code
@Bean
public Step chunkStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("chunkStep", jobRepository)
.<InputType, OutputType>chunk(100, transactionManager) // chunk size
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
Choosing a Chunk Size
Considerations:
- Memory usage: larger chunks use more memory
- Transaction size: larger chunks lose more work on rollback
- Throughput: larger chunks mean fewer commits
- I/O efficiency: larger chunks make batched I/O more efficient
General guidance (see the sketch below):
- Start between 100 and 1,000
- Tune after performance testing
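Since the right value only emerges from testing, it can help to externalize the size instead of hard-coding it. A sketch, assuming a custom batch.chunk-size property (not part of the original configuration):
@Bean
public Step tunableStep(JobRepository jobRepository,
                        PlatformTransactionManager transactionManager,
                        @Value("${batch.chunk-size:500}") int chunkSize) {
    return new StepBuilder("tunableStep", jobRepository)
        .<Customer, Customer>chunk(chunkSize, transactionManager) // size comes from configuration
        .reader(reader())
        .writer(writer())
        .build();
}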
ItemReader
Key ItemReader Implementations
| Implementation | Purpose |
|---|---|
| JdbcCursorItemReader | Reads from a DB with a JDBC cursor |
| JdbcPagingItemReader | Reads from a DB with paging |
| JpaPagingItemReader | JPA paged queries |
| JpaCursorItemReader | JPA cursor queries |
| FlatFileItemReader | Reads files (CSV, fixed-length) |
| JsonItemReader | Reads JSON files |
| StaxEventItemReader | Reads XML files |
JdbcCursorItemReader
@Bean
public JdbcCursorItemReader<Customer> jdbcCursorReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Customer>()
.name("customerReader")
.dataSource(dataSource)
.sql("SELECT id, name, email FROM customer WHERE status = ?")
.preparedStatementSetter(ps -> ps.setString(1, "ACTIVE"))
.rowMapper((rs, rowNum) -> Customer.builder()
.id(rs.getLong("id"))
.name(rs.getString("name"))
.email(rs.getString("email"))
.build())
.build();
}
JdbcPagingItemReader
@Bean
public JdbcPagingItemReader<Customer> jdbcPagingReader(DataSource dataSource) {
Map<String, Object> params = new HashMap<>();
params.put("status", "ACTIVE");
return new JdbcPagingItemReaderBuilder<Customer>()
.name("customerPagingReader")
.dataSource(dataSource)
.selectClause("SELECT id, name, email")
.fromClause("FROM customer")
.whereClause("WHERE status = :status")
.sortKeys(Map.of("id", Order.ASCENDING))
.parameterValues(params)
.pageSize(100)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.build();
}
JpaPagingItemReader
@Bean
public JpaPagingItemReader<Customer> jpaPagingReader(EntityManagerFactory emf) {
return new JpaPagingItemReaderBuilder<Customer>()
.name("customerJpaReader")
.entityManagerFactory(emf)
.queryString("SELECT c FROM Customer c WHERE c.status = :status")
.parameterValues(Map.of("status", "ACTIVE"))
.pageSize(100)
.build();
}
JpaCursorItemReader (Spring Batch 4.3+)
@Bean
public JpaCursorItemReader<Customer> jpaCursorReader(EntityManagerFactory emf) {
return new JpaCursorItemReaderBuilder<Customer>()
.name("customerCursorReader")
.entityManagerFactory(emf)
.queryString("SELECT c FROM Customer c WHERE c.status = :status")
.parameterValues(Map.of("status", "ACTIVE"))
.build();
}
FlatFileItemReader
@Bean
public FlatFileItemReader<Customer> flatFileReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerFileReader")
.resource(new ClassPathResource("customers.csv"))
.encoding("UTF-8")
.linesToSkip(1) // skip the header row
.delimited()
.delimiter(",")
.names("id", "name", "email", "phone")
.targetType(Customer.class)
.build();
}
Custom ItemReader
public class CustomItemReader implements ItemReader<String> {
private final Iterator<String> iterator;
public CustomItemReader(List<String> items) {
this.iterator = items.iterator();
}
@Override
public String read() {
if (iterator.hasNext()) {
return iterator.next();
}
return null; // returning null ends reading
}
}
ItemProcessor
Basic ItemProcessor
@Bean
public ItemProcessor<Customer, CustomerDto> customerProcessor() {
return customer -> {
// returning null filters the item out: it is not passed to the Writer
if (!customer.isActive()) {
return null;
}
return CustomerDto.builder()
.id(customer.getId())
.name(customer.getName().toUpperCase())
.email(customer.getEmail())
.build();
};
}
ValidatingItemProcessor
@Bean
public ValidatingItemProcessor<Customer> validatingProcessor() {
ValidatingItemProcessor<Customer> processor = new ValidatingItemProcessor<>();
processor.setValidator(new CustomerValidator());
processor.setFilter(true); // filter items that fail validation (instead of throwing)
return processor;
}
public class CustomerValidator implements Validator<Customer> {
@Override
public void validate(Customer customer) throws ValidationException {
if (customer.getEmail() == null || !customer.getEmail().contains("@")) {
throw new ValidationException("Invalid email");
}
}
}
CompositeItemProcessor
@Bean
public CompositeItemProcessor<Customer, CustomerDto> compositeProcessor() {
return new CompositeItemProcessorBuilder<Customer, CustomerDto>()
.delegates(
validatingProcessor(),
transformProcessor(),
enrichProcessor()
)
.build();
}
ClassifierCompositeItemProcessor
@Bean
public ClassifierCompositeItemProcessor<Customer, CustomerDto> classifierProcessor() {
ClassifierCompositeItemProcessor<Customer, CustomerDto> processor =
new ClassifierCompositeItemProcessor<>();
processor.setClassifier(customer -> {
if ("VIP".equals(customer.getGrade())) {
return vipProcessor();
} else {
return normalProcessor();
}
});
return processor;
}
ItemWriter
Key ItemWriter Implementations
| Implementation | Purpose |
|---|---|
| JdbcBatchItemWriter | JDBC batched INSERT/UPDATE |
| JpaItemWriter | JPA persist/merge |
| FlatFileItemWriter | Writes files |
| JsonFileItemWriter | Writes JSON files |
| CompositeItemWriter | Chains multiple writers |
JdbcBatchItemWriter
@Bean
public JdbcBatchItemWriter<Customer> jdbcBatchWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (id, name, email) VALUES (:id, :name, :email)")
.beanMapped()
.build();
}
// Alternative: using an ItemPreparedStatementSetter instead of beanMapped()
@Bean
public JdbcBatchItemWriter<Customer> jdbcBatchWriterWithSetter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (id, name, email) VALUES (?, ?, ?)")
.itemPreparedStatementSetter((customer, ps) -> {
ps.setLong(1, customer.getId());
ps.setString(2, customer.getName());
ps.setString(3, customer.getEmail());
})
.build();
}
JpaItemWriter
@Bean
public JpaItemWriter<Customer> jpaWriter(EntityManagerFactory emf) {
JpaItemWriter<Customer> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(emf);
writer.setUsePersist(true); // use persist (default is merge)
return writer;
}
FlatFileItemWriter
@Bean
public FlatFileItemWriter<Customer> fileWriter() {
BeanWrapperFieldExtractor<Customer> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[]{"id", "name", "email"});
DelimitedLineAggregator<Customer> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<Customer>()
.name("customerFileWriter")
.resource(new FileSystemResource("output/customers.csv"))
.encoding("UTF-8")
.headerCallback(writer -> writer.write("ID,NAME,EMAIL"))
.lineAggregator(lineAggregator)
.footerCallback(writer -> writer.write("Total: " + count)) // 'count' must be tracked elsewhere (e.g., in a listener)
.build();
}
CompositeItemWriter
@Bean
public CompositeItemWriter<Customer> compositeWriter() {
return new CompositeItemWriterBuilder<Customer>()
.delegates(
jpaWriter(),
kafkaWriter(),
cacheWriter()
)
.build();
}
Custom ItemWriter
public class CustomItemWriter implements ItemWriter<Customer> {
private final CustomerRepository repository;
public CustomItemWriter(CustomerRepository repository) {
this.repository = repository;
}
@Override
public void write(Chunk<? extends Customer> chunk) {
for (Customer customer : chunk) {
repository.save(customer);
}
// or: repository.saveAll(chunk.getItems());
}
}
Tasklet
Used for simple tasks that don't fit the chunk model.
Basic Tasklet
@Bean
public Step taskletStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("taskletStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
// perform a simple task
log.info("Running tasklet");
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
Implementing a Tasklet
@Component
public class CleanupTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// delete temporary files
Files.walk(Path.of("/tmp/batch"))
.filter(Files::isRegularFile)
.forEach(path -> {
try {
Files.delete(path);
} catch (IOException e) {
log.error("Failed to delete: {}", path, e);
}
});
return RepeatStatus.FINISHED;
}
}
RepeatStatus
RepeatStatus.FINISHED // done; continue to the next Step
RepeatStatus.CONTINUABLE // repeat (until a condition is met)
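A minimal sketch of CONTINUABLE: the framework re-invokes execute() until it returns FINISHED (drainOneBatch() is a hypothetical unit of work):
public class DrainQueueTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // do one bounded slice of work per invocation; each invocation runs in its own transaction
        boolean moreWork = drainOneBatch();
        return moreWork ? RepeatStatus.CONTINUABLE : RepeatStatus.FINISHED;
    }
    private boolean drainOneBatch() {
        // e.g., pop up to N entries from a queue; return true while entries remain
        return false;
    }
}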
Flow Control
Sequential Execution
@Bean
public Job sequentialJob(JobRepository jobRepository) {
return new JobBuilder("sequentialJob", jobRepository)
.start(step1())
.next(step2())
.next(step3())
.build();
}
Conditional Branching
@Bean
public Job conditionalJob(JobRepository jobRepository) {
return new JobBuilder("conditionalJob", jobRepository)
.start(step1())
.on("FAILED").to(failureStep())
.from(step1()).on("*").to(step2())
.from(step2())
.on("COMPLETED").to(successStep())
.from(step2()).on("*").to(errorStep())
.end()
.build();
}
Customizing ExitStatus
@Bean
public Step decisionStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("decisionStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
// set the ExitStatus based on a condition ('someCondition' is illustrative)
if (someCondition) {
contribution.setExitStatus(new ExitStatus("SKIP"));
}
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
@Bean
public Job jobWithCustomExit(JobRepository jobRepository) {
return new JobBuilder("jobWithCustomExit", jobRepository)
.start(decisionStep())
.on("SKIP").to(skipStep())
.from(decisionStep()).on("*").to(normalStep())
.end()
.build();
}
JobExecutionDecider
public class MyDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution,
StepExecution stepExecution) {
String status = someService.checkStatus();
if ("PROCESS".equals(status)) {
return new FlowExecutionStatus("PROCESS");
} else if ("SKIP".equals(status)) {
return new FlowExecutionStatus("SKIP");
}
return FlowExecutionStatus.COMPLETED;
}
}
@Bean
public Job deciderJob(JobRepository jobRepository) {
return new JobBuilder("deciderJob", jobRepository)
.start(initialStep())
.next(myDecider())
.on("PROCESS").to(processStep())
.on("SKIP").to(skipStep())
.from(myDecider()).on("*").to(defaultStep())
.end()
.build();
}
Splitting into Flows
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.next(step4())
.build();
}
@Bean
public Job flowJob(JobRepository jobRepository) {
return new JobBuilder("flowJob", jobRepository)
.start(flow1())
.next(flow2())
.end()
.build();
}
Skip and Retry
Configuring Skip
@Bean
public Step skipStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("skipStep", jobRepository)
.<String, String>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.skipLimit(10) // skip at most 10 items
.skip(ValidationException.class)
.skip(FlatFileParseException.class)
.noSkip(FileNotFoundException.class) // never skip this exception
.build();
}
Skip Policy
@Bean
public Step customSkipStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("customSkipStep", jobRepository)
.<String, String>chunk(100, transactionManager)
.reader(reader())
.writer(writer())
.faultTolerant()
.skipPolicy(new CustomSkipPolicy())
.build();
}
public class CustomSkipPolicy implements SkipPolicy {
private static final int MAX_SKIP = 100;
@Override
public boolean shouldSkip(Throwable t, long skipCount) {
// skipCount (the number of items skipped so far) is supplied by the framework,
// so the policy needs no mutable state of its own
return t instanceof ValidationException && skipCount < MAX_SKIP;
}
}
Configuring Retry
@Bean
public Step retryStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("retryStep", jobRepository)
.<String, String>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.retryLimit(3) // retry up to 3 times
.retry(DeadlockLoserDataAccessException.class)
.retry(TransientDataAccessException.class)
.noRetry(ValidationException.class) // never retry this exception
.build();
}
Combining Retry and Skip
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("faultTolerantStep", jobRepository)
.<Customer, Customer>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
// retry settings
.retryLimit(3)
.retry(TransientDataAccessException.class)
// skip settings (items are skipped once retries are exhausted)
.skipLimit(10)
.skip(ValidationException.class)
.skip(TransientDataAccessException.class)
.build();
}
Listener
JobExecutionListener
@Component
public class JobLoggingListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("Job 시작: {}", jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("Job 종료: {} - 상태: {}",
jobExecution.getJobInstance().getJobName(),
jobExecution.getStatus());
if (jobExecution.getStatus() == BatchStatus.FAILED) {
// send a notification ('notificationService' is assumed to be injected)
notificationService.sendAlert(jobExecution);
}
}
}
StepExecutionListener
@Component
public class StepLoggingListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
log.info("Step 시작: {}", stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("Step 종료: {} - Read: {}, Write: {}, Skip: {}",
stepExecution.getStepName(),
stepExecution.getReadCount(),
stepExecution.getWriteCount(),
stepExecution.getSkipCount());
// the ExitStatus can be changed here
return stepExecution.getExitStatus();
}
}
ChunkListener
@Component
public class ChunkLoggingListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
log.debug("Chunk 처리 시작");
}
@Override
public void afterChunk(ChunkContext context) {
StepExecution stepExecution = context.getStepContext().getStepExecution();
log.info("Chunk 처리 완료 - 누적 Write: {}", stepExecution.getWriteCount());
}
@Override
public void afterChunkError(ChunkContext context) {
log.error("Chunk 처리 중 오류 발생");
}
}
ItemReadListener / ItemProcessListener / ItemWriteListener
@Component
public class ItemLoggingListener implements ItemReadListener<Customer>,
ItemProcessListener<Customer, CustomerDto>,
ItemWriteListener<CustomerDto> {
@Override
public void onReadError(Exception ex) {
log.error("읽기 오류: {}", ex.getMessage());
}
@Override
public void onProcessError(Customer item, Exception ex) {
log.error("처리 오류 - ID: {}, Error: {}", item.getId(), ex.getMessage());
}
@Override
public void onWriteError(Exception ex, Chunk<? extends CustomerDto> items) {
log.error("쓰기 오류 - 건수: {}, Error: {}", items.size(), ex.getMessage());
}
}
SkipListener
@Component
public class SkipLoggingListener implements SkipListener<Customer, CustomerDto> {
@Override
public void onSkipInRead(Throwable t) {
log.warn("읽기 중 스킵: {}", t.getMessage());
}
@Override
public void onSkipInProcess(Customer item, Throwable t) {
log.warn("처리 중 스킵 - ID: {}, 사유: {}", item.getId(), t.getMessage());
// 스킵된 데이터 별도 저장
skipRepository.save(new SkipRecord(item.getId(), t.getMessage()));
}
@Override
public void onSkipInWrite(CustomerDto item, Throwable t) {
log.warn("쓰기 중 스킵 - ID: {}, 사유: {}", item.getId(), t.getMessage());
}
}
Registering Listeners
@Bean
public Step stepWithListeners(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("stepWithListeners", jobRepository)
.<Customer, CustomerDto>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.listener(stepLoggingListener)
.listener(chunkLoggingListener)
.listener(itemLoggingListener)
.listener(skipLoggingListener)
.build();
}
@Bean
public Job jobWithListeners(JobRepository jobRepository, Step step) {
return new JobBuilder("jobWithListeners", jobRepository)
.listener(jobLoggingListener)
.start(step)
.build();
}
Scaling
Multi-threaded Step
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Customer, Customer>chunk(100, transactionManager)
.reader(synchronizedReader()) // a thread-safe reader is required
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.throttleLimit(4) // cap on concurrent threads (deprecated as of 5.0; size the TaskExecutor pool instead)
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("batch-");
return executor;
}
@Bean
@StepScope
public SynchronizedItemStreamReader<Customer> synchronizedReader() {
JdbcCursorItemReader<Customer> reader = new JdbcCursorItemReaderBuilder<Customer>()
// ... reader configuration omitted
.build();
SynchronizedItemStreamReader<Customer> syncReader = new SynchronizedItemStreamReader<>();
syncReader.setDelegate(reader);
return syncReader;
}
Parallel Steps
@Bean
public Job parallelJob(JobRepository jobRepository) {
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.build();
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
.start(step2())
.build();
Flow flow3 = new FlowBuilder<SimpleFlow>("flow3")
.start(step3())
.build();
Flow parallelFlow = new FlowBuilder<SimpleFlow>("parallelFlow")
.split(taskExecutor())
.add(flow1, flow2, flow3)
.build();
return new JobBuilder("parallelJob", jobRepository)
.start(parallelFlow)
.next(finalStep())
.end()
.build();
}
Partitioning
@Bean
public Step partitionedStep(JobRepository jobRepository) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", partitioner())
.step(workerStep())
.gridSize(4) // number of partitions
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Partitioner partitioner() {
return gridSize -> {
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putLong("minId", i * 10000L);
context.putLong("maxId", (i + 1) * 10000L - 1);
partitions.put("partition" + i, context);
}
return partitions;
};
}
@Bean
public Step workerStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
.<Customer, Customer>chunk(100, transactionManager)
.reader(partitionedReader(null, null)) // actual values injected via @StepScope
.writer(writer())
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<Customer> partitionedReader(
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
Map<String, Object> params = new HashMap<>();
params.put("minId", minId);
params.put("maxId", maxId);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("partitionedReader")
.dataSource(dataSource)
.selectClause("SELECT *")
.fromClause("FROM customer")
.whereClause("WHERE id BETWEEN :minId AND :maxId")
.sortKeys(Map.of("id", Order.ASCENDING))
.parameterValues(params)
.pageSize(100)
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.build();
}
Metadata Schema
Key Tables
-- Job-related
BATCH_JOB_INSTANCE -- JobInstance data
BATCH_JOB_EXECUTION -- JobExecution data
BATCH_JOB_EXECUTION_PARAMS -- JobParameters
-- Step-related
BATCH_STEP_EXECUTION -- StepExecution data
BATCH_STEP_EXECUTION_CONTEXT -- Step ExecutionContext
-- Shared
BATCH_JOB_EXECUTION_CONTEXT -- Job ExecutionContext
Table Relationships
BATCH_JOB_INSTANCE (1) ───── (*) BATCH_JOB_EXECUTION
                                      │
                                      ├── (*) BATCH_JOB_EXECUTION_PARAMS
                                      │
                                      ├── (1) BATCH_JOB_EXECUTION_CONTEXT
                                      │
                                      └── (*) BATCH_STEP_EXECUTION
                                                   │
                                                   └── (1) BATCH_STEP_EXECUTION_CONTEXT
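For ad-hoc checks you can query these tables directly; a hedged sketch using JdbcTemplate (MySQL-style LIMIT assumed; jdbcTemplate points at the metadata DataSource):
List<Map<String, Object>> recent = jdbcTemplate.queryForList("""
    SELECT JOB_EXECUTION_ID, STATUS, EXIT_CODE, START_TIME, END_TIME
    FROM BATCH_JOB_EXECUTION
    ORDER BY JOB_EXECUTION_ID DESC
    LIMIT 10
    """);
recent.forEach(row -> log.info("{}", row));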
Schema Initialization
# application.yml
spring:
  batch:
    jdbc:
      initialize-schema: always # always, embedded, never
      table-prefix: BATCH_ # table prefix
Changes in Spring Batch 5.x
Removal of JobBuilderFactory and StepBuilderFactory
// Spring Batch 4.x (Deprecated)
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job oldJob() {
return jobBuilderFactory.get("oldJob")
.start(step())
.build();
}
// Spring Batch 5.x
@Bean
public Job newJob(JobRepository jobRepository) {
return new JobBuilder("newJob", jobRepository)
.start(step())
.build();
}
@Bean
public Step newStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("newStep", jobRepository)
.<String, String>chunk(100, transactionManager)
.reader(reader())
.writer(writer())
.build();
}
Changes to @EnableBatchProcessing
// Spring Batch 5.x - auto-configured by Spring Boot 3.x
// @EnableBatchProcessing can be omitted
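// Note: if you do declare @EnableBatchProcessing in Boot 3.x, Boot's batch
// auto-configuration backs off, so add it only for fully manual configuration.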
@SpringBootApplication
public class BatchApplication {
public static void main(String[] args) {
SpringApplication.run(BatchApplication.class, args);
}
}
ItemWriter Chunk Parameter
// Spring Batch 4.x
public void write(List<? extends Customer> items)
// Spring Batch 5.x
public void write(Chunk<? extends Customer> chunk)
Expanded JobParameter Types
// Spring Batch 4.x: only String, Long, Double, and Date
// Spring Batch 5.x: any type is supported
JobParameters params = new JobParametersBuilder()
.addLocalDate("date", LocalDate.now())
.addLocalDateTime("dateTime", LocalDateTime.now())
.addJobParameter("custom", new CustomType(), CustomType.class)
.toJobParameters();
Execution Configuration
Application Settings
spring:
  batch:
    job:
      enabled: true # run the Job on startup (default: true)
      name: myJob # name of the Job to run
    jdbc:
      initialize-schema: always
      isolation-level-for-create: default
Programmatic Execution
@Component
@RequiredArgsConstructor
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job myJob;
@Scheduled(cron = "0 0 2 * * *") // every day at 02:00
public void runJob() {
try {
JobParameters params = new JobParametersBuilder()
.addLocalDateTime("runTime", LocalDateTime.now())
.toJobParameters();
JobExecution execution = jobLauncher.run(myJob, params);
log.info("Job 완료: {}", execution.getStatus());
} catch (Exception e) {
log.error("Job 실행 실패", e);
}
}
}
Running with CommandLineRunner
@Component
@RequiredArgsConstructor
public class BatchRunner implements CommandLineRunner {
private final JobLauncher jobLauncher;
private final Job myJob;
@Override
public void run(String... args) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("inputFile", args[0])
.addLocalDateTime("runTime", LocalDateTime.now())
.toJobParameters();
jobLauncher.run(myJob, params);
}
}
Best Practices
1. Guarantee Idempotency
// so that reprocessing the same data yields the same result (MySQL-style upsert shown)
@Bean
public JdbcBatchItemWriter<Customer> idempotentWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("""
INSERT INTO customer (id, name, email, updated_at)
VALUES (:id, :name, :email, :updatedAt)
ON DUPLICATE KEY UPDATE
name = :name,
email = :email,
updated_at = :updatedAt
""")
.beanMapped()
.build();
}
2. Choose an Appropriate Chunk Size
// determine the optimal chunk size through performance testing
// typically start somewhere between 100 and 1,000
@Bean
public Step optimizedStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("optimizedStep", jobRepository)
.<Customer, Customer>chunk(500, transactionManager)
.reader(reader())
.writer(writer())
.build();
}
3. Optimize the Reader
// a sort key is mandatory when using a paging reader
@Bean
public JdbcPagingItemReader<Customer> optimizedReader(DataSource dataSource) {
return new JdbcPagingItemReaderBuilder<Customer>()
.name("optimizedReader")
.dataSource(dataSource)
.fetchSize(1000) // JDBC fetch size
.pageSize(1000) // page size
.selectClause("SELECT id, name, email")
.fromClause("FROM customer")
.whereClause("WHERE status = 'ACTIVE'")
.sortKeys(Map.of("id", Order.ASCENDING)) // sort key is mandatory
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.build();
}
4. Error-Handling Strategy
@Bean
public Step robustStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("robustStep", jobRepository)
.<Customer, Customer>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
// retry transient network/DB errors
.retryLimit(3)
.retry(TransientDataAccessException.class)
.retry(DeadlockLoserDataAccessException.class)
// skip data errors
.skipLimit(100)
.skip(ValidationException.class)
// log skipped items
.listener(skipLoggingListener)
.build();
}
5. Transaction Management
// Reader: typically reads outside the chunk transaction (e.g., a cursor held open for the whole step)
// Processor + Writer: run inside the chunk transaction
// be careful when calling external APIs from the Writer
@Bean
public Step transactionAwareStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("transactionAwareStep", jobRepository)
.<Order, Order>chunk(100, transactionManager)
.reader(reader())
.processor(processor())
.writer(items -> {
// persist to the DB (inside the chunk transaction)
repository.saveAll(items);
// external API calls are best handled separately
// (a transaction rollback cannot undo the API call)
})
.build();
}
Real-World Examples
CSV to Database
@Configuration
@RequiredArgsConstructor
public class CsvToDbJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final DataSource dataSource;
@Bean
public Job csvToDbJob() {
return new JobBuilder("csvToDbJob", jobRepository)
.start(csvToDbStep())
.build();
}
@Bean
public Step csvToDbStep() {
return new StepBuilder("csvToDbStep", jobRepository)
.<Customer, Customer>chunk(1000, transactionManager)
.reader(csvReader())
.processor(validatingProcessor())
.writer(jdbcWriter())
.faultTolerant()
.skipLimit(100)
.skip(FlatFileParseException.class)
.skip(ValidationException.class)
.listener(skipListener())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<Customer> csvReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("csvReader")
.resource(new ClassPathResource("customers.csv"))
.linesToSkip(1)
.delimited()
.names("id", "name", "email", "phone")
.targetType(Customer.class)
.build();
}
@Bean
public ItemProcessor<Customer, Customer> validatingProcessor() {
return customer -> {
if (customer.getEmail() == null || !customer.getEmail().contains("@")) {
throw new ValidationException("Invalid email: " + customer.getId());
}
return customer;
};
}
@Bean
public JdbcBatchItemWriter<Customer> jdbcWriter() {
return new JdbcBatchItemWriterBuilder<Customer>()
.dataSource(dataSource)
.sql("INSERT INTO customer (id, name, email, phone) VALUES (:id, :name, :email, :phone)")
.beanMapped()
.build();
}
}
Large-Scale Data Migration
@Configuration
@RequiredArgsConstructor
public class MigrationJobConfig {
@Bean
public Job migrationJob(JobRepository jobRepository, Step partitionedStep) {
return new JobBuilder("migrationJob", jobRepository)
.start(partitionedStep)
.build();
}
@Bean
public Step partitionedStep(JobRepository jobRepository,
Step workerStep,
TaskExecutor taskExecutor) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", rangePartitioner())
.step(workerStep)
.gridSize(10)
.taskExecutor(taskExecutor)
.build();
}
@Bean
public Partitioner rangePartitioner() {
return gridSize -> {
long min = jdbcTemplate.queryForObject("SELECT MIN(id) FROM source_table", Long.class);
long max = jdbcTemplate.queryForObject("SELECT MAX(id) FROM source_table", Long.class);
long range = (max - min) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putLong("minId", min + (i * range));
context.putLong("maxId", Math.min(min + ((i + 1) * range) - 1, max));
partitions.put("partition" + i, context);
}
return partitions;
};
}
@Bean
public Step workerStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
.<SourceEntity, TargetEntity>chunk(1000, transactionManager)
.reader(partitionedReader(null, null))
.processor(migrationProcessor())
.writer(jpaWriter())
.build();
}
@Bean
@StepScope
public JpaPagingItemReader<SourceEntity> partitionedReader(
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
return new JpaPagingItemReaderBuilder<SourceEntity>()
.name("partitionedReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT s FROM SourceEntity s WHERE s.id BETWEEN :minId AND :maxId ORDER BY s.id")
.parameterValues(Map.of("minId", minId, "maxId", maxId))
.pageSize(1000)
.build();
}
}
Summary
| Component | Role |
|---|---|
| Job | Top-level unit of batch processing |
| Step | Independent unit of processing within a Job |
| ItemReader | Reads data |
| ItemProcessor | Transforms/filters data |
| ItemWriter | Writes data |
| JobRepository | Stores metadata |
| JobLauncher | Launches Jobs |
Chunk vs Tasklet
| Style | When to Use |
|---|---|
| Chunk | Large data volumes (repeated read-process-write) |
| Tasklet | Simple tasks (file cleanup, initialization, ...) |
Scaling Strategies
| Strategy | Description | When to Use |
|---|---|---|
| Multi-threaded | Parallelism within a Step | Simple parallelization |
| Parallel Steps | Steps run in parallel | Independent Steps |
| Partitioning | Data split across partitions | Very large datasets |
Spring Batch is the standard framework for large-scale data processing. With chunk-oriented processing, skip/retry, and partitioning, you can build batch systems that are both reliable and scalable.