Temporal Python SDKを使用したサービス監視:監視ツールの統合

監視システムの概要

Temporal Python SDKは、完全な監視サポートを提供しており、主に以下の2つのモジュールを通じて実現されています:**OpenTelemetryの統合**と**メトリクスの収集**。前者は分散トレーシングのために使用され、後者は重要な指標の収集に使用されます。

主要な監視コンポーネント

  • 分散トレーシング:OpenTelemetryインターセプタを使用してワークフロー、アクティビティ、シグナルなどの操作の全チャネル追跡を実現
  • パフォーマンス指標:Metricモジュールを使用してワークフローの実行時間、アクティビティの成功率、キューの長さなどの重要な指標を収集
  • カスタム監視:ユーザーが監視の次元を拡張し、ビジネスに関連する指標を追加できる

OpenTelemetryの分散トレーシング統合

OpenTelemetryはオープンソースの観測性フレームワークで、分散トレーシング、指標収集、ログ記録の統一ソリューションを提供しています。Temporal Python SDKはインターセプターパターンを使用してOpenTelemetryとのシームレスな統合を実現しています。

快速开始

OpenTelemetryのトレーシングを有効にするには、TracingInterceptorを作成し、それをクライアントまたはワーカーの設定に追加します:

from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.client import Client
from temporalio.worker import Worker

# トレーシングインターセプター付きのクライアントを作成
client = await Client.connect(
    "localhost:7233",
    interceptors=[TracingInterceptor()],
)

# トレーシングインターセプター付きのワーカーを作成
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    interceptors=[TracingInterceptor()],
)

トレースコンテキストの伝播

TracingInterceptorは自動的にトレースコンテキストの伝播を処理し、サービス間の呼び出しでのトレースの連続性を確保します。主な実装は`_context_to_headers`と`_context_from_headers`メソッドにあります。

def _context_to_headers(
    self, headers: Mapping[str, temporalio.api.common.v1.Payload]
) -> Mapping[str, temporalio.api.common.v1.Payload]:
    carrier: _CarrierDict = {}
    self.text_map_propagator.inject(carrier)
    if carrier:
        headers = {
            **headers,
            self.header_key: self.payload_converter.to_payloads([carrier])[0],
        }
    return headers

トレースデータタイプ

統合後、以下のような操作タイプが自動的にトレースされ、対応するSpanが生成されます:

  • ワークフロー操作:開始、シグナル、クエリ、更新
  • アクティビティ実行:スケジューリング、実行、完了
  • シグナル処理:受信、処理

Metricメトリクスの収集

Metricモジュールは、ワークフローとアクティビティの実行状況を量化的に評価するための主要な指標収集機能を提供します。このモジュールは現在不安定ですが、重要な指標の収集には既に使用可能です。

主なメトリクスタイプ

Metricモジュールは、異なる監視ニーズに対応するため、以下の種類のメトリクスをサポートしています:

  • カウンター(Counter):累積値(例:ワークフローの実行回数)
  • ヒストグラム(Histogram):分布統計(例:ワークフローの実行時間)
  • ゲージ(Gauge):瞬時値(例:並行ワークフローの数)

具体的な実装については、Metricモジュールのソースコードを参照してください。以下はカウンターを使用する例です:

from temporalio.bridge.metric import MetricMeter
from temporalio.bridge.runtime import Runtime

# ランタイムとメトリックメーターを作成
runtime = Runtime()
meter = MetricMeter.create(runtime)

# カウンターを作成
counter = meter.create_counter(
    "workflow_executions_total",
    "Total number of workflow executions",
    "count"
)

# 指標を記録
counter.add(1, meter.default_attributes)

主要なメトリクス定義

メトリクス名 タイプ 説明
workflow_executions_total Counter ワークフローの実行総数
workflow_execution_duration Histogram ワークフローの実行持続時間
activity_executions_total Counter アクティビティの実行総数
activity_execution_duration Histogram アクティビティの実行持続時間
task_queue_length Gauge タスクキューの長さ

監視ツールの統合実践

Prometheusの統合

メトリクスをPrometheusにエクスポートするには、opentelemetry-exporter-prometheusを使用します:

from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider

# Prometheusエクスポータを設定
reader = PrometheusMetricReader(prefix="temporal_")
provider = MeterProvider(metric_readers=[reader])
set_meter_provider(provider)

# Temporalクライアントとワーカーを作成(上記と同じ)

起動後、`http://localhost:8000`からPrometheusのメトリクスエンドポイントにアクセスできます。

Grafanaの可視化

Grafanaを使用すると、重要なメトリクスのトレンドを示す豊富な監視ダッシュボードを作成できます。以下は、ワークフローの実行遅延を表示する簡単なGrafanaクエリの例です:

histogram_quantile(0.95, sum(rate(temporal_workflow_execution_duration_bucket[5m])) by (le))

このクエリは、95パーセンタイルのワークフロー実行遅延を表示し、パフォーマンス問題の特定に役立ちます。

アラート構成

Prometheus AlertManagerを使用してアラートルールを構成し、異常状況を迅速に検出できます:

groups:
- name: temporal_alerts
  rules:
  - alert: WorkflowErrorRateHigh
    expr: sum(rate(temporal_workflow_failures_total[5m])) / sum(rate(temporal_workflow_executions_total[5m])) > 0.05
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "High workflow error rate"
      description: "Error rate is above 5% for 2 minutes (current value: {{ $value }})"

高度な監視設定

カスタムトレース属性

TracingInterceptorを拡張することで、カスタムトレース属性を追加し、トレースデータをより豊かにすることができます:

class CustomTracingInterceptor(TracingInterceptor):
    def _completed_span(self, *args, **kwargs):
        span = super()._completed_span(*args, **kwargs)
        # カスタム属性を追加
        span.set_attribute("custom_attr", "value")
        return span

メトリクスの集約とフィルタリング

Metricモジュールでは、属性を使用してメトリクスを集約およびフィルタリングし、多面的な分析を行うことができます:

# 属性付きのメトリクスを作成
attrs = meter.default_attributes.with_additional_attributes({
    "workflow_type": "MyWorkflow",
    "task_queue": "my-task-queue"
})
counter.add(1, attrs)

監視データのサンプリング

監視データ量が大きくなりすぎないように、サンプリング戦略を構成できます:

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased

# 10%のサンプリング率を設定
provider = TracerProvider(sampler=TraceIdRatioBased(0.1))
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
set_tracer_provider(provider)

ベストプラクティスと注意点

パフォーマンスへの影響

監視自体が一定のパフォーマンスオーバーヘッドをもたらすため、以下の点に注意してください:

  • 本番環境では適切なサンプリングレートを使用する
  • 高頻度のアクティビティに多くのカスタム属性を追加しない
  • メトリクス収集をバッチ処理する

バージョン互換性

Metricモジュールは現在不安定であり、APIの変更がある可能性があります。

"""Metrics using SDK Core. (unstable)

Nothing in this module should be considered stable. The API may change.
"""

使用する際はバージョンの変更に注意し、APIの変更による問題を避けることが重要です。

トラブルシューティング

監視システムに問題が発生した場合、以下の手順でトラブルシューティングを行ってください:

  1. OpenTelemetry exporterの構成を確認する
  2. すべてのクライアントとワーカーにTracingInterceptorが正しく追加されていることを確認する
  3. MetricMeterが正常に作成されていることを確認する
  4. ネットワーク接続と権限設定を確認する

関連するトラブルシューティングツールとしては、テストディレクトリ内のopentelemetryテストコードを参照してください。

タグ: Temporal OpenTelemetry Python Metrics monitoring

6月14日 21:50 投稿