RxJava を使用する際、データストリームの処理をより柔軟かつ効果的に行うために、様々なユーティリティオペレーターが提供されています。本記事では、データストリームのタイミング制御、デバッグ、リソース管理に役立つ主要なオペレーターについて解説します。
RxJava のオペレーターは、Observable (観測可能なシーケンス) から新しい Observable を生成することで、データフローを変換・操作します。ユーティリティオペレーターは、これらのデータフローの振る舞いを補助・調整する役割を担います。
遅延実行オペレーター (Delay / DelaySubscription)
delay オペレーターは、Observable が生成する各アイテムの送信を、指定された時間だけ遅延させます。一方、delaySubscription は、Observable へのサブスクライブ自体を指定された時間だけ遅延させます。
// delay の例:各アイテムの送信を2秒遅延させる
Observable.interval(1, TimeUnit.SECONDS) // 1秒ごとに数値を発行
.take(5) // 最初の5つだけ取得
.delay(2, TimeUnit.SECONDS) // 送信を2秒遅延
.subscribe(value -> System.out.println("Delayed item: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("Delay complete"));
/*
出力例 (初回発行から約3秒後に開始):
Delayed item: 0
Delayed item: 1
Delayed item: 2
Delayed item: 3
Delayed item: 4
Delay complete
*/
// delaySubscription の例:サブスクライブから2秒後に発行開始
Observable.interval(1, TimeUnit.SECONDS) // 1秒ごとに数値を発行
.take(5) // 最初の5つだけ取得
.delaySubscription(2, TimeUnit.SECONDS) // サブスクライブを2秒遅延
.subscribe(value -> System.out.println("Delayed subscription item: " + value),
error -> System.err.println("Error: " + error),
() -> System.out.println("DelaySubscription complete"));
/*
出力例 (初回発行から約2秒後に開始):
Delayed subscription item: 0
Delayed subscription item: 1
Delayed subscription item: 2
Delayed subscription item: 3
Delayed subscription item: 4
DelaySubscription complete
*/
// delay with a function (RxJava specific)
Observable.interval(100, TimeUnit.MILLISECONDS)
.delay(i -> Observable.timer(i * 100, TimeUnit.MILLISECONDS)) // 各アイテムの遅延時間を動的に設定
.timeInterval() // アイテム間の時間を計測
.take(5)
.subscribe(
timedValue -> System.out.println("OnNext: " + timedValue),
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
/*
出力例:
OnNext: Timed[time=105, unit=MILLISECONDS, value=0]
OnNext: Timed[time=196, unit=MILLISECONDS, value=1]
OnNext: Timed[time=201, unit=MILLISECONDS, value=2]
OnNext: Timed[time=207, unit=MILLISECONDS, value=3]
OnNext: Timed[time=196, unit=MILLISECONDS, value=4]
Complete
*/
サイドエフェクトオペレーター (Do / Finally)
doOnEach (または doOnNext, doOnError, doOnComplete) オペレーターは、Observable のライフサイクル内の特定のポイント (アイテム発行前、エラー発生時、完了時など) で追加のアクション (サイドエフェクト) を実行するために使用されます。これはデバッグに非常に役立ちます。
doOnDispose は、サブスクリプションが破棄されたときに実行されるアクションを登録します。
doFinally オペレーターは、Observable が完了またはエラーで終了した後、あるいはサブスクリプションが破棄されたときに、必ず実行されるアクションを登録します。これはリソースのクリーンアップに便利です。
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
// doOnEach, doOnSubscribe, doOnDispose, doFinally の例
PublishSubject<String> subject = PublishSubject.create();
Disposable disposable = subject
.doOnSubscribe(d -> System.out.println("Subscriber subscribed")) // サブスクライブ時
.doOnNext(item -> System.out.println("Logging OnNext: " + item)) // アイテム発行時
.doOnError(error -> System.err.println("Logging OnError: " + error.getMessage())) // エラー時
.doOnComplete(() -> System.out.println("Logging OnCompleted")) // 完了時
.doOnDispose(() -> System.out.println("Subscriber disposed")) // 解除時
.doFinally(() -> System.out.println("Finally action executed")) // 最終的に実行
.subscribe(
item -> System.out.println("Processing: " + item),
error -> System.err.println("Error received: " + error.getMessage()),
() -> System.out.println("Subscription completed normally")
);
subject.onNext("Hello");
subject.onNext("World");
// subject.onError(new RuntimeException("Something went wrong"));
subject.onComplete();
disposable.dispose(); // 明示的に解除
/*
出力例 (onError がコメントアウトされている場合):
Subscriber subscribed
Logging OnNext: Hello
Processing: Hello
Logging OnNext: World
Processing: World
Logging OnCompleted
Subscription completed normally
Finally action executed
Subscriber disposed
*/
通知の変換 (Materialize / Dematerialize)
materialize オペレーターは、Observable が発行する通常のデータ (onNext) に加え、完了通知 (onComplete) やエラー通知 (onError) も含めて、すべてを Notification オブジェクトに変換します。これにより、データフロー内で発生するすべてのイベントを統一的に扱うことができます。
dematerialize オペレーターは、materialize によって変換された Notification オブジェクトを元のイベント (onNext, onError, onComplete) に戻します。
import io.reactivex.rxjava3.core.Notification;
import io.reactivex.rxjava3.core.Observable;
// materialize と dematerialize の例
Observable<String> source = Observable.just("Data1", "Data2");
source.materialize()
.subscribe(notification -> System.out.println("Materialized: " + notification));
Observable.range(1, 3)
.concatWith(Observable.error(new Exception("Test Error"))) // エラーを発生させる
.materialize()
.subscribe(
notification -> System.out.println("Materialized with error: " + notification),
error -> System.err.println("Error during materialize processing: " + error.getMessage()) // materialize 自体のエラー
);
// dematerialize の例
Observable.just(
Notification.createNext("Success"),
Notification.createError(new Exception("Demo Error")),
Notification.createNext("Another Success") // この通知はエラー後に到達しない
)
.dematerialize()
.subscribe(
value -> System.out.println("Dematerialized next: " + value),
error -> System.err.println("Dematerialized error: " + error.getMessage()),
() -> System.out.println("Dematerialized complete")
);
/*
出力例:
Materialized: OnNextNotification[Data1]
Materialized: OnNextNotification[Data2]
Materialized: OnCompleteNotification
Materialized with error: OnNextNotification[1]
Materialized with error: OnNextNotification[2]
Materialized with error: OnNextNotification[3]
Materialized with error: OnErrorNotification[java.lang.Exception: Test Error]
Dematerialized next: Success
Dematerialized error: Demo Error
*/
時間計測オペレーター (TimeInterval / Timestamp)
timeInterval オペレーターは、Observable が発行する各アイテムの、直前のアイテムからの経過時間 (long 値) を Timed<T> オブジェクトとして通知します。これは、処理時間の計測や、イベント間の遅延分析に役立ちます。
timestamp オペレーターは、各アイテムに発行時のタイムスタンプ (long 値) を付与した Timed<T> オブジェクトを通知します。これは、イベントの発生時刻を記録するのに適しています。
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.schedulers.TestScheduler;
// timeInterval の例
Observable.interval(100, TimeUnit.MILLISECONDS) // 100ms ごとに発行
.take(3)
.timeInterval() // アイテム間の時間を計測
.subscribe(
timedValue -> System.out.println("TimeInterval: " + timedValue),
error -> System.err.println("Error: " + error),
() -> System.out.println("TimeInterval complete")
);
// timestamp の例 (TestScheduler を使用して時間を制御)
TestScheduler testScheduler = new TestScheduler();
Observable.interval(1, TimeUnit.SECONDS, testScheduler) // TestScheduler を使用
.take(3)
.timestamp(testScheduler) // TestScheduler の時間を使用
.subscribe(
timedValue -> System.out.println("Timestamp: " + timedValue),
error -> System.err.println("Error: " + error),
() -> System.out.println("Timestamp complete")
);
testScheduler.advanceTimeBy(5, TimeUnit.SECONDS); // 時間を進める
/*
出力例 (timeInterval):
TimeInterval: Timed[time=105, unit=MILLISECONDS, value=0]
TimeInterval: Timed[time=100, unit=MILLISECONDS, value=1]
TimeInterval: Timed[time=96, unit=MILLISECONDS, value=2]
TimeInterval complete
出力例 (timestamp):
Timestamp: Timed[time=1000, unit=MILLISECONDS, value=0]
Timestamp: Timed[time=2000, unit=MILLISECONDS, value=1]
Timestamp: Timed[time=3000, unit=MILLISECONDS, value=2]
Timestamp complete
*/
タイムアウトオペレーター (Timeout)
timeout オペレーターは、Observable が指定された時間内にアイテムを発行しない場合に、TimeoutException を発生させます。これにより、応答しないストリームからの無限待機を防ぐことができます。代替の Observable を指定することで、タイムアウト時にエラーの代わりに別のシーケンスを発行することも可能です。
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
// timeout の例
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3), // 最初の3つのアイテムは速やかに発行
Observable.timer(1, TimeUnit.SECONDS), // 次に1秒待機
Observable.interval(100, TimeUnit.MILLISECONDS).take(3) // その後また速やかに発行
)
.timeout(500, TimeUnit.MILLISECONDS) // 500ms 以上アイテム発行がなければタイムアウト
.subscribe(
value -> System.out.println("OnNext: " + value),
error -> {
if (error instanceof TimeoutException) {
System.err.println("Timeout occurred!");
} else {
System.err.println("Error: " + error.getMessage());
}
},
() -> System.out.println("Completed")
);
// timeout with fallback observable
Observable.concat(
Observable.interval(100, TimeUnit.MILLISECONDS).take(3),
Observable.timer(1, TimeUnit.SECONDS), // ここでタイムアウトが発生する
Observable.interval(100, TimeUnit.MILLISECONDS).take(3)
)
.timeout(
500, TimeUnit.MILLISECONDS, // タイムアウト時間
Observable.just(-1L) // タイムアウト時に発行される代替アイテム
)
.subscribe(
value -> System.out.println("OnNext with fallback: " + value),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Fallback complete")
);
Thread.sleep(4000); // 非同期処理のため、実行完了を待つ
/*
出力例:
OnNext: 0
OnNext: 1
OnNext: 2
Timeout occurred!
OnNext with fallback: 0
OnNext with fallback: 1
OnNext with fallback: 2
OnNext with fallback: -1
Fallback complete
*/
リソース管理オペレーター (Using)
using オペレーターは、Observable のライフサイクルと同期してリソースを作成・管理するための強力なツールです。指定されたファクトリ関数でリソースを作成し、そのリソースを使用して新しい Observable を生成します。Observable が完了、エラー、または破棄されたときに、リソースの dispose メソッドが自動的に呼び出され、リソースが確実に解放されます。
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
// Using operator example
// リソースを表すクラス (IDisposable を実装)
class ManagedResource implements Disposable {
private final String name;
private final AtomicBoolean disposed = new AtomicBoolean(false);
public ManagedResource(String name) {
this.name = name;
System.out.println("Resource '" + name + "' created.");
}
public void use() {
if (disposed.get()) {
throw new IllegalStateException("Cannot use disposed resource.");
}
System.out.println("Using resource '" + name + "'...");
}
@Override
public void dispose() {
if (disposed.compareAndSet(false, true)) {
System.out.println("Resource '" + name + "' disposed.");
}
}
@Override
public boolean isDisposed() {
return disposed.get();
}
}
// Observable.using を使用した例
Observable<Long> usageStream = Observable.using(
() -> new ManagedResource("MyResource"), // リソース作成ファクトリ
resource -> { // リソースを使用して Observable を生成
ManagedResource res = (ManagedResource) resource;
res.use();
return Observable.interval(500, TimeUnit.MILLISECONDS) // 0.5秒ごとに発行
.take(4) // 4回発行
.doOnNext(res::use); // 各アイテム発行時にリソースを使用
},
resource -> resource.dispose() // リソース解放処理
);
System.out.println("Starting subscription...");
Disposable subscription = usageStream.subscribe(
value -> System.out.println("Received: " + value),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Usage stream completed.")
);
try {
// 非同期処理のため、完了を待つ
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// subscription.dispose(); // 必要に応じて手動で解除
/*
出力例:
Starting subscription...
Resource 'MyResource' created.
Using resource 'MyResource'...
Received: 0
Using resource 'MyResource'...
Received: 1
Using resource 'MyResource'...
Received: 2
Using resource 'MyResource'...
Received: 3
Using resource 'MyResource'...
Usage stream completed.
Resource 'MyResource' disposed.
*/