大規模IT企業における5年目エンジニアの技術面接体験談

大規模検索ログ解析におけるTopN問題の解法

大容量ファイルから頻出検索語を抽出する手法について考察する。メモリ制約下での効率的な処理が鍵となる。

分散処理による解決策

from collections import defaultdict
import heapq

def process_chunk(file_path, chunk_size=1024):
    counter = defaultdict(int)
    with open(file_path) as f:
        while True:
            lines = f.readlines(chunk_size)
            if not lines:
                break
            for line in lines:
                words = line.strip().split()
                for word in words:
                    counter[word] += 1
    return counter

def merge_results(*counters):
    merged = defaultdict(int)
    for c in counters:
        for k, v in c.items():
            merged[k] += v
    return merged

def get_top_n(file_path, n, num_chunks=10):
    chunk_results = []
    for _ in range(num_chunks):
        chunk_results.append(process_chunk(file_path))
    
    final_counts = merge_results(*chunk_results)
    
    heap = []
    for word, count in final_counts.items():
        heapq.heappush(heap, (count, word))
        if len(heap) > n:
            heapq.heappop(heap)
    
    return [item[1] for item in heap]

生産者-消費者モデルの実装例

import threading
import time
import random

class SharedQueue:
    def __init__(self, max_size):
        self.queue = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
    
    def put(self, item):
        with self.not_full:
            while len(self.queue) >= self.max_size:
                self.not_full.wait()
            self.queue.append(item)
            self.not_empty.notify()
    
    def get(self):
        with self.not_empty:
            while not self.queue:
                self.not_empty.wait()
            item = self.queue.pop(0)
            self.not_full.notify()
            return item

def producer(queue, pid):
    while True:
        data = random.randint(1, 100)
        print(f"Producer {pid} produced: {data}")
        queue.put(data)
        time.sleep(0.5)

def consumer(queue, cid):
    while True:
        data = queue.get()
        print(f"Consumer {cid} consumed: {data}")
        time.sleep(0.1)

def main():
    queue = SharedQueue(10)
    producers = [threading.Thread(target=producer, args=(queue, i)) for i in range(2)]
    consumers = [threading.Thread(target=consumer, args=(queue, i)) for i in range(3)]
    
    for t in producers + consumers:
        t.daemon = True
        t.start()
    
    time.sleep(5)

分散システム設計課題

注文処理システムの設計原則

  • 負荷分散:
    • シャーディング基準: ユーザIDハッシュ vs 地域ベース
    • 動的リバランシング手法
  • 冪等性保証:
    • ユニークトランザクションIDの生成
    • 冪等トークンの適用パターン
  • 障害対応戦略:
    • 指数バックオフアルゴリズム
    • デッドレターキューの活用

データ処理技術に関する質問

  1. ストリーム処理におけるExactly-Once保証の実現方法
  2. データ偏在問題の検出と解決手法
  3. リアルタイムデータ更新の監視方法

タグ: TopN検索 並行処理 分散システム 技術面接 データ処理

6月18日 19:15 投稿