微サービス架构における通信基盤の選定
分散システムを構築する際、サービス間の効率的な通信は不可欠です。iChat プロジェクトでは、多数のマイクロサービスが協調して動作するため、信頼性の高い RPC 框架が必要でした。検討の結果、百度开源の bRPC を採用しました。これは C++ で記述された高性能なフレームワークであり、HTTP や Redis 等多种のプロトコルをサポートしています。
各サービスが独自にネットワーク通信やシリアライゼーションを実装するのは非効率です。bRPC を導入することで、接続管理、ロードバランシング、タイムアウト制御などの共通機能を統一し、開発チームはビジネスロジックに集中できるようになります。特に、Protobuf との親和性が高く、iChat の接口定義とスムーズに統合できる点が選定の決め手となりました。
接続プールの抽象化
bRPC の機能をそのまま使用するのではなく、サービス発見機構と連携させるため、独自のラッパー層を設計しました。まず、単一のサービス種類に対する複数の接続实例を管理するクラス RpcConnectionPool を定義します。これにより、特定のサービスに対する接続のライフサイクルを一元管理できます。
// 単一サービス向けの接続プール管理
class RpcConnectionPool {
public:
using ConnectionPtr = std::shared_ptr<brpc::Channel>;
using PoolPtr = std::shared_ptr<RpcConnectionPool>;
public:
explicit RpcConnectionPool(const std::string& svc_name)
: _service_name(svc_name), _current_index(0) {}
// エンドポイントの登録
void add_endpoint(const std::string& host) {
auto channel = std::make_shared<brpc::Channel>();
brpc::ChannelOptions opts;
opts.protocol = brpc::PROTOCOL_BAIDU_STD;
// タイムアウト等はデフォルト値を使用
if (channel->Init(host.c_str(), &opts) != 0) {
LOG_ERROR("Failed to init channel for {} at {}", _service_name, host);
return;
}
std::lock_guard<std::mutex> lock(_lock);
_endpoints.push_back(channel);
_host_map[host] = _endpoints.size() - 1;
}
// エンドポイントの削除
void remove_endpoint(const std::string& host) {
std::lock_guard<std::mutex> lock(_lock);
auto it = _host_map.find(host);
if (it == _host_map.end()) {
return;
}
size_t idx = it->second;
// 削除対象が現在の参照 индекс である場合、インデックスを調整
if (_current_index >= _endpoints.size() - 1) {
_current_index = 0;
}
_endpoints.erase(_endpoints.begin() + idx);
_host_map.erase(it);
// 削除後のインデックス再マップ処理は省略(簡略化のため)
}
// 接続の取得(ラウンドロビン)
ConnectionPtr select_channel() {
std::lock_guard<std::mutex> lock(_lock);
if (_endpoints.empty()) {
LOG_WARN("No available nodes for {}", _service_name);
return nullptr;
}
auto channel = _endpoints[_current_index];
_current_index = (_current_index + 1) % _endpoints.size();
return channel;
}
private:
std::string _service_name;
std::vector<ConnectionPtr> _endpoints;
std::unordered_map<std::string, size_t> _host_map;
size_t _current_index;
std::mutex _lock;
};
このクラスでは、std::vector を使用して接続实例を保持し、インデックスを用いたラウンドロビン方式で負荷分散を行います。各ホストアドレスとそのインデックスの対応表を保持することで、特定ノードの削除操作を効率的に行います。マルチスレッド環境での安全性を確保するため、すべての公開メソッド内部で mutex ロックを取得します。
サービス発見との連携
次に、複数のサービスプールを統括し、サービス発見システム(etcd)のイベントを受信するアダプタークラス ServiceDiscoveryAdapter を実装します。このクラスは、関心のあるサービス名を登録し、ノードのオンライン・オフラインイベントに応じて接続プールを動的に更新します。
// サービス発見イベントと接続プールを仲介するクラス
class ServiceDiscoveryAdapter {
public:
using Ptr = std::shared_ptr<ServiceDiscoveryAdapter>;
public:
// 監視対象サービスの登録
void subscribe(const std::string& service_name) {
std::lock_guard<std::mutex> lock(_mutex);
_monitored_services.insert(service_name);
}
// ノード上线イベント
void on_node_up(const std::string& instance_id, const std::string& host) {
std::string svc_name = extract_service_name(instance_id);
std::unique_lock<std::mutex> lock(_mutex);
if (_monitored_services.find(svc_name) == _monitored_services.end()) {
return; // 関心のないサービスは無視
}
auto pool = get_or_create_pool(svc_name);
lock.unlock(); // プール操作は内部でロックされるため解放
pool->add_endpoint(host);
LOG_INFO("Node online: {} at {}", svc_name, host);
}
// ノード下线イベント
void on_node_down(const std::string& instance_id, const std::string& host) {
std::string svc_name = extract_service_name(instance_id);
std::unique_lock<std::mutex> lock(_mutex);
if (_pool_map.find(svc_name) == _pool_map.end()) {
return;
}
auto pool = _pool_map[svc_name];
lock.unlock();
pool->remove_endpoint(host);
LOG_INFO("Node offline: {} at {}", svc_name, host);
}
// 接続の取得
RpcConnectionPool::ConnectionPtr get_channel(const std::string& service_name) {
std::lock_guard<std::mutex> lock(_mutex);
auto it = _pool_map.find(service_name);
if (it == _pool_map.end()) {
return nullptr;
}
return it->second->select_channel();
}
private:
RpcConnectionPool::PoolPtr get_or_create_pool(const std::string& name) {
if (_pool_map.find(name) == _pool_map.end()) {
_pool_map[name] = std::make_shared<RpcConnectionPool>(name);
}
return _pool_map[name];
}
std::string extract_service_name(const std::string& instance_id) {
auto pos = instance_id.find_last_of('/');
return (pos == std::string::npos) ? instance_id : instance_id.substr(0, pos);
}
std::unordered_set<std::string> _monitored_services;
std::unordered_map<std::string, RpcConnectionPool::PoolPtr> _pool_map;
std::mutex _mutex;
};
etcd などのサービス発見機構は、キーバリューストアにサービス实例の情報を登録します。インスタンス ID からサービス名を抽出し、事前に登録された監視リストと照合することで、不要なサービスへの接続尝试を防ぎます。プールが存在しない場合は自動的に生成し、存在する場合は既存のプールを更新します。これにより、サービス発見機構と bRPC の通信層を疎結合に保ちながら、動的なスケーリングに対応できます。
外部のビジネスロジックは、この ServiceDiscoveryAdapter を介してのみ通信チャネルを取得します。これにより、底层の接続管理やサービス発見の詳細が隠蔽され、システム全体の保守性が向上します。