Spring Bootのリアクティブプログラミング:FluxとMonoのデータ出力方法

Spring Bootのリアクティブプログラミングにおいて、Fluxは非同期データストリームを表します。その内容を出力するには、subscribe()メソッドでストリームを購読する必要があります。以下に、Fluxの内容を出力する主要な方法をいくつか紹介します。

1. subscribe()を使用した内容の出力

subscribe()Fluxデータストリームを実行させる最も基本的な方法です。データ、エラー、完了シグナルをそれぞれ処理するコールバック関数を指定できます。

コード例

import reactor.core.publisher.Flux;

public class FluxOutputExample {
    public static void main(String[] args) {
        Flux<String> flux = Flux.fromArray(new String[]{"Alpha", "Bravo", "Charlie"});

        // 基本的な購読方法
        flux.subscribe(
            data -> System.out.println("受信: " + data), // 各データの処理
            error -> System.err.println("エラー: " + error), // エラーの処理
            () -> System.out.println("完了!") // ストリーム終了時の処理
        );
    }
}

出力

受信: Alpha
受信: Bravo
受信: Charlie
完了!

2. blockLast()またはblockFirst()の使用(テスト用)

Fluxが有限(例:Flux.just()Flux.fromIterable())の場合、blockLast()で最後の要素を待機し、blockFirst()で最初の要素を待機できます。

注意: block*()メソッドはスレッドをブロックするため、テストやコマンドラインプログラムでのみ使用し、WebFluxでは避けるべきです。

コード例

Flux<String> flux = Flux.just("Apple", "Banana");

// 最後の要素をブロックして取得(ストリーム全体が実行される)
String last = flux.blockLast();
System.out.println("最後の要素: " + last);

// 最初の要素をブロックして取得
String first = flux.blockFirst();
System.out.println("最初の要素: " + first);

出力

最後の要素: Banana
最初の要素: Apple

3. toIterable()またはtoStream()の使用

FluxIterableStreamに変換し、それを反復処理して出力できます(同期処理のシナリオで有効)。

注意: toStream()はスレッドをブロックするため、テストやコマンドラインプログラムでのみ使用してください。

コード例

Flux<String> flux = Flux.just("X", "Y", "Z");

// Iterableに変換(非ブロッキングだが、反復処理時に購読がトリガーされる)
Iterable<String> iterable = flux.toIterable();
for (String s : iterable) {
    System.out.println("Iterable: " + s);
}

// Streamに変換(ブロッキング)
flux.toStream().forEach(s -> System.out.println("Stream: " + s));

出力

Iterable: X
Iterable: Y
Iterable: Z
Stream: X
Stream: Y
Stream: Z

4. log()を使用したデバッグ出力

log()Fluxの内部イベント(onSubscribeonNextonErroronCompleteなど)をログに出力するため、デバッグに非常に便利です。

コード例

Flux<String> flux = Flux.just("One", "Two", "Three")
    .log(); // ログ出力

flux.subscribe();

出力(ログ例)

INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
INFO  reactor.Flux.Array.1 - | request(unbounded)
INFO  reactor.Flux.Array.1 - | onNext(One)
INFO  reactor.Flux.Array.1 - | onNext(Two)
INFO  reactor.Flux.Array.1 - | onNext(Three)
INFO  reactor.Flux.Array.1 - | onComplete()

5. WebFluxでFluxを返し、Springに処理させる

Spring WebFluxでは、Fluxを直接返すと、Springが自動的に購読し、データストリームを処理します(例:HTTPレスポンス)。

コード例

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class MyController {
    @GetMapping("/stream")
    public Flux<String> getStream() {
        Flux<String> flux = Flux.just("こんにちは", "世界", "!");
        flux.subscribe(data -> System.out.println("ログ: " + data)); // オプション:ログ記録
        return flux; // Springが自動的に購読して返却
    }
}

http://localhost:8080/streamにアクセスした際のコンソール出力

ログ: こんにちは
ログ: 世界
ログ: !

HTTPレスポンス

こんにちは
世界
!

動的ストリームの使用例

        Flux<String> dynamicFlux = Flux.create(sink -> {
            for (int i = 0; i < 5; i++) {
                sink.next("動的アイテム " + i);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    sink.error(e);
                }
            }
            sink.complete();
        });

        dynamicFlux
            .doOnNext(item -> System.out.println("[doOnNext] " + item))
            .doOnRequest(request -> System.out.println("[doOnRequest] " + request))
            .doOnSubscribe(subscription -> System.out.println("[doOnSubscribe] " + subscription))
            .doOnEach(signal -> System.out.println("[doOnEach] " + signal))
            .doOnComplete(() -> System.out.println("[doOnComplete]"))
            .subscribe(
                data -> System.out.println("受信: " + data),
                error -> System.err.println("エラー: " + error),
                () -> System.out.println("完了!")
            );

まとめ

方法 適用シーン ブロッキング 備考
subscribe() 一般的な使用 非ブロッキング 推奨される方法
blockLast() / blockFirst() テスト ブロッキング テスト専用
toIterable() / toStream() 同期反復処理 ブロッキング 注意して使用
log() デバッグ 非ブロッキング 内部イベントをログ出力
Spring WebFlux自動購読 Webアプリケーション 非ブロッキング ベストプラクティス

推奨される方法

  • 通常のコードでは: subscribe()を使用してデータを処理します。
  • テストでは: blockLast()StepVerifier(Reactorのテストユーティリティ)を使用します。
  • WebFluxでは: Fluxを直接返し、Springに購読を処理させます。
  • デバッグ時: log()を使用してデータストリームのイベントを確認します。

付録:StepVerifierを使用したFluxのテスト(高度)

JUnitでFluxをテストする場合、StepVerifierが推奨されます。

import reactor.test.StepVerifier;

Flux<String> flux = Flux.just("A", "B", "C");

StepVerifier.create(flux)
    .expectNext("A")
    .expectNext("B")
    .expectNext("C")
    .verifyComplete(); // ストリームが正常に終了することを検証

これにより、Fluxの内容をテストしつつ、スレッドをブロックすることなく検証できます。

タグ: Spring Boot Reactor Flux Mono WebFlux

5月20日 09:33 投稿