Hystrixの基本コンセプト
クラスレベルでのカスタム設定
- 柔軟なフォールバック戦略の設定:
- セマフォ/スレッド / タイムアウト(1秒)、サーキットブレーカー(エラーレート)
- HystrixCommandProperty
- フォールバック境界の特定方法:
- @HystrixCommand(Spring AOP)
- HystrixCommand 抽象クラス
- データ収集:
- サーキットブレーカーのトリガー条件(10秒 / 20リクエスト / エラーレート)-> データ収集と統計方法
- SEMAPHORE、最大同時実行数 -> AQS -> tryAcquire(), acquire()
- 動作への介入: フォールバック/サーキットブレーカー発動後、通常のビジネスへの影響
- 結果への介入: fallback()
- 自動復旧(サーキットブレーカー状態では、5秒ごとに復旧を試行)
Hystrixのサーキットブレーカー原理とリクエストプロキシの仕組み
Hystrixのデータ統計にはスライディングウィンドウが採用されている
スライディングウィンドウ: トラフィック制御技術
リクエストのプロキシAOP
RxJava
- Observable(観測可能なオブジェクト)
- Observer(観測者)
- Subscribe(購読)
Observable内のcallメソッドが実行される。
- キャッシュがある場合: toCache.toObservable(); -> toBlocking() -> (Observable().call())
Hystrixサーキットブレーカーのソースコード解析
Hystrixの@HystrixCommandアノテーションは、HystrixCommandAspectというアスペクトを通じて処理されます。
@Aroundアノテーションで宣言されたメソッドに注目します。これはリクエストマージングとフォールバックアノテーションのプロキシを担当しています。ここでは、HystrixCommandアノテーションに焦点を当てて詳細に分析します。
- getMethodFromTarget ターゲットメソッド情報の取得
- MetaHolder metaHolder = metaHolderFactory.create(joinPoint); メタデータの取得(呼び出しメソッド、HystrixPropertyアノテーションデータ、メソッドパラメータなど)
- HystrixCommandFactory.getInstance().create 呼び出し元の取得。このオブジェクトはコマンドオブジェクトを保持し、適切なタイミングでこのコマンドオブジェクトを通じて具体的なビジネスロジックを実行します
- execute、コマンドの実行
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
//...
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
//非同期の場合はGenericObservableCommandを作成、それ以外はGenericCommandを作成
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
//レスポンシブかどうか(これらは同期なのでこのロジックが実行される)
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
CommandExecutor.execute
このメソッドは主にコマンドの実行に使用されます。コードからわかるように、ここには3つの実行タイプがあります。同期、非同期、およびレスポンシブです。レスポンシブはさらにCold Observable(observable.toObservable())とHot Observable(observable.observe())に分かれます。 デフォルトのexecutionType=SYNCHRONOUS(同期リクエスト)です。
- execute(): 同期実行、単一のオブジェクト結果を返し、エラーが発生した場合は例外をスローします。
- queue(): 非同期実行、Futureオブジェクトを返し、実行終了後に返される単一の結果を含みます。
- observe(): このメソッドは操作の複数の結果を表すObservableオブジェクトを返しますが、これはすでに購読者によって消費されています。
- toObservable(): このメソッドは操作の複数の結果を表すObservableオブジェクトを返しますが、手動で購読して消費する必要があります。
注意点として、HystrixはRxJavaフレームワークを使用しています。これはAndroidでよく使用されるリアクティブプログラミングフレームワークです。
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
// 同期
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
// 非同期
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
// レスポンシブ
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("サポートされていない実行タイプ: " + executionType);
}
}
HystrixCommand.execute()
次にHystrixCommand.execute()メソッドが呼び出され、このメソッドではまずqueue()を呼び出し、Futureオブジェクトを返します。
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
queueメソッドでは、Futureオブジェクトが返されます。このFutureオブジェクトの実装はfで、fは匿名内部クラスであり、Java.util.concurrentで定義された非同期戻り値付きオブジェクトです。queue().get()メソッドが呼び出されると、最終的にdelegate.getメソッドに委譲されます。
public Future<R> queue() {
/*
* Observable.toBlocking().toFuture()によって返されるFutureは、Future.cancel(boolean)の"mayInterrupt"フラグがtrueに設定された場合に
* 実行スレッドの割り込みを実行しないため、Futureの契約に従うために、それをラップする必要があります。
*/
// デリゲートオブジェクトを作成
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/*
* ここでの有効な遷移はfalseからtrueのみです。f1とf2という2つのFutureがこのコマンドによって作成された場合(これは非常に奇妙ですが、
* これまで禁止されていません)、そしてf1.cancel(true)とf2.cancel(false)が異なるスレッドから呼び出された場合、
* interruptOnCancelがチェックされた時点でどの値が使用されるかは不明です。このシナリオを処理する最も一貫性のある方法は、
* *任意*のキャンセルが割り込み付きで呼び出された場合、その割り込み要求を取り消せないと考慮することです。
*/
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}
return res;
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
};
/* 立ち上がりエラー状態の特別な処理 */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// これらのタイプはqueue()ではなくqueue().get()からスローされます。実行エラーだからです
return f;
default:
// これらのタイプは拒否エラータイプなのでqueue()からスローされます
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}
return f;
}
toObservable()
RxJavaでは、いくつかの役割に分かれています
- Observable(観測可能なオブジェクト):主にイベントを生成する役割を担います
- Observer(観測者):イベントを受信し、対応する役割を担います
- Subscribe(購読):観測可能なオブジェクトと観測者を接続する役割を担います
- Event(イベント):観測可能なオブジェクト、観測者、コミュニケーションの媒体 queueでは、toObservable()メソッドを呼び出して観測可能なオブジェクトを作成します
AbstractCommand.toObservable
Observableを定義して観測可能なオブジェクトを作成し、この観測可能なオブジェクトはtoObservable().toBlocking().toFuture()によって処理されます。実際には、run()抽象メソッドの実行結果を取得できるFutureを返します。run()メソッドはサブクラスで実装され、通常のビジネスロジックを実行します。以下のコードでは、subscriberが存在する場合、Func0#call()メソッドが呼び出され、このsubscriberはtoBlocking()で購読されます。
- isRequestCachingEnabled()を呼び出してリクエスト結果キャッシュ機能が有効かどうかを判断し、有効でキャッシュにヒットした場合はObservable形式でキャッシュされた結果を返します
- 実行コマンドのObservableを作成:hystrixObservable
- キャッシュが有効でキャッシュにヒットしない場合、実行コマンドを購読したObservableを作成します:HystrixCommandResponseFromCache
- キャッシュに保存するObservableを作成:HystrixCachedObservable
- toCacheをキャッシュに追加し、キャッシュから取得するObservableを返します:fromCache
- 追加に失敗した場合: fromCache!=nullの場合、toCache.unsubscribe()メソッドを呼び出してHystrixCachedObservableの購読をキャンセルします
- 追加に成功した場合、toCache.toObservable()を呼び出してキャッシュObservableを取得します
- キャッシュ機能が有効でない場合、実行コマンドのObservableを返します
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* これはステートフルなオブジェクトなので、一度しか使用できません */
// CASでコマンドが一度だけ実行されることを保証
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("このインスタンスは一度だけ実行できます。新しいインスタンスをインスタンス化してください。");
//TODO このエラーの新しいタイプを作成
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " コマンドが複数回実行されました - これは許可されていません。", ex, null);
}
// コマンド開始タイムスタンプ
commandStartTimestamp = System.currentTimeMillis();
// ログ出力
if (properties.requestLogEnabled().get()) {
// 何が起こったかに関係なく、このコマンド実行をログに記録
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
// キャッシュのオン/オフ、キャッシュキー(これはHystrixのリクエストキャッシュ機能です。Hystrixはリクエスト結果をキャッシュし、同じキーを持つ次のリクエストはキャッシュから結果を取得してリクエストオーバーヘッドを削減します)
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
/* まずキャッシュから試みる */
// キャッシュメカニズムが有効な場合、キャッシュから結果を取得
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 実行コマンドのObservableを宣言
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// キャッシュに保存
// 実行コマンドのObservableを宣言
if (requestCacheEnabled && cacheKey != null) {
// キャッシュ用にラップ
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// 別のスレッドが先に勝ったので、代わりにキャッシュされた値を使用
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// さっきObservableCommandを作成したので、キャストして返す
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // クリーンアップを一度実行(通常の終了状態(この行)または購読解除(次の行))
.doOnUnsubscribe(unsubscribeCommandCleanup) // クリーンアップを一度実行
.doOnCompleted(fireOnCompletedHook);
}
});
実行コマンドのObservableの定義は以下の通りで、deferを使用してapplyHystrixSemanticsイベントを定義しています。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
// commandStateがUNSUBSCRIBEDの場合、コマンドを実行しない
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// 実行コマンドのObservableを返す
return applyHystrixSemantics(_cmd);
}
};
applyHystrixSemantics
キャッシュ機能が無効またはキャッシュにヒットしない場合、コードはapplyHystrixSemanticsを実行します。
- 渡された_cmdはGenericCommandであり、最終的にこのコマンドのrunメソッドを実行し、本質的にqueryOrderメソッドのプロキシを完了します。
- circuitBreaker.allowRequest()がtrueの場合、現在サーキットブレーカー状態ではないことを示し、正常に実行されます。それ以外の場合、handleShortCircuitViaFallbackを呼び出してサービスフォールバックを実現します。fallbackメソッドを設定している場合、設定されたfallback実行が取得されます。
実行パス: handleShortCircuitViaFallback -> getFallbackOrThrowException -> getFallbackObservable -> HystrixCommand.getFallbackObservable -> GenericCommand.getFallback();
- 現在hystrixがサーキットブレーカー状態でない場合、
- getExecutionSemaphore が現在の戦略がセマフォであるかどうかを判断(TryableSemaphoreNoOp/TryableSemaphoreActual)。如果是,则调用 tryAcquire 来获取信号量。如果当前信号量满了,则调用 handleSemaphoreRejectionViaFallback 方法。
- executeCommandAndObserve を呼び出してコマンド実行Observableを取得します。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// 実行開始をExecutionHookでマーク
// このフックが例外をスローした場合、フォールバックなしでフェイルファイルが発生します。状態は一貫性を保ちません
executionHook.onStart(_cmd);
/* 実行が許可されているかどうかを判断 */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* userThreadExecutionTimeを追跡するために使用 */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}