"""Identity-preservation (IP) QA evaluation.

For each question, short clips are cut from the presentation videos produced by
every model, uploaded to Gemini together with an image (or audio) query, and the
model is asked shuffled multiple-choice questions; its answers are saved to
ip_qa_result.json.
"""
import os
import re
import json
import time
import random
import argparse
from os import path
from pathlib import Path
from typing import List

import google.generativeai as genai
from moviepy.editor import VideoFileClip
from camel.models import ModelFactory
from camel.types import ModelType, ModelPlatformType
from camel.configs import GeminiConfig

# The API key is read from the environment instead of being hard-coded here.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY", ""))

_num_at_start = re.compile(r'^\s*["\']?(\d+)')


def sort_by_leading_number(paths: List[str]) -> List[str]:
    def key(p: str):
        name = Path(p).name
        m = _num_at_start.match(name)
        # Entries without a leading number sort last, ties broken alphabetically.
        return (int(m.group(1)) if m else float('inf'), name)
    return sorted(paths, key=key)


dataset_path = "/path/to/data"
dataset_list = sort_by_leading_number(os.listdir(dataset_path))
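
# dataset_list is ordered by each entry's leading number (e.g. "2_..." before "10_...")
# rather than lexicographically, so dataset_list[int(vid_id) - 1] below picks the
# entry for the vid_id-th item in the dataset.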


def eval_ip(root_path, clip_duration, model_list, prompt_path, question_path, test_type='image'):
    tmp_dir = "tmp"
    os.makedirs(tmp_dir, exist_ok=True)
    gemini_model = genai.GenerativeModel("models/gemini-2.5-pro-flash")

    with open(prompt_path, 'r') as f:
        prompt = f.read()
    with open(question_path, 'r') as f:
        questions = json.load(f)
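
    # ip_qa.json is assumed to contain a list of entries shaped roughly like
    #   {"videos": [...], "querys": [[image_path, audio_path], ...], "questions": [q0, q1, q2, q3]}
    # where the i-th question is the correct answer for the i-th query.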

    result_each_question = []
    for question in questions:
        video_ids = question["videos"]
        querys = question["querys"]
        qs = question["questions"]

        # Cut one clip per (video, model) pair.
        video_clips_path = {model: [] for model in model_list}

        start_p2v = None
        for vid_id in video_ids:
            tmp_dir_id = path.join(tmp_dir, str(vid_id))
            os.makedirs(tmp_dir_id, exist_ok=True)
            for model in model_list:
                if model == 'p2v':
                    video_path = path.join(root_path, "paper2video", str(vid_id), '3_merage.mp4')
                elif model == 'p2v-o':
                    video_path = path.join(root_path, "paper2video_wo_presenter", str(vid_id), 'result.mp4')
                elif model == 'veo3':
                    video_path = path.join(root_path, "veo3", str(vid_id) + ".mp4")
                elif model == 'wan2.2':
                    video_path = path.join(root_path, "wan2.2", str(int(vid_id) - 1), "result.mp4")
                elif model == 'presentagent':
                    video_path = path.join(root_path, "presentagent", str(vid_id), "result.mp4")
                elif model == 'human-made':
                    video_path = path.join(dataset_path, dataset_list[int(vid_id) - 1], "gt_presentation_video.mp4")

                video = VideoFileClip(video_path)
                if model in ('p2v', 'p2v-o'):
                    # The two paper2video variants share the same clip window so they stay comparable.
                    if start_p2v is None:
                        start_p2v = random.uniform(0, video.duration - clip_duration - 1)
                    start = start_p2v
                else:
                    start = random.uniform(0, video.duration - clip_duration - 1)
                end = start + clip_duration

                clip_save_path = path.join(tmp_dir_id, model + ".mp4")
                subclip = video.subclip(start, end)
                subclip.write_videofile(clip_save_path, codec="libx264", audio_codec="aac")
                video_clips_path[model].append(clip_save_path)
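
        # With the clips prepared, upload each model's clips and ask Gemini the
        # shuffled multiple-choice questions once per query.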
        result_each_model = {}
        for model in model_list:
            video_input = video_clips_path[model]
            videos = upload_videos(video_input)
            result_each_model[model] = []
            for idx, query in enumerate(querys):
                if test_type == 'image':
                    query = query[0]
                    query_state = genai.upload_file(path=query, mime_type="image/png")
                elif test_type == 'audio':
                    query = query[1]
                    # Assumed to be a WAV file; change the mime type if the audio queries differ.
                    query_state = genai.upload_file(path=query, mime_type="audio/wav")

                # Question idx is the correct option; shuffle the options and keep
                # track of where the correct one lands.
                ori_idxs = [0, 1, 2, 3]
                shuffled_idx = ori_idxs.copy()
                random.shuffle(shuffled_idx)
                new_qs = [qs[i] for i in shuffled_idx]
                new_answer = shuffled_idx.index(idx)

                contents = [prompt, "Here is the query", genai.get_file(query_state.name), "Here are the video clips"]
                contents.extend(videos)
                contents.append("Here are the questions")
                contents.extend(new_qs)

                response = gemini_model.generate_content(contents)

                # The reply is expected to contain "My choice: <n>" (1-indexed);
                # an unparseable reply is counted as incorrect.
                match = re.search(r"My choice:\s*(\d+)", response.text)
                choice_num = int(match.group(1)) - 1 if match else -1
                result_each_model[model].append([query, new_qs, choice_num, new_answer, choice_num == new_answer])

        result_each_question.append(result_each_model)
        print(result_each_question)

    with open("ip_qa_result.json", 'w') as f:
        json.dump(result_each_question, f, indent=4)


def upload_videos(video_list):
    """Upload clips to the Gemini Files API and block until every file is ACTIVE."""
    videos = video_list.copy()
    for idx, value in enumerate(videos):
        videos[idx] = genai.upload_file(path=value, mime_type="video/mp4")
    # Uploads are processed asynchronously; poll until all files reach the ACTIVE state.
    while True:
        flag = True
        for idx, value in enumerate(videos):
            file_state = genai.get_file(videos[idx].name)
            if file_state.state.name != "ACTIVE":
                flag = False
                time.sleep(5)
                print("waiting 5 seconds...")
                break
        if flag:
            break
    # Refresh the handles so they carry the final file metadata.
    for idx, value in enumerate(videos):
        videos[idx] = genai.get_file(videos[idx].name)
    return videos


if __name__ == "__main__":
    clip_duration = 4
    prompt_path = "./prompt/ip_qa.txt"
    model_list = ["p2v", "p2v-o", "veo3", "wan2.2", "presentagent", "human-made"]
    root_path = "/path/to/result"
    question_path = "ip_qa.json"
    eval_ip(root_path, clip_duration, model_list, prompt_path, question_path)
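    # Results are written to ip_qa_result.json in the working directory.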