大規模検索ログ解析におけるTopN問題の解法
大容量ファイルから頻出検索語を抽出する手法について考察する。メモリ制約下での効率的な処理が鍵となる。
分散処理による解決策
from collections import defaultdict
import heapq
def process_chunk(file_path, chunk_size=1024):
counter = defaultdict(int)
with open(file_path) as f:
while True:
lines = f.readlines(chunk_size)
if not lines:
break
for line in lines:
words = line.strip().split()
for word in words:
counter[word] += 1
return counter
def merge_results(*counters):
merged = defaultdict(int)
for c in counters:
for k, v in c.items():
merged[k] += v
return merged
def get_top_n(file_path, n, num_chunks=10):
chunk_results = []
for _ in range(num_chunks):
chunk_results.append(process_chunk(file_path))
final_counts = merge_results(*chunk_results)
heap = []
for word, count in final_counts.items():
heapq.heappush(heap, (count, word))
if len(heap) > n:
heapq.heappop(heap)
return [item[1] for item in heap]
生産者-消費者モデルの実装例
import threading
import time
import random
class SharedQueue:
def __init__(self, max_size):
self.queue = []
self.max_size = max_size
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def put(self, item):
with self.not_full:
while len(self.queue) >= self.max_size:
self.not_full.wait()
self.queue.append(item)
self.not_empty.notify()
def get(self):
with self.not_empty:
while not self.queue:
self.not_empty.wait()
item = self.queue.pop(0)
self.not_full.notify()
return item
def producer(queue, pid):
while True:
data = random.randint(1, 100)
print(f"Producer {pid} produced: {data}")
queue.put(data)
time.sleep(0.5)
def consumer(queue, cid):
while True:
data = queue.get()
print(f"Consumer {cid} consumed: {data}")
time.sleep(0.1)
def main():
queue = SharedQueue(10)
producers = [threading.Thread(target=producer, args=(queue, i)) for i in range(2)]
consumers = [threading.Thread(target=consumer, args=(queue, i)) for i in range(3)]
for t in producers + consumers:
t.daemon = True
t.start()
time.sleep(5)
分散システム設計課題
注文処理システムの設計原則
- 負荷分散:
- シャーディング基準: ユーザIDハッシュ vs 地域ベース
- 動的リバランシング手法
- 冪等性保証:
- ユニークトランザクションIDの生成
- 冪等トークンの適用パターン
- 障害対応戦略:
- 指数バックオフアルゴリズム
- デッドレターキューの活用
データ処理技術に関する質問
- ストリーム処理におけるExactly-Once保証の実現方法
- データ偏在問題の検出と解決手法
- リアルタイムデータ更新の監視方法