-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathregressor.py
More file actions
1010 lines (846 loc) · 37.3 KB
/
regressor.py
File metadata and controls
1010 lines (846 loc) · 37.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from deap import algorithms, tools, gp, base, creator
from deap.tools import migRing
import numpy as np
import operator
from typing import List, Dict, Callable
from os.path import join
import os
import ray
import random
from flex.gp.util import mapper, max_func, min_func, avg_func, std_func, fitness_value
from flex.gp.sympy import stringify_for_sympy
from flex.gp.numpy_primitives import conversion_rules
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_is_fitted, validate_data
from sympy.parsing.sympy_parser import parse_expr
from functools import partial
from itertools import chain
import numpy.typing as npt
from jax import Array
# Reduce per-process threading so parallel (Ray) fitness evaluations do not
# oversubscribe CPU cores: each worker is pinned to single-threaded BLAS/XLA.
# NOTE(review): these must be set before the numerical libraries spin up their
# thread pools — keep this block immediately after the imports.
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["NUM_INTER_THREADS"] = "1"
os.environ["NUM_INTRA_THREADS"] = "1"
os.environ["XLA_FLAGS"] = (
    "--xla_cpu_multi_thread_eigen=false " "intra_op_parallelism_threads=1"
)
class GPSymbolicRegressor(RegressorMixin, BaseEstimator):
    """Symbolic regression via Genetic Programming (GP).

    This regressor evolves symbolic expressions represented as GP trees in order
    to minimize a user-defined fitness function. It is built on top of DEAP and
    follows the scikit-learn estimator interface.

    The regressor supports:
    - Arbitrary user-defined fitness, prediction, and scoring functions
    - Multi-island evolution with periodic migration
    - Elitism and overlapping or non-overlapping generations
    - Parallel fitness evaluation using Ray
    - Validation-set monitoring
    - Conversion of the best individuals to a SymPy expression

    Args:
        pset_config: set of primitives and terminals (loosely or strongly typed).
        fitness: fitness evaluation function. It must return a tuple containing a
            single scalar fitness value, e.g. `(fitness_value,)`.
        predict_func: function that returns a prediction given an individual and
            a test dataset as inputs.
        score_func: score metric used for validation and for the `score` method.
        select_fun: string representing the selection operator to use.
        select_args: stringified dictionary of keyword arguments passed to the
            selection operator. The string is evaluated at runtime.
        mut_fun: mutation operator.
        mut_args: arguments for the mutation operator.
        expr_mut_fun: expression generator used during mutation.
        expr_mut_args: arguments for the mutation expression generator.
        crossover_fun: crossover operator.
        crossover_args: arguments for the crossover operator.
        min_height: minimum height of GP trees at initialization.
        max_height: maximum height of GP trees at initialization.
        max_length: maximum number of nodes allowed in a GP tree.
        num_individuals: population size per island.
        generations: number of generations.
        num_islands: number of islands (for a multi-island model).
        remove_init_duplicates: whether to remove duplicate individuals from
            the initial populations.
        mig_freq: migration frequency (in generations).
        mig_frac: fraction of individuals exchanged during migration.
        crossover_prob: probability of applying crossover.
        mut_prob: probability of applying mutation.
        frac_elitist: fraction of elite individuals preserved each generation.
        overlapping_generation: True if the offspring competes with the parents
            for survival.
        common_data: dictionary of arguments shared between fitness, prediction,
            and scoring functions.
        validate: whether to use a validation dataset.
        preprocess_args: configuration for a function applied to individuals prior
            to fitness evaluation. It must contain three keys: `func`, the callable
            to execute (it must accept an individual and the toolbox as its first
            two arguments); `func_args`, a dictionary of additional arguments for
            `func`; `callback`, a function used to assign the resulting
            preprocessed values back to each individual.
        callback_func: function called after fitness evaluation to perform custom
            processing.
        seed_str: list of GP expressions used to seed the initial population.
        print_log: whether to print the log containing the population statistics
            during the run.
        num_best_inds_str: number of best individuals printed at each generation.
        save_best_individual: whether to save the string representation of the best
            individual.
        save_train_fit_history: whether to save the training fitness history.
        output_path: directory where outputs are saved.
        batch_size: batch size used for Ray-based fitness evaluation.
        num_cpus: number of CPUs allocated to each Ray task.
        max_calls: maximum number of tasks a Ray worker can execute before restart.
            The default is `0`, which means infinite number of tasks.
        custom_logger: user-defined logging function called with the best
            individuals.
        multiprocessing: whether to use Ray for parallel fitness evaluation.
    """
def __init__(
    self,
    pset_config: gp.PrimitiveSet | gp.PrimitiveSetTyped,
    fitness: Callable,
    predict_func: Callable,
    score_func: Callable | None = None,
    select_fun: str = "tools.selection.tournament_with_elitism",
    select_args: str = "{'num_elitist': self.n_elitist, 'tournsize': 3, 'stochastic_tourn': { 'enabled': False, 'prob': [0.8, 0.2] }}",  # noqa: E501
    mut_fun: str = "gp.mutUniform",
    mut_args: str = "{'expr': toolbox.expr_mut, 'pset': pset}",
    expr_mut_fun: str = "gp.genHalfAndHalf",
    expr_mut_args: str = "{'min_': 1, 'max_': 3}",
    crossover_fun: str = "gp.cxOnePoint",
    crossover_args: str = "{}",
    min_height: int = 1,
    max_height: int = 3,
    max_length: int = 100,
    num_individuals: int = 10,
    generations: int = 1,
    num_islands: int = 1,
    remove_init_duplicates: bool = False,
    mig_freq: int = 10,
    mig_frac: float = 0.05,
    crossover_prob: float = 0.5,
    mut_prob: float = 0.2,
    frac_elitist: float = 0.0,
    overlapping_generation: bool = False,
    common_data: Dict | None = None,
    validate: bool = False,
    preprocess_args: Dict | None = None,
    callback_func: Callable | None = None,
    seed_str: List[str] | None = None,
    print_log: bool = False,
    num_best_inds_str: int = 1,
    save_best_individual: bool = False,
    save_train_fit_history: bool = False,
    output_path: str | None = None,
    batch_size: int = 1,
    num_cpus: int = 1,
    max_calls: int = 0,
    custom_logger: Callable = None,
    multiprocessing: bool = True,
):
    """Store the constructor parameters as attributes (sklearn convention:
    no validation or computation happens here)."""
    super().__init__()
    # user-supplied functions and primitive set
    self.pset_config = pset_config
    self.fitness = fitness
    self.predict_func = predict_func
    self.score_func = score_func
    # evolutionary operators (strings evaluated when the toolbox is built)
    self.select_fun = select_fun
    self.select_args = select_args
    self.mut_fun = mut_fun
    self.mut_args = mut_args
    self.expr_mut_fun = expr_mut_fun
    self.expr_mut_args = expr_mut_args
    self.crossover_fun = crossover_fun
    self.crossover_args = crossover_args
    # tree-size constraints
    self.min_height = min_height
    self.max_height = max_height
    self.max_length = max_length
    # population / evolution settings
    self.num_individuals = num_individuals
    self.generations = generations
    self.num_islands = num_islands
    self.remove_init_duplicates = remove_init_duplicates
    self.mig_freq = mig_freq
    self.mig_frac = mig_frac
    self.crossover_prob = crossover_prob
    self.mut_prob = mut_prob
    self.frac_elitist = frac_elitist
    self.overlapping_generation = overlapping_generation
    # data, hooks and seeding
    self.common_data = common_data
    self.validate = validate
    self.preprocess_args = preprocess_args
    self.callback_func = callback_func
    self.seed_str = seed_str
    # logging and output
    self.print_log = print_log
    self.num_best_inds_str = num_best_inds_str
    self.save_best_individual = save_best_individual
    self.save_train_fit_history = save_train_fit_history
    self.output_path = output_path
    # parallel-execution settings
    self.batch_size = batch_size
    self.num_cpus = num_cpus
    self.max_calls = max_calls
    self.custom_logger = custom_logger
    self.multiprocessing = multiprocessing
def __sklearn_tags__(self):
    """Return sklearn estimator tags with ``requires_y`` disabled,
    because this estimator also supports fitting with ``y=None``."""
    estimator_tags = super().__sklearn_tags__()
    estimator_tags.target_tags.required = False
    return estimator_tags
@property
def n_elitist(self):
    """Number of elite individuals per island, derived from `frac_elitist`."""
    return int(self.num_individuals * self.frac_elitist)
def get_params(self, deep: bool = True):
    """Return this estimator's constructor parameters.

    Delegates to ``BaseEstimator.get_params`` so only ``__init__`` parameters
    are returned. The previous implementation returned the live ``__dict__``,
    which leaks fitted/private attributes (e.g. ``is_fitted_``) and breaks
    ``sklearn.base.clone`` / grid search, since those keys are not accepted
    by ``__init__``.

    Args:
        deep: if True, also return the parameters of nested sub-estimators.

    Returns:
        dict mapping parameter names to their current values.
    """
    return super().get_params(deep=deep)
def __creator_toolbox_pset_config(self):
    """Initialize toolbox and individual creator based on config file.

    Returns:
        a tuple containing the initialized toolbox and the primitive set.
    """
    pset = self.pset_config
    toolbox = base.Toolbox()
    # SELECTION
    # NOTE(review): operator names/kwargs are configured as strings and
    # eval'd here — do not pass untrusted strings for these parameters.
    toolbox.register("select", eval(self.select_fun), **eval(self.select_args))
    # MUTATION
    toolbox.register(
        "expr_mut", eval(self.expr_mut_fun), **eval(self.expr_mut_args)
    )
    toolbox.register("mutate", eval(self.mut_fun), **eval(self.mut_args))
    # CROSSOVER
    toolbox.register("mate", eval(self.crossover_fun), **eval(self.crossover_args))
    # Cap tree height at 17 after mate/mutate (classic Koza-style limit).
    toolbox.decorate(
        "mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17)
    )
    toolbox.decorate(
        "mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17)
    )
    # INDIVIDUAL GENERATOR/CREATOR
    toolbox.register(
        "expr",
        gp.genHalfAndHalf,
        pset=pset,
        min_=self.min_height,
        max_=self.max_height,
        max_length=self.max_length,
    )
    # Guard creator.create: DEAP's creator is module-global, so re-creating
    # the classes on a second instance would emit warnings/clashes.
    if not hasattr(creator, "FitnessMin"):
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    if not hasattr(creator, "Individual"):
        creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
    createIndividual = creator.Individual
    toolbox.register(
        "individual", tools.initIterate, createIndividual, toolbox.expr
    )
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("compile", gp.compile, pset=pset)
    # Optional seeding: parse user-provided expression strings into individuals.
    if self.seed_str is not None:
        self.seed_ind = [
            createIndividual.from_string(i, pset) for i in self.seed_str
        ]
    return toolbox, pset
def __init_data_store(self):
    """Create an empty internal data store and preload the common arguments."""
    self.__data_store = {}
    if self.common_data is None:
        return
    # FIXME: does everything work when the functions do not have common args?
    self.__store_fit_score_common_args(self.common_data)
def __store_fit_score_common_args(self, data: Dict):
    """Publish the arguments shared by the fitness and score functions
    under the "common" label of the data store.

    Args:
        data: dictionary containing argument names and values.
    """
    self.__store_shared_objects("common", data)
def __store_datasets(self, datasets: Dict[str, npt.NDArray | Array]):
    """Publish each dataset under its label ("train", "val" or "test") in the
    shared object space; they are later passed as keyword arguments to the
    fitness and, possibly, the score/prediction functions.

    Args:
        datasets: mapping of 'train'/'val'/'test' to numpy or jax arrays.
    """
    for label in datasets:
        self.__store_shared_objects(label, datasets[label])
def __store_shared_objects(self, label: str, data: Dict):
    """Store a dictionary of data in the internal data store, converting each
    value to a Ray object reference when multiprocessing is enabled so that
    workers share a single copy.

    Args:
        label: key under which the dictionary is stored internally.
        data: dictionary of objects to store (mutated in place: plain values
            are replaced by their ``ray.ObjectRef``).
    """
    for key in list(data):
        value = data[key]
        if self.multiprocessing and not isinstance(value, ray.ObjectRef):
            data[key] = ray.put(value)
    self.__data_store[label] = data
def __fetch_shared_objects(self, stored_data: Dict):
    """Dereference any ``ray.ObjectRef`` values into concrete objects.

    Args:
        stored_data: dictionary possibly containing ``ray.ObjectRef`` values.

    Returns:
        a new dictionary with every Ray reference replaced by its value.
    """
    return {
        key: ray.get(value) if isinstance(value, ray.ObjectRef) else value
        for key, value in stored_data.items()
    }
def __print(self, message: str):
    """Print `message` (flushed) only when log printing is enabled.

    Args:
        message: message to print.
    """
    if not self.print_log:
        return
    print(message, flush=True)
def __init_stats_log(self):
    """Initialize logbook to collect statistics."""
    self.__logbook = tools.Logbook()
    # Headers of fields to be printed during log; a "valid" chapter is added
    # only when a validation set is monitored.
    if self.validate:
        self.__logbook.header = "gen", "evals", "fitness", "size", "valid"
        self.__logbook.chapters["valid"].header = ("valid_score",)
    else:
        self.__logbook.header = "gen", "evals", "fitness", "size"
    self.__logbook.chapters["fitness"].header = "min", "avg", "max", "std"
    self.__logbook.chapters["size"].header = "min", "avg", "max", "std"
    # Initialize variables for statistics: fitness values and tree sizes,
    # aggregated with the project-provided min/avg/max/std reducers.
    self.__stats_fit = tools.Statistics(fitness_value)
    self.__stats_size = tools.Statistics(len)
    self.__mstats = tools.MultiStatistics(
        fitness=self.__stats_fit, size=self.__stats_size
    )
    self.__mstats.register("avg", avg_func)
    self.__mstats.register("std", std_func)
    self.__mstats.register("min", min_func)
    self.__mstats.register("max", max_func)
    # Per-generation history of the best training fitness.
    self.__train_fit_history = []
def __compute_valid_stats(self, pop: List, toolbox: base.Toolbox):
    """Evaluate the validation score of the best individual in `pop`.

    Args:
        pop: a given population.
        toolbox: the toolbox for the evolution.

    Returns:
        the validation score.
    """
    champion = tools.selBest(pop, k=1)
    # FIXME: ugly way of handling lists/tuples; assume evaluate_val_score
    # returns a single-valued tuple as eval_val_fit
    scores = toolbox.map(toolbox.evaluate_val_score, champion)
    return scores[0]
def get_pop_stats(self):
    """Compile the registered statistics over all islands merged together."""
    return self.__mstats.compile(self.__flatten_list(self.__pop))
def __stats(self, pop: List, gen: int, evals: int, toolbox: base.Toolbox):
    """Record (and optionally print) statistics of a population.

    Args:
        pop: a given population.
        gen: the generation number.
        evals: the number of the evaluations in the current generation.
        toolbox: the toolbox for the evolution.
    """
    # Compile fitness/size statistics for the current population.
    record = self.get_pop_stats()
    if self.validate:
        # Attach the validation score of the current best individual.
        record["valid"] = {
            "valid_score": self.__compute_valid_stats(pop, toolbox)
        }
    self.__logbook.record(gen=gen, evals=evals, **record)
    if self.print_log:
        print(self.__logbook.stream, flush=True)
def __get_remote(self, f: Callable):
    """Wrap `f` for parallel execution when multiprocessing is enabled.

    Args:
        f: the function to be executed, typically a task or objective function.

    Returns:
        the Ray remote handle if multiprocessing is active, otherwise `f`
        unchanged.
    """
    if not self.multiprocessing:
        return f
    remote_decorator = ray.remote(num_cpus=self.num_cpus, max_calls=self.max_calls)
    return remote_decorator(f).remote
def __register_fitness_func(self, toolbox: base.Toolbox):
    """Register the training fitness function in the toolbox, binding the
    shared common arguments and the training dataset as keyword arguments.

    Args:
        toolbox: the toolbox for the evolution.
    """
    store = self.__data_store
    train_kwargs = {**store["common"], **store["train"]}
    toolbox.register(
        "evaluate_train", self.__get_remote(self.fitness), **train_kwargs
    )
def __register_val_funcs(self, toolbox: base.Toolbox):
    """Register the validation-set functions (fitness and score metric).
    Must be called after the datasets have been stored in the common object
    space.

    Args:
        toolbox: the toolbox for the evolution.
    """
    store = self.__data_store
    val_kwargs = {**store["common"], **store["val"]}
    toolbox.register(
        "evaluate_val_fit", self.__get_remote(self.fitness), **val_kwargs
    )
    toolbox.register(
        "evaluate_val_score", self.__get_remote(self.score_func), **val_kwargs
    )
def __register_map(self, toolbox: base.Toolbox):
    """Register the evaluation mapper in the toolbox: the Ray-batched mapper
    when multiprocessing is enabled, otherwise a serial fallback.

    Args:
        toolbox: the toolbox for the evolution.
    """
    if self.multiprocessing:
        # Share one copy of the toolbox with all Ray workers.
        toolbox_ref = ray.put(toolbox)
        toolbox.register(
            "map", mapper, toolbox_ref=toolbox_ref, batch_size=self.batch_size
        )
    else:

        def serial_mapper(f, individuals, toolbox):
            # Evaluate one-element batches sequentially, then flatten the
            # per-batch results into a single flat list.
            results = (f([ind], toolbox=toolbox) for ind in individuals)
            return list(chain.from_iterable(results))

        toolbox.register("map", serial_mapper, toolbox=toolbox)
def _prepare_fit(
    self,
    X: npt.NDArray | Array,
    y: npt.NDArray | Array,
    X_val: npt.NDArray | Array,
    y_val: npt.NDArray | Array,
):
    """Prepare datasets, internal state, and the DEAP toolbox for evolution.

    Args:
        X: training input features.
        y: training target values. Can be None for unsupervised tasks.
        X_val: validation input features.
        y_val: validation target values. Can be None for unsupervised tasks.

    Returns:
        a configured DEAP toolbox containing registered evaluation and
        preprocessing functions.
    """
    # sklearn-style validation; with y=None only X is returned.
    validated_data = validate_data(
        self,
        X,
        y,
        accept_sparse=False,
        skip_check_array=True,
        # ensure_2d=False,
        # allow_nd=True,
        # multi_output=True,
    )
    if y is None:
        X = validated_data
        train_data = {"X": X}
    else:
        X, y = validated_data
        train_data = {"X": X, "y": y}
    # The "val" entry is created only when validation is on AND data was given.
    if self.validate and X_val is not None:
        if y_val is None:
            val_data = {"X": X_val}
        else:
            val_data = {"X": X_val, "y": y_val}
        datasets = {"train": train_data, "val": val_data}
    else:
        datasets = {"train": train_data}
    # config individual creator and toolbox
    toolbox, _ = self.__creator_toolbox_pset_config()
    # Order matters: the store must exist before datasets are pushed into it.
    self.__init_data_store()
    self.__store_datasets(datasets)
    self.__init_stats_log()
    # register functions for fitness evaluation (train/val)
    self.__register_map(toolbox)
    self.__register_fitness_func(toolbox)
    if self.validate and self.score_func is not None:
        self.__register_val_funcs(toolbox)
    # Optional per-individual preprocessing hook, run before fitness evals.
    if self.preprocess_args is not None:
        toolbox.register(
            "preprocess_func",
            self.__get_remote(self.preprocess_args["func"]),
            **self.preprocess_args["func_args"],
        )
    return toolbox
# @_fit_context(prefer_skip_nested_validation=True)
def fit(
    self,
    X: npt.NDArray | Array,
    y: npt.NDArray | Array = None,
    X_val: npt.NDArray | Array = None,
    y_val: npt.NDArray | Array = None,
):
    """Fits the training data using GP-based symbolic regression.

    This method initializes the populations, evaluates the fitness of the
    individuals, and evolves the populations for the specified number of
    generations.

    Args:
        X: training input data.
        y: training targets. If None, the fitness function must not require
            targets.
        X_val: validation input data.
        y_val: validation targets.

    Returns:
        self, with the fitted flag set (sklearn convention).
    """
    toolbox = self._prepare_fit(X, y, X_val, y_val)
    self.__run(toolbox)
    # Marker attribute checked by sklearn's check_is_fitted.
    self.is_fitted_ = True
    return self
def predict(self, X: npt.NDArray | Array):
    """Predict outputs using the best evolved individual.

    Args:
        X: input data.

    Returns:
        predictions computed by the best individual.
    """
    check_is_fitted(self)
    toolbox, pset = self.__creator_toolbox_pset_config()
    X = validate_data(
        self, X, accept_sparse=False, reset=False, skip_check_array=True
    )
    store = self.__data_store
    # Combine the dereferenced common arguments with the test inputs.
    predict_kwargs = self.__fetch_shared_objects(store["common"]) | {"X": X}
    predictions = self.predict_func(
        (self._best,), toolbox=toolbox, **predict_kwargs
    )
    return predictions[0]
def score(self, X: npt.NDArray | Array, y: npt.NDArray | Array = None):
    """Compute the score of the best evolved individual by evaluating the
    user-provided `score_func` on the given dataset.

    Args:
        X: input data.
        y: target values.

    Returns:
        score value returned by `score_func`.
    """
    check_is_fitted(self)
    toolbox, pset = self.__creator_toolbox_pset_config()
    checked = validate_data(
        self, X, y, accept_sparse=False, reset=False, skip_check_array=True
    )
    if y is None:
        test_data = {"X": checked}
    else:
        X_checked, y_checked = checked
        test_data = {"X": X_checked, "y": y_checked}
    store = self.__data_store
    score_kwargs = self.__fetch_shared_objects(store["common"]) | test_data
    return self.score_func((self._best,), toolbox=toolbox, **score_kwargs)[0]
def __flatten_list(self, nested_lst: List):
    """Convert a list of lists into a single flat list.

    Args:
        nested_lst: a list containing multiple sublists.

    Returns:
        a single list containing all elements of the sublists in order.
    """
    return list(chain.from_iterable(nested_lst))
def __unflatten_list(self, flat_lst: List, lengths: List):
    """Restore a flat list into a list of sublists based on provided lengths.

    Args:
        flat_lst: the single-dimensional list to be partitioned.
        lengths: a list of integers specifying the size of each original sublist.

    Returns:
        a list of lists reconstructed to match the original structure.
    """
    sublists = []
    offset = 0  # running start index into the flat list
    for size in lengths:
        sublists.append(flat_lst[offset:offset + size])
        offset += size
    return sublists
def __evolve_islands(self, cgen: int, toolbox: base.Toolbox):
    """Perform a single iteration of the evolution pipeline with the
    multi-islands strategy.

    Bug fix: the migration trigger previously tested ``cgen % self.mig_frac``,
    but ``mig_frac`` is the float *fraction* of migrants (default 0.05), so the
    modulus was (almost) never zero and migration effectively never fired. The
    correct period is ``mig_freq``, documented as "migration frequency (in
    generations)".

    Args:
        cgen: current generation index.
        toolbox: the toolbox for the evolution.

    Returns:
        the total number of fitness evaluations performed.
    """
    num_evals = 0
    invalid_inds = [None] * self.num_islands
    offsprings = [None] * self.num_islands
    elite_inds = [None] * self.num_islands
    for i in range(self.num_islands):
        # Select the parents for the offspring
        offsprings[i] = list(map(toolbox.clone, toolbox.select(self.__pop[i])))
        # Apply crossover and mutation to the offspring with elitism
        elite_inds[i] = tools.selBest(offsprings[i], self.n_elitist)
        offsprings[i] = elite_inds[i] + algorithms.varOr(
            offsprings[i],
            toolbox,
            self.num_individuals - self.n_elitist,
            self.crossover_prob,
            self.mut_prob,
        )
        # individuals whose fitness was invalidated by crossover/mutation
        invalid_inds[i] = [ind for ind in offsprings[i] if not ind.fitness.valid]
        num_evals += len(invalid_inds[i])
        if self.preprocess_args is not None:
            preprocess_values = toolbox.map(
                toolbox.preprocess_func, invalid_inds[i]
            )
            self.preprocess_args["callback"](invalid_inds[i], preprocess_values)
    # Evaluate all islands' invalid individuals in one batched map call,
    # then split the results back per island.
    fitnesses = toolbox.map(
        toolbox.evaluate_train, self.__flatten_list(invalid_inds)
    )
    fitnesses = self.__unflatten_list(fitnesses, [len(i) for i in invalid_inds])
    for i in range(self.num_islands):
        if self.callback_func is not None:
            self.callback_func(invalid_inds[i], fitnesses[i])
        else:
            for ind, fit in zip(invalid_inds[i], fitnesses[i]):
                ind.fitness.values = fit
        # survival selection
        if not self.overlapping_generation:
            # The population is entirely replaced by the offspring
            self.__pop[i][:] = offsprings[i]
        else:
            # parents and offspring compete for survival (truncation selection)
            self.__pop[i] = tools.selBest(
                self.__pop[i] + offsprings[i], self.num_individuals
            )
    # migrations among islands, every `mig_freq` generations (FIX: was mig_frac)
    if cgen % self.mig_freq == 0 and self.num_islands > 1:
        migRing(
            self.__pop,
            int(self.mig_frac * self.num_individuals),
            selection=random.sample,
        )
    return num_evals
def __remove_duplicates(self, toolbox: base.Toolbox):
    """Remove duplicates in the population.

    Individuals whose fitness duplicates another's, or exceeds 1e5, are
    replaced with fresh random individuals; the island is then re-evaluated,
    repeating until no such individuals remain.

    NOTE(review): this loop has no iteration cap — if the primitive set makes
    distinct fitness values hard to reach, it may iterate for a long time;
    consider a max-retries guard.

    Args:
        toolbox: the toolbox for the evolution.
    """
    for i in range(self.num_islands):
        while True:
            # Re-evaluate the whole island so every fitness is current.
            fitnesses = toolbox.map(toolbox.evaluate_train, self.__pop[i])
            if self.callback_func is not None:
                self.callback_func(self.__pop[i], fitnesses)
            else:
                for ind, fit in zip(self.__pop[i], fitnesses):
                    ind.fitness.values = fit
            fitness_array = np.array(
                [ind.fitness.values[0] for ind in self.__pop[i]]
            )
            # Identify unique fitness indices
            _, idx_unique = np.unique(fitness_array, return_index=True)
            # Identify duplicate indices
            dup_indices = np.setdiff1d(np.arange(len(fitnesses)), idx_unique)
            # Identify indices with fitness above threshold
            threshold_indices = np.where(fitness_array > 1e5)[0]
            # Combine both types of bad indices
            bad_indices = np.unique(
                np.concatenate([dup_indices, threshold_indices])
            )
            if len(bad_indices) == 0:
                break
            # Replace every flagged individual with a fresh random one.
            for idx in bad_indices:
                self.__pop[i][idx] = toolbox.individual()
def get_best_individuals(self, n_ind: int = 1):
    """Return the `n_ind` best individuals across all islands.

    Args:
        n_ind: number of top individuals to return.

    Returns:
        list of the best GP individuals.
    """
    merged_population = self.__flatten_list(self.__pop)
    # selBest already returns exactly k individuals.
    return tools.selBest(merged_population, k=n_ind)
def _step(self, toolbox: base.Toolbox, cgen: int):
    """Performs a single step of the evolution pipeline.

    Args:
        toolbox: the toolbox for the evolution.
        cgen: current generation index.
    """
    num_evals = self.__evolve_islands(cgen, toolbox)
    # select the best individuals in the current population
    # (including all islands)
    best_inds = self.get_best_individuals(self.num_best_inds_str)
    # compute and print population statistics (including all islands)
    self.__stats(self.__flatten_list(self.__pop), cgen, num_evals, toolbox)
    if self.print_log:
        print("Best individuals of this generation:", flush=True)
        for i in range(self.num_best_inds_str):
            print(str(best_inds[i]), flush=True)
    if self.custom_logger is not None:
        self.custom_logger(best_inds)
    # Update history of best fitness and best validation score by re-reading
    # the logbook chapters (they already include this generation's record).
    self.__train_fit_history = self.__logbook.chapters["fitness"].select("min")
    if self.validate:
        self.__val_score_history = self.__logbook.chapters["valid"].select(
            "valid_score"
        )
        # NOTE(review): assumes higher score is better — confirm score_func.
        self.max_val_score = max(self.__val_score_history)
    # Cache the overall champion for predict()/score().
    self._best = best_inds[0]
def _restart(self, toolbox: base.Toolbox, save_best_inds: bool = True):
    """Re-initialize the population, optionally preserving the best
    individuals.

    Bug fix: the ``save_best_inds`` flag was previously ignored — the best
    individual of each island was *always* re-inserted into the fresh
    population. The flag is now honored (the default True preserves the old
    observable behavior).

    Args:
        toolbox: the toolbox for the evolution.
        save_best_inds: whether to keep the best individual from each island
            in the new population. Defaults to True.
    """
    if not save_best_inds:
        self._generate_init_pop(toolbox)
        return
    # Snapshot each island's champion before regenerating.
    best_inds = [tools.selBest(island, k=1)[0] for island in self.__pop]
    self._generate_init_pop(toolbox)
    # Re-insert each champion at the front of its island.
    for i, champion in enumerate(best_inds):
        self.__pop[i][0] = champion
def _generate_init_pop(self, toolbox: base.Toolbox):
    """Generates the initial population.

    Args:
        toolbox: the toolbox for the evolution.
    """
    self.__pop = [None] * self.num_islands
    for i in range(self.num_islands):
        self.__pop[i] = toolbox.population(n=self.num_individuals)
    # Seeds the first island with individuals (only island 0 is seeded).
    if self.seed_str is not None:
        self.__print(" Seeding population with individuals...")
        self.__pop[0][: len(self.seed_ind)] = self.seed_ind
    # Optional duplicate removal evaluates fitnesses as a side effect.
    if self.remove_init_duplicates:
        self.__print(" Removing duplicates from initial population(s)...")
        self.__remove_duplicates(toolbox)
        self.__print(" DONE.")
    # Run the preprocessing hook on every island before the first evaluation.
    if self.preprocess_args is not None:
        for i in range(self.num_islands):
            preprocess_values = toolbox.map(toolbox.preprocess_func, self.__pop[i])
            self.preprocess_args["callback"](self.__pop[i], preprocess_values)
def _evaluate_init_pop(self, toolbox: base.Toolbox):
    """Evaluate the training fitness of every island's initial population.

    Args:
        toolbox: the toolbox for the evolution.
    """
    for island in self.__pop:
        fitnesses = toolbox.map(toolbox.evaluate_train, island)
        if self.callback_func is not None:
            # User hook decides how fitnesses are assigned.
            self.callback_func(island, fitnesses)
        else:
            for individual, fitness in zip(island, fitnesses):
                individual.fitness.values = fitness
def __run(self, toolbox: base.Toolbox):
    """Performs the evolution pipeline.

    Args:
        toolbox: the toolbox for the evolution.
    """
    self.__print("Generating initial population(s)...")
    self._generate_init_pop(toolbox)
    self.__print("DONE.")
    # Evaluate the fitness of the entire population on the training set
    self.__print("Evaluating initial population(s)...")
    self._evaluate_init_pop(toolbox)
    self.__print("DONE.")
    if self.validate:
        self.__print("Using validation dataset.")
    self.__print(" -= START OF EVOLUTION =- ")
    # Generations are 1-based in logs/history.
    # NOTE(review): if generations == 0, self.__cgen is never set and the
    # assignment below raises AttributeError — confirm whether generations=0
    # is a supported configuration.
    for gen in range(self.generations):
        self.__cgen = gen + 1
        self._step(toolbox, self.__cgen)
        # Early stop once the best fitness is (numerically) zero.
        if self._best.fitness.values[0] <= 1e-15:
            self.__print("Fitness threshold reached - STOPPING.")
            break
    self.__print(" -= END OF EVOLUTION =- ")
    self.__last_gen = self.__cgen
    self.__print(f"The best individual is {self._best}")
    self.__print(
        f"The best fitness on the training set is {self.__train_fit_history[-1]}"
    )
    if self.validate:
        self.__print(
            f"The best score on the validation set is {self.max_val_score}"
        )
    if self.save_best_individual and self.output_path is not None:
        self.__save_best_individual(self.output_path)
        self.__print("String of the best individual saved to disk.")
    if self.save_train_fit_history and self.output_path is not None:
        self.__save_train_fit_history(self.output_path)
        self.__print("Training fitness history saved to disk.")
    # NOTE: ray.shutdown should be manually called by the user
def __save_best_individual(self, output_path: str):
    """Save the string of the best individual to ``best_ind.txt``.

    Fix: use a context manager so the file handle is closed even if the
    write raises (the previous open/write/close left the handle open on
    error).

    Args:
        output_path: directory where the file is written.
    """
    with open(join(output_path, "best_ind.txt"), "w") as file:
        file.write(str(self._best))
def __save_train_fit_history(self, output_path: str):
    """Save the training (and, if enabled, validation) history as .npy files.

    Args:
        output_path: path where the history should be saved.
    """
    np.save(join(output_path, "train_fit_history.npy"), self.__train_fit_history)
    if not self.validate:
        return
    np.save(join(output_path, "val_score_history.npy"), self.__val_score_history)
def get_best_individuals_sympy(
    self,
    sympy_conversion_rules: Dict = conversion_rules,
    special_term_name: str = "c",
    n_ind: int = 1,
):
    """Return the SymPy expressions of the best individuals.

    Args:
        sympy_conversion_rules: mapping from GP primitives (DEAP) to SymPy
            primitives.
        special_term_name: name used for constants during SymPy conversion.
        n_ind: number of best individuals to convert to SymPy.

    Returns:
        sympy representation of the best individuals.
    """
    return [
        parse_expr(
            stringify_for_sympy(ind, sympy_conversion_rules, special_term_name)
        )
        for ind in self.get_best_individuals(n_ind=n_ind)
    ]
def get_train_fit_history(self):
    """Returns the training fitness history.

    Returns:
        list containing the best training fitness at each generation.
    """
    return self.__train_fit_history
def get_val_score_history(self):
    """Returns the validation score history.

    Only available when the estimator was run with ``validate=True``;
    otherwise the underlying attribute is never set and this raises
    ``AttributeError``.

    Returns:
        list containing the validation scores at each generation.
    """
    return self.__val_score_history
def get_last_gen(self):
    """Returns the last generation index (1-based) reached during `fit`,
    which may be smaller than `generations` if early stopping triggered.

    Returns:
        the last generation.
    """
    return self.__last_gen
def save_best_test_sols(self, X_test: npt.NDArray | Array, output_path: str):
"""Compute and save the predictions corresponding to the best individual
at the end of the evolution, evaluated over the test dataset.
Args: