# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from diffusers.modular_pipelines import ModularPipelineBlocks, InputParam, OutputParam, ModularPipeline, PipelineState
import numpy as np
import torch
import PIL.Image
from typing import List
from diffusers.modular_pipelines.wan.before_denoise import WanInputStep


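def calculate_dimensions(image, mod_value):
    """
    Calculate output dimensions that keep the input aspect ratio at a fixed target area.

    The target area is fixed at 720 * 1280 pixels. Each side is rounded to the
    nearest integer and then floored to a multiple of `mod_value`, so the
    aspect ratio is preserved only approximately.

    Args:
        image: PIL Image whose aspect ratio is used
        mod_value: Value that both output dimensions must be a multiple of

    Returns:
        Tuple of (width, height)
    """

    # Fixed target area in pixels
    target_area = 720 * 1280

    # Calculate dimensions maintaining aspect ratio, then snap each side down
    # to a multiple of mod_value. int() keeps the results plain Python ints.
    aspect_ratio = image.height / image.width
    calculated_height = int(round(np.sqrt(target_area * aspect_ratio))) // mod_value * mod_value
    calculated_width = int(round(np.sqrt(target_area / aspect_ratio))) // mod_value * mod_value

    return calculated_width, calculated_height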
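

# Worked example of the arithmetic above, as a dependency-free sketch. It is not
# part of the original module: `_example_target_size` is a hypothetical name, and
# the default `mod_value` of 16 is an assumption chosen for illustration (the
# real value is derived from the pipeline components at run time).
def _example_target_size(src_width, src_height, mod_value=16, target_area=720 * 1280):
    # Same steps as calculate_dimensions, but taking the source size directly.
    aspect_ratio = src_height / src_width
    height = int(round((target_area * aspect_ratio) ** 0.5)) // mod_value * mod_value
    width = int(round((target_area / aspect_ratio) ** 0.5)) // mod_value * mod_value
    return width, height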


# Make the input step aware of `negative_prompt_embeds`.
# ChronoEdit uses a `guidance_scale` of 1.
class ChronoEditInputStep(WanInputStep):
    model_name = "chronoedit"

    @property
    def inputs(self) -> List[InputParam]:
        return [
            InputParam("num_videos_per_prompt", default=1),
            InputParam(
                "prompt_embeds",
                required=True,
                type_hint=torch.Tensor,
                description="Pre-generated text embeddings. Can be generated from text_encoder step.",
            ),
            InputParam(
                "negative_prompt_embeds",
                type_hint=torch.Tensor,
                description="Pre-generated negative text embeddings. Can be generated from text_encoder step.",
            ),
        ]


class ChronoEditImageInputStep(ModularPipelineBlocks):
    model_name = "chronoedit"

    @property
    def inputs(self) -> List[InputParam]:
        return [InputParam(name="image")]

    @property
    def intermediate_outputs(self) -> List[OutputParam]:
        return [
            OutputParam(name="image", type_hint=PIL.Image.Image),
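            OutputParam(
                name="height",
                type_hint=int,
                description="The output height, derived from the input image's aspect ratio and the model's alignment requirement",
            ),
            OutputParam(
                name="width",
                type_hint=int,
                description="The output width, derived from the input image's aspect ratio and the model's alignment requirement",
            ),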
        ]

    def __call__(self, components: ModularPipeline, state: PipelineState) -> PipelineState:
        block_state = self.get_block_state(state)
        image = block_state.image

        # Both output dimensions must be a multiple of the VAE spatial scale
        # factor times the transformer patch size.
        mod_value = components.vae_scale_factor_spatial * components.transformer.config.patch_size[1]

        # Resize the input image to the computed size and expose that size
        # to the downstream blocks.
        width, height = calculate_dimensions(image, mod_value)
        block_state.image = image.resize((width, height))
        block_state.height = height
        block_state.width = width

        self.set_block_state(state, block_state)
        return components, state