スクレイピング対象の分析と実装方針
Webサイトから動画コンテンツを収集するPythonスクリプトを実装します。対象サイトの構造変化や通信エラーが発生しても処理全体が停止しないよう、リクエストの例外処理（タイムアウトやHTTPエラーの捕捉）を強化しています。
シングルスレッド版実装
# coding: utf-8
import re
import requests
import hashlib
import time
import os
# HTTP headers attached to every request issued by the single-threaded
# crawler: a desktop-browser User-Agent string and a Referer for the site.
REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Referer': 'http://www.example.com',
}
def fetch_page_content(target_url):
    """Fetch a page over HTTP and return its body as text.

    A GET request is sent to ``target_url`` with ``REQUEST_HEADERS`` and a
    timeout of 10 seconds.

    Args:
        target_url: URL of the page to download.

    Returns:
        The response body as text, or ``None`` if the request raised a
        ``requests.RequestException`` (including the one raised by
        ``raise_for_status()`` for HTTP error statuses). The error is
        printed before ``None`` is returned.
    """
    try:
        reply = requests.get(
            target_url,
            headers=REQUEST_HEADERS,
            timeout=10,
        )
        # Turn 4xx/5xx answers into exceptions so they take the error path.
        reply.raise_for_status()
        return reply.text
    except requests.RequestException as error:
        print(f"ページ取得エラー: {error}")
        return None
def extract_video_links(page_html):
    """Return the href values found after each ``class="content"`` marker.

    The search is a non-greedy regex with ``re.S``, so the ``href`` may be on
    a later line than the ``class="content"`` attribute it follows.

    Args:
        page_html: HTML text of a list page.

    Returns:
        A list of href strings in document order (empty if nothing matches).
    """
    link_regex = re.compile(r'class="content".*?href="(.*?)"', re.S)
    return [hit.group(1) for hit in link_regex.finditer(page_html)]
def process_video_links(link_collection, base_url='http://www.example.com'):
    """Visit each detail page and download the video embedded in it.

    Every link is resolved against ``base_url`` with ``urljoin`` so that
    absolute URLs, root-relative paths (``/v/1.html``), plain relative paths
    (``v/1.html``) and protocol-relative URLs (``//cdn.example.com/x``) all
    yield a valid absolute URL. The first ``src`` that follows
    ``id="player"`` on the detail page is resolved against that page's URL
    and handed to ``download_video_file``.

    Args:
        link_collection: Iterable of href strings taken from a list page.
        base_url: URL that relative links are resolved against.

    Returns:
        None. Pages that cannot be fetched, or that contain no player
        source, are skipped.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urljoin

    video_pattern = re.compile(r'id="player".*?src="(.*?)"', re.S)

    for link in link_collection:
        full_url = urljoin(base_url, link)
        page_content = fetch_page_content(full_url)
        if not page_content:
            continue

        video_match = video_pattern.search(page_content)
        if video_match:
            # The player's src can itself be relative to the detail page.
            download_video_file(urljoin(full_url, video_match.group(1)))
# Folder where videos are saved: "downloaded_videos" under the current
# working directory.
DOWNLOAD_DIR = os.path.join(os.getcwd(), 'downloaded_videos')
# Executed at import time; exist_ok=True makes this a no-op when the folder
# already exists.
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
def download_video_file(video_url):
    """Stream a video into ``DOWNLOAD_DIR`` under a generated ``.mp4`` name.

    The file name is the MD5 hex digest of the URL plus the current time in
    nanoseconds; the hash is only a way to get a unique name. Data is
    written to a ``.part`` file first and renamed once the transfer has
    finished, so an interrupted download never leaves a truncated ``.mp4``.

    Args:
        video_url: Absolute URL of the video resource.

    Returns:
        None. On a request or streaming failure the error is printed, the
        partial file is removed and nothing is saved.
    """
    hash_generator = hashlib.md5()
    hash_generator.update(video_url.encode())
    hash_generator.update(str(time.time_ns()).encode())
    full_path = os.path.join(DOWNLOAD_DIR, f"{hash_generator.hexdigest()}.mp4")
    partial_path = f"{full_path}.part"

    video_response = None
    try:
        video_response = requests.get(
            video_url, headers=REQUEST_HEADERS, stream=True, timeout=30
        )
        video_response.raise_for_status()
        with open(partial_path, 'wb') as output_file:
            # With stream=True the body is read here, so network errors can
            # surface mid-transfer; they are handled by the except below.
            for chunk in video_response.iter_content(chunk_size=8192):
                output_file.write(chunk)
        os.replace(partial_path, full_path)
    except requests.RequestException as e:
        print(f"動画取得失敗: {e}")
        return
    finally:
        # A streamed response holds its connection until it is closed.
        if video_response is not None:
            video_response.close()
        # After a successful rename the partial file no longer exists, so
        # this only removes leftovers from a failed transfer.
        try:
            os.remove(partial_path)
        except FileNotFoundError:
            pass

    print(f"保存完了: {full_path}")
def execute_crawling():
    """Crawl list pages 1 through 5 and process the links found on each.

    Each list page is fetched with ``fetch_page_content``; pages that could
    not be fetched are skipped. Links extracted from a page are passed to
    ``process_video_links``.
    """
    list_page_urls = (
        f'http://www.example.com/list-{page_num}.html'
        for page_num in range(1, 6)
    )
    for page_url in list_page_urls:
        html_content = fetch_page_content(page_url)
        if not html_content:
            continue
        process_video_links(extract_video_links(html_content))
# Script entry point: start the crawl only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    execute_crawling()
マルチスレッド版実装
# coding: utf-8
import requests
import re
import os
import hashlib
import time
from concurrent.futures import ThreadPoolExecutor
# Upper bound on the number of worker threads in the shared pool.
MAX_WORKERS = 20

# Module-level thread pool used for both page fetches and video downloads.
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# HTTP headers attached to every request issued by the multi-threaded crawler.
HEADER_CONFIG = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
}
def retrieve_page_data(url_address):
    """Fetch a page over HTTP and return its body as text.

    A GET request is sent to ``url_address`` with ``HEADER_CONFIG`` and a
    timeout of 15 seconds.

    Args:
        url_address: URL of the page to download.

    Returns:
        The response body as text, or ``None`` if the request raised a
        ``requests.RequestException`` (including the one raised by
        ``raise_for_status()`` for HTTP error statuses). The error is
        printed before ``None`` is returned.
    """
    try:
        reply = requests.get(
            url_address,
            headers=HEADER_CONFIG,
            timeout=15,
        )
        # Turn 4xx/5xx answers into exceptions so they take the error path.
        reply.raise_for_status()
        return reply.text
    except requests.RequestException as e:
        print(f"リクエストエラー: {e}")
        return None
def parse_page_content(future_result):
    """Queue one download task per link found in a fetched list page.

    Args:
        future_result: A future whose ``result()`` is the page HTML, or a
            falsy value when the fetch failed.

    Returns:
        None. For every href that follows a ``class="item"`` marker,
        ``handle_video_download`` is submitted to the shared ``executor``.
        Nothing is submitted when the result is falsy.
    """
    content_text = future_result.result()
    if content_text:
        item_regex = re.compile(r'class="item".*?href="(.*?)"', re.S)
        for hit in item_regex.finditer(content_text):
            executor.submit(handle_video_download, hit.group(1))
def handle_video_download(link, base_url='http://www.example.com'):
    """Open a detail page and store the video embedded in it.

    The link is resolved against ``base_url`` with ``urljoin`` so that
    absolute URLs, root-relative paths (``/v/1.html``), plain relative paths
    (``v/1.html``) and protocol-relative URLs (``//cdn.example.com/x``) all
    yield a valid absolute URL. The first ``src`` that follows
    ``id="video"`` on the page is resolved against the page URL and passed
    to ``store_video_content``.

    Args:
        link: href string taken from a list page.
        base_url: URL that a relative ``link`` is resolved against.

    Returns:
        None. Nothing is stored when the page cannot be fetched or holds no
        video source.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urljoin

    complete_url = urljoin(base_url, link)
    page_data = retrieve_page_data(complete_url)
    if not page_data:
        return

    source_match = re.search(r'id="video".*?src="(.*?)"', page_data, re.S)
    if source_match:
        # The video src can itself be relative to the detail page.
        store_video_content(urljoin(complete_url, source_match.group(1)))
# Folder where videos are saved: "multithread_videos" under the current
# working directory.
OUTPUT_FOLDER = os.path.join(os.getcwd(), 'multithread_videos')
# Executed at import time; exist_ok=True makes this a no-op when the folder
# already exists.
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
def store_video_content(content_url):
    """Stream a video into ``OUTPUT_FOLDER`` under a generated ``.mp4`` name.

    The body is downloaded in 8 KiB chunks instead of being held in memory
    as a whole, which matters when many worker threads download at once.
    The file name is the MD5 hex digest of the URL plus the current time in
    nanoseconds; the hash is only a way to get a unique name. Data is
    written to a ``.part`` file first and renamed once the transfer has
    finished, so an interrupted download never leaves a truncated ``.mp4``.

    Args:
        content_url: Absolute URL of the video resource.

    Returns:
        None. On a request or streaming failure the error is printed, the
        partial file is removed and nothing is saved.
    """
    hasher = hashlib.md5()
    hasher.update(content_url.encode())
    hasher.update(str(time.time_ns()).encode())
    file_path = os.path.join(OUTPUT_FOLDER, f"{hasher.hexdigest()}.mp4")
    partial_path = f"{file_path}.part"

    video_data = None
    try:
        video_data = requests.get(
            content_url, headers=HEADER_CONFIG, stream=True, timeout=25
        )
        video_data.raise_for_status()
        with open(partial_path, 'wb') as file:
            # With stream=True the body is read here, so network errors can
            # surface mid-transfer; they are handled by the except below.
            for chunk in video_data.iter_content(chunk_size=8192):
                file.write(chunk)
        os.replace(partial_path, file_path)
    except requests.RequestException as e:
        print(f"動画ダウンロードエラー: {e}")
        return
    finally:
        # A streamed response holds its connection until it is closed.
        if video_data is not None:
            video_data.close()
        # After a successful rename the partial file no longer exists, so
        # this only removes leftovers from a failed transfer.
        try:
            os.remove(partial_path)
        except FileNotFoundError:
            pass

    print(f"ダウンロード済み: {file_path}")
def main_process():
    """Fetch list pages 1 through 5 concurrently and wait for all downloads.

    Page fetches are submitted to the shared ``executor``. Each finished
    page future is passed to ``parse_page_content`` from the calling thread
    rather than from a done-callback. Returning right after submission would
    let interpreter shutdown begin while pages are still loading, and the
    ``executor.submit`` calls made for the discovered links would then fail
    with ``RuntimeError``. Once every page has been handled the pool is shut
    down with ``wait=True`` so all queued downloads finish first.

    Note:
        The shared ``executor`` is shut down when this function returns, so
        it is meant to be called once per process.
    """
    # Function-scope import keeps this block self-contained.
    from concurrent.futures import as_completed

    page_futures = [
        executor.submit(
            retrieve_page_data, f'http://www.example.com/page-{index}.html'
        )
        for index in range(1, 6)
    ]
    try:
        for future in as_completed(page_futures):
            try:
                parse_page_content(future)
            except Exception as e:
                # One failing page must not stop the remaining pages.
                print(f"ページ解析エラー: {e}")
    finally:
        # Blocks until every submitted download task has completed.
        executor.shutdown(wait=True)
# Script entry point: start the multi-threaded crawl only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    main_process()