ZooKeeper を用いたシンプルなサービス登録・検出機能(C++): 接続管理層の実装

ZooKeeper クライアントとの永続的な接続を管理し、自動再接続・監視機能を備えた単一ヒープ生成的なハンドラクラスを C++ で実装します。

コピー禁止基底クラス

class CNonCopyable {
protected:
    CNonCopyable() = default;
    virtual ~CNonCopyable() = default;
    CNonCopyable(const CNonCopyable&) = delete;
    CNonCopyable& operator=(const CNonCopyable&) = delete;
};

RAII マテックスロック・自動解放クラス

#ifndef _AUTOLOCK_H_
#define _AUTOLOCK_H_

#include <pthread.h>

class CScopedLock {
public:
    explicit CScopedLock(pthread_mutex_t& mtx) : m_mutex(mtx) {
        pthread_mutex_lock(&m_mutex);
    }

    ~CScopedLock() {
        pthread_mutex_unlock(&m_mutex);
    }

private:
    pthread_mutex_t& m_mutex;
};

#endif

ZooKeeper 接続ハンドラインターフェース

#ifndef _ZK_CONNECTION_H_
#define _ZK_CONNECTION_H_

#include "CNonCopyable.h"
#include <string>
#include <set>
#include <map>
#include <pthread.h>
#include <zookeeper.h>

using StringSet = std::set<std::string>;

using ResetCallback = void(*)();

class CZkConnection : public CNonCopyable {
public:
    static CZkConnection* GetInstance();

    int Initialize(const std::string& hosts, int timeoutMs);
    int Finalize();

    int Exists(const std::string& path, zstat_t* stat);
    int Create(const std::string& path, const std::string& data, bool sequential, std::string& createdPath);
    int Remove(const std::string& path, int version = -1);
    int GetChildren(const std::string& path, StringSet& children);
    int GetData(const std::string& path, std::string& data, zstat_t* stat);
    int GetChildrenWatched(const std::string& path, watcher_fn watcher, StringSet& children);
    int GetDataWatched(const std::string& path, watcher_fn watcher, std::string& data, zstat_t* stat);
    int SetData(const std::string& path, const std::string& value);
    
    int RegisterResetCallback(const std::string& name, ResetCallback cb);
    int UnregisterResetCallback(const std::string& name);

private:
    CZkConnection();
    void OnSessionEvent(zhandle_t* zh, int type, int state, const char* path);
    int Reconnect();
    int PollHealth();
    bool IsAlive() const;

    static void * HealthCheckLoop(void*);

private:
    pthread_t m_healthThread = 0;
    zhandle_t* m_handle = nullptr;
    bool m_running = false;

    std::string m_zkHosts;
    int m_sessionTimeout;
    std::map<std::string, ResetCallback> m_reconnectHandlers;
    pthread_mutex_t m_mutex = PTHREAD_MUTEX_INITIALIZER;
};

#endif

接続ハンドラの実装概要

// static initialization
CZkConnection* CZkConnection::s_instance = nullptr;
pthread_mutex_t CZkConnection::s_mutex = PTHREAD_MUTEX_INITIALIZER;

CZkConnection* CZkConnection::GetInstance() {
    if (!s_instance) {
        CScopedLock lock(s_mutex);
        if (!s_instance) {
            s_instance = new CZkConnection();
        }
    }
    return s_instance;
}

int CZkConnection::Initialize(const std::string& hosts, int timeoutMs) {
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    m_zkHosts = hosts;
    m_sessionTimeout = timeoutMs;
    
    if (!m_healthThread) {
        m_running = true;
        pthread_create(&m_healthThread, nullptr, CZkConnection::HealthCheckLoop, nullptr);
    }
    return 0;
}

int CZkConnection::Finalize() {
    m_running = false;
    if (m_handle) {
        zookeeper_close(m_handle);
        m_handle = nullptr;
    }
    return 0;
}

int CZkConnection::Reconnect() {
    CScopedLock lock(m_mutex);
    zhandle_t* old = m_handle;
    m_handle = zookeeper_init(m_zkHosts.c_str(), &CZkConnection::OnSessionEvent,
                              m_sessionTimeout, 0, nullptr, 0);
    if (!m_handle) {
        return -1;
    }

    for (const auto& [_, cb] : m_reconnectHandlers) {
        cb();
    }

    if (old) zookeeper_close(old);
    return 0;
}

void CZkConnection::OnSessionEvent(zhandle_t* zh, int type, int state, const char* path, void*) {
    printf("Session Event: type=%d, state=%d, path=%s\n", type, state, path ? path : "(null)");
}

int CZkConnection::PollHealth() {
    zstat_t stat{};
    if (!m_handle || ZOK != zoo_exists(m_handle, "/", 0, &stat)) {
        return Reconnect();
    }
    return 0;
}

void* CZkConnection::HealthCheckLoop(void*) {
    while (GetInstance()->IsAlive()) {
        GetInstance()->PollHealth();
        usleep(500000); // 500 ms
    }
    return nullptr;
}

// Primitive Operations
int CZkConnection::Exists(const std::string& path, zstat_t* stat) {
    return zoo_exists(m_handle, path.c_str(), 0, stat);
}

int CZkConnection::Create(const std::string& path, const std::string& val, bool seq, std::string& outPath) {
    char buffer[256];
    int flags = ZOO_EPHEMERAL | (seq ? ZOO_SEQUENCE : 0);
    int rc = zoo_create(m_handle, path.c_str(), val.data(), val.size(),
                        &ZOO_OPEN_ACL_UNSAFE, flags, buffer, sizeof(buffer));
    if (rc == ZOK) outPath = buffer;
    return rc;
}

int CZkConnection::Remove(const std::string& path, int ver) {
    return zoo_delete(m_handle, path.c_str(), ver);
}

int CZkConnection::GetChildren(const std::string& path, StringSet& out) {
    struct String_vector vec{};
    int rc = zoo_get_children(m_handle, path.c_str(), 0, &vec);
    for (int i = 0; i < vec.count; ++i) out.insert(vec.data[i]);
    deallocate_String_vector(&vec);
    return rc;
}

int CZkConnection::GetData(const std::string& path, std::string& out, zstat_t* stat) {
    char buf[1024];
    int len = sizeof(buf);
    int rc = zoo_get(m_handle, path.c_str(), 0, buf, &len, stat);
    if (rc == ZOK) out.assign(buf, len);
    return rc;
}

// Watched Variants
int CZkConnection::GetChildrenWatched(const std::string& path, watcher_fn watcher, StringSet& children) {
    struct String_vector vec{};
    int rc = zoo_wget_children(m_handle, path.c_str(), watcher, nullptr, &vec);
    for (int i = 0; i < vec.count; ++i) children.insert(vec.data[i]);
    deallocate_String_vector(&vec);
    return rc;
}

int CZkConnection::GetDataWatched(const std::string& path, watcher_fn watcher, std::string& data, zstat_t* stat) {
    char buf[1024];
    int len = sizeof(buf);
    int rc = zoo_wget(m_handle, path.c_str(), watcher, nullptr, buf, &len, stat);
    if (rc == ZOK) data.assign(buf, len);
    return rc;
}

int CZkConnection::SetData(const std::string& path, const std::string& val) {
    return zoo_set(m_handle, path.c_str(), val.data(), val.size(), -1);
}

int CZkConnection::RegisterResetCallback(const std::string& name, ResetCallback cb) {
    CScopedLock lock(m_mutex);
    m_reconnectHandlers[name] = cb;
    return 0;
}

int CZkConnection::UnregisterResetCallback(const std::string& name) {
    CScopedLock lock(m_mutex);
    m_reconnectHandlers.erase(name);
    return 0;
}

bool CZkConnection::IsAlive() const { return m_running; }

使用例(テスト)

#include "CZkConnection.h"
#include <unistd.h>
#include <iostream>

int main() {
    const std::string zkAddrs = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181";
    CZkConnection* conn = CZkConnection::GetInstance();
    
    conn->Initialize(zkAddrs, 10000);
    sleep(1);

    zstat_t st{};
    conn->Exists("/", &st);

    std::string createdPath;
    conn->Create("/demo/node_", "value", true, createdPath);
    std::cout << "Created: " << createdPath << std::endl;

    std::string val;
    zstat_t st2{};
    conn->GetData("/demo/node_", val, &st2);
    std::cout << "Read: " << val << std::endl;

    conn->SetData("/demo/node_", "updated");
    conn->GetData("/demo/node_", val, &st2);
    std::cout << "After update: " << val << std::endl;

    StringSet children;
    conn->GetChildren("/demo", children);
    for (const auto& c : children) {
        std::cout << "Child: " << c << std::endl;
    }

    sleep(30);
    conn->Finalize();
    return 0;
}

タグ: ZooKeeper C++11 pthread Network-Programming service-discovery

6月3日 19:26 投稿