Spring Bootのリアクティブプログラミングにおいて、Fluxは非同期データストリームを表します。その内容を出力するには、subscribe()メソッドでストリームを購読する必要があります。以下に、Fluxの内容を出力する主要な方法をいくつか紹介します。
1. subscribe()を使用した内容の出力
subscribe()はFluxデータストリームを実行させる最も基本的な方法です。データ、エラー、完了シグナルをそれぞれ処理するコールバック関数を指定できます。
コード例
import reactor.core.publisher.Flux;
public class FluxOutputExample {
public static void main(String[] args) {
Flux<String> flux = Flux.fromArray(new String[]{"Alpha", "Bravo", "Charlie"});
// 基本的な購読方法
flux.subscribe(
data -> System.out.println("受信: " + data), // 各データの処理
error -> System.err.println("エラー: " + error), // エラーの処理
() -> System.out.println("完了!") // ストリーム終了時の処理
);
}
}
出力:
受信: Alpha
受信: Bravo
受信: Charlie
完了!
2. blockLast()またはblockFirst()の使用(テスト用)
Fluxが有限(例:Flux.just()やFlux.fromIterable())の場合、blockLast()で最後の要素を待機し、blockFirst()で最初の要素を待機できます。
注意:
block*()メソッドはスレッドをブロックするため、テストやコマンドラインプログラムでのみ使用し、WebFluxでは避けるべきです。
コード例
Flux<String> flux = Flux.just("Apple", "Banana");
// 最後の要素をブロックして取得(ストリーム全体が実行される)
String last = flux.blockLast();
System.out.println("最後の要素: " + last);
// 最初の要素をブロックして取得
String first = flux.blockFirst();
System.out.println("最初の要素: " + first);
出力:
最後の要素: Banana
最初の要素: Apple
3. toIterable()またはtoStream()の使用
FluxをIterableやStreamに変換し、それを反復処理して出力できます(同期処理のシナリオで有効)。
注意:
toStream()はスレッドをブロックするため、テストやコマンドラインプログラムでのみ使用してください。
コード例
Flux<String> flux = Flux.just("X", "Y", "Z");
// Iterableに変換(非ブロッキングだが、反復処理時に購読がトリガーされる)
Iterable<String> iterable = flux.toIterable();
for (String s : iterable) {
System.out.println("Iterable: " + s);
}
// Streamに変換(ブロッキング)
flux.toStream().forEach(s -> System.out.println("Stream: " + s));
出力:
Iterable: X
Iterable: Y
Iterable: Z
Stream: X
Stream: Y
Stream: Z
4. log()を使用したデバッグ出力
log()はFluxの内部イベント(onSubscribe、onNext、onError、onCompleteなど)をログに出力するため、デバッグに非常に便利です。
コード例
Flux<String> flux = Flux.just("One", "Two", "Three")
.log(); // ログ出力
flux.subscribe();
出力(ログ例):
INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
INFO reactor.Flux.Array.1 - | request(unbounded)
INFO reactor.Flux.Array.1 - | onNext(One)
INFO reactor.Flux.Array.1 - | onNext(Two)
INFO reactor.Flux.Array.1 - | onNext(Three)
INFO reactor.Flux.Array.1 - | onComplete()
5. WebFluxでFluxを返し、Springに処理させる
Spring WebFluxでは、Fluxを直接返すと、Springが自動的に購読し、データストリームを処理します(例:HTTPレスポンス)。
コード例
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class MyController {
@GetMapping("/stream")
public Flux<String> getStream() {
Flux<String> flux = Flux.just("こんにちは", "世界", "!");
flux.subscribe(data -> System.out.println("ログ: " + data)); // オプション:ログ記録
return flux; // Springが自動的に購読して返却
}
}
http://localhost:8080/streamにアクセスした際のコンソール出力:
ログ: こんにちは
ログ: 世界
ログ: !
HTTPレスポンス:
こんにちは
世界
!
動的ストリームの使用例
Flux<String> dynamicFlux = Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
sink.next("動的アイテム " + i);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
sink.error(e);
}
}
sink.complete();
});
dynamicFlux
.doOnNext(item -> System.out.println("[doOnNext] " + item))
.doOnRequest(request -> System.out.println("[doOnRequest] " + request))
.doOnSubscribe(subscription -> System.out.println("[doOnSubscribe] " + subscription))
.doOnEach(signal -> System.out.println("[doOnEach] " + signal))
.doOnComplete(() -> System.out.println("[doOnComplete]"))
.subscribe(
data -> System.out.println("受信: " + data),
error -> System.err.println("エラー: " + error),
() -> System.out.println("完了!")
);
まとめ
| 方法 | 適用シーン | ブロッキング | 備考 |
|---|---|---|---|
subscribe() |
一般的な使用 | 非ブロッキング | 推奨される方法 |
blockLast() / blockFirst() |
テスト | ブロッキング | テスト専用 |
toIterable() / toStream() |
同期反復処理 | ブロッキング | 注意して使用 |
log() |
デバッグ | 非ブロッキング | 内部イベントをログ出力 |
| Spring WebFlux自動購読 | Webアプリケーション | 非ブロッキング | ベストプラクティス |
推奨される方法
- 通常のコードでは:
subscribe()を使用してデータを処理します。 - テストでは:
blockLast()やStepVerifier(Reactorのテストユーティリティ)を使用します。 - WebFluxでは:
Fluxを直接返し、Springに購読を処理させます。 - デバッグ時:
log()を使用してデータストリームのイベントを確認します。
付録:StepVerifierを使用したFluxのテスト(高度)
JUnitでFluxをテストする場合、StepVerifierが推奨されます。
import reactor.test.StepVerifier;
Flux<String> flux = Flux.just("A", "B", "C");
StepVerifier.create(flux)
.expectNext("A")
.expectNext("B")
.expectNext("C")
.verifyComplete(); // ストリームが正常に終了することを検証
これにより、Fluxの内容をテストしつつ、スレッドをブロックすることなく検証できます。