from typing import Any, Optional

import torch
from diffusers.models.attention_processor import Attention
def construct_pix2pix_attention(hidden_states_dim, norm_type="none"):
    if norm_type == "layernorm":
        norm = torch.nn.LayerNorm(hidden_states_dim)
    else:
        norm = torch.nn.Identity()
    attention = Attention(
        query_dim=hidden_states_dim,
        heads=8,
        dim_head=hidden_states_dim // 8,
        bias=True,
    )
    # NOTE: xformers 0.22 does not support batch sizes >= 4096
    attention.xformers_not_supported = True  # hacky workaround
    return norm, attention
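# A minimal usage sketch (illustrative only, not part of the original module): build the
# per-pixel norm/attention pair and run a [batch, tokens, dim] tensor through it the same
# way `do_crosspond_attention` does below. The name `_example_construct_pix2pix_attention`
# and all shapes are assumptions for demonstration.
def _example_construct_pix2pix_attention():
    dim = 64
    norm, attention = construct_pix2pix_attention(dim, norm_type="layernorm")
    x = torch.randn(2, 16, dim)          # [batch, L, D] hidden states
    y = torch.randn(2, 16, dim)          # reference states with the same shape
    x = norm(x).reshape(-1, 1, dim)      # one query token per pixel
    y = y.reshape(-1, 1, dim)            # one key/value token per pixel
    out = attention(x, encoder_hidden_states=y).reshape(2, 16, dim)
    return out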
class ExtraAttnProc(torch.nn.Module):
    """Attention processor that either extracts hidden states into a shared
    `ref_dict` ('extract' mode) or injects previously extracted reference
    states into the attention computation ('inject' mode)."""

    def __init__(
        self,
        chained_proc,
        enabled=False,
        name=None,
        mode='extract',
        with_proj_in=False,
        proj_in_dim=768,
        target_dim=None,
        pixel_wise_crosspond=False,
        norm_type="none",                 # "none" or "layernorm"
        crosspond_effect_on="all",        # "all" or "first"
        crosspond_chain_pos="parralle",   # "before", "parralle" (parallel) or "after"
        simple_3d=False,
        views=4,
    ) -> None:
        super().__init__()
        self.enabled = enabled
        self.chained_proc = chained_proc
        self.name = name
        self.mode = mode
        self.with_proj_in = with_proj_in
        self.proj_in_dim = proj_in_dim
        self.target_dim = target_dim or proj_in_dim
        self.hidden_states_dim = self.target_dim
        self.pixel_wise_crosspond = pixel_wise_crosspond
        self.crosspond_effect_on = crosspond_effect_on
        self.crosspond_chain_pos = crosspond_chain_pos
        self.views = views
        self.simple_3d = simple_3d
        if self.with_proj_in and self.enabled:
            self.in_linear = torch.nn.Linear(self.proj_in_dim, self.target_dim, bias=False)
            if self.target_dim == self.proj_in_dim:
                # initialize as identity when no dimension change is needed
                self.in_linear.weight.data = torch.eye(proj_in_dim)
        else:
            self.in_linear = None
        if self.pixel_wise_crosspond and self.enabled:
            self.crosspond_norm, self.crosspond_attention = construct_pix2pix_attention(
                self.hidden_states_dim, norm_type=norm_type
            )
    def do_crosspond_attention(self, hidden_states: torch.FloatTensor, other_states: torch.FloatTensor):
        hidden_states = self.crosspond_norm(hidden_states)
        batch, L, D = hidden_states.shape
        assert hidden_states.shape == other_states.shape, f"got {hidden_states.shape} and {other_states.shape}"
        # attend pixel-wise: reshape to [batch * L, 1, D] so each token only sees its counterpart
        hidden_states = hidden_states.reshape(batch * L, 1, D)
        other_states = other_states.reshape(batch * L, 1, D)
        hidden_states = self.crosspond_attention(
            hidden_states,
            encoder_hidden_states=other_states,
        )
        return hidden_states.reshape(batch, L, D)
    def __call__(
        self, attn: Attention, hidden_states, encoder_hidden_states=None, attention_mask=None,
        ref_dict: dict = None, mode=None, **kwargs
    ) -> Any:
        if not self.enabled:
            return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
        if encoder_hidden_states is None:
            encoder_hidden_states = hidden_states
        assert ref_dict is not None

        if (mode or self.mode) == 'extract':
            ref_dict[self.name] = hidden_states
            hidden_states1 = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
            if self.pixel_wise_crosspond and self.crosspond_chain_pos == "after":
                ref_dict[self.name] = hidden_states1
            return hidden_states1
        elif (mode or self.mode) == 'inject':
            ref_state = ref_dict.pop(self.name)
            if self.with_proj_in:
                ref_state = self.in_linear(ref_state)

            B, L, D = ref_state.shape
            if hidden_states.shape[0] == B:
                modalities = 1
                views = 1
            else:
                modalities = hidden_states.shape[0] // B // self.views
                views = self.views

            if self.pixel_wise_crosspond:
                if self.crosspond_effect_on == "all":
                    ref_state = ref_state[:, None].expand(-1, modalities * views, -1, -1).reshape(-1, *ref_state.shape[-2:])

                    if self.crosspond_chain_pos == "before":
                        hidden_states = hidden_states + self.do_crosspond_attention(hidden_states, ref_state)
                    hidden_states1 = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
                    if self.crosspond_chain_pos == "parralle":
                        hidden_states1 = hidden_states1 + self.do_crosspond_attention(hidden_states, ref_state)
                    if self.crosspond_chain_pos == "after":
                        hidden_states1 = hidden_states1 + self.do_crosspond_attention(hidden_states1, ref_state)
                    return hidden_states1
                else:
                    assert self.crosspond_effect_on == "first"
                    # hidden_states: [B * modalities * views, L, D]
                    # ref_state:     [B, L, D]
                    ref_state = ref_state[:, None].expand(-1, modalities, -1, -1).reshape(
                        -1, ref_state.shape[-2], ref_state.shape[-1]
                    )  # [B * modalities, L, D]

                    def do_partial_crosspond(hidden_states, ref_state):
                        # only the first view attends to the reference; other views get zeros
                        first_view_hidden_states = hidden_states.view(
                            -1, views, hidden_states.shape[1], hidden_states.shape[2]
                        )[:, 0]  # [B * modalities, L, D]
                        hidden_states2 = self.do_crosspond_attention(first_view_hidden_states, ref_state)  # [B * modalities, L, D]
                        hidden_states2_padded = torch.zeros_like(hidden_states).reshape(
                            -1, views, hidden_states.shape[1], hidden_states.shape[2]
                        )
                        hidden_states2_padded[:, 0] = hidden_states2
                        hidden_states2_padded = hidden_states2_padded.reshape(-1, hidden_states.shape[1], hidden_states.shape[2])
                        return hidden_states2_padded

                    if self.crosspond_chain_pos == "before":
                        hidden_states = hidden_states + do_partial_crosspond(hidden_states, ref_state)
                    hidden_states1 = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)  # [B * modalities * views, L, D]
                    if self.crosspond_chain_pos == "parralle":
                        hidden_states1 = hidden_states1 + do_partial_crosspond(hidden_states, ref_state)
                    if self.crosspond_chain_pos == "after":
                        hidden_states1 = hidden_states1 + do_partial_crosspond(hidden_states1, ref_state)
                    return hidden_states1
            elif self.simple_3d:
                # fold the reference tokens into every view's encoder states
                B, L, C = encoder_hidden_states.shape
                mv = self.views
                encoder_hidden_states = encoder_hidden_states.reshape(B // mv, mv, L, C)
                ref_state = ref_state[:, None]
                encoder_hidden_states = torch.cat([encoder_hidden_states, ref_state], dim=1)
                encoder_hidden_states = encoder_hidden_states.reshape(B // mv, 1, (mv + 1) * L, C)
                encoder_hidden_states = encoder_hidden_states.repeat(1, mv, 1, 1).reshape(-1, (mv + 1) * L, C)
                return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
            else:
                # default: append the reference tokens to the keys/values of every sample
                ref_state = ref_state[:, None].expand(-1, modalities * views, -1, -1).reshape(
                    -1, ref_state.shape[-2], ref_state.shape[-1]
                )
                encoder_hidden_states = torch.cat([encoder_hidden_states, ref_state], dim=1)
                return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
        else:
            raise NotImplementedError("mode (or self.mode) must be 'extract' or 'inject'")
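# A minimal sketch (illustrative only, not called anywhere) of the extract/inject round trip,
# assuming a self-attention `Attention` block and diffusers' `AttnProcessor2_0` (torch >= 2.0).
# The function name, shapes, and the shared key "blk" are assumptions for demonstration.
def _example_extract_inject():
    from diffusers.models.attention_processor import AttnProcessor2_0
    dim, views = 64, 4
    attn = Attention(query_dim=dim, heads=4, dim_head=dim // 4, bias=True)
    base_proc = AttnProcessor2_0()
    extractor = ExtraAttnProc(base_proc, enabled=True, name="blk", mode='extract',
                              proj_in_dim=dim, target_dim=dim, views=views)
    injector = ExtraAttnProc(base_proc, enabled=True, name="blk", mode='inject',
                             proj_in_dim=dim, target_dim=dim, views=views)
    ref_dict = {}
    ref_hidden = torch.randn(2, 16, dim)                # reference branch: [B, L, D]
    extractor(attn, ref_hidden, ref_dict=ref_dict)      # stores hidden states under "blk"
    mv_hidden = torch.randn(2 * views, 16, dim)         # multiview branch: [B * views, L, D]
    out = injector(attn, mv_hidden, ref_dict=ref_dict)  # appends ref tokens to the keys/values
    assert out.shape == mv_hidden.shape
    return out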
def add_extra_processor(model: torch.nn.Module, enable_filter=lambda x: True, **kwargs):
    return_dict = torch.nn.ModuleDict()
    proj_in_dim = kwargs.pop('proj_in_dim', None)

    def recursive_add_processors(name: str, module: torch.nn.Module):
        for sub_name, child in module.named_children():
            if "ref_unet" not in (sub_name + name):
                recursive_add_processors(f"{name}.{sub_name}", child)

        if isinstance(module, Attention):
            new_processor = ExtraAttnProc(
                chained_proc=module.get_processor(),
                enabled=enable_filter(f"{name}.processor"),
                name=f"{name}.processor",
                proj_in_dim=proj_in_dim if proj_in_dim else module.cross_attention_dim,
                target_dim=module.cross_attention_dim,
                **kwargs
            )
            module.set_processor(new_processor)
            return_dict[f"{name}.processor".replace(".", "__")] = new_processor

    for name, module in model.named_children():
        recursive_add_processors(name, module)
    return return_dict
def switch_extra_processor(model, enable_filter=lambda x: True):
    def recursive_add_processors(name: str, module: torch.nn.Module):
        for sub_name, child in module.named_children():
            recursive_add_processors(f"{name}.{sub_name}", child)

        if isinstance(module, ExtraAttnProc):
            module.enabled = enable_filter(name)

    for name, module in model.named_children():
        recursive_add_processors(name, module)
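# A hedged usage sketch (not part of the original module): `unet` stands in for any diffusers
# UNet (e.g. UNet2DConditionModel) and is not defined here; the name filter is illustrative.
def _example_extra_processor_setup(unet):
    # Wrap every self-attention block so it can later inject reference features.
    extra_procs = add_extra_processor(
        unet,
        enable_filter=lambda name: name.endswith("attn1.processor"),
        mode='inject',
        views=4,
    )
    # Toggle the installed processors on/off without rebuilding them.
    switch_extra_processor(unet, enable_filter=lambda name: True)
    return extra_procs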
class multiviewAttnProc(torch.nn.Module):
    """Attention processor that lets tokens from all views of one sample attend to
    each other by folding the view dimension into the sequence dimension."""

    def __init__(
        self,
        chained_proc,
        enabled=False,
        name=None,
        hidden_states_dim=None,
        chain_pos="parralle",  # "before", "parralle" (parallel) or "after"
        num_modalities=1,
        views=4,
        base_img_size=64,
    ) -> None:
        super().__init__()
        self.enabled = enabled
        self.chained_proc = chained_proc
        self.name = name
        self.hidden_states_dim = hidden_states_dim
        self.num_modalities = num_modalities
        self.views = views
        self.base_img_size = base_img_size
        self.chain_pos = chain_pos
        self.diff_joint_attn = True
    def __call__(
        self,
        attn: Attention,
        hidden_states: torch.FloatTensor,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        **kwargs
    ) -> torch.Tensor:
        if not self.enabled:
            return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)

        B, L, C = hidden_states.shape
        mv = self.views
        # [B, L, C] -> [B // mv, mv * L, C]: joint attention over all views, then undo the fold
        hidden_states = hidden_states.reshape(B // mv, mv, L, C).reshape(-1, mv * L, C)
        hidden_states = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
        return hidden_states.reshape(B // mv, mv, L, C).reshape(-1, L, C)
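# A minimal sketch (illustrative only): wrap a self-attention block's default processor so
# that tokens from all `views` images attend to each other jointly. Assumes diffusers'
# `AttnProcessor2_0` (torch >= 2.0); the function name and shapes are assumptions.
def _example_multiview_attention():
    from diffusers.models.attention_processor import AttnProcessor2_0
    dim, views = 64, 4
    attn = Attention(query_dim=dim, heads=4, dim_head=dim // 4, bias=True)
    proc = multiviewAttnProc(AttnProcessor2_0(), enabled=True, views=views)
    hidden = torch.randn(2 * views, 16, dim)  # [B * views, L, C]
    out = proc(attn, hidden)                  # joint attention over views * L tokens
    assert out.shape == hidden.shape
    return out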
def add_multiview_processor(model: torch.nn.Module, enable_filter=lambda x: True, **kwargs):
    return_dict = torch.nn.ModuleDict()

    def recursive_add_processors(name: str, module: torch.nn.Module):
        for sub_name, child in module.named_children():
            if "ref_unet" not in (sub_name + name):
                recursive_add_processors(f"{name}.{sub_name}", child)

        if isinstance(module, Attention):
            new_processor = multiviewAttnProc(
                chained_proc=module.get_processor(),
                enabled=enable_filter(f"{name}.processor"),
                name=f"{name}.processor",
                hidden_states_dim=module.inner_dim,
                **kwargs
            )
            module.set_processor(new_processor)
            return_dict[f"{name}.processor".replace(".", "__")] = new_processor

    for name, module in model.named_children():
        recursive_add_processors(name, module)
    return return_dict
def switch_multiview_processor(model, enable_filter=lambda x: True):
    def recursive_add_processors(name: str, module: torch.nn.Module):
        for sub_name, child in module.named_children():
            recursive_add_processors(f"{name}.{sub_name}", child)

        if isinstance(module, Attention):
            processor = module.get_processor()
            if isinstance(processor, multiviewAttnProc):
                processor.enabled = enable_filter(f"{name}.processor")

    for name, module in model.named_children():
        recursive_add_processors(name, module)
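# A hedged usage sketch (not part of the original module): `unet` is a placeholder for a
# diffusers UNet; the name filter and view count are illustrative.
def _example_multiview_setup(unet):
    mv_procs = add_multiview_processor(
        unet,
        enable_filter=lambda name: name.endswith("attn1.processor"),
        views=4,
    )
    # Later, e.g. for single-view inference, disable the joint attention again.
    switch_multiview_processor(unet, enable_filter=lambda name: False)
    return mv_procs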
class NNModuleWrapper(torch.nn.Module):
    """Wraps a plain callable (e.g. a stateless attention processor) so it can be
    stored inside an `nn.ModuleDict`."""

    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, *args, **kwargs):
        return self.module(*args, **kwargs)

    def __getattr__(self, name: str):
        try:
            return super().__getattr__(name)
        except AttributeError:
            return getattr(self.module, name)
class AttnProcessorSwitch(torch.nn.Module):
    """Holds several attention processors for one block and dispatches every call
    to the currently enabled one."""

    def __init__(
        self,
        proc_dict: dict,
        enabled_proc="default",
        name=None,
        switch_name="default_switch",
    ):
        super().__init__()
        self.proc_dict = torch.nn.ModuleDict({
            k: (v if isinstance(v, torch.nn.Module) else NNModuleWrapper(v))
            for k, v in proc_dict.items()
        })
        self.enabled_proc = enabled_proc
        self.name = name
        self.switch_name = switch_name
        self.choose_module(enabled_proc)

    def choose_module(self, enabled_proc):
        assert enabled_proc in self.proc_dict.keys()
        self.enabled_proc = enabled_proc

    def __call__(
        self,
        *args,
        **kwargs
    ) -> torch.FloatTensor:
        used_proc = self.proc_dict[self.enabled_proc]
        return used_proc(*args, **kwargs)
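# A minimal sketch (illustrative only): hold two processors for one attention block and route
# calls to whichever one is currently enabled. Assumes diffusers' `AttnProcessor2_0`
# (torch >= 2.0); the keys "default"/"multiview" and all shapes are assumptions.
def _example_attn_processor_switch():
    from diffusers.models.attention_processor import AttnProcessor2_0
    dim = 64
    attn = Attention(query_dim=dim, heads=4, dim_head=dim // 4, bias=True)
    base = AttnProcessor2_0()
    switch = AttnProcessorSwitch(
        proc_dict={
            "default": base,
            "multiview": multiviewAttnProc(base, enabled=True, views=4),
        },
        enabled_proc="default",
    )
    hidden = torch.randn(8, 16, dim)
    out_plain = switch(attn, hidden)   # routed to the default processor
    switch.choose_module("multiview")
    out_mv = switch(attn, hidden)      # routed to the multiview processor
    return out_plain, out_mv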
def add_switch(model: torch.nn.Module, module_filter=lambda x: True, switch_dict_fn=lambda x: {"default": x}, switch_name="default_switch", enabled_proc="default"):
    return_dict = torch.nn.ModuleDict()

    def recursive_add_processors(name: str, module: torch.nn.Module):
        for sub_name, child in module.named_children():
            if "ref_unet" not in (sub_name + name):
                recursive_add_processors(f"{name}.{sub_name}", child)

        if isinstance(module, Attention):
            processor = module.get_processor()
            if module_filter(processor):
                proc_dict = switch_dict_fn(processor)
                new_processor = AttnProcessorSwitch(
                    proc_dict=proc_dict,
                    enabled_proc=enabled_proc,
                    name=f"{name}.processor",
                    switch_name=switch_name,
                )
                module.set_processor(new_processor)
                return_dict[f"{name}.processor".replace(".", "__")] = new_processor

    for name, module in model.named_children():
        recursive_add_processors(name, module)
    return return_dict
def change_switch(model: torch.nn.Module, switch_name="default_switch", enabled_proc="default"):
    def recursive_change_processors(name: str, module: torch.nn.Module):
        for sub_name, child in module.named_children():
            recursive_change_processors(f"{name}.{sub_name}", child)

        if isinstance(module, Attention):
            processor = module.get_processor()
            if isinstance(processor, AttnProcessorSwitch) and processor.switch_name == switch_name:
                processor.choose_module(enabled_proc)

    for name, module in model.named_children():
        recursive_change_processors(name, module)
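# A hedged sketch (not part of the original module): `unet` stands in for a diffusers UNet.
# Each existing processor is kept under "default" and a multiview-wrapped copy under "mv";
# `change_switch` then flips every block of the named switch between the two behaviours.
# The switch name "demo_switch" and the keys are assumptions for demonstration.
def _example_switch_setup(unet):
    switches = add_switch(
        unet,
        module_filter=lambda proc: True,
        switch_dict_fn=lambda proc: {
            "default": proc,
            "mv": multiviewAttnProc(proc, enabled=True, views=4),
        },
        switch_name="demo_switch",
        enabled_proc="default",
    )
    # Enable joint multiview attention everywhere this switch was installed.
    change_switch(unet, switch_name="demo_switch", enabled_proc="mv")
    return switches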
########## Hack: Attention forward fix #############
# `Attention` is already imported at the top of this file; we monkey-patch its `forward`
# so that arbitrary cross-attention kwargs (e.g. `ref_dict`, `mode`) are passed through
# unchanged to the custom processors defined above.
def forward(
    self,
    hidden_states: torch.FloatTensor,
    encoder_hidden_states: Optional[torch.FloatTensor] = None,
    attention_mask: Optional[torch.FloatTensor] = None,
    **cross_attention_kwargs,
) -> torch.Tensor:
    r"""
    The forward method of the `Attention` class.

    Args:
        hidden_states (`torch.Tensor`):
            The hidden states of the query.
        encoder_hidden_states (`torch.Tensor`, *optional*):
            The hidden states of the encoder.
        attention_mask (`torch.Tensor`, *optional*):
            The attention mask to use. If `None`, no mask is applied.
        **cross_attention_kwargs:
            Additional keyword arguments to pass along to the cross attention.

    Returns:
        `torch.Tensor`: The output of the attention layer.
    """
    # The `Attention` class can call different attention processors / attention functions.
    # Here we simply pass along all tensors to the selected processor class; for the
    # standard processors defined in diffusers, `**cross_attention_kwargs` is empty.
    return self.processor(
        self,
        hidden_states,
        encoder_hidden_states=encoder_hidden_states,
        attention_mask=attention_mask,
        **cross_attention_kwargs,
    )

Attention.forward = forward
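# A minimal sketch (illustrative only): with the patched `forward`, extra kwargs such as
# `ref_dict` reach a custom processor through the regular `Attention` call path. Assumes
# diffusers' `AttnProcessor2_0` (torch >= 2.0); the function name and key "blk" are assumptions.
def _example_patched_forward():
    from diffusers.models.attention_processor import AttnProcessor2_0
    dim = 64
    attn = Attention(query_dim=dim, heads=4, dim_head=dim // 4, bias=True)
    attn.set_processor(ExtraAttnProc(AttnProcessor2_0(), enabled=True, name="blk",
                                     mode='extract', proj_in_dim=dim, target_dim=dim))
    ref_dict = {}
    out = attn(torch.randn(2, 16, dim), ref_dict=ref_dict)  # kwargs forwarded to the processor
    assert "blk" in ref_dict
    return out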