ItemReaderとItemWriterの基本概念
バッチ処理の本質は、大量のデータを読み込み、何らかの変換や計算を行い、結果を出力することです。Spring Batchフレームワークでは、この処理を実現するために3つのコアインターフェースを提供しています。
ItemReaderインターフェース
ItemReaderは、様々なデータソースからデータを取得するための抽象化されたインターフェースです。主なデータソースとして以下が挙げられます:
- フラットファイル: 固定長形式や区切り文字形式(CSV等)のファイルから1行ずつ読み込み
- XMLファイル: XSDスキーマによる検証を含むXMLデータの解析
- データベース: SQLクエリの結果セットをオブジェクトにマッピング
インターフェースの定義は以下の通りです:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
readメソッドは、アイテムが存在する場合はそのアイテムを返し、データが尽きた場合はnullを返します。これにより、呼び出し側は単純なループで全データを処理できます。
ItemWriterインターフェース
ItemWriterはItemReaderと対になるインターフェースで、データの出力を担当します。データベースへの挿入・更新、キューへの送信、ファイルへの書き込みなどが該当します。
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
writeメソッドはアイテムのリストを受け取ります。これは、チャンク単位でトランザクション管理を行うSpring Batchの特性上、複数のアイテムをまとめて処理する必要があるためです。書き込み完了後、必要に応じてフラッシュ処理を行います。
ItemProcessorによる変換処理
読み込みと書き込みの間でビジネスロジックを実行したい場合、ItemProcessorインターフェースを使用します。データの変換、フィルタリング、加工などを行えます。
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
入力型と出力型を別々に定義できるため、型変換を伴う処理にも対応しています。以下は、Product型をOrder型に変換する例です:
public class ProductToOrderProcessor implements ItemProcessor<Product, Order> {
@Override
public Order process(Product product) throws Exception {
if (product.getStock() <= 0) {
return null; // nullを返すとアイテムはフィルタリングされる
}
return new Order(product.getId(), product.getName(), calculateOrderQuantity(product));
}
private int calculateOrderQuantity(Product product) {
return Math.min(product.getStock(), 100);
}
}
Step定義への組み込み例:
@Bean
public Step processingStep() {
return stepBuilderFactory.get("processingStep")
.<Product, Order>chunk(100)
.reader(productItemReader())
.processor(productToOrderProcessor())
.writer(orderItemWriter())
.build();
}
フラットファイル処理の実装
フラットファイルは、バッチ処理で最も一般的なデータ交換形式の一つです。区切り文字形式と固定長形式の2種類があります。
FieldSetによるデータバインディング
FieldSetは、ファイルから読み込んだ文字列を型安全に扱うための抽象化です。JDBCのResultSetと同様の感覚で、インデックスまたはフィールド名で値にアクセスできます。
String[] columns = {"productName", "price", "available"};
FieldSet fieldSet = new DefaultFieldSet(new String[]{"Laptop", "98000", "true"}, columns);
String name = fieldSet.readString("productName");
int price = fieldSet.readInt(1);
boolean inStock = fieldSet.readBoolean("available");
// Date、Long、BigDecimalなどもサポート
FlatFileItemReaderの構成
FlatFileItemReaderは、フラットファイルを読み込むための主要なコンポーネントです。読み込み処理は以下の3段階で構成されます:
- ファイルから1行を読み込む
- LineTokenizerで行をFieldSetに変換
- FieldSetMapperでFieldSetをドメインオブジェクトにマッピング
LineMapperインターフェースは、これらを統合したエントリーポイントです:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
LineTokenizerは文字列行をFieldSetに分割します:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
主な実装クラス:
- DelimitedLineTokenizer: CSVやTSVなどの区切り文字形式
- FixedLengthTokenizer: 固定長形式
- PatternMatchingCompositeLineTokenizer: 行パターンに応じたトークナイザの使い分け
FieldSetMapperはFieldSetをオブジェクトに変換します:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
DefaultLineMapperを使用した読み込み設定例:
@Bean
public FlatFileItemReader<Employee> employeeItemReader() {
DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("empId", "empName", "department", "salary");
tokenizer.setDelimiter(",");
BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(Employee.class);
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
reader.setName("employeeItemReader");
reader.setResource(new ClassPathResource("data/employees.csv"));
reader.setLineMapper(lineMapper);
reader.setLinesToSkip(1); // ヘッダー行をスキップ
return reader;
}
FlatFileItemWriterの構成
書き込み処理は読み込みの逆順で行われます。LineAggregatorインターフェースがオブジェクトを行文字列に変換します:
public interface LineAggregator<T> {
String aggregate(T item);
}
基本的な実装として、PassThroughLineAggregatorがあります:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
@Override
public String aggregate(T item) {
return item.toString();
}
}
ドメインオブジェクトを書き出す場合、FieldExtractorを使用してオブジェクトからフィールド値を抽出します:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
BeanWrapperFieldExtractorを使用した書き込み設定例:
@Bean
public FlatFileItemWriter<Employee> employeeItemWriter() {
BeanWrapperFieldExtractor<Employee> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[]{"empId", "empName", "department", "salary"});
DelimitedLineAggregator<Employee> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter("|");
lineAggregator.setFieldExtractor(extractor);
FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
writer.setName("employeeItemWriter");
writer.setResource(new FileSystemResource("output/processed_employees.txt"));
writer.setLineAggregator(lineAggregator);
writer.setHeaderCallback(headerWriter -> headerWriter.write("ID|NAME|DEPT|SALARY"));
return writer;
}
FlatFileItemWriterBuilderを使用した簡潔な設定:
@Bean
public FlatFileItemWriter<Order> orderItemWriter() {
return new FlatFileItemWriterBuilder<Order>()
.name("orderItemWriter")
.resource(new FileSystemResource("output/orders.dat"))
.delimited()
.delimiter("\t")
.names("orderId", "customerName", "totalAmount", "orderDate")
.headerCallback(header -> header.write("ORDER_ID\tCUSTOMER\tAMOUNT\tDATE"))
.append(true) // 既存ファイルに追記
.build();
}