diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index ac2857cc226..299c50718e0 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -155,18 +155,23 @@ jobs: - name: Run Scuro Python Tests if: ${{ matrix.test_mode == 'scuro' }} + env: + TORCH_HOME: ${{ github.workspace }}/.torch run: | + ( while true; do echo "."; sleep 25; done ) & + KA=$! + pip install --upgrade pip wheel setuptools + # Use CUDA 12.1 wheels to avoid slow/source builds + pip install --extra-index-url https://download.pytorch.org/whl/cu121 \ + torch==2.4.1 torchvision==0.19.1 pip install \ - torchvision \ transformers \ opencv-python \ - torch \ librosa \ h5py \ gensim \ opt-einsum \ nltk + kill $KA cd src/main/python - python -m unittest discover -s tests/scuro -p 'test_*.py' - - + python -m unittest discover -s tests/scuro -p 'test_*.py' -v \ No newline at end of file diff --git a/src/main/python/systemds/scuro/__init__.py b/src/main/python/systemds/scuro/__init__.py index 8e83c865a2a..b567b300247 100644 --- a/src/main/python/systemds/scuro/__init__.py +++ b/src/main/python/systemds/scuro/__init__.py @@ -67,19 +67,13 @@ from systemds.scuro.modality.joined import JoinedModality from systemds.scuro.modality.joined_transformed import JoinedTransformedModality from systemds.scuro.modality.modality import Modality -from systemds.scuro.modality.modality_identifier import ModalityIdentifier +from systemds.scuro.utils.identifier import Identifier from systemds.scuro.modality.transformed import TransformedModality from systemds.scuro.modality.type import ModalityType from systemds.scuro.modality.unimodal_modality import UnimodalModality -from systemds.scuro.drsearch.dr_search import DRSearch from systemds.scuro.drsearch.task import Task -from systemds.scuro.drsearch.fusion_optimizer import FusionOptimizer from systemds.scuro.drsearch.operator_registry import Registry from systemds.scuro.drsearch.optimization_data import OptimizationData -from systemds.scuro.drsearch.representation_cache import RepresentationCache -from systemds.scuro.drsearch.unimodal_representation_optimizer import ( - UnimodalRepresentationOptimizer, -) from systemds.scuro.representations.covarep_audio_features import ( RMSE, Spectral, @@ -131,17 +125,13 @@ "JoinedModality", "JoinedTransformedModality", "Modality", - "ModalityIdentifier", + "Identifier", "TransformedModality", "ModalityType", "UnimodalModality", - "DRSearch", "Task", - "FusionOptimizer", "Registry", "OptimizationData", - "RepresentationCache", - "UnimodalRepresentationOptimizer", "UnimodalOptimizer", "MultimodalOptimizer", "ZeroCrossing", diff --git a/src/main/python/systemds/scuro/dataloader/base_loader.py b/src/main/python/systemds/scuro/dataloader/base_loader.py index f21f212e7a0..33b418efb30 100644 --- a/src/main/python/systemds/scuro/dataloader/base_loader.py +++ b/src/main/python/systemds/scuro/dataloader/base_loader.py @@ -127,8 +127,8 @@ def _load(self, indices: List[str]): if isinstance(file_names, str): self.extract(file_names, indices) else: - for file_name in file_names: - self.extract(file_name) + for i, file_name in enumerate(file_names): + self.extract(file_name, indices[i]) return self.data, self.metadata diff --git a/src/main/python/systemds/scuro/drsearch/dr_search.py b/src/main/python/systemds/scuro/drsearch/dr_search.py deleted file mode 100644 index 601001c7428..00000000000 --- a/src/main/python/systemds/scuro/drsearch/dr_search.py +++ /dev/null @@ -1,167 +0,0 @@ -# ------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# ------------------------------------------------------------- -import itertools -import random -from typing import List - -from systemds.scuro.drsearch.task import Task -from systemds.scuro.modality.modality import Modality -from systemds.scuro.representations.representation import Representation - -import warnings - -warnings.filterwarnings("ignore") - - -def get_modalities_by_name(modalities, name): - for modality in modalities: - if modality.name == name: - return modality - - raise "Modality " + name + "not in modalities" - - -class DRSearch: - def __init__( - self, - modalities: List[Modality], - task: Task, - representations: List[Representation], - ): - """ - The DRSearch primitive finds the best uni- or multimodal data representation for the given modalities for - a specific task - :param modalities: List of uni-modal modalities - :param task: custom task - :param representations: List of representations to be evaluated - """ - self.modalities = modalities - self.task = task - self.representations = representations - self.scores = {} - self.best_modalities = None - self.best_representation = None - self.best_score = -1 - - def set_best_params( - self, - representation: Representation, - scores: List[float], - modality_names: List[str], - ): - """ - Updates the best parameters for given modalities, representation, and score - :param representation: The representation used to retrieve the current score - :param scores: achieved train/test scores for the set of modalities and representation - :param modality_names: List of modality names used in this setting - :return: - """ - - # check if modality name is already in dictionary - if "_".join(modality_names) not in list(self.scores.keys()): - # if not add it to dictionary - self.scores["_".join(modality_names)] = {} - - # set score for representation - self.scores["_".join(modality_names)][representation] = scores - - # compare current score with best score - if scores[1] > self.best_score: - self.best_score = scores[1] - self.best_representation = representation - self.best_modalities = modality_names - - def reset_best_params(self): - self.best_score = -1 - self.best_modalities = None - self.best_representation = None - self.scores = {} - - def fit_random(self, seed=-1): - """ - This method randomly selects a modality or combination of modalities and representation - """ - if seed != -1: - random.seed(seed) - - modalities = [] - for M in range(1, len(self.modalities) + 1): - for combination in itertools.combinations(self.modalities, M): - modalities.append(combination) - - modality_combination = random.choice(modalities) - representation = random.choice(self.representations) - - modality = modality_combination[0].combine( - list(modality_combination[1:]), representation - ) - - scores = self.task.run(modality.data) - self.set_best_params(representation, scores, modality.get_modality_names()) - - return self.best_representation, self.best_score, self.best_modalities - - def fit_enumerate_all(self): - """ - This method finds the best representation out of a given List of uni-modal modalities and - representations - :return: The best parameters found in the search procedure - """ - - for M in range(1, len(self.modalities) + 1): - for combination in itertools.combinations(self.modalities, M): - for representation in self.representations: - modality = combination[0] - if len(combination) > 1: - modality = combination[0].combine( - list(combination[1:]), representation - ) - - scores = self.task.run(modality.data) - self.set_best_params( - representation, - scores, - modality.get_modality_names(), - ) - - return self.best_representation, self.best_score, self.best_modalities - - def transform(self, modalities: List[Modality]): - """ - The transform method takes a list of uni-modal modalities and creates an aligned representation - by using the best parameters found during the fitting step - :param modalities: List of uni-modal modalities - :return: aligned data - """ - - if self.best_score == -1: - raise "Please fit representations first!" - - used_modalities = [] - - for modality_name in self.best_modalities: - used_modalities.append(get_modalities_by_name(modalities, modality_name)) - - modality = used_modalities[0].combine( - used_modalities[1:], self.best_representation - ) - - return modality.data diff --git a/src/main/python/systemds/scuro/drsearch/fusion_optimizer.py b/src/main/python/systemds/scuro/drsearch/fusion_optimizer.py deleted file mode 100644 index 7247720f555..00000000000 --- a/src/main/python/systemds/scuro/drsearch/fusion_optimizer.py +++ /dev/null @@ -1,295 +0,0 @@ -# ------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# ------------------------------------------------------------- -import time -import copy -import pickle -from systemds.scuro.drsearch.operator_registry import Registry -from systemds.scuro.drsearch.optimization_data import ( - OptimizationResult, - OptimizationStatistics, -) -from systemds.scuro.drsearch.representation_cache import RepresentationCache -from systemds.scuro.drsearch.task import Task -from systemds.scuro.representations.aggregate import Aggregation -from systemds.scuro.representations.context import Context - - -def extract_names(operator_chain): - result = [] - for op in operator_chain: - result.append(op.name) - - return result - - -class FusionOptimizer: - def __init__( - self, - modalities, - task: Task, - unimodal_representations_candidates, - representation_cache: RepresentationCache, - num_best_candidates=4, - max_chain_depth=5, - debug=False, - ): - self.modalities = modalities - self.task = task - self.unimodal_representations_candidates = unimodal_representations_candidates - self.num_best_candidates = num_best_candidates - self.k_best_candidates, self.candidates_per_modality = self.get_k_best_results( - num_best_candidates - ) - self.operator_registry = Registry() - self.max_chain_depth = max_chain_depth - self.debug = debug - self.evaluated_candidates = set() - self.cache = representation_cache - self.optimization_statistics = OptimizationStatistics(self.k_best_candidates) - self.optimization_results = [] - - def optimize(self): - """ - This method finds different ways in how to combine modalities and evaluates the fused representations against - the given task. It can fuse different representations from the same modality as well as fuse representations - form different modalities. - """ - - # TODO: add an aligned representation for all modalities with a temporal dimension - # TODO: keep a map of operator chains so that we don't evaluate them multiple times in different orders (if it does not make a difference) - - r = [] - - for candidate in self.k_best_candidates: - modality = self.candidates_per_modality[str(candidate)] - cached_representation, representation_ops, used_op_names = ( - self.cache.load_from_cache(modality, candidate.operator_chain) - ) - if cached_representation is not None: - modality = cached_representation - store = False - for representation in representation_ops: - if isinstance(representation, Context): - modality = modality.context(representation) - elif representation.name == "RowWiseConcatenation": - modality = modality.flatten(True) - else: - modality = modality.apply_representation(representation) - store = True - if store: - self.cache.save_to_cache(modality, used_op_names, representation_ops) - - remaining_candidates = [c for c in self.k_best_candidates if c != candidate] - r.append( - self._optimize_candidate(modality, candidate, remaining_candidates, 1) - ) - - if self.debug: - with open( - f"fusion_statistics_{self.task.model.name}_{self.num_best_candidates}_{self.max_chain_depth}.pkl", - "wb", - ) as fp: - pickle.dump( - self.optimization_statistics, - fp, - protocol=pickle.HIGHEST_PROTOCOL, - ) - - opt_results = copy.deepcopy(self.optimization_results) - for i, opt_res in enumerate(self.optimization_results): - op_name = [] - for op in opt_res.operator_chain: - if isinstance(op, list): - for o in op: - if isinstance(o, list): - for j in o: - op_name.append(j.name) - elif isinstance(o, str): - op_name.append(o) - else: - op_name.append(o.name) - elif isinstance(op, str): - op_name.append(op) - else: - op_name.append(op.name) - opt_results[i].operator_chain = op_name - with open( - f"fusion_results_{self.task.model.name}_{self.num_best_candidates}_{self.max_chain_depth}.pkl", - "wb", - ) as fp: - pickle.dump(opt_results, fp, protocol=pickle.HIGHEST_PROTOCOL) - - self.optimization_statistics.print_statistics() - - def get_k_best_results(self, k: int): - """ - Get the k best results per modality - :param k: number of best results - """ - best_results = [] - candidate_for_modality = {} - for modality in self.modalities: - k_results = sorted( - self.unimodal_representations_candidates[modality.modality_id][ - self.task.model.name - ], - key=lambda x: x.test_accuracy, - reverse=True, - )[:k] - for k_result in k_results: - candidate_for_modality[str(k_result)] = modality - best_results.extend(k_results) - - return best_results, candidate_for_modality - - def _optimize_candidate( - self, modality, candidate, remaining_candidates, chain_depth - ): - """ - Optimize a single candidate by fusing it with others recursively. - - :param candidate: The current candidate representation. - :param chain_depth: The current depth of fusion chains. - """ - if chain_depth > self.max_chain_depth: - return - - for other_candidate in remaining_candidates: - other_modality = self.candidates_per_modality[str(other_candidate)] - cached_representation, representation_ops, used_op_names = ( - self.cache.load_from_cache( - other_modality, other_candidate.operator_chain - ) - ) - if cached_representation is not None: - other_modality = cached_representation - store = False - for representation in representation_ops: - if representation.name == "Aggregation": - params = other_candidate.parameters[representation.name] - representation = Aggregation( - aggregation_function=params["aggregation"] - ) - if isinstance(representation, Context): - other_modality = other_modality.context(representation) - elif isinstance(representation, Aggregation): - other_modality = representation.execute(other_modality) - elif representation.name == "RowWiseConcatenation": - other_modality = other_modality.flatten(True) - else: - other_modality = other_modality.apply_representation(representation) - store = True - if store: - self.cache.save_to_cache( - other_modality, used_op_names, representation_ops - ) - - fusion_results = self.operator_registry.get_fusion_operators() - fusion_representation = None - for fusion_operator in fusion_results: - fusion_operator = fusion_operator() - chain_key = self.create_identifier( - candidate, fusion_operator, other_candidate - ) - # print(fusion_operator.name) - representation_start = time.time() - if ( - isinstance(fusion_operator, Context) - and fusion_representation is not None - ): - fusion_representation.context(fusion_operator) - elif isinstance(fusion_operator, Context): - continue - else: - fused_representation = modality.combine( - other_modality, fusion_operator - ) - - representation_end = time.time() - if chain_key not in self.evaluated_candidates: - # Evaluate the fused representation - - score = self.task.run(fused_representation.data) - fusion_params = {fusion_operator.name: fusion_operator.parameters} - result = OptimizationResult( - operator_chain=[ - candidate.operator_chain, - fusion_operator.name, - other_candidate.operator_chain, - ], - parameters=[ - candidate.parameters, - fusion_params, - other_candidate.parameters, - ], - train_accuracy=score[0], - test_accuracy=score[1], - # train_min_it_acc=score[2], - # test_min_it_acc=score[3], - training_runtime=self.task.training_time, - inference_runtime=self.task.inference_time, - representation_time=representation_end - representation_start, - output_shape=(1, 1), # TODO - ) - - # Store the result - self.optimization_results.append(result) - self.optimization_statistics.add_entry( - [ - candidate.operator_chain, - [fusion_operator.name], - other_candidate.operator_chain, - ], - score[1], - ) - - # Mark this chain as evaluated - self.evaluated_candidates.add(chain_key) - - if self.debug: - print( - f"Evaluated chain: {candidate.operator_chain} + {fusion_operator.name} + {other_candidate.operator_chain} -> {score[1]}" - ) - - # Recursively optimize further with this fused representation - self._optimize_candidate( - fused_representation, - result, - [c for c in remaining_candidates if c != other_candidate], - chain_depth + 1, - ) - - def create_identifier(self, candidate, fusion, other_candidate): - identifier = "".join(flatten_and_join(candidate.operator_chain)) - identifier += fusion.name - identifier += "".join(flatten_and_join(other_candidate.operator_chain)) - - return identifier - - -def flatten_and_join(data): - flat_list = [] - for item in data: - if isinstance(item, list): - flat_list.extend(flatten_and_join(item)) - else: - flat_list.append(item.name if not isinstance(item, str) else item) - return flat_list diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py index 04a3fa4701a..8902bb7d011 100644 --- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py +++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py @@ -18,89 +18,276 @@ # under the License. # # ------------------------------------------------------------- -import itertools +from typing import Dict, List, Tuple, Any, Optional +import numpy as np +from sklearn.model_selection import ParameterGrid +import json +import logging +from dataclasses import dataclass import time +import copy + +from systemds.scuro.modality.modality import Modality +from systemds.scuro.drsearch.task import Task -import numpy as np -from systemds.scuro.drsearch.optimization_data import OptimizationResult -from systemds.scuro.representations.context import Context +@dataclass +class HyperparamResult: + + representation_name: str + best_params: Dict[str, Any] + best_score: float + all_results: List[Tuple[Dict[str, Any], float]] + tuning_time: float + modality_id: int class HyperparameterTuner: - def __init__(self, task, n_trials=10, early_stopping_patience=5): - self.task = task - self.n_trials = n_trials - self.early_stopping_patience = early_stopping_patience - - def tune_operator_chain(self, modality, operator_chain): - best_result = None - best_score = -np.inf - - param_grids = {} - - for operator in operator_chain: - param_grids[operator.name] = operator.parameters - - param_combinations = self._generate_search_space(param_grids) - - for params in param_combinations: - modified_modality = modality - current_chain = [] - - representation_start = time.time() - try: - for operator in operator_chain: - - if operator.name in params: - operator.set_parameters(params[operator.name]) - - if isinstance(operator, Context): - modified_modality = modified_modality.context(operator) - else: - modified_modality = modified_modality.apply_representation( - operator - ) - - current_chain.append(operator) - - representation_end = time.time() - - score = self.task.run(modified_modality.data) - - if score[1] > best_score: - best_score = score[1] - best_params = params - best_result = OptimizationResult( - operator_chain=current_chain, - parameters=params, - train_accuracy=score[0], - test_accuracy=score[1], - training_runtime=self.task.training_time, - inference_runtime=self.task.inference_time, - representation_time=representation_end - representation_start, - output_shape=(1, 1), + + def __init__( + self, + modalities, + tasks, + optimization_results, + k: int = 2, + n_jobs: int = -1, + scoring_metric: str = "accuracy", + maximize_metric: bool = True, + save_results: bool = False, + debug: bool = False, + ): + self.tasks = tasks + self.optimization_results = optimization_results + self.n_jobs = n_jobs + self.scoring_metric = scoring_metric + self.maximize_metric = maximize_metric + self.save_results = save_results + self.results = {} + self.k = k + self.modalities = modalities + self.representations = None + self.k_best_cache = None + self.k_best_representations = None + self.extract_k_best_modalities_per_task() + self.debug = debug + self.logger = logging.getLogger(__name__) + if debug: + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) + + def get_modalities_by_id(self, modality_ids: List[int]) -> Modality: + modalities = [] + for mod in self.modalities: + if mod.modality_id in modality_ids: + modalities.append(mod) + return modalities + + def get_modality_by_id_and_instance_id(self, modality_id, instance_id): + counter = 0 + for modality in self.modalities: + if modality.modality_id == modality_id: + if counter == instance_id or instance_id == -1: + return modality + else: + counter += 1 + return None + + def extract_k_best_modalities_per_task(self): + self.k_best_representations = {} + self.k_best_cache = {} + representations = {} + for task in self.tasks: + self.k_best_representations[task.model.name] = [] + self.k_best_cache[task.model.name] = [] + representations[task.model.name] = {} + for modality in self.modalities: + k_best_results, cached_data = ( + self.optimization_results.get_k_best_results(modality, self.k, task) + ) + representations[task.model.name][modality.modality_id] = k_best_results + self.k_best_representations[task.model.name].extend(k_best_results) + self.k_best_cache[task.model.name].extend(cached_data) + self.representations = representations + + def tune_unimodal_representations(self, max_eval_per_rep: Optional[int] = None): + results = {} + for task in self.tasks: + results[task.model.name] = [] + for representation in self.k_best_representations[task.model.name]: + result = self.tune_dag_representation( + representation.dag, + representation.dag.root_node_id, + task, + max_eval_per_rep, + ) + results[task.model.name].append(result) + + self.results = results + + if self.save_results: + self.save_tuning_results() + + return results + + def tune_dag_representation(self, dag, root_node_id, task, max_evals=None): + hyperparams = {} + reps = [] + modality_ids = [] + node_order = [] + + visited = set() + + def visit_node(node_id): + if node_id in visited: + return + node = dag.get_node_by_id(node_id) + for input_id in node.inputs: + visit_node(input_id) + visited.add(node_id) + if node.operation is not None: + if node.parameters: + hyperparams.update(node.parameters) + reps.append(node.operation) + node_order.append(node_id) + if node.modality_id is not None: + modality_ids.append(node.modality_id) + + visit_node(root_node_id) + + if not hyperparams: + return None + + start_time = time.time() + rep_name = "_".join([rep.__name__ for rep in reps]) + + param_grid = list(ParameterGrid(hyperparams)) + if max_evals and len(param_grid) > max_evals: + np.random.shuffle(param_grid) + param_grid = param_grid[:max_evals] + + all_results = [] + for params in param_grid: + result = self.evaluate_dag_config( + dag, params, node_order, modality_ids, task + ) + all_results.append(result) + + if self.maximize_metric: + best_params, best_score = max(all_results, key=lambda x: x[1]) + else: + best_params, best_score = min(all_results, key=lambda x: x[1]) + + tuning_time = time.time() - start_time + + return HyperparamResult( + representation_name=rep_name, + best_params=best_params, + best_score=best_score, + all_results=all_results, + tuning_time=tuning_time, + modality_id=modality_ids[0] if modality_ids else None, + ) + + def evaluate_dag_config(self, dag, params, node_order, modality_ids, task): + try: + dag_copy = copy.deepcopy(dag) + + for node_id in node_order: + node = dag_copy.get_node_by_id(node_id) + if node.operation is not None and node.parameters: + node_params = { + k: v for k, v in params.items() if k in node.parameters + } + node.parameters = node_params + + modalities = self.get_modalities_by_id(modality_ids) + modified_modality = dag_copy.execute(modalities, task) + score = task.run( + modified_modality[list(modified_modality.keys())[-1]].data + )[1] + + return params, score + except Exception as e: + self.logger.error(f"Error evaluating DAG with params {params}: {e}") + return params, float("-inf") if self.maximize_metric else float("inf") + + def tune_multimodal_representations( + self, + optimization_results, + k: int = 1, + optimize_unimodal: bool = True, + max_eval_per_rep: Optional[int] = None, + ): + results = {} + for task in self.tasks: + best_results = sorted( + optimization_results[task.model.name], + key=lambda x: x.val_score, + reverse=True, + )[:k] + results[task.model.name] = [] + best_optimization_results = best_results + + for representation in best_optimization_results: + if optimize_unimodal: + dag = copy.deepcopy(representation.dag) + index = 0 + for i, node in enumerate(representation.dag.nodes): + if not node.inputs: + leaf_node_id = node.node_id + leaf_nodes = self.representations[task.model.name][ + node.modality_id + ][node.representation_index].dag.nodes + for leaf_idx, node in enumerate(dag.nodes): + if node.node_id == leaf_node_id: + dag.nodes[leaf_idx : leaf_idx + 1] = leaf_nodes + index = leaf_idx + len(leaf_nodes) - 1 + break + + for node in dag.nodes: + try: + idx = node.inputs.index(leaf_node_id) + node.inputs[idx] = dag.nodes[index].node_id + break + except ValueError: + continue + + result = self.tune_dag_representation( + dag, dag.root_node_id, task, max_eval_per_rep ) + else: + result = self.tune_dag_representation( + representation.dag, + representation.dag.root_node_id, + task, + max_eval_per_rep, + ) + results[task.model.name].append(result) + + self.results = results - except Exception as e: - print(f"Failed parameter combination {params}: {str(e)}") - continue + if self.save_results: + self.save_tuning_results() - return best_result + return results - def _generate_search_space(self, param_grids): - combinations = {} - for operator_name, params in param_grids.items(): - operator_combinations = [ - dict(zip(params.keys(), v)) for v in itertools.product(*params.values()) - ] - combinations[operator_name] = operator_combinations + def save_tuning_results(self, filepath: str = None): + if not filepath: + filepath = f"hyperparameter_results_{int(time.time())}.json" - keys = list(combinations.keys()) - values = [combinations[key] for key in keys] + json_results = {} + for task in self.results.keys(): + for result in self.results[task]: + json_results[result.representation_name] = { + "best_params": result.best_params, + "best_score": result.best_score, + "tuning_time": result.tuning_time, + "num_evaluations": len(result.all_results), + } - parameter_grid = [ - dict(zip(keys, combo)) for combo in itertools.product(*values) - ] + with open(filepath, "w") as f: + json.dump(json_results, f, indent=2) - return parameter_grid + if self.debug: + self.logger.info(f"Results saved to {filepath}") diff --git a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py index ac4365ed5c6..91d569bc598 100644 --- a/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/multimodal_optimizer.py @@ -18,368 +18,276 @@ # under the License. # # ------------------------------------------------------------- -import itertools + +import itertools +import pickle +import time +from dataclasses import dataclass +from typing import List, Dict, Any, Generator +import copy +import traceback +from itertools import chain +from systemds.scuro.drsearch.task import Task +from systemds.scuro.modality.type import ModalityType +from systemds.scuro.drsearch.representation_dag import ( + RepresentationDag, + RepresentationDAGBuilder, +) from systemds.scuro.representations.aggregated_representation import ( AggregatedRepresentation, ) - from systemds.scuro.representations.aggregate import Aggregation - from systemds.scuro.drsearch.operator_registry import Registry - from systemds.scuro.utils.schema_helpers import get_shape -import dataclasses class MultimodalOptimizer: def __init__( - self, modalities, unimodal_optimization_results, tasks, k=2, debug=True + self, + modalities: List[Any], + unimodal_optimization_results: Any, + tasks: List[Any], + k: int = 2, + debug: bool = True, + min_modalities: int = 2, + max_modalities: int = None, ): - self.k_best_cache = None - self.k_best_modalities = None self.modalities = modalities - self.unimodal_optimization_results = unimodal_optimization_results self.tasks = tasks self.k = k - self.extract_k_best_modalities_per_task() self.debug = debug + self.min_modalities = max(2, min_modalities) + self.max_modalities = max_modalities or len(modalities) self.operator_registry = Registry() - self.optimization_results = MultimodalResults( - modalities, tasks, debug, self.k_best_modalities + self.fusion_operators = self.operator_registry.get_fusion_operators() + + self.k_best_representations = self._extract_k_best_representations( + unimodal_optimization_results ) - self.cache = {} + self.optimization_results = [] - def optimize(self): - for task in self.tasks: - self.optimize_intermodal_representations(task) + def _extract_k_best_representations( + self, unimodal_optimization_results: Any + ) -> Dict[str, Dict[str, List[Any]]]: + k_best = {} - def optimize_intramodal_representations(self, task): - for modality in self.modalities: - representations = self.k_best_modalities[task.model.name][ - modality.modality_id - ] - applied_representations = self.extract_representations( - representations, modality, task.model.name - ) + for task in self.tasks: + k_best[task.model.name] = {} - for i in range(1, len(applied_representations)): - for fusion_method in self.operator_registry.get_fusion_operators(): - if fusion_method().needs_alignment and not applied_representations[ - i - 1 - ].is_aligned(applied_representations[i]): - continue - combined = applied_representations[i - 1].combine( - applied_representations[i], fusion_method() - ) - self.evaluate( - task, - combined, - [i - 1, i], - fusion_method, - [ - applied_representations[i - 1].modality_id, - applied_representations[i].modality_id, - ], + for modality in self.modalities: + k_best_results, cached_data = ( + unimodal_optimization_results.get_k_best_results( + modality, self.k, task ) - if not fusion_method().commutative: - combined_comm = applied_representations[i].combine( - applied_representations[i - 1], fusion_method() - ) - self.evaluate( - task, - combined_comm, - [i, i - 1], - fusion_method, - [ - applied_representations[i - 1].modality_id, - applied_representations[i].modality_id, - ], - ) - - # TODO: check if order matters for reused reps - only compute once - check in cache - # TODO: parallelize - whenever an item of len 0 comes along give it to a new thread - merge results - # TODO: change the algorithm so that one representation is used until there is no more representations to add - saves a lot of memory - def optimize_intermodal_representations(self, task): - modality_combos = [] - n = len(self.k_best_cache[task.model.name]) - reuse_cache = {} - - def generate_extensions(current_combo, remaining_indices): - # Add current combination if it has at least 2 elements - if len(current_combo) >= 2: - combo_tuple = tuple(i for i in current_combo) - modality_combos.append(combo_tuple) - - for i in remaining_indices: - new_combo = current_combo + [i] - new_remaining = [j for j in remaining_indices if j > i] - generate_extensions(new_combo, new_remaining) - - for start_idx in range(n): - remaining = list(range(start_idx + 1, n)) - generate_extensions([start_idx], remaining) - fusion_methods = self.operator_registry.get_fusion_operators() - fused_representations = [] - reuse_fused_representations = False - for i, modality_combo in enumerate(modality_combos): - # clear reuse cache - reuse_cache = self.prune_cache(modality_combos[i:], reuse_cache) - - if i != 0: - reuse_fused_representations = self.is_prefix_match( - modality_combos[i - 1], modality_combo - ) - if reuse_fused_representations: - mods = [ - self.k_best_cache[task.model.name][mod_idx] - for mod_idx in modality_combo[len(modality_combos[i - 1]) :] - ] - fused_representations = reuse_cache[modality_combos[i - 1]] - else: - prefix_idx = self.compute_equal_prefix_index( - modality_combos[i - 1], modality_combo - ) - if prefix_idx > 1: - fused_representations = reuse_cache[ - modality_combos[i - 1][:prefix_idx] - ] - reuse_fused_representations = True - mods = [ - self.k_best_cache[task.model.name][mod_idx] - for mod_idx in modality_combo[prefix_idx:] - ] - if self.debug: - print( - f"New modality combo: {modality_combo} - Reuse: {reuse_fused_representations} - # fused reps: {len(fused_representations)}" ) - all_mods = [ - self.k_best_cache[task.model.name][mod_idx] - for mod_idx in modality_combo - ] - temp_fused_reps = [] - for j, fusion_method in enumerate(fusion_methods): - # Evaluate all mods - fused_rep = all_mods[0].combine(all_mods[1:], fusion_method()) - temp_fused_reps.append(fused_rep) - self.evaluate( - task, - fused_rep, - [ - self.k_best_modalities[task.model.name][k].representations - for k in modality_combo - ], - fusion_method, - modality_combo, - ) - if reuse_fused_representations: - for fused_representation in fused_representations: - fused_rep = fused_representation.combine(mods, fusion_method()) - temp_fused_reps.append(fused_rep) - self.evaluate( - task, - fused_rep, - [ - self.k_best_modalities[task.model.name][ - k - ].representations - for k in modality_combo - ], - fusion_method, - modality_combo, - ) + k_best[task.model.name][modality.modality_id] = cached_data - if ( - len(modality_combo) < len(self.k_best_cache[task.model.name]) - and i + 1 < len(modality_combos) - and self.is_prefix_match(modality_combos[i], modality_combos[i + 1]) - ): - reuse_cache[modality_combo] = temp_fused_reps - reuse_fused_representations = False - - def prune_cache(self, sequences, cache): - seqs_as_tuples = [tuple(seq) for seq in sequences] - - def still_used(key): - return any(self.is_prefix_match(key, seq) for seq in seqs_as_tuples) - - cache = {key: value for key, value in cache.items() if still_used(key)} - return cache - - def is_prefix_match(self, seq1, seq2): - if len(seq1) > len(seq2): - return False - - # Check if seq1 matches the beginning of seq2 - return seq2[: len(seq1)] == seq1 - - def compute_equal_prefix_index(self, seq1, seq2): - max_len = min(len(seq1), len(seq2)) - i = 0 - while i < max_len and seq1[i] == seq2[i]: - i += 1 - - return i - - def extract_representations(self, representations, modality, task_name): - applied_representations = [] - for i in range(0, len(representations)): - cache_key = ( - tuple(representations[i].representations), - representations[i].task_time, - representations[i].representation_time, + return k_best + + def _generate_modality_combinations(self) -> Generator[List[str], None, None]: + modality_ids = [mod.modality_id for mod in self.modalities] + + for r in range( + self.min_modalities, min(self.max_modalities + 1, len(modality_ids) + 1) + ): + for modality_subset in itertools.combinations(modality_ids, r): + yield list(modality_subset) + + def _generate_representation_combinations( + self, modality_subset: List[str], task_name: str + ) -> Generator[Dict[str, int], None, None]: + representation_options = [] + + for modality_id in modality_subset: + num_representations = len( + self.k_best_representations[task_name][modality_id] ) - if ( - cache_key - in self.unimodal_optimization_results.cache[modality.modality_id][ - task_name - ] - ): - applied_representations.append( - self.unimodal_optimization_results.cache[modality.modality_id][ - task_name - ][cache_key] + representation_options.append(list(range(num_representations))) + + for combo in itertools.product(*representation_options): + yield { + modality_id: repr_idx + for modality_id, repr_idx in zip(modality_subset, combo) + } + + def _generate_fusion_dags( + self, modality_subset: List[str], representation_combo: Dict[str, int] + ) -> Generator[RepresentationDag, None, None]: + leaf_infos = [(m, representation_combo[m]) for m in modality_subset] + + def gen_trees(indices: List[int]): + if len(indices) == 1: + yield indices[0] + return + for split in range(1, len(indices)): + for left_idxs in itertools.combinations(indices, split): + left = list(left_idxs) + right = [i for i in indices if i not in left] + for l_tree in gen_trees(left): + for r_tree in gen_trees(right): + yield (l_tree, r_tree) + + def build_variants( + subtree, base_builder: RepresentationDAGBuilder, leaf_id_map + ): + variants = [] + + if isinstance(subtree, int): + variants.append((base_builder, leaf_id_map[subtree])) + return variants + + left_sub, right_sub = subtree + + left_variants = build_variants( + left_sub, copy.deepcopy(base_builder), leaf_id_map + ) + + for left_builder, left_root in left_variants: + right_variants = build_variants( + right_sub, copy.deepcopy(left_builder), leaf_id_map ) - else: - applied_representation = modality - for j, rep in enumerate(representations[i].representations): - representation, is_context = ( - self.operator_registry.get_representation_by_name( - rep, modality.modality_type + + for right_builder, right_root in right_variants: + for fusion_op_class in self.fusion_operators: + new_builder = copy.deepcopy(right_builder) + fusion_op = fusion_op_class() + fusion_id = new_builder.create_operation_node( + fusion_op.__class__, + [left_root, right_root], + fusion_op.parameters, ) + variants.append((new_builder, fusion_id)) + + return variants + + n = len(leaf_infos) + + for permuted_leaf_infos in itertools.permutations(leaf_infos, n): + base_builder = RepresentationDAGBuilder() + leaf_id_map = {} + for idx, (modality_id, repr_idx) in enumerate(permuted_leaf_infos): + nodeid = base_builder.create_leaf_node(modality_id, repr_idx) + leaf_id_map[idx] = nodeid + + indices = list(range(n)) + + for tree in gen_trees(indices): + variants = build_variants(tree, base_builder, leaf_id_map) + for builder_variant, root_id in variants: + try: + yield builder_variant.build(root_id) + except ValueError: + if self.debug: + print(f"Skipping invalid DAG for root {root_id}") + continue + + def _evaluate_dag(self, dag: RepresentationDag, task: Task) -> "OptimizationResult": + start_time = time.time() + + try: + fused_representation = dag.execute( + list( + chain.from_iterable( + self.k_best_representations[task.model.name].values() ) - if representation is None: - if rep == AggregatedRepresentation.__name__: - representation = AggregatedRepresentation(Aggregation()) - else: - representation = representation() - representation.set_parameters(representations[i].params[j]) - if is_context: - applied_representation = applied_representation.context( - representation - ) - else: - applied_representation = ( - applied_representation.apply_representation(representation) - ) - self.k_best_cache[task_name].append(applied_representation) - applied_representations.append(applied_representation) - return applied_representations - - def evaluate(self, task, modality, representations, fusion, modality_combo): - if task.expected_dim == 1 and get_shape(modality.metadata) > 1: - for aggregation in Aggregation().get_aggregation_functions(): - agg_operator = AggregatedRepresentation(Aggregation(aggregation, False)) - agg_modality = agg_operator.transform(modality) - - scores = task.run(agg_modality.data) - reps = representations.copy() - reps.append(agg_operator) - - self.optimization_results.add_result( - scores, - reps, - modality.transformation, - modality_combo, - task.model.name, - ) - else: - scores = task.run(modality.data) - self.optimization_results.add_result( - scores, - representations, - modality.transformation, - modality_combo, - task.model.name, + ), + task, + ) + + if fused_representation is None: + return None + + final_representation = fused_representation[ + list(fused_representation.keys())[-1] + ] + if task.expected_dim == 1 and get_shape(final_representation.metadata) > 1: + agg_operator = AggregatedRepresentation(Aggregation()) + final_representation = agg_operator.transform(final_representation) + + eval_start = time.time() + scores = task.run(final_representation.data) + eval_time = time.time() - eval_start + + total_time = time.time() - start_time + + return OptimizationResult( + dag=dag, + train_score=scores[0], + val_score=scores[1], + runtime=total_time, + task_name=task.model.name, + evaluation_time=eval_time, ) - def add_to_cache(self, result_idx, combined_modality): - self.cache[result_idx] = combined_modality + except Exception as e: + print(f"Error evaluating DAG: {e}") + traceback.print_exc() + return None + + def _get_modality_by_id_and_instance_id(self, modalities, modality_id, instance_id): + counter = 0 + for modality in modalities: + if modality.modality_id == modality_id: + if counter == instance_id or instance_id == -1: + return modality + else: + counter += 1 + return None + + def optimize( + self, max_combinations: int = None + ) -> Dict[str, List["OptimizationResult"]]: + all_results = {} - def extract_k_best_modalities_per_task(self): - self.k_best_modalities = {} - self.k_best_cache = {} for task in self.tasks: - self.k_best_modalities[task.model.name] = [] - self.k_best_cache[task.model.name] = [] - for modality in self.modalities: - k_best_results, cached_data = ( - self.unimodal_optimization_results.get_k_best_results( - modality, self.k, task - ) - ) + if self.debug: + print(f"Optimizing multimodal fusion for task: {task.model.name}") + all_results[task.model.name] = [] + evaluated_count = 0 - self.k_best_modalities[task.model.name].extend(k_best_results) - self.k_best_cache[task.model.name].extend(cached_data) + for modality_subset in self._generate_modality_combinations(): + if self.debug: + print(f" Evaluating modality subset: {modality_subset}") + for repr_combo in self._generate_representation_combinations( + modality_subset, task.model.name + ): -class MultimodalResults: - def __init__(self, modalities, tasks, debug, k_best_modalities): - self.modality_ids = [modality.modality_id for modality in modalities] - self.task_names = [task.model.name for task in tasks] - self.results = {} - self.debug = debug - self.k_best_modalities = k_best_modalities + for dag in self._generate_fusion_dags(modality_subset, repr_combo): + if max_combinations and evaluated_count >= max_combinations: + break - for task in tasks: - self.results[task.model.name] = {} + result = self._evaluate_dag(dag, task) + if result is not None: + all_results[task.model.name].append(result) - def add_result( - self, scores, best_representation_idx, fusion_methods, modality_combo, task_name - ): + evaluated_count += 1 - entry = MultimodalResultEntry( - representations=best_representation_idx, - train_score=scores[0], - val_score=scores[1], - fusion_methods=[ - fusion_method.__class__.__name__ for fusion_method in fusion_methods - ], - modality_combo=modality_combo, - task=task_name, - ) + if self.debug and evaluated_count % 100 == 0: + print(f" Evaluated {evaluated_count} combinations...") - modality_id_strings = "_".join(list(map(str, modality_combo))) - if not modality_id_strings in self.results[task_name]: - self.results[task_name][modality_id_strings] = [] + if max_combinations and evaluated_count >= max_combinations: + break - self.results[task_name][modality_id_strings].append(entry) + if max_combinations and evaluated_count >= max_combinations: + break - if self.debug: - print(f"{modality_id_strings}_{task_name}: {entry}") - - def print_results(self): - for task_name in self.task_names: - for modality in self.results[task_name].keys(): - for entry in self.results[task_name][modality]: - reps = [] - for i, mod_idx in enumerate(entry.modality_combo): - reps.append(self.k_best_modalities[task_name][mod_idx]) - - print( - f"{modality}_{task_name}: " - f"Validation score: {entry.val_score} - Training score: {entry.train_score}" - ) - for i, rep in enumerate(reps): - print( - f" Representation: {entry.modality_combo[i]} - {rep.representations}" - ) + if self.debug: + print( + f" Task completed: {len(all_results[task.model.name])} valid combinations evaluated" + ) - print(f" Fusion: {entry.fusion_methods[0]} ") + self.optimization_results = all_results - def store_results(self, file_name=None): - for task_name in self.task_names: - for modality in self.results[task_name].keys(): - for entry in self.results[task_name][modality]: - reps = [] - for i, mod_idx in enumerate(entry.modality_combo): - reps.append(self.k_best_modalities[task_name][mod_idx]) - entry.representations = reps + if self.debug: + print(f"\nOptimization completed") - import pickle + return all_results + def store_results(self, file_name=None): if file_name is None: import time @@ -387,14 +295,14 @@ def store_results(self, file_name=None): file_name = "multimodal_optimizer" + timestr + ".pkl" with open(file_name, "wb") as f: - pickle.dump(self.results, f) + pickle.dump(self.optimization_results, f) -@dataclasses.dataclass -class MultimodalResultEntry: - val_score: float - modality_combo: list - representations: list - fusion_methods: list +@dataclass +class OptimizationResult: + dag: RepresentationDag train_score: float - task: str + val_score: float + runtime: float + task_name: str + evaluation_time: float = 0.0 diff --git a/src/main/python/systemds/scuro/drsearch/operator_registry.py b/src/main/python/systemds/scuro/drsearch/operator_registry.py index 699dcad8571..9bc90720f8e 100644 --- a/src/main/python/systemds/scuro/drsearch/operator_registry.py +++ b/src/main/python/systemds/scuro/drsearch/operator_registry.py @@ -77,6 +77,12 @@ def get_context_operators(self): def get_fusion_operators(self): return self._fusion_operators + def get_fusion_operator_by_name(self, fusion_name): + for fusion in self._fusion_operators: + if fusion.__name__ == fusion_name: + return fusion + return None + def get_representation_by_name(self, representation_name, modality_type): for representation in self._context_operators: if representation.__name__ == representation_name: diff --git a/src/main/python/systemds/scuro/drsearch/representation_cache.py b/src/main/python/systemds/scuro/drsearch/representation_cache.py deleted file mode 100644 index 4df478272df..00000000000 --- a/src/main/python/systemds/scuro/drsearch/representation_cache.py +++ /dev/null @@ -1,128 +0,0 @@ -# ------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# ------------------------------------------------------------- -import copy -import os -import pickle -import tempfile - -from systemds.scuro.modality.transformed import TransformedModality - - -class RepresentationCache: - """ """ - - _instance = None - _cache_dir = None - debug = False - - def __new__(cls, debug=False): - if not cls._instance: - cls.debug = debug - cls._instance = super().__new__(cls) - cls._cache_dir = tempfile.TemporaryDirectory() - # cls._cache_dir = "representation_cache" - return cls._instance - - def _generate_cache_filename(self, modality_id, operators): - """ - Generate a unique filename for an operator based on its name. - - :param operator_name: The name of the operator. - :return: A full path to the cache file. - """ - op_names = [] - filename = modality_id - for operator in operators: - if isinstance(operator, str): - op_names.append(operator) - filename += operator - else: - op_names.append(operator.name) - filename += operator.name - - return os.path.join(self._cache_dir.name, filename), op_names # _cache_dir.name - - def save_to_cache(self, modality, used_op_names, operators): - """ - Save data to a cache file. - - :param operator_name: The name of the operator. - :param data: The data to save. - """ - filename, op_names = self._generate_cache_filename( - str(modality.modality_id) + used_op_names, operators - ) - if not os.path.exists(filename): - with open(f"{filename}.pkl", "wb") as f: - pickle.dump(modality.data, f) - - with open(f"{filename}.meta", "wb") as f: - pickle.dump(modality.metadata, f) - - if self.debug: - str_names = ", ".join(op_names) - print( - f"Saved data for operator {str(modality.modality_id)}{used_op_names}{str_names} to cache: {filename}" - ) - - def load_from_cache(self, modality, operators): - """ - Load data from a cache file if it exists. - - :param operator_name: The name of the operator. - :return: The cached data or None if not found. - """ - ops = copy.deepcopy(operators) - filename, op_names = self._generate_cache_filename( - str(modality.modality_id), ops - ) - dropped_ops = [] - while not os.path.exists(f"{filename}.pkl"): - op_names.pop() - dropped_ops.append(ops.pop()) - if len(ops) < 1: - break - filename, op_names = self._generate_cache_filename( - str(modality.modality_id), ops - ) - - dropped_ops.reverse() - op_names = "".join(op_names) - - if os.path.exists(f"{filename}.pkl"): - with open(f"{filename}.meta", "rb") as f: - metadata = pickle.load(f) - - transformed_modality = TransformedModality( - modality, - op_names, - ) - data = None - with open(f"{filename}.pkl", "rb") as f: - if self.debug: - print( - f"Loaded cached data for operator '{str(modality.modality_id) + op_names}' from {filename}" - ) - data = pickle.load(f) - transformed_modality.data = data - return transformed_modality, dropped_ops, op_names - - return None, dropped_ops, op_names diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag.py b/src/main/python/systemds/scuro/drsearch/representation_dag.py new file mode 100644 index 00000000000..1d5f512eb83 --- /dev/null +++ b/src/main/python/systemds/scuro/drsearch/representation_dag.py @@ -0,0 +1,232 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- +import copy +from dataclasses import dataclass, field +from typing import List, Dict, Any +from systemds.scuro.modality.modality import Modality +from systemds.scuro.modality.transformed import TransformedModality +from systemds.scuro.representations.representation import ( + Representation as UnimodalRepresentation, +) +from systemds.scuro.representations.aggregated_representation import ( + AggregatedRepresentation, +) +from systemds.scuro.representations.context import Context +from systemds.scuro.utils.identifier import get_op_id, get_node_id + + +@dataclass +class RepresentationNode: + node_id: str + operation: Any + inputs: List[str] + modality_id: str = None + representation_index: int = None + parameters: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class RepresentationDag: + + def __init__(self, nodes: List[Any], root_node_id): + self.root_node_id = root_node_id + self.nodes = self.filter_connected_nodes(nodes) + + def filter_connected_nodes(self, nodes): + node_map = {node.node_id: node for node in nodes} + + if self.root_node_id not in node_map: + return [] + + visited = set() + stack = [self.root_node_id] + + while stack: + current_id = stack.pop() + if current_id not in visited: + visited.add(current_id) + + current_node = node_map[current_id] + for input_id in current_node.inputs: + if input_id in node_map and input_id not in visited: + stack.append(input_id) + + return [node for node in nodes if node.node_id in visited] + + def get_leaf_nodes(self) -> List[str]: + leaf_nodes = [] + for node in self.nodes: + if not node.inputs: + leaf_nodes.append(node.node_id) + return leaf_nodes + + def get_node_by_id(self, node_id: str): + for node in self.nodes: + if node.node_id == node_id: + return node + return None + + def get_children(self, node_id: str) -> List[str]: + children = [] + for node in self.nodes: + if node_id in node.inputs: + children.append(node.node_id) + return children + + def validate(self) -> bool: + node_ids = {node.node_id for node in self.nodes} + + if self.root_node_id not in node_ids: + return False + + for node in self.nodes: + for input_id in node.inputs: + if input_id not in node_ids: + return False + + visited = set() + + def has_cycle(node_id: str, path: set) -> bool: + if node_id in path: + return True + if node_id in visited: + return False + path.add(node_id) + visited.add(node_id) + node = self.get_node_by_id(node_id) + for input_id in node.inputs: + if has_cycle(input_id, path.copy()): + return True + return False + + return not has_cycle(self.root_node_id, set()) + + def execute( + self, modalities: List[Modality], task=None + ) -> Dict[str, TransformedModality]: + cache = {} + + def execute_node(node_id: str, task) -> TransformedModality: + if node_id in cache: + return cache[node_id] + + node = self.get_node_by_id(node_id) + + if not node.inputs: + modality = get_modality_by_id_and_instance_id( + modalities, node.modality_id, node.representation_index + ) + cache[node_id] = modality + return modality + + input_mods = [execute_node(input_id, task) for input_id in node.inputs] + + node_operation = node.operation() + if len(input_mods) == 1: + # It's a unimodal operation + if isinstance(node_operation, Context): + result = input_mods[0].context(node_operation) + elif isinstance(node_operation, AggregatedRepresentation): + result = node_operation.transform(input_mods[0]) + elif isinstance(node_operation, UnimodalRepresentation): + if ( + isinstance(input_mods[0], TransformedModality) + and input_mods[0].transformation[0].__class__ == node.operation + ): + # Avoid duplicate transformations + result = input_mods[0] + else: + # Compute the representation + result = input_mods[0].apply_representation(node_operation) + else: + # It's a fusion operation + fusion_op = node_operation + if hasattr(fusion_op, "needs_training") and fusion_op.needs_training: + result = input_mods[0].combine_with_training( + input_mods[1:], fusion_op, task + ) + else: + result = input_mods[0].combine(input_mods[1:], fusion_op) + + cache[node_id] = result + return result + + execute_node(self.root_node_id, task) + + return cache + + +def get_modality_by_id_and_instance_id( + modalities: List[Modality], modality_id: int, instance_id: int +): + counter = 0 + for modality in modalities: + if modality.modality_id == modality_id: + if counter == instance_id or instance_id == -1: + return modality + else: + counter += 1 + return None + + +class RepresentationDAGBuilder: + def __init__(self): + self.nodes = [] + self.node_counter = 0 + + def create_leaf_node( + self, modality_id: str, representation_index: int = -1, operation=None + ) -> str: + if representation_index != -1: + node_id = f"leaf_{modality_id}_{representation_index}" + else: + node_id = f"leaf_{get_node_id()}" + node = RepresentationNode( + node_id=node_id, + inputs=[], + operation=operation, + modality_id=modality_id, + representation_index=representation_index, + ) + self.nodes.append(node) + return node_id + + def create_operation_node( + self, operation: Any, inputs: List[str], parameters: Dict[str, Any] = None + ) -> str: + node_id = f"op_{get_op_id()}" + self.node_counter += 1 + node = RepresentationNode( + node_id=node_id, + inputs=inputs, + operation=operation, + parameters=parameters or {}, + ) + self.nodes.append(node) + return node_id + + def build(self, root_node_id: str) -> RepresentationDag: + dag = RepresentationDag( + nodes=copy.deepcopy(self.nodes), root_node_id=root_node_id + ) + if not dag.validate(): + raise ValueError("Invalid DAG construction") + return dag diff --git a/src/main/python/systemds/scuro/drsearch/representation_dag_visualizer.py b/src/main/python/systemds/scuro/drsearch/representation_dag_visualizer.py new file mode 100644 index 00000000000..b0a1bbe285c --- /dev/null +++ b/src/main/python/systemds/scuro/drsearch/representation_dag_visualizer.py @@ -0,0 +1,55 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +from typing import Dict, Any +from systemds.scuro.drsearch.representation_dag import RepresentationDag + + +def visualize_dag(dag: RepresentationDag) -> Dict[str, Any]: + nodes = [] + edges = [] + + for i, node in enumerate(dag.nodes): + # Create node entry + node_type = "operation" if node.operation else "modality" + label = node.operation if node.operation else f"Modality: {node.modality_id}" + + nodes.append( + { + "id": node.node_id, + "label": label, + "type": node_type, + "parameters": node.parameters, + } + ) + + print(nodes[i]) + + # Create edges + for input_id in node.inputs: + edges.append({"from": input_id, "to": node.node_id}) + + for edge in edges: + print(edge) + + print(f"Root Node ID: {dag.root_node_id}") + + return {"nodes": nodes, "edges": edges, "root": dag.root_node_id} diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py index 86c7ce1e63a..10b127f5b60 100644 --- a/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py +++ b/src/main/python/systemds/scuro/drsearch/unimodal_optimizer.py @@ -10,7 +10,7 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, +# Unless required by applicable law or agreed in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the @@ -20,37 +20,46 @@ # ------------------------------------------------------------- import pickle import time -import copy from concurrent.futures import ProcessPoolExecutor, as_completed -from dataclasses import dataclass, field, asdict - +from dataclasses import dataclass import multiprocessing as mp -from typing import Union +from typing import List, Any +from functools import lru_cache -import numpy as np -from systemds.scuro.representations.window_aggregation import WindowAggregation +from systemds.scuro import ModalityType +from systemds.scuro.representations.fusion import Fusion from systemds.scuro.representations.concatenation import Concatenation from systemds.scuro.representations.hadamard import Hadamard from systemds.scuro.representations.sum import Sum - from systemds.scuro.representations.aggregated_representation import ( AggregatedRepresentation, ) -from systemds.scuro.modality.type import ModalityType from systemds.scuro.modality.modality import Modality -from systemds.scuro.modality.transformed import TransformedModality from systemds.scuro.representations.aggregate import Aggregation from systemds.scuro.drsearch.operator_registry import Registry from systemds.scuro.utils.schema_helpers import get_shape +from systemds.scuro.drsearch.representation_dag import ( + RepresentationDag, + RepresentationNode, + RepresentationDAGBuilder, +) +from systemds.scuro.drsearch.representation_dag_visualizer import visualize_dag class UnimodalOptimizer: def __init__(self, modalities, tasks, debug=True): self.modalities = modalities self.tasks = tasks + self.run = None + + self.builders = { + modality.modality_id: RepresentationDAGBuilder() for modality in modalities + } + + self.debug = debug self.operator_registry = Registry() - self.operator_performance = UnimodalResults(modalities, tasks, debug) + self.operator_performance = UnimodalResults(modalities, tasks, debug, self.run) self._tasks_require_same_dims = True self.expected_dimensions = tasks[0].expected_dim @@ -60,6 +69,22 @@ def __init__(self, modalities, tasks, debug=True): if tasks[i - 1].expected_dim != tasks[i].expected_dim: self._tasks_require_same_dims = False + self._combination_operators = [Concatenation(), Hadamard(), Sum()] + + @lru_cache(maxsize=128) + def _get_modality_operators(self, modality_type): + return self.operator_registry.get_representations(modality_type) + + @lru_cache(maxsize=128) + def _get_not_self_contained_reps(self, modality_type): + return self.operator_registry.get_not_self_contained_representations( + modality_type + ) + + @lru_cache(maxsize=32) + def _get_context_operators(self): + return self.operator_registry.get_context_operators() + def store_results(self, file_name=None): if file_name is None: import time @@ -82,80 +107,59 @@ def optimize_parallel(self, n_workers=None): for future in as_completed(future_to_modality): modality = future_to_modality[future] - # try: results = future.result() self._merge_results(results) - # except Exception as exc: - # print(f'Modality {modality.modality_id} generated an exception: {exc}') def optimize(self): + """Optimize representations for each modality""" for modality in self.modalities: local_result = self._process_modality(modality, False) def _process_modality(self, modality, parallel): if parallel: - local_results = UnimodalResults( - modalities=[modality], tasks=self.tasks, debug=False - ) + local_results = UnimodalResults([modality], self.tasks, debug=False) else: local_results = self.operator_performance - context_operators = self.operator_registry.get_context_operators() - not_self_contained_reps = ( - self.operator_registry.get_not_self_contained_representations( - modality.modality_type - ) - ) - modality_specific_operators = self.operator_registry.get_representations( + modality_specific_operators = self._get_modality_operators( modality.modality_type ) - for modality_specific_operator in modality_specific_operators: - mod_op = modality_specific_operator() - mod = modality.apply_representation(mod_op) - self._evaluate_local(mod, [mod_op], local_results) + for operator in modality_specific_operators: + dags = self._build_modality_dag(modality, operator()) - if not mod_op.self_contained: - self._combine_non_self_contained_representations( - modality, mod, not_self_contained_reps, local_results - ) + for dag in dags: + representations = dag.execute([modality]) + node_id = list(representations.keys())[-1] + node = dag.get_node_by_id(node_id) + if node.operation is None: + continue - for context_operator_after in context_operators: - con_op_after = context_operator_after() - mod_con = mod.context(con_op_after) - self._evaluate_local(mod_con, [mod_op, con_op_after], local_results) - - return local_results - - def _combine_non_self_contained_representations( - self, - modality: Modality, - representation: TransformedModality, - other_representations, - local_results, - ): - combined = representation - context_operators = self.operator_registry.get_context_operators() - used_representations = representation.transformation - for other_representation in other_representations: - used_representations.append(other_representation()) - for combination in [Concatenation(), Hadamard(), Sum()]: - combined = combined.combine( - modality.apply_representation(other_representation()), combination - ) + reps = self._get_representation_chain(node, dag) + combination = next((op for op in reps if isinstance(op, Fusion)), None) self._evaluate_local( - combined, used_representations, local_results, combination + representations[node_id], local_results, dag, combination ) + if self.debug: + visualize_dag(dag) + + return local_results - for context_op in context_operators: - con_op = context_op() - mod = combined.context(con_op) - c_t = copy.deepcopy(used_representations) - c_t.append(con_op) - self._evaluate_local(mod, c_t, local_results, combination) + def _get_representation_chain( + self, node: "RepresentationNode", dag: RepresentationDag + ) -> List[Any]: + representations = [] + if node.operation: + representations.append(node.operation) + + for input_id in node.inputs: + input_node = dag.get_node_by_id(input_id) + if input_node.operation: + representations.extend(self._get_representation_chain(input_node, dag)) + + return representations def _merge_results(self, local_results): - """Merge local results into the main results""" for modality_id in local_results.results: for task_name in local_results.results[modality_id]: self.operator_performance.results[modality_id][task_name].extend( @@ -167,82 +171,121 @@ def _merge_results(self, local_results): for key, value in local_results.cache[modality][task_name].items(): self.operator_performance.cache[modality][task_name][key] = value - def _evaluate_local( - self, modality, representations, local_results, combination=None - ): + def _evaluate_local(self, modality, local_results, dag, combination=None): if self._tasks_require_same_dims: if self.expected_dimensions == 1 and get_shape(modality.metadata) > 1: - # for aggregation in Aggregation().get_aggregation_functions(): - agg_operator = AggregatedRepresentation(Aggregation()) - agg_modality = agg_operator.transform(modality) - reps = representations.copy() - reps.append(agg_operator) - # agg_modality.pad() + builder = self.builders[modality.modality_id] + agg_operator = AggregatedRepresentation() + rep_node_id = builder.create_operation_node( + agg_operator.__class__, [dag.root_node_id], agg_operator.parameters + ) + dag = builder.build(rep_node_id) + representations = dag.execute([modality]) + node_id = list(representations.keys())[-1] for task in self.tasks: - start = time.time() - scores = task.run(agg_modality.data) - end = time.time() + start = time.perf_counter() + scores = task.run(representations[node_id].data) + end = time.perf_counter() local_results.add_result( - scores, - reps, - modality, - task.model.name, - end - start, - combination, + scores, modality, task.model.name, end - start, combination, dag ) else: modality.pad() for task in self.tasks: - start = time.time() + start = time.perf_counter() scores = task.run(modality.data) - end = time.time() + end = time.perf_counter() local_results.add_result( - scores, - representations, - modality, - task.model.name, - end - start, - combination, + scores, modality, task.model.name, end - start, combination, dag ) else: for task in self.tasks: if task.expected_dim == 1 and get_shape(modality.metadata) > 1: - # for aggregation in Aggregation().get_aggregation_functions(): + builder = self.builders[modality.modality_id] agg_operator = AggregatedRepresentation(Aggregation()) - agg_modality = agg_operator.transform(modality) - - reps = representations.copy() - reps.append(agg_operator) - # modality.pad() - start = time.time() - scores = task.run(agg_modality.data) - end = time.time() + rep_node_id = builder.create_operation_node( + agg_operator.__class__, + [dag.root_node_id], + agg_operator.parameters, + ) + dag = builder.build(rep_node_id) + representations = dag.execute([modality]) + node_id = list(representations.keys())[-1] + + start = time.perf_counter() + scores = task.run(representations[node_id].data) + end = time.perf_counter() local_results.add_result( - scores, - reps, - modality, - task.model.name, - end - start, - combination, + scores, modality, task.model.name, end - start, combination, dag ) else: - # modality.pad() - start = time.time() + start = time.perf_counter() scores = task.run(modality.data) - end = time.time() + end = time.perf_counter() local_results.add_result( - scores, - representations, - modality, - task.model.name, - end - start, - combination, + scores, modality, task.model.name, end - start, combination, dag + ) + + def _build_modality_dag( + self, modality: Modality, operator: Any + ) -> List[RepresentationDag]: + dags = [] + builder = self.builders[modality.modality_id] + leaf_id = builder.create_leaf_node(modality.modality_id) + + rep_node_id = builder.create_operation_node( + operator.__class__, [leaf_id], operator.parameters + ) + current_node_id = rep_node_id + dags.append(builder.build(current_node_id)) + + if not operator.self_contained: + not_self_contained_reps = self._get_not_self_contained_reps( + modality.modality_type + ) + not_self_contained_reps = [ + rep for rep in not_self_contained_reps if rep != operator.__class__ + ] + + for combination in self._combination_operators: + current_node_id = rep_node_id + for other_rep in not_self_contained_reps: + other_rep_id = builder.create_operation_node( + other_rep, [leaf_id], other_rep().parameters ) + combine_id = builder.create_operation_node( + combination.__class__, + [current_node_id, other_rep_id], + combination.parameters, + ) + dags.append(builder.build(combine_id)) + current_node_id = combine_id + + context_operators = self._get_context_operators() + + for context_op in context_operators: + if modality.modality_type != ModalityType.TEXT: + context_node_id = builder.create_operation_node( + context_op, + [leaf_id], + context_op().parameters, + ) + dags.append(builder.build(context_node_id)) + + context_node_id = builder.create_operation_node( + context_op, + [current_node_id], + context_op().parameters, + ) + dags.append(builder.build(context_node_id)) + + return dags + class UnimodalResults: - def __init__(self, modalities, tasks, debug=False): + def __init__(self, modalities, tasks, debug=False, run=None): self.modality_ids = [modality.modality_id for modality in modalities] self.task_names = [task.model.name for task in tasks] self.results = {} @@ -250,48 +293,27 @@ def __init__(self, modalities, tasks, debug=False): self.cache = {} for modality in self.modality_ids: - self.results[modality] = {} - self.cache[modality] = {} - for task_name in self.task_names: - self.cache[modality][task_name] = {} - self.results[modality][task_name] = [] - - def add_result( - self, scores, representations, modality, task_name, task_time, combination - ): - parameters = [] - representation_names = [] - - for rep in representations: - representation_names.append(type(rep).__name__) - if isinstance(rep, AggregatedRepresentation): - parameters.append(rep.parameters) - continue - - params = {} - for param in list(rep.parameters.keys()): - params[param] = getattr(rep, param) - - if isinstance(rep, WindowAggregation): - params["aggregation_function"] = ( - rep.aggregation_function.aggregation_function_name - ) - - parameters.append(params) + self.results[modality] = {task_name: [] for task_name in self.task_names} + self.cache[modality] = {task_name: {} for task_name in self.task_names} + def add_result(self, scores, modality, task_name, task_time, combination, dag): entry = ResultEntry( - representations=representation_names, - params=parameters, train_score=scores[0], val_score=scores[1], representation_time=modality.transform_time, task_time=task_time, combination=combination.name if combination else "", + dag=dag, ) + self.results[modality.modality_id][task_name].append(entry) - self.cache[modality.modality_id][task_name][ - (tuple(representation_names), scores[1], modality.transform_time) - ] = modality + + cache_key = ( + id(dag), + scores[1], + modality.transform_time, + ) + self.cache[modality.modality_id][task_name][cache_key] = modality if self.debug: print(f"{modality.modality_id}_{task_name}: {entry}") @@ -308,29 +330,29 @@ def get_k_best_results(self, modality, k, task): :param modality: modality to get the best results for :param k: number of best results """ - items = self.results[modality.modality_id][task.model.name] - sorted_indices = sorted( - range(len(items)), key=lambda x: items[x].val_score, reverse=True - )[:k] + task_results = self.results[modality.modality_id][task.model.name] + + results = sorted(task_results, key=lambda x: x.val_score, reverse=True)[:k] - results = sorted( - self.results[modality.modality_id][task.model.name], - key=lambda x: x.val_score, + sorted_indices = sorted( + range(len(task_results)), + key=lambda x: task_results[x].val_score, reverse=True, )[:k] - items = list(self.cache[modality.modality_id][task.model.name].items()) - reordered_cache = [items[i][1] for i in sorted_indices] + cache_items = list(self.cache[modality.modality_id][task.model.name].items()) + reordered_cache = [ + cache_items[i][1] for i in sorted_indices if i < len(cache_items) + ] - return results, list(reordered_cache) + return results, reordered_cache @dataclass(frozen=True) class ResultEntry: val_score: float - representations: list - params: list train_score: float representation_time: float task_time: float combination: str + dag: RepresentationDag diff --git a/src/main/python/systemds/scuro/drsearch/unimodal_representation_optimizer.py b/src/main/python/systemds/scuro/drsearch/unimodal_representation_optimizer.py deleted file mode 100644 index e59ddbe9beb..00000000000 --- a/src/main/python/systemds/scuro/drsearch/unimodal_representation_optimizer.py +++ /dev/null @@ -1,271 +0,0 @@ -# ------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# ------------------------------------------------------------- -import copy -import os -import pickle -import time -from typing import List - -from systemds.scuro.drsearch.operator_registry import Registry -from systemds.scuro.drsearch.optimization_data import OptimizationResult -from systemds.scuro.drsearch.representation_cache import RepresentationCache -from systemds.scuro.drsearch.task import Task -from systemds.scuro.modality.modality import Modality -from systemds.scuro.representations.aggregate import Aggregation -from systemds.scuro.representations.context import Context - - -class UnimodalRepresentationOptimizer: - def __init__( - self, - modalities: List[Modality], - tasks: List[Task], - max_chain_depth=5, - debug=False, - folder_name=None, - ): - self.optimization_results = {} - self.modalities = modalities - self.tasks = tasks - self.operator_registry = Registry() - self.initialize_optimization_results() - self.max_chain_depth = max_chain_depth - self.debug = debug - self.cache = RepresentationCache(self.debug) - if self.debug: - self.folder_name = folder_name - os.makedirs(self.folder_name, exist_ok=True) - - def initialize_optimization_results(self): - for modality in self.modalities: - self.optimization_results[modality.modality_id] = {} - for task in self.tasks: - self.optimization_results[modality.modality_id][task.model.name] = [] - - def optimize(self): - """ - This method finds different unimodal representations for all given modalities - """ - - for modality in self.modalities: - self._optimize_modality(modality) - - copy_results = copy.deepcopy( - self.optimization_results[modality.modality_id] - ) - for model in copy_results: - for i, model_task in enumerate(copy_results[model]): - ops = [] - for op in model_task.operator_chain: - if not isinstance(op, str): - ops.append(op.name) - if len(ops) > 0: - copy_results[model][i].operator_chain = ops - if self.debug: - with open( - f"{self.folder_name}/results_{model}_{modality.modality_type.name}.p", - "wb", - ) as fp: - pickle.dump( - copy_results[model], fp, protocol=pickle.HIGHEST_PROTOCOL - ) - - def get_k_best_results(self, modality: Modality, k: int, task: Task): - """ - Get the k best results for the given modality - :param modality: modality to get the best results for - :param k: number of best results - """ - results = sorted( - self.optimization_results[modality.modality_id][task.model.name], - key=lambda x: x.test_accuracy, - reverse=True, - )[:k] - - return results - - def _optimize_modality(self, modality: Modality): - """ - Optimize a single modality by leveraging modality specific heuristics and incorporating context and - stores the resulting operation chains as optimization results. - :param modality: modality to optimize - """ - - representations = self._get_compatible_operators(modality.modality_type, []) - - for rep in representations: - self._build_operator_chain(modality, [rep()], 1) - - def _get_compatible_operators(self, modality_type, used_operators): - next_operators = [] - for operator in self.operator_registry.get_representations(modality_type): - if operator.__name__ not in used_operators: - next_operators.append(operator) - - for context_operator in self.operator_registry.get_context_operators(): - if ( - len(used_operators) == 0 - or context_operator.__name__ not in used_operators[-1] - ): - next_operators.append(context_operator) - - return next_operators - - def _build_operator_chain(self, modality, current_operator_chain, depth): - - if depth > self.max_chain_depth: - return - - self._apply_operator_chain(modality, current_operator_chain) - - current_modality_type = modality.modality_type - - for operator in current_operator_chain: - if hasattr(operator, "output_modality_type"): - current_modality_type = operator.output_modality_type - - next_representations = self._get_compatible_operators( - current_modality_type, [type(op).__name__ for op in current_operator_chain] - ) - - for next_rep in next_representations: - rep_instance = next_rep() - new_chain = current_operator_chain + [rep_instance] - self._build_operator_chain(modality, new_chain, depth + 1) - - def _evaluate_with_flattened_data( - self, modality, operator_chain, op_params, representation_time, task - ): - from systemds.scuro.representations.aggregated_representation import ( - AggregatedRepresentation, - ) - - results = [] - for aggregation in Aggregation().get_aggregation_functions(): - start = time.time() - agg_operator = AggregatedRepresentation(Aggregation(aggregation, True)) - agg_modality = agg_operator.transform(modality) - end = time.time() - - agg_opperator_chain = operator_chain + [agg_operator] - agg_params = dict(op_params) - agg_params.update({agg_operator.name: agg_operator.parameters}) - - score = task.run(agg_modality.data) - result = OptimizationResult( - operator_chain=agg_opperator_chain, - parameters=agg_params, - train_accuracy=score[0], - test_accuracy=score[1], - # train_min_it_acc=score[2], - # test_min_it_acc=score[3], - training_runtime=task.training_time, - inference_runtime=task.inference_time, - representation_time=representation_time + end - start, - output_shape=(1, 1), # TODO - ) - results.append(result) - - if self.debug: - op_name = "" - for operator in agg_opperator_chain: - op_name += str(operator.__class__.__name__) - print(f"{task.name} {task.model.name} {op_name}: {score[1]}") - - return results - - def _evaluate_operator_chain( - self, modality, operator_chain, op_params, representation_time - ): - for task in self.tasks: - if isinstance(modality.data[0], str): - continue - - if ( - task.expected_dim == 1 - and not isinstance(modality.data[0], list) - and modality.data[0].ndim > 1 - ): - r = self._evaluate_with_flattened_data( - modality, operator_chain, op_params, representation_time, task - ) - self.optimization_results[modality.modality_id][task.model.name].extend( - r - ) - else: - score = task.run(modality.data) - result = OptimizationResult( - operator_chain=operator_chain, - parameters=op_params, - train_accuracy=score[0], - test_accuracy=score[1], - # train_min_it_acc=score[2], - # test_min_it_acc=score[3], - training_runtime=task.training_time, - inference_runtime=task.inference_time, - representation_time=representation_time, - output_shape=(1, 1), - ) # TODO - self.optimization_results[modality.modality_id][task.model.name].append( - result - ) - if self.debug: - op_name = "" - for operator in operator_chain: - op_name += str(operator.__class__.__name__) - print(f"{task.name} {task.model.name} - {op_name}: {score[1]}") - - def _apply_operator_chain(self, current_modality, operator_chain): - op_params = {} - modified_modality = current_modality - - representation_start = time.time() - try: - cached_representation, representation_ops, used_op_names = ( - self.cache.load_from_cache( - modified_modality, copy.deepcopy(operator_chain) - ) - ) - if cached_representation is not None: - modified_modality = cached_representation - store = False - for operator in representation_ops: - if isinstance(operator, Context): - modified_modality = modified_modality.context(operator) - else: - modified_modality = modified_modality.apply_representation(operator) - store = True - op_params[operator.name] = operator.get_current_parameters() - if store: - self.cache.save_to_cache( - modified_modality, used_op_names, representation_ops - ) - representation_end = time.time() - - self._evaluate_operator_chain( - modified_modality, - operator_chain, - op_params, - representation_end - representation_start, - ) - except Exception as e: - print(f"Failed to evaluate chain {operator_chain}: {str(e)}") - return diff --git a/src/main/python/systemds/scuro/modality/joined_transformed.py b/src/main/python/systemds/scuro/modality/joined_transformed.py index 3e0d8fb9dfb..078ce86f7af 100644 --- a/src/main/python/systemds/scuro/modality/joined_transformed.py +++ b/src/main/python/systemds/scuro/modality/joined_transformed.py @@ -72,7 +72,7 @@ def combine(self, fusion_method): return self def window_aggregation(self, window_size, aggregation): - w = WindowAggregation(window_size, aggregation) + w = WindowAggregation(aggregation, window_size) self.left_modality.data = w.execute(self.left_modality) self.right_modality.data = w.execute(self.right_modality) return self diff --git a/src/main/python/systemds/scuro/modality/modality.py b/src/main/python/systemds/scuro/modality/modality.py index f1b00fefcfe..98dd631e12c 100644 --- a/src/main/python/systemds/scuro/modality/modality.py +++ b/src/main/python/systemds/scuro/modality/modality.py @@ -132,6 +132,8 @@ def pad(self, value=0, max_len=None): try: if max_len is None: result = np.array(self.data) + elif isinstance(self.data, np.ndarray) and self.data.shape[1] == max_len: + result = self.data else: raise "Needs padding to max_len" except: @@ -144,8 +146,16 @@ def pad(self, value=0, max_len=None): for i, seq in enumerate(self.data): data = seq[:maxlen] result[i, : len(data)] = data - # TODO: add padding to metadata as attention_masks + if self.has_metadata(): + attention_mask = np.zeros(result.shape[1], dtype=np.int8) + attention_mask[: len(seq[:maxlen])] = 1 + md_key = list(self.metadata.keys())[i] + if "attention_mask" in self.metadata[md_key]: + self.metadata[md_key]["attention_mask"] = attention_mask + else: + self.metadata[md_key].update({"attention_mask": attention_mask}) + # TODO: this might need to be a new modality (otherwise we loose the original data) self.data = result def get_data_layout(self): diff --git a/src/main/python/systemds/scuro/modality/transformed.py b/src/main/python/systemds/scuro/modality/transformed.py index 9481937e2ca..7e8e54eff33 100644 --- a/src/main/python/systemds/scuro/modality/transformed.py +++ b/src/main/python/systemds/scuro/modality/transformed.py @@ -63,6 +63,7 @@ def __init__( def add_transformation(self, transformation, modality): if ( transformation.__class__.__bases__[0].__name__ == "Fusion" + and type(modality).__name__ == "TransformedModality" and modality.transformation[0].__class__.__bases__[0].__name__ != "Fusion" ): self.transformation = [] @@ -99,8 +100,8 @@ def join(self, right, join_condition): return joined_modality - def window_aggregation(self, windowSize, aggregation): - w = WindowAggregation(windowSize, aggregation) + def window_aggregation(self, window_size, aggregation): + w = WindowAggregation(aggregation, window_size) transformed_modality = TransformedModality( self, w, self_contained=self.self_contained ) @@ -135,11 +136,26 @@ def combine(self, other: Union[Modality, List[Modality]], fusion_method): fused_modality = TransformedModality( self, fusion_method, ModalityType.EMBEDDING ) + fused_modality.data = fusion_method.transform(self.create_modality_list(other)) + + return fused_modality + + def combine_with_training( + self, other: Union[Modality, List[Modality]], fusion_method, task + ): + fused_modality = TransformedModality( + self, fusion_method, ModalityType.EMBEDDING + ) + modalities = self.create_modality_list(other) + fused_modality.data = fusion_method.transform_with_training(modalities, task) + + return fused_modality + + def create_modality_list(self, other: Union[Modality, List[Modality]]): modalities = [self] if isinstance(other, list): modalities.extend(other) else: modalities.append(other) - fused_modality.data = fusion_method.transform(modalities) - return fused_modality + return modalities diff --git a/src/main/python/systemds/scuro/modality/type.py b/src/main/python/systemds/scuro/modality/type.py index b2331d0faed..2853e8135d6 100644 --- a/src/main/python/systemds/scuro/modality/type.py +++ b/src/main/python/systemds/scuro/modality/type.py @@ -230,6 +230,17 @@ def create_text_metadata(self, length, data): md["length"] = length return md + def create_ts_metadata( + self, signal_names, data, sampling_rate=None, is_single_instance=True + ): + md = deepcopy(self.get_schema()) + md = ModalitySchemas.update_base_metadata(md, data, is_single_instance) + md["frequency"] = sampling_rate if sampling_rate is not None else 1 + md["length"] = data.shape[0] + md["signal_names"] = signal_names + md["timestamp"] = create_timestamps(md["frequency"], md["length"]) + return md + def create_video_metadata(self, frequency, length, width, height, num_channels): md = deepcopy(self.get_schema()) md["frequency"] = frequency diff --git a/src/main/python/systemds/scuro/modality/unimodal_modality.py b/src/main/python/systemds/scuro/modality/unimodal_modality.py index dd1674ea85a..373921e95c2 100644 --- a/src/main/python/systemds/scuro/modality/unimodal_modality.py +++ b/src/main/python/systemds/scuro/modality/unimodal_modality.py @@ -27,7 +27,7 @@ from systemds.scuro.modality.modality import Modality from systemds.scuro.modality.joined import JoinedModality from systemds.scuro.modality.transformed import TransformedModality -from systemds.scuro.modality.modality_identifier import ModalityIdentifier +from systemds.scuro.utils.identifier import Identifier class UnimodalModality(Modality): @@ -40,7 +40,7 @@ def __init__(self, data_loader: BaseLoader): """ super().__init__( data_loader.modality_type, - ModalityIdentifier().new_id(), + Identifier().new_id(), {}, data_loader.data_type, ) @@ -130,6 +130,11 @@ def apply_representation(self, representation): self.extract_raw_data() new_modality = representation.transform(self) + for i, d in enumerate(new_modality.data): + output = np.array(d) + if np.isnan(output).any(): + new_modality.data[i] = np.where(np.isnan(output), 0, output) + if not all( "attention_masks" in entry for entry in new_modality.metadata.values() ): diff --git a/src/main/python/systemds/scuro/representations/aggregate.py b/src/main/python/systemds/scuro/representations/aggregate.py index 2c046dc4016..0a8438e684f 100644 --- a/src/main/python/systemds/scuro/representations/aggregate.py +++ b/src/main/python/systemds/scuro/representations/aggregate.py @@ -20,6 +20,7 @@ # ------------------------------------------------------------- import numpy as np +from systemds.scuro.modality.type import ModalityType from systemds.scuro.representations import utils @@ -71,7 +72,13 @@ def execute(self, modality): for i, instance in enumerate(modality.data): data.append([]) if isinstance(instance, np.ndarray): - aggregated_data = self._aggregation_func(instance) + if ( + modality.modality_type == ModalityType.IMAGE + or modality.modality_type == ModalityType.VIDEO + ) and instance.ndim > 2: + aggregated_data = instance.flatten() + else: + aggregated_data = self._aggregation_func(instance) else: aggregated_data = [] for entry in instance: @@ -79,18 +86,17 @@ def execute(self, modality): max_len = max(max_len, len(aggregated_data)) data[i] = aggregated_data - if self.pad_modality: - for i, instance in enumerate(data): - if isinstance(instance, np.ndarray): - if len(instance) < max_len: - padded_data = np.zeros(max_len, dtype=instance.dtype) - padded_data[: len(instance)] = instance - data[i] = padded_data - else: - padded_data = [] - for entry in instance: - padded_data.append(utils.pad_sequences(entry, max_len)) + for i, instance in enumerate(data): + if isinstance(instance, np.ndarray): + if len(instance) < max_len: + padded_data = np.zeros(max_len, dtype=instance.dtype) + padded_data[: len(instance)] = instance data[i] = padded_data + else: + padded_data = [] + for entry in instance: + padded_data.append(utils.pad_sequences(entry, max_len)) + data[i] = padded_data return np.array(data) diff --git a/src/main/python/systemds/scuro/representations/aggregated_representation.py b/src/main/python/systemds/scuro/representations/aggregated_representation.py index 9119070a027..1e98d2f92ae 100644 --- a/src/main/python/systemds/scuro/representations/aggregated_representation.py +++ b/src/main/python/systemds/scuro/representations/aggregated_representation.py @@ -20,12 +20,16 @@ # ------------------------------------------------------------- from systemds.scuro.modality.transformed import TransformedModality from systemds.scuro.representations.representation import Representation +from systemds.scuro.representations.aggregate import Aggregation class AggregatedRepresentation(Representation): - def __init__(self, aggregation): - super().__init__("AggregatedRepresentation", aggregation.parameters) - self.aggregation = aggregation + def __init__(self, aggregation="mean"): + parameters = { + "aggregation": list(Aggregation().get_aggregation_functions()), + } + super().__init__("AggregatedRepresentation", parameters) + self.aggregation = Aggregation(aggregation) self.self_contained = True def transform(self, modality): diff --git a/src/main/python/systemds/scuro/representations/bow.py b/src/main/python/systemds/scuro/representations/bow.py index 7cfddbb506f..2b338d30ee6 100644 --- a/src/main/python/systemds/scuro/representations/bow.py +++ b/src/main/python/systemds/scuro/representations/bow.py @@ -34,8 +34,8 @@ class BoW(UnimodalRepresentation): def __init__(self, ngram_range=2, min_df=2, output_file=None): parameters = {"ngram_range": [ngram_range], "min_df": [min_df]} super().__init__("BoW", ModalityType.EMBEDDING, parameters) - self.ngram_range = ngram_range - self.min_df = min_df + self.ngram_range = int(ngram_range) + self.min_df = int(min_df) self.output_file = output_file def transform(self, modality): diff --git a/src/main/python/systemds/scuro/representations/fusion.py b/src/main/python/systemds/scuro/representations/fusion.py index ea614ac0955..8cf67b1cb42 100644 --- a/src/main/python/systemds/scuro/representations/fusion.py +++ b/src/main/python/systemds/scuro/representations/fusion.py @@ -18,10 +18,14 @@ # under the License. # # ------------------------------------------------------------- +import copy from typing import List import numpy as np -from systemds.scuro import AggregatedRepresentation, Aggregation +from systemds.scuro.representations.aggregated_representation import ( + AggregatedRepresentation, +) +from systemds.scuro.modality.transformed import TransformedModality from systemds.scuro.modality.modality import Modality from systemds.scuro.representations.representation import Representation @@ -52,7 +56,7 @@ def transform(self, modalities: List[Modality]): for modality in modalities: agg_modality = None if get_shape(modality.metadata) > 1: - agg_operator = AggregatedRepresentation(Aggregation()) + agg_operator = AggregatedRepresentation() agg_modality = agg_operator.transform(modality) mods.append(agg_modality if agg_modality else modality) @@ -63,26 +67,53 @@ def transform(self, modalities: List[Modality]): return self.execute(mods) - def transform_with_training( - self, modalities: List[Modality], train_indices, labels - ): - # if self.needs_instance_alignment: - # max_len = self.get_max_embedding_size(modalities) - # for modality in modalities: - # modality.pad(max_len=max_len) - - self.execute( - [np.array(modality.data)[train_indices] for modality in modalities], - labels[train_indices], + def transform_with_training(self, modalities: List[Modality], task): + train_modalities = [] + for modality in modalities: + train_data = [ + d for i, d in enumerate(modality.data) if i in task.train_indices + ] + train_modality = TransformedModality(modality, self) + train_modality.data = copy.deepcopy(train_data) + train_modalities.append(train_modality) + + transformed_train = self.execute( + train_modalities, task.labels[task.train_indices] ) + transformed_val = self.transform_data(modalities, task.val_indices) - def transform_data(self, modalities: List[Modality], val_indices): - return self.apply_representation( - [np.array(modality.data)[val_indices] for modality in modalities] + transformed_data = np.zeros( + (len(modalities[0].data), transformed_train.shape[1]) ) + transformed_data[task.train_indices] = transformed_train + transformed_data[task.val_indices] = transformed_val + + return transformed_data + + def transform_data(self, modalities: List[Modality], indices=None): + val_modalities = [] + for modality in modalities: + val_data = ( + [d for i, d in enumerate(modality.data) if i in indices] + if indices + else modality.data + ) + val_modality = type(modality)(modality, self) + val_modality.data = copy.deepcopy(val_data) + val_modalities.append(val_modality) - def execute(self, modalities: List[Modality]): - raise f"Not implemented for Fusion: {self.name}" + return self.apply_representation(val_modalities) + + def execute(self, modalities: List[Modality], labels: np.ndarray = None): + raise NotImplementedError(f"Not implemented for Fusion: {self.name}") + + def apply_representation(self, modalities: List[Modality]): + if self.needs_training: + raise NotImplementedError( + f"apply_representation not implemented for trainable fusion: {self.name}" + ) + else: + return self.execute(modalities) def get_max_embedding_size(self, modalities: List[Modality]): """ @@ -90,6 +121,11 @@ def get_max_embedding_size(self, modalities: List[Modality]): :param modalities: List of modalities :return: maximum embedding size """ + try: + modalities[0].data = np.array(modalities[0].data) + except: + pass + if isinstance(modalities[0].data[0], list): max_size = modalities[0].data[0][0].shape[1] elif isinstance(modalities[0].data, np.ndarray): diff --git a/src/main/python/systemds/scuro/representations/lstm.py b/src/main/python/systemds/scuro/representations/lstm.py index 0cfafddefa9..c8e96448815 100644 --- a/src/main/python/systemds/scuro/representations/lstm.py +++ b/src/main/python/systemds/scuro/representations/lstm.py @@ -22,10 +22,10 @@ import random import torch - from torch import nn -from typing import List - +from torch.utils.data import DataLoader, TensorDataset +from typing import List, Dict, Any +from systemds.scuro.utils.static_variables import get_device import numpy as np from systemds.scuro.modality.modality import Modality @@ -34,21 +34,46 @@ from systemds.scuro.drsearch.operator_registry import register_fusion_operator -# TODO: concatenate before embedding -# Make this a hyperparameter @register_fusion_operator() class LSTM(Fusion): - def __init__(self, width=128, depth=1, dropout_rate=0.1): - """ - Combines modalities using an LSTM - """ - super().__init__("LSTM") - self.depth = depth - self.width = width - self.dropout_rate = dropout_rate - self.unimodal_embeddings = {} - seed = 42 - + def __init__( + self, + width=128, + depth=1, + dropout_rate=0.1, + learning_rate=0.001, + epochs=50, + batch_size=32, + ): + parameters = { + "width": [128, 256, 512], + "depth": [1, 2, 3], + "dropout_rate": [0.1, 0.2, 0.3, 0.4, 0.5], + "learning_rate": [0.001, 0.0001, 0.01, 0.1], + "epochs": [50, 100, 200], + "batch_size": [8, 16, 32, 64, 128], + } + + super().__init__("LSTM", parameters) + + self.width = int(width) + self.depth = int(depth) + self.dropout_rate = float(dropout_rate) + self.learning_rate = float(learning_rate) + self.epochs = int(epochs) + self.batch_size = int(batch_size) + + self.needs_training = True + self.needs_alignment = True + self.model = None + self.input_dim = None + self.num_classes = None + self.is_trained = False + self.model_state = None + + self._set_random_seeds() + + def _set_random_seeds(self, seed=42): os.environ["PYTHONHASHSEED"] = str(seed) random.seed(seed) np.random.seed(seed) @@ -57,40 +82,161 @@ def __init__(self, width=128, depth=1, dropout_rate=0.1): torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False - def transform(self, modalities: List[Modality]): - self.unimodal_embeddings = {} - size = len(modalities[0].data) - - result = np.zeros((size, 0)) + def _prepare_data(self, modalities: List[Modality]) -> np.ndarray: + processed_modalities = [] for modality in modalities: - if modality.modality_type in list(self.unimodal_embeddings.keys()): - out = self.unimodal_embeddings.get(modality.modality_type) + try: + data = np.array(modality.data) + except: + max_len = -1 + for md in modality.metadata.values(): + if max_len < md["data_layout"]["shape"][0]: + max_len = md["data_layout"]["shape"][0] + data = np.zeros((len(modality.data), max_len)) + for i, d in enumerate(modality.data): + data[i, : len(d)] = d + + modality.data = data + + if data.ndim == 1: + data = data.reshape(-1, 1, 1) + elif data.ndim == 2: + data = data.reshape(data.shape[0], 1, data.shape[1]) + elif data.ndim == 3: + pass else: - out = self.run_lstm(modality.data) - self.unimodal_embeddings[modality.modality_type] = out + raise ValueError( + f"Unsupported data shape: {data.shape}. Expected 1D, 2D, or 3D arrays." + ) + + processed_modalities.append(data) + + max_seq_len = max(mod.shape[1] for mod in processed_modalities) + + aligned_modalities = [] + for data in processed_modalities: + if data.shape[1] < max_seq_len: + pad_width = ((0, 0), (0, max_seq_len - data.shape[1]), (0, 0)) + data = np.pad(data, pad_width, mode="constant", constant_values=0) + aligned_modalities.append(data) + + concatenated_data = np.concatenate(aligned_modalities, axis=2) + + return concatenated_data.astype(np.float32) + + def _build_model(self, input_dim: int, num_classes: int) -> nn.Module: + + class LSTMClassifier(nn.Module): + def __init__( + self, input_dim, hidden_dim, num_layers, num_classes, dropout_rate + ): + super(LSTMClassifier, self).__init__() + self.hidden_dim = hidden_dim + self.num_layers = num_layers + + self.lstm = nn.LSTM( + input_dim, + hidden_dim, + num_layers, + batch_first=True, + bidirectional=True, + dropout=dropout_rate if num_layers > 1 else 0, + ) + + self.dropout = nn.Dropout(dropout_rate) + self.classifier = nn.Linear(hidden_dim * 2, num_classes) + + def forward(self, x): + lstm_out, _ = self.lstm(x) + last_output = lstm_out[:, -1, :] + dropped = self.dropout(last_output) + output = self.classifier(dropped) + + return last_output, output + + return LSTMClassifier( + input_dim, self.width, self.depth, num_classes, self.dropout_rate + ) + + def execute(self, modalities: List[Modality], labels: np.ndarray = None): + if labels is None: + raise ValueError("LSTM fusion requires labels for training") + + X = self._prepare_data(modalities) + y = np.array(labels) + + self.input_dim = X.shape[2] + self.num_classes = len(np.unique(y)) + + self.model = self._build_model(self.input_dim, self.num_classes) + device = get_device() + self.model.to(device) + + criterion = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate) + + X_tensor = torch.FloatTensor(X).to(device) + y_tensor = torch.LongTensor(y).to(device) + + dataset = TensorDataset(X_tensor, y_tensor) + dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True) + + self.model.train() + for epoch in range(self.epochs): + total_loss = 0 + for batch_X, batch_y in dataloader: + optimizer.zero_grad() + + features, predictions = self.model(batch_X) + loss = criterion(predictions, batch_y) + + loss.backward() + optimizer.step() + + total_loss += loss.item() + + self.is_trained = True + + self.model_state = { + "state_dict": self.model.state_dict(), + "input_dim": self.input_dim, + "num_classes": self.num_classes, + "width": self.width, + "depth": self.depth, + "dropout_rate": self.dropout_rate, + } + + self.model.eval() + with torch.no_grad(): + features, _ = self.model(X_tensor) + return features.cpu().numpy() + + def apply_representation(self, modalities: List[Modality]) -> np.ndarray: + if not self.is_trained or self.model is None: + raise ValueError("Model must be trained before applying representation") - result = np.concatenate([result, out], axis=-1) + X = self._prepare_data(modalities) - return result + device = get_device() + self.model.to(device) - def run_lstm(self, data): - if isinstance(data, list): - data = np.array(data) + X_tensor = torch.FloatTensor(X).to(device) - d = data.astype(np.float32) - dim = d.shape[-1] - d = torch.from_numpy(d) - dropout_layer = torch.nn.Dropout(self.dropout_rate) + self.model.eval() + with torch.no_grad(): + features, _ = self.model(X_tensor) - for x in range(0, self.depth): - lstm_x = nn.LSTM(dim, self.width, batch_first=True, bidirectional=True) - dim = 2 * self.width - d = lstm_x(d)[0] + return features.cpu().numpy() - out = dropout_layer(d) + def get_model_state(self) -> Dict[str, Any]: + return self.model_state - if d.ndim > 2: - out = torch.flatten(out, 1) + def set_model_state(self, state: Dict[str, Any]): + self.model_state = state + self.input_dim = state["input_dim"] + self.num_classes = state["num_classes"] - return out.detach().numpy() + self.model = self._build_model(self.input_dim, self.num_classes) + self.model.load_state_dict(state["state_dict"]) + self.is_trained = True diff --git a/src/main/python/systemds/scuro/representations/mel_spectrogram.py b/src/main/python/systemds/scuro/representations/mel_spectrogram.py index dca1b0eec85..6ea90619013 100644 --- a/src/main/python/systemds/scuro/representations/mel_spectrogram.py +++ b/src/main/python/systemds/scuro/representations/mel_spectrogram.py @@ -37,9 +37,9 @@ def __init__(self, n_mels=128, hop_length=512, n_fft=2048): "n_fft": [1024, 2048, 4096], } super().__init__("MelSpectrogram", ModalityType.TIMESERIES, parameters, False) - self.n_mels = n_mels - self.hop_length = hop_length - self.n_fft = n_fft + self.n_mels = int(n_mels) + self.hop_length = int(hop_length) + self.n_fft = int(n_fft) def transform(self, modality): transformed_modality = TransformedModality( diff --git a/src/main/python/systemds/scuro/representations/mfcc.py b/src/main/python/systemds/scuro/representations/mfcc.py index c942f3076e7..4d9989add98 100644 --- a/src/main/python/systemds/scuro/representations/mfcc.py +++ b/src/main/python/systemds/scuro/representations/mfcc.py @@ -38,10 +38,10 @@ def __init__(self, n_mfcc=12, dct_type=2, n_mels=128, hop_length=512): "n_mels": [20, 32, 64, 128], } # TODO super().__init__("MFCC", ModalityType.TIMESERIES, parameters, False) - self.n_mfcc = n_mfcc - self.dct_type = dct_type - self.n_mels = n_mels - self.hop_length = hop_length + self.n_mfcc = int(n_mfcc) + self.dct_type = int(dct_type) + self.n_mels = int(n_mels) + self.hop_length = int(hop_length) def transform(self, modality): transformed_modality = TransformedModality( diff --git a/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py b/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py index 7928b9988bd..d17451932e1 100644 --- a/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py +++ b/src/main/python/systemds/scuro/representations/multimodal_attention_fusion.py @@ -19,10 +19,12 @@ # # ------------------------------------------------------------- +import os +import random import torch import torch.nn as nn import torch.nn.functional as F -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Any import numpy as np from systemds.scuro.drsearch.operator_registry import register_fusion_operator from systemds.scuro.modality.modality import Modality @@ -37,140 +39,237 @@ def __init__( hidden_dim=256, num_heads=8, dropout=0.1, - fusion_strategy="attention", batch_size=32, num_epochs=50, + learning_rate=0.001, ): - self.encoder = None params = { - "hidden_dim": [128, 256, 512], - "num_heads": [1, 4, 8], - "dropout": [0.1, 0.2, 0.3], - "fusion_strategy": ["mean", "max", "attention", "cls"], - "batch_size": [32, 64, 128], - "num_epochs": [50, 70, 100, 150], + "hidden_dim": [32, 128, 256, 384, 512, 768], + "num_heads": [2, 4, 8, 12], + "dropout": [0.0, 0.1, 0.2, 0.3, 0.4], + "batch_size": [8, 16, 32, 64, 128], + "num_epochs": [50, 100, 150, 200], + "learning_rate": [1e-5, 1e-4, 1e-3, 1e-2], } super().__init__("AttentionFusion", params) - self.hidden_dim = hidden_dim - self.num_heads = num_heads - self.dropout = dropout - self.fusion_strategy = fusion_strategy - self.batch_size = batch_size - self.needs_training = True - self.needs_instance_alignment = True - self.num_epochs = num_epochs - def execute( - self, - data: List[np.ndarray], - labels: np.ndarray, - ): - input_dimension = {} + self.hidden_dim = int(hidden_dim) + self.num_heads = int(num_heads) + self.dropout = float(dropout) + self.batch_size = int(batch_size) + self.num_epochs = int(num_epochs) + self.learning_rate = float(learning_rate) + + self.needs_training = True + self.needs_alignment = True + self.encoder = None + self.classification_head = None + self.input_dimensions = None + self.max_sequence_length = None + self.num_classes = None + self.is_trained = False + self.model_state = None + + self._set_random_seeds() + + def _set_random_seeds(self, seed=42): + os.environ["PYTHONHASHSEED"] = str(seed) + random.seed(seed) + np.random.seed(seed) + torch.manual_seed(seed) + torch.cuda.manual_seed(seed) + torch.backends.cudnn.deterministic = True + torch.backends.cudnn.benchmark = False + + def _prepare_data(self, modalities: List[Modality]) -> Dict[str, torch.Tensor]: inputs = {} + input_dimensions = {} max_sequence_length = 0 - masks = {} - for i, modality in enumerate(data): - modality_name = "modality_" + str(i) - shape = modality.shape - max_sequence_length = max(max_sequence_length, shape[1]) - input_dimension[modality_name] = shape[2] if len(shape) > 2 else shape[1] - inputs[modality_name] = torch.from_numpy(np.stack(modality)).to( - get_device() - ) - # attention_masks_list = [ - # entry["attention_masks"] - # for entry in modality.metadata.values() - # if "attention_masks" in entry - # ] - attention_masks_list = None - if attention_masks_list: - masks[modality_name] = ( - torch.tensor(np.array(attention_masks_list)).bool().to(get_device()) - ) + for i, modality in enumerate(modalities): + modality_name = f"modality_{i}" + data = np.array(modality.data) + + if data.ndim == 1: + data = data.reshape(-1, 1, 1) + elif data.ndim == 2: + data = data.reshape(data.shape[0], 1, data.shape[1]) + elif data.ndim == 3: + pass else: - masks[modality_name] = None + raise ValueError( + f"Unsupported data shape: {data.shape}. Expected 1D, 2D, or 3D arrays." + ) + + input_dimensions[modality_name] = data.shape[2] # Feature dimension + max_sequence_length = max(max_sequence_length, data.shape[1]) + + inputs[modality_name] = torch.from_numpy(data.astype(np.float32)) + + for modality_name, tensor in inputs.items(): + if tensor.shape[1] < max_sequence_length: + pad_width = (0, 0, 0, max_sequence_length - tensor.shape[1], 0, 0) + inputs[modality_name] = F.pad( + tensor, pad_width, mode="constant", value=0 + ) + + return inputs, input_dimensions, max_sequence_length + + def execute(self, modalities: List[Modality], labels: np.ndarray = None): + if labels is None: + raise ValueError("Attention fusion requires labels for training") + + inputs, input_dimensions, max_sequence_length = self._prepare_data(modalities) + y = np.array(labels) + + self.input_dimensions = input_dimensions + self.max_sequence_length = max_sequence_length + self.num_classes = len(np.unique(y)) self.encoder = MultiModalAttentionFusion( - input_dimension, + self.input_dimensions, self.hidden_dim, self.num_heads, self.dropout, - max_sequence_length, - self.fusion_strategy, + self.max_sequence_length, ) - head = FusedClassificationHead( - fused_dim=self.hidden_dim, num_classes=len(np.unique(labels)) + self.classification_head = FusedClassificationHead( + fused_dim=self.hidden_dim, num_classes=self.num_classes ) + + device = get_device() + self.encoder.to(device) + self.classification_head.to(device) + criterion = nn.CrossEntropyLoss() optimizer = torch.optim.Adam( - list(self.encoder.parameters()) + list(head.parameters()), lr=0.001 + list(self.encoder.parameters()) + + list(self.classification_head.parameters()), + lr=self.learning_rate, ) - labels = torch.from_numpy(labels).to(get_device()) + + for modality_name in inputs: + inputs[modality_name] = inputs[modality_name].to(device) + labels_tensor = torch.from_numpy(y).long().to(device) + + dataset_inputs = [] + for i in range(len(y)): + sample_inputs = {name: tensor[i] for name, tensor in inputs.items()} + dataset_inputs.append((sample_inputs, labels_tensor[i])) + + self.encoder.train() + self.classification_head.train() for epoch in range(self.num_epochs): total_loss = 0 - total_accuracy = 0 - for batch_idx in range(0, len(data), self.batch_size): - batched_input = {} - for modality, modality_data in inputs.items(): - batched_input[modality] = modality_data[ - batch_idx : batch_idx + self.batch_size - ] - loss, predictions = self.train_encoder_step( - head, - inputs, - labels[batch_idx : batch_idx + self.batch_size], - criterion, - optimizer, - ) - total_loss += loss - total_accuracy += predictions + total_correct = 0 + total_samples = 0 + + for batch_start in range(0, len(dataset_inputs), self.batch_size): + batch_end = min(batch_start + self.batch_size, len(dataset_inputs)) + batch_data = dataset_inputs[batch_start:batch_end] + + batch_inputs = {} + batch_labels = [] + + for sample_inputs, label in batch_data: + batch_labels.append(label) + for modality_name, tensor in sample_inputs.items(): + if modality_name not in batch_inputs: + batch_inputs[modality_name] = [] + batch_inputs[modality_name].append(tensor) + + for modality_name in batch_inputs: + batch_inputs[modality_name] = torch.stack( + batch_inputs[modality_name] + ) + + batch_labels = torch.stack(batch_labels) + + optimizer.zero_grad() + + encoder_output = self.encoder(batch_inputs) + logits = self.classification_head(encoder_output["fused"]) + loss = criterion(logits, batch_labels) + + loss.backward() + optimizer.step() + + total_loss += loss.item() + _, predicted = torch.max(logits.data, 1) + total_correct += (predicted == batch_labels).sum().item() + total_samples += batch_labels.size(0) + + self.is_trained = True + + self.model_state = { + "encoder_state_dict": self.encoder.state_dict(), + "classification_head_state_dict": self.classification_head.state_dict(), + "input_dimensions": self.input_dimensions, + "max_sequence_length": self.max_sequence_length, + "num_classes": self.num_classes, + "hidden_dim": self.hidden_dim, + "num_heads": self.num_heads, + "dropout": self.dropout, + } - if epoch % 50 == 0 or epoch == self.num_epochs - 1: - print( - f"Epoch {epoch}, Loss: {total_loss:.4f}, accuracy: {total_accuracy/len(data):.4f}" - ) + def apply_representation(self, modalities: List[Modality]) -> np.ndarray: + if not self.is_trained or self.encoder is None: + raise ValueError("Model must be trained before applying representation") + + inputs, _, _ = self._prepare_data(modalities) + + device = get_device() + self.encoder.to(device) + + for modality_name in inputs: + inputs[modality_name] = inputs[modality_name].to(device) - # Training step (encoder + classification head) - def train_encoder_step(self, head, inputs, labels, criterion, optimizer): - self.encoder.train() - head.train() - optimizer.zero_grad() - output = self.encoder(inputs) - logits = head(output["fused"]) - loss = criterion(logits, labels) - loss.backward() - optimizer.step() - _, predicted = torch.max(logits.data, 1) - return loss.item(), (predicted == labels).sum().item() - - def apply_representation(self, modalities): - inputs = {} - for i, modality in enumerate(modalities): - modality_name = "modality_" + str(i) - inputs[modality_name] = torch.from_numpy(np.stack(modality)).to( - get_device() - ) self.encoder.eval() with torch.no_grad(): - output = self.encoder(inputs) - return output["fused"].cpu().numpy() + encoder_output = self.encoder(inputs) + + return encoder_output["fused"].cpu().numpy() + + def get_model_state(self) -> Dict[str, Any]: + return self.model_state + + def set_model_state(self, state: Dict[str, Any]): + self.model_state = state + self.input_dimensions = state["input_dimensions"] + self.max_sequence_length = state["max_sequence_length"] + self.num_classes = state["num_classes"] + + self.encoder = MultiModalAttentionFusion( + self.input_dimensions, + state["hidden_dim"], + state["num_heads"], + state["dropout"], + self.max_sequence_length, + ) + self.encoder.load_state_dict(state["encoder_state_dict"]) + + self.classification_head = FusedClassificationHead( + fused_dim=state["hidden_dim"], num_classes=self.num_classes + ) + self.classification_head.load_state_dict( + state["classification_head_state_dict"] + ) + + self.is_trained = True class FusedClassificationHead(nn.Module): - """ - Simple classification head for supervision during training. - """ def __init__(self, fused_dim, num_classes=2): super(FusedClassificationHead, self).__init__() self.head = nn.Sequential( nn.Linear(fused_dim, fused_dim // 2), nn.ReLU(), + nn.Dropout(0.1), nn.Linear(fused_dim // 2, num_classes), - ).to(get_device()) + ) def forward(self, fused): return self.head(fused) @@ -184,7 +283,7 @@ def __init__( num_heads: int, dropout: float, max_seq_len: int, - pooling_strategy: str, + pooling_strategy: str = "mean", ): super().__init__() @@ -194,50 +293,42 @@ def __init__( self.pooling_strategy = pooling_strategy self.max_seq_len = max_seq_len - # Project each modality to the same hidden dimension self.modality_projections = nn.ModuleDict( { - modality: nn.Linear(dim, hidden_dim).to(get_device()) + modality: nn.Linear(dim, hidden_dim) for modality, dim in modality_dims.items() } ) - # Positional encoding for sequence modalities self.positional_encoding = nn.Parameter( torch.randn(max_seq_len, hidden_dim) * 0.1 - ).to(get_device()) + ) - # Cross-modal attention self.cross_attention = nn.MultiheadAttention( embed_dim=hidden_dim, num_heads=num_heads, dropout=dropout, batch_first=True - ).to(get_device()) + ) - # Self-attention within each modality self.self_attention = nn.MultiheadAttention( embed_dim=hidden_dim, num_heads=num_heads, dropout=dropout, batch_first=True - ).to(get_device()) + ) - # Attention-based pooling for sequences if pooling_strategy == "attention": self.pooling_attention = nn.Sequential( nn.Linear(hidden_dim, hidden_dim // 2), nn.Tanh(), nn.Linear(hidden_dim // 2, 1), - ).to(get_device()) + ) - # Modality-level attention for final fusion self.modality_attention = nn.Sequential( nn.Linear(hidden_dim, hidden_dim // 2), nn.ReLU(), nn.Linear(hidden_dim // 2, 1), - ).to(get_device()) + ) - # Layer normalization - self.layer_norm = nn.LayerNorm(hidden_dim).to(get_device()) - self.dropout = nn.Dropout(dropout).to(get_device()) + self.layer_norm = nn.LayerNorm(hidden_dim) + self.dropout_layer = nn.Dropout(dropout) - # Final projection - self.final_projection = nn.Linear(hidden_dim, hidden_dim).to(get_device()) + self.final_projection = nn.Linear(hidden_dim, hidden_dim) def _handle_input_format(self, modality_tensor: torch.Tensor) -> torch.Tensor: if len(modality_tensor.shape) == 2: @@ -269,29 +360,22 @@ def _pool_sequence( elif self.pooling_strategy == "max": if mask is not None: - # Set masked positions to large negative value before max pooling masked_seq = sequence.masked_fill(~mask.unsqueeze(-1), float("-inf")) return masked_seq.max(dim=1)[0] else: return sequence.max(dim=1)[0] elif self.pooling_strategy == "cls": - # Use the first token (assuming it's a CLS token) return sequence[:, 0, :] elif self.pooling_strategy == "attention": - # Attention-based pooling - attention_scores = self.pooling_attention(sequence).squeeze( - -1 - ) # (batch, seq) + attention_scores = self.pooling_attention(sequence).squeeze(-1) if mask is not None: attention_scores = attention_scores.masked_fill(~mask, float("-inf")) - attention_weights = F.softmax(attention_scores, dim=1) # (batch, seq) - return (sequence * attention_weights.unsqueeze(-1)).sum( - dim=1 - ) # (batch, hidden) + attention_weights = F.softmax(attention_scores, dim=1) + return (sequence * attention_weights.unsqueeze(-1)).sum(dim=1) else: raise ValueError(f"Unknown pooling strategy: {self.pooling_strategy}") @@ -323,7 +407,7 @@ def forward( key_padding_mask=~mask if mask is not None else None, ) - projected = self.layer_norm(projected + self.dropout(attended)) + projected = self.layer_norm(projected + self.dropout_layer(attended)) pooled = self._pool_sequence(projected, mask) else: @@ -339,7 +423,7 @@ def forward( ) cross_attended = self.layer_norm( - modality_stack + self.dropout(cross_attended) + modality_stack + self.dropout_layer(cross_attended) ) updated_embeddings = { diff --git a/src/main/python/systemds/scuro/representations/representation.py b/src/main/python/systemds/scuro/representations/representation.py index 144b88f34c0..dac3bb2b983 100644 --- a/src/main/python/systemds/scuro/representations/representation.py +++ b/src/main/python/systemds/scuro/representations/representation.py @@ -18,7 +18,7 @@ # under the License. # # ------------------------------------------------------------- -from abc import abstractmethod +from systemds.scuro.utils.identifier import Identifier class Representation: @@ -26,6 +26,7 @@ def __init__(self, name, parameters): self.name = name self._parameters = parameters self.self_contained = True + self.representation_id = Identifier().new_id() @property def parameters(self): diff --git a/src/main/python/systemds/scuro/representations/resnet.py b/src/main/python/systemds/scuro/representations/resnet.py index f961cb4588a..7bb94d8bfde 100644 --- a/src/main/python/systemds/scuro/representations/resnet.py +++ b/src/main/python/systemds/scuro/representations/resnet.py @@ -32,17 +32,13 @@ from systemds.scuro.utils.static_variables import get_device -@register_representation( - [ModalityType.IMAGE, ModalityType.VIDEO, ModalityType.TIMESERIES] -) +@register_representation([ModalityType.IMAGE, ModalityType.VIDEO]) class ResNet(UnimodalRepresentation): - def __init__(self, layer="avgpool", model_name="ResNet18", output_file=None): + def __init__(self, model_name="ResNet18", layer="avgpool", output_file=None): self.data_type = torch.bfloat16 self.model_name = model_name parameters = self._get_parameters() - super().__init__( - "ResNet", ModalityType.TIMESERIES, parameters - ) # TODO: TIMESERIES only for videos - images would be handled as EMBEDDING + super().__init__("ResNet", ModalityType.EMBEDDING, parameters) self.output_file = output_file self.layer_name = layer @@ -95,7 +91,6 @@ def model_name(self, model_name): .to(get_device()) .to(self.data_type) ) - else: raise NotImplementedError diff --git a/src/main/python/systemds/scuro/representations/spectrogram.py b/src/main/python/systemds/scuro/representations/spectrogram.py index 51b69d7d87c..662fb0d627a 100644 --- a/src/main/python/systemds/scuro/representations/spectrogram.py +++ b/src/main/python/systemds/scuro/representations/spectrogram.py @@ -33,8 +33,8 @@ class Spectrogram(UnimodalRepresentation): def __init__(self, hop_length=512, n_fft=2048): parameters = {"hop_length": [256, 512, 1024, 2048], "n_fft": [1024, 2048, 4096]} super().__init__("Spectrogram", ModalityType.TIMESERIES, parameters, False) - self.hop_length = hop_length - self.n_fft = n_fft + self.hop_length = int(hop_length) + self.n_fft = int(n_fft) def transform(self, modality): transformed_modality = TransformedModality( @@ -44,8 +44,8 @@ def transform(self, modality): for i, sample in enumerate(modality.data): spectrogram = librosa.stft( - y=np.array(sample), hop_length=self.hop_length, n_fft=self.n_fft - ).astype(modality.data_type) + y=np.array(np.abs(sample)), hop_length=self.hop_length, n_fft=self.n_fft + ) S_dB = librosa.amplitude_to_db(np.abs(spectrogram)) result.append(S_dB.T) diff --git a/src/main/python/systemds/scuro/representations/tfidf.py b/src/main/python/systemds/scuro/representations/tfidf.py index 3b8f069df83..c82961949fe 100644 --- a/src/main/python/systemds/scuro/representations/tfidf.py +++ b/src/main/python/systemds/scuro/representations/tfidf.py @@ -34,7 +34,7 @@ class TfIdf(UnimodalRepresentation): def __init__(self, min_df=2, output_file=None): parameters = {"min_df": [min_df]} super().__init__("TF-IDF", ModalityType.EMBEDDING, parameters) - self.min_df = min_df + self.min_df = int(min_df) self.output_file = output_file def transform(self, modality): diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py index b3ad9e1b934..c16f6d747fc 100644 --- a/src/main/python/systemds/scuro/representations/window_aggregation.py +++ b/src/main/python/systemds/scuro/representations/window_aggregation.py @@ -49,10 +49,10 @@ def aggregation_function(self, value): @register_context_operator() class WindowAggregation(Window): - def __init__(self, window_size=10, aggregation_function="mean", pad=False): + def __init__(self, aggregation_function="mean", window_size=10, pad=False): super().__init__("WindowAggregation", aggregation_function) self.parameters["window_size"] = [window_size] - self.window_size = window_size + self.window_size = int(window_size) self.pad = pad def execute(self, modality): @@ -141,10 +141,10 @@ def window_aggregate_nested_level(self, instance, new_length): @register_context_operator() class StaticWindow(Window): - def __init__(self, num_windows=100, aggregation_function="mean"): + def __init__(self, aggregation_function="mean", num_windows=100): super().__init__("StaticWindow", aggregation_function) self.parameters["num_windows"] = [num_windows] - self.num_windows = num_windows + self.num_windows = int(num_windows) def execute(self, modality): windowed_data = [] @@ -172,10 +172,10 @@ def execute(self, modality): @register_context_operator() class DynamicWindow(Window): - def __init__(self, num_windows=100, aggregation_function="mean"): + def __init__(self, aggregation_function="mean", num_windows=100): super().__init__("DynamicWindow", aggregation_function) self.parameters["num_windows"] = [num_windows] - self.num_windows = num_windows + self.num_windows = int(num_windows) def execute(self, modality): windowed_data = [] @@ -198,6 +198,7 @@ def execute(self, modality): ) output.append(val) start = end + windowed_data.append(output) return np.array(windowed_data) diff --git a/src/main/python/systemds/scuro/representations/word2vec.py b/src/main/python/systemds/scuro/representations/word2vec.py index 06e082fb695..837811935cd 100644 --- a/src/main/python/systemds/scuro/representations/word2vec.py +++ b/src/main/python/systemds/scuro/representations/word2vec.py @@ -41,16 +41,14 @@ def get_embedding(sentence, model): @register_representation(ModalityType.TEXT) class W2V(UnimodalRepresentation): - def __init__(self, vector_size=150, min_count=2, window=5, output_file=None): + def __init__(self, vector_size=150, min_count=1, output_file=None): parameters = { "vector_size": [vector_size], "min_count": [min_count], - "window": [window], - } # TODO + } super().__init__("Word2Vec", ModalityType.EMBEDDING, parameters) self.vector_size = vector_size self.min_count = min_count - self.window = window self.output_file = output_file def transform(self, modality): @@ -59,7 +57,6 @@ def transform(self, modality): model = Word2Vec( sentences=t, vector_size=self.vector_size, - window=self.window, min_count=self.min_count, ) embeddings = [] diff --git a/src/main/python/systemds/scuro/modality/modality_identifier.py b/src/main/python/systemds/scuro/utils/identifier.py similarity index 64% rename from src/main/python/systemds/scuro/modality/modality_identifier.py rename to src/main/python/systemds/scuro/utils/identifier.py index 5eeee7dc131..7b6802672cf 100644 --- a/src/main/python/systemds/scuro/modality/modality_identifier.py +++ b/src/main/python/systemds/scuro/utils/identifier.py @@ -18,7 +18,10 @@ # under the License. # # ------------------------------------------------------------- -class ModalityIdentifier: +import threading + + +class Identifier: """ """ _instance = None @@ -32,3 +35,27 @@ def __new__(cls): def new_id(self): # TODO: make threadsafe when parallelizing self.id += 1 return self.id + + +class IdGenerator: + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._ctr = 0 + cls._instance._ctr_lock = threading.Lock() + return cls._instance + + def next(self) -> int: + with self._instance._ctr_lock: + self._instance._ctr += 1 + n = self._instance._ctr + return n + + +get_op_id = IdGenerator().next +get_node_id = IdGenerator().next diff --git a/src/main/python/tests/scuro/test_data_loaders.py b/src/main/python/tests/scuro/test_data_loaders.py index 85da2919a04..fb07df71543 100644 --- a/src/main/python/tests/scuro/test_data_loaders.py +++ b/src/main/python/tests/scuro/test_data_loaders.py @@ -66,7 +66,6 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): - print("Cleaning up test data") shutil.rmtree(cls.test_file_path) def test_load_audio_data_from_file(self): diff --git a/src/main/python/tests/scuro/test_dr_search.py b/src/main/python/tests/scuro/test_dr_search.py deleted file mode 100644 index 3e0e702e6f3..00000000000 --- a/src/main/python/tests/scuro/test_dr_search.py +++ /dev/null @@ -1,174 +0,0 @@ -# ------------------------------------------------------------- -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# ------------------------------------------------------------- -import os -import shutil -import unittest - -import numpy as np -from sklearn import svm -from sklearn.metrics import classification_report -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import MinMaxScaler - -from systemds.scuro.modality.type import ModalityType -from systemds.scuro.drsearch.dr_search import DRSearch -from systemds.scuro.drsearch.task import Task -from systemds.scuro.models.model import Model -from systemds.scuro.representations.average import Average -from systemds.scuro.representations.bert import Bert -from systemds.scuro.representations.concatenation import Concatenation -from systemds.scuro.representations.lstm import LSTM -from systemds.scuro.representations.max import RowMax -from systemds.scuro.representations.mel_spectrogram import MelSpectrogram -from systemds.scuro.representations.hadamard import Hadamard -from systemds.scuro.representations.resnet import ResNet -from systemds.scuro.representations.sum import Sum -from tests.scuro.data_generator import ModalityRandomDataGenerator - - -import warnings - -warnings.filterwarnings("always") - - -class TestSVM(Model): - def __init__(self): - super().__init__("Test") - - def fit(self, X, y, X_test, y_test): - self.clf = svm.SVC(C=1, gamma="scale", kernel="rbf", verbose=False) - self.clf = self.clf.fit(X, np.array(y)) - y_pred = self.clf.predict(X) - - return classification_report( - y, y_pred, output_dict=True, digits=3, zero_division=1 - )["accuracy"] - - def test(self, test_X: np.ndarray, test_y: np.ndarray): - y_pred = self.clf.predict(np.array(test_X)) # noqa - - return classification_report( - np.array(test_y), y_pred, output_dict=True, digits=3, zero_division=1 - )["accuracy"] - - -def scale_data(data, train_indizes): - data = np.array(data).reshape(len(data), -1) - scaler = MinMaxScaler(feature_range=(0, 1)) - scaler.fit(data[train_indizes]) - return scaler.transform(data) - - -class TestDataLoaders(unittest.TestCase): - train_indizes = None - val_indizes = None - test_file_path = None - mods = None - text = None - audio = None - video = None - data_generator = None - num_instances = 0 - representations = None - - @classmethod - def setUpClass(cls): - cls.num_instances = 20 - cls.data_generator = ModalityRandomDataGenerator() - - cls.labels = ModalityRandomDataGenerator().create_balanced_labels( - num_instances=cls.num_instances - ) - # TODO: adapt the representation so they return non aggregated values. Apply windowing operation instead - - cls.video = cls.data_generator.create1DModality( - cls.num_instances, 100, ModalityType.VIDEO - ) - cls.text = cls.data_generator.create1DModality( - cls.num_instances, 100, ModalityType.TEXT - ) - cls.audio = cls.data_generator.create1DModality( - cls.num_instances, 100, ModalityType.AUDIO - ) - - cls.mods = [cls.video, cls.audio, cls.text] - - split = train_test_split( - np.array(range(cls.num_instances)), - cls.labels, - test_size=0.2, - random_state=42, - ) - cls.train_indizes, cls.val_indizes = [int(i) for i in split[0]], [ - int(i) for i in split[1] - ] - - for m in cls.mods: - m.data = scale_data(m.data, cls.train_indizes) - - cls.representations = [ - Concatenation(), - Average(), - RowMax(), - Hadamard(), - Sum(), - LSTM(width=256, depth=3), - ] - - def test_enumerate_all(self): - task = Task( - "TestTask", - TestSVM(), - self.labels, - self.train_indizes, - self.val_indizes, - ) - dr_search = DRSearch(self.mods, task, self.representations) - best_representation, best_score, best_modalities = dr_search.fit_enumerate_all() - - for r in dr_search.scores.values(): - for scores in r.values(): - assert scores[1] <= best_score - - def test_enumerate_all_vs_random(self): - task = Task( - "TestTask", - TestSVM(), - self.labels, - self.train_indizes, - self.val_indizes, - ) - dr_search = DRSearch(self.mods, task, self.representations) - best_representation_enum, best_score_enum, best_modalities_enum = ( - dr_search.fit_enumerate_all() - ) - - dr_search.reset_best_params() - - best_representation_rand, best_score_rand, best_modalities_rand = ( - dr_search.fit_random(seed=42) - ) - - assert best_score_rand <= best_score_enum - - -if __name__ == "__main__": - unittest.main() diff --git a/src/main/python/tests/scuro/test_hp_tuner.py b/src/main/python/tests/scuro/test_hp_tuner.py new file mode 100644 index 00000000000..73aab4493d3 --- /dev/null +++ b/src/main/python/tests/scuro/test_hp_tuner.py @@ -0,0 +1,244 @@ +# ------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------- + +import unittest + +import numpy as np +from sklearn import svm +from sklearn.metrics import classification_report +from sklearn.model_selection import train_test_split + +from systemds.scuro.drsearch.multimodal_optimizer import MultimodalOptimizer +from systemds.scuro.representations.average import Average +from systemds.scuro.representations.concatenation import Concatenation +from systemds.scuro.representations.lstm import LSTM +from systemds.scuro.drsearch.operator_registry import Registry +from systemds.scuro.models.model import Model +from systemds.scuro.drsearch.task import Task +from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer + +from systemds.scuro.representations.spectrogram import Spectrogram +from systemds.scuro.representations.covarep_audio_features import ( + ZeroCrossing, + Spectral, + Pitch, +) +from systemds.scuro.representations.word2vec import W2V +from systemds.scuro.representations.bow import BoW +from systemds.scuro.modality.unimodal_modality import UnimodalModality +from systemds.scuro.representations.resnet import ResNet +from tests.scuro.data_generator import ModalityRandomDataGenerator, TestDataLoader + +from systemds.scuro.modality.type import ModalityType +from systemds.scuro.drsearch.hyperparameter_tuner import HyperparameterTuner + + +class TestSVM(Model): + def __init__(self): + super().__init__("TestSVM") + + def fit(self, X, y, X_test, y_test): + if X.ndim > 2: + X = X.reshape(X.shape[0], -1) + self.clf = svm.SVC(C=1, gamma="scale", kernel="rbf", verbose=False) + self.clf = self.clf.fit(X, np.array(y)) + y_pred = self.clf.predict(X) + + return classification_report( + y, y_pred, output_dict=True, digits=3, zero_division=1 + )["accuracy"] + + def test(self, test_X: np.ndarray, test_y: np.ndarray): + if test_X.ndim > 2: + test_X = test_X.reshape(test_X.shape[0], -1) + y_pred = self.clf.predict(np.array(test_X)) # noqa + + return classification_report( + np.array(test_y), y_pred, output_dict=True, digits=3, zero_division=1 + )["accuracy"] + + +class TestSVM2(Model): + def __init__(self): + super().__init__("TestSVM2") + + def fit(self, X, y, X_test, y_test): + if X.ndim > 2: + X = X.reshape(X.shape[0], -1) + self.clf = svm.SVC(C=1, gamma="scale", kernel="rbf", verbose=False) + self.clf = self.clf.fit(X, np.array(y)) + y_pred = self.clf.predict(X) + + return classification_report( + y, y_pred, output_dict=True, digits=3, zero_division=1 + )["accuracy"] + + def test(self, test_X: np.ndarray, test_y: np.ndarray): + if test_X.ndim > 2: + test_X = test_X.reshape(test_X.shape[0], -1) + y_pred = self.clf.predict(np.array(test_X)) # noqa + + return classification_report( + np.array(test_y), y_pred, output_dict=True, digits=3, zero_division=1 + )["accuracy"] + + +from unittest.mock import patch + + +class TestHPTuner(unittest.TestCase): + data_generator = None + num_instances = 0 + + @classmethod + def setUpClass(cls): + cls.num_instances = 10 + cls.mods = [ModalityType.VIDEO, ModalityType.AUDIO, ModalityType.TEXT] + cls.labels = ModalityRandomDataGenerator().create_balanced_labels( + num_instances=cls.num_instances + ) + cls.indices = np.array(range(cls.num_instances)) + + split = train_test_split( + cls.indices, + cls.labels, + test_size=0.2, + random_state=42, + ) + cls.train_indizes, cls.val_indizes = [int(i) for i in split[0]], [ + int(i) for i in split[1] + ] + + cls.tasks = [ + Task( + "UnimodalRepresentationTask1", + TestSVM(), + cls.labels, + cls.train_indizes, + cls.val_indizes, + ), + Task( + "UnimodalRepresentationTask2", + TestSVM2(), + cls.labels, + cls.train_indizes, + cls.val_indizes, + ), + ] + + def test_hp_tuner_for_audio_modality(self): + audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( + self.num_instances, 3000 + ) + audio = UnimodalModality( + TestDataLoader( + self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md + ) + ) + + self.run_hp_for_modality([audio]) + + # def test_multimodal_hp_tuning(self): + # audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( + # self.num_instances, 3000 + # ) + # audio = UnimodalModality( + # TestDataLoader( + # self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md + # ) + # ) + # + # text_data, text_md = ModalityRandomDataGenerator().create_text_data( + # self.num_instances + # ) + # text = UnimodalModality( + # TestDataLoader( + # self.indices, None, ModalityType.TEXT, text_data, str, text_md + # ) + # ) + # + # self.run_hp_for_modality( + # [audio, text], multimodal=True, tune_unimodal_representations=True + # ) + # self.run_hp_for_modality( + # [audio, text], multimodal=True, tune_unimodal_representations=False + # ) + + def test_hp_tuner_for_text_modality(self): + text_data, text_md = ModalityRandomDataGenerator().create_text_data( + self.num_instances + ) + text = UnimodalModality( + TestDataLoader( + self.indices, None, ModalityType.TEXT, text_data, str, text_md + ) + ) + self.run_hp_for_modality([text]) + + def run_hp_for_modality( + self, modalities, multimodal=False, tune_unimodal_representations=False + ): + with patch.object( + Registry, + "_representations", + { + ModalityType.TEXT: [W2V, BoW], + ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, Pitch], + ModalityType.TIMESERIES: [ResNet], + ModalityType.VIDEO: [ResNet], + ModalityType.EMBEDDING: [], + }, + ): + registry = Registry() + registry._fusion_operators = [Average, Concatenation, LSTM] + unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False) + unimodal_optimizer.optimize() + + hp = HyperparameterTuner( + modalities, self.tasks, unimodal_optimizer.operator_performance + ) + + if multimodal: + m_o = MultimodalOptimizer( + modalities, + unimodal_optimizer.operator_performance, + self.tasks, + debug=False, + min_modalities=2, + max_modalities=3, + ) + fusion_results = m_o.optimize() + + hp.tune_multimodal_representations( + fusion_results, + k=1, + optimize_unimodal=tune_unimodal_representations, + ) + + else: + hp.tune_unimodal_representations() + + assert len(hp.results) == len(self.tasks) + assert len(hp.results[self.tasks[0].model.name]) == 2 + + +if __name__ == "__main__": + unittest.main() diff --git a/src/main/python/tests/scuro/test_multimodal_fusion.py b/src/main/python/tests/scuro/test_multimodal_fusion.py index ae3ddedffb1..0925e47cf25 100644 --- a/src/main/python/tests/scuro/test_multimodal_fusion.py +++ b/src/main/python/tests/scuro/test_multimodal_fusion.py @@ -19,10 +19,7 @@ # # ------------------------------------------------------------- - -import shutil import unittest -from multiprocessing import freeze_support import numpy as np from sklearn import svm @@ -32,30 +29,22 @@ from systemds.scuro.drsearch.multimodal_optimizer import MultimodalOptimizer from systemds.scuro.drsearch.unimodal_optimizer import UnimodalOptimizer from systemds.scuro.representations.concatenation import Concatenation +from systemds.scuro.representations.lstm import LSTM from systemds.scuro.representations.average import Average -from systemds.scuro.drsearch.fusion_optimizer import FusionOptimizer from systemds.scuro.drsearch.operator_registry import Registry from systemds.scuro.models.model import Model from systemds.scuro.drsearch.task import Task -from systemds.scuro.drsearch.unimodal_representation_optimizer import ( - UnimodalRepresentationOptimizer, -) from systemds.scuro.representations.spectrogram import Spectrogram from systemds.scuro.representations.word2vec import W2V from systemds.scuro.modality.unimodal_modality import UnimodalModality from systemds.scuro.representations.resnet import ResNet from tests.scuro.data_generator import ( - setup_data, TestDataLoader, ModalityRandomDataGenerator, ) -from systemds.scuro.dataloader.audio_loader import AudioLoader -from systemds.scuro.dataloader.video_loader import VideoLoader -from systemds.scuro.dataloader.text_loader import TextLoader from systemds.scuro.modality.type import ModalityType - from unittest.mock import patch @@ -144,24 +133,17 @@ def test_multimodal_fusion(self): ) audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( - self.num_instances, 100 + self.num_instances, 1000 ) text_data, text_md = ModalityRandomDataGenerator().create_text_data( self.num_instances ) - video_data, video_md = ModalityRandomDataGenerator().create_visual_modality( - self.num_instances, 60 - ) + audio = UnimodalModality( TestDataLoader( self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md ) ) - video = UnimodalModality( - TestDataLoader( - self.indices, None, ModalityType.VIDEO, video_data, np.float32, video_md - ) - ) text = UnimodalModality( TestDataLoader( self.indices, None, ModalityType.TEXT, text_data, str, text_md @@ -180,48 +162,26 @@ def test_multimodal_fusion(self): }, ): registry = Registry() - registry._fusion_operators = [Average, Concatenation] - unimodal_optimizer = UnimodalOptimizer( - [audio, text, video], [task], debug=False - ) + registry._fusion_operators = [Average, Concatenation, LSTM] + unimodal_optimizer = UnimodalOptimizer([audio, text], [task], debug=False) unimodal_optimizer.optimize() unimodal_optimizer.operator_performance.get_k_best_results(audio, 2, task) - - multimodal_optimizer = MultimodalOptimizer( - [audio, text, video], + m_o = MultimodalOptimizer( + [audio, text], unimodal_optimizer.operator_performance, [task], debug=False, + min_modalities=2, + max_modalities=3, ) + fusion_results = m_o.optimize() - multimodal_optimizer.optimize() + best_results = sorted( + fusion_results[task.model.name], key=lambda x: x.val_score, reverse=True + )[:2] - assert ( - len(multimodal_optimizer.optimization_results.results["TestSVM"].keys()) - == 57 - ) - assert ( - len( - multimodal_optimizer.optimization_results.results["TestSVM"][ - "0_1_2_3_4_5" - ] - ) - == 62 - ) - assert ( - len( - multimodal_optimizer.optimization_results.results["TestSVM"][ - "3_4_5" - ] - ) - == 6 - ) - assert ( - len(multimodal_optimizer.optimization_results.results["TestSVM"]["0_1"]) - == 2 - ) + assert best_results[0].val_score >= best_results[1].val_score if __name__ == "__main__": - freeze_support() unittest.main() diff --git a/src/main/python/tests/scuro/test_operator_registry.py b/src/main/python/tests/scuro/test_operator_registry.py index 0d83d83bda8..b5fa4b01b4d 100644 --- a/src/main/python/tests/scuro/test_operator_registry.py +++ b/src/main/python/tests/scuro/test_operator_registry.py @@ -74,9 +74,9 @@ def test_video_representations_in_registry(self): # SwinVideoTransformer, ] - def test_timeseries_representations_in_registry(self): - registry = Registry() - assert registry.get_representations(ModalityType.TIMESERIES) == [ResNet] + # def test_timeseries_representations_in_registry(self): + # registry = Registry() + # assert registry.get_representations(ModalityType.TIMESERIES) == [ResNet] def test_text_representations_in_registry(self): registry = Registry() diff --git a/src/main/python/tests/scuro/test_unimodal_optimizer.py b/src/main/python/tests/scuro/test_unimodal_optimizer.py index a4952d29f94..e2f0378d584 100644 --- a/src/main/python/tests/scuro/test_unimodal_optimizer.py +++ b/src/main/python/tests/scuro/test_unimodal_optimizer.py @@ -39,6 +39,7 @@ Pitch, ) from systemds.scuro.representations.word2vec import W2V +from systemds.scuro.representations.bow import BoW from systemds.scuro.modality.unimodal_modality import UnimodalModality from systemds.scuro.representations.resnet import ResNet from tests.scuro.data_generator import ModalityRandomDataGenerator, TestDataLoader @@ -164,7 +165,7 @@ def test_unimodal_optimizer_for_text_modality(self): def test_unimodal_optimizer_for_video_modality(self): video_data, video_md = ModalityRandomDataGenerator().create_visual_modality( - self.num_instances, 60 + self.num_instances, 10, 10 ) video = UnimodalModality( TestDataLoader( @@ -178,7 +179,7 @@ def optimize_unimodal_representation_for_modality(self, modality): Registry, "_representations", { - ModalityType.TEXT: [W2V], + ModalityType.TEXT: [W2V, BoW], ModalityType.AUDIO: [Spectrogram, ZeroCrossing, Spectral, Pitch], ModalityType.TIMESERIES: [ResNet], ModalityType.VIDEO: [ResNet], @@ -201,6 +202,9 @@ def optimize_unimodal_representation_for_modality(self, modality): assert len(result) == 1 assert len(cached) == 1 + # Todo: Add a test with all representations at once + # Todo: Add test with only one model + if __name__ == "__main__": unittest.main() diff --git a/src/main/python/tests/scuro/test_unimodal_representations.py b/src/main/python/tests/scuro/test_unimodal_representations.py index 52bca501ace..8c8e9baa2d4 100644 --- a/src/main/python/tests/scuro/test_unimodal_representations.py +++ b/src/main/python/tests/scuro/test_unimodal_representations.py @@ -19,8 +19,6 @@ # # ------------------------------------------------------------- -import os -import shutil import unittest import copy import numpy as np @@ -37,20 +35,14 @@ from systemds.scuro.representations.word2vec import W2V from systemds.scuro.representations.tfidf import TfIdf from systemds.scuro.modality.unimodal_modality import UnimodalModality -from systemds.scuro.representations.bert import Bert from systemds.scuro.representations.mel_spectrogram import MelSpectrogram from systemds.scuro.representations.mfcc import MFCC from systemds.scuro.representations.resnet import ResNet from systemds.scuro.representations.swin_video_transformer import SwinVideoTransformer -from tests.scuro.data_generator import setup_data from tests.scuro.data_generator import ( - setup_data, TestDataLoader, ModalityRandomDataGenerator, ) -from systemds.scuro.dataloader.audio_loader import AudioLoader -from systemds.scuro.dataloader.video_loader import VideoLoader -from systemds.scuro.dataloader.text_loader import TextLoader from systemds.scuro.modality.type import ModalityType @@ -78,7 +70,7 @@ def test_audio_representations(self): ZeroCrossing(), RMSE(), Pitch(), - ] # TODO: add FFT, TFN, 1DCNN + ] audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data( self.num_instances, 1000 ) @@ -120,7 +112,7 @@ def test_video_representations(self): assert r.data[0].ndim == 2 def test_text_representations(self): - test_representations = [BoW(2, 2), W2V(5, 2, 2), TfIdf(2), Bert()] + test_representations = [BoW(2, 2), TfIdf(), W2V()] text_data, text_md = ModalityRandomDataGenerator().create_text_data( self.num_instances ) diff --git a/src/main/python/tests/scuro/test_window_operations.py b/src/main/python/tests/scuro/test_window_operations.py index 9aab25a8148..d98e9ff4f3b 100644 --- a/src/main/python/tests/scuro/test_window_operations.py +++ b/src/main/python/tests/scuro/test_window_operations.py @@ -53,7 +53,7 @@ def test_static_window(self): md, ) ) - aggregated_window = modality.context(StaticWindow(num_windows)) + aggregated_window = modality.context(StaticWindow(num_windows=num_windows)) for i in range(0, self.num_instances): assert len(aggregated_window.data[i]) == num_windows @@ -71,7 +71,7 @@ def test_dynamic_window(self): md, ) ) - aggregated_window = modality.context(DynamicWindow(num_windows)) + aggregated_window = modality.context(DynamicWindow(num_windows=num_windows)) for i in range(0, self.num_instances): assert len(aggregated_window.data[i]) == num_windows