# utils/ai_generator_diffusers_flux.py
import os
import utils.constants as constants
#import spaces
import gradio as gr
from torch import __version__ as torch_version_, version, cuda, bfloat16, float32, Generator, no_grad, backends
from diffusers import FluxPipeline, FluxImg2ImgPipeline, FluxControlPipeline
import accelerate
from transformers import AutoTokenizer
import safetensors
#import xformers
#from diffusers.utils import load_image
#from huggingface_hub import hf_hub_download
from PIL import Image
from tempfile import NamedTemporaryFile
from utils.image_utils import (
    crop_and_resize_image,
)
from utils.version_info import (
    get_torch_info,
#    get_diffusers_version,
#    get_transformers_version,
#    get_xformers_version,
    initialize_cuda,
    release_torch_resources
)
import gc
from utils.lora_details import get_trigger_words, approximate_token_count, split_prompt_precisely
#from utils.color_utils import detect_color_format
#import utils.misc as misc
#from pathlib import Path
import warnings

warnings.filterwarnings("ignore", message=".*Torch was not compiled with flash attention.*")
#print(torch_version_)  # Ensure it's 2.0 or newer
#print(cuda.is_available())  # Ensure CUDA is available

PIPELINE_CLASSES = {
    "FluxPipeline": FluxPipeline,
    "FluxImg2ImgPipeline": FluxImg2ImgPipeline,
    "FluxControlPipeline": FluxControlPipeline
}
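
# generate_image_lowmem() below selects one of these classes by name via
# PIPELINE_CLASSES.get(pipeline_name), so callers choose text-to-image,
# image-to-image, or control-conditioned generation simply by passing the
# matching string, e.g. pipeline_name="FluxImg2ImgPipeline".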
#@spaces.GPU()
def generate_image_from_text(
    text,
    model_name="black-forest-labs/FLUX.1-dev",
    lora_weights=None,
    conditioned_image=None,
    image_width=1344,
    image_height=848,
    guidance_scale=3.5,
    num_inference_steps=50,
    seed=0,
    additional_parameters=None,
    progress=gr.Progress(track_tqdm=True)
):
    from src.condition import Condition
    device = "cuda" if cuda.is_available() else "cpu"
    print(f"device:{device}\nmodel_name:{model_name}\n")
    # Initialize the pipeline
    pipe = FluxPipeline.from_pretrained(
        model_name,
        torch_dtype=bfloat16 if device == "cuda" else float32
    ).to(device)
    pipe.enable_model_cpu_offload()
    # Access the tokenizer from the pipeline
    tokenizer = pipe.tokenizer
    # Handle add_prefix_space attribute by falling back to the slow tokenizer
    if getattr(tokenizer, 'add_prefix_space', False):
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
        # Update the pipeline's tokenizer
        pipe.tokenizer = tokenizer
    # Load and apply LoRA weights
    if lora_weights:
        for lora_weight in lora_weights:
            lora_configs = constants.LORA_DETAILS.get(lora_weight, [])
            if lora_configs:
                for config in lora_configs:
                    weight_name = config.get("weight_name")
                    adapter_name = config.get("adapter_name")
                    pipe.load_lora_weights(
                        lora_weight,
                        weight_name=weight_name,
                        adapter_name=adapter_name,
                        token=constants.HF_API_TOKEN
                    )
            else:
                pipe.load_lora_weights(lora_weight, token=constants.HF_API_TOKEN)
    # Set the random seed for reproducibility
    generator = Generator(device=device).manual_seed(seed)
    conditions = []
    # Handle conditioned image if provided
    if conditioned_image is not None:
        conditioned_image = crop_and_resize_image(conditioned_image, 1024, 1024)
        condition = Condition("subject", conditioned_image)
        conditions.append(condition)
    # Prepare parameters for image generation
    generate_params = {
        "prompt": text,
        "height": image_height,
        "width": image_width,
        "guidance_scale": guidance_scale,
        "num_inference_steps": num_inference_steps,
        "generator": generator,
        "conditions": conditions if conditions else None
    }
    if additional_parameters:
        generate_params.update(additional_parameters)
    generate_params = {k: v for k, v in generate_params.items() if v is not None}
    # Generate the image
    result = pipe(**generate_params)
    image = result.images[0]
    pipe.unload_lora_weights()
    # Clean up
    del result
    del conditions
    del generator
    del pipe
    cuda.empty_cache()
    cuda.ipc_collect()
    return image
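
# generate_image_lowmem() is the memory-conscious variant used by
# generate_ai_image_local(): it runs under no_grad(), enables model CPU offload
# (plus VAE slicing/tiling for FluxPipeline), optionally loads LoRA weights from
# constants.LORA_DETAILS, and splits prompts longer than ~76 tokens into
# prompt/prompt_2 before calling the pipeline.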
#@spaces.GPU(progress=gr.Progress(track_tqdm=True))
def generate_image_lowmem(
    text,
    neg_prompt=None,
    model_name="black-forest-labs/FLUX.1-dev",
    lora_weights=None,
    conditioned_image=None,
    image_width=1368,
    image_height=848,
    guidance_scale=3.5,
    num_inference_steps=30,
    seed=0,
    true_cfg_scale=1.0,
    pipeline_name="FluxPipeline",
    strength=0.75,
    additional_parameters=None,
    progress=gr.Progress(track_tqdm=True)
):
    # Retrieve the pipeline class from the mapping
    pipeline_class = PIPELINE_CLASSES.get(pipeline_name)
    if not pipeline_class:
        raise ValueError(f"Unsupported pipeline type '{pipeline_name}'. "
                         f"Available options: {list(PIPELINE_CLASSES.keys())}")
    initialize_cuda()
    device = "cuda" if cuda.is_available() else "cpu"
    from src.condition import Condition
    print(f"device:{device}\nmodel_name:{model_name}\nlora_weights:{lora_weights}\n")
    print(f"\n {get_torch_info()}\n")
    # Disable gradient calculations
    with no_grad():
        # Initialize the pipeline inside the context manager
        pipe = pipeline_class.from_pretrained(
            model_name,
            torch_dtype=bfloat16 if device == "cuda" else float32
        ).to(device)
        # Optionally, don't use CPU offload if not necessary
        # alternative version that may be more efficient
        # pipe.enable_sequential_cpu_offload()
        if pipeline_name == "FluxPipeline":
            pipe.enable_model_cpu_offload()
            pipe.vae.enable_slicing()
            pipe.vae.enable_tiling()
        else:
            pipe.enable_model_cpu_offload()
        # Access the tokenizer from the pipeline
        tokenizer = pipe.tokenizer
        # Check if add_prefix_space is set and convert to slow tokenizer if necessary
        if getattr(tokenizer, 'add_prefix_space', False):
            tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False, device_map='cpu')
            # Update the pipeline's tokenizer
            pipe.tokenizer = tokenizer
        pipe.to(device)
        flash_attention_enabled = backends.cuda.flash_sdp_enabled()
        if not flash_attention_enabled:
            # Flash attention unavailable; xFormers memory-efficient attention
            # could be enabled here instead, but is currently left disabled.
            # pipe.enable_xformers_memory_efficient_attention()
            print("\nFlash attention not available; xFormers memory-efficient attention left disabled.\n")
        else:
            pipe.attn_implementation = "flash_attention_2"
            print("\nEnabled flash_attention_2.\n")
        condition_type = "subject"
        # Load LoRA weights
        # note: does not yet handle multiple LoRA weights with different names,
        # needs .set_adapters(["depth", "hyper-sd"], adapter_weights=[0.85, 0.125])
        if lora_weights:
            for lora_weight in lora_weights:
                lora_configs = constants.LORA_DETAILS.get(lora_weight, [])
                lora_weight_set = False
                if lora_configs:
                    for config in lora_configs:
                        # Load LoRA weights with optional weight_name and adapter_name
                        if 'weight_name' in config:
                            weight_name = config.get("weight_name")
                            adapter_name = config.get("adapter_name")
                            lora_collection = config.get("lora_collection")
                            if weight_name and adapter_name and lora_collection and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_collection,
                                    weight_name=weight_name,
                                    adapter_name=adapter_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_collection}, weight_name={weight_name}, adapter_name={adapter_name})\n")
                            elif weight_name and adapter_name is None and lora_collection and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_collection,
                                    weight_name=weight_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_collection}, weight_name={weight_name})\n")
                            elif weight_name and adapter_name and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_weight,
                                    weight_name=weight_name,
                                    adapter_name=adapter_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_weight}, weight_name={weight_name}, adapter_name={adapter_name})\n")
                            elif weight_name and adapter_name is None and not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_weight,
                                    weight_name=weight_name,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_weight}, weight_name={weight_name})\n")
                            elif not lora_weight_set:
                                pipe.load_lora_weights(
                                    lora_weight,
                                    token=constants.HF_API_TOKEN
                                )
                                lora_weight_set = True
                                print(f"\npipe.load_lora_weights({lora_weight})\n")
                        # Apply 'pipe' configurations if present
                        if 'pipe' in config:
                            pipe_config = config['pipe']
                            for method_name, params in pipe_config.items():
                                method = getattr(pipe, method_name, None)
                                if method:
                                    print(f"Applying pipe method: {method_name} with params: {params}")
                                    method(**params)
                                else:
                                    print(f"Method {method_name} not found in pipe.")
                        if 'condition_type' in config:
                            condition_type = config['condition_type']
                            if condition_type == "coloring":
                                #pipe.enable_coloring()
                                print("\nEnabled coloring.\n")
                            elif condition_type == "deblurring":
                                #pipe.enable_deblurring()
                                print("\nEnabled deblurring.\n")
                            elif condition_type == "fill":
                                #pipe.enable_fill()
                                print("\nEnabled fill.\n")
                            elif condition_type == "depth":
                                #pipe.enable_depth()
                                print("\nEnabled depth.\n")
                            elif condition_type == "canny":
                                #pipe.enable_canny()
                                print("\nEnabled canny.\n")
                            elif condition_type == "subject":
                                #pipe.enable_subject()
                                print("\nEnabled subject.\n")
                            else:
                                print(f"Condition type {condition_type} not implemented.")
                else:
                    pipe.load_lora_weights(lora_weight, token=constants.HF_API_TOKEN)
        # Set the random seed for reproducibility
        generator = Generator(device=device).manual_seed(seed)
        conditions = []
        # Merge into any caller-supplied parameters rather than discarding them
        if additional_parameters is None:
            additional_parameters = {}
        if conditioned_image is not None:
            conditioned_image = crop_and_resize_image(conditioned_image, image_width, image_height)
            condition = Condition(condition_type, conditioned_image)
            conditions.append(condition)
            print(f"\nAdded conditioned image.\n {conditioned_image.size}")
            # Prepare the parameters for image generation
            additional_parameters.update({
                "strength": strength,
                "image": conditioned_image,
            })
        else:
            print("\nNo conditioned image provided.")
            if neg_prompt is not None:
                true_cfg_scale = 1.1
            additional_parameters.update({
                "negative_prompt": neg_prompt,
                "true_cfg_scale": true_cfg_scale,
            })
        # Handle long prompts by splitting them between prompt and prompt_2
        if approximate_token_count(text) > 76:
            prompt, prompt2 = split_prompt_precisely(text)
            prompt_parameters = {
                "prompt": prompt,
                "prompt_2": prompt2
            }
        else:
            prompt_parameters = {
                "prompt": text
            }
        additional_parameters.update(prompt_parameters)
        # Combine all parameters
        generate_params = {
            "height": image_height,
            "width": image_width,
            "guidance_scale": guidance_scale,
            "num_inference_steps": num_inference_steps,
            "generator": generator,
        }
        if additional_parameters:
            generate_params.update(additional_parameters)
        generate_params = {k: v for k, v in generate_params.items() if v is not None}
        print(f"generate_params: {generate_params}")
        # Generate the image
        result = pipe(**generate_params)
        image = result.images[0]
        # Clean up
        del result
        del conditions
        del generator
        # Delete the pipeline and clear cache
        del pipe
        cuda.empty_cache()
        cuda.ipc_collect()
        print(cuda.memory_summary(device=None, abbreviated=False))
        return image
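
# generate_ai_image_local() is the Gradio-facing wrapper: it resolves the prompt
# (a preset from constants.PROMPTS or the textbox values), folds LoRA trigger
# words and per-LoRA parameters into the request, calls generate_image_lowmem(),
# and returns the path of a temporary PNG tracked in constants.temp_files.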
def generate_ai_image_local(
    map_option,
    prompt_textbox_value,
    neg_prompt_textbox_value,
    model="black-forest-labs/FLUX.1-dev",
    lora_weights=None,
    conditioned_image=None,
    height=512,
    width=912,
    num_inference_steps=30,
    guidance_scale=3.5,
    seed=777,
    pipeline_name="FluxPipeline",
    strength=0.75,
    progress=gr.Progress(track_tqdm=True)
):
    release_torch_resources()
    print("Generating image with lowmem")
    try:
        if map_option != "Prompt":
            prompt = constants.PROMPTS[map_option]
            negative_prompt = constants.NEGATIVE_PROMPTS.get(map_option, "")
        else:
            prompt = prompt_textbox_value
            negative_prompt = neg_prompt_textbox_value or ""
        #full_prompt = f"{prompt} {negative_prompt}"
        additional_parameters = {}
        if lora_weights:
            for lora_weight in lora_weights:
                lora_configs = constants.LORA_DETAILS.get(lora_weight, [])
                for config in lora_configs:
                    if 'parameters' in config:
                        additional_parameters.update(config['parameters'])
                    elif 'trigger_words' in config:
                        trigger_words = get_trigger_words(lora_weight)
                        prompt = f"{trigger_words} {prompt}"
        for key, value in additional_parameters.items():
            if key in ['height', 'width', 'num_inference_steps', 'max_sequence_length']:
                additional_parameters[key] = int(value)
            elif key in ['guidance_scale', 'true_cfg_scale']:
                additional_parameters[key] = float(value)
        height = additional_parameters.pop('height', height)
        width = additional_parameters.pop('width', width)
        num_inference_steps = additional_parameters.pop('num_inference_steps', num_inference_steps)
        guidance_scale = additional_parameters.pop('guidance_scale', guidance_scale)
        print("Generating image with the following parameters:")
        print(f"Model: {model}")
        print(f"LoRA Weights: {lora_weights}")
        print(f"Prompt: {prompt}")
        print(f"Neg Prompt: {negative_prompt}")
        print(f"Height: {height}")
        print(f"Width: {width}")
        print(f"Number of Inference Steps: {num_inference_steps}")
        print(f"Guidance Scale: {guidance_scale}")
        print(f"Seed: {seed}")
        print(f"Additional Parameters: {additional_parameters}")
        print(f"Conditioned Image: {conditioned_image}")
        print(f"Conditioned Image Strength: {strength}")
        print(f"pipeline: {pipeline_name}")
        image = generate_image_lowmem(
            text=prompt,
            model_name=model,
            neg_prompt=negative_prompt,
            lora_weights=lora_weights,
            conditioned_image=conditioned_image,
            image_width=width,
            image_height=height,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            seed=seed,
            pipeline_name=pipeline_name,
            strength=strength,
            additional_parameters=additional_parameters
        )
        with NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            image.save(tmp.name, format="PNG")
            constants.temp_files.append(tmp.name)
            print(f"Image saved to {tmp.name}")
        gc.collect()
        return tmp.name
    except Exception as e:
        print(f"Error generating AI image: {e}")
        gc.collect()
        return None
# does not work
def merge_LoRA_weights(model="black-forest-labs/FLUX.1-dev",
                       lora_weights="Borcherding/FLUX.1-dev-LoRA-FractalLand-v0.1"):
    model_suffix = model.split("/")[-1]
    if model_suffix not in lora_weights:
        raise ValueError(f"The model suffix '{model_suffix}' must be in the lora_weights string '{lora_weights}' to proceed.")
    pipe = FluxPipeline.from_pretrained(model, torch_dtype=bfloat16)
    pipe.load_lora_weights(lora_weights)
    pipe.save_lora_weights(os.getenv("TMPDIR"))
    lora_name = lora_weights.split("/")[-1] + "-merged"
    pipe.save_pretrained(lora_name)
    pipe.unload_lora_weights()
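
# Minimal local usage sketch, illustrative only: it assumes this module's utils/
# package is importable, a CUDA GPU with enough memory for FLUX.1-dev, and a
# valid token in constants.HF_API_TOKEN. The prompt text below is a placeholder,
# not a value taken from this repository.
if __name__ == "__main__":
    output_path = generate_ai_image_local(
        map_option="Prompt",                  # use the textbox values instead of a preset prompt
        prompt_textbox_value="a hexagonal mosaic landscape, vivid colors",  # placeholder prompt
        neg_prompt_textbox_value="",
        model="black-forest-labs/FLUX.1-dev",
        lora_weights=None,                    # or a list of repos present in constants.LORA_DETAILS
        pipeline_name="FluxPipeline",         # see PIPELINE_CLASSES for the other options
        width=1024,
        height=768,
        num_inference_steps=30,
        seed=777,
    )
    print(f"Generated image written to: {output_path}")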