import os
import copy
import json
import time
import torch
import argparse
from PIL import Image
import numpy as np
import soundfile as sf
from tqdm import tqdm
from diffusers import DDPMScheduler
from models import build_pretrained_models, AudioDiffusion
from transformers import AutoProcessor, ClapModel
import torchaudio
import tools.torch_tools as torch_tools
from datasets import load_dataset

# Check if CUDA is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def parse_args():
    parser = argparse.ArgumentParser(description="Inference for text to audio generation task.")
    parser.add_argument(
        "--original_args", type=str, default=None,
        help="Path for summary jsonl file saved during training."
    )
    parser.add_argument(
        "--model", type=str, default=None,
        help="Path for saved model bin file."
    )
    parser.add_argument(
        "--vae_model", type=str, default="audioldm-s-full",
        help="Name of the pretrained VAE to load."
    )
    parser.add_argument(
        "--num_steps", type=int, default=200,
        help="How many denoising steps for generation.",
    )
    parser.add_argument(
        "--guidance", type=float, default=3,
        help="Guidance scale for classifier-free guidance."
    )
    parser.add_argument(
        "--batch_size", type=int, default=1,
        help="Batch size for generation.",
    )
    parser.add_argument(
        "--num_samples", type=int, default=1,
        help="How many samples per prompt.",
    )
    parser.add_argument(
        "--num_test_instances", type=int, default=-1,
        help="How many test instances to evaluate.",
    )
    parser.add_argument(
        "--sample_rate", type=int, default=16000,
        help="Sample rate for audio output.",
    )
    parser.add_argument(
        "--max_duration", type=int, default=10,
        help="Maximum duration in seconds for generated audio."
    )
    parser.add_argument(
        "--save_dir", type=str, default="./outputs/tmp",
        help="Output save directory."
    )
    parser.add_argument(
        "--data_path", type=str, default="data/video_processed/video_gt_augment",
        help="Inference data path."
    )
    args = parser.parse_args()
    return args
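
# Example invocation (illustrative only: the script filename and the paths below
# are placeholders, and the flag values are assumptions, not values required by
# this script):
#
#   python inference.py \
#       --original_args saved/summary.jsonl \
#       --model saved/pytorch_model.bin \
#       --num_steps 200 --guidance 3 --batch_size 1 \
#       --data_path data/video_processed/video_gt_augment \
#       --save_dir ./outputs/tmp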

def main():
    args = parse_args()
    train_args = dotdict(json.loads(open(args.original_args).readlines()[0]))
    if "hf_model" not in train_args:
        train_args["hf_model"] = None

    # Load Models #
    name = train_args.vae_model
    vae, stft = build_pretrained_models(name)
    vae, stft = vae.to(device), stft.to(device)  # Ensure models are on the correct device

    model_class = AudioDiffusion
    if train_args.ib:
        print("*****USING MODEL IMAGEBIND*****")
        from models_imagebind import AudioDiffusion_IB
        model_class = AudioDiffusion_IB
    elif train_args.lb:
        print("*****USING MODEL LANGUAGEBIND*****")
        from models_languagebind import AudioDiffusion_LB
        model_class = AudioDiffusion_LB
    elif train_args.jepa:
        print("*****USING MODEL JEPA*****")
        from models_vjepa import AudioDiffusion_JEPA
        model_class = AudioDiffusion_JEPA
    model = model_class(
        train_args.fea_encoder_name,
        train_args.scheduler_name,
        train_args.unet_model_name,
        train_args.unet_model_config,
        train_args.snr_gamma,
        train_args.freeze_text_encoder,
        train_args.uncondition,
        train_args.img_pretrained_model_path,
        train_args.task,
        train_args.embedding_dim,
        train_args.pe
    )
    model.eval()

    # Load Trained Weight #
    try:
        if args.model.endswith(".pt") or args.model.endswith(".bin"):
            model.load_state_dict(torch.load(args.model, map_location=device), strict=False)
        else:
            from safetensors.torch import load_model
            load_model(model, args.model, strict=False)
    except OSError as e:
        print(f"Error loading model with safetensors: {e}")
        print("Falling back to torch.load")
        model.load_state_dict(torch.load(args.model, map_location=device), strict=False)
    model.to(device)
    scheduler = DDPMScheduler.from_pretrained(train_args.scheduler_name, subfolder="scheduler")

    sample_rate = args.sample_rate
    # Define max_len_in_seconds once so generation and trimming stay consistent
    max_len_in_seconds = args.max_duration

    # CLAP model for audio-text ranking when num_samples > 1.
    # NOTE: the checkpoint name below is an assumption; replace it with the CLAP
    # checkpoint used in your setup.
    clap = ClapModel.from_pretrained("laion/clap-htsat-unfused").to(device)
    clap_processor = AutoProcessor.from_pretrained("laion/clap-htsat-unfused")

    def audio_text_matching(waveforms, text, sample_freq=24000, max_len_in_seconds=max_len_in_seconds):
        # CLAP expects 48 kHz audio, so resample each generated waveform before scoring
        new_freq = 48000
        resampled = []
        for wav in waveforms:
            x = torchaudio.functional.resample(
                torch.tensor(wav, dtype=torch.float).reshape(1, -1),
                orig_freq=sample_freq, new_freq=new_freq
            )[0].numpy()
            resampled.append(x[:new_freq * max_len_in_seconds])
        inputs = clap_processor(text=text, audios=resampled, return_tensors="pt", padding=True, sampling_rate=48000)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = clap(**inputs)
        logits_per_audio = outputs.logits_per_audio
        # Higher logit means a better audio-text match; return indices sorted best-first
        ranks = torch.argsort(logits_per_audio.flatten(), descending=True).cpu().numpy()
        return ranks
    # Load Data #
    if train_args.prefix:
        prefix = train_args.prefix
    else:
        prefix = ""

    data_path = args.data_path
    # List the directory once so output wav names stay aligned with the video features
    video_files = sorted(os.listdir(data_path))
    wavname = [f"{name.split('.')[0]}.wav" for name in video_files]
    video_features = []
    for video_file in video_files:
        video_path = os.path.join(data_path, video_file)
        video_feature = torch_tools.load_video(video_path, frame_rate=2, size=224)
        print(video_feature.shape)
        video_features.append(video_feature.to(device))  # Move to device
    # Generate #
    num_steps, guidance, batch_size, num_samples = args.num_steps, args.guidance, args.batch_size, args.num_samples
    all_outputs = []

    for k in tqdm(range(0, len(wavname), batch_size)):
        with torch.no_grad():
            prompt = video_features[k: k + batch_size]
            latents = model.inference(scheduler, None, prompt, None, num_steps, guidance, num_samples, disable_progress=True, device=device)
            mel = vae.decode_first_stage(latents)
            wave = vae.decode_to_waveform(mel)
            # Trim each waveform to at most max_len_in_seconds seconds
            num_samples_n_seconds = sample_rate * max_len_in_seconds
            wave = [wav[:num_samples_n_seconds] for wav in wave]
            all_outputs += [item for item in wave]
    # Save #
    exp_id = str(int(time.time()))

    if not os.path.exists("outputs"):
        os.makedirs("outputs")

    if num_samples == 1:
        output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}_augment".format(
            args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate)
        os.makedirs(output_dir, exist_ok=True)
        for j, wav in enumerate(all_outputs):
            sf.write("{}/{}".format(output_dir, wavname[j]), wav, samplerate=sample_rate)
    else:
        # Create one sub-directory per rank position
        for i in range(num_samples):
            output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}/rank_{}".format(
                args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate, i + 1)
            os.makedirs(output_dir, exist_ok=True)

        # Rank the num_samples candidates per prompt with CLAP and save them best-first.
        # NOTE: text_prompts is not defined in this script; a caption per video must be
        # supplied (e.g. loaded alongside the videos) for this ranking branch to run.
        groups = list(chunks(all_outputs, num_samples))
        for k in tqdm(range(len(groups))):
            wavs_for_text = groups[k]
            rank = audio_text_matching(wavs_for_text, text_prompts[k])
            ranked_wavs_for_text = [wavs_for_text[r] for r in rank]
            for i, wav in enumerate(ranked_wavs_for_text):
                output_dir = "{}/{}_{}_steps_{}_guidance_{}_sampleRate_{}/rank_{}".format(
                    args.save_dir, exp_id, "_".join(args.model.split("/")[1:-1]), num_steps, guidance, sample_rate, i + 1)
                sf.write("{}/{}".format(output_dir, wavname[k]), wav, samplerate=sample_rate)
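
    # Expected output layout, derived from the format strings above (values such as
    # <model_tag> come from the --model path and are placeholders here):
    #   num_samples == 1:
    #     <save_dir>/<exp_id>_<model_tag>_steps_<num_steps>_guidance_<guidance>_sampleRate_<sample_rate>_augment/<name>.wav
    #   num_samples > 1:
    #     <save_dir>/<exp_id>_..._sampleRate_<sample_rate>/rank_1/<name>.wav, rank_2/<name>.wav, ...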

if __name__ == "__main__":
    main()
