diff --git a/adi_function_app/key_phrase_extraction.py b/adi_function_app/key_phrase_extraction.py index 01bb8aa..78b0327 100644 --- a/adi_function_app/key_phrase_extraction.py +++ b/adi_function_app/key_phrase_extraction.py @@ -1,84 +1,107 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. - import logging import json import os from azure.ai.textanalytics.aio import TextAnalyticsClient from azure.core.exceptions import HttpResponseError -from azure.core.credentials import AzureKeyCredential -import asyncio from azure.identity import DefaultAzureCredential -from environment import IdentityType, get_identity_type +from tenacity import retry +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_exponential +import asyncio MAX_TEXT_ELEMENTS = 5120 -def split_document(document: str, max_size: int) -> list[str]: - """Split a document into chunks of max_size. +def split_document(document, max_size): + """Split a document into chunks of max_size and filter out any empty strings Args: document (str): The document to split. - max_size (int): The maximum size of each chunk.""" - return [document[i : i + max_size] for i in range(0, len(document), max_size)] + max_size (int): The maximum size of each chunk. - -async def extract_key_phrases_from_text( - data: list[str], max_key_phrase_count: int, retries_left: int = 3 + Returns: + list: The list of document chunks.""" + return [ + document[i : i + max_size] + for i in range(0, len(document), max_size) + if len(document[i : i + max_size]) > 0 + ] + + +@retry( + reraise=True, + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), +) +async def extract_key_phrases_from_batch( + batch_data: list[str], max_key_phrase_count: int ) -> list[str]: - """Extract key phrases from the text. + """Extract key phrases from text using Azure AI services. Args: - data (list[str]): The text data. - max_key_phrase_count (int): The maximum number of key phrases to return. + batch_data (list[str]): The list of text to process. + max_key_phrase_count(int): no of keywords to return Returns: - list[str]: The key phrases extracted from the text.""" - logging.info("Python HTTP trigger function processed a request.") + list: The list of key phrases.""" key_phrase_list = [] - if get_identity_type() == IdentityType.SYSTEM_ASSIGNED: - credential = DefaultAzureCredential() - elif get_identity_type() == IdentityType.USER_ASSIGNED: - credential = DefaultAzureCredential( - managed_identity_client_id=os.environ.get("FunctionApp__ClientId") - ) - else: - credential = AzureKeyCredential(os.environ.get("AIService__Language__Key")) text_analytics_client = TextAnalyticsClient( - endpoint=os.environ.get("AIService__Language__Endpoint"), - credential=credential, + endpoint=os.environ["AIService__Services__Endpoint"], + credential=DefaultAzureCredential( + managed_identity_client_id=os.environ.get("FunctionApp__ClientId") + ), ) async with text_analytics_client: try: - # Split large documents - split_documents = [] - for doc in data: - if len(doc) > MAX_TEXT_ELEMENTS: - split_documents.extend(split_document(doc, MAX_TEXT_ELEMENTS)) - else: - split_documents.append(doc) - - result = await text_analytics_client.extract_key_phrases(split_documents) - for idx, doc in enumerate(result): + result = await text_analytics_client.extract_key_phrases(batch_data) + for doc in result: if not doc.is_error: key_phrase_list.extend(doc.key_phrases[:max_key_phrase_count]) else: - raise Exception(f"Document {idx} error: {doc.error}") + raise Exception(f"Document error: {doc.error}") except HttpResponseError as e: - if e.status_code == 429 and retries_left > 0: # Rate limiting error - wait_time = 2**retries_left # Exponential backoff - logging.info( - "%s Rate limit exceeded. Retrying in %s seconds...", e, wait_time - ) - await asyncio.sleep(wait_time) - return await extract_key_phrases_from_text( - data, max_key_phrase_count, retries_left - 1 - ) - else: - raise Exception(f"An error occurred: {e}") from e + logging.error("An error occurred: %s", e) + raise e + + return key_phrase_list + + +async def extract_key_phrases_from_text( + data: list[str], max_key_phrase_count: int +) -> list[str]: + """Extract key phrases from text using Azure AI services. + + Args: + data (list[str]): The list of text to process. + max_key_phrase_count(int): no of keywords to return""" + logging.info("Python HTTP trigger function processed a request.") + key_phrase_list = [] + + split_documents = [] + for doc in data: + if len(doc) > MAX_TEXT_ELEMENTS: + split_documents.extend(split_document(doc, MAX_TEXT_ELEMENTS)) + elif len(doc) > 0: + split_documents.append(doc) + + # Filter out any empty documents + split_documents = [doc for doc in split_documents if len(doc) > 0] + + for i in range(0, len(split_documents), 10): + key_phrase_list.extend( + await extract_key_phrases_from_batch( + split_documents[i : i + 10], max_key_phrase_count + ) + ) + + if len(key_phrase_list) > max_key_phrase_count: + key_phrase_list = key_phrase_list[:max_key_phrase_count] + break return key_phrase_list @@ -105,26 +128,40 @@ async def process_key_phrase_extraction( "errors": None, "warnings": None, } - extracted_record["data"]["key_phrases"] = await extract_key_phrases_from_text( + extracted_record["data"]["keyPhrases"] = await extract_key_phrases_from_text( [record["data"]["text"]], max_key_phrase_count ) - except Exception as inner_e: - logging.error("key phrase extraction Error: %s", inner_e) - logging.error( - "Failed to extract key phrase. Check function app logs for more details of exact failure." - ) - return { - "recordId": record["recordId"], - "data": {}, - "errors": [ - { - "message": "Failed to extract key phrase. Check function app logs for more details of exact failure." - } - ], - "warnings": None, - } - else: - json_str = json.dumps(extracted_record, indent=4) - - logging.info(f"key phrase extraction output: {json_str}") - return extracted_record + except Exception as e: + logging.error("key phrase extraction Error: %s", e) + await asyncio.sleep(10) + try: + extracted_record = { + "recordId": record["recordId"], + "data": {}, + "errors": None, + "warnings": None, + } + extracted_record["data"][ + "keyPhrases" + ] = await extract_key_phrases_from_text( + [record["data"]["text"]], max_key_phrase_count + ) + except Exception as inner_e: + logging.error("key phrase extraction Error: %s", inner_e) + logging.error( + "Failed to extract key phrase. Check function app logs for more details of exact failure." + ) + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": "Failed to extract key phrase. Check function app logs for more details of exact failure." + } + ], + "warnings": None, + } + json_str = json.dumps(extracted_record, indent=4) + + logging.info(f"key phrase extraction output: {json_str}") + return extracted_record diff --git a/adi_function_app/pre_embedding_cleaner.py b/adi_function_app/pre_embedding_cleaner.py index f6f0a87..ad49231 100644 --- a/adi_function_app/pre_embedding_cleaner.py +++ b/adi_function_app/pre_embedding_cleaner.py @@ -2,13 +2,7 @@ # Licensed under the MIT License. import logging import json -import nltk import re -from nltk.tokenize import word_tokenize - -nltk.download("punkt") -nltk.download("stopwords") -nltk.download("punkt_tab") def get_section(cleaned_text: str) -> list: @@ -69,38 +63,28 @@ def clean_text(src_text: str) -> str: str: The clean text.""" try: + logging.info(f"Input text: {src_text}") + if len(src_text) == 0: + logging.error("Input text is empty") + raise ValueError("Input text is empty") + # Define specific patterns for each tag tag_patterns = { - "figurecontent": r"", + "figurecontent": r"", "figure": r"
(.*?)
", "figures": r"\(figures/\d+\)(.*?)\(figures/\d+\)", "figcaption": r"
(.*?)
", } cleaned_text = remove_markdown_tags(src_text, tag_patterns) - # remove html tags - cleaned_text = re.sub(r"<.*?>", "", cleaned_text) - - # Replace newline characters with spaces - cleaned_text = re.sub(r"\n", " ", cleaned_text) - - # Replace multiple whitespace characters with a single space - cleaned_text = re.sub(r"\s+", " ", cleaned_text) - - # remove stopwords - tokens = word_tokenize(cleaned_text, "english") - stop_words = nltk.corpus.stopwords.words("english") - filtered_tokens = [word for word in tokens if word not in stop_words] - cleaned_text = " ".join(filtered_tokens) - - # remove special characters - cleaned_text = re.sub(r"[^a-zA-Z\s]", "", cleaned_text) - - # remove extra white spaces - cleaned_text = " ".join([word for word in cleaned_text.split()]) + # Updated regex to keep Unicode letters, punctuation, whitespace, currency symbols, and percentage signs, + # while also removing non-printable characters + cleaned_text = re.sub(r"[^\p{L}\p{P}\s\p{Sc}%\x20-\x7E]", "", cleaned_text) - # case normalization - cleaned_text = cleaned_text.lower() + logging.info(f"Cleaned text: {cleaned_text}") + if len(cleaned_text) == 0: + logging.error("Cleaned text is empty") + raise ValueError("Cleaned text is empty") except Exception as e: logging.error(f"An error occurred in clean_text: {e}") return "" diff --git a/adi_function_app/requirements.txt b/adi_function_app/requirements.txt index 2be1da8..b97a6d6 100644 --- a/adi_function_app/requirements.txt +++ b/adi_function_app/requirements.txt @@ -9,7 +9,7 @@ pandas azure-identity openpyxl regex -nltk==3.9.1 +tenacity bs4 azure-search azure-search-documents