本稿では、PythonのGUIフレームワークであるPyQt5、オーディオ再生ライブラリであるPygame、そしてウェブからのコンテンツ取得に利用するurllib (またはrequests) を組み合わせて、オンライン音楽プレイヤーを構築する方法について解説します。
ユーザーインターフェースの応答性を保ちながら、楽曲情報の検索、ダウンロード、そして再生といった処理を非同期で行うために、マルチスレッドプログラミングを積極的に活用します。
1. 楽曲情報の非同期検索
UIのフリーズを防ぐため、楽曲の検索やメタデータ取得はメインスレッドとは別のワーカースレッドで行います。ここでは、検索クエリに基づいて楽曲のタイトル、アーティスト名、ダウンロードURLを取得するSongSearchWorkerクラスを定義します。実際のウェブスクレイピングやAPIリクエストは、このスレッド内で実行されます。
ここでは簡略化のため、架空のデータベースから情報を取得する形式で実装します。実際のアプリケーションでは、requestsやBeautifulSoupなどを用いて特定の音楽サービスから情報を取得することになります。
from PyQt5.QtCore import QThread, pyqtSignal
import time
import random
# 検索結果のシミュレーションデータ(実際にはWeb APIやスクレイピングから取得)
# 例として、特定のキーワードで検索された楽曲リストを模擬します。
SIMULATED_MUSIC_DB = {
"ポップ": [
{"title": "夢のメロディ", "artist": "アーティストA", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3"},
{"title": "輝くステップ", "artist": "バンドB", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-2.mp3"},
],
"ロック": [
{"title": "エナジーソング", "artist": "ロッカーズC", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-3.mp3"},
{"title": "魂の叫び", "artist": "ザ・ギタリストD", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-4.mp3"},
],
"テスト": [ # デモ用に固定のURLを用意
{"title": "デモ楽曲1", "artist": "テストアーティスト", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-1.mp3"},
{"title": "デモ楽曲2", "artist": "テストアーティスト", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-2.mp3"},
{"title": "デモ楽曲3", "artist": "テストアーティスト", "url": "https://www.soundhelix.com/examples/mp3/SoundHelix-Song-3.mp3"},
]
}
class SongSearchWorker(QThread):
# 検索完了時に楽曲リスト(辞書型)をemitするシグナル
search_finished = pyqtSignal(list)
# エラー発生時にエラーメッセージをemitするシグナル
search_failed = pyqtSignal(str)
def __init__(self, query_text="", parent=None):
super().__init__(parent)
self.query_text = query_text
def run(self):
try:
print(f"楽曲を検索中: {self.query_text}...")
time.sleep(1.5) # ネットワーク遅延をシミュレート
results = []
normalized_query = self.query_text.lower()
if normalized_query in SIMULATED_MUSIC_DB:
results = SIMULATED_MUSIC_DB[normalized_query]
else:
# 検索結果がない場合、デモ楽曲を提供
print(f"クエリ '{self.query_text}' に直接一致する結果が見つかりません。デモ楽曲を表示します。")
results = SIMULATED_MUSIC_DB["テスト"]
# 処理された結果をemit
self.search_finished.emit(results)
print("検索が完了しました。")
except Exception as e:
self.search_failed.emit(f"検索中にエラーが発生しました: {e}")
print(f"検索エラー: {e}")
2. 楽曲ファイルのダウンロード処理
楽曲のダウンロードも、UIをブロックしないように別のスレッドで行います。MusicDownloaderクラスは、指定されたURLからMP3ファイルをダウンロードし、進行状況をUIに通知するためのシグナルを提供します。
import os
import urllib.request
from PyQt5.QtCore import QThread, pyqtSignal
class MusicDownloader(QThread):
# ダウンロード完了時にローカルファイルパスをemitするシグナル
download_completed = pyqtSignal(str)
# ダウンロードエラー発生時にメッセージをemitするシグナル
download_error = pyqtSignal(str)
# ダウンロード進行状況(パーセンテージ)をemitするシグナル
progress_updated = pyqtSignal(int)
def __init__(self, download_url, song_title, parent=None):
super().__init__(parent)
self.download_url = download_url
self.song_title = song_title
self.local_save_path = ""
# ダウンロードの進行状況を報告するためのコールバック関数
def _reporthook(self, block_count, block_size, total_size):
if total_size > 0:
percentage = int(block_count * block_size * 100 / total_size)
self.progress_updated.emit(percentage)
def run(self):
try:
download_directory = "downloaded_music"
os.makedirs(download_directory, exist_ok=True) # ディレクトリが存在しない場合作成
# 安全なファイル名を生成
safe_filename = "".join(c for c in self.song_title if c.isalnum() or c in (' ', '.', '_')).rstrip()
if not safe_filename:
safe_filename = "unknown_track"
self.local_save_path = os.path.join(download_directory, f"{safe_filename}.mp3")
print(f"楽曲をダウンロード中: {self.download_url} -> {self.local_save_path}")
# urllib.request.urlretrieveを使用してファイルをダウンロード
urllib.request.urlretrieve(self.download_url, self.local_save_path, reporthook=self._reporthook)
print("ダウンロードが完了しました。")
self.download_completed.emit(self.local_save_path)
except Exception as e:
error_msg = f"ダウンロード中にエラーが発生しました: {e}"
self.download_error.emit(error_msg)
print(f"ダウンロードエラー: {e}")
3. プレイヤーの再生制御ロジック
PyQt5のGUIとPygameのオーディオミキサーを連携させ、楽曲の再生を制御します。ここでは、リスト選択、再生/一時停止、次/前の楽曲への移動、再生モードの変更などの機能について説明します。
import pygame
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QLineEdit, QListWidget, QLabel
from PyQt5.QtCore import Qt, QTimer, QObject, pyqtSignal
import qtawesome # アイコン表示用ライブラリ
# ... (SongSearchWorker と MusicDownloader クラスの定義がここにあるものとします)
class MusicPlayer(QMainWindow):
current_playlist_data = [] # 楽曲情報のリスト: [{"title": "", "artist": "", "url": ""}, ...]
current_track_index = -1
is_playback_paused = False
is_download_active = False
playback_sequence_mode = "sequential" # "sequential", "random", "repeat_one"
def __init__(self):
super().__init__()
self.setWindowTitle("オンライン音楽プレイヤー")
self.setGeometry(100, 100, 800, 600)
pygame.mixer.init() # Pygameミキサーの初期化
self.setup_ui_components() # UIコンポーネントのセットアップ
self.connect_signals_slots() # シグナルとスロットの接続
self.start_playback_monitor() # 再生監視スレッドの開始
self.update_playback_mode_icon() # 初期再生モードのアイコンを設定
def setup_ui_components(self):
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# 検索入力とボタン
search_layout = QHBoxLayout()
self.search_input_field = QLineEdit()
self.search_input_field.setPlaceholderText("楽曲名またはキーワードを入力...")
self.search_button = QPushButton("検索")
search_layout.addWidget(self.search_input_field)
search_layout.addWidget(self.search_button)
main_layout.addLayout(search_layout)
# 楽曲リスト表示
self.music_list_widget = QListWidget()
main_layout.addWidget(self.music_list_widget)
# プレイヤーコントロール
control_layout = QHBoxLayout()
self.prev_button = QPushButton(qtawesome.icon('fa.backward', color='white', font=18), "")
self.play_pause_button = QPushButton(qtawesome.icon('fa.play', color='white', font=18), "")
self.next_button = QPushButton(qtawesome.icon('fa.forward', color='white', font=18), "")
self.mode_button = QPushButton(qtawesome.icon('fa.list', color='white', font=18), "") # デフォルトはシーケンシャル
control_layout.addStretch()
control_layout.addWidget(self.prev_button)
control_layout.addWidget(self.play_pause_button)
control_layout.addWidget(self.next_button)
control_layout.addWidget(self.mode_button)
control_layout.addStretch()
main_layout.addLayout(control_layout)
# ステータス表示
self.status_display_label = QLabel("準備完了")
self.status_display_label.setAlignment(Qt.AlignCenter)
main_layout.addWidget(self.status_display_label)
def connect_signals_slots(self):
self.search_button.clicked.connect(self.initiate_search)
self.music_list_widget.itemDoubleClicked.connect(self.play_selected_track)
self.play_pause_button.clicked.connect(self.toggle_playback_pause)
self.next_button.clicked.connect(self.play_next_track)
self.prev_button.clicked.connect(self.play_previous_track)
self.mode_button.clicked.connect(self.cycle_playback_mode)
def initiate_search(self):
query = self.search_input_field.text()
self.status_display_label.setText("楽曲を検索中...")
self.search_worker_thread = SongSearchWorker(query)
self.search_worker_thread.search_finished.connect(self.display_search_results)
self.search_worker_thread.search_failed.connect(lambda msg: self.status_display_label.setText(msg))
self.search_worker_thread.start()
def display_search_results(self, results):
self.current_playlist_data = results
self.music_list_widget.clear()
if not results:
self.status_display_label.setText("検索結果が見つかりませんでした。")
return
for song_info in results:
self.music_list_widget.addItem(f"{song_info['title']} - {song_info['artist']}")
self.status_display_label.setText(f"{len(results)}件の楽曲が見つかりました。")
def play_selected_track(self, item):
self.current_track_index = self.music_list_widget.row(item)
self.start_track_playback()
def toggle_playback_pause(self):
if self.is_download_active: # ダウンロード中は操作不可
return
if pygame.mixer.music.get_busy() and not self.is_playback_paused:
pygame.mixer.music.pause()
self.is_playback_paused = True
self.play_pause_button.setIcon(qtawesome.icon('fa.play', color='white', font=18))
self.status_display_label.setText(f"一時停止中: {self.current_playlist_data[self.current_track_index]['title']}")
elif self.is_playback_paused:
pygame.mixer.music.unpause()
self.is_playback_paused = False
self.play_pause_button.setIcon(qtawesome.icon('fa.pause', color='white', font=18))
self.status_display_label.setText(f"再生中: {self.current_playlist_data[self.current_track_index]['title']}")
elif not pygame.mixer.music.get_busy() and self.current_track_index != -1:
self.start_track_playback() # 再生中でない場合、現在の曲を再生開始
def start_track_playback(self):
if self.current_track_index == -1 or not self.current_playlist_data:
return
selected_song = self.current_playlist_data[self.current_track_index]
self.status_display_label.setText(f"ダウンロード中: {selected_song['title']}")
self.is_download_active = True
self.play_pause_button.setIcon(qtawesome.icon('fa.pause', color='white', font=18))
self.is_playback_paused = False # 新しい曲を再生するときは一時停止状態を解除
try:
pygame.mixer.music.stop() # 現在再生中の音楽を停止
except pygame.error:
pass # ミキサーが初期化されていないか、音楽がロードされていない場合
# 既存のダウンローダースレッドがあれば停止
if hasattr(self, 'download_worker_thread') and self.download_worker_thread.isRunning():
self.download_worker_thread.quit()
# ダウンロードを開始
self.download_worker_thread = MusicDownloader(selected_song['url'], selected_song['title'])
self.download_worker_thread.download_completed.connect(self.on_download_success)
self.download_worker_thread.download_error.connect(self.on_download_failure)
self.download_worker_thread.progress_updated.connect(self.update_download_progress_ui)
self.download_worker_thread.start()
def on_download_success(self, local_filepath):
self.is_download_active = False
current_song_info = self.current_playlist_data[self.current_track_index]
self.status_display_label.setText(f"再生中: {current_song_info['title']}")
try:
pygame.mixer.music.load(local_filepath)
pygame.mixer.music.play()
except pygame.error as e:
self.status_display_label.setText(f"再生エラー: {e}")
print(f"Pygame再生エラー: {e}")
def on_download_failure(self, message):
self.is_download_active = False
self.status_display_label.setText(f"ダウンロードエラー: {message}")
self.play_pause_button.setIcon(qtawesome.icon('fa.play', color='white', font=18))
def update_download_progress_ui(self, progress):
self.status_display_label.setText(f"ダウンロード中... {progress}%")
def play_next_track(self):
if not self.current_playlist_data: return
if self.playback_sequence_mode == "random":
self.current_track_index = random.randint(0, len(self.current_playlist_data) - 1)
elif self.playback_sequence_mode == "sequential":
self.current_track_index = (self.current_track_index + 1) % len(self.current_playlist_data)
elif self.playback_sequence_mode == "repeat_one":
pass # 同じ曲を繰り返すため、インデックスは変更しない
self.start_track_playback()
def play_previous_track(self):
if not self.current_playlist_data: return
if self.playback_sequence_mode == "random":
self.current_track_index = random.randint(0, len(self.current_playlist_data) - 1)
else: # sequential or repeat_one
self.current_track_index = (self.current_track_index - 1 + len(self.current_playlist_data)) % len(self.current_playlist_data)
self.start_track_playback()
def cycle_playback_mode(self):
modes = ["sequential", "random", "repeat_one"]
current_mode_idx = modes.index(self.playback_sequence_mode)
self.playback_sequence_mode = modes[(current_mode_idx + 1) % len(modes)]
self.update_playback_mode_icon()
self.status_display_label.setText(f"再生モード: {self.get_japanese_mode_name(self.playback_sequence_mode)}")
def get_japanese_mode_name(self, mode):
if mode == "sequential": return "シーケンシャル再生"
if mode == "random": return "ランダム再生"
if mode == "repeat_one": return "1曲リピート"
return ""
def update_playback_mode_icon(self):
if self.playback_sequence_mode == "sequential":
self.mode_button.setIcon(qtawesome.icon('fa.list', color='white', font=18))
elif self.playback_sequence_mode == "random":
self.mode_button.setIcon(qtawesome.icon('fa.random', color='white', font=18))
elif self.playback_sequence_mode == "repeat_one":
self.mode_button.setIcon(qtawesome.icon('fa.retweet', color='white', font=18))
def start_playback_monitor(self):
# 楽曲再生状態を監視するワーカースレッドを起動
self.monitor_thread = QThread()
self.monitor_worker = PlaybackMonitorWorker(self)
self.monitor_worker.moveToThread(self.monitor_thread)
self.monitor_thread.started.connect(self.monitor_worker.run)
self.monitor_worker.track_finished_signal.connect(self.handle_track_finished)
self.monitor_thread.start()
def handle_track_finished(self):
# 楽曲が自然に終了した場合の処理
if not self.is_playback_paused and not self.is_download_active:
if self.playback_sequence_mode == "sequential":
self.play_next_track()
elif self.playback_sequence_mode == "random":
self.play_next_track() # ランダムモードでも次の曲へ
elif self.playback_sequence_mode == "repeat_one":
self.start_track_playback() # 同じ曲をリピート再生
# バックグラウンドで再生状態を監視するワーカースレッド
class PlaybackMonitorWorker(QObject):
track_finished_signal = pyqtSignal()
def __init__(self, player_ref, parent=None):
super().__init__(parent)
self.player_ref = player_ref # MusicPlayerインスタンスへの参照
self._is_running = True
def run(self):
while self._is_running:
try:
# Pygameミキサーが再生中ではなく、一時停止でもなく、ダウンロード中でもない場合
if not pygame.mixer.music.get_busy() and not self.player_ref.is_playback_paused and not self.player_ref.is_download_active:
# current_track_indexが設定されており、かつget_pos()が-1(再生終了)の場合
if self.player_ref.current_track_index != -1 and pygame.mixer.music.get_pos() == -1:
self.track_finished_signal.emit() # 楽曲終了シグナルを発行
time.sleep(1) # 1秒ごとにチェック
except Exception as e:
print(f"再生監視スレッドエラー: {e}")
time.sleep(1)
def stop(self):
self._is_running = False
# アプリケーションのエントリポイント
if __name__ == '__main__':
app = QApplication([])
player = MusicPlayer()
player.show()
app.exec_()