PythonにおけるSQLite3を使用したデータベース操作ガイド

データベースの基礎とPythonの役割

大規模なデータを扱う場合、単なるファイル操作ではなく、データベース(DB)を利用することが推奨されます。データベースは、構造化されたデータを効率的に管理・検索・更新するためのシステムです。Pythonには標準ライブラリとしてsqlite3が含まれており、設定不要ですぐに軽量なリレーショナルデータベースを利用可能です。

リレーショナルデータベース(RDB)では、データを「テーブル(表)」として管理します。主要な用語は以下の通りです:

  • テーブル(Table): データを格納する2次元の表。
  • カラム(Column/属性): テーブル内の縦方向の列。データの種類(ID、名前など)を定義します。
  • ロウ(Row/レコード/タプル): テーブル内の横方向の行。1件分のデータに相当します。
  • 主キー(Primary Key): 各レコードを一意に識別するためのID。

SQLite3モジュールによるデータベース接続

PythonでSQLiteを操作するには、まずsqlite3モジュールをインポートし、データベースファイルに接続します。接続文字列に:memory:を指定すると、ディスクを使わずメモリ上のみに一時的なデータベースを作成することも可能です。

基本的な操作の流れは以下の通りです。

  1. sqlite3.connect()でデータベース接続オブジェクトを生成する。
  2. 接続オブジェクトからcursor()メソッドでカーソルオブジェクトを生成する。
  3. カーソルのexecute()メソッドを用いてSQLコマンドを実行する。
  4. データの取得にはfetchone()fetchall()を使用する。
  5. データの変更(挿入・更新・削除)を行った場合はcommit()で確定する。
  6. 最後にclose()でリソースを解放する。

以下は、安全なリソース管理を考慮した接続とテーブル作成の実装例です。

import sqlite3

# データベースファイル(存在しない場合は新規作成)
db_path = 'company.db'

try:
    # コンテキストマネージャを使用した接続(自動的にクローズされる)
    with sqlite3.connect(db_path) as connection:
        cursor = connection.cursor()
        
        # 従業員テーブルの作成(既に存在する場合はエラーを回避)
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS employees (
            employee_id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            department TEXT,
            score INTEGER
        );
        """
        cursor.execute(create_table_sql)
        print("テーブルの準備が完了しました。")

except sqlite3.Error as e:
    print(f"データベースエラーが発生しました: {e}")

データの操作:CRUD処理

データの挿入(INSERT)

SQLインジェクションを防ぐため、プレースホルダ(?)を使用してパラメータをバインドすることが推奨されます。

data_list = [
    (101, '田中 太郎', '営業部', 85),
    (102, '佐藤 花子', '開発部', 92),
    (103, '鈴木 一郎', '人事部', 78)
]

try:
    with sqlite3.connect('company.db') as conn:
        cursor = conn.cursor()
        
        # 複数行の一括挿入
        insert_sql = "INSERT INTO employees (employee_id, name, department, score) VALUES (?, ?, ?, ?)"
        cursor.executemany(insert_sql, data_list)
        
        # 名前付きパラメータを使用した挿入の例
        another_sql = "INSERT INTO employees (name, department, score) VALUES (:name, :dept, :score)"
        cursor.execute(another_sql, {"name": "高橋 次郎", "dept": "総務部", "score": 88})
        
        conn.commit() # 変更を確定
        print(f"{cursor.rowcount}件のレコードを挿入しました。")

except sqlite3.Error as e:
    print(f"挿入エラー: {e}")

データの検索(SELECT)

execute()でクエリを実行した後、カーソルをイテレータとして扱うか、fetchall()で全件取得します。

try:
    with sqlite3.connect('company.db') as conn:
        cursor = conn.cursor()
        
        # 特定の条件で検索(スコアが80点以上)
        sql = "SELECT name, score FROM employees WHERE score >= ?"
        cursor.execute(sql, (80,))
        
        print("--- 高スコア社員一覧 ---")
        # カーソルを直接ループして1行ずつ取得
        for row in cursor:
            print(f"名前: {row[0]}, スコア: {row[1]}")

except sqlite3.Error as e:
    print(f"検索エラー: {e}")

データの更新(UPDATE)と削除(DELETE)

更新と削除も同様にSQL文を実行し、最後にcommit()を呼び出します。

try:
    with sqlite3.connect('company.db') as conn:
        cursor = conn.cursor()
        
        # 更新:鈴木一郎のスコアを95に変更
        update_sql = "UPDATE employees SET score = ? WHERE name = ?"
        cursor.execute(update_sql, (95, '鈴木 一郎'))
        print(f"{cursor.rowcount}件を更新しました。")
        
        # 削除:IDが101のレコードを削除
        delete_sql = "DELETE FROM employees WHERE employee_id = ?"
        cursor.execute(delete_sql, (101,))
        print(f"{cursor.rowcount}件を削除しました。")
        
        conn.commit()

except sqlite3.Error as e:
    print(f"更新・削除エラー: {e}")

高度なSQLクエリの活用

データ分析や集計には、SQLの集計関数やグループ化機能を活用します。

集計とグループ化(GROUP BY / HAVING)

GROUP BY句を使用して特定のカラムごとにデータをグループ化し、COUNTAVGなどの関数で集計を行います。HAVING句は、グループ化された結果に対するフィルタリング条件を指定します。

try:
    with sqlite3.connect('company.db') as conn:
        cursor = conn.cursor()
        
        # 部署ごとの平均スコアを計算し、平均点が80点以上の部署のみを表示
        stats_sql = """
        SELECT department, AVG(score) as avg_score, COUNT(*) as count
        FROM employees
        GROUP BY department
        HAVING avg_score >= 80
        ORDER BY avg_score DESC
        """
        cursor.execute(stats_sql)
        
        results = cursor.fetchall()
        for dept, avg, cnt in results:
            print(f"部署: {dept}, 平均点: {avg:.2f}, 人数: {cnt}人")

except sqlite3.Error as e:
    print(f"集計エラー: {e}")

複雑な検索条件(LIKE, IN, BETWEEN)

文字列の部分一致検索にはLIKE、範囲指定にはBETWEEN、リスト内の値との照合にはINを使用します。

try:
    with sqlite3.connect('company.db') as conn:
        cursor = conn.cursor()
        
        # 名前が'田'で始まる社員を検索(ワイルドカード %)
        cursor.execute("SELECT * FROM employees WHERE name LIKE '田%'")
        print("【名字が田の社員】", cursor.fetchall())
        
        # IDが102から104の範囲の社員
        cursor.execute("SELECT * FROM employees WHERE employee_id BETWEEN 102 AND 104")
        print("【ID範囲検索】", cursor.fetchall())

except sqlite3.Error as e:
    print(f"検索エラー: {e}")

実践例:TkinterとSQLiteを連携した抽選アプリケーション

データベースに格納された名前を読み込み、GUIアプリケーションでランダムに表示するプログラムを作成します。tkinterを使用してUIを構築し、afterメソッドでアニメーション効果を実現します。

import sqlite3
import random
import tkinter as tk
from tkinter import messagebox

class LotteryApp:
    def __init__(self, root):
        self.root = root
        self.root.title("社員抽選システム")
        self.root.geometry("400x250")
        
        self.is_running = False
        self.candidates = []
        self.load_candidates()
        
        # ラベルの設定
        self.label = tk.Label(root, text="準備中...", font=("Helvetica", 24), fg="blue")
        self.label.pack(pady=40)
        
        # ボタンフレーム
        btn_frame = tk.Frame(root)
        btn_frame.pack()
        
        self.start_btn = tk.Button(btn_frame, text="開始", command=self.start_lottery, width=10, bg="#dddddd")
        self.start_btn.pack(side=tk.LEFT, padx=20)
        
        self.stop_btn = tk.Button(btn_frame, text="停止", command=self.stop_lottery, width=10, bg="#ffdddd")
        self.stop_btn.pack(side=tk.LEFT, padx=20)
        
        # データベース接続のクローズ処理(アプリ終了時)
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

    def load_candidates(self):
        """データベースから候補者名を読み込む"""
        try:
            # データベースが存在しない場合は作成し、ダミーデータを入れる
            with sqlite3.connect('lottery.db') as conn:
                cursor = conn.cursor()
                cursor.execute("CREATE TABLE IF NOT EXISTS participants (id INTEGER PRIMARY KEY, name TEXT)")
                
                # データが0件なら初期データを投入
                count = cursor.execute("SELECT count(*) FROM participants").fetchone()[0]
                if count == 0:
                    dummy_data = [(1, '山田'), (2, '岡田'), (3, '横山'), (4, '内村'), (5, '田中')]
                    cursor.executemany("INSERT INTO participants VALUES (?, ?)", dummy_data)
                    conn.commit()
                
                # 名前リストを取得
                rows = cursor.execute("SELECT name FROM participants").fetchall()
                self.candidates = [row[0] for row in rows]
                
        except sqlite3.Error as e:
            messagebox.showerror("エラー", f"データベース接続に失敗しました: {e}")

    def start_lottery(self):
        if not self.is_running and self.candidates:
            self.is_running = True
            self.update_label()

    def stop_lottery(self):
        self.is_running = False

    def update_label(self):
        if self.is_running:
            selected = random.choice(self.candidates)
            self.label.config(text=selected)
            # 50ミリ秒後に再帰呼び出し
            self.root.after(50, self.update_label)

    def on_close(self):
        # アプリケーション終了時の処理
        self.root.destroy()

if __name__ == "__main__":
    root = tk.Tk()
    app = LotteryApp(root)
    root.mainloop()

このコードでは、LotteryAppクラスを定義し、データベースとの接続管理とUIのロジックを分離しています。起動時にlottery.dbを確認し、存在しない場合は自動的にテーブルとサンプルデータを作成するため、そのまま実行して動作を確認できます。

5月13日 09:51 投稿