-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathimage.py
More file actions
189 lines (151 loc) · 7.23 KB
/
image.py
File metadata and controls
189 lines (151 loc) · 7.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import os
import time
import random
import numpy as np
import torch
from PIL import Image
import folder_paths
from . import register_node
# Module-level cache for the keyframe tensor carried between loop cycles:
# written by SessionImageSender (last validated frame) and read back by
# SessionImageReceiver on every cycle after the first.
global_session_image = None
def tensor_to_temp_image(tensor_image, prefix="session_img"):
    """Save the first frame of an image batch as a temp PNG for the UI preview.

    Args:
        tensor_image: batched image tensor with float values in [0, 1];
            only index 0 is written out.
        prefix: filename prefix for the temporary file.

    Returns:
        A dict in ComfyUI's preview format ({"filename", "subfolder", "type"}).
    """
    out_name = f"{prefix}_{random.randint(10000, 99999)}.png"
    out_path = os.path.join(folder_paths.get_temp_directory(), out_name)
    # Scale [0, 1] floats to 8-bit and clamp before PIL conversion.
    pixels = np.clip(255. * tensor_image[0].cpu().numpy(), 0, 255).astype(np.uint8)
    Image.fromarray(pixels).save(out_path)
    return {"filename": out_name, "subfolder": "", "type": "temp"}
@register_node
class SessionImageReceiver:
    """Entry node of the loop: hands the workflow either the original image
    (first cycle) or the keyframe cached in RAM by the previous cycle."""

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "initial_image": ("IMAGE",),
                "current_loop_index": ("INT", {"default": 0, "min": 0, "max": 10000}),
            },
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("current_image",)
    OUTPUT_NODE = True
    FUNCTION = "get_image"
    CATEGORY = "🔁 Sequential Batcher/Image"

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        # Always report a fresh value so the node re-executes every run.
        return time.time()

    def get_image(self, initial_image, current_loop_index):
        from . import loop
        global global_session_image

        # The index may arrive wrapped in a one-element list.
        if isinstance(current_loop_index, list):
            loop_idx = current_loop_index[0]
        else:
            loop_idx = current_loop_index

        # getattr guards against the loop module not having set these yet.
        accumulated = getattr(loop, 'global_accumulated_frames', 0)
        source_total = getattr(loop, 'global_source_frame_count', 1)

        print(f"\n{'='*50}")
        print(f"📥 [DEBUG] NODO: Image Receiver")
        print(f" -> Ciclo actual: {loop_idx} | Progreso global: {accumulated} / {source_total} frames")

        if loop_idx == 0 or global_session_image is None:
            # First cycle (or nothing cached yet): start from the original image.
            global_session_image = initial_image.clone().cpu()
            print(f" -> 🆕 Iniciando sesión con la imagen ORIGINAL.")
            selected = initial_image
        else:
            # Reuse the keyframe the sender validated on the previous cycle.
            print(f" -> ♻️ Usando el Keyframe validado y rescatado de la RAM.")
            selected = global_session_image

        print(f" -> 🖼️ Tensor shape: {selected.shape}")
        print(f"{'='*50}\n")

        ui_image = tensor_to_temp_image(selected, "receiver")
        return {"ui": {"images": [ui_image]}, "result": (selected, )}
@register_node
class SessionImageSender:
    """Closing node of the cycle: truncates the generated batch at the last
    frame that passes validation (optional frontal-face detection), advances
    the global frame counter in the ``loop`` module and caches the resulting
    keyframe for the next cycle (RAM always, disk only if another cycle
    follows)."""

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "generated_images": ("IMAGE",),
                "current_loop_index": ("INT", {"default": 0, "min": 0, "max": 10000}),
                "detect_faces": ("BOOLEAN", {"default": True}),
            },
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("VALIDATED_IMAGES",)
    OUTPUT_NODE = True
    FUNCTION = "set_image"
    CATEGORY = "🔁 Sequential Batcher/Image"

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        # Always report a fresh value so the node re-executes every run.
        return time.time()

    def _last_valid_frame_index(self, generated_images, batch_size, detect_faces):
        """Return the index of the last frame to keep from the batch.

        With detection enabled, scans the batch backwards using OpenCV's Haar
        frontal-face cascade and returns the last frame containing a face.
        Falls back to the last frame of the batch when detection is disabled,
        when no face is found, or when OpenCV raises.
        """
        best_idx = batch_size - 1
        if not detect_faces:
            print(f" -> ⏩ Detección de rostros desactivada. Usando el último frame absoluto del lote ({batch_size - 1}).")
            return best_idx
        try:
            # Imported lazily so the node works without OpenCV when
            # face detection is disabled.
            import cv2
            cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            face_cascade = cv2.CascadeClassifier(cascade_path)
            print(f" -> 🕵️ Buscando el último rostro frontal en reversa (Desde frame {batch_size - 1} hasta 0)...")
            found = False
            for i in range(batch_size - 1, -1, -1):
                # Tensor frame ([0, 1] floats) -> 8-bit grayscale for the cascade.
                img_np = (generated_images[i].cpu().numpy() * 255.0).astype(np.uint8)
                gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
                faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
                if len(faces) > 0:
                    best_idx = i
                    found = True
                    print(f" -> ✅ Rostro detectado en el frame [{i}].")
                    break
            if not found:
                # Accept the whole batch rather than stalling the pipeline.
                print(f" -> ⚠️ ALERTA: No se detectó rostro frontal en NINGÚN frame. Aceptando todo el lote para evitar bucles infinitos.")
                best_idx = batch_size - 1
        except Exception as e:
            # Best-effort: on any OpenCV failure keep the current fallback index.
            print(f" -> ⚠️ Error en OpenCV al validar: {e}")
        return best_idx

    def set_image(self, generated_images, current_loop_index, detect_faces):
        """Validate the batch, advance the global timeline and emit the kept frames.

        Args:
            generated_images: batched image tensor produced by the sampler.
            detect_faces: when True, truncate at the last frame with a face.

        Returns:
            ComfyUI node dict: UI preview of the keyframe plus the truncated batch.

        Raises:
            ValueError: if ``generated_images`` is None.
        """
        from . import loop

        if generated_images is None:
            raise ValueError("❌ ERROR CRÍTICO: No se recibieron imágenes en el Sender.")

        global global_session_image
        loop_idx = current_loop_index[0] if isinstance(current_loop_index, list) else current_loop_index
        batch_size = generated_images.shape[0]

        print(f"\n{'='*50}")
        print(f"📤 [DEBUG] NODO: Image Sender (Filtro Dinámico)")
        print(f" -> Frames recibidos de la IA: {batch_size}")

        best_idx = self._last_valid_frame_index(generated_images, batch_size, detect_faces)

        # 1. Truncate the tensor to the validated frames only.
        valid_images = generated_images[:best_idx + 1]
        frames_accepted = best_idx + 1

        # 2. Advance the state machine: map accepted frames back to source
        # frames through the sampling stride.
        stride = getattr(loop, 'global_select_every_nth', 1)
        advanced_original_frames = frames_accepted * stride
        if not hasattr(loop, 'global_accumulated_frames'):
            loop.global_accumulated_frames = 0

        # LTX mode: the overlap/anchor frame is reused as the start of the
        # next chunk, so it must not count toward the global progression.
        if getattr(loop, 'global_ltx_mode', False):
            advanced_original_frames = max(1, (frames_accepted - 1) * stride)

        loop.global_accumulated_frames += advanced_original_frames
        source_total = getattr(loop, 'global_source_frame_count', 1)

        # Detect whether this was the final (or only) cycle.
        is_final_cycle = loop.global_accumulated_frames >= source_total

        print(f" -> ✂️ Tensor truncado a {frames_accepted} frames válidos.")
        print(f" -> 📈 Timeline avanzado a {loop.global_accumulated_frames} / {source_total}")

        # Cache the last valid frame in RAM as the next cycle's keyframe.
        last_frame = valid_images[-1:].clone().cpu()
        global_session_image = last_frame

        # Only persist the keyframe to disk if another cycle will follow.
        if not is_final_cycle:
            out_dir = folder_paths.get_output_directory()
            filename = f"keyframe_{loop_idx:03d}.png"
            filepath = os.path.join(out_dir, filename)
            img_array = 255. * last_frame[0].numpy()
            img = Image.fromarray(np.clip(img_array, 0, 255).astype(np.uint8))
            img.save(filepath)
            # BUGFIX: log the actual saved path (the original printed a
            # literal "(unknown)" placeholder instead of the filepath).
            print(f" -> 💾 Keyframe seguro guardado: {filepath}")
        else:
            print(f" -> 🚀 Ciclo final/único detectado. Omitiendo guardado de keyframe.")

        print(f"{'='*50}\n")
        ui_image = tensor_to_temp_image(last_frame, "sender")
        return {"ui": {"images": [ui_image]}, "result": (valid_images, )}