iChat 微サービス間通信における bRPC ラッパーの設計

微サービス架构における通信基盤の選定

分散システムを構築する際、サービス間の効率的な通信は不可欠です。iChat プロジェクトでは、多数のマイクロサービスが協調して動作するため、信頼性の高い RPC 框架が必要でした。検討の結果、百度开源の bRPC を採用しました。これは C++ で記述された高性能なフレームワークであり、HTTP や Redis 等多种のプロトコルをサポートしています。

各サービスが独自にネットワーク通信やシリアライゼーションを実装するのは非効率です。bRPC を導入することで、接続管理、ロードバランシング、タイムアウト制御などの共通機能を統一し、開発チームはビジネスロジックに集中できるようになります。特に、Protobuf との親和性が高く、iChat の接口定義とスムーズに統合できる点が選定の決め手となりました。

接続プールの抽象化

bRPC の機能をそのまま使用するのではなく、サービス発見機構と連携させるため、独自のラッパー層を設計しました。まず、単一のサービス種類に対する複数の接続实例を管理するクラス RpcConnectionPool を定義します。これにより、特定のサービスに対する接続のライフサイクルを一元管理できます。

// 単一サービス向けの接続プール管理
class RpcConnectionPool {
public:
    using ConnectionPtr = std::shared_ptr<brpc::Channel>;
    using PoolPtr = std::shared_ptr<RpcConnectionPool>;

public:
    explicit RpcConnectionPool(const std::string& svc_name)
        : _service_name(svc_name), _current_index(0) {}

    // エンドポイントの登録
    void add_endpoint(const std::string& host) {
        auto channel = std::make_shared<brpc::Channel>();
        brpc::ChannelOptions opts;
        opts.protocol = brpc::PROTOCOL_BAIDU_STD;
        // タイムアウト等はデフォルト値を使用

        if (channel->Init(host.c_str(), &opts) != 0) {
            LOG_ERROR("Failed to init channel for {} at {}", _service_name, host);
            return;
        }

        std::lock_guard<std::mutex> lock(_lock);
        _endpoints.push_back(channel);
        _host_map[host] = _endpoints.size() - 1;
    }

    // エンドポイントの削除
    void remove_endpoint(const std::string& host) {
        std::lock_guard<std::mutex> lock(_lock);
        auto it = _host_map.find(host);
        if (it == _host_map.end()) {
            return;
        }

        size_t idx = it->second;
        // 削除対象が現在の参照 индекс である場合、インデックスを調整
        if (_current_index >= _endpoints.size() - 1) {
            _current_index = 0;
        }
        
        _endpoints.erase(_endpoints.begin() + idx);
        _host_map.erase(it);
        
        // 削除後のインデックス再マップ処理は省略(簡略化のため)
    }

    // 接続の取得(ラウンドロビン)
    ConnectionPtr select_channel() {
        std::lock_guard<std::mutex> lock(_lock);
        if (_endpoints.empty()) {
            LOG_WARN("No available nodes for {}", _service_name);
            return nullptr;
        }

        auto channel = _endpoints[_current_index];
        _current_index = (_current_index + 1) % _endpoints.size();
        return channel;
    }

private:
    std::string _service_name;
    std::vector<ConnectionPtr> _endpoints;
    std::unordered_map<std::string, size_t> _host_map;
    size_t _current_index;
    std::mutex _lock;
};

このクラスでは、std::vector を使用して接続实例を保持し、インデックスを用いたラウンドロビン方式で負荷分散を行います。各ホストアドレスとそのインデックスの対応表を保持することで、特定ノードの削除操作を効率的に行います。マルチスレッド環境での安全性を確保するため、すべての公開メソッド内部で mutex ロックを取得します。

サービス発見との連携

次に、複数のサービスプールを統括し、サービス発見システム(etcd)のイベントを受信するアダプタークラス ServiceDiscoveryAdapter を実装します。このクラスは、関心のあるサービス名を登録し、ノードのオンライン・オフラインイベントに応じて接続プールを動的に更新します。

// サービス発見イベントと接続プールを仲介するクラス
class ServiceDiscoveryAdapter {
public:
    using Ptr = std::shared_ptr<ServiceDiscoveryAdapter>;

public:
    // 監視対象サービスの登録
    void subscribe(const std::string& service_name) {
        std::lock_guard<std::mutex> lock(_mutex);
        _monitored_services.insert(service_name);
    }

    // ノード上线イベント
    void on_node_up(const std::string& instance_id, const std::string& host) {
        std::string svc_name = extract_service_name(instance_id);
        
        std::unique_lock<std::mutex> lock(_mutex);
        if (_monitored_services.find(svc_name) == _monitored_services.end()) {
            return; // 関心のないサービスは無視
        }

        auto pool = get_or_create_pool(svc_name);
        lock.unlock(); // プール操作は内部でロックされるため解放

        pool->add_endpoint(host);
        LOG_INFO("Node online: {} at {}", svc_name, host);
    }

    // ノード下线イベント
    void on_node_down(const std::string& instance_id, const std::string& host) {
        std::string svc_name = extract_service_name(instance_id);

        std::unique_lock<std::mutex> lock(_mutex);
        if (_pool_map.find(svc_name) == _pool_map.end()) {
            return;
        }
        auto pool = _pool_map[svc_name];
        lock.unlock();

        pool->remove_endpoint(host);
        LOG_INFO("Node offline: {} at {}", svc_name, host);
    }

    // 接続の取得
    RpcConnectionPool::ConnectionPtr get_channel(const std::string& service_name) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _pool_map.find(service_name);
        if (it == _pool_map.end()) {
            return nullptr;
        }
        return it->second->select_channel();
    }

private:
    RpcConnectionPool::PoolPtr get_or_create_pool(const std::string& name) {
        if (_pool_map.find(name) == _pool_map.end()) {
            _pool_map[name] = std::make_shared<RpcConnectionPool>(name);
        }
        return _pool_map[name];
    }

    std::string extract_service_name(const std::string& instance_id) {
        auto pos = instance_id.find_last_of('/');
        return (pos == std::string::npos) ? instance_id : instance_id.substr(0, pos);
    }

    std::unordered_set<std::string> _monitored_services;
    std::unordered_map<std::string, RpcConnectionPool::PoolPtr> _pool_map;
    std::mutex _mutex;
};

etcd などのサービス発見機構は、キーバリューストアにサービス实例の情報を登録します。インスタンス ID からサービス名を抽出し、事前に登録された監視リストと照合することで、不要なサービスへの接続尝试を防ぎます。プールが存在しない場合は自動的に生成し、存在する場合は既存のプールを更新します。これにより、サービス発見機構と bRPC の通信層を疎結合に保ちながら、動的なスケーリングに対応できます。

外部のビジネスロジックは、この ServiceDiscoveryAdapter を介してのみ通信チャネルを取得します。これにより、底层の接続管理やサービス発見の詳細が隠蔽され、システム全体の保守性が向上します。

タグ: bRPC C++ Microservices RPC ServiceDiscovery

5月19日 14:50 投稿