Spring Bootにおけるリアクティブプログラミングの核心は非同期データストリームとイベント駆動に基づいており、非ブロッキングI/Oとバックプレッシャー(Backpressure)メカニズムを通じて、高スループット、低遅延のデータ処理を実現します。主要コンポーネントはSpring WebFlux(ReactorライブラリによるReactive Streams仕様の実装)であり、主にFlux(0-N個の要素)とMono(0-1個の要素)の2つの型を使用して非同期シーケンスを表現します。以下に詳細な原理、主要クラスとメソッド、使用例を説明します。
一、基本原理
- 非ブロッキングI/O イベントループ(Event Loop)と少数のスレッドで多数のリクエストを処理し、スレッドのブロッキングを回避します。例えば、HTTPリクエストやデータベース操作はすべて非同期で実行されます。
- バックプレッシャー(Backpressure) 消費者がデータフロー速度を制御し、生産者の過負荷を防ぎます。例えば、下流の処理が遅い場合、上流は自動的にデータ送信を減らします。
- リアクティブストリーム仕様 Reactive Streams標準に基づき、4つの主要な役割があります:
- Publisher(パブリッシャー):データを生成(
Flux/Monoなど)。 - Subscriber(サブスクライバー):データを消費。
- Subscription(サブスクリプション):データフローを管理。
- Processor(プロセッサー):パブリッシャーでありサブスクライバーでもある(比較的使用されません)。
二、主要クラスとメソッド
1. Flux と Mono
Flux<T>:0-N個の要素を表す非同期シーケンス。
// Fluxの作成
Flux<String> flux = Flux.just("Alpha", "Beta", "Gamma"); // 静的データ
Flux<Integer> range = Flux.range(1, 10); // 1から10の整数
Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); // 1秒ごとに数値を生成
// オペレーターの例
Flux<String> upperCase = flux.map(String::toUpperCase); // 変換
Flux<String> filtered = flux.filter(s -> s.startsWith("A")); // フィルタリング
Mono<T>:0-1個の要素を表す非同期シーケンス。
// Monoの作成
Mono<String> mono = Mono.just("こんにちは"); // 静的データ
Mono<String> empty = Mono.empty(); // 空のシーケンス
Mono<String> error = Mono.error(new RuntimeException("エラー")); // エラーシーケンス
// オペレーターの例
Mono<String> concat = Mono.just("こんにちは").map(s -> s + " 世界"); // 文字列の結合
2. 主なオペレーター
| オペレーター | 機能 | 例 |
|---|---|---|
map |
要素の変換 | flux.map(String::toUpperCase) |
filter |
要素のフィルタリング | flux.filter(s -> s.length() > 1) |
flatMap |
非同期変換と結果のマージ | flux.flatMap(s -> Mono.just(s.toUpperCase())) |
zip |
複数ストリームのマージ | Flux.zip(flux1, flux2).map(tuple -> tuple.getT1() + tuple.getT2()) |
onErrorReturn |
エラー発生時のデフォルト値返却 | flux.onErrorReturn("DEFAULT") |
subscribeOn |
サブスクライブ時のスレッドプール指定 | flux.subscribeOn(Schedulers.boundedElastic()) |
3. Spring WebFluxの主要コンポーネント
@RestController:リアクティブエンドポイントを定義し、Flux/Monoを返します。
@RestController
@RequestMapping("/api/products")
public class ProductController {
@GetMapping
public Flux<Product> getAllProducts() {
return productService.findAll(); // 非同期ストリームを返す
}
@GetMapping("/{id}")
public Mono<Product> getProduct(@PathVariable String id) {
return productService.findById(id); // 単一の非同期結果を返す
}
}
ReactiveMongoRepository:リアクティブMongoDB操作インターフェース。
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
Flux<Product> findByNameContaining(String keyword); // カスタムクエリ
}
WebTestClient:リアクティブAPIのテストツール。
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class ProductControllerTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void testGetAllProducts() {
webTestClient.get().uri("/api/products")
.exchange()
.expectStatus().isOk()
.expectBodyList(Product.class);
}
}
三、完全な使用例
1. プロジェクトのセットアップ
- 依存関係(
pom.xml):``` <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
#### **2. データモデル**
public class Product { private String id; private String name; private String description; private LocalDateTime createdAt; // コンストラクタ、getter/setterは省略 }
#### **3. Repository層**
public interface ProductRepository extends ReactiveMongoRepository<Product, String> { Flux<Product> findByNameContaining(String keyword); }
#### **4. Service層**
@Service public class ProductService { private final ProductRepository productRepository;
public ProductService(ProductRepository productRepository) {
this.productRepository = productRepository;
}
public Flux<Product> findAll() {
return productRepository.findAll();
}
public Mono<Product> findById(String id) {
return productRepository.findById(id);
}
public Mono<Product> save(Product product) {
product.setCreatedAt(LocalDateTime.now());
return productRepository.save(product);
}
}
#### **5. Controller層**
@RestController @RequestMapping("/api/products") public class ProductController { private final ProductService productService;
public ProductController(ProductService productService) {
this.productService = productService;
}
@GetMapping
public Flux<Product> getAllProducts() {
return productService.findAll();
}
@GetMapping("/{id}")
public Mono<Product> getProduct(@PathVariable String id) {
return productService.findById(id);
}
@PostMapping
public Mono<Product> createProduct(@RequestBody Product product) {
return productService.save(product);
}
}
#### **6. APIのテスト**
- **プロジェクト起動**後、`curl`またはPostmanでテスト:```
# すべての製品を取得
curl -X GET http://localhost:8080/api/products
# 製品を作成
curl -X POST http://localhost:8080/api/products \
-H "Content-Type: application/json" \
-d '{"name":"Spring Bootリアクティブ","description":"詳細な説明...","manufacturer":"テスト社"}'
四、適用シーンと注意点
- 適用シーン
- 高スループットAPIゲートウェイ(例:1秒あたり10万+リクエストの処理)。
- リアルタイムメッセージ配信(例:チャットルーム、株価情報)。
- ストリーミング大規模データ処理(例:ログ分析、IoTデバイスデータ収集)。
- 注意点
- 全チェーンリアクティブ:データベースドライバやHTTPクライアントなど、すべてのコンポーネントがリアクティブ対応していることを確認。ブロッキング操作(例:JDBC)を避ける。
- デバッグの難易度:非同期コードのデバッグとエラートレースは複雑。
WebFluxのログ設定を推奨:``` logging: level: reactor.core.publisher: DEBUG
- **学習曲線**:バックプレッシャーやオペレーターなどの概念理解が必要。シンプルなシナリオ(例:CRUDインターフェース)から始めることを推奨。