ZooKeeper、KafkaクラスタとFilebeat+Kafka+ELKアーキテクチャの実装ガイド

一、ZooKeeper

1.1 概要

ZooKeeperは、分散アプリケーションの管理フレームワークです。オープンソースの分散型サービスで、分散環境における一貫性の問題を解決するためのApacheプロジェクトです。

1.2 ZooKeeperの役割

主な役割として、分散アプリケーションクラスタにおける一貫性の問題解決があります。ファイルシステムとして、各種分散アプリケーションの登録とメタ情報の保存管理を行います。通知機構として、ノードまたはサービスの状態に問題が発生した場合にクライアントに通知します。

1.3 ZooKeeperの特徴

  1. リーダー(Leader)と複数のフォロワー(Follower)で構成されるクラスタ
  2. クラスタ内の半数以上のノードが稼働していればサービスを継続できるため、奇数台のサーバーが推奨されます
  3. グローバルデータの一貫性:各サーバーは同一データのコピーを保持し、クライアントがどのサーバーに接続してもデータは一致します
  4. 更新リクエストは順次実行され、同一クライアントからの更新リクエストは送信順に処理されます(FIFO)
  5. データ更新の原子性:データ更新は成功か失敗のいずれかです
  6. リアルタイム性:一定時間内にクライアントは最新データを読み取ることができます

1.4 ZooKeeperのデータ構造

ZooKeeperのデータモデルはLinuxファイルシステムに類似しており、全体として木構造と見なすことができます。各ノードはZNodeと呼ばれます。各ZNodeはデフォルトで1MBのデータを保存でき、パスによって一意に識別されます。

1.5 ZooKeeperの適用シナリオ

  1. 統一命名サービス:分散環境でアプリケーション/サービスの一貫した命名を実現
  2. 統一設定管理:設定情報をZNodeに保存し、クライアントが監視することで変更を検知
  3. 統一クラスタ管理:ノードの状態変化を監視し、情報をZNodeに保存
  4. サーバーの動的上下線:クライアントがサーバーのオンライン/オフライン状態をリアルタイムで把握
  5. ソフトウェアロードバランシング:各サーバーのアクセス数を記録し、アクセス数の少ないサーバーにリクエストを割り当て

1.6 ZooKeeperの選挙メカニズム

初回起動時

サーバーノードの`myid`を比較し、最も大きい`myid`を持つノードが他のノードからの投票を獲得します。投票数がサーバーノード数の半数を超えた場合に`leader`に選出され、他のノードは`follower`となります。後から`myid`が大きいノードがクラスタに参加しても、選挙結果は影響を受けません。

2回目以降の起動時

  • 非leaderノードが故障した場合:新しいノードをフォロワーとして追加し、既存のleaderと接続してデータを同期
  • leaderノードが故障した場合:新しいleaderを選択
    1. 各ノードの`Epoch`(選挙参加回数)を比較し、最も大きいノードがleaderに選出
    2. 同じEpochの場合、`zxid`(書き込み操作のトランザクションID)を比較し、最も大きいノードがleaderに選出
    3. zxidも同じ場合、`sid`(myidと同等)を比較し、最も大きいノードがleaderに選出

二、ZooKeeperクラスタのデプロイ

2.1 準備

以下の3台のサーバーでZooKeeperクラスタを構築します。


192.168.2.100
192.168.2.102
192.168.2.103

2.2 インストール手順

ステップ1:インストール前準備


# ファイアウォールの停止
systemctl stop firewalld
systemctl disable firewalld
setenforce 0

# JDKのインストール
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

# インストールパッケージのダウンロード
cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

ステップ2:ZooKeeperのインストール


# パッケージの解凍
cd /opt
tar -zxvf apache-zookeeper-3.6.4-bin.tar.gz
mv apache-zookeeper-3.6.4-bin /usr/local/zookeeper-3.6.4

# 設定ファイルの準備
cd /usr/local/zookeeper-3.6.4/conf/
cp zoo_sample.cfg zoo.cfg

# 設定ファイルの編集
vim zoo.cfg

設定ファイルの主なパラメータ:


tickTime=2000                    # 通信ハートビート時間(ミリ秒)
initLimit=10                     # LeaderとFollowerの初期接続時の最大ハートビート数
syncLimit=5                      # LeaderとFollower間の同期通信タイムアウト時間
dataDir=/usr/local/zookeeper-3.6.4/data  # データ保存ディレクトリ
dataLogDir=/usr/local/zookeeper-3.6.4/logs # ログ保存ディレクトリ
clientPort=2181                  # クライアント接続ポート

# クラスタ情報
server.1=192.168.2.100:3188:3288
server.2=192.168.2.102:3188:3288
server.3=192.168.2.103:3188:3288

ステップ3:ディレクトリとファイルの作成


# ディレクトリの作成
mkdir -p /usr/local/zookeeper-3.6.4/data
mkdir -p /usr/local/zookeeper-3.6.4/logs

# myidファイルの作成(各ノードで異なる値を設定)
echo 1 > /usr/local/zookeeper-3.6.4/data/myid
echo 2 > /usr/local/zookeeper-3.6.4/data/myid
echo 3 > /usr/local/zookeeper-3.6.4/data/myid

ステップ4:起動スクリプトの作成


vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig: 2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.6.4'

case $1 in
  start)
    echo "---------- zookeeper 起動 ------------"
    $ZK_HOME/bin/zkServer.sh start
  ;;
  stop)
    echo "---------- zookeeper 停止 ------------"
    $ZK_HOME/bin/zkServer.sh stop
  ;;
  restart)
    echo "---------- zookeeper 再起動 ------------"
    $ZK_HOME/bin/zkServer.sh restart
  ;;
  status)
    echo "---------- zookeeper 状態 ------------"
    $ZK_HOME/bin/zkServer.sh status
  ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
esac

ステップ5:サービスの起動


# 実行権限の設定とサービス登録
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

# サービスの起動
service zookeeper start

# 状態の確認
service zookeeper status

三、メッセージキュー

3.1 メッセージキューの必要性

高負荷環境下では、同期処理が追いつかずリクエストがブロックされることがあります。例えば、大量のリクエストがデータベースに同時にアクセスすると、行ロックやテーブルロックが発生し、リクエストスレッドが過剰に蓄積され、too many connectionエラーが発生して雪崩効果を引き起こします。メッセージキューを使用することで、非同期処理を実現しシステムの負荷を軽減できます。

3.2 中間件

中間件はアプリケーションの連携解除と非同期処理を実現します。Webアプリケーション型(プロキシサーバー)にはNginx、Haproxy、Tomcat PHPなどがあり、メッセージキュー型(MQ)にはActiveMQ、RabbitMQ、RocketMQ、Kafkaなどがあります。

3.3 メッセージキューの利点

  1. アプリケーションの連携解除:両方の処理プロセスを独立して拡張または修正でき、同じインターフェース制約を遵守する限り問題ありません
  2. 回復性:プロセス間の結合度を下げるため、メッセージ処理プロセスが停止しても、キューに追加されたメッセージはシステム復旧後に処理できます
  3. データバッファリング:データがシステムを通過する速度を制御・最適化し、メッセージ生成と消費の速度不一致を解決します
  4. 柔軟性とピーク処理能力:アクセス量が急増してもアプリケーションは機能し続けますが、このような突発的なトラフィックは稀です。ピークアクセスに対応するためのリソースを常に確保しておくのは非効率的です。メッセージキューにより、重要なコンポーネントは突発的な過負荷リクエストで完全に停止することなく対応できます
  5. 非同期通信:ユーザーがすぐにメッセージを処理する必要や望まない場合に、メッセージをキューに追加し、必要な時に処理できます

3.4 メッセージキューの2つのモード

3.4.1 ポイントツーポイントモード

対1の関係で、消費者がデータをプル(取得)し、メッセージを消費するとメッセージは削除されます。

3.4.2 パブリッシュ/サブスクライブモード

対多の関係で、オブザーバーパターンとも呼ばれます。データが生成されるとすべての消費者にプッシュ(送信)され、消費者はメッセージを消費してもメッセージは削除されません。

四、Kafka

4.1 概要

Kafkaは、パブリッシュ/サブスクライブモデルに基づく分散型メッセージキュー(MQ)で、主にビッグデータ分野のリアルタイム計算およびログ収集に使用されます。

4.2 特性

  1. スループット高遅延低:Kafkaは1秒間に数十万メッセージを処理でき、遅延は数ミリ秒に達します。各トピックは複数のパーティションに分割でき、Consumer Groupがパーティションを消費することで負荷分散能力と消費能力が向上します
  2. スケーラビリティ:Kafkaクラスタはホット拡張をサポートします
  3. 永続性と信頼性:メッセージはローカルディスクに永続化され、データバックアップによるデータ損失防止をサポートします
  4. フォールトトレランス:クラスタ内のノード障害を許容します(複数レプリカの場合、レプリカ数がnならn-1ノードの障害を許容)
  5. 高同時実行性:数千クライアントの同時読み書きをサポートします

4.3 Kafkaシステムアーキテクチャ

4.3.1 コアコンポーネント

  1. Broker:Kafkaサーバーノード。クラスタは複数のbrokerで構成され、1つのbrokerは複数のトピックを保持できます
  2. Consumer:brokerからデータをプルできます。複数のトピックのデータを消費できます
  3. Consumer Group(CG):実際のメッセージ購読者。1つ以上のコンシューマーで構成され、グループ内のメンバーは同じパーティションデータを重複して消費できません
  4. Producer:データのパブリッシャー。メッセージをKafkaのトピックにプッシュします。brokerはプロデューサーから送信されたメッセージを受け取り、現在のデータを追加するセグメントファイルに追加します
  5. Topic:キューのようなもので、プロデューサーとコンシューマーは両方ともトピックを対象とします
  6. Zookeeper:Kafkaクラスタのメタデータを保存し、プロデューサーとコンシューマーの動作はZookeeperの管理とサポートを必要とします

4.3.2 概念の拡張

  1. Partition:トピックは1つ以上のパーティションに分割でき、各パーティションは順序付きのキューです。Kafkaはパーティション内の記録の順序を保証しますが、トピック内の異なるパーティションの順序は保証しません。各トピックは少なくとも1つのパーティションを持ち、プロデューサーがデータを生成すると、割り当て戦略に基づいてパーティションが選択され、メッセージは指定されたパーティションのキューの末尾に追加されます
  2. Replica:クラスタ内の特定ノードに障害が発生した場合にそのノードのパーティションデータが失われないようにするためのメカニズム。トピックの各パーティションには複数のレプリカ(1つのリーダーと複数のフォロワー)があります
  3. Leader:各パーティションには複数のレプリアがあり、そのうち1つだけがLeaderとして機能し、現在のデータの読み書きを担当します
  4. Follower:Leaderに追随し、すべての書き込みリクエストはLeaderを経由してルーティングされます。データの変更はすべてのFollowerにブロードキャストされ、FollowerはLeaderとデータを同期させます。Followerはバックアップのみを担当し、データの読み書きは担当しません。Leaderに障害が発生すると、Followerから新しいLeaderが選出されます
  5. Offset:各メッセージを一意に識別します。オフセットはデータの読み取り位置を決定し、スレッドの安全性の問題はありません。コンシューマーはオフセットを使用して次に読み取るメッセージ(消費位置)を決定します

五、Kafkaクラスタのデプロイ

5.1 ソフトウェアパッケージのダウンロード


cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

5.2 Kafkaのインストール

ステップ1:ソフトウェアパッケージの解凍


cd /opt/
tar zxvf kafka_2.13-2.8.2.tgz
mv kafka_2.13-2.8.2 /usr/local/kafka

ステップ2:設定ファイルの変更


cd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties

設定ファイルの主なパラメータ:


broker.id=0                          # brokerのグローバル一意ID
listeners=PLAINTEXT://192.168.80.10:9092  # 監听IPとポート
num.network.threads=3               # ネットワークリクエスト処理スレッド数
num.io.threads=8                    # ディスクIO処理スレッド数
socket.send.buffer.bytes=102400     # 送信ソケットのバッファサイズ
socket.receive.buffer.bytes=102400  # 受信ソケットのバッファサイズ
socket.request.max.bytes=104857600  # リクエストソケットのバッファサイズ
log.dirs=/usr/local/kafka/logs      # ログとデータの保存パス
num.partitions=1                    # トピックのデフォルトパーティション数
num.recovery.threads.per.data.dir=1 # データディレクトリの回復・クリーンアップスレッド数
log.retention.hours=168             # セグメントファイルの保持時間(時間)
log.segment.bytes=1073741824        # セグメントファイルの最大サイズ
zookeeper.connect=192.168.2.100:2181,192.168.2.102:2181,192.168.2.103:2181  # Zookeeperクラスタ接続先

ステップ3:環境変数の設定


vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

ステップ4:起動スクリプトの作成と起動


vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'

case $1 in
  start)
    echo "---------- Kafka 起動 ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
  ;;
  stop)
    echo "---------- Kafka 停止 ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
  ;;
  restart)
    $0 stop
    $0 start
  ;;
  status)
    echo "---------- Kafka 状態 ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
  ;;
  *)
    echo "Usage: $0 {start|stop|restart|status}"
esac

# 実行権限の設定とサービス登録
chmod +x /etc/init.d/kafka
chkconfig --add kafka

# サービスの起動
service kafka start

六、Filebeat+Kafka+ELKアーキテクチャの構築

サーバー IP コンポーネント
Node1 192.168.2.100 Elasticsearch、Kibana、Zookeeper、Kafka
Node2 192.168.2.102 Elasticsearch、Zookeeper、Kafka
Apache 192.168.2.103 Logstash、Apache、Zookeeper、Kafka
Filebeat 192.168.2.105 Filebeat、Zookeeper、Kafka

6.1 Zookeeper+Kafkaクラスタのデプロイ

前述のZooKeeperクラスタデプロイとKafkaクラスタデプロイを参照してください。

6.2 Filebeatのデプロイ

ステップ1:事前準備


# ホスト名の変更
hostnamectl set-hostname filebeat
# ファイアウォールとSELinuxの停止
systemctl disable firewalld --now
setenforce 0

ステップ2:Filebeatのインストール


cd /opt
tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 /usr/local/filebeat

ステップ3:設定ファイルの編集


cd /usr/local/filebeat
vim filebeat.yml

設定ファイルの例:


filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log
  tags: ["access"]
  
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log
  tags: ["error"]

# Kafkaへの出力設定
output.kafka:
  enabled: true
  hosts: ["192.168.2.100:9092","192.168.2.102:9092","192.168.2.103:9092"]
  topic: "httpd"

ステップ4:Filebeatの起動


./filebeat -e -c filebeat.yml

6.3 ELKのデプロイ

ELK(Elasticsearch、Logstash、Kibana)のデプロイ方法は関連ドキュメントを参照してください。

6.4 Logstash設定ファイルの作成


cd /etc/logstash/conf.d/
vim kafka.conf

設定ファイルの例:


input {
  kafka {
    bootstrap_servers => "192.168.2.100:9092,192.168.2.102:9092,192.168.2.103:9092"
    topics => "httpd"
    type => "httpd_kafka"
    codec => "json"
    auto_offset_reset => "latest"
    decorate_events => true
  }
}

output {
  if "access" in [tags] {
    elasticsearch {
      hosts => ["192.168.2.100:9200"]
      index => "httpd_access-%{+YYYY.MM.dd}"
    }
  }
  
  if "error" in [tags] {
    elasticsearch {
      hosts => ["192.168.2.100:9200"]
      index => "httpd_error-%{+YYYY.MM.dd}"
    }
  }
  
  stdout { codec => rubydebug }
}

# Logstashの起動
logstash -f kafka.conf

6.5 動作テスト

  1. ブラウザで http://192.168.2.100:5601 にアクセスしKibanaにログイン
  2. "Create Index Pattern"ボタンをクリックし、インデックスパターン"httpd-*"を作成
  3. "Discover"ボタンをクリックしてログ情報を確認

タグ: ZooKeeper Kafka ELKスタック 分散システム メッセージキュー

5月31日 00:36 投稿