1. Redis分散ロックの仕組み Redisにおける分散ロックは、複数のクライアントが同一リソースを同時に操作する競合状態を防ぐために使われます。ロック取得にはSETNXコマンド(存在しない場合のみ設定)を使用し、成功時は1を返却します。ロックの保持を確実にするため、トランザクション処理とPIPELINEの併用が必要です。
2. トランザクションの役割 RedisではMULTI/EXECコマンドでトランザクションを実装します。複数のコマンドを一括で実行し、途中でエラーが発生した場合でも全操作をキャンセルします。WATCHコマンドでロックキーを監視し、状態変化時に自動的にリトライします。
3. 実装サンプルコード 以下は分散ロックを用いたトランザクション処理の例です。パイプライン処理内でロック取得→リソース更新→ロック解放の一連の流れを実現しています。
import redis
from redis.exceptions import WatchError
# Redisクライアント初期化
client = redis.StrictRedis(host='127.0.0.1', port=6379)
def execute_critical_section(resource_id):
lock_key = f'resource_lock:{resource_id}'
pipeline = client.pipeline()
while True:
try:
pipeline.watch(lock_key)
if client.exists(lock_key):
continue
pipeline.multi()
pipeline.setex(lock_key, 5, 'locked') # 5秒間有効
pipeline.hincrby('counter', 'value', -1)
pipeline.execute()
break
except WatchError:
continue
finally:
client.delete(lock_key)
4. 分散ロックの設計条件
- 同時性: 任意のタイミングで1つのクライアントのみがロックを保持
- 安定性: クライアント異常時でもロックが自動解除されるメカニズム
- 非ブロッキング: ロック取得失敗時は即座に終了
5. Pythonによる分散ロッククラス 以下の実装では、ユニークな識別子を付与することで誤ったロック解放を防止しています。
import threading
import time
import socket
import os
class DistributedLocker:
def __init__(self, redis_client):
self.redis = redis_client
self.identifier = f'{socket.gethostname()}:{os.getpid()}:{threading.get_ident()}'
def acquire(self, key, expire=10):
while True:
if self.redis.set(key, self.identifier, nx=True, ex=expire):
return True
time.sleep(0.01)
def release(self, key):
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, key, self.identifier)
6. 実装フロー
- ロックキーに対してSETNXコマンドでロック取得を試みる
- 取得成功時はトランザクション開始
- リソース更新処理を実行
- 最終的にLuaスクリプトで安全なロック解放を実施