Spring BatchのItemReaderとItemWriterによるファイル入出力処理の実装

ItemReaderとItemWriterの基本概念

バッチ処理の本質は、大量のデータを読み込み、何らかの変換や計算を行い、結果を出力することです。Spring Batchフレームワークでは、この処理を実現するために3つのコアインターフェースを提供しています。

ItemReaderインターフェース

ItemReaderは、様々なデータソースからデータを取得するための抽象化されたインターフェースです。主なデータソースとして以下が挙げられます:

  • フラットファイル: 固定長形式や区切り文字形式(CSV等)のファイルから1行ずつ読み込み
  • XMLファイル: XSDスキーマによる検証を含むXMLデータの解析
  • データベース: SQLクエリの結果セットをオブジェクトにマッピング

インターフェースの定義は以下の通りです:

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

readメソッドは、アイテムが存在する場合はそのアイテムを返し、データが尽きた場合はnullを返します。これにより、呼び出し側は単純なループで全データを処理できます。

ItemWriterインターフェース

ItemWriterはItemReaderと対になるインターフェースで、データの出力を担当します。データベースへの挿入・更新、キューへの送信、ファイルへの書き込みなどが該当します。

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

writeメソッドはアイテムのリストを受け取ります。これは、チャンク単位でトランザクション管理を行うSpring Batchの特性上、複数のアイテムをまとめて処理する必要があるためです。書き込み完了後、必要に応じてフラッシュ処理を行います。

ItemProcessorによる変換処理

読み込みと書き込みの間でビジネスロジックを実行したい場合、ItemProcessorインターフェースを使用します。データの変換、フィルタリング、加工などを行えます。

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

入力型と出力型を別々に定義できるため、型変換を伴う処理にも対応しています。以下は、Product型をOrder型に変換する例です:

public class ProductToOrderProcessor implements ItemProcessor<Product, Order> {
    
    @Override
    public Order process(Product product) throws Exception {
        if (product.getStock() <= 0) {
            return null; // nullを返すとアイテムはフィルタリングされる
        }
        return new Order(product.getId(), product.getName(), calculateOrderQuantity(product));
    }
    
    private int calculateOrderQuantity(Product product) {
        return Math.min(product.getStock(), 100);
    }
}

Step定義への組み込み例:

@Bean
public Step processingStep() {
    return stepBuilderFactory.get("processingStep")
            .<Product, Order>chunk(100)
            .reader(productItemReader())
            .processor(productToOrderProcessor())
            .writer(orderItemWriter())
            .build();
}

フラットファイル処理の実装

フラットファイルは、バッチ処理で最も一般的なデータ交換形式の一つです。区切り文字形式と固定長形式の2種類があります。

FieldSetによるデータバインディング

FieldSetは、ファイルから読み込んだ文字列を型安全に扱うための抽象化です。JDBCのResultSetと同様の感覚で、インデックスまたはフィールド名で値にアクセスできます。

String[] columns = {"productName", "price", "available"};
FieldSet fieldSet = new DefaultFieldSet(new String[]{"Laptop", "98000", "true"}, columns);

String name = fieldSet.readString("productName");
int price = fieldSet.readInt(1);
boolean inStock = fieldSet.readBoolean("available");
// Date、Long、BigDecimalなどもサポート

FlatFileItemReaderの構成

FlatFileItemReaderは、フラットファイルを読み込むための主要なコンポーネントです。読み込み処理は以下の3段階で構成されます:

  1. ファイルから1行を読み込む
  2. LineTokenizerで行をFieldSetに変換
  3. FieldSetMapperでFieldSetをドメインオブジェクトにマッピング

LineMapperインターフェースは、これらを統合したエントリーポイントです:

public interface LineMapper<T> {
    T mapLine(String line, int lineNumber) throws Exception;
}

LineTokenizerは文字列行をFieldSetに分割します:

public interface LineTokenizer {
    FieldSet tokenize(String line);
}

主な実装クラス:

  • DelimitedLineTokenizer: CSVやTSVなどの区切り文字形式
  • FixedLengthTokenizer: 固定長形式
  • PatternMatchingCompositeLineTokenizer: 行パターンに応じたトークナイザの使い分け

FieldSetMapperはFieldSetをオブジェクトに変換します:

public interface FieldSetMapper<T> {
    T mapFieldSet(FieldSet fieldSet) throws BindException;
}

DefaultLineMapperを使用した読み込み設定例:

@Bean
public FlatFileItemReader<Employee> employeeItemReader() {
    DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
    
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("empId", "empName", "department", "salary");
    tokenizer.setDelimiter(",");
    
    BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(Employee.class);
    
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    
    FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
    reader.setName("employeeItemReader");
    reader.setResource(new ClassPathResource("data/employees.csv"));
    reader.setLineMapper(lineMapper);
    reader.setLinesToSkip(1); // ヘッダー行をスキップ
    
    return reader;
}

FlatFileItemWriterの構成

書き込み処理は読み込みの逆順で行われます。LineAggregatorインターフェースがオブジェクトを行文字列に変換します:

public interface LineAggregator<T> {
    String aggregate(T item);
}

基本的な実装として、PassThroughLineAggregatorがあります:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {
    @Override
    public String aggregate(T item) {
        return item.toString();
    }
}

ドメインオブジェクトを書き出す場合、FieldExtractorを使用してオブジェクトからフィールド値を抽出します:

public interface FieldExtractor<T> {
    Object[] extract(T item);
}

BeanWrapperFieldExtractorを使用した書き込み設定例:

@Bean
public FlatFileItemWriter<Employee> employeeItemWriter() {
    BeanWrapperFieldExtractor<Employee> extractor = new BeanWrapperFieldExtractor<>();
    extractor.setNames(new String[]{"empId", "empName", "department", "salary"});
    
    DelimitedLineAggregator<Employee> lineAggregator = new DelimitedLineAggregator<>();
    lineAggregator.setDelimiter("|");
    lineAggregator.setFieldExtractor(extractor);
    
    FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>();
    writer.setName("employeeItemWriter");
    writer.setResource(new FileSystemResource("output/processed_employees.txt"));
    writer.setLineAggregator(lineAggregator);
    writer.setHeaderCallback(headerWriter -> headerWriter.write("ID|NAME|DEPT|SALARY"));
    
    return writer;
}

FlatFileItemWriterBuilderを使用した簡潔な設定:

@Bean
public FlatFileItemWriter<Order> orderItemWriter() {
    return new FlatFileItemWriterBuilder<Order>()
            .name("orderItemWriter")
            .resource(new FileSystemResource("output/orders.dat"))
            .delimited()
            .delimiter("\t")
            .names("orderId", "customerName", "totalAmount", "orderDate")
            .headerCallback(header -> header.write("ORDER_ID\tCUSTOMER\tAMOUNT\tDATE"))
            .append(true)  // 既存ファイルに追記
            .build();
}

タグ: Spring Batch ItemReader ItemWriter FlatFileItemReader FlatFileItemWriter

5月21日 08:12 投稿