From 9d1aad85aef299abfc82adfc7dd69dd9df521922 Mon Sep 17 00:00:00 2001 From: Patrick Date: Thu, 3 Apr 2025 09:37:14 -0700 Subject: [PATCH] ux update to terminal --- tetra/core/client/client_manager.py | 312 ++++++++++++- tetra/core/client/remote_execution.py | 151 +++++-- tetra/core/utils/terminal.py | 622 ++++++++++++++++++++++++++ 3 files changed, 1037 insertions(+), 48 deletions(-) create mode 100644 tetra/core/utils/terminal.py diff --git a/tetra/core/client/client_manager.py b/tetra/core/client/client_manager.py index b8e1fdab..d446db70 100644 --- a/tetra/core/client/client_manager.py +++ b/tetra/core/client/client_manager.py @@ -1,12 +1,25 @@ import base64 import cloudpickle +import time from functools import wraps -from typing import Union, List +from typing import Union, List, Dict from .remote_execution import RemoteExecutionClient from ..resources.serverless import ServerlessResource from ..resources.resource_manager import ResourceManager +from ..utils.terminal import ( + Spinner, print_tetra, print_error, print_success, + print_warning, style_text, print_header, print_separator, + print_box, print_step, TetraNotifier, show_summary, SmartProgress, + print_info +) from ... import remote_execution_pb2 +# Keep track of servers already announced/connected to avoid redundant messages +_initialized_servers = set() +# Keep track of whether we've shown the welcome message +_shown_welcome = False +# Track operations for summary +_operations = [] def get_function_source(func): """Extract the function source code without the decorator.""" @@ -63,15 +76,57 @@ def remote( def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): + global _shown_welcome, _operations + + # Start timing the operation + start_time = time.time() + operation_record = { + "operation": f"Execute {func.__name__}", + "start_time": start_time, + "success": False, + } + + # Show welcome message only once per session + if not _shown_welcome: + TetraNotifier.welcome() + _shown_welcome = True + global_client = RemoteExecutionClient() _resource_manager = ResourceManager() + effective_server_spec = None # Initialize with a default - # Determine if we're using dynamic provisioning or static server + # Print execution step + print_separator() + print_step(1, f"Preparing {style_text(func.__name__, 'bright_yellow', 'bold')}", + "Setting up resources and initializing execution environment") + + # Resource provisioning if resource_config: # Dynamic provisioning try: - # Get or create the resource - server_name = await _resource_manager.get_or_create_resource(resource_config) + # Show spinner while provisioning + with Spinner(f"Provisioning compute resources for {func.__name__}...", + spinner_type="dots", + icon="server", + color="bright_yellow"): + server_name = await _resource_manager.get_or_create_resource(resource_config) + + # Only show the detailed notification for newly created resources + if server_name not in _initialized_servers: + # Get resource details for display + resource_details = {} + for rid, details in _resource_manager._resources.items(): + if details["server_name"] == server_name: + resource_details = { + "gpu": details.get("gpuIds", "N/A"), + "template": details.get("templateId", "N/A"), + "endpoint": details.get("endpoint_url", "N/A") + } + break + + TetraNotifier.server_ready(server_name, resource_details) + else: + print_success(f"Using existing compute resource: {style_text(server_name, 'bright_green', 'bold')}") # Check if server is already registered if server_name not in global_client.servers: @@ -89,26 +144,74 @@ async def wrapper(*args, **kwargs): resource_details = _resource_manager._resources[resource_id] - # Register with the client - endpoint_url = resource_details["endpoint_url"] - print( - f"Registering RunPod endpoint: {server_name} at {endpoint_url}" - ) - await global_client.add_runpod_server( - server_name, endpoint_url - ) + # Register with the client ONLY if not already initialized + if server_name not in _initialized_servers: + endpoint_url = resource_details["endpoint_url"] + print_step(2, "Connecting to compute resource", + f"Establishing secure connection to {style_text(server_name, 'bright_cyan')}") + + with Spinner(f"Connecting to {server_name}...", + spinner_type="bounce", + icon="network", + color="bright_blue"): + await global_client.add_runpod_server( + server_name, endpoint_url + ) + + print_success(f"Secure connection established to {style_text(server_name, 'bright_cyan', 'bold')}") + _initialized_servers.add(server_name) + else: + # If already initialized, just ensure it's in the client's list + if server_name not in global_client.servers: + endpoint_url = resource_details["endpoint_url"] + await global_client.add_runpod_server(server_name, endpoint_url) # Ensure there's a pool for this resource pool_name = f"pool_{resource_id}" if pool_name not in global_client.pools: global_client.create_pool(pool_name, [server_name]) - # Use the server name for execution effective_server_spec = server_name except Exception as e: - raise Exception(f"Failed to provision resource: {str(e)}") + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "Failed to provision", + "duration": f"{elapsed:.1f}s", + "error": str(e) + }) + _operations.append(operation_record) + + print_error(f"Failed to provision resource: {str(e)}") + raise # Re-raise the exception after logging + else: + # Handle the case where no resource_config is provided + # Allow for server_spec to be passed directly instead + # First, check if this function has been called with a server_spec argument + server_spec = kwargs.get('server_spec') + if server_spec: + # Use the provided server_spec directly + effective_server_spec = server_spec + # Remove server_spec from kwargs to avoid passing it to the function + del kwargs['server_spec'] + print_success(f"Using specified server: {style_text(server_spec, 'bright_green', 'bold')}") + else: + # No explicit resource or server specification + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "No target specified", + "duration": f"{elapsed:.1f}s", + }) + _operations.append(operation_record) + + print_error("Execution requires either resource_config or a server_spec parameter.") + raise ValueError("No execution target specified (resource_config or server_spec needed).") + print_step(3, "Preparing function for remote execution", + f"Serializing {style_text(func.__name__, 'bright_magenta', 'bold')} and its arguments") + source = get_function_source(func) # Serialize arguments using cloudpickle instead of JSON @@ -130,22 +233,195 @@ async def wrapper(*args, **kwargs): # Add dependencies if provided if dependencies: + dependencies_str = ", ".join(dependencies) + print_info(f"Including dependencies: {style_text(dependencies_str, 'bright_blue')}") request_args["dependencies"] = dependencies request = remote_execution_pb2.FunctionRequest(**request_args) - stub = global_client.get_stub(effective_server_spec, fallback) + try: + print_step(4, "Establishing execution stub", + f"Getting execution stub for {style_text(effective_server_spec, 'bright_cyan')}") + + with Spinner(f"Preparing communication channel...", + spinner_type="arrows", + icon="network", + color="bright_blue"): + stub = global_client.get_stub(effective_server_spec, fallback) + + except ValueError as e: + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "Failed to get stub", + "duration": f"{elapsed:.1f}s", + "error": str(e) + }) + _operations.append(operation_record) + + print_error(f"Failed to get execution stub: {e}") + raise try: + print_step(5, "Executing function remotely", + f"Running {style_text(func.__name__, 'bright_magenta', 'bold')} on {style_text(effective_server_spec, 'bright_cyan', 'bold')}") + + # Notify about job submission + TetraNotifier.job_submitted(func.__name__, effective_server_spec) + + # The actual execution happens in the stub, which now has its own spinner + execution_start = time.time() response = await stub.ExecuteFunction(request) + execution_time = time.time() - execution_start + if response.success: + # Record successful operation + elapsed = time.time() - start_time + operation_record.update({ + "success": True, + "result": "Success", + "duration": f"{elapsed:.1f}s", + "execution_time": execution_time + }) + _operations.append(operation_record) + + # Show completion notification + TetraNotifier.job_completed(func.__name__, execution_time) + # Deserialize result using cloudpickle instead of JSON - return cloudpickle.loads(base64.b64decode(response.result)) + result = cloudpickle.loads(base64.b64decode(response.result)) + + # Give a visual separator before showing results + print_separator() + print_box( + f"Function: {style_text(func.__name__, 'bright_magenta', 'bold')}\n" + + f"Server: {style_text(effective_server_spec, 'bright_cyan')}\n" + + f"Execution time: {style_text(f'{execution_time:.2f}s', 'bright_yellow')}\n" + + f"Total time: {style_text(f'{elapsed:.2f}s', 'bright_yellow')}", + title="Execution Summary", + color="bright_green" + ) + + return result else: - raise Exception(f"Remote execution failed: {response.error}") + error_msg = f"Remote execution failed: {response.error}" + + # Record failed operation + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "Execution failed", + "duration": f"{elapsed:.1f}s", + "error": response.error, + "execution_time": execution_time + }) + _operations.append(operation_record) + + print_error(error_msg) + raise Exception(error_msg) except Exception as e: - raise Exception(f"All execution attempts failed: {str(e)}") + # Catch potential communication errors or other exceptions during execution + error_msg = f"Error during remote execution of {func.__name__}: {str(e)}" + print_error(error_msg) + + # Attempt fallback if specified + if fallback: + print_warning(f"Attempting fallback execution on {style_text(fallback, 'bright_yellow')}...") + try: + print_step(6, "Executing fallback", + f"Retrying {style_text(func.__name__, 'bright_magenta', 'bold')} on fallback {style_text(fallback, 'bright_yellow', 'bold')}") + + fallback_start = time.time() + fallback_stub = global_client.get_stub(fallback) + response = await fallback_stub.ExecuteFunction(request) + fallback_time = time.time() - fallback_start + + if response.success: + # Record successful fallback + elapsed = time.time() - start_time + operation_record.update({ + "success": True, + "result": "Fallback succeeded", + "duration": f"{elapsed:.1f}s", + "execution_time": fallback_time, + "fallback_used": True + }) + _operations.append(operation_record) + + print_success(f"Fallback execution of {style_text(func.__name__, 'bright_magenta', 'bold')} succeeded.") + TetraNotifier.job_completed(f"{func.__name__} (fallback)", fallback_time) + + # Give a visual separator before showing results + print_separator() + print_box( + f"Function: {style_text(func.__name__, 'bright_magenta', 'bold')} (Fallback)\n" + + f"Server: {style_text(fallback, 'bright_yellow')}\n" + + f"Execution time: {style_text(f'{fallback_time:.2f}s', 'bright_yellow')}\n" + + f"Total time: {style_text(f'{elapsed:.2f}s', 'bright_yellow')}", + title="Fallback Execution Summary", + color="bright_green" + ) + + return cloudpickle.loads(base64.b64decode(response.result)) + else: + error_msg = f"Fallback execution failed: {response.error}" + + # Record failed fallback + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "All attempts failed", + "duration": f"{elapsed:.1f}s", + "error": f"Primary: {str(e)}, Fallback: {response.error}", + "fallback_used": True + }) + _operations.append(operation_record) + + print_error(error_msg) + raise Exception(error_msg) + except Exception as fallback_e: + error_msg = f"Fallback execution also failed: {str(fallback_e)}" + + # Record failed fallback with exception + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "All attempts failed", + "duration": f"{elapsed:.1f}s", + "error": f"Primary: {str(e)}, Fallback exception: {str(fallback_e)}", + "fallback_used": True + }) + _operations.append(operation_record) + + print_error(error_msg) + raise Exception(f"All execution attempts failed. Original error: {str(e)}. Fallback error: {str(fallback_e)}") + else: + # Record the error without fallback + elapsed = time.time() - start_time + operation_record.update({ + "success": False, + "result": "Failed", + "duration": f"{elapsed:.1f}s", + "error": str(e) + }) + _operations.append(operation_record) + + raise Exception(f"All execution attempts failed: {str(e)}") # Re-raise if no fallback + finally: + # If we have accumulated operations, show a summary + if len(_operations) >= 5: + show_summary(_operations) + # Reset the operations list to avoid showing redundant information + _operations = [] return wrapper return decorator + +def show_execution_history(): + """Display a summary of all remote executions performed in this session.""" + global _operations + if _operations: + show_summary(_operations) + return len(_operations) + return 0 diff --git a/tetra/core/client/remote_execution.py b/tetra/core/client/remote_execution.py index f29b6291..e1fb3afb 100644 --- a/tetra/core/client/remote_execution.py +++ b/tetra/core/client/remote_execution.py @@ -1,8 +1,14 @@ import grpc.aio import random -from typing import Union, List +import logging +import time +from typing import Union, List, Dict, Optional from ..utils.singleton import SingletonMixin from ..resources.resource_manager import ResourceManager +from ..utils.terminal import ( + Spinner, print_tetra, print_success, print_error, print_warning, + style_text, print_timestamp, print_step, TetraNotifier +) from ... import remote_execution_pb2_grpc, remote_execution_pb2 @@ -70,22 +76,31 @@ def __init__(self, endpoint_url): import runpod import os - # Set RunPod API key + # Configure RunPod API key api_key = os.environ.get("RUNPOD_API_KEY") if not api_key: raise ValueError("RUNPOD_API_KEY environment variable is not set") - runpod.api_key = api_key + # Get runpod logger + self.runpod_logger = logging.getLogger("runpod") + self.original_runpod_log_level = self.runpod_logger.level + self.endpoint_url = endpoint_url - # Extract endpoint ID from URL self.endpoint_id = endpoint_url.strip("/").split("/")[-1] - print( - f"Initialized RunPod stub for endpoint: {endpoint_url} (ID: {self.endpoint_id})" - ) # Initialize the RunPod endpoint self.endpoint = runpod.Endpoint(self.endpoint_id) + + # Job state tracking + self.active_jobs = {} + + def _silence_runpod_logs(self): + self.original_runpod_log_level = self.runpod_logger.level + self.runpod_logger.setLevel(logging.WARNING) # Silence INFO logs + + def _restore_runpod_logs(self): + self.runpod_logger.setLevel(self.original_runpod_log_level) async def ExecuteFunction(self, request): """ @@ -110,32 +125,108 @@ async def ExecuteFunction(self, request): if hasattr(request, "dependencies") and request.dependencies: payload["dependencies"] = [dep for dep in request.dependencies] - print(f"Executing function on RunPod endpoint ID: {self.endpoint_id}") + # Use spinner and silence runpod logs during network operations + run_request = None + status = None + output = None + + # Track timing for job execution + job_start_time = time.time() try: - # Run using the RunPod SDK (non-async) loop = asyncio.get_event_loop() - run_request = await loop.run_in_executor( - None, lambda: self.endpoint.run({"input": payload}) + with Spinner( + f"Submitting '{request.function_name}' to RunPod compute service...", + spinner_type="dots", + color="bright_yellow", + icon="rocket" + ): + self._silence_runpod_logs() + run_request = await loop.run_in_executor( + None, lambda: self.endpoint.run({"input": payload}) + ) + self._restore_runpod_logs() + + # Try to extract job ID if available + job_id = getattr(run_request, "id", None) + job_id_str = f" (ID: {job_id})" if job_id else "" + + print_timestamp( + f"{style_text('✦', 'bright_cyan')} Job submitted to RunPod compute{job_id_str}", + color="bright_white" ) - # print(f"Job submitted with ID: {run_request.id}") - - # Initial status check without blocking - status = await loop.run_in_executor(None, lambda: run_request.status()) - - print(f"Initial job status: {status}") - - # Wait for completion with timeout - if status != "COMPLETED": - output = await loop.run_in_executor( - None, - lambda: run_request.output(timeout=300), # 5 minute timeout + # Show execution spinner with time tracking + spinner = Spinner( + f"Running {style_text(request.function_name, 'bright_magenta', 'bold')} on cloud compute...", + spinner_type="moon", + color="bright_cyan", + icon="compute" + ) + + # Continuously update the spinner message with elapsed time + with spinner: + self._silence_runpod_logs() + # Initial status check + status = await loop.run_in_executor(None, lambda: run_request.status()) + + # Wait for completion only if not already completed + if status != "COMPLETED": + # Get intermediate status updates every 2 seconds + remaining_time = 300 # 5 minute timeout + interval = 2 + + while status != "COMPLETED" and remaining_time > 0: + # Update spinner message with elapsed time + elapsed = time.time() - job_start_time + spinner.update_message( + f"Running {style_text(request.function_name, 'bright_magenta', 'bold')} " + + f"on cloud compute... [{style_text(f'{elapsed:.1f}s', 'bright_yellow')}]" + ) + + # Wait for a bit before checking again + await asyncio.sleep(interval) + remaining_time -= interval + + # Check status + status = await loop.run_in_executor(None, lambda: run_request.status()) + + if status == "FAILED": + # If status is FAILED, break early + break + + # Get the output once we're done or timed out + if status == "COMPLETED": + output = await loop.run_in_executor(None, lambda: run_request.output()) + else: + # If not completed, try with timeout + output = await loop.run_in_executor( + None, + lambda: run_request.output(timeout=remaining_time) + ) + else: + # Already completed, just get the output + output = await loop.run_in_executor(None, lambda: run_request.output()) + + self._restore_runpod_logs() + + # Calculate total execution time + execution_time = time.time() - job_start_time + execution_time_formatted = f"{execution_time:.2f}" + + # Show completion message + if status == "COMPLETED": + print_timestamp( + f"{style_text('✓', 'bright_green')} Remote execution completed in {style_text(execution_time_formatted + 's', 'bright_yellow')}", + color="bright_green" ) else: - output = await loop.run_in_executor(None, lambda: run_request.output()) - - print(f"Job completed, output received") + # If timeout but still got output + print_timestamp( + f"{style_text('⚠', 'bright_yellow')} Remote execution status: {style_text(status, 'bright_yellow')} " + + f"after {style_text(execution_time_formatted + 's', 'bright_yellow')}", + color="bright_yellow" + ) # Process the output if isinstance(output, dict) and "success" in output: @@ -148,7 +239,7 @@ async def ExecuteFunction(self, request): success=False, error=output.get("error", "Unknown error") ) else: - # Direct output from RunPod + # Direct output from RunPod assumed successful serialized_result = base64.b64encode(cloudpickle.dumps(output)).decode( "utf-8" ) @@ -157,10 +248,10 @@ async def ExecuteFunction(self, request): ) except Exception as e: + self._restore_runpod_logs() # Ensure logs are restored on error import traceback - error_traceback = traceback.format_exc() - print(f"Exception during RunPod execution: {str(e)}\n{error_traceback}") + print_error(f"Exception during RunPod execution: {str(e)}\n{error_traceback}") return remote_execution_pb2.FunctionResponse( success=False, error=f"RunPod request failed: {str(e)}" ) @@ -176,7 +267,7 @@ async def ExecuteFunction(self, request): try: return await self.primary_stub.ExecuteFunction(request) except Exception as e: - print(f"Primary server failed: {e}, trying fallback...") + print_warning(f"Primary server failed: {e}, trying fallback...") fallback_stub = self.client.get_stub(self.fallback_spec) return await fallback_stub.ExecuteFunction(request) diff --git a/tetra/core/utils/terminal.py b/tetra/core/utils/terminal.py new file mode 100644 index 00000000..09331b07 --- /dev/null +++ b/tetra/core/utils/terminal.py @@ -0,0 +1,622 @@ +""" +Terminal utilities for enhancing command line interfaces with rich formatting. + +This module provides a collection of functions and classes for creating beautiful, +informative terminal user interfaces with colors, formatting, progress indicators, +and interactive elements. + +Features: +- Automatic terminal capability detection (colors, emoji support) +- Dynamic width adjustment for different terminal sizes +- Styled text with colors and formatting +- Icons and emoji with fallbacks for different terminals +- Progress bars with ETA calculation +- Spinners with multiple animation styles +- Boxed messages and headers +- Operation summaries and statistics + +Examples: + # Basic styled messages + print_info("This is an informational message") + print_success("Operation completed successfully") + print_warning("This might need attention") + print_error("Something went wrong") + + # Progress tracking + with Spinner("Processing...", spinner_type="dots"): + # Do some work + time.sleep(2) + + # Progress bar + items = list(range(100)) + for item in SmartProgress(items, desc="Processing items"): + # Process item + time.sleep(0.01) + + # Boxed messages + print_box( + "Your task has completed successfully!", + title="Success", + color="bright_green" + ) +""" +import sys +import time +import threading +import shutil +import os +from datetime import datetime +from typing import List, Optional, Dict, Any, Callable + +# Detect color support +def supports_color(): + """Determine if the terminal supports color output. + + Returns False if: + - NO_COLOR environment variable is set + - Output is not a TTY + - Running on certain platforms/environments known to not support color + """ + # Check NO_COLOR environment variable (standard for disabling color) + if os.environ.get('NO_COLOR', ''): + return False + + # Check if output is a TTY + if not hasattr(sys.stdout, 'isatty') or not sys.stdout.isatty(): + return False + + # Check platform-specific issues + plat = sys.platform + if plat == 'Pocket PC': + return False + + # Windows support + if plat == 'win32' and 'ANSICON' not in os.environ: + # Check for Windows Terminal, Windows ConPTY or modern PowerShell + if not any(term in os.environ.get('TERM_PROGRAM', '') + for term in ['vscode', 'Windows Terminal']): + return os.environ.get('WT_SESSION', '') != '' + + return True + +# Color mode setting +USE_COLORS = supports_color() + +# Get terminal width +def get_term_width(): + """Get current terminal width, defaults to 80 if can't be determined""" + try: + return shutil.get_terminal_size().columns + except: + return 80 + +TERM_WIDTH = get_term_width() # Initial width + +# ANSI escape codes for colors and styles +# Use raw strings or actual escape sequences to prevent double escaping +COLORS = { + "black": "\033[30m", + "red": "\033[31m", + "green": "\033[32m", + "yellow": "\033[33m", + "blue": "\033[34m", + "magenta": "\033[35m", + "cyan": "\033[36m", + "white": "\033[37m", + "bright_black": "\033[90m", + "bright_red": "\033[91m", + "bright_green": "\033[92m", + "bright_yellow": "\033[93m", + "bright_blue": "\033[94m", + "bright_magenta": "\033[95m", + "bright_cyan": "\033[96m", + "bright_white": "\033[97m", + "reset": "\033[0m", +} + +# Empty color codes for terminals that don't support color +NO_COLORS = {color: "" for color in COLORS} + +STYLES = { + "bold": "\033[1m", + "dim": "\033[2m", + "italic": "\033[3m", + "underline": "\033[4m", + "blink": "\033[5m", + "reverse": "\033[7m", + "hidden": "\033[8m", + "strikethrough": "\033[9m", + "reset": "\033[0m", +} + +# Detect emoji support +def supports_emoji(): + """Determine if the terminal might support emoji characters.""" + # Most modern terminals support emoji, but some older ones don't + # This is a very rough heuristic - in reality it's complex to detect + term = os.environ.get('TERM', '') + if term in ['dumb', 'vt100', 'vt102', 'xterm-mono']: + return False + if sys.platform == 'win32' and not ('CMDER_ROOT' in os.environ or 'WT_SESSION' in os.environ): + # Basic Windows cmd.exe doesn't support emoji well + return False + return True + +USE_EMOJI = supports_emoji() + +# Icons with fallbacks for terminals that don't support emoji +ICONS = { + "info": COLORS["bright_blue"] + ("ℹ" if USE_EMOJI else "i") + COLORS["reset"], + "success": COLORS["bright_green"] + ("✓" if USE_EMOJI else "+") + COLORS["reset"], + "warning": COLORS["bright_yellow"] + ("⚠" if USE_EMOJI else "!") + COLORS["reset"], + "error": COLORS["bright_red"] + ("✖" if USE_EMOJI else "x") + COLORS["reset"], + "debug": COLORS["bright_magenta"] + ("⚙" if USE_EMOJI else "D") + COLORS["reset"], + "tetra": COLORS["bright_cyan"] + ("◊" if USE_EMOJI else "T") + COLORS["reset"], + "rocket": COLORS["bright_yellow"] + ("🚀" if USE_EMOJI else ">") + COLORS["reset"], + "server": COLORS["bright_cyan"] + ("🖥️" if USE_EMOJI else "S") + COLORS["reset"], + "network": COLORS["bright_blue"] + ("🌐" if USE_EMOJI else "N") + COLORS["reset"], + "function": COLORS["bright_magenta"] + ("⚡" if USE_EMOJI else "F") + COLORS["reset"], + "timer": COLORS["bright_yellow"] + ("⏱️" if USE_EMOJI else "t") + COLORS["reset"], + "ml": COLORS["bright_green"] + ("🧠" if USE_EMOJI else "ML") + COLORS["reset"], + "compute": COLORS["bright_yellow"] + ("🔢" if USE_EMOJI else "C") + COLORS["reset"], +} + +# Box drawing characters +BOX = { + "tl": "╭", # top left + "tr": "╮", # top right + "bl": "╰", # bottom left + "br": "╯", # bottom right + "h": "─", # horizontal + "v": "│", # vertical + "ltee": "├", # left tee + "rtee": "┤", # right tee + "ttee": "┬", # top tee + "btee": "┴", # bottom tee + "cross": "┼", # cross +} + +def get_horiz_line(): + """Get a horizontal line with current terminal width""" + return COLORS["bright_black"] + BOX["h"] * (get_term_width() - 2) + COLORS["reset"] + +HORIZ_LINE = get_horiz_line() # Initial line + +def style_text(text: str, color: Optional[str] = None, style: Optional[str] = None) -> str: + """Applies color and style to text using ANSI escape codes.""" + if not USE_COLORS: + return text + + prefix = "" + if color and color in COLORS: + prefix += COLORS[color] + if style and style in STYLES: + prefix += STYLES[style] + + if prefix: + return f"{prefix}{text}{STYLES['reset']}" + return text + +def print_styled(text: str, color: Optional[str] = None, style: Optional[str] = None, icon: Optional[str] = None, end: str = "\n"): + """Prints styled text with an optional icon.""" + icon_str = f"{ICONS[icon]} " if icon and icon in ICONS else "" + styled_msg = style_text(text, color, style) + print(f"{icon_str}{styled_msg}", end=end) + +def print_info(text: str): + print_styled(text, color="bright_blue", icon="info") + +def print_success(text: str): + print_styled(text, color="bright_green", icon="success") + +def print_warning(text: str): + print_styled(text, color="bright_yellow", icon="warning") + +def print_error(text: str): + print_styled(text, color="bright_red", icon="error") + +def print_debug(text: str): + print_styled(text, color="bright_magenta", icon="debug") + +def print_tetra(text: str, style: Optional[str] = None): + """Prints messages related to Tetra core operations.""" + print_styled(text, color="bright_cyan", style=style, icon="tetra") + +def print_header(text: str, icon: Optional[str] = None): + """Prints a prominent header that adjusts to terminal width.""" + term_width = get_term_width() + padding = (term_width - len(text) - 6) // 2 + left_pad = BOX["h"] * padding + right_pad = BOX["h"] * (padding if len(text) % 2 == 0 else padding + 1) + + icon_str = f"{ICONS[icon]} " if icon and icon in ICONS else "" + + header = f"{BOX['tl']}{left_pad} {icon_str}{style_text(text, 'bright_white', 'bold')} {right_pad}{BOX['tr']}" + print(style_text(header, "bright_cyan")) + +def print_subheader(text: str, icon: Optional[str] = None): + """Prints a subheading.""" + icon_str = f"{ICONS[icon]} " if icon and icon in ICONS else "" + styled_text = style_text(text, "bright_white", "bold") + print(f"{icon_str}{styled_text}") + print(style_text(BOX["h"] * len(text), "bright_black")) + +def print_box(content: str, title: Optional[str] = None, color: str = "bright_cyan"): + """Prints content in a box with an optional title, adjusting to terminal width.""" + term_width = get_term_width() + + # Split content by newlines and find the longest line to determine box width + lines = content.split("\n") + width = max(max(len(line) for line in lines), len(title) if title else 0) + 4 + + # Ensure the box fits in the terminal + width = min(width, term_width - 2) + + # Top border with title if provided + if title: + title_styled = style_text(f" {title} ", color, "bold") + title_len = len(title) + 2 + left_pad = (width - title_len) // 2 + right_pad = width - left_pad - title_len + + top_border = f"{BOX['tl']}{BOX['h'] * left_pad}{title_styled}{BOX['h'] * right_pad}{BOX['tr']}" + else: + top_border = f"{BOX['tl']}{BOX['h'] * width}{BOX['tr']}" + + print(style_text(top_border, color)) + + # Content lines + for line in lines: + # Truncate line if it's too long for the box + if len(line) > width - 2: + display_line = line[:width-5] + "..." + else: + display_line = line + + padding = width - len(display_line) - 2 + print(style_text(f"{BOX['v']} {display_line}{' ' * padding} {BOX['v']}", color)) + + # Bottom border + bottom_border = f"{BOX['bl']}{BOX['h'] * width}{BOX['br']}" + print(style_text(bottom_border, color)) + +def print_step(step_num: int, title: str, description: Optional[str] = None): + """Prints a step in a multi-step process.""" + step_header = f"{style_text(str(step_num), 'bright_white', 'bold')}. {style_text(title, 'bright_cyan', 'bold')}" + print(step_header) + if description: + print(f" {description}") + +def print_separator(): + """Prints a horizontal separator line that adjusts to terminal width.""" + print(get_horiz_line()) + +def timestamp(): + """Returns current time formatted as a string.""" + return datetime.now().strftime("%H:%M:%S") + +def print_timestamp(message: str, color: Optional[str] = None): + """Prints a message with timestamp.""" + time_str = style_text(f"[{timestamp()}]", "bright_black") + msg = style_text(message, color) if color else message + print(f"{time_str} {msg}") + +class Spinner: + """A fancy terminal spinner with multiple animation options.""" + SPINNERS = { + "dots": ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"], + "classic": ["◐", "◓", "◑", "◒"], + "arrows": ["←", "↖", "↑", "↗", "→", "↘", "↓", "↙"], + "bounce": ["⠁", "⠂", "⠄", "⡀", "⢀", "⠠", "⠐", "⠈"], + "moon": ["🌑", "🌒", "🌓", "🌔", "🌕", "🌖", "🌗", "🌘"], + "pulse": [" ", "▃", "▄", "▅", "▆", "▇", "█", "▇", "▆", "▅", "▄", "▃"], + } + + def __init__(self, message: str = "Working...", + delay: float = 0.1, + spinner_type: str = "dots", + color: str = "bright_cyan", + icon: Optional[str] = None, + throttle_after: float = 30.0): + self.spinner = self.SPINNERS.get(spinner_type, self.SPINNERS["dots"]) + self.delay = delay + self.message = message + self.color = color + self.icon = icon + self._running = False + self._thread: Optional[threading.Thread] = None + self._start_time = 0 + self.throttle_after = throttle_after # Time in seconds after which to throttle updates + self.throttle_factor = 1.0 # Starts at normal speed + + def _spin(self): + i = 0 + self._start_time = time.time() + icon_str = f"{ICONS[self.icon]} " if self.icon and self.icon in ICONS else "" + last_update_time = self._start_time + + while self._running: + current_time = time.time() + elapsed = current_time - self._start_time + + # Implement throttling for long-running spinners + # Gradually reduce update frequency to save CPU + if elapsed > self.throttle_after: + # Calculate throttle factor based on elapsed time + # The longer it runs, the less frequently we update + self.throttle_factor = min(5.0, 1.0 + (elapsed - self.throttle_after) / 30.0) + + # Only update if enough time has passed based on throttle factor + if current_time - last_update_time >= (self.delay * self.throttle_factor): + spinner_frame = self.spinner[i % len(self.spinner)] + elapsed_str = f" ({elapsed:.1f}s)" if elapsed > 3.0 else "" + + # Compose the entire spinner line + spinner_line = f"\r{icon_str}{style_text(spinner_frame, self.color)} {self.message}{elapsed_str}" + + sys.stdout.write(spinner_line) + sys.stdout.flush() + i += 1 + last_update_time = current_time + + # Sleep for a fixed small time to avoid CPU hogging + time.sleep(self.delay) + + # Clear spinner line when done + sys.stdout.write("\r" + " " * (len(self.message) + 20) + "\r") + sys.stdout.flush() + + def start(self): + if not self._thread or not self._thread.is_alive(): + self._running = True + self._thread = threading.Thread(target=self._spin, daemon=True) + self._thread.start() + + def stop(self): + if self._thread and self._thread.is_alive(): + self._running = False + self._thread.join() + self._thread = None + + def update_message(self, message: str): + """Update the spinner message while it's running.""" + self.message = message + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + if exc_type: + print_error(f"Operation failed: {exc_val}") + return False + +class SmartProgress: + """Advanced progress bar with ETA and flexible customization.""" + def __init__(self, + iterable=None, + total=None, + desc="Processing", + unit="it", + color="bright_cyan", + bar_format=None, + width=40): + self.iterable = iterable + self.total = total if total is not None else len(iterable) if iterable is not None else 100 + self.desc = desc + self.unit = unit + self.color = color + self.width_setting = width # Store width setting + self.n = 0 + self.start_t = None + self.last_print_n = 0 + self.bar_format = bar_format or self._default_bar_format + + def _default_bar_format(self, n, total, elapsed): + """Default format for the progress bar.""" + # Use dynamic width that adapts to current terminal size + term_width = get_term_width() + bar_length = min(self.width_setting, term_width - 30) + filled_len = int(round(bar_length * n / float(total))) if total > 0 else 0 + filled_len = min(filled_len, bar_length) + + # Progress bar + bar = '█' * filled_len + '▯' * (bar_length - filled_len) + + # Percentage + percent = f"{100 * (n / float(total)):.1f}%" if total > 0 else "0.0%" + + # ETA calculation + if n > 0 and elapsed > 0: + rate = n / elapsed + remaining_items = total - n + eta = remaining_items / rate if rate > 0 else 0 + eta_str = f"ETA: {self._format_interval(eta)}" if n < total else f"Time: {self._format_interval(elapsed)}" + else: + eta_str = "ETA: --:--" + + return f"{style_text(self.desc, self.color)}: |{style_text(bar, self.color)}| {percent} ({n}/{total}, {eta_str})" + + def _format_interval(self, t): + """Format a time interval in a human-readable way.""" + mins, s = divmod(int(t), 60) + h, m = divmod(mins, 60) + if h > 0: + return f"{h:d}:{m:02d}:{s:02d}" + else: + return f"{m:02d}:{s:02d}" + + def update(self, n=1): + """Update the progress bar by advancing n units.""" + self.n += n + if self.start_t is None: + self.start_t = time.time() + + # Only refresh display occasionally to avoid slowdowns from terminal rendering + if self.n == self.total or (self.n - self.last_print_n) >= max(1, self.total / 100): + elapsed = time.time() - self.start_t + display = self.bar_format(self.n, self.total, elapsed) + sys.stdout.write(f"\r{display}") + sys.stdout.flush() + self.last_print_n = self.n + + if self.n >= self.total: + sys.stdout.write("\n") + sys.stdout.flush() + + def close(self): + """Close the progress bar.""" + if self.start_t is not None: + self.update(self.total - self.n) # Fill to 100% + + def __iter__(self): + """Iterate over the wrapped iterable.""" + if self.iterable is None: + raise ValueError("Iterable not provided") + + self.n = 0 + self.start_t = time.time() + for obj in self.iterable: + yield obj + self.update() + + self.close() + + def __enter__(self): + if self.iterable is None: + self.start_t = time.time() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + +def show_summary(operations: List[Dict[str, Any]]): + """Displays a summary of operations that were performed.""" + if not operations: + return + + title = "Operation Summary" + print_header(title) + + # Calculate field widths + op_field = max(len(op["operation"]) for op in operations) + 2 + result_field = max(len(op.get("result", "")) for op in operations) + 2 + time_field = 8 # Fixed width for time + + # Print header row + header = f"{style_text('Operation', 'bright_white', 'bold'):{op_field}} " \ + f"{style_text('Result', 'bright_white', 'bold'):{result_field}} " \ + f"{style_text('Time', 'bright_white', 'bold'):{time_field}}" + print(header) + print(style_text("-" * (op_field + result_field + time_field + 2), "bright_black")) + + # Print each operation + for op in operations: + status_color = "bright_green" if op.get("success", True) else "bright_red" + status_icon = "✓" if op.get("success", True) else "✗" + + operation = style_text(op["operation"], "bright_cyan") + result = style_text(f"{status_icon} {op.get('result', '')}", status_color) + duration = op.get("duration", "") + duration_text = style_text(duration, "bright_black") if duration else "" + + print(f"{operation:{op_field}} {result:{result_field}} {duration_text:{time_field}}") + + print_separator() + +# Custom notification classes for distinct messages with personality +class TetraNotifier: + """Creates themed notifications with personality.""" + + @staticmethod + def server_ready(server_name: str, details: Optional[Dict] = None): + """Notification when a server is ready.""" + print_box( + f"Server {style_text(server_name, 'bright_green', 'bold')} is ready and waiting for your tasks!\n" + + (f"Details: {details}" if details else ""), + title="Server Ready", + color="bright_green" + ) + + @staticmethod + def job_submitted(func_name: str, server: str): + """Notification when a job is submitted.""" + print_timestamp( + f"{ICONS['rocket']} Job {style_text(func_name, 'bright_yellow', 'bold')} sent to " + + f"{style_text(server, 'bright_cyan')}. Fasten your seatbelts!", + "bright_white" + ) + + @staticmethod + def job_completed(func_name: str, duration: float): + """Notification when a job completes.""" + duration_str = f"{duration:.2f}s" if duration < 60 else f"{int(duration/60)}m {int(duration%60)}s" + print_box( + f"The function {style_text(func_name, 'bright_magenta', 'bold')} has completed in {duration_str}. " + + "Your results are ready!", + title="Task Complete", + color="bright_green" + ) + + @staticmethod + def welcome(): + """Welcome message with Tetra ASCII art.""" + tetra_art = r""" + _______ _ + |__ __| | | + | | ____| |_ _ __ __ _ + | |/ _ \ __| '__/ _` | + | | __/ |_| | | (_| | + |_|\___|\__|_| \__,_| + """ + + print_box( + style_text(tetra_art, "bright_cyan") + + "\nWelcome to Tetra - Distributed Inference Made Simple" + + "\nPowering your ML functions across the compute universe", + color="bright_cyan" + ) + +# Example usage (can be removed or kept for testing) +if __name__ == "__main__": + TetraNotifier.welcome() + + print_header("Terminal UX Demo", "rocket") + + print_step(1, "Basic Message Styles") + print_info("This is an informational message.") + print_success("Operation completed successfully.") + print_warning("Something might need attention.") + print_error("An error occurred during the process.") + print_debug("Here's some debug information.") + print_tetra("Initializing Tetra core components...", style="bold") + print_separator() + + print_step(2, "Spinner Demo") + for spinner_type in ["classic", "dots", "arrows", "bounce", "moon", "pulse"]: + with Spinner(f"Testing {spinner_type} spinner...", spinner_type=spinner_type, icon="compute"): + time.sleep(1.5) + print_success(f"{spinner_type.capitalize()} spinner works!") + print_separator() + + print_step(3, "Progress Bar Demo") + items = list(range(100)) + for _ in SmartProgress(items, desc="Processing items", unit="items", color="bright_green"): + time.sleep(0.01) + print_separator() + + print_step(4, "Box Messages") + TetraNotifier.server_ready("gpu-server-01", {"gpu": "RTX 4090", "mem": "24GB"}) + TetraNotifier.job_submitted("image_generation", "gpu-cluster") + TetraNotifier.job_completed("transformer_inference", 45.3) + print_separator() + + print_step(5, "Operation Summary") + operations = [ + {"operation": "Initialize model", "result": "Success", "duration": "5.2s", "success": True}, + {"operation": "Load dataset", "result": "Success", "duration": "2.1s", "success": True}, + {"operation": "Server connection", "result": "Failed", "duration": "0.5s", "success": False}, + {"operation": "Fallback execution", "result": "Success", "duration": "8.7s", "success": True}, + ] + show_summary(operations) \ No newline at end of file