Spring Batch์์ ๋๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ๋ค๋ค๋ณธ ๊ฐ๋ฐ์๋ผ๋ฉด ํ ๋ฒ์ฏค ๊ณ ๋ฏผํ์ ๊ฒ์
๋๋ค.
โJPA ๊ธฐ๋ฐ์ผ๋ก ์ปค์(Cursor)์ฒ๋ผ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ์ ์์๊น?โ
์ค๋์ ๊ทธ ํต์ฌ ์ปดํฌ๋ํธ์ธ JpaCursorItemReader ์ ์ญ์ฌ์, ์ HibernateCursorItemReader๊ฐ ์ฌ๋ผ์ก๋์ง, ๊ทธ๋ฆฌ๊ณ MySQL๊ณผ PostgreSQL์ ์ปค์ ๋์ ์ฐจ์ด๊น์ง ์ค์ ์คํ ๋ฐ์ดํฐ์ ํจ๊ป ์ ๋ฆฌํด๋ณด๊ฒ ์ต๋๋ค.
JpaCursorItemReader๋ Spring Batch 4.3 (2020๋
๊ฒฝ) ์ ๋์
๋ ์ปดํฌ๋ํธ์
๋๋ค.
๊ทธ ์ด์ ๊น์ง๋ JPA ํ๊ฒฝ์์ ์ปค์ ๊ธฐ๋ฐ ๋ฐฐ์น๋ฅผ ๊ตฌํํ๋ ค๋ฉด Hibernate์ ScrollableResults๋ฅผ ์ง์ ์ฌ์ฉํ๊ฑฐ๋, PagingItemReader๋ฅผ ์ฐํ์ ์ผ๋ก ์ด์ฉํด์ผ ํ์ต๋๋ค.
ํ์ง๋ง Paging ๋ฐฉ์์ offset ๊ธฐ๋ฐ ํ์ด์ง ์ฟผ๋ฆฌ ํน์ฑ์ ํฐ ๋ฐ์ดํฐ์
์์๋ ์ฑ๋ฅ์ด ๊ธ๊ฒฉํ ์ ํ๋๋ ๋ฌธ์ ๊ฐ ์์์ต๋๋ค.
๊ทธ๋์ Spring ํ์ โJPA ํ๊ฒฝ์์๋ ์ปค์์ฒ๋ผ ์์ฐจ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฝ์โ๋ ๋ชฉํ๋ก JpaCursorItemReader๋ฅผ ๋์
ํ์ต๋๋ค.
์ด๊ธฐ JpaCursorItemReader๋ ์ด๋ฆ๋ง Cursor์์ ๋ฟ, ์ค์ DB ์ปค์(fetchSize ๊ธฐ๋ฐ streaming) ๊ฐ ์๋ JPA EntityManager์ ResultList ์ ์ฒด ๋ก๋ฉ ๊ตฌ์กฐ์์ต๋๋ค.
์ฆ, ๋ด๋ถ์ ์ผ๋ก๋ ์๋ ๋ก์ง์ด ๋ฐ๋ณต๋์์ต๋๋ค.
Query query = entityManager.createQuery("SELECT e FROM Entity e");
List<Entity> results = query.getResultList(); // ์ ์ฒด fetch
๊ฒฐ๊ตญ โ์ปค์์ฒ๋ผ ์๊ฒผ์ง๋ง ์ ํ ์ปค์๊ฐ ์๋์๋โ ๊ฒ์
๋๋ค.
์ด ๋๋ฌธ์ ์์ญ๋ง ๊ฑด ์ด์ ์ฒ๋ฆฌ ์ ํ ๋ฉ๋ชจ๋ฆฌ ๊ธ๋ฑ โ OutOfMemoryError ๊ฐ ์์ฃผ ๋ฐ์ํ์ต๋๋ค.
Spring Batch 5.2.0-M1๋ถํฐ JpaCursorItemReader๋ JPA 2.2์ getResultStream() API๋ฅผ ํ์ฉํ๋๋ก ๊ฐ์ ๋์์ต๋๋ค.
์ด์ Hibernate 5.5+ ํ๊ฒฝ์์๋ ์ค์ ๋ก Streaming + FetchSize ์กฐํฉ์ด ์๋ํฉ๋๋ค.
๐ ์ฆ,
Query.getResultStream()โ JDBC ๋๋ผ์ด๋ฒ์ fetchSize๊น์ง ๋ด๋ ค๊ฐ๋ฉฐ,
๋๋ผ์ด๋ฒ ์์ค์์ ์ปค์๊ฐ ์ ์ง๋ฉ๋๋ค.
ํ์ง๋ง ์ด ์ญ์ JPA ๊ตฌํ์ฒด์ DB ๋๋ผ์ด๋ฒ์ ์ง์ ์ฌ๋ถ์ ๋ฐ๋ผ ๋ฌ๋ผ์ง๋๋ค.
MySQL์ ์ด ๊ธฐ๋ฅ์ ์์ ํ ์ง์ํ์ง ์์ง๋ง, PostgreSQL์ ์๋นํ ์์ฑ๋ ๋๊ฒ ์ง์ํฉ๋๋ค.
์ด๊ฒ์ด ์ด ๊ธ์ ์ฐ๊ฒ ๋ ์ด์ ์ด๊ธฐ๋ ํ๋ฐ์, PostgreSQL์ด ์ปค์ ๊ธฐ๋ฅ์ ํ๋ฅญํ๊ฒ ์ง์ํ๋ ๊ฒ๊ณผ๋ ๋ฌ๋ฆฌ, MySQL์์๋ ๋ณ๋์ ์ค์ ์ด ํ์ํ๋ค๋ ๊ฒ์ ๊นจ๋ฌ์๊ธฐ ๋๋ฌธ์ ๋๋ค.
Spring Batch 3.x ~ 4.0 ์์ ์๋ HibernateCursorItemReader๊ฐ ์กด์ฌํ์ต๋๋ค.
๊ทธ๋ฌ๋ ์ด ์ปดํฌ๋ํธ๋ ๋ค์๊ณผ ๊ฐ์ ์ด์ ๋ก ํ๊ธฐ๋์์ต๋๋ค.
ScrollableResults ์ฌ์ฉ ์ Session ์ ์ง ์๊ฐ์ด ๊ธธ์ด์ ธ DB Connection Pool์ด ์ฝ๊ฒ ๊ณ ๊ฐ๋์์ต๋๋ค.getResultStream()์ด ๋์
๋๋ฉฐ, HibernateCursorItemReader๋ ์ค๋ณต ๊ธฐ๋ฅ์ด ๋์์ต๋๋ค.์ฆ, Hibernate ์ ์ฉ์์ JPA ํ์ค ๊ธฐ๋ฐ(JpaCursorItemReader)์ผ๋ก ๋ฐฉํฅ์ด ์ฎ๊ฒจ๊ฐ ๊ฒ์
๋๋ค.
์ปค์ ๊ธฐ๋ฐ ํ์นญ์ ๋จ์ํ โfetchSize๋ฅผ ์ค์ ํ๋ฉด ๋๋คโ ์์ค์ด ์๋๋๋ค.
JDBC ๋๋ผ์ด๋ฒ๊ฐ ์ด๋ค ์์ ์ ์ปค์๋ฅผ ํธ๋ฆฌ๊ฑฐํ ์ง์ ๋ฐ๋ผ ์์ ํ ๋ค๋ฅด๊ฒ ์๋ํฉ๋๋ค.
| ๊ตฌ๋ถ | MySQL | PostgreSQL |
|---|---|---|
| JDBC ์ต์ | useServerPrepStmts, useCursorFetch=true, defaultFetchSize ํ์ |
defaultFetchSize๋ง์ผ๋ก ์ถฉ๋ถ |
| ์คํ ๋ฐฉ์ | ์๋ฒ ์ฌ์ด๋ ์ปค์๋ฅผ ๋ช ์์ ์ผ๋ก ํ์ฑํํด์ผ ํจ | ๊ธฐ๋ณธ์ ์ผ๋ก ์๋ฒ ์ปค์๋ฅผ ์ง์ |
| ์ฟผ๋ฆฌ ์ ์ก ๊ตฌ์กฐ | ์ ์ฒด ๊ฒฐ๊ณผ๋ฅผ ํด๋ผ์ด์ธํธ๋ก ๋ฒํผ๋ง ํ ์ ๋ฌ (์ต์ ๋นํ์ฑ ์) | ์๋ฒ ๋ด๋ถ ์ปค์ ์ ์ง ํ ํ์ํ ๋๋ง fetch |
| ์ฒ ํ | ๋จ์ผ ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ๋น ๋ฅด๊ฒ ๋ฐํ | ๋๊ท๋ชจ ์คํธ๋ฆฌ๋ฐ๊ณผ ํธ๋์ญ์ ์์ ์ฑ ์ค์ฌ |
| ์ค์ง์ ์ปค์ ๋ชจ๋ | ์ต์ ์์ผ๋ฉด โ๊ฐ์ง ์ปค์โ | ํญ์ โ์ง์ง ์ปค์โ |
๊ฒฐ๊ตญ MySQL์์๋
useServerPrepStmts + useCursorFetch=true + defaultFetchSize=N
์ธ ์ต์
์ด ๋ชจ๋ ํ์ํด์ผ ์ค์ ์ปค์ ๊ธฐ๋ฐ ์คํธ๋ฆฌ๋ฐ์ด ์๋ํฉ๋๋ค.
MySQL JDBC๋๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํ ๋, ์ต์ ์ง์ ์์ด๋ Cursor๊ธฐ๋ฐ ๋ฐฐ์น์ฒ๋ฆฌ๊ฐ ์๋๋๋ก ๋์ํ์ง ์์์ ์ฆ๋ช ํ๊ธฐ ์ํด, ๊ฐ๋จํ ์คํ์ ํ๋ ์ค๋นํ์ต๋๋ค.
ํด์ฑ๋๊ธฐ ์ ์ ๋ฌธ์์ด์ ๊ฐ์ง๋ BeforeHash ์ํฐํฐ์, ํด๋น ๋ฌธ์์ด์ ํด์ฑํ ๊ฒฐ๊ณผ๋ฅผ ์ ์ฅํ๋ AfterHash์ํฐํฐ๋ฅผ ์ ์ํ๊ฒ ์ต๋๋ค:
@Entity
public class BeforeHash {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, columnDefinition = "TEXT")
private String content;
}
@Entity
public class AfterHash {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, columnDefinition = "TEXT")
private String content;
@OneToOne
@JoinColumn(name = "source_id", nullable = false)
private BeforeHash beforeHash;
}
์ฌ๊ธฐ์ ์ค์ํ๊ฒ์ Reader ๋ฟ์ด๋ฏ๋ก, Processor๋ Writer๋ ์๋ตํ๊ณ , Reader ๊ตฌ์ฑ๋ง ์ดํด๋ณด๊ฒ ์ต๋๋ค:
@Configuration
public class BeforeHashItemReaderConfig {
@Bean
@StepScope
@Qualifier("beforeHashItemReader")
public JpaCursorItemReader<BeforeHash> beforeHashItemReader(EntityManagerFactory emf) {
return new JpaCursorItemReaderBuilder<BeforeHash>()
.name("beforeHashItemReader")
.entityManagerFactory(emf)
.queryString(
"""
SELECT b FROM BeforeHash b
WHERE NOT EXISTS
(SELECT a FROM AfterHash a WHERE a.beforeHash.id = b.id)
ORDER BY b.id ASC
"""
)
.build();
}
}
์์ง AfterHash๊ฐ ์์ฑ๋์ง ์์ BeforeHash๋ค์ ๋ชจ์์ Cursor ๋ฐฉ์์ผ๋ก ๊ฐ์ ธ์ค๋๋ก ํฉ๋๋ค.
Step ๋ฐ Job์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค:
@Configuration
public class HashJobStep {
@Bean
@Qualifier("hashStep")
public Step hashStep(
JobRepository jobRepository,
PlatformTransactionManager txManager,
@Qualifier("beforeHashItemReader")ItemReader<BeforeHash> reader,
HashItemProcessor processor,
@Qualifier("afterHashCreateWriter") ItemWriter<AfterHash> writer
) {
return new StepBuilder("hashStep", jobRepository)
.<BeforeHash, AfterHash>chunk(500, txManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
@Qualifier("hashJob")
public Job hashJob(
JobRepository jobRepository,
@Qualifier("hashStep") Step hashStep
) {
return new JobBuilder("hashJob", jobRepository)
.start(hashStep)
.build();
}
}
500๊ฐ์ฉ ์ฒญํฌ๋ก ๋ฌถ์ด ์ฒ๋ฆฌํ๋๋ก ํด๋ณด๊ฒ ์ต๋๋ค.
ํ ์คํธ ๋ฐ์ดํฐ๋ก๋, BeforeHash ํ ์ด๋ธ์ ์ฝ 150๋ง๊ฐ์ Row๋ฅผ ์ค๋นํด ๋์๋๋ฐ์, ๊ธฐ๋ณธ์ ์ผ๋ก ํ์ ์ฌ๋ผ๊ฐ ์๋ ๋ฐ์ดํฐ๋ค(ํด๋์ค ๋ก๋, String ์์ ํ ๋ฑ)์ด ์์ด์ ์๋์ ์ผ๋ก ํฌ๊ฒ ๋น๊ต๋ฅผ ํ๊ธฐ ์ํด ๋ง์ ๋ฐ์ดํฐ๋ฅผ ์ค๋นํด ๋์์ต๋๋ค.
select count(*) from before_hash;

์ด์ ์ค์ Job์ ํธ๋ฆฌ๊ฑฐ ํด๋ณด๊ฒ ์ต๋๋ค.
useCursorFetch=false)spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/batch
?useSSL=true
&useUnicode=true
&maxQuerySizeToLog=999999
&zeroDateTimeBehavior=convertToNull
&rewriteBatchedStatements=true
๋ฐ๋ก ์ ์ค์ ์ด, ์ ๊ฐ ์ฒ์ MySQL๋ก Cursor ๋ฐฐ์น ์ฒ๋ฆฌ๋ฅผ ํ์ ๋์ ์ค์ ์
๋๋ค.
๋ฐ๋ก useServerPrepStmts, defaultFetchSize๋ useCursorFetch์ต์
์ ์ง์ ํ์ง ์์์ต๋๋ค.
์ด ๊ฒฝ์ฐ, 150๋ง๊ฑด์ ๋ฐ์ดํฐ๋ฅผ JpaCursorItemReader๋ก ๋ฐฐ์น์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ, ํ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ ์๋์ฒ๋ผ ์ ํ์ ์ผ๋ก ์ฆ๊ฐํฉ๋๋ค.
๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ JVM์ ์ง์ ์ฌ๋ฆฌ๊ณ ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ ๋๋ค.

fetchSize ์ค์ ๋ฌด์, ์ ์ฒด ๊ฒฐ๊ณผ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฌํฉ๋๋ค.spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/batch
?useSSL=true
&useUnicode=true
&useServerPrepStmts=true
&maxQuerySizeToLog=999999
&zeroDateTimeBehavior=convertToNull
&rewriteBatchedStatements=true
&useServerPrepStmts=true # ๐ก ์ถ๊ฐ!
&defaultFetchSize=500 # ๐ก ์ถ๊ฐ!
&useCursorFetch=true # ๐ก ์ถ๊ฐ!
์ด๋ฒ์ ์ปค์๋ฅผ ํ์ฑํํ๊ธฐ ์ํด useServerPrepStmts, defaultFetchSize, useCursorFetch ์ต์
์ ํ์ฑํํ์ต๋๋ค.
๊ฒฐ๊ณผ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.

ํ๋ฉ๋ชจ๋ฆฌ ์ ์ ์จ๋ฆฌ ๋๋๋๋ก ์ค์ด๋ ๊ฒ์ด ๋ณด์ด๋์?
์์ ํ ์ปค์ ๋์์ ๊ธฐ๋ํ๋ ค๋ฉด, ์ฌ์ ํ JPA ๊ตฌํ์ฒด์ DB ๋๋ผ์ด๋ฒ์ ํ์กฐ๊ฐ ํ์์ ๋๋ค.
useServerPrepStmts + useCursorFetch=true + defaultFetchSize(๊ธฐ๋ณธ๊ฐ 500) ๋ช
์Cursor์ ๋์ ๋ฐฉ์์ ์ดํดํ๋ ค๋ฉด, PreparedStatement๋ผ๋ ๊ฒ์ ๋จผ์ ์ดํดํด์ผ ํ๋๋ฐ์, ์์๊ฐ ๋ฐ๋ ๋๋์ด ์์ง๋ง ๋ค์ ์ํฐํด์์๋ ์ด์ ๊ดํด ์ดํด๋ณด๊ฒ ์ต๋๋ค.