ZooKeeper サーバーへの永続的な接続を管理し、自動再接続と死活監視の機能を備えたシングルトン(ヒープ上に一度だけ生成される)ハンドラクラスを C++ で実装します。
コピー禁止基底クラス
/// Mix-in base class that makes every derived type non-copyable.
///
/// Construction and destruction are protected, so the class can only be used
/// as a base; the destructor is virtual.
class CNonCopyable {
protected:
    // Copy construction and copy assignment are disabled for the whole hierarchy.
    CNonCopyable(const CNonCopyable&) = delete;
    CNonCopyable& operator=(const CNonCopyable&) = delete;

    // Only derived classes may create or destroy the base sub-object.
    CNonCopyable() = default;
    virtual ~CNonCopyable() = default;
};
RAII によるミューテックスのロック・自動解放クラス
#ifndef _AUTOLOCK_H_
#define _AUTOLOCK_H_
#include <pthread.h>
/// Scope guard for a pthread mutex: locks the mutex in the constructor and
/// unlocks it in the destructor.
///
/// The guard only stores a reference, so the mutex must outlive the guard.
/// Copying is disabled: a copied guard would unlock the same mutex a second
/// time when it is destroyed.
class CScopedLock {
public:
    /// Locks @p mtx; blocks until the lock is acquired.
    explicit CScopedLock(pthread_mutex_t& mtx) : m_mutex(mtx) {
        pthread_mutex_lock(&m_mutex);
    }

    /// Unlocks the mutex that was locked by the constructor.
    ~CScopedLock() {
        pthread_mutex_unlock(&m_mutex);
    }

    CScopedLock(const CScopedLock&) = delete;
    CScopedLock& operator=(const CScopedLock&) = delete;

private:
    pthread_mutex_t& m_mutex;  // guarded mutex (not owned)
};
#endif
ZooKeeper 接続ハンドラインターフェース
#ifndef _ZK_CONNECTION_H_
#define _ZK_CONNECTION_H_
#include "CNonCopyable.h"
#include <pthread.h>
#include <atomic>
#include <map>
#include <set>
#include <string>
#include <zookeeper.h>
// Ordered set of strings; the GetChildren* methods insert znode child names into it.
using StringSet = std::set<std::string>;
// Plain function pointer (no arguments, no result) that can be registered by
// name and is invoked after a new ZooKeeper handle has been created.
using ResetCallback = void(*)();
/// Process-wide ZooKeeper connection handler.
///
/// A single instance is obtained through GetInstance(). Initialize() stores the
/// connection parameters and starts a background thread (HealthCheckLoop) that
/// periodically probes the server and calls Reconnect() when the probe fails.
/// After a successful reconnect every registered ResetCallback is invoked.
///
/// The data operations (Exists, Create, Remove, Get*, SetData) are thin
/// wrappers around the synchronous zoo_* calls and return their result codes
/// unchanged. They read m_handle without taking m_mutex; only Reconnect() and
/// the callback registry functions lock it.
class CZkConnection : public CNonCopyable {
public:
    /// Returns the singleton, creating it on first use.
    static CZkConnection* GetInstance();

    /// Stores the host list / session timeout (ms) and starts the health thread.
    int Initialize(const std::string& hosts, int timeoutMs);
    /// Stops the health loop and closes the current handle.
    int Finalize();

    int Exists(const std::string& path, zstat_t* stat);
    /// Creates an ephemeral (optionally sequential) node; the resulting path is
    /// written to @p createdPath on success.
    int Create(const std::string& path, const std::string& data, bool sequential, std::string& createdPath);
    int Remove(const std::string& path, int version = -1);
    int GetChildren(const std::string& path, StringSet& children);
    int GetData(const std::string& path, std::string& data, zstat_t* stat);
    int GetChildrenWatched(const std::string& path, watcher_fn watcher, StringSet& children);
    int GetDataWatched(const std::string& path, watcher_fn watcher, std::string& data, zstat_t* stat);
    int SetData(const std::string& path, const std::string& value);

    /// Adds or replaces the callback stored under @p name.
    int RegisterResetCallback(const std::string& name, ResetCallback cb);
    /// Removes the callback stored under @p name, if any.
    int UnregisterResetCallback(const std::string& name);

private:
    // NOTE(review): no definition of this constructor is visible in this
    // section; confirm it exists elsewhere.
    CZkConnection();

    // Session watcher handed to zookeeper_init(). It has to be a static
    // function with the five-argument watcher_fn shape: a non-static member
    // function cannot be converted to a plain function pointer.
    static void OnSessionEvent(zhandle_t* zh, int type, int state, const char* path, void* context);

    int Reconnect();
    int PollHealth();
    bool IsAlive() const;
    static void* HealthCheckLoop(void*);

private:
    // Singleton storage; both are defined in the implementation file.
    static CZkConnection* s_instance;
    static pthread_mutex_t s_mutex;

    pthread_t m_healthThread = 0;         // 0 means "health thread not started"
    zhandle_t* m_handle = nullptr;        // current ZooKeeper handle, may be null
    // Written by Initialize()/Finalize() and read by the health thread, hence atomic.
    std::atomic<bool> m_running{false};
    std::string m_zkHosts;
    int m_sessionTimeout = 0;             // milliseconds, set by Initialize()
    std::map<std::string, ResetCallback> m_reconnectHandlers;
    pthread_mutex_t m_mutex = PTHREAD_MUTEX_INITIALIZER;  // guards Reconnect() and the callback map
};
#endif
接続ハンドラの実装概要
// Definitions of the static members: the singleton pointer (starts out null
// and is filled in by GetInstance) and the mutex GetInstance locks while
// creating the instance.
CZkConnection* CZkConnection::s_instance = nullptr;
pthread_mutex_t CZkConnection::s_mutex = PTHREAD_MUTEX_INITIALIZER;
/// Returns the process-wide connection object, creating it on first use.
///
/// s_instance is a plain pointer, so every access happens with s_mutex held:
/// an unlocked fast-path read would race with the write performed by the
/// thread that creates the instance. The instance is never deleted.
///
/// @return Pointer to the singleton (never null once `new` has succeeded).
CZkConnection* CZkConnection::GetInstance() {
    CScopedLock guard(s_mutex);
    if (s_instance == nullptr) {
        s_instance = new CZkConnection();
    }
    return s_instance;
}
/// Stores the connection parameters and starts the health-check thread.
///
/// The ZooKeeper handle itself is not created here; the health-check thread
/// does that through PollHealth()/Reconnect(). If the thread is already
/// running only the stored parameters are updated.
///
/// @param hosts      Comma separated host:port list passed to zookeeper_init().
/// @param timeoutMs  Session timeout in milliseconds.
/// @return 0 on success, -1 if the health-check thread could not be created.
int CZkConnection::Initialize(const std::string& hosts, int timeoutMs) {
    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    m_zkHosts = hosts;
    m_sessionTimeout = timeoutMs;

    if (m_healthThread) {
        return 0;  // health thread already running
    }

    // The flag must be set before the thread starts, otherwise the loop
    // could observe "not alive" and exit immediately.
    m_running = true;

    // Create into a local first: on failure the contents of the pthread_t are
    // unspecified and must not end up in m_healthThread.
    pthread_t worker{};
    const int rc = pthread_create(&worker, nullptr, CZkConnection::HealthCheckLoop, nullptr);
    if (rc != 0) {
        m_running = false;
        return -1;
    }
    m_healthThread = worker;
    return 0;
}
int CZkConnection::Finalize() {
m_running = false;
if (m_handle) {
zookeeper_close(m_handle);
m_handle = nullptr;
}
return 0;
}
int CZkConnection::Reconnect() {
CScopedLock lock(m_mutex);
zhandle_t* old = m_handle;
m_handle = zookeeper_init(m_zkHosts.c_str(), &CZkConnection::OnSessionEvent,
m_sessionTimeout, 0, nullptr, 0);
if (!m_handle) {
return -1;
}
for (const auto& [_, cb] : m_reconnectHandlers) {
cb();
}
if (old) zookeeper_close(old);
return 0;
}
/// Watcher callback passed to zookeeper_init(): prints the event type, the
/// state and the path (or "(null)" when no path is supplied) to stdout.
/// The handle and the context pointer are not used.
void CZkConnection::OnSessionEvent(zhandle_t* zh, int type, int state, const char* path, void*) {
    const char* shownPath = "(null)";
    if (path) {
        shownPath = path;
    }
    printf("Session Event: type=%d, state=%d, path=%s\n", type, state, shownPath);
}
int CZkConnection::PollHealth() {
zstat_t stat{};
if (!m_handle || ZOK != zoo_exists(m_handle, "/", 0, &stat)) {
return Reconnect();
}
return 0;
}
void* CZkConnection::HealthCheckLoop(void*) {
while (GetInstance()->IsAlive()) {
GetInstance()->PollHealth();
usleep(500000); // 500 ms
}
return nullptr;
}
// Primitive Operations
/// Synchronous existence check (no watch is set).
///
/// @param path  Znode path to query.
/// @param stat  Passed through to zoo_exists().
/// @return The zoo_exists() result code.
int CZkConnection::Exists(const std::string& path, zstat_t* stat) {
    const char* const znode = path.c_str();
    const int rc = zoo_exists(m_handle, znode, 0, stat);
    return rc;
}
/// Creates an ephemeral znode with an open ACL (ZOO_OPEN_ACL_UNSAFE).
///
/// @param path     Requested znode path (a prefix when @p seq is true).
/// @param val      Payload stored in the node.
/// @param seq      When true, ZOO_SEQUENCE is added to the creation flags.
/// @param outPath  Receives the path reported by zoo_create() on success;
///                 left untouched on failure.
/// @return The zoo_create() result code.
int CZkConnection::Create(const std::string& path, const std::string& val, bool seq, std::string& outPath) {
    // The result buffer is sized from the requested path plus room for the
    // sequence suffix and the terminating NUL, so long paths are not silently
    // truncated. It is never smaller than 256 bytes.
    const std::string::size_type kSuffixRoom = 16;
    const std::string::size_type kMinCapacity = 256;
    const std::string::size_type wanted = path.size() + kSuffixRoom;
    std::string created(wanted < kMinCapacity ? kMinCapacity : wanted, '\0');

    const int flags = ZOO_EPHEMERAL | (seq ? ZOO_SEQUENCE : 0);
    const int rc = zoo_create(m_handle, path.c_str(), val.data(), static_cast<int>(val.size()),
                              &ZOO_OPEN_ACL_UNSAFE, flags,
                              created.data(), static_cast<int>(created.size()));
    if (rc == ZOK) {
        outPath = created.c_str();  // cut at the NUL written by zoo_create()
    }
    return rc;
}
/// Deletes a znode.
///
/// @param path  Znode path to delete.
/// @param ver   Version passed through to zoo_delete().
/// @return The zoo_delete() result code.
int CZkConnection::Remove(const std::string& path, int ver) {
    const int rc = zoo_delete(m_handle, path.c_str(), ver);
    return rc;
}
int CZkConnection::GetChildren(const std::string& path, StringSet& out) {
struct String_vector vec{};
int rc = zoo_get_children(m_handle, path.c_str(), 0, &vec);
for (int i = 0; i < vec.count; ++i) out.insert(vec.data[i]);
deallocate_String_vector(&vec);
return rc;
}
/// Reads the payload of a znode without setting a watch.
///
/// @param path  Znode path to read.
/// @param out   Receives the payload on success (emptied when the node holds
///              no data); left untouched on failure.
/// @param stat  Passed through to zoo_get().
/// @return The zoo_get() result code.
int CZkConnection::GetData(const std::string& path, std::string& out, zstat_t* stat) {
    // Fast path: most payloads fit into a small stack buffer.
    char quick[1024];
    int len = static_cast<int>(sizeof(quick));
    int rc = zoo_get(m_handle, path.c_str(), 0, quick, &len, stat);
    if (rc != ZOK) {
        return rc;
    }
    if (len < 0) {
        // zoo_get() reports -1 for a node whose data is NULL; passing that to
        // assign() would be interpreted as a huge unsigned length.
        out.clear();
        return rc;
    }
    if (len < static_cast<int>(sizeof(quick))) {
        out.assign(quick, static_cast<std::string::size_type>(len));
        return rc;
    }

    // The buffer was filled completely, so the payload may have been cut off.
    // Read again with a buffer as large as ZooKeeper's default znode size
    // limit (about 1 MiB).
    const int kMaxZnodeBytes = 1024 * 1024;
    std::string large(static_cast<std::string::size_type>(kMaxZnodeBytes), '\0');
    len = kMaxZnodeBytes;
    rc = zoo_get(m_handle, path.c_str(), 0, large.data(), &len, stat);
    if (rc != ZOK) {
        return rc;
    }
    large.resize(len > 0 ? static_cast<std::string::size_type>(len) : 0);
    out.swap(large);
    return rc;
}
// Watched Variants
int CZkConnection::GetChildrenWatched(const std::string& path, watcher_fn watcher, StringSet& children) {
struct String_vector vec{};
int rc = zoo_wget_children(m_handle, path.c_str(), watcher, nullptr, &vec);
for (int i = 0; i < vec.count; ++i) children.insert(vec.data[i]);
deallocate_String_vector(&vec);
return rc;
}
/// Reads the payload of a znode and passes @p watcher to zoo_wget() (with a
/// null watcher context).
///
/// @param path     Znode path to read.
/// @param watcher  Watcher function handed to zoo_wget().
/// @param data     Receives the payload on success (emptied when the node
///                 holds no data); left untouched on failure.
/// @param stat     Passed through to zoo_wget().
/// @return The zoo_wget() result code.
int CZkConnection::GetDataWatched(const std::string& path, watcher_fn watcher, std::string& data, zstat_t* stat) {
    // Fast path: most payloads fit into a small stack buffer.
    char quick[1024];
    int len = static_cast<int>(sizeof(quick));
    int rc = zoo_wget(m_handle, path.c_str(), watcher, nullptr, quick, &len, stat);
    if (rc != ZOK) {
        return rc;
    }
    if (len < 0) {
        // zoo_wget() reports -1 for a node whose data is NULL; passing that to
        // assign() would be interpreted as a huge unsigned length.
        data.clear();
        return rc;
    }
    if (len < static_cast<int>(sizeof(quick))) {
        data.assign(quick, static_cast<std::string::size_type>(len));
        return rc;
    }

    // The buffer was filled completely, so the payload may have been cut off.
    // Read again (the same watcher is simply requested again) with a buffer
    // as large as ZooKeeper's default znode size limit (about 1 MiB).
    const int kMaxZnodeBytes = 1024 * 1024;
    std::string large(static_cast<std::string::size_type>(kMaxZnodeBytes), '\0');
    len = kMaxZnodeBytes;
    rc = zoo_wget(m_handle, path.c_str(), watcher, nullptr, large.data(), &len, stat);
    if (rc != ZOK) {
        return rc;
    }
    large.resize(len > 0 ? static_cast<std::string::size_type>(len) : 0);
    data.swap(large);
    return rc;
}
/// Overwrites the payload of a znode.
///
/// @param path  Znode path to update.
/// @param val   New payload.
/// @return The zoo_set() result code.
int CZkConnection::SetData(const std::string& path, const std::string& val) {
    const int kVersionArg = -1;  // version argument forwarded to zoo_set()
    return zoo_set(m_handle, path.c_str(), val.data(), val.size(), kVersionArg);
}
/// Stores @p cb under @p name, replacing any callback already registered with
/// that name. The map is modified with m_mutex held.
///
/// @return Always 0.
int CZkConnection::RegisterResetCallback(const std::string& name, ResetCallback cb) {
    CScopedLock guard(m_mutex);
    m_reconnectHandlers.insert_or_assign(name, cb);
    return 0;
}
/// Removes the callback registered under @p name; unknown names are ignored.
/// The map is modified with m_mutex held.
///
/// @return Always 0.
int CZkConnection::UnregisterResetCallback(const std::string& name) {
    CScopedLock guard(m_mutex);
    const auto found = m_reconnectHandlers.find(name);
    if (found != m_reconnectHandlers.end()) {
        m_reconnectHandlers.erase(found);
    }
    return 0;
}
/// Reports whether the health-check loop should keep running
/// (the current value of m_running).
bool CZkConnection::IsAlive() const {
    const bool alive = m_running;
    return alive;
}
使用例(テスト)
#include "CZkConnection.h"
#include <unistd.h>
#include <iostream>
int main() {
const std::string zkAddrs = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181";
CZkConnection* conn = CZkConnection::GetInstance();
conn->Initialize(zkAddrs, 10000);
sleep(1);
zstat_t st{};
conn->Exists("/", &st);
std::string createdPath;
conn->Create("/demo/node_", "value", true, createdPath);
std::cout << "Created: " << createdPath << std::endl;
std::string val;
zstat_t st2{};
conn->GetData("/demo/node_", val, &st2);
std::cout << "Read: " << val << std::endl;
conn->SetData("/demo/node_", "updated");
conn->GetData("/demo/node_", val, &st2);
std::cout << "After update: " << val << std::endl;
StringSet children;
conn->GetChildren("/demo", children);
for (const auto& c : children) {
std::cout << "Child: " << c << std::endl;
}
sleep(30);
conn->Finalize();
return 0;
}