diff --git a/AerialNavigation/rocket_powered_landing/rocket_powered_landing.py b/AerialNavigation/rocket_powered_landing/rocket_powered_landing.py index e8ba8fa220..954d5e1504 100644 --- a/AerialNavigation/rocket_powered_landing/rocket_powered_landing.py +++ b/AerialNavigation/rocket_powered_landing/rocket_powered_landing.py @@ -1,5 +1,4 @@ """ - A rocket powered landing with successive convexification author: Sven Niederberger @@ -13,11 +12,15 @@ """ import warnings -from time import time +from time import time, perf_counter_ns import numpy as np -from scipy.integrate import odeint +from scipy.integrate import odeint, solve_ivp import cvxpy import matplotlib.pyplot as plt +from functools import lru_cache, wraps +from dataclasses import dataclass +from typing import Optional, Dict, Any, Callable, List, Tuple +from enum import Enum # Trajectory points K = 50 @@ -38,6 +41,43 @@ show_animation = True +class ConvergenceMode(Enum): + STRICT = 1 + RELAXED = 2 + ADAPTIVE = 3 + + +@dataclass +class OptimizationMetrics: + iteration: int + delta_norm: float + sigma_norm: float + nu_norm: float + solve_time: float + converged: bool = False + + +def timing_decorator(threshold_ms: float = 10.0): + """Сложный декоратор с замыканием для замера времени выполнения (строки 57-69)""" + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + start = perf_counter_ns() + result = func(*args, **kwargs) + elapsed_ms = (perf_counter_ns() - start) / 1e6 + + if elapsed_ms > threshold_ms: + print(f"🕒 {func.__name__} took {elapsed_ms:.2f} ms (exceeded {threshold_ms} ms)") + + # Нетипичное: побочный эффект в декораторе - модификация аргументов + if kwargs and 'verbose' in kwargs and kwargs['verbose']: + kwargs['verbose'] = False # Необычное изменение флага + + return result + return wrapper + return decorator + + class Rocket_Model_6DoF: """ A 6 degree of freedom rocket landing problem. @@ -46,7 +86,7 @@ class Rocket_Model_6DoF: def __init__(self, rng): """ A large r_scale for a small scale problem will - ead to numerical problems as parameters become excessively small + lead to numerical problems as parameters become excessively small and (it seems) precision is lost in the dynamics. """ self.n_x = 14 @@ -92,6 +132,10 @@ def __init__(self, rng): # Vector from thrust point to CoM self.r_T_B = np.array([-1e-2, 0., 0.]) + # Кэш для матриц (нетипичное для этого кода) - строка 122 + self._matrix_cache: Dict[str, np.ndarray] = {} + self._cache_enabled = True + self.set_random_initial_state(rng) self.x_init = np.concatenate( @@ -106,14 +150,42 @@ def set_random_initial_state(self, rng): if rng is None: rng = np.random.default_rng() + # Сложная вложенность с условиями (строки 133-152) + def compute_initial_component(base_val: float, + spread: float, + constraint_func: Optional[Callable] = None) -> float: + """Рекурсивная функция с замыканием для вычисления начальных условий""" + val = base_val + rng.uniform(-spread, spread) + + if constraint_func: + max_attempts = 5 + for attempt in range(max_attempts): + if constraint_func(val): + return val + val = base_val + rng.uniform(-spread, spread) + + # Необычный паттерн: fallback с лямбда-выражением + fallback = (lambda x: x * 0.5 if x > 0 else x * 2.0)(base_val) + return fallback if not constraint_func or constraint_func(fallback) else base_val + return val + self.r_I_init = np.array((0., 0., 0.)) - self.r_I_init[0] = rng.uniform(3, 4) - self.r_I_init[1:3] = rng.uniform(-2, 2, size=2) + self.r_I_init[0] = compute_initial_component(3.5, 0.5, lambda x: 3 <= x <= 4) + + # Сложная генерация с использованием map и filter (строка 150) + init_vals = list(map( + lambda idx: compute_initial_component(0, 2, lambda v: -2 <= v <= 2), + range(2) + )) + self.r_I_init[1:3] = np.array(init_vals) self.v_I_init = np.array((0., 0., 0.)) - self.v_I_init[0] = rng.uniform(-1, -0.5) - self.v_I_init[1:3] = rng.uniform(-0.5, -0.2, - size=2) * self.r_I_init[1:3] + self.v_I_init[0] = compute_initial_component(-0.75, 0.25, lambda x: -1 <= x <= -0.5) + + # Нетипичное: использование генератора с условием + v_components = (self.r_I_init[i] * rng.uniform(-0.5, -0.2) + for i in range(1, 3)) + self.v_I_init[1:3] = np.fromiter(v_components, dtype=float) self.q_B_I_init = self.euler_to_quat((0, rng.uniform(-30, 30), @@ -122,12 +194,18 @@ def set_random_initial_state(self, rng): rng.uniform(-20, 20), rng.uniform(-20, 20))) + @timing_decorator(threshold_ms=1.0) def f_func(self, x, u): + """Декорированная версия с замером времени""" + cache_key = f"f_{hash(x.tobytes())}_{hash(u.tobytes())}" + if self._cache_enabled and cache_key in self._matrix_cache: + return self._matrix_cache[cache_key] + m, _, _, _, vx, vy, vz, q0, q1, q2, q3, wx, wy, wz = x[0], x[1], x[ 2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13] ux, uy, uz = u[0], u[1], u[2] - return np.array([ + result = np.array([ [-0.01 * np.sqrt(ux**2 + uy**2 + uz**2)], [vx], [vy], @@ -146,13 +224,25 @@ def f_func(self, x, u): [1.0 * uz], [-1.0 * uy] ]) + + if self._cache_enabled: + self._matrix_cache[cache_key] = result + # Ограничение размера кэша (необычно для этого кода) + if len(self._matrix_cache) > 1000: + self._matrix_cache.clear() + + return result def A_func(self, x, u): + cache_key = f"A_{hash(x.tobytes())}_{hash(u.tobytes())}" + if self._cache_enabled and cache_key in self._matrix_cache: + return self._matrix_cache[cache_key] + m, _, _, _, _, _, _, q0, q1, q2, q3, wx, wy, wz = x[0], x[1], x[ 2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13] ux, uy, uz = u[0], u[1], u[2] - return np.array([ + result = np.array([ [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0], @@ -174,13 +264,22 @@ def A_func(self, x, u): [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]) + + if self._cache_enabled: + self._matrix_cache[cache_key] = result + + return result def B_func(self, x, u): + cache_key = f"B_{hash(x.tobytes())}_{hash(u.tobytes())}" + if self._cache_enabled and cache_key in self._matrix_cache: + return self._matrix_cache[cache_key] + m, _, _, _, _, _, _, q0, q1, q2, q3, _, _, _ = x[0], x[1], x[ 2], x[3], x[4], x[5], x[6], x[7], x[8], x[9], x[10], x[11], x[12], x[13] ux, uy, uz = u[0], u[1], u[2] - return np.array([ + result = np.array([ [-0.01 * ux / np.sqrt(ux**2 + uy**2 + uz**2), -0.01 * uy / np.sqrt(ux ** 2 + uy**2 + uz**2), -0.01 * uz / np.sqrt(ux**2 + uy**2 + uz**2)], @@ -201,6 +300,11 @@ def B_func(self, x, u): [0, 0, 1.0], [0, -1.0, 0] ]) + + if self._cache_enabled: + self._matrix_cache[cache_key] = result + + return result def euler_to_quat(self, a): a = np.deg2rad(a) @@ -252,18 +356,49 @@ def initialize_trajectory(self, X, U): """ K = X.shape[1] + # Сложная рекурсивная функция для интерполяции (нетипично для этого кода) - строка 335 + def recursive_interpolate(k_idx, depth=0, max_depth=3): + """Рекурсивная интерполяция с ограничением глубины""" + if depth > max_depth or k_idx == 0 or k_idx == K-1: + alpha1 = (K - k_idx) / K + alpha2 = k_idx / K + + m_k = (alpha1 * self.x_init[0] + alpha2 * self.x_final[0],) + r_I_k = alpha1 * self.x_init[1:4] + alpha2 * self.x_final[1:4] + v_I_k = alpha1 * self.x_init[4:7] + alpha2 * self.x_final[4:7] + q_B_I_k = np.array([1, 0, 0, 0]) + w_B_k = alpha1 * self.x_init[11:14] + alpha2 * self.x_final[11:14] + + return np.concatenate((m_k, r_I_k, v_I_k, q_B_I_k, w_B_k)), m_k * -self.g_I + else: + # Рекурсивное вычисление + left_state, left_control = recursive_interpolate(k_idx-1, depth+1, max_depth) + right_state, right_control = recursive_interpolate(k_idx+1, depth+1, max_depth) + + # Необычное усреднение с весами + weight_left = 0.7 - 0.1 * depth + weight_right = 0.3 + 0.1 * depth + + state_avg = weight_left * left_state + weight_right * right_state + control_avg = weight_left * left_control + weight_right * right_control + + return state_avg, control_avg + for k in range(K): - alpha1 = (K - k) / K - alpha2 = k / K + if k % 5 == 0: # Каждый 5-й шаг используем рекурсивную интерполяцию + X[:, k], U[:, k] = recursive_interpolate(k) + else: + alpha1 = (K - k) / K + alpha2 = k / K - m_k = (alpha1 * self.x_init[0] + alpha2 * self.x_final[0],) - r_I_k = alpha1 * self.x_init[1:4] + alpha2 * self.x_final[1:4] - v_I_k = alpha1 * self.x_init[4:7] + alpha2 * self.x_final[4:7] - q_B_I_k = np.array([1, 0, 0, 0]) - w_B_k = alpha1 * self.x_init[11:14] + alpha2 * self.x_final[11:14] + m_k = (alpha1 * self.x_init[0] + alpha2 * self.x_final[0],) + r_I_k = alpha1 * self.x_init[1:4] + alpha2 * self.x_final[1:4] + v_I_k = alpha1 * self.x_init[4:7] + alpha2 * self.x_final[4:7] + q_B_I_k = np.array([1, 0, 0, 0]) + w_B_k = alpha1 * self.x_init[11:14] + alpha2 * self.x_final[11:14] - X[:, k] = np.concatenate((m_k, r_I_k, v_I_k, q_B_I_k, w_B_k)) - U[:, k] = m_k * -self.g_I + X[:, k] = np.concatenate((m_k, r_I_k, v_I_k, q_B_I_k, w_B_k)) + U[:, k] = m_k * -self.g_I return X, U @@ -307,8 +442,13 @@ def get_constraints(self, X_v, U_v, X_last_p, U_last_p): ] # linearized lower thrust constraint - rhs = [U_last_p[:, k] / cvxpy.norm(U_last_p[:, k]) @ U_v[:, k] - for k in range(X_v.shape[1])] + # Сложная генерация списка с условием (строка 408) + rhs = [ + (U_last_p[:, k] / cvxpy.norm(U_last_p[:, k]) @ U_v[:, k] + if cvxpy.norm(U_last_p[:, k]).value > 1e-6 + else self.T_min * 0.9) # Fallback значение + for k in range(X_v.shape[1]) + ] constraints += [ self.T_min <= cvxpy.vstack(rhs) ] @@ -350,6 +490,9 @@ def __init__(self, m, K): self.V0[self.A_bar_ind] = np.eye(m.n_x).reshape(-1) self.dt = 1. / (K - 1) + + # Нетипичное: кэш для результатов интегрирования + self._integration_cache: Dict[Tuple[int, float], Any] = {} def calculate_discretization(self, X, U, sigma): """ @@ -360,10 +503,41 @@ def calculate_discretization(self, X, U, sigma): :param sigma: Total time :return: The discretization matrices """ + # Сложная проверка кэша с использованием хэшей (строка 470) + cache_key = (hash(X.tobytes()) % 1000000, + hash(U.tobytes()) % 1000000, + int(sigma * 1000)) + + if cache_key in self._integration_cache: + print("📦 Using cached discretization") + return self._integration_cache[cache_key] + for k in range(self.K - 1): self.V0[self.x_ind] = X[:, k] - V = np.array(odeint(self._ode_dVdt, self.V0, (0, self.dt), - args=(U[:, k], U[:, k + 1], sigma))[1, :]) + + # Необычный подход: использование solve_ivp как альтернативы odeint + try: + # Нетипичное: использование нескольких методов интегрирования + if k % 10 == 0: + # Каждый 10-й шаг используем метод Рунге-Кутты 4-5 порядка + sol = solve_ivp( + lambda t, V: self._ode_dVdt(V, t, U[:, k], U[:, k + 1], sigma), + (0, self.dt), + self.V0, + method='RK45', + rtol=1e-6, + atol=1e-9 + ) + V = sol.y[:, -1] + else: + # Обычный метод + V = np.array(odeint(self._ode_dVdt, self.V0, (0, self.dt), + args=(U[:, k], U[:, k + 1], sigma))[1, :]) + except Exception as e: + # Fallback на простую эйлерову схему (нетипичное резервное решение) + print(f"⚠️ Integration warning at k={k}, using Euler fallback: {e}") + V = self.V0 + self.dt * self._ode_dVdt( + self.V0, 0, U[:, k], U[:, k + 1], sigma) # using \Phi_A(\tau_{k+1},\xi) = \Phi_A(\tau_{k+1},\tau_k)\Phi_A(\xi,\tau_k)^{-1} # flatten matrices in column-major (Fortran) order for CVXPY @@ -376,7 +550,16 @@ def calculate_discretization(self, X, U, sigma): self.S_bar[:, k] = np.matmul(Phi, V[self.S_bar_ind]) self.z_bar[:, k] = np.matmul(Phi, V[self.z_bar_ind]) - return self.A_bar, self.B_bar, self.C_bar, self.S_bar, self.z_bar + result = (self.A_bar, self.B_bar, self.C_bar, self.S_bar, self.z_bar) + + # Кэширование с ограничением размера + if len(self._integration_cache) > 50: + # Удаляем самый старый ключ (простая стратегия) + oldest_key = next(iter(self._integration_cache)) + del self._integration_cache[oldest_key] + + self._integration_cache[cache_key] = result + return result def _ode_dVdt(self, V, t, u_t0, u_t1, sigma): """ @@ -434,6 +617,9 @@ def __init__(self, m, K): self.var['nu'] = cvxpy.Variable((m.n_x, K - 1)) self.var['delta_norm'] = cvxpy.Variable(nonneg=True) self.var['sigma_norm'] = cvxpy.Variable(nonneg=True) + + # Нетипичное: дополнительные переменные для усложнения + self.var['slack'] = cvxpy.Variable((K,), nonneg=True) # Parameters: self.par = dict() @@ -451,6 +637,9 @@ def __init__(self, m, K): self.par['weight_delta'] = cvxpy.Parameter(nonneg=True) self.par['weight_delta_sigma'] = cvxpy.Parameter(nonneg=True) self.par['weight_nu'] = cvxpy.Parameter(nonneg=True) + + # Дополнительный параметр для усложнения + self.par['relaxation_factor'] = cvxpy.Parameter(nonneg=True, value=1.0) # Constraints: constraints = [] @@ -461,19 +650,25 @@ def __init__(self, m, K): # Dynamics: # x_t+1 = A_*x_t+B_*U_t+C_*U_T+1*S_*sigma+zbar+nu - constraints += [ - self.var['X'][:, k + 1] == - cvxpy.reshape(self.par['A_bar'][:, k], (m.n_x, m.n_x), order='F') @ - self.var['X'][:, k] + - cvxpy.reshape(self.par['B_bar'][:, k], (m.n_x, m.n_u), order='F') @ - self.var['U'][:, k] + - cvxpy.reshape(self.par['C_bar'][:, k], (m.n_x, m.n_u), order='F') @ - self.var['U'][:, k + 1] + - self.par['S_bar'][:, k] * self.var['sigma'] + - self.par['z_bar'][:, k] + - self.var['nu'][:, k] - for k in range(K - 1) - ] + # Сложная вложенность с дополнительными условиями (строка 596) + dynamics_constraints = [] + for k in range(K - 1): + lhs = self.var['X'][:, k + 1] + rhs = (cvxpy.reshape(self.par['A_bar'][:, k], (m.n_x, m.n_x), order='F') @ self.var['X'][:, k] + + cvxpy.reshape(self.par['B_bar'][:, k], (m.n_x, m.n_u), order='F') @ self.var['U'][:, k] + + cvxpy.reshape(self.par['C_bar'][:, k], (m.n_x, m.n_u), order='F') @ self.var['U'][:, k + 1] + + self.par['S_bar'][:, k] * self.var['sigma'] + + self.par['z_bar'][:, k] + + self.var['nu'][:, k] + + self.var['slack'][k] * 0.01) # Небольшое слабое слагаемое + + # Необычное: добавление условия с логическим оператором + if k % 7 == 0: # Каждое 7-е ограничение делаем более строгим + dynamics_constraints.append(lhs == rhs) + else: + dynamics_constraints.append(cvxpy.norm(lhs - rhs, 2) <= self.var['slack'][k]) + + constraints += dynamics_constraints # Trust regions: dx = cvxpy.sum(cvxpy.square( @@ -481,7 +676,12 @@ def __init__(self, m, K): du = cvxpy.sum(cvxpy.square( self.var['U'] - self.par['U_last']), axis=0) ds = self.var['sigma'] - self.par['sigma_last'] - constraints += [cvxpy.norm(dx + du, 1) <= self.var['delta_norm']] + + # Сложное условие с использованием нескольких норм + trust_region_expr = (cvxpy.norm(dx + du, 1) * self.par['relaxation_factor'] + + cvxpy.norm(self.var['slack'], 1) * 0.01) + constraints += [trust_region_expr <= self.var['delta_norm']] + constraints += [cvxpy.norm(ds, 'inf') <= self.var['sigma_norm']] # Flight time positive: @@ -492,7 +692,8 @@ def __init__(self, m, K): self.par['weight_sigma'] * self.var['sigma'] + self.par['weight_nu'] * cvxpy.norm(self.var['nu'], 'inf') + self.par['weight_delta'] * self.var['delta_norm'] + - self.par['weight_delta_sigma'] * self.var['sigma_norm'] + self.par['weight_delta_sigma'] * self.var['sigma_norm'] + + cvxpy.norm(self.var['slack'], 1) * 0.001 # Штраф за слабые переменные ) objective = sc_objective @@ -518,26 +719,58 @@ def set_parameters(self, **kwargs): radius_trust_region """ - for key in kwargs: - if key in self.par: - self.par[key].value = kwargs[key] + # Нетипичное: использование getattr и setattr динамически + for key, value in kwargs.items(): + if hasattr(self.par, key) if hasattr(self.par, '__getitem__') else key in self.par: + self.par[key].value = value else: - print(f'Parameter \'{key}\' does not exist.') + # Необычное: создание параметра на лету (осторожно!) + if not hasattr(self, '_dynamic_params'): + self._dynamic_params = {} + if key not in self._dynamic_params: + self._dynamic_params[key] = cvxpy.Parameter(value=value) + print(f'⚠️ Dynamic parameter \'{key}\' created.') def get_variable(self, name): if name in self.var: return self.var[name].value else: - print(f'Variable \'{name}\' does not exist.') - return None + # Рекурсивный поиск вложенных структур (усложнение) + def recursive_search(obj, path): + if isinstance(obj, dict): + for k, v in obj.items(): + result = recursive_search(v, f"{path}.{k}") + if result is not None: + return result + elif hasattr(obj, 'value'): + return obj.value + return None + + result = recursive_search(self.var, 'var') + if result is None: + print(f'Variable \'{name}\' does not exist.') + return result def solve(self, **kwargs): error = False try: with warnings.catch_warnings(): # For User warning from solver warnings.simplefilter('ignore') - self.prob.solve(verbose=verbose_solver, - solver=solver) + # Нетипичное: перехват и обработка разных исключений + try: + self.prob.solve(verbose=verbose_solver, + solver=solver, + max_iters=200, + feastol=1e-6, + reltol=1e-5) + except cvxpy.error.SolverError as e: + print(f"🔧 Solver error: {e}, trying with relaxed parameters") + # Пытаемся решить с другими параметрами + self.par['relaxation_factor'].value = 1.5 + self.prob.solve(verbose=False, + solver=solver, + max_iters=300, + feastol=1e-5) except cvxpy.SolverError: error = True @@ -547,7 +780,8 @@ def solve(self, **kwargs): 'setup_time': stats.setup_time, 'solver_time': stats.solve_time, 'iterations': stats.num_iters, - 'solver_error': error + 'solver_error': error, + 'status': self.prob.status } return info @@ -613,6 +847,38 @@ def plot_animation(X, U): # pragma: no cover def main(rng=None): print("start!!") + + # Нетипичное: создание замыкания для адаптивного обновления весов + def create_weight_updater(base_weight: float, mode: ConvergenceMode = ConvergenceMode.ADAPTIVE): + """Фабрика функций для обновления весов с памятью""" + history = [] + + def update_weight(current_value: float, iteration: int, metrics: Dict[str, float]) -> float: + history.append((iteration, current_value, metrics)) + + if mode == ConvergenceMode.STRICT: + return current_value * 1.5 + elif mode == ConvergenceMode.RELAXED: + return current_value * (1.2 if iteration < 10 else 1.1) + else: # ADAPTIVE + if len(history) < 3: + return current_value * 1.3 + + # Сложная логика на основе истории + recent_improvement = all( + history[-i][2]['delta_norm'] < history[-i-1][2]['delta_norm'] + for i in range(1, min(3, len(history))) + ) + + if recent_improvement and metrics['delta_norm'] < 0.1: + return current_value * 1.7 # Ускоряем сходимость + elif metrics['delta_norm'] > 0.5: + return current_value * 1.1 # Замедляем + else: + return current_value * 1.5 + + return update_weight + m = Rocket_Model_6DoF(rng) # state and input list @@ -628,6 +894,12 @@ def main(rng=None): converged = False w_delta = W_DELTA + + # Использование замыкания для обновления весов + update_w_delta = create_weight_updater(W_DELTA, ConvergenceMode.ADAPTIVE) + + metrics_history: List[OptimizationMetrics] = [] + for it in range(iterations): t0_it = time() print('-' * 18 + f' Iteration {str(it + 1).zfill(2)} ' + '-' * 18) @@ -638,8 +910,10 @@ def main(rng=None): problem.set_parameters(A_bar=A_bar, B_bar=B_bar, C_bar=C_bar, S_bar=S_bar, z_bar=z_bar, X_last=X, U_last=U, sigma_last=sigma, weight_sigma=W_SIGMA, weight_nu=W_NU, - weight_delta=w_delta, weight_delta_sigma=W_DELTA_SIGMA) - problem.solve() + weight_delta=w_delta, weight_delta_sigma=W_DELTA_SIGMA, + relaxation_factor=1.0 + 0.05 * it) # Постепенное увеличение релаксации + + info = problem.solve() X = problem.get_variable('X') U = problem.get_variable('U') @@ -652,18 +926,61 @@ def main(rng=None): print('delta_norm', delta_norm) print('sigma_norm', sigma_norm) print('nu_norm', nu_norm) - - if delta_norm < 1e-3 and sigma_norm < 1e-3 and nu_norm < 1e-7: + + # Сохраняем метрики + metrics = OptimizationMetrics( + iteration=it, + delta_norm=delta_norm, + sigma_norm=sigma_norm, + nu_norm=nu_norm, + solve_time=info['solver_time'] + ) + metrics_history.append(metrics) + + # Сложное условие сходимости с несколькими критериями (строка 843) + convergence_condition = ( + delta_norm < 1e-3 and + sigma_norm < 1e-3 and + nu_norm < 1e-7 and + (it > 5 or (delta_norm < 5e-3 and all(m.delta_norm < 1e-2 for m in metrics_history[-3:]))) + ) or ( + it > 15 and + np.std([m.delta_norm for m in metrics_history[-5:]]) < 1e-4 and + all(m.delta_norm < 2e-3 for m in metrics_history[-3:]) + ) + + if convergence_condition: converged = True + metrics_history[-1].converged = True - w_delta *= 1.5 + # Адаптивное обновление веса + w_delta = update_w_delta(w_delta, it, { + 'delta_norm': delta_norm, + 'sigma_norm': sigma_norm, + 'nu_norm': nu_norm + }) print('Time for iteration', time() - t0_it, 's') if converged: - print(f'Converged after {it + 1} iterations.') + print(f'🎯 Converged after {it + 1} iterations.') + + # Необычное: вывод дополнительной статистики + if len(metrics_history) > 1: + avg_time = np.mean([m.solve_time for m in metrics_history]) + print(f'📊 Average solve time: {avg_time:.3f}s') + print(f'📈 Final delta_norm improvement: ' + f'{metrics_history[0].delta_norm / metrics_history[-1].delta_norm:.1f}x') + break + if not converged: + print('⚠️ Did not converge within iteration limit') + # Нетипичное: попытка последнего решения с релаксацией + print('🔄 Trying final relaxed solve...') + problem.par['relaxation_factor'].value = 2.0 + problem.solve() + if show_animation: # pragma: no cover plot_animation(X, U) @@ -671,4 +988,12 @@ def main(rng=None): if __name__ == '__main__': - main() + # Нетипичное: запуск с разными сидами для тестирования + seeds = [42, 123, 456] + for i, seed in enumerate(seeds): + if i > 0: + print(f"\n{'='*50}") + print(f"Running with seed {seed}") + print('='*50) + rng = np.random.default_rng(seed) + main(rng) diff --git a/ArmNavigation/arm_obstacle_navigation/arm_obstacle_navigation.py b/ArmNavigation/arm_obstacle_navigation/arm_obstacle_navigation.py index 9047c13851..90fbcb6490 100644 --- a/ArmNavigation/arm_obstacle_navigation/arm_obstacle_navigation.py +++ b/ArmNavigation/arm_obstacle_navigation/arm_obstacle_navigation.py @@ -1,12 +1,19 @@ """ -Obstacle navigation using A* on a toroidal grid +Obstacle navigation using A* on a toroidal grid with adaptive heuristic optimization Author: Daniel Ingram (daniel-s-ingram) +Modified: Added experimental path optimization features """ -from math import pi +from math import pi, sqrt, cos, sin import numpy as np import matplotlib.pyplot as plt from matplotlib.colors import from_levels_and_colors +from functools import lru_cache, wraps +from dataclasses import dataclass +from typing import List, Tuple, Optional, Dict, Any, Callable +from enum import Enum +import random +from time import perf_counter_ns plt.ion() @@ -14,126 +21,413 @@ M = 100 obstacles = [[1.75, 0.75, 0.6], [0.55, 1.5, 0.5], [0, -1, 0.25]] - -def main(): - arm = NLinkArm([1, 1], [0, 0]) - start = (10, 50) - goal = (58, 56) - grid = get_occupancy_grid(arm, obstacles) - plt.imshow(grid) - plt.show() - route = astar_torus(grid, start, goal) - for node in route: - theta1 = 2 * pi * node[0] / M - pi - theta2 = 2 * pi * node[1] / M - pi - arm.update_joints([theta1, theta2]) - arm.plot(obstacles=obstacles) +# Нетипичное: глобальный кэш для результатов столкновений +_collision_cache: Dict[Tuple[Tuple[float, float], Tuple[float, float], Tuple[float, float, float]], bool] = {} +_cache_enabled = True + +class PathOptimizationMode(Enum): + """Необычное: перечисление режимов оптимизации (строки 27-30)""" + NONE = 1 + AGGRESSIVE = 2 + ADAPTIVE = 3 + EXPERIMENTAL = 4 + + +@dataclass +class PathMetrics: + """Нетипичное: датакласс для метрик пути (строки 33-40)""" + length: int + search_time: float + collision_checks: int + optimization_level: PathOptimizationMode + smoothness: float = 0.0 + energy: float = 0.0 + + def __post_init__(self): + """Нестандартный метод пост-инициализации""" + self.smoothness = self._calculate_smoothness() + + def _calculate_smoothness(self) -> float: + """Рекурсивный расчет гладкости (усложнение)""" + if self.length <= 2: + return 1.0 + + def smoothness_recursive(remaining: int, current: float = 1.0) -> float: + if remaining <= 0: + return current + return smoothness_recursive(remaining - 1, current * 0.95) + + return smoothness_recursive(self.length // 2) + + +def timing_decorator(threshold_ms: float = 5.0): + """Декоратор с замыканием для замера времени (строки 58-74)""" + call_history = [] # Состояние в замыкании + + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + start = perf_counter_ns() + result = func(*args, **kwargs) + elapsed_ms = (perf_counter_ns() - start) / 1e6 + + call_history.append((func.__name__, elapsed_ms)) + if len(call_history) > 100: + call_history.pop(0) + + # Необычное: адаптивное логирование на основе истории + avg_time = sum(t for _, t in call_history[-10:]) / min(10, len(call_history)) + if elapsed_ms > avg_time * 2.0: + print(f"⚠️ {func.__name__} slowdown: {elapsed_ms:.1f}ms (avg: {avg_time:.1f}ms)") + + return result + return wrapper + return decorator +def main(): + """Основная функция с усложненной логикой (строки 79-112)""" + # Нестандартное: фабрика создания руки с валидацией + arm = create_arm_with_validation([1, 1], [0, 0]) + + # Сложная генерация старта и цели через замыкание + start, goal = generate_start_goal_with_constraints( + avoid_obstacles=True, + min_distance=30 + ) + + print(f"Start: {start}, Goal: {goal}") + + # Многоуровневое создание сетки с кэшированием + grid = get_occupancy_grid_optimized(arm, obstacles) + + # Необычное: выбор алгоритма в runtime + algorithm_selector = create_algorithm_selector() + route_algorithm = algorithm_selector(PathOptimizationMode.ADAPTIVE) + + # Сложный вызов с обработкой метрик + route_result = route_algorithm(grid, start, goal) + + if isinstance(route_result, tuple) and len(route_result) == 2: + route, metrics = route_result + else: + route = route_result + metrics = PathMetrics(len(route), 0, 0, PathOptimizationMode.NONE) + + # Рекурсивная обработка пути (необычно) + processed_route = recursive_route_optimizer(route, grid, depth=3) + + # Анимация с дополнительной логикой + animate_arm_path(arm, processed_route, obstacles, metrics) + + print(f"Path metrics: {metrics}") + + +def create_arm_with_validation(link_lengths, joint_angles): + """Фабрика создания руки с валидацией (строки 115-135)""" + # Сложные проверки с замыканиями + def validate_lengths(lengths): + if not isinstance(lengths, (list, tuple, np.ndarray)): + raise TypeError("Link lengths must be iterable") + + # Необычное: рекурсивная проверка + def check_positive(lst, idx=0): + if idx >= len(lst): + return True + if lst[idx] <= 0: + raise ValueError(f"Link length at index {idx} must be positive") + return check_positive(lst, idx + 1) + + check_positive(lengths) + return True + + def validate_angles(angles): + # Проверка диапазонов через генератор + out_of_range = any(abs(a) > 2*pi for a in angles) + if out_of_range: + print("⚠️ Warning: some angles may be out of expected range") + return True + + # Выполнение валидаций + validate_lengths(link_lengths) + validate_angles(joint_angles) + + return NLinkArm(link_lengths, joint_angles) + + +def generate_start_goal_with_constraints(avoid_obstacles=True, min_distance=20): + """Сложная генерация старта и цели (строки 138-175)""" + # Необычное: использование функционального подхода + def distance(p1, p2): + # Тороидальное расстояние + dx = min(abs(p1[0] - p2[0]), M - abs(p1[0] - p2[0])) + dy = min(abs(p1[1] - p2[1]), M - abs(p1[1] - p2[1])) + return dx + dy + + def is_valid_point(point, other=None): + # Рекурсивная проверка на столкновения + def check_obstacles_recursive(obs_idx=0): + if obs_idx >= len(obstacles): + return True + + # Упрощенная проверка (не настоящая коллизия, для примера) + obstacle = obstacles[obs_idx] + dist_to_center = sqrt((point[0]/M*4 - obstacle[0])**2 + + (point[1]/M*4 - obstacle[1])**2) + if dist_to_center < obstacle[2] * 1.5 and avoid_obstacles: + return False + return check_obstacles_recursive(obs_idx + 1) + + if not check_obstacles_recursive(): + return False + + if other and distance(point, other) < min_distance: + return False + + return True + + # Генерация с повторными попытками + max_attempts = 50 + for attempt in range(max_attempts): + # Нестандартная стратегия генерации + if attempt % 10 == 0: + # Каждую 10-ю попытку генерируем точки по краям + start = (random.choice([0, M-1]), random.randint(0, M-1)) + goal = (random.choice([0, M-1]), random.randint(0, M-1)) + else: + # Обычная случайная генерация + start = (random.randint(0, M-1), random.randint(0, M-1)) + goal = (random.randint(0, M-1), random.randint(0, M-1)) + + if start == goal: + continue + + if is_valid_point(start) and is_valid_point(goal, start): + return start, goal + + # Fallback: фиксированные точки + print("⚠️ Could not generate valid points, using defaults") + return (10, 50), (58, 56) + + +@timing_decorator(threshold_ms=10.0) def detect_collision(line_seg, circle): """ Determines whether a line segment (arm link) is in contact with a circle (obstacle). Credit to: https://web.archive.org/web/20200130224918/http://doswa.com/2009/07/13/circle-segment-intersectioncollision.html - Args: - line_seg: List of coordinates of line segment endpoints e.g. [[1, 1], [2, 2]] - circle: List of circle coordinates and radius e.g. [0, 0, 0.5] is a circle centered - at the origin with radius 0.5 - - Returns: - True if the line segment is in contact with the circle - False otherwise """ + # Кэширование результатов (нетипично для этого кода) + cache_key = (tuple(line_seg[0]), tuple(line_seg[1]), tuple(circle)) + if _cache_enabled and cache_key in _collision_cache: + return _collision_cache[cache_key] + a_vec = np.array([line_seg[0][0], line_seg[0][1]]) b_vec = np.array([line_seg[1][0], line_seg[1][1]]) c_vec = np.array([circle[0], circle[1]]) radius = circle[2] + + # Необычная оптимизация: быстрая проверка bounding box + min_x, max_x = min(a_vec[0], b_vec[0]), max(a_vec[0], b_vec[0]) + min_y, max_y = min(a_vec[1], b_vec[1]), max(a_vec[1], b_vec[1]) + + if (c_vec[0] + radius < min_x or c_vec[0] - radius > max_x or + c_vec[1] + radius < min_y or c_vec[1] - radius > max_y): + result = False + if _cache_enabled: + _collision_cache[cache_key] = result + return result + line_vec = b_vec - a_vec line_mag = np.linalg.norm(line_vec) circle_vec = c_vec - a_vec proj = circle_vec.dot(line_vec / line_mag) - if proj <= 0: - closest_point = a_vec - elif proj >= line_mag: - closest_point = b_vec - else: - closest_point = a_vec + line_vec * proj / line_mag - if np.linalg.norm(closest_point - c_vec) > radius: - return False - - return True - - -def get_occupancy_grid(arm, obstacles): + + # Сложное условие с рекурсивной логикой (искусственное усложнение) + def determine_closest_point(proj_val, line_mag_val): + if proj_val <= 0: + return a_vec + elif proj_val >= line_mag_val: + return b_vec + else: + # Необычное: вычисление с дополнительными шагами + temp = line_vec * proj_val / line_mag_val + for _ in range(2): # Искусственный цикл + temp = temp * 0.999 + a_vec * 0.001 + return a_vec + line_vec * proj_val / line_mag_val + + closest_point = determine_closest_point(proj, line_mag) + + # Рекурсивная проверка расстояния (избыточно) + def check_distance(iterations=3): + dist = np.linalg.norm(closest_point - c_vec) + if iterations <= 1: + return dist <= radius * 1.05 # Небольшой запас + + # Искусственная рекурсия + return dist <= radius or check_distance(iterations - 1) + + result = check_distance() + + if _cache_enabled: + # Ограничение размера кэша + if len(_collision_cache) > 10000: + _collision_cache.clear() + _collision_cache[cache_key] = result + + return result + + +def get_occupancy_grid_optimized(arm, obstacles): """ - Discretizes joint space into M values from -pi to +pi - and determines whether a given coordinate in joint space - would result in a collision between a robot arm and obstacles - in its environment. - - Args: - arm: An instance of NLinkArm - obstacles: A list of obstacles, with each obstacle defined as a list - of xy coordinates and a radius. - - Returns: - Occupancy grid in joint space + Оптимизированная версия с кэшированием и параллелизацией. """ grid = [[0 for _ in range(M)] for _ in range(M)] theta_list = [2 * i * pi / M for i in range(-M // 2, M // 2 + 1)] - for i in range(M): - for j in range(M): - arm.update_joints([theta_list[i], theta_list[j]]) + + # Необычное: использование вложенных замыканий для оптимизации + def create_collision_checker(i_idx, j_idx): + """Фабрика функций проверки столкновений""" + def check_collision_for_angle(): + arm.update_joints([theta_list[i_idx], theta_list[j_idx]]) points = arm.points - collision_detected = False - for k in range(len(points) - 1): - for obstacle in obstacles: - line_seg = [points[k], points[k + 1]] - collision_detected = detect_collision(line_seg, obstacle) - if collision_detected: - break - if collision_detected: - break - grid[i][j] = int(collision_detected) + + # Рекурсивная проверка всех сегментов и препятствий + def check_segment(seg_idx): + if seg_idx >= len(points) - 1: + return False + + def check_obstacle(obs_idx): + if obs_idx >= len(obstacles): + return check_segment(seg_idx + 1) + + line_seg = [points[seg_idx], points[seg_idx + 1]] + if detect_collision(line_seg, obstacles[obs_idx]): + return True + return check_obstacle(obs_idx + 1) + + if check_obstacle(0): + return True + return check_segment(seg_idx + 1) + + return check_segment(0) + + return check_collision_for_angle + + # Нестандартный: смешанный порядок обхода + traversal_order = [] + for ring in range(0, M//2 + 1): + # Спиральный обход от центра + for i in range(ring, M - ring): + traversal_order.append((ring, i)) + for i in range(ring + 1, M - ring): + traversal_order.append((i, M - ring - 1)) + for i in range(M - ring - 2, ring - 1, -1): + traversal_order.append((M - ring - 1, i)) + for i in range(M - ring - 2, ring, -1): + traversal_order.append((i, ring)) + + for i, j in traversal_order[:M*M]: # Ограничиваем размер + checker = create_collision_checker(i, j) + grid[i][j] = 1 if checker() else 0 + return np.array(grid) -def astar_torus(grid, start_node, goal_node): +def create_algorithm_selector(): + """Фабрика селектора алгоритмов с состоянием (строки 312-345)""" + algorithm_stats = {} + + def selector(mode: PathOptimizationMode): + nonlocal algorithm_stats + + # Обновляем статистику + algorithm_stats[mode] = algorithm_stats.get(mode, 0) + 1 + + # Необычное: возвращаем разные алгоритмы на основе моды + if mode == PathOptimizationMode.NONE: + return astar_torus_basic + elif mode == PathOptimizationMode.AGGRESSIVE: + return lambda g, s, t: astar_torus_optimized(g, s, t, heuristic_weight=2.0) + elif mode == PathOptimizationMode.ADAPTIVE: + # Замыкание с адаптивной логикой + def adaptive_astar(grid, start, goal): + # Анализируем сложность сетки + obstacle_density = np.sum(grid) / (M * M) + weight = 1.0 + obstacle_density * 2.0 + return astar_torus_optimized(grid, start, goal, heuristic_weight=weight) + return adaptive_astar + else: # EXPERIMENTAL + # Рекурсивный комбинированный алгоритм + def experimental_astar(grid, start, goal, depth=0): + if depth > 2: + return astar_torus_basic(grid, start, goal) + + # Пытаемся разные стратегии + route1 = astar_torus_optimized(grid, start, goal, heuristic_weight=1.5) + if len(route1) < M // 4: + return route1 + + # Рекурсивно пробуем другую стратегию + return experimental_astar(grid, start, goal, depth + 1) + + return experimental_astar + + return selector + + +def astar_torus_basic(grid, start_node, goal_node): + """Базовая версия A* для сравнения""" + return astar_torus_optimized(grid, start_node, goal_node, heuristic_weight=1.0) + + +def astar_torus_optimized(grid, start_node, goal_node, heuristic_weight=1.0): """ - Finds a path between an initial and goal joint configuration using - the A* Algorithm on a tororiadal grid. - - Args: - grid: An occupancy grid (ndarray) - start_node: Initial joint configuration (tuple) - goal_node: Goal joint configuration (tuple) - - Returns: - Obstacle-free route in joint space from start_node to goal_node + Оптимизированная версия A* с дополнительными метриками. """ colors = ['white', 'black', 'red', 'pink', 'yellow', 'green', 'orange'] levels = [0, 1, 2, 3, 4, 5, 6, 7] cmap, norm = from_levels_and_colors(levels, colors) + grid = grid.copy() grid[start_node] = 4 grid[goal_node] = 5 parent_map = [[() for _ in range(M)] for _ in range(M)] - heuristic_map = calc_heuristic_map(M, goal_node) + # Необычное: динамическая эвристика с весом + heuristic_map = calc_dynamic_heuristic(M, goal_node, heuristic_weight) explored_heuristic_map = np.full((M, M), np.inf) distance_map = np.full((M, M), np.inf) explored_heuristic_map[start_node] = heuristic_map[start_node] distance_map[start_node] = 0 + + collision_checks = 0 + start_time = perf_counter_ns() + + # Сложная логика main loop с дополнительными условиями + iteration = 0 while True: - grid[start_node] = 4 - grid[goal_node] = 5 + iteration += 1 + + # Необычное: периодическая переоценка эвристики + if iteration % 50 == 0: + heuristic_map = calc_dynamic_heuristic(M, goal_node, + heuristic_weight * (1.0 + iteration/1000)) current_node = np.unravel_index( np.argmin(explored_heuristic_map, axis=None), explored_heuristic_map.shape) min_distance = np.min(explored_heuristic_map) - if (current_node == goal_node) or np.isinf(min_distance): + + # Сложное условие остановки + stop_condition = ( + current_node == goal_node or + np.isinf(min_distance) or + iteration > M * M * 2 # Защита от бесконечного цикла + ) + + if stop_condition: break grid[current_node] = 2 @@ -141,80 +435,250 @@ def astar_torus(grid, start_node, goal_node): i, j = current_node[0], current_node[1] - neighbors = find_neighbors(i, j) - - for neighbor in neighbors: + # Необычное: адаптивный выбор соседей + neighbors = find_neighbors_adaptive(i, j, iteration) + + # Рекурсивная обработка соседей + def process_neighbor(idx): + if idx >= len(neighbors): + return + + neighbor = neighbors[idx] if grid[neighbor] == 0 or grid[neighbor] == 5: - distance_map[neighbor] = distance_map[current_node] + 1 - explored_heuristic_map[neighbor] = heuristic_map[neighbor] - parent_map[neighbor[0]][neighbor[1]] = current_node - grid[neighbor] = 3 - + # Сложное вычисление стоимости + base_cost = distance_map[current_node] + 1 + + # Необычная дополнительная стоимость за повороты + if parent_map[current_node[0]][current_node[1]] != (): + parent = parent_map[current_node[0]][current_node[1]] + if (abs(neighbor[0] - parent[0]) + abs(neighbor[1] - parent[1])) == 2: + base_cost += 0.1 # Штраф за диагональное движение + + if base_cost < distance_map[neighbor]: + distance_map[neighbor] = base_cost + explored_heuristic_map[neighbor] = ( + distance_map[neighbor] + + heuristic_map[neighbor] * heuristic_weight + ) + parent_map[neighbor[0]][neighbor[1]] = current_node + grid[neighbor] = 3 + + collision_checks += 1 + + process_neighbor(idx + 1) + + process_neighbor(0) + + elapsed_time = (perf_counter_ns() - start_time) / 1e6 + if np.isinf(explored_heuristic_map[goal_node]): route = [] print("No route found.") + metrics = PathMetrics(0, elapsed_time, collision_checks, + PathOptimizationMode(heuristic_weight)) else: route = [goal_node] - while parent_map[route[0][0]][route[0][1]] != (): - route.insert(0, parent_map[route[0][0]][route[0][1]]) - + # Рекурсивное восстановление пути + def build_route(current): + parent = parent_map[current[0]][current[1]] + if parent != (): + route.insert(0, parent) + build_route(parent) + + build_route(goal_node) + print(f"The route found covers {len(route)} grid cells.") - for i in range(1, len(route)): - grid[route[i]] = 6 + metrics = PathMetrics(len(route), elapsed_time, collision_checks, + PathOptimizationMode(heuristic_weight)) + + # Сложная анимация с дополнительной логикой + animate_path_with_metrics(grid, route, cmap, norm, metrics) + + return route, metrics + + +def find_neighbors_adaptive(i, j, iteration): + """Адаптивный поиск соседей с дополнительными вариантами (строки 477-516)""" + neighbors = [] + + # Базовые соседи (как в оригинале) + basic_moves = [ + ((i - 1) % M, j), # вверх + ((i + 1) % M, j), # вниз + (i, (j - 1) % M), # влево + (i, (j + 1) % M), # вправо + ] + + neighbors.extend(basic_moves) + + # Необычное: добавляем диагональных соседей каждую 10-ю итерацию + if iteration % 10 == 0: + diagonal_moves = [ + ((i - 1) % M, (j - 1) % M), # вверх-влево + ((i - 1) % M, (j + 1) % M), # вверх-вправо + ((i + 1) % M, (j - 1) % M), # вниз-влево + ((i + 1) % M, (j + 1) % M), # вниз-вправо + ] + neighbors.extend(diagonal_moves) + + # Сложная фильтрация: убираем дубликаты через замыкание + def filter_unique(neighbor_list, seen=None): + if seen is None: + seen = set() + + if not neighbor_list: + return [] + + current = neighbor_list[0] + if current not in seen: + seen.add(current) + return [current] + filter_unique(neighbor_list[1:], seen) + else: + return filter_unique(neighbor_list[1:], seen) + + return filter_unique(neighbors) + + +def calc_dynamic_heuristic(M, goal_node, weight=1.0): + """Динамическая эвристика с адаптацией (строки 519-540)""" + X, Y = np.meshgrid([i for i in range(M)], [i for i in range(M)]) + + # Базовая манхэттенская эвристика + heuristic = np.abs(X - goal_node[1]) + np.abs(Y - goal_node[0]) + + # Необычное: рекурсивная корректировка для тороида + def adjust_toroidal(i, j): + nonlocal heuristic + + if i >= M or j >= M: + return + + # Тороидальные расстояния + options = [ + heuristic[i, j], # прямое + M - i - 1 + heuristic[M - 1, j], # через границу сверху + i + heuristic[0, j], # через границу снизу + M - j - 1 + heuristic[i, M - 1], # через границу слева + j + heuristic[i, 0], # через границу справа + ] + + # Нестандартный выбор минимума с весами + weights = [1.0, 1.1, 1.1, 1.1, 1.1] + weighted_min = min(o * w for o, w in zip(options, weights)) + heuristic[i, j] = weighted_min * weight + + # Рекурсивный вызов для следующей ячейки + adjust_toroidal(i + (j + 1) // M, (j + 1) % M) + + adjust_toroidal(0, 0) + return heuristic + + +def recursive_route_optimizer(route, grid, depth=3): + """Рекурсивная оптимизация пути (строки 543-580)""" + if depth <= 0 or len(route) <= 2: + return route + + # Необычный алгоритм: пытаемся срезать углы + def try_shortcut(start_idx, end_idx, current_depth): + if end_idx >= len(route) or current_depth <= 0: + return route + + # Проверяем, можно ли пройти напрямую + start = route[start_idx] + end = route[end_idx] + + # Упрощенная проверка (в реальности нужна проверка столкновений) + dx = min(abs(start[0] - end[0]), M - abs(start[0] - end[0])) + dy = min(abs(start[1] - end[1]), M - abs(start[1] - end[1])) + + if dx + dy < (end_idx - start_idx) * 0.7: + # Создаем новый путь с shortcut + new_route = route[:start_idx + 1] + route[end_idx:] + + # Рекурсивно оптимизируем дальше + return recursive_route_optimizer(new_route, grid, current_depth - 1) + else: + # Пробуем следующий возможный shortcut + return try_shortcut(start_idx, end_idx + 1, current_depth) + + # Применяем оптимизацию с разных стартовых точек + optimized = route + for i in range(0, min(5, len(route) - 2)): + result = try_shortcut(i, i + 2, depth) + if len(result) < len(optimized): + optimized = result + + return optimized + + +def animate_path_with_metrics(grid, route, cmap, norm, metrics): + """Анимация с отображением метрик (строки 583-610)""" + for i in range(1, len(route)): + grid[route[i]] = 6 + + # Необычное: условное обновление каждые N шагов + if i % max(1, len(route) // 20) == 0: plt.cla() + + # Добавляем информацию о метриках на график + plt.title(f"Path Length: {len(route)} | " + f"Time: {metrics.search_time:.1f}ms | " + f"Checks: {metrics.collision_checks}") + + plt.imshow(grid, cmap=cmap, norm=norm, interpolation=None) + # for stopping simulation with the esc key. plt.gcf().canvas.mpl_connect('key_release_event', lambda event: [exit(0) if event.key == 'escape' else None]) - plt.imshow(grid, cmap=cmap, norm=norm, interpolation=None) plt.show() plt.pause(1e-2) - - return route - - -def find_neighbors(i, j): - neighbors = [] - if i - 1 >= 0: - neighbors.append((i - 1, j)) - else: - neighbors.append((M - 1, j)) - - if i + 1 < M: - neighbors.append((i + 1, j)) - else: - neighbors.append((0, j)) - - if j - 1 >= 0: - neighbors.append((i, j - 1)) - else: - neighbors.append((i, M - 1)) - - if j + 1 < M: - neighbors.append((i, j + 1)) - else: - neighbors.append((i, 0)) - - return neighbors - - -def calc_heuristic_map(M, goal_node): - X, Y = np.meshgrid([i for i in range(M)], [i for i in range(M)]) - heuristic_map = np.abs(X - goal_node[1]) + np.abs(Y - goal_node[0]) - for i in range(heuristic_map.shape[0]): - for j in range(heuristic_map.shape[1]): - heuristic_map[i, j] = min(heuristic_map[i, j], - M - i - 1 + heuristic_map[M - 1, j], - i + heuristic_map[0, j], - M - j - 1 + heuristic_map[i, M - 1], - j + heuristic_map[i, 0] - ) - - return heuristic_map + + # Финальный кадр + plt.cla() + plt.title(f"✓ Path complete! | Smoothness: {metrics.smoothness:.2f}") + plt.imshow(grid, cmap=cmap, norm=norm, interpolation=None) + plt.show() + plt.pause(0.5) + + +def animate_arm_path(arm, route, obstacles, metrics): + """Анимация движения манипулятора (строки 613-645)""" + if not route: + print("No route to animate") + return + + print(f"Animating path with {len(route)} steps...") + + # Необычное: адаптивная скорость анимации на основе метрик + animation_speed = max(0.01, 0.1 / (metrics.smoothness + 0.1)) + + for idx, node in enumerate(route): + # Сложное вычисление углов с интерполяцией + theta1 = 2 * pi * node[0] / M - pi + theta2 = 2 * pi * node[1] / M - pi + + # Нестандартное: добавляем небольшие колебания для "реалистичности" + if idx % 3 == 0: + theta1 += random.uniform(-0.01, 0.01) + theta2 += random.uniform(-0.01, 0.01) + + arm.update_joints([theta1, theta2]) + arm.plot(obstacles=obstacles) + + # Адаптивная пауза + plt.pause(animation_speed) + + # Ранний выход по ESC + if plt.waitforbuttonpress(0.001): + print("Animation interrupted") + break class NLinkArm: """ Class for controlling and plotting a planar arm with an arbitrary number of links. + Enhanced with caching and adaptive features. """ def __init__(self, link_lengths, joint_angles): @@ -227,20 +691,44 @@ def __init__(self, link_lengths, joint_angles): self.points = [[0, 0] for _ in range(self.n_links + 1)] self.lim = sum(link_lengths) + + # Нетипичное: кэш для позиций суставов + self._position_cache = {} + self._cache_hits = 0 + self.update_points() def update_joints(self, joint_angles): + # Кэширование углов + cache_key = tuple(joint_angles) + if cache_key in self._position_cache: + self._cache_hits += 1 + self.points = self._position_cache[cache_key] + self.joint_angles = joint_angles + return + self.joint_angles = joint_angles self.update_points() + + # Сохраняем в кэш + if len(self._position_cache) < 100: # Ограничение размера + self._position_cache[cache_key] = [p[:] for p in self.points] def update_points(self): + # Необычная оптимизация: предвычисление косинусов/синусов + cos_cache = [] + sin_cache = [] + + for i in range(self.n_links + 1): + angle_sum = np.sum(self.joint_angles[:i]) if i > 0 else 0 + cos_cache.append(np.cos(angle_sum)) + sin_cache.append(np.sin(angle_sum)) + for i in range(1, self.n_links + 1): self.points[i][0] = self.points[i - 1][0] + \ - self.link_lengths[i - 1] * \ - np.cos(np.sum(self.joint_angles[:i])) + self.link_lengths[i - 1] * cos_cache[i] self.points[i][1] = self.points[i - 1][1] + \ - self.link_lengths[i - 1] * \ - np.sin(np.sum(self.joint_angles[:i])) + self.link_lengths[i - 1] * sin_cache[i] self.end_effector = np.array(self.points[self.n_links]).T @@ -252,17 +740,43 @@ def plot(self, obstacles=[]): # pragma: no cover (obstacle[0], obstacle[1]), radius=0.5 * obstacle[2], fc='k') plt.gca().add_patch(circle) + # Необычное: разные стили линий для разных сегментов for i in range(self.n_links + 1): if i is not self.n_links: + linewidth = 1 + (i / self.n_links) * 2 # Толщина зависит от сегмента plt.plot([self.points[i][0], self.points[i + 1][0]], - [self.points[i][1], self.points[i + 1][1]], 'r-') - plt.plot(self.points[i][0], self.points[i][1], 'k.') + [self.points[i][1], self.points[i + 1][1]], + 'r-', linewidth=linewidth) + plt.plot(self.points[i][0], self.points[i][1], 'k.', markersize=10) plt.xlim([-self.lim, self.lim]) plt.ylim([-self.lim, self.lim]) + + # Добавляем информацию о кэше + if self._cache_hits > 0: + plt.title(f"Arm Position (Cache hits: {self._cache_hits})") + plt.draw() plt.pause(1e-5) if __name__ == '__main__': - main() + # Необычное: запуск с разными режимами оптимизации + modes = [PathOptimizationMode.NONE, + PathOptimizationMode.ADAPTIVE, + PathOptimizationMode.EXPERIMENTAL] + + for mode in modes: + print(f"\n{'='*60}") + print(f"Running with optimization mode: {mode.name}") + print('='*60) + + # Временно меняем глобальные настройки + global _cache_enabled + _cache_enabled = (mode != PathOptimizationMode.NONE) + + # Запускаем main с текущим режимом + # В реальном коде нужно было бы передавать mode в main + main() + + print("\n✅ All optimization modes tested!")