def _embedding_vector_length(doc: "Document") -> int:
    """Return the dimensionality of ``doc.vector``, or 0 when it is unusable.

    The length is read from the last entry of ``vector.shape`` when the vector
    exposes a ``shape`` attribute, and from ``len(vector)`` otherwise.

    Args:
        doc: Any object; a missing ``vector`` attribute is treated like ``None``.

    Returns:
        The vector length as an ``int``. 0 is returned when the vector is
        missing, is ``None``, is text/bytes/a mapping, has an empty ``shape``,
        has no length at all, or when inspecting it raises.
    """
    vector = getattr(doc, "vector", None)
    if vector is None:
        return 0
    # str, bytes and dict all define __len__ but are not embedding vectors;
    # counting them would report a non-empty embedding where there is none.
    if isinstance(vector, (str, bytes, dict)):
        return 0
    try:
        if hasattr(vector, "shape"):
            shape = vector.shape
            # An empty shape (0-d value) carries no vector dimension.
            return int(shape[-1]) if len(shape) > 0 else 0
        if hasattr(vector, "__len__"):
            return len(vector)
    except Exception:
        # Deliberate best effort: this helper only feeds a diagnostic count,
        # so an object that cannot be inspected is reported as "no embedding"
        # instead of aborting the caller.
        return 0
    return 0
def _extract_embedding_value(obj):
    """Return the embedding payload carried by ``obj``, or ``None`` if absent.

    Two entry names are tried, ``embedding`` first and ``embeddings`` second,
    in three places and in this order:

    1. dictionary keys, when ``obj`` is a ``dict``;
    2. attributes of ``obj``;
    3. the dictionaries returned by ``obj.model_dump()``, ``obj.to_dict()``
       and ``obj.dict()``, tried in that order.

    An entry that is present but ``None`` is skipped, so an empty
    ``embedding`` placeholder cannot hide a populated ``embeddings`` entry.

    Args:
        obj: A response, or one item of a response; may be ``None``.

    Returns:
        The first non-``None`` value found, returned as-is without validating
        its shape, or ``None`` when nothing is found.
    """
    entry_names = ("embedding", "embeddings")

    def _first_present(mapping):
        # First non-None value among the known entry names, else None.
        for name in entry_names:
            value = mapping.get(name)
            if value is not None:
                return value
        return None

    if obj is None:
        return None

    if isinstance(obj, dict):
        found = _first_present(obj)
        if found is not None:
            return found

    for name in entry_names:
        found = getattr(obj, name, None)
        if found is not None:
            return found

    for method_name in ("model_dump", "to_dict", "dict"):
        dump = getattr(obj, method_name, None)
        if not callable(dump):
            continue
        try:
            dumped = dump()
        except Exception:
            # Deliberate best effort: a serializer that fails (for example
            # because it needs arguments) must not abort parsing; move on to
            # the next candidate method.
            continue
        if isinstance(dumped, dict):
            found = _first_present(dumped)
            if found is not None:
                return found
    return None
embedding_value = _extract_embedding_value(response) + if embedding_value is None: + log.warning("Unexpected embedding response type/structure: %s", type(response)) + embedding_data = [] + elif isinstance(embedding_value, list) and len(embedding_value) > 0: + if isinstance(embedding_value[0], (int, float)): + embedding_data = [Embedding(embedding=embedding_value, index=0)] + elif isinstance(embedding_value[0], list): embedding_data = [ - Embedding(embedding=item['embedding'], index=i) - for i, item in enumerate(response['embeddings']) + Embedding(embedding=emb_list, index=i) + for i, emb_list in enumerate(embedding_value) + if isinstance(emb_list, list) and len(emb_list) > 0 ] else: - log.warning(f"Unexpected response structure: {response.keys()}") - embedding_data = [] - elif hasattr(response, 'embeddings'): - # Custom batch response object from our implementation - embedding_data = [ - Embedding(embedding=emb, index=i) - for i, emb in enumerate(response.embeddings) - ] + extracted = [] + for item in embedding_value: + item_emb = _extract_embedding_value(item) + if isinstance(item_emb, list) and len(item_emb) > 0: + extracted.append(item_emb) + embedding_data = [ + Embedding(embedding=emb_list, index=i) + for i, emb_list in enumerate(extracted) + ] else: - log.warning(f"Unexpected response type: {type(response)}") + log.warning("Empty or invalid embedding data parsed from response") embedding_data = [] + + if embedding_data: + first_dim = len(embedding_data[0].embedding) if embedding_data[0].embedding is not None else 0 + log.info("Parsed %s embedding(s) (dim=%s)", len(embedding_data), first_dim) return EmbedderOutput( data=embedding_data, @@ -201,7 +221,16 @@ def call(self, api_kwargs: Dict = {}, model_type: ModelType = ModelType.UNDEFINE if model_type != ModelType.EMBEDDER: raise ValueError(f"GoogleEmbedderClient only supports EMBEDDER model type") - log.info(f"Google AI Embeddings API kwargs: {api_kwargs}") + safe_log_kwargs = {k: v for k, v in 
api_kwargs.items() if k not in {"content", "contents"}} + if "content" in api_kwargs: + safe_log_kwargs["content_chars"] = len(str(api_kwargs.get("content", ""))) + if "contents" in api_kwargs: + try: + contents = api_kwargs.get("contents") + safe_log_kwargs["contents_count"] = len(contents) if hasattr(contents, "__len__") else None + except Exception: + safe_log_kwargs["contents_count"] = None + log.info("Google AI Embeddings call kwargs (sanitized): %s", safe_log_kwargs) try: # Use embed_content for single text or batch embedding