深層偽造技術の概要
深層偽造(Deepfake)とは、人工知能技術を用いて生成された偽造メディア(主に映像や音声)を指します。深層学習アルゴリズム、特に敵対的生成ネットワーク(GAN)を活用し、実データの特徴を学習することで、本物と見分けがつきにくい新たなデータを生成します。
この技術は様々な分野で革新的な可能性を示す一方、悪用による深刻な危険性も存在します。政治的には虚偽情報の拡散や世論操作、経済的には企業イメージの毀損や市場混乱、法的には偽造証拠による司法判断の誤りなど、多岐にわたる影響が懸念されています。
深層偽造検出コンペティションの背景
- 背景: 深層偽造技術の急速な進展に伴い、偽造画像検出モデルの開発と最適化が求められています
- 課題: 人物画像が深層偽造であるかどうかを判定し、その確率スコアを出力すること
データセットの構成
- 訓練セットと検証セット: 動画ファイルと対応するラベルを提供(0: 本物、1: 深層偽造)
実装プロセスの詳細
1. ファイル行数の確認
# Count the lines of the training and validation label files.
# The leading "!" runs each line as a shell command from the notebook.
!wc -l /kaggle/input/ffdv-sample-dataset/ffdv_phase1_sample/train_label.txt
!wc -l /kaggle/input/ffdv-sample-dataset/ffdv_phase1_sample/val_label.txt
2. 動画表示機能
# Show one clip from the validation set in the notebook output
# (embed=True is passed to IPython's Video helper).
from IPython.display import Video
Video("/kaggle/input/ffdv-sample-dataset/ffdv_phase1_sample/valset/00882a2832edbcab1d3dfc4cc62cfbb9.mp4", embed=True)
3. 必要なライブラリのインストール
# Install third-party packages from the notebook shell; moviepy, librosa,
# numpy and timm are imported by the cells that follow.
!pip install moviepy librosa matplotlib numpy timm
4. ライブラリのインポートと設定
import torch
torch.manual_seed(0)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.dataset import Dataset
import timm
import time
import pandas as pd
import numpy as np
import cv2, glob, os
from PIL import Image
import moviepy.editor as mp
import librosa
5. MELスペクトログラム生成関数
def create_mel_spectrogram(video_file, n_mels=128, fmax=8000, output_size=(256, 256)):
    """Turn the audio track of a video into a grayscale mel-spectrogram image.

    The audio is written to a temporary WAV file, loaded with librosa,
    converted to a mel spectrogram in dB, min-max scaled to 0-255 and resized.

    Args:
        video_file: Path of the video whose audio track is analysed.
        n_mels: Number of mel bands passed to librosa.
        fmax: Highest frequency (Hz) of the mel filter bank. Earlier versions
            of this function accepted the argument but never used it.
        output_size: Target size handed to ``cv2.resize`` as its ``dsize``.

    Returns:
        A 2-D ``numpy.uint8`` array containing the resized spectrogram.

    Raises:
        ValueError: If the video has no audio track.
    """
    import os
    import tempfile

    # A unique temporary file avoids clashes between concurrent calls and
    # does not leave a stray 'temp_audio.wav' in the working directory.
    handle, audio_file = tempfile.mkstemp(suffix='.wav')
    os.close(handle)
    try:
        video_clip = mp.VideoFileClip(video_file)
        try:
            if video_clip.audio is None:
                raise ValueError(f'No audio track found in {video_file!r}')
            video_clip.audio.write_audiofile(audio_file, verbose=False, logger=None)
        finally:
            # Release the reader resources held by the clip.
            video_clip.close()

        # Load the extracted audio samples and their sampling rate.
        audio_data, sample_rate = librosa.load(audio_file)
    finally:
        if os.path.exists(audio_file):
            os.remove(audio_file)

    # Mel spectrogram (power), honouring the caller-supplied n_mels and fmax.
    mel_spectrogram = librosa.feature.melspectrogram(
        y=audio_data, sr=sample_rate, n_mels=n_mels, fmax=fmax
    )

    # Convert power to dB relative to the loudest bin.
    mel_db = librosa.power_to_db(mel_spectrogram, ref=np.max)

    # Stretch to the 0-255 range and store as 8-bit so it can be saved as an image.
    mel_normalized = cv2.normalize(mel_db, None, 0, 255, cv2.NORM_MINMAX)
    mel_normalized = mel_normalized.astype(np.uint8)

    return cv2.resize(mel_normalized, output_size, interpolation=cv2.INTER_LINEAR)
6. 画像生成プロセス
# Convert the competition videos into mel-spectrogram JPEGs under
# ./ffdv_phase1_sample/<split>/, keeping each video's file name stem.
# os.makedirs(exist_ok=True) replaces the "!mkdir" shell calls so the cell is
# plain Python and can be re-run when the directories already exist.
# NOTE: glob.glob does not guarantee an ordering, so which 400 training
# videos are picked depends on the order the file system reports them in.
for split_name, max_videos in (('trainset', 400), ('valset', None)):
    os.makedirs(os.path.join('ffdv_phase1_sample', split_name), exist_ok=True)
    video_pattern = f'/kaggle/input/ffdv-sample-dataset/ffdv_phase1_sample/{split_name}/*.mp4'
    # A slice bound of None keeps every file (used for the validation split).
    for video_file in glob.glob(video_pattern)[:max_videos]:
        mel_image = create_mel_spectrogram(video_file)
        file_stem = os.path.splitext(os.path.basename(video_file))[0]
        cv2.imwrite(f'./ffdv_phase1_sample/{split_name}/{file_stem}.jpg', mel_image)
7. メトリクス管理クラス
class MetricTracker:
    """Track the latest value and the running weighted mean of one metric.

    All state lives in plain instance attributes (``name``, ``format``,
    ``current``, ``average``, ``total``, ``samples``) because ``__str__``
    formats directly from ``self.__dict__``.
    """

    def __init__(self, metric_name, format_str=':f'):
        """Store the display name and the format spec (e.g. ``':6.2f'``)."""
        self.name = metric_name
        self.format = format_str
        self.reset_values()

    def reset_values(self):
        """Clear the latest value, the running sum and the sample count."""
        self.current = 0
        self.average = 0
        self.total = 0
        self.samples = 0

    def update_values(self, value, count=1):
        """Record ``value`` as observed over ``count`` samples.

        ``value`` is weighted by ``count`` in the running mean. While the
        cumulative sample count is zero the mean is left unchanged instead of
        dividing by zero.
        """
        self.current = value
        self.total += value * count
        self.samples += count
        if self.samples:
            self.average = self.total / self.samples

    @property
    def avg(self):
        """Short read-only alias of ``average`` for callers that read ``.avg``."""
        return self.average

    def __str__(self):
        """Render as ``'<name> <current> (<average>)'`` using the format spec."""
        format_string = '{name} {current' + self.format + '} ({average' + self.format + '})'
        return format_string.format(**self.__dict__)
class TrainingProgress:
    """Print a tab-separated progress line: batch counter followed by metrics."""

    def __init__(self, total_batches, *metrics):
        """Keep the metric objects and pre-build the ``[ i/total]`` template."""
        self.batch_format = self._create_batch_format(total_batches)
        self.metrics = metrics
        self.header = ""

    def display_progress(self, batch):
        """Print the counter for ``batch`` and ``str()`` of every metric."""
        counter = self.header + self.batch_format.format(batch)
        print('\t'.join([counter, *map(str, self.metrics)]))

    def _create_batch_format(self, total_batches):
        """Return a template such as ``'[{:3d}/100]'`` padded to the total's width."""
        width = len(str(total_batches // 1))
        return f'[{{:{width}d}}/{total_batches:{width}d}]'
8. 検証関数
def evaluate_model(validation_loader, network, loss_function):
    """Evaluate ``network`` on every batch of ``validation_loader``.

    Args:
        validation_loader: Iterable yielding ``(inputs, targets)`` batches.
        network: Model called as ``network(inputs)``; it is switched to eval
            mode and left in that mode.
        loss_function: Callable applied to ``(outputs, targets)``.

    Returns:
        The accuracy ``MetricTracker``. Its ``average`` is the mean batch
        accuracy in percent, weighted by batch size. The per-batch accuracy is
        recorded as a tensor, so ``average`` is a tensor as well.
    """
    time_tracker = MetricTracker('Time', ':6.3f')
    loss_tracker = MetricTracker('Loss', ':.4e')
    accuracy_tracker = MetricTracker('Accuracy', ':6.2f')
    # NOTE(review): the progress object is built but display_progress() is
    # never called here, so only the final accuracy line is printed.
    progress_display = TrainingProgress(len(validation_loader), time_tracker, loss_tracker, accuracy_tracker)

    # Switch to evaluation mode.
    network.eval()

    with torch.no_grad():
        start_time = time.time()
        for batch_idx, (inputs, targets) in enumerate(validation_loader):
            inputs = inputs.cuda()
            targets = targets.cuda()

            # Forward pass and loss.
            outputs = network(inputs)
            loss = loss_function(outputs, targets)

            # Percentage of samples whose arg-max class equals the target.
            batch_accuracy = (outputs.argmax(1).view(-1) == targets.float().view(-1)).float().mean() * 100
            loss_tracker.update_values(loss.item(), inputs.size(0))
            accuracy_tracker.update_values(batch_accuracy, inputs.size(0))

            # Per-batch wall-clock time.
            time_tracker.update_values(time.time() - start_time)
            start_time = time.time()

    # MetricTracker stores its running mean as `average`; formatting the
    # non-existent `avg` attribute raised AttributeError after the whole pass.
    print(' * Accuracy {accuracy.average:.3f}'.format(accuracy=accuracy_tracker))
    return accuracy_tracker
9. データセットクラスの定義
class DeepfakeDataset(Dataset):
    """Dataset pairing image files with their labels.

    Each item is ``(image, label)``: the image is opened with PIL, converted
    to RGB and passed through ``transform_ops`` when one was given; the label
    is wrapped with ``torch.from_numpy``.
    """

    def __init__(self, image_paths, image_labels, transform_ops=None):
        """Keep the paths, the labels and the optional transform."""
        self.image_paths = image_paths
        self.image_labels = image_labels
        self.transform_ops = transform_ops

    def __getitem__(self, index):
        """Load item ``index`` and return it with its label tensor."""
        picture = Image.open(self.image_paths[index]).convert('RGB')
        if self.transform_ops is not None:
            picture = self.transform_ops(picture)
        label = torch.from_numpy(np.array(self.image_labels[index]))
        return picture, label

    def __len__(self):
        """Number of items, taken from the number of image paths."""
        return len(self.image_paths)
10. データローダーの設定
# Training pipeline: resize, random horizontal/vertical flips, tensor
# conversion and per-channel normalisation (the mean/std values are the ones
# commonly used with ImageNet-pretrained models).
# NOTE(review): train_label is not defined in this part of the notebook; it is
# expected to provide 'path' and 'target' columns.
_train_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
_train_dataset = DeepfakeDataset(
    train_label['path'].values,
    train_label['target'].values,
    _train_transforms,
)
train_loader = torch.utils.data.DataLoader(
    _train_dataset,
    batch_size=40,
    shuffle=True,
    num_workers=12,
    pin_memory=True,
)
# Validation pipeline: deterministic resize, tensor conversion and the same
# per-channel normalisation as training; no random augmentation, no shuffling.
# NOTE(review): val_label is not defined in this part of the notebook; it is
# expected to provide 'path' and 'target' columns.
_val_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
_val_dataset = DeepfakeDataset(
    val_label['path'].values,
    val_label['target'].values,
    _val_transforms,
)
val_loader = torch.utils.data.DataLoader(
    _val_dataset,
    batch_size=40,
    shuffle=False,
    num_workers=10,
    pin_memory=True,
)
11. モデル構築と学習
# Baseline: pretrained ResNet-18 with a 2-class head, trained for 10 epochs.
network = timm.create_model('resnet18', pretrained=True, num_classes=2)
network = network.cuda()
loss_function = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.Adam(network.parameters(), 0.003)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.85)

best_accuracy = 0.0
for epoch in range(10):
    print('Epoch: ', epoch)
    # NOTE(review): train() is not defined in this part of the notebook.
    train(train_loader, network, loss_function, optimizer, epoch)
    val_accuracy = evaluate_model(val_loader, network, loss_function)

    # The tracker stores its running mean as `average`; float() accepts both
    # a plain number and a one-element tensor.
    epoch_accuracy = float(val_accuracy.average)

    # Compare against the unrounded best so a slightly lower later score
    # cannot overwrite the checkpoint; round only for the file name.
    if epoch_accuracy > best_accuracy:
        best_accuracy = epoch_accuracy
        torch.save(network.state_dict(), f'./model_{round(best_accuracy, 2)}.pt')

    # Advance the schedule once the epoch's optimizer updates are done, so
    # the first epochs run at the initial learning rate.
    lr_scheduler.step()
性能向上のための改善策
モデルアーキテクチャの強化
# Stronger backbone: pretrained EfficientNet-B3 with a 2-class head, Adam with
# weight decay, and a plateau scheduler that halves the learning rate.
# NOTE(review): with mode='max' the scheduler is meant to be stepped with the
# monitored metric (e.g. validation accuracy) — confirm in the training loop.
network = timm.create_model('efficientnet_b3', pretrained=True, num_classes=2).cuda()
loss_function = nn.CrossEntropyLoss().cuda()
optimizer = optim.Adam(network.parameters(), lr=0.001, weight_decay=1e-4)
lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
    verbose=True,
)
高度なデータ拡張の適用
# Training loader with heavier augmentation: flips, rotation of up to 20
# degrees, colour jitter and a random resized crop back to 256x256, followed
# by tensor conversion and per-channel normalisation.
# NOTE(review): train_label is not defined in this part of the notebook; it is
# expected to provide 'path' and 'target' columns.
_augmented_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.3),
    transforms.RandomResizedCrop(256, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
_augmented_dataset = DeepfakeDataset(
    train_label['path'].values,
    train_label['target'].values,
    _augmented_transforms,
)
train_loader = torch.utils.data.DataLoader(
    _augmented_dataset,
    batch_size=40,
    shuffle=True,
    num_workers=12,
    pin_memory=True,
)
CutMixデータ拡張の実装
def apply_cutmix(inputs, labels, beta_param=1.0):
    """Apply CutMix to a batch by pasting a patch from shuffled samples.

    A rectangular region of every sample is overwritten, in place, with the
    same region taken from another sample of the batch (chosen by a random
    permutation).

    Args:
        inputs: Batch tensor indexed as ``inputs[sample, channel, a, b]``.
            It is modified in place and also returned.
        labels: Tensor of per-sample labels.
        beta_param: Both shape parameters of the Beta distribution that
            draws the initial mixing ratio.

    Returns:
        ``(inputs, labels_a, labels_b, lambda_val)`` where ``labels_a`` are
        the original labels, ``labels_b`` the labels of the pasted samples
        and ``lambda_val`` the fraction of each sample's area that was NOT
        replaced (recomputed from the clipped box).
    """
    lambda_val = np.random.beta(beta_param, beta_param)

    # Put the permutation on the same device as the batch instead of
    # hard-coding CUDA, so the function also works for CPU tensors.
    shuffle_indices = torch.randperm(inputs.size(0)).to(inputs.device)
    labels_a = labels
    labels_b = labels[shuffle_indices]

    # Random box covering roughly (1 - lambda_val) of the area.
    box_x1, box_y1, box_x2, box_y2 = generate_random_box(inputs.size(), lambda_val)

    # Advanced indexing on the right-hand side yields a copy, so the
    # in-place write never reads values it has already overwritten.
    inputs[:, :, box_x1:box_x2, box_y1:box_y2] = inputs[shuffle_indices, :, box_x1:box_x2, box_y1:box_y2]

    # The box may have been clipped at the border; use its real area.
    replaced_area = (box_x2 - box_x1) * (box_y2 - box_y1)
    lambda_val = 1 - (replaced_area / (inputs.size(-1) * inputs.size(-2)))
    return inputs, labels_a, labels_b, lambda_val


def generate_random_box(dimensions, lambda_val):
    """Pick a random box whose area is about ``1 - lambda_val`` of the plane.

    Args:
        dimensions: Size of the batch; entries 2 and 3 are the two spatial
            extents. The returned ``x`` values index extent 2 and the ``y``
            values index extent 3, matching how ``apply_cutmix`` slices.
        lambda_val: Mixing ratio in ``[0, 1]``.

    Returns:
        ``(x1, y1, x2, y2)`` as Python ints, clipped to the plane.
    """
    extent_x = dimensions[2]
    extent_y = dimensions[3]

    # Scale both sides by sqrt(1 - lambda) so the area scales by 1 - lambda.
    cut_ratio = np.sqrt(1. - lambda_val)
    cut_x = int(extent_x * cut_ratio)
    cut_y = int(extent_y * cut_ratio)

    # Uniformly random box centre.
    center_x = np.random.randint(extent_x)
    center_y = np.random.randint(extent_y)

    x1 = int(np.clip(center_x - cut_x // 2, 0, extent_x))
    y1 = int(np.clip(center_y - cut_y // 2, 0, extent_y))
    x2 = int(np.clip(center_x + cut_x // 2, 0, extent_x))
    y2 = int(np.clip(center_y + cut_y // 2, 0, extent_y))
    return x1, y1, x2, y2