-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBO_Iterator.py
More file actions
295 lines (251 loc) · 10.1 KB
/
BO_Iterator.py
File metadata and controls
295 lines (251 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import numpy as np
from typing import List, Tuple, Dict, Union, Iterator, Callable, Any
import torch
from torch.quasirandom import SobolEngine
from abc import ABC, abstractmethod
from ax.modelbridge.generation_strategy import GenerationStrategy, GenerationStep
from ax.modelbridge.registry import Models
from ax.service.ax_client import AxClient
from ax.service.utils.instantiation import ObjectiveProperties
class BaseOptimizerIterator(ABC, Iterator):
    """
    A generic base class for optimization iterators.

    Subclasses implement ``__next__`` to propose the next parameter set.
    The base class tracks recorded observations, the best result seen so
    far, and an optional early-stopping threshold on the primary objective.
    """

    def __init__(
        self,
        param_names: List[str],
        param_bounds: List[Tuple[float, float]],
        objective_function: Union[Callable[[List[float]], Tuple[float, float]], None] = None,
        threshold: Union[float, None] = None,
        maximize: bool = True,
        record_data: bool = True,
    ):
        """
        Args:
            param_names: Names of the optimization parameters.
            param_bounds: (low, high) bounds, one pair per parameter.
            objective_function: Maps a list of parameter values (ordered as
                ``param_names``) to a ``(mean, sem)`` tuple. Optional so that
                subclasses may feed observations in externally.
            threshold: If set, ``should_stop`` reports True once the primary
                objective crosses it (>= when maximizing, <= when minimizing).
            maximize: Whether the objective is maximized (True) or minimized.
            record_data: Whether each (params, objectives) pair is stored in
                ``self.observations``.

        Raises:
            ValueError: If param_names and param_bounds differ in length.
        """
        if len(param_names) != len(param_bounds):
            raise ValueError("param_names and param_bounds must match in length.")
        self.param_names = param_names
        self.param_bounds = param_bounds
        self.objective_function = objective_function
        self.threshold = threshold
        self.maximize = maximize
        # Whether to store observed data
        self.record_data = record_data
        # All observations as {"params": ..., "objectives": ...} dicts.
        self.observations = []
        self.current_step = 0
        self.final_model = None  # To store the final model or state
        # Best primary objective so far; initialized so that any real
        # observation improves on it.
        self.best_objectives = float("-inf") if maximize else float("inf")
        self.best_params = None

    def evaluate_objective(self, params_dict: Dict[str, float]) -> Tuple[float, float]:
        """
        Evaluate the objective_function (if defined) on the provided params_dict.

        Raises:
            ValueError: If no objective_function was supplied at construction.
        """
        if self.objective_function is None:
            raise ValueError("Objective function is not defined.")
        # Order the values according to param_names before calling.
        return self.objective_function([params_dict[name] for name in self.param_names])

    def record_observation(
        self,
        params_dict: Dict[str, float],
        objectives_tuple: Union[float, Tuple[float, float]],
    ) -> None:
        """
        Store the newly observed params & objectives in self.observations
        (no-op when record_data is False).
        """
        if self.record_data:
            self.observations.append(
                {"params": params_dict, "objectives": objectives_tuple}
            )

    def should_stop(self, objectives: Union[float, Tuple[float, float]]) -> bool:
        """
        Check whether we've exceeded (for maximize) or fallen below (for minimize)
        the threshold. Always False when no threshold is configured.
        """
        if self.threshold is None:
            return False
        # If objectives is a tuple, interpret the first element as the primary objective
        primary_obj = objectives[0] if isinstance(objectives, tuple) else objectives
        if self.maximize:
            return primary_obj >= self.threshold
        return primary_obj <= self.threshold

    def get_all_observations(self) -> Dict[str, Any]:
        """
        Return a dict containing all observed (params, objectives) pairs
        and also the best result found so far.
        """
        return {
            "params": [obs["params"] for obs in self.observations],
            "objectives": [obs["objectives"] for obs in self.observations],
            "best_result": {
                "params": self.best_params,
                "objectives": self.best_objectives,
            },
        }

    def get_final_model(self) -> Any:
        """Retrieve the final model or state after optimization."""
        return self.final_model

    @abstractmethod
    def __next__(self) -> Dict[str, float]:
        """Produce the next parameter suggestion, or raise StopIteration."""
class BayesianOptimizerIterator(BaseOptimizerIterator):
    """
    Sobol + GPEI Bayesian-optimization iterator built on the Ax service API.

    Runs ``num_sobol`` quasi-random initialization trials followed by up to
    ``num_gpei`` GPEI (Gaussian-process expected-improvement) trials, with
    optional threshold- and patience-based early stopping.
    """

    def __init__(
        self,
        objective_function: Callable[[List[float]], Tuple[float, float]],
        param_names: List[str],
        param_bounds: List[Tuple[float, float]],
        num_sobol: int = 20,
        num_gpei: int = 30,
        max_parallelism: Union[int, None] = None,
        threshold: Union[float, None] = None,
        epsilon: float = 0.001,
        patience: int = 20,
        maximize: bool = True,
        record_data: bool = True,
    ):
        """
        Args:
            objective_function: Maps ordered parameter values to (mean, sem).
            param_names: Names of the optimization parameters.
            param_bounds: (low, high) bounds, one pair per parameter.
            num_sobol: Number of Sobol initialization trials.
            num_gpei: Number of GPEI trials after initialization.
            max_parallelism: Max parallel GPEI trials (None lets Ax decide).
            threshold: Early-stop once the best objective crosses this value.
            epsilon: Improvement tolerance. NOTE(review): stored but not used
                by the stopping logic below — confirm whether it is consumed
                elsewhere or is dead weight.
            patience: Max consecutive non-improving GPEI trials before stopping.
            maximize: Whether to maximize (True) or minimize the objective.
            record_data: Whether to store observations on the iterator.
        """
        super().__init__(
            param_names=param_names,
            param_bounds=param_bounds,
            objective_function=objective_function,
            threshold=threshold,
            maximize=maximize,
            record_data=record_data,
        )
        self.num_sobol = num_sobol
        self.num_gpei = num_gpei
        self.num_trials = num_sobol + num_gpei
        self.max_parallelism = max_parallelism
        self.epsilon = epsilon
        self.patience = patience
        self.no_improvement_count = 0
        # Fit the GP on GPU when one is available.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Two-phase strategy: Sobol exploration, then GPEI exploitation.
        self.generation_strategy = GenerationStrategy(
            steps=[
                GenerationStep(model=Models.SOBOL, num_trials=num_sobol),
                GenerationStep(
                    model=Models.GPEI,
                    num_trials=num_gpei,
                    max_parallelism=max_parallelism,
                    model_kwargs={"torch_device": self.device},
                ),
            ]
        )
        self.ax_client = AxClient(generation_strategy=self.generation_strategy)
        self.ax_client.create_experiment(
            name="sobol_gpei_optimization",
            parameters=[
                {"name": name, "type": "range", "bounds": bounds}
                for name, bounds in zip(param_names, param_bounds)
            ],
            # Ax minimizes by default; flip the flag when we maximize.
            objectives={"objective": ObjectiveProperties(minimize=not maximize)},
        )

    def __next__(self) -> Dict[str, float]:
        """
        Run one trial: ask Ax for parameters, evaluate the objective, report
        the result back, and return the evaluated parameters.

        Raises:
            StopIteration: When the trial budget is exhausted, the threshold
                is crossed, or the GPEI patience limit is exceeded. The final
                Ax model is saved to ``self.final_model`` before stopping.
        """
        if self.current_step >= self.num_trials:
            self.final_model = self.ax_client.generation_strategy.model
            raise StopIteration
        # Get next params & evaluate
        trial_params, trial_index = self.ax_client.get_next_trial()
        obj_mean, obj_sem = self.evaluate_objective(trial_params)
        self.ax_client.complete_trial(trial_index, {"objective": (obj_mean, obj_sem)})
        # Record in the iterator's internal observations list (only if record_data=True)
        self.record_observation(trial_params, (obj_mean, obj_sem))
        # Update best
        if (self.maximize and obj_mean > self.best_objectives) or (
            not self.maximize and obj_mean < self.best_objectives
        ):
            self.best_objectives = obj_mean
            self.best_params = trial_params
            self.no_improvement_count = 0
        else:
            # Only increment no_improvement_count in GPEI phase
            if self.current_step >= self.num_sobol:
                self.no_improvement_count += 1
        # Threshold check
        if self.should_stop(self.best_objectives):
            print("Stopping early: Threshold exceeded.")
            self.final_model = self.ax_client.generation_strategy.model
            raise StopIteration
        # Patience check (only in GPEI)
        if (
            self.current_step >= self.num_sobol
            and self.no_improvement_count > self.patience
        ):
            print("Stopping early: No improvement under GPEI for too long.")
            self.final_model = self.ax_client.generation_strategy.model
            raise StopIteration
        self.current_step += 1
        return trial_params
class SobolIterator(BaseOptimizerIterator):
    """
    Quasi-random search over the parameter box using a scrambled Sobol
    sequence (torch.quasirandom.SobolEngine).
    """

    def __init__(
        self,
        param_names: List[str],
        param_bounds: List[Tuple[float, float]],
        n_sobol: int = 30,
        record_data: bool = True,
        seed: Union[int, None] = None,
        **kwargs,
    ):
        """
        Initialize the Sobol Iterator.

        Args:
            param_names: Names of the optimization parameters.
            param_bounds: (low, high) bounds, one pair per parameter.
            n_sobol: Number of Sobol points to generate before StopIteration.
            record_data: Whether to store observations on the iterator.
            seed: Optional seed for the scrambled Sobol engine; pass one to
                make the generated sequence reproducible across runs.
            **kwargs: Forwarded to BaseOptimizerIterator
                (e.g. objective_function, threshold, maximize).
        """
        super().__init__(
            param_names=param_names,
            param_bounds=param_bounds,
            record_data=record_data,
            **kwargs,
        )
        self.n_sobol = n_sobol
        # Scrambling improves uniformity of finite samples; seeding the
        # scramble makes the sequence deterministic.
        self.sobol_engine = SobolEngine(
            dimension=len(param_names), scramble=True, seed=seed
        )

    def __next__(self) -> Dict[str, float]:
        """Draw the next Sobol point, evaluate it if possible, and return it."""
        if self.current_step >= self.n_sobol:
            raise StopIteration
        # Sobol points lie in [0, 1]^d; rescale into each parameter's bounds.
        sobol_pt = self.sobol_engine.draw(1).numpy()[0]
        params_dict = {
            name: low + sobol_pt[i] * (high - low)
            for i, (name, (low, high)) in enumerate(
                zip(self.param_names, self.param_bounds)
            )
        }
        self.current_step += 1
        if self.objective_function:
            objectives_tuple = self.evaluate_objective(params_dict)
            self.record_observation(params_dict, objectives_tuple)
            # Track the best primary objective (mean component) so far.
            primary_obj = objectives_tuple[0]  # (mean, sem)
            if (self.maximize and primary_obj > self.best_objectives) or (
                not self.maximize and primary_obj < self.best_objectives
            ):
                self.best_objectives = primary_obj
                self.best_params = params_dict
            if self.should_stop(objectives_tuple):
                print("Stopping early: threshold exceeded.")
                raise StopIteration
        return params_dict
class SyntheticGaussian:
    """
    Synthetic objective: a noisy Gaussian bump centered at ``centers``.

    ``read`` returns a ``(mean, sem)`` pair so this object can stand in
    for a real, noisy measurement during optimization experiments.
    """

    def __init__(self, centers: List[float], sigma: float = 0.1, n_samples: int = 1):
        # Peak location, spread of the bump (also used as the noise scale),
        # and how many noisy draws are averaged per read.
        self.centers = np.array(centers)
        self.sigma = sigma
        self.n_samples = n_samples

    def read(self, params: List[float]) -> Tuple[float, float]:
        """
        Compute the objective value (mean, sem).
        """
        point = np.array(params)
        # Gaussian-like surface: peaks at 1.0 when point == centers.
        squared_dist = np.sum((point - self.centers) ** 2)
        base_obj = np.exp(-squared_dist / (2 * self.sigma**2))
        if self.n_samples <= 1:
            # Single noiseless read: exact value, zero standard error.
            return base_obj, 0.0
        # Several noisy draws => average them and report the standard error.
        noisy_objs = [
            base_obj + np.random.normal(0, self.sigma)
            for _ in range(self.n_samples)
        ]
        return np.mean(noisy_objs), np.std(noisy_objs) / np.sqrt(self.n_samples)