File size: 5,841 Bytes
8720f73 5231cb0 8720f73 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
import sys
import cv2
import os
from datasets import load_dataset
sys.path.append('Effort-AIGI-Detection/DeepfakeBench/training')
from effort_inf import infer_imgs
from tqdm import tqdm
print(infer_imgs([cv2.imread('Effort-AIGI-Detection/figs/deepfake_tab1.png')]))
local_eval = 'qafigs_machine' in os.environ
def get_first_n_frames(video_path, n):
frames = []
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise IOError(f"Cannot open video file: {video_path}")
count = 0
while count < n:
ret, frame = cap.read()
if not ret:
break # stop if video ends before n frames
frames.append(frame)
count += 1
cap.release()
return frames
import cv2
import random
def get_random_n_frames(video_path, n):
frames = []
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise IOError(f"Cannot open video file: {video_path}")
# total number of frames in the video
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if n > total_frames:
raise ValueError(f"Video only has {total_frames} frames, but {n} requested")
# pick n unique random frame indices
frame_indices = sorted(random.sample(range(total_frames), n))
for idx in frame_indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx) # jump to frame index
ret, frame = cap.read()
if not ret:
continue
frames.append(frame)
cap.release()
return frames
from pathlib import Path
from datetime import datetime
import uuid
from typing import Optional, Union
def save_video_bytes(
video_bytes: bytes,
original_remote_path: Union[str, Path],
dest_dir: Optional[Union[str, Path]] = None,
prefer_original_name: bool = True,
) -> str:
"""
Save raw video bytes to a local file, preserving the original extension(s).
Args:
video_bytes: The raw bytes of the video.
original_remote_path: The file path/name from the other system
(used only to extract the extension and optionally the stem).
dest_dir: Directory to save into (defaults to current working directory).
prefer_original_name: If True, try to use the original filename; if it exists,
a unique suffix will be appended. If False, always generate
a new UUID-based name.
Returns:
The absolute path (as a string) to the saved local file.
"""
if not isinstance(video_bytes, (bytes, bytearray)):
raise TypeError("video_bytes must be bytes or bytearray")
remote = Path(str(original_remote_path))
# Keep compound suffixes if any (e.g., '.tar.gz'); for videos it's typically one suffix like '.mp4'
compound_suffix = "".join(remote.suffixes)
if not compound_suffix:
raise ValueError("Original path has no extension; cannot determine output format.")
# Choose destination directory
out_dir = Path(dest_dir) if dest_dir is not None else Path.cwd()
out_dir.mkdir(parents=True, exist_ok=True)
# Build a candidate filename
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
if prefer_original_name and remote.stem:
base_stem = remote.stem
else:
# short UUID to keep names readable
base_stem = f"video-{uuid.uuid4().hex[:8]}"
candidate = out_dir / f"{base_stem}{compound_suffix}"
# Ensure we don't overwrite existing files; if it exists, append a unique suffix
if candidate.exists():
candidate = out_dir / f"{base_stem}-{timestamp}-{uuid.uuid4().hex[:6]}{compound_suffix}"
# Write atomically-like: open with 'xb' to fail if somehow it still exists between the check and write
try:
with open(candidate, "xb") as f:
f.write(video_bytes)
except FileExistsError:
# Extremely rare race; retry with a guaranteed-unique name
candidate = out_dir / f"{base_stem}-{timestamp}-{uuid.uuid4().hex}{compound_suffix}"
with open(candidate, "xb") as f:
f.write(video_bytes)
return str(candidate.resolve())
def predict_video(bytes, path, n=30):
# first save the video somewhere
local_path = save_video_bytes(bytes, path)
frames = get_random_n_frames(local_path, n)
all_probs = infer_imgs(frames)
final_probs = sum(all_probs)/len(all_probs)
os.remove(local_path)
return 'real' if final_probs < 0.47 else 'generated', final_probs
test = False
# if 'qafigs_machine' in os.environ:
if test:
path = '/home/rayan.banerjee/Downloads/real video.mp4'
vbytes = open(path, 'rb')
print(predict_video(vbytes.read(), path))
else:
DATASET_PATH = "/tmp/data" if not local_eval else '/l/users/rayan.banerjee/safevideo/live_dataset'
dataset_remote = load_dataset(DATASET_PATH, split="test", streaming=True)
print(dataset_remote)
ids = []
preds = []
scores = []
times = []
import time
loaded = False
for el in tqdm(dataset_remote, total = len([el for el in dataset_remote])):
start_time = time.time_ns()
try:
pred, score = predict_video(el['video']['bytes'], el['video']['path'])
loaded = True
except:
pred = 'generated'
score = 1
score = score * 2 - 1
id = el['id']
times.append((time.time_ns() - start_time)/10e9)
ids.append(id)
scores.append(score)
preds.append(pred)
import pandas as pd
pd.DataFrame({
'id': ids,
'pred': preds,
'score': scores,
'time': times
}).to_csv('submission.csv', index=False)
if not loaded:
# this means none of our submissions worked, we will throw an error just for the sake of debugging
raise Exception('Failed to load a single video')
|