キャッシュの多層化アーキテクチャ
Eureka Serverは、サービスレジストリ情報を保持するために「メモリ内レジストリ」「読み書きキャッシュ」「読み取り専用キャッシュ」という3層構造を採用しています。デフォルト設定では、30秒間隔で読み書きキャッシュから読み取り専用キャッシュへのデータ同期が実行され、60秒周期で90秒以上応答のないインスタンスの削除処理が行われます。クライアントは30秒ごとに読み取り専用キャッシュから最新情報を取得し、新規登録時はメモリ内レジストリへ直接書き込まれます。
多層キャッシュの設計思想
多層キャッシュを導入する主な目的は、高頻度な登録・更新時のロック競合を回避し、スループットを維持することにあります。APモデルを基本設計としているEurekaでは、強い整合性よりも可用性と最終整合性を優先します。そのため、書き込みはメモリレジストリへ即時反映し、その後キャッシュを無効化します。読み取り時は読み取り専用キャッシュを優先し、ミスの場合は読み書きキャッシュ、さらにメモリレジストリとフェッチ順で検索を行います。
関連する設定プロパティとしては、同期間隔を制御する responseCacheUpdateIntervalMs(デフォルト30秒)と、読み書きキャッシュの有効期限を定義する responseCacheAutoExpirationInSeconds(デフォルト180秒)が用意されています。
登録時のキャッシュ無効化処理
サービス登録メソッドの完了時点では、キャッシュの一貫性を保つために読み書きキャッシュの関連エントリを破棄する処理が呼び出されます。以下の実装では、キー単位での無効化と、リージョン別の派生キーに対する一括削除を行っています。
public void flushCachedEntries(CacheKey... targetKeys) {
for (CacheKey key : targetKeys) {
LOGGER.debug("Flushing cache entry for app: {}, type: {}", key.getAppName(), key.getType());
rwCacheStore.evict(key);
Set<CacheKey> regionalVariants = regionKeyIndex.get(key);
if (regionalVariants != null && !regionalVariants.isEmpty()) {
regionalVariants.forEach(variant -> {
LOGGER.debug("Flushing regional variant: {}", variant);
rwCacheStore.evict(variant);
});
}
}
}
定期同期タスクの実装
キャッシュ管理クラスの初期化処理では、バックグラウンドで定期同期を実行するスケジューラが起動されます。このタスクは読み書きキャッシュの内容を読み取り専用キャッシュへ反映し、差分がある場合のみ更新を行います。
private ScheduledTask buildSyncRunnable() {
return new ScheduledTask() {
@Override
public void execute() {
LOGGER.debug("Synchronizing RO cache from RW cache");
rwCacheStore.entrySet().stream().forEach(entry -> {
CacheKey currentKey = entry.getKey();
try {
CurrentRequestContext.setVersion(currentKey.getVersion());
CacheValue rwValue = rwCacheStore.get(currentKey);
CacheValue roValue = roCacheStore.get(currentKey);
if (!Objects.equals(rwValue, roValue)) {
roCacheStore.put(currentKey, rwValue);
}
} catch (Exception ex) {
LOGGER.error("Sync failed for key {}", currentKey, ex);
} finally {
CurrentRequestContext.clearVersion();
}
});
}
};
}
インスタンスのハートビートと状態更新フロー
クライアントアプリケーションは初期化処理にて、サーバーへの死活監視タスクをスケジューリングします。このタスクは指定された間隔で実行され、スレッドプール上で定期通信を行います。
ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
Runnable renewalWorker = new PeriodicRenewalWorker();
long checkCycleSec = config.getRenewalInterval();
heartbeatScheduler.scheduleAtFixedRate(
renewalWorker,
checkCycleSec,
checkCycleSec,
TimeUnit.SECONDS
);
定期実行ワーカーの実装では、サーバー側へ更新要求を送信し、正常に応答があった場合は最終通信時刻をシステムクロックで上書きします。
サーバーサイドのステータス反映
受信したリクエストはRESTエンドポイント経由でルーティングされ、インスタンスリソースの処理メソッドへ渡されます。ここではクライアントからのステータス変更要求を処理するロジックに焦点を当てています。
@PUT
@Path("leaseStatus")
public Response processStatusChange(
@QueryParam("newState") String updatedState,
@HeaderParam("x-replication") String replicationFlag,
@QueryParam("syncTime") String dirtyTime) {
try {
InstanceRegistryEntry existingEntry = registry.lookup(appIdentifier, instanceId);
if (existingEntry == null) {
LOGGER.warn("Target instance missing: {}", instanceId);
return Response.status(Status.NOT_FOUND).build();
}
boolean completed = registry.updateLeaseStatus(
appIdentifier,
instanceId,
InstanceStatus.fromLabel(updatedState),
dirtyTime,
"1".equals(replicationFlag)
);
if (completed) {
LOGGER.info("Lease updated: {}/{} -> {}", appIdentifier, instanceId, updatedState);
return Response.ok().build();
}
return Response.serverError().build();
} catch (Exception err) {
LOGGER.error("Update failed", err);
return Response.serverError().build();
}
}
登録情報の更新とリース維持
内部処理では、対象アプリのインスタンスマップを排他ロックして取得し、対応するリースオブジェクトを探します。リースが存在する場合、最終更新時刻をリフレッシュし、ステータス変更があれば各種管理マップとキューを更新します。
public boolean updateLeaseStatus(String appName, String instanceId,
InstanceStatus targetStatus, String syncTime,
boolean isReplica) {
try {
readLock.lock();
updateMetrics.increment(isReplica);
Map appRecords = localRegistry.get(appName);
LeaseRecord activeLease = (appRecords != null) ? appRecords.get(instanceId) : null;
if (activeLease == null) {
return false;
}
activeLease.refreshTimestamp();
RegisteredInstance holder = activeLease.getPayload();
if (holder != null && !holder.getCurrentState().equals(targetStatus)) {
if (InstanceStatus.UP.equals(targetStatus)) {
activeLease.markServiceActive();
}
statusOverrideMap.put(instanceId, targetStatus);
holder.setOverriddenState(targetStatus);
holder.setState(targetStatus, false);
long replicaSyncTime = Long.parseLong(syncTime != null ? syncTime : "0");
if (replicaSyncTime > holder.getLastModifiedTime()) {
holder.setLastModifiedTime(replicaSyncTime);
}
holder.setChangeType(ActionType.UPDATE);
changeHistory.add(new ChangeRecord(activeLease));
holder.setLastUpdateInstant();
flushCachedEntries(appName, holder.getVip(), holder.getSecureVip());
}
return true;
} finally {
readLock.unlock();
}
}