forked from modelscope/DiffSynth-Studio
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpredict.py
More file actions
156 lines (136 loc) · 5.14 KB
/
predict.py
File metadata and controls
156 lines (136 loc) · 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# Prediction interface for Cog ⚙️
# https://cog.run/python
import os
import subprocess
import time
import torch
from cog import BasePredictor, Input, Path, BaseModel
from diffsynth import (
save_video,
ModelManager,
SVDVideoPipeline,
HunyuanDiTImagePipeline,
)
# The weights are the ones listed in the upstream ExVideo example:
# https://github.com/modelscope/DiffSynth-Studio/blob/main/examples/ExVideo/ExVideo_svd_test.py
# They are re-hosted as a single .tar on replicate.delivery for faster booting.
MODEL_CACHE = "demo_model_cache"
MODEL_URL = "https://weights.replicate.delivery/default/modelscope/DiffSynth-Studio/ExVideo.tar"

# Flag read by the tokenizer library (presumably Hugging Face `tokenizers` -- confirm).
os.environ["TOKENIZERS_PARALLELISM"] = "True"
class ModelOutput(BaseModel):
    # Structured result returned by Predictor.predict: one file path per artefact.

    # Still image produced by the text-to-image pipeline.
    image: Path
    # Video generated from that image at 512x512.
    video: Path
    # The same video after the 1024x1024 upscaling pass.
    upscale_video: Path
def download_weights(url, dest):
    """Download the weights at ``url`` into ``dest`` with the ``pget`` CLI.

    ``pget`` is invoked with ``-x`` (presumably "extract the archive" --
    confirm against the pget documentation). The URL, the destination and
    the elapsed wall-clock time are printed to stdout.

    Raises:
        subprocess.CalledProcessError: if ``pget`` exits with a non-zero status.
    """
    began_at = time.time()
    print("downloading url: ", url)
    print("downloading to: ", dest)

    pget_command = ["pget", "-x", url, dest]
    subprocess.check_call(pget_command, close_fds=False)

    elapsed = time.time() - began_at
    print("downloading took: ", elapsed)
class Predictor(BasePredictor):
    """Cog predictor: text -> image -> 512px video -> 1024px upscaled video."""

    def setup(self) -> None:
        """Load the model into memory to make running multiple predictions efficient"""
        # Fetch the weights only when the local cache directory is missing.
        if not os.path.exists(MODEL_CACHE):
            download_weights(MODEL_URL, MODEL_CACHE)

        # Text-to-image pipeline (HunyuanDiT checkpoints).
        image_model_manager = ModelManager(
            torch_dtype=torch.float16,
            device="cuda",
            file_path_list=[
                f"{MODEL_CACHE}/HunyuanDiT/t2i/clip_text_encoder/pytorch_model.bin",
                f"{MODEL_CACHE}/HunyuanDiT/t2i/mt5/pytorch_model.bin",
                f"{MODEL_CACHE}/HunyuanDiT/t2i/model/pytorch_model_ema.pt",
                f"{MODEL_CACHE}/HunyuanDiT/t2i/sdxl-vae-fp16-fix/diffusion_pytorch_model.bin",
            ],
        )
        self.image_pipe = HunyuanDiTImagePipeline.from_model_manager(
            image_model_manager
        )

        # Image-to-video pipeline (Stable Video Diffusion checkpoints).
        video_model_manager = ModelManager(
            torch_dtype=torch.float16,
            device="cuda",
            file_path_list=[
                f"{MODEL_CACHE}/stable_video_diffusion/svd_xt.safetensors",
                f"{MODEL_CACHE}/stable_video_diffusion/model.fp16.safetensors",
            ],
        )
        self.video_pipe = SVDVideoPipeline.from_model_manager(video_model_manager)

    def predict(
        self,
        prompt: str = Input(
            description="Input prompt",
            default="bonfire, on the stone",
        ),
        negative_prompt: str = Input(
            description="Specify things to not see in the output",
            default="错误的眼睛,糟糕的人脸,毁容,糟糕的艺术,变形,多余的肢体,模糊的颜色,模糊,重复,病态,残缺,",
        ),
        num_frames: int = Input(
            description="Number of the output frames", default=128, le=128
        ),
        num_inference_steps: int = Input(
            description="Number of denoising steps for image and video generation",
            ge=1,
            le=500,
            default=25,
        ),
        num_inference_steps_upscale_video: int = Input(
            description="Number of denoising steps for upscaling the video",
            ge=1,
            le=500,
            default=25,
        ),
        seed: int = Input(
            description="Random seed. Leave blank to randomize the seed", default=None
        ),
    ) -> ModelOutput:
        """Run a single prediction on the model

        Generates a 1024x1024 image from the prompt, animates it into a
        512x512 video, then upscales that video to 1024x1024. The three
        artefacts are written under /tmp and returned as a ModelOutput.
        """
        if seed is None:
            seed = int.from_bytes(os.urandom(2), "big")
        print(f"Using seed: {seed}")
        # Actually apply the seed: previously it was only printed, so the
        # "seed" input had no effect and results were not reproducible.
        # The pipelines are not given a generator here, so seed torch's
        # global RNG (as the upstream ExVideo example script does).
        torch.manual_seed(seed)

        image = self.image_pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            height=1024,
            width=1024,
        )

        # Now, generate a video with resolution of 512. 20GB VRAM is required.
        video = self.video_pipe(
            input_image=image.resize((512, 512)),
            num_frames=num_frames,
            fps=30,
            height=512,
            width=512,
            motion_bucket_id=127,
            num_inference_steps=num_inference_steps,
            min_cfg_scale=2,
            max_cfg_scale=2,
            contrast_enhance_scale=1.2,
        )

        # Upscale the video. 52GB VRAM is required.
        upscale_video = self.video_pipe(
            input_image=image.resize((1024, 1024)),
            input_video=[frame.resize((1024, 1024)) for frame in video],
            denoising_strength=0.5,
            num_frames=num_frames,
            fps=30,
            height=1024,
            width=1024,
            motion_bucket_id=127,
            num_inference_steps=num_inference_steps_upscale_video,
            min_cfg_scale=2,
            max_cfg_scale=2,
            contrast_enhance_scale=1.2,
        )

        # Persist the three artefacts to fixed paths and hand them back to Cog.
        out_image_path = "/tmp/image.png"
        image.save(out_image_path)
        out_video_path = "/tmp/video_512.mp4"
        save_video(video, out_video_path, fps=30)
        upscale_video_path = "/tmp/video_1024.mp4"
        save_video(upscale_video, upscale_video_path, fps=30)

        return ModelOutput(
            image=Path(out_image_path),
            video=Path(out_video_path),
            upscale_video=Path(upscale_video_path),
        )