PyQt5とPygameを用いたWeb音楽プレイヤーの実装:楽曲取得と再生制御

本稿では、PythonのGUIフレームワークであるPyQt5、オーディオ再生ライブラリであるPygame、そしてウェブからのコンテンツ取得に利用するurllib (またはrequests) を組み合わせて、オンライン音楽プレイヤーを構築する方法について解説します。

ユーザーインターフェースの応答性を保ちながら、楽曲情報の検索、ダウンロード、そして再生といった処理を非同期で行うために、マルチスレッドプログラミングを積極的に活用します。

1. 楽曲情報の非同期検索

UIのフリーズを防ぐため、楽曲の検索やメタデータ取得はメインスレッドとは別のワーカースレッドで行います。ここでは、検索クエリに基づいて楽曲のタイトル、アーティスト名、ダウンロードURLを取得するSongSearchWorkerクラスを定義します。実際のウェブスクレイピングやAPIリクエストは、このスレッド内で実行されます。

ここでは簡略化のため、架空のデータベースから情報を取得する形式で実装します。実際のアプリケーションでは、requestsBeautifulSoupなどを用いて特定の音楽サービスから情報を取得することになります。

from PyQt5.QtCore import QThread, pyqtSignal
import time
import random

# 検索結果のシミュレーションデータ(実際にはWeb APIやスクレイピングから取得)
# 例として、特定のキーワードで検索された楽曲リストを模擬します。
SIMULATED_MUSIC_DB = {
    "ポップ": [
        {"title": "夢のメロディ", "artist": "アーティストA", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3"},
        {"title": "輝くステップ", "artist": "バンドB", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-2.mp3"},
    ],
    "ロック": [
        {"title": "エナジーソング", "artist": "ロッカーズC", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-3.mp3"},
        {"title": "魂の叫び", "artist": "ザ・ギタリストD", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-4.mp3"},
    ],
    "テスト": [ # デモ用に固定のURLを用意
        {"title": "デモ楽曲1", "artist": "テストアーティスト", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3"},
        {"title": "デモ楽曲2", "artist": "テストアーティスト", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-2.mp3"},
        {"title": "デモ楽曲3", "artist": "テストアーティスト", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-3.mp3"},
    ]
}

class SongSearchWorker(QThread):
    # 検索完了時に楽曲リスト(辞書型)をemitするシグナル
    search_finished = pyqtSignal(list)
    # エラー発生時にエラーメッセージをemitするシグナル
    search_failed = pyqtSignal(str)

    def __init__(self, query_text="", parent=None):
        super().__init__(parent)
        self.query_text = query_text

    def run(self):
        try:
            print(f"楽曲を検索中: {self.query_text}...")
            time.sleep(1.5) # ネットワーク遅延をシミュレート

            results = []
            normalized_query = self.query_text.lower()
            if normalized_query in SIMULATED_MUSIC_DB:
                results = SIMULATED_MUSIC_DB[normalized_query]
            else:
                # 検索結果がない場合、デモ楽曲を提供
                print(f"クエリ '{self.query_text}' に直接一致する結果が見つかりません。デモ楽曲を表示します。")
                results = SIMULATED_MUSIC_DB["テスト"]

            # 処理された結果をemit
            self.search_finished.emit(results)
            print("検索が完了しました。")
        except Exception as e:
            self.search_failed.emit(f"検索中にエラーが発生しました: {e}")
            print(f"検索エラー: {e}")

2. 楽曲ファイルのダウンロード処理

楽曲のダウンロードも、UIをブロックしないように別のスレッドで行います。MusicDownloaderクラスは、指定されたURLからMP3ファイルをダウンロードし、進行状況をUIに通知するためのシグナルを提供します。

import os
import urllib.request
from PyQt5.QtCore import QThread, pyqtSignal

class MusicDownloader(QThread):
    # ダウンロード完了時にローカルファイルパスをemitするシグナル
    download_completed = pyqtSignal(str)
    # ダウンロードエラー発生時にメッセージをemitするシグナル
    download_error = pyqtSignal(str)
    # ダウンロード進行状況(パーセンテージ)をemitするシグナル
    progress_updated = pyqtSignal(int)

    def __init__(self, download_url, song_title, parent=None):
        super().__init__(parent)
        self.download_url = download_url
        self.song_title = song_title
        self.local_save_path = ""

    # ダウンロードの進行状況を報告するためのコールバック関数
    def _reporthook(self, block_count, block_size, total_size):
        if total_size > 0:
            percentage = int(block_count * block_size * 100 / total_size)
            self.progress_updated.emit(percentage)

    def run(self):
        try:
            download_directory = "downloaded_music"
            os.makedirs(download_directory, exist_ok=True) # ディレクトリが存在しない場合作成

            # 安全なファイル名を生成
            safe_filename = "".join(c for c in self.song_title if c.isalnum() or c in (' ', '.', '_')).rstrip()
            if not safe_filename:
                safe_filename = "unknown_track"
            self.local_save_path = os.path.join(download_directory, f"{safe_filename}.mp3")

            print(f"楽曲をダウンロード中: {self.download_url} -> {self.local_save_path}")
            # urllib.request.urlretrieveを使用してファイルをダウンロード
            urllib.request.urlretrieve(self.download_url, self.local_save_path, reporthook=self._reporthook)
            print("ダウンロードが完了しました。")
            self.download_completed.emit(self.local_save_path)
        except Exception as e:
            error_msg = f"ダウンロード中にエラーが発生しました: {e}"
            self.download_error.emit(error_msg)
            print(f"ダウンロードエラー: {e}")

3. プレイヤーの再生制御ロジック

PyQt5のGUIとPygameのオーディオミキサーを連携させ、楽曲の再生を制御します。ここでは、リスト選択、再生/一時停止、次/前の楽曲への移動、再生モードの変更などの機能について説明します。

import pygame
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QListWidget, QLabel
from PyQt5.QtCore import Qt, QTimer, QObject, pyqtSignal
import qtawesome # アイコン表示用ライブラリ

# ... (SongSearchWorker と MusicDownloader クラスの定義がここにあるものとします)

class MusicPlayer(QMainWindow):
    current_playlist_data = [] # 楽曲情報のリスト: [{"title": "", "artist": "", "url": ""}, ...]
    current_track_index = -1
    is_playback_paused = False
    is_download_active = False
    playback_sequence_mode = "sequential" # "sequential", "random", "repeat_one"

    def __init__(self):
        super().__init__()
        self.setWindowTitle("オンライン音楽プレイヤー")
        self.setGeometry(100, 100, 800, 600)

        pygame.mixer.init() # Pygameミキサーの初期化

        self.setup_ui_components() # UIコンポーネントのセットアップ
        self.connect_signals_slots() # シグナルとスロットの接続
        self.start_playback_monitor() # 再生監視スレッドの開始

        self.update_playback_mode_icon() # 初期再生モードのアイコンを設定

    def setup_ui_components(self):
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)

        # 検索入力とボタン
        search_layout = QHBoxLayout()
        self.search_input_field = QLineEdit()
        self.search_input_field.setPlaceholderText("楽曲名またはキーワードを入力...")
        self.search_button = QPushButton("検索")
        search_layout.addWidget(self.search_input_field)
        search_layout.addWidget(self.search_button)
        main_layout.addLayout(search_layout)

        # 楽曲リスト表示
        self.music_list_widget = QListWidget()
        main_layout.addWidget(self.music_list_widget)

        # プレイヤーコントロール
        control_layout = QHBoxLayout()
        self.prev_button = QPushButton(qtawesome.icon('fa.backward', color='white', font=18), "")
        self.play_pause_button = QPushButton(qtawesome.icon('fa.play', color='white', font=18), "")
        self.next_button = QPushButton(qtawesome.icon('fa.forward', color='white', font=18), "")
        self.mode_button = QPushButton(qtawesome.icon('fa.list', color='white', font=18), "") # デフォルトはシーケンシャル
        
        control_layout.addStretch()
        control_layout.addWidget(self.prev_button)
        control_layout.addWidget(self.play_pause_button)
        control_layout.addWidget(self.next_button)
        control_layout.addWidget(self.mode_button)
        control_layout.addStretch()
        main_layout.addLayout(control_layout)

        # ステータス表示
        self.status_display_label = QLabel("準備完了")
        self.status_display_label.setAlignment(Qt.AlignCenter)
        main_layout.addWidget(self.status_display_label)

    def connect_signals_slots(self):
        self.search_button.clicked.connect(self.initiate_search)
        self.music_list_widget.itemDoubleClicked.connect(self.play_selected_track)
        self.play_pause_button.clicked.connect(self.toggle_playback_pause)
        self.next_button.clicked.connect(self.play_next_track)
        self.prev_button.clicked.connect(self.play_previous_track)
        self.mode_button.clicked.connect(self.cycle_playback_mode)

    def initiate_search(self):
        query = self.search_input_field.text()
        self.status_display_label.setText("楽曲を検索中...")
        self.search_worker_thread = SongSearchWorker(query)
        self.search_worker_thread.search_finished.connect(self.display_search_results)
        self.search_worker_thread.search_failed.connect(lambda msg: self.status_display_label.setText(msg))
        self.search_worker_thread.start()

    def display_search_results(self, results):
        self.current_playlist_data = results
        self.music_list_widget.clear()
        if not results:
            self.status_display_label.setText("検索結果が見つかりませんでした。")
            return

        for song_info in results:
            self.music_list_widget.addItem(f"{song_info['title']} - {song_info['artist']}")
        self.status_display_label.setText(f"{len(results)}件の楽曲が見つかりました。")

    def play_selected_track(self, item):
        self.current_track_index = self.music_list_widget.row(item)
        self.start_track_playback()

    def toggle_playback_pause(self):
        if self.is_download_active: # ダウンロード中は操作不可
            return

        if pygame.mixer.music.get_busy() and not self.is_playback_paused:
            pygame.mixer.music.pause()
            self.is_playback_paused = True
            self.play_pause_button.setIcon(qtawesome.icon('fa.play', color='white', font=18))
            self.status_display_label.setText(f"一時停止中: {self.current_playlist_data[self.current_track_index]['title']}")
        elif self.is_playback_paused:
            pygame.mixer.music.unpause()
            self.is_playback_paused = False
            self.play_pause_button.setIcon(qtawesome.icon('fa.pause', color='white', font=18))
            self.status_display_label.setText(f"再生中: {self.current_playlist_data[self.current_track_index]['title']}")
        elif not pygame.mixer.music.get_busy() and self.current_track_index != -1:
             self.start_track_playback() # 再生中でない場合、現在の曲を再生開始

    def start_track_playback(self):
        if self.current_track_index == -1 or not self.current_playlist_data:
            return

        selected_song = self.current_playlist_data[self.current_track_index]
        self.status_display_label.setText(f"ダウンロード中: {selected_song['title']}")
        self.is_download_active = True
        self.play_pause_button.setIcon(qtawesome.icon('fa.pause', color='white', font=18))
        self.is_playback_paused = False # 新しい曲を再生するときは一時停止状態を解除

        try:
            pygame.mixer.music.stop() # 現在再生中の音楽を停止
        except pygame.error:
            pass # ミキサーが初期化されていないか、音楽がロードされていない場合

        # 既存のダウンローダースレッドがあれば停止
        if hasattr(self, 'download_worker_thread') and self.download_worker_thread.isRunning():
            self.download_worker_thread.quit()

        # ダウンロードを開始
        self.download_worker_thread = MusicDownloader(selected_song['url'], selected_song['title'])
        self.download_worker_thread.download_completed.connect(self.on_download_success)
        self.download_worker_thread.download_error.connect(self.on_download_failure)
        self.download_worker_thread.progress_updated.connect(self.update_download_progress_ui)
        self.download_worker_thread.start()

    def on_download_success(self, local_filepath):
        self.is_download_active = False
        current_song_info = self.current_playlist_data[self.current_track_index]
        self.status_display_label.setText(f"再生中: {current_song_info['title']}")
        try:
            pygame.mixer.music.load(local_filepath)
            pygame.mixer.music.play()
        except pygame.error as e:
            self.status_display_label.setText(f"再生エラー: {e}")
            print(f"Pygame再生エラー: {e}")

    def on_download_failure(self, message):
        self.is_download_active = False
        self.status_display_label.setText(f"ダウンロードエラー: {message}")
        self.play_pause_button.setIcon(qtawesome.icon('fa.play', color='white', font=18))

    def update_download_progress_ui(self, progress):
        self.status_display_label.setText(f"ダウンロード中... {progress}%")

    def play_next_track(self):
        if not self.current_playlist_data: return
        
        if self.playback_sequence_mode == "random":
            self.current_track_index = random.randint(0, len(self.current_playlist_data) - 1)
        elif self.playback_sequence_mode == "sequential":
            self.current_track_index = (self.current_track_index + 1) % len(self.current_playlist_data)
        elif self.playback_sequence_mode == "repeat_one":
            pass # 同じ曲を繰り返すため、インデックスは変更しない

        self.start_track_playback()

    def play_previous_track(self):
        if not self.current_playlist_data: return
        
        if self.playback_sequence_mode == "random":
            self.current_track_index = random.randint(0, len(self.current_playlist_data) - 1)
        else: # sequential or repeat_one
            self.current_track_index = (self.current_track_index - 1 + len(self.current_playlist_data)) % len(self.current_playlist_data)
        self.start_track_playback()

    def cycle_playback_mode(self):
        modes = ["sequential", "random", "repeat_one"]
        current_mode_idx = modes.index(self.playback_sequence_mode)
        self.playback_sequence_mode = modes[(current_mode_idx + 1) % len(modes)]
        self.update_playback_mode_icon()
        self.status_display_label.setText(f"再生モード: {self.get_japanese_mode_name(self.playback_sequence_mode)}")

    def get_japanese_mode_name(self, mode):
        if mode == "sequential": return "シーケンシャル再生"
        if mode == "random": return "ランダム再生"
        if mode == "repeat_one": return "1曲リピート"
        return ""

    def update_playback_mode_icon(self):
        if self.playback_sequence_mode == "sequential":
            self.mode_button.setIcon(qtawesome.icon('fa.list', color='white', font=18))
        elif self.playback_sequence_mode == "random":
            self.mode_button.setIcon(qtawesome.icon('fa.random', color='white', font=18))
        elif self.playback_sequence_mode == "repeat_one":
            self.mode_button.setIcon(qtawesome.icon('fa.retweet', color='white', font=18))

    def start_playback_monitor(self):
        # 楽曲再生状態を監視するワーカースレッドを起動
        self.monitor_thread = QThread()
        self.monitor_worker = PlaybackMonitorWorker(self)
        self.monitor_worker.moveToThread(self.monitor_thread)
        self.monitor_thread.started.connect(self.monitor_worker.run)
        self.monitor_worker.track_finished_signal.connect(self.handle_track_finished)
        self.monitor_thread.start()

    def handle_track_finished(self):
        # 楽曲が自然に終了した場合の処理
        if not self.is_playback_paused and not self.is_download_active:
            if self.playback_sequence_mode == "sequential":
                self.play_next_track()
            elif self.playback_sequence_mode == "random":
                self.play_next_track() # ランダムモードでも次の曲へ
            elif self.playback_sequence_mode == "repeat_one":
                self.start_track_playback() # 同じ曲をリピート再生

# バックグラウンドで再生状態を監視するワーカースレッド
class PlaybackMonitorWorker(QObject):
    track_finished_signal = pyqtSignal()

    def __init__(self, player_ref, parent=None):
        super().__init__(parent)
        self.player_ref = player_ref # MusicPlayerインスタンスへの参照
        self._is_running = True

    def run(self):
        while self._is_running:
            try:
                # Pygameミキサーが再生中ではなく、一時停止でもなく、ダウンロード中でもない場合
                if not pygame.mixer.music.get_busy() and not self.player_ref.is_playback_paused and not self.player_ref.is_download_active:
                    # current_track_indexが設定されており、かつget_pos()が-1(再生終了)の場合
                    if self.player_ref.current_track_index != -1 and pygame.mixer.music.get_pos() == -1:
                        self.track_finished_signal.emit() # 楽曲終了シグナルを発行
                time.sleep(1) # 1秒ごとにチェック
            except Exception as e:
                print(f"再生監視スレッドエラー: {e}")
                time.sleep(1)

    def stop(self):
        self._is_running = False

# アプリケーションのエントリポイント
if __name__ == '__main__':
    app = QApplication([])
    player = MusicPlayer()
    player.show()
    app.exec_()

タグ: PyQt5 pygame Python Web Scraping Multi-threading

5月15日 08:03 投稿