まず、Citus拡張機能を備えたPostgreSQL分散クラスタがKubernetes上に構築されていることを前提とします。
クラスタノードの状態を確認します(kubectl get po -n citus)。1台のCoordinatorノードと3台のWorkerノードが稼働中である必要があります。
NAME READY STATUS RESTARTS AGE
citus-coordinator-0 2/2 Running 0 4h
citus-worker-0 2/2 Running 0 30m
citus-worker-1 2/2 Running 0 29m
citus-worker-2 2/2 Running 0 28m
Coordinatorノードに接続し(kubectl -n citus exec -it citus-coordinator-0 -- bash)、Workerノードリストを取得します。
psql 'host=citus-coordinator user=postgres' -c "SELECT * FROM citus_get_active_worker_nodes();"
node_name | node_port
-----------------------------------------------------+-----------
citus-worker-0.citus-worker.citus.svc.cluster.local | 6432
citus-worker-1.citus-worker.citus.svc.cluster.local | 6432
citus-worker-2.citus-worker.citus.svc.cluster.local | 6432
分散テーブルの構築
create_distributed_table関数を使用して、センサーデータを保持するテーブルを分散化します。
CREATE TABLE sensor_readings (
sensor_id bigint,
reading_id bigserial,
timestamp timestamptz DEFAULT NOW(),
value jsonb NOT NULL,
PRIMARY KEY (sensor_id, reading_id)
);
SELECT create_distributed_table('sensor_readings', 'sensor_id');
特定のsensor_idに対するクエリは単一ノードにルーティングされ、複数IDにまたがるクエリは並列処理されます。サンプルデータを挿入します。
INSERT INTO sensor_readings (sensor_id, value)
SELECT s % 50, ('{"temp":'||(20 + random()*10)||'}')::jsonb
FROM generate_series(1,500000) s;
sensor_id=5の最新3件を取得:
SELECT * FROM sensor_readings
WHERE sensor_id = 5
ORDER BY timestamp DESC
LIMIT 3;
sensor_id | reading_id | timestamp | value
-----------+------------+-----------------------------+---------------------
5 | 499905 | 2023-08-15 07:22:18.342+00 | {"temp":27.345}
5 | 499805 | 2023-08-15 07:22:18.342+00 | {"temp":22.876}
5 | 499705 | 2023-08-15 07:22:18.342+00 | {"temp":29.102}
集計クエリの実行計画を確認:
EXPLAIN SELECT COUNT(*) FROM sensor_readings;
Custom Scan (Citus Adaptive)
Task Count: 32
Tasks Shown: One of 32
-> Task
Query: SELECT COUNT(*) AS count FROM public.sensor_readings_102012
Node: host=citus-worker-0 ...
共置テーブルの活用
センサー情報テーブルを共置設定で作成し、分散結合を最適化します。
CREATE TABLE sensors (
sensor_id bigint PRIMARY KEY,
location text,
category_id int
);
SELECT create_distributed_table(
'sensors',
'sensor_id',
colocate_with := 'sensor_readings'
);
INSERT INTO sensors
SELECT s, 'Building-'||(s%10), 100
FROM generate_series(0,49) s;
カテゴリID=100のセンサー平均値を取得:
SELECT AVG((value->>'temp')::numeric)
FROM sensor_readings
JOIN sensors USING (sensor_id)
WHERE category_id = 100;
参照テーブルの実装
小規模な次元データ用に全ノードに複製する参照テーブルを作成します。
CREATE TABLE sensor_categories (
category_id int PRIMARY KEY,
category_name text NOT NULL
);
SELECT create_reference_table('sensor_categories');
INSERT INTO sensor_categories VALUES (100, 'Temperature');
参照テーブルを結合したクエリ:
SELECT s.sensor_id, r.timestamp, r.value->>'temp', c.category_name
FROM sensor_readings r
JOIN sensors s USING (sensor_id)
JOIN sensor_categories c USING (category_id)
WHERE c.category_name = 'Temperature'
ORDER BY r.timestamp DESC
LIMIT 3;
列ストアの導入
列指向ストレージを使用してデータ圧縮を実現します。
CREATE TABLE readings_columnar (
sensor_id bigint,
reading_id bigserial,
timestamp timestamptz,
value jsonb
) USING columnar;
INSERT INTO readings_columnar
SELECT * FROM sensor_readings;
CREATE TABLE readings_row AS SELECT * FROM sensor_readings;
ストレージ効率を比較:
List of relations
Schema | Name | Type | Size
--------+-----------------------+----------+------------
public | readings_columnar | table | 12 MB
public | readings_row | table | 420 MB
列ストアは数十倍の圧縮率を実現し、分析ワークロードに最適です。ただし、更新/削除操作はサポートされていないため、新規データは行ストア、歴史データは列ストアで管理するハイブリッド戦略が推奨されます。