diff --git a/adi_function_app/README.md b/adi_function_app/README.md index 722734f..a127fcc 100644 --- a/adi_function_app/README.md +++ b/adi_function_app/README.md @@ -91,7 +91,7 @@ This method takes the detected figures, and crops them out of the page to save t `update_figure_description` is used to update the original Markdown content with the description and meaning of the figure. -##### clean_adi_markdown +##### build_and_clean_markdown_for_response This method performs the final cleaning of the Markdown contents. In this method, the section headings and page numbers are extracted for the content to be returned to the indexer. diff --git a/adi_function_app/adi_2_ai_search.py b/adi_function_app/adi_2_ai_search.py index 947f35f..7e8ff27 100644 --- a/adi_function_app/adi_2_ai_search.py +++ b/adi_function_app/adi_2_ai_search.py @@ -17,12 +17,16 @@ import concurrent.futures import json from openai import AsyncAzureOpenAI +from typing import Union import openai from environment import IdentityType, get_identity_type -def clean_adi_markdown( - markdown_text: str, page_no: int = None, remove_irrelevant_figures=False +def build_and_clean_markdown_for_response( + markdown_text: str, + figures: dict, + page_no: int = None, + remove_irrelevant_figures=False, ): """Clean Markdown text extracted by the Azure Document Intelligence service. @@ -62,28 +66,33 @@ def clean_adi_markdown( output_dict["content"] = cleaned_text output_dict["sections"] = doc_metadata + output_dict["figures"] = figures + # add page number when chunk by page is enabled if page_no is not None: - output_dict["page_number"] = page_no + output_dict["pageNumber"] = page_no return output_dict -def update_figure_description(md_content, img_description, offset, length): +def update_figure_description( + md_content: str, figure_id: str, img_description: str, offset: int, length: int +): """ Updates the figure description in the Markdown content. Args: md_content (str): The original Markdown content. img_description (str): The new description for the image. - idx (int): The index of the figure. + offset (int): Position offset in the text. + length (int): Length of the original figure in the text. Returns: str: The updated Markdown content with the new figure description. """ # Define the new string to replace the old content - new_string = f'' + new_string = f'' # Calculate the end index of the content to be replaced end_index = offset + length @@ -131,9 +140,9 @@ async def understand_image_with_gptv(image_base64, caption, tries_left=3): token_provider = None api_key = os.environ["OpenAI__ApiKey"] - system_prompt = """You are an expert in image analysis. Use your experience and skills to provided a detailed description of any provided images. You should FOCUS on what info can be inferred from the image and the meaning of the data inside the image. Draw actionable insights and conclusions from the image. + system_prompt = """You are an expert in technical image analysis. Your task is to provided analysis of images. You should FOCUS on what info can be inferred from the image and the meaning of the data inside the image. Draw actionable insights and conclusions from the image. Do not describe the image in a general way or describe the image in a way that is not useful for decision-making. - If the image is a chart for instance, you should describe the data trends, patterns, and insights that can be drawn from the chart. + If the image is a chart for instance, you should describe the data trends, patterns, and insights that can be drawn from the chart. For example, you could describe the increase or decrease in sales over time, the peak sales period, or the sales performance of a particular product. If the image is a map, you should describe the geographical features, landmarks, and any other relevant information that can be inferred from the map. @@ -141,12 +150,14 @@ async def understand_image_with_gptv(image_base64, caption, tries_left=3): Include any data points, labels, and other relevant information that can be inferred from the image. + Provide a well-structured, detailed, and actionable analysis of the image. Focus on extracting data and information that can be inferred from the image. + IMPORTANT: If the provided image is a logo or photograph, simply return 'Irrelevant Image'.""" - user_input = "Describe this image with technical analysis. Provide a well-structured, description." + user_input = "Perform technical analysis on this image. Provide a well-structured, description." if caption is not None and len(caption) > 0: - user_input += f" (note: it has image caption: {caption})" + user_input += f" (note: it has the following caption: {caption})" try: async with AsyncAzureOpenAI( @@ -244,7 +255,7 @@ async def process_figures_from_extracted_content( markdown_content: str, page_number: None | int = None, page_offset: int = 0, -) -> str: +) -> Union[str, dict]: """Process the figures extracted from the content using ADI and send them for analysis. Args: @@ -258,12 +269,13 @@ async def process_figures_from_extracted_content( Returns: -------- - str: The updated Markdown content with the figure descriptions.""" + str: The updated Markdown content with the figure descriptions. + dict: A mapping of the FigureId to the stored Uri in blob storage.""" - image_processing_datas = [] + figure_processing_datas = [] download_image_tasks = [] - image_understanding_tasks = [] - image_upload_tasks = [] + figure_understanding_tasks = [] + figure_upload_tasks = [] if result.figures: for figure in result.figures: @@ -290,8 +302,8 @@ async def process_figures_from_extracted_content( logging.info(f"Figure Caption: {caption}") - image_processing_datas.append( - (container, image_blob, caption, figure.spans[0]) + figure_processing_datas.append( + (figure.id, container, image_blob, caption, figure.spans[0]) ) break @@ -302,45 +314,59 @@ async def process_figures_from_extracted_content( storage_account_helper = await get_storage_account_helper() - for image_processing_data, response in zip(image_processing_datas, image_responses): - container, image_blob, caption, _ = image_processing_data + for figure_processing_data, response in zip( + figure_processing_datas, image_responses + ): + _, container, image_blob, caption, _ = figure_processing_data base_64_image = base64.b64encode(response).decode("utf-8") logging.info(f"Image Blob: {image_blob}") - image_understanding_tasks.append( + figure_understanding_tasks.append( understand_image_with_gptv(base_64_image, caption) ) image_data = base64.b64decode(base_64_image) - image_upload_tasks.append( + figure_upload_tasks.append( storage_account_helper.upload_blob( container, image_blob, image_data, "image/png" ) ) + figure_ids = [ + figure_processing_data[0] for figure_processing_data in figure_processing_datas + ] logging.info("Running image understanding tasks") - image_descriptions = await asyncio.gather(*image_understanding_tasks) + figure_descriptions = await asyncio.gather(*figure_understanding_tasks) logging.info("Finished image understanding tasks") - logging.info(f"Image Descriptions: {image_descriptions}") + logging.info(f"Image Descriptions: {figure_descriptions}") logging.info("Running image upload tasks") - await asyncio.gather(*image_upload_tasks) + figure_uris = await asyncio.gather(*figure_upload_tasks) logging.info("Finished image upload tasks") + figures = [ + {"figureId": figure_id, "figureUri": figure_uri} + for figure_id, figure_uri in zip(figure_ids, figure_uris) + ] + running_offset = 0 - for image_processing_data, image_description in zip( - image_processing_datas, image_descriptions + for figure_processing_data, figure_description in zip( + figure_processing_datas, figure_descriptions ): - _, _, _, figure_span = image_processing_data + figure_id, _, _, _, figure_span = figure_processing_data starting_offset = figure_span.offset + running_offset - page_offset markdown_content, desc_offset = update_figure_description( - markdown_content, image_description, starting_offset, figure_span.length + markdown_content, + figure_id, + figure_description, + starting_offset, + figure_span.length, ) running_offset += desc_offset - return markdown_content + return markdown_content, figures def create_page_wise_content(result: AnalyzeResult) -> list: @@ -359,12 +385,12 @@ def create_page_wise_content(result: AnalyzeResult) -> list: page_numbers = [] page_offsets = [] - for page_number, page in enumerate(result.pages): + for page in result.pages: page_content = result.content[ page.spans[0]["offset"] : page.spans[0]["offset"] + page.spans[0]["length"] ] page_wise_content.append(page_content) - page_numbers.append(page_number + 1) + page_numbers.append(page.page_number) page_offsets.append(page.spans[0]["offset"]) return page_wise_content, page_numbers, page_offsets @@ -570,9 +596,13 @@ async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) -> with concurrent.futures.ProcessPoolExecutor() as executor: futures = { executor.submit( - clean_adi_markdown, page_content, page_number, True - ): page_content - for page_content, page_number in zip( + build_and_clean_markdown_for_response, + extracted_page_content[0], + extracted_page_content[1], + page_number, + True, + ): extracted_page_content + for extracted_page_content, page_number in zip( content_with_figures, page_numbers ) } @@ -582,7 +612,10 @@ async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) -> else: markdown_content = result.content - content_with_figures = await process_figures_from_extracted_content( + ( + extracted_content, + figures, + ) = await process_figures_from_extracted_content( result, operation_id, container_and_blob, @@ -591,8 +624,8 @@ async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) -> page_number=None, ) - cleaned_result = clean_adi_markdown( - content_with_figures, remove_irrelevant_figures=True + cleaned_result = build_and_clean_markdown_for_response( + extracted_content, figures, remove_irrelevant_figures=True ) except Exception as e: logging.error(e) diff --git a/adi_function_app/pre_embedding_cleaner.py b/adi_function_app/pre_embedding_cleaner.py index 005954e..9e3d97b 100644 --- a/adi_function_app/pre_embedding_cleaner.py +++ b/adi_function_app/pre_embedding_cleaner.py @@ -8,6 +8,7 @@ nltk.download("punkt") nltk.download("stopwords") +nltk.download("punkt_tab") def get_section(cleaned_text: str) -> list: @@ -29,9 +30,9 @@ def get_section(cleaned_text: str) -> list: def clean_sections(sections: list) -> list: """Cleans the sections by removing special characters and extra white spaces.""" - cleaned_sections = [re.sub(r"[=#]", "", match).strip() for match in sections] + cleanedSections = [re.sub(r"[=#]", "", match).strip() for match in sections] - return cleaned_sections + return cleanedSections def remove_markdown_tags(text: str, tag_patterns: dict) -> str: @@ -123,19 +124,17 @@ async def process_pre_embedding_cleaner(record: dict) -> dict: # scenarios when page by chunking is enabled if isinstance(record["data"]["chunk"], dict): - cleaned_record["data"]["cleaned_chunk"] = clean_text( + cleaned_record["data"]["cleanedChunk"] = clean_text( record["data"]["chunk"]["content"] ) cleaned_record["data"]["chunk"] = record["data"]["chunk"]["content"] - cleaned_record["data"]["cleaned_sections"] = clean_sections( + cleaned_record["data"]["cleanedSections"] = clean_sections( record["data"]["chunk"]["sections"] ) else: - cleaned_record["data"]["cleaned_chunk"] = clean_text( - record["data"]["chunk"] - ) + cleaned_record["data"]["cleanedChunk"] = clean_text(record["data"]["chunk"]) cleaned_record["data"]["chunk"] = record["data"]["chunk"] - cleaned_record["data"]["cleaned_sections"] = get_section( + cleaned_record["data"]["cleanedSections"] = get_section( record["data"]["chunk"] ) diff --git a/adi_function_app/storage_account.py b/adi_function_app/storage_account.py index 5289260..015a144 100644 --- a/adi_function_app/storage_account.py +++ b/adi_function_app/storage_account.py @@ -58,7 +58,10 @@ async def upload_blob( Args: container (str): The container of the blob. blob (str): The blob name. - data (bytes): The data to upload.""" + data (bytes): The data to upload. + + Returns: + str: url of the uploaded blob.""" logging.info("Uploading Blob...") logging.info(f"Container: {container}") @@ -76,6 +79,8 @@ async def upload_blob( content_type=content_type, ) + return blob_client.url + async def download_blob_to_temp_dir( self, source: str, container: str, target_file_name ) -> tuple[str, dict]: diff --git a/deploy_ai_search/ai_search.py b/deploy_ai_search/ai_search.py index 2d9a650..0ea69ff 100644 --- a/deploy_ai_search/ai_search.py +++ b/deploy_ai_search/ai_search.py @@ -194,16 +194,13 @@ def get_data_source(self) -> SearchIndexerDataSourceConnection: return data_source_connection - def get_pre_embedding_cleaner_skill( - self, context, source, target_name="cleaned_chunk" - ) -> WebApiSkill: + def get_pre_embedding_cleaner_skill(self, context, source) -> WebApiSkill: """Get the custom skill for data cleanup. Args: ----- context (str): The context of the skill - inputs (List[InputFieldMappingEntry]): The inputs of the skill - outputs (List[OutputFieldMappingEntry]): The outputs of the skill + source (str): The source of the skill Returns: -------- @@ -221,10 +218,10 @@ def get_pre_embedding_cleaner_skill( ] pre_embedding_cleaner_skill_outputs = [ - OutputFieldMappingEntry(name="cleaned_chunk", target_name=target_name), + OutputFieldMappingEntry(name="cleanedChunk", target_name="cleanedChunk"), OutputFieldMappingEntry(name="chunk", target_name="chunk"), OutputFieldMappingEntry( - name="cleaned_sections", target_name="cleaned_sections" + name="cleanedSections", target_name="cleanedSections" ), ] diff --git a/deploy_ai_search/rag_documents.py b/deploy_ai_search/rag_documents.py index 1fb03ac..dba2645 100644 --- a/deploy_ai_search/rag_documents.py +++ b/deploy_ai_search/rag_documents.py @@ -19,6 +19,7 @@ SearchIndexerIndexProjectionsParameters, IndexProjectionMode, SimpleField, + ComplexField, BlobIndexerDataToExtract, IndexerExecutionEnvironment, ) @@ -96,6 +97,27 @@ def get_index_fields(self) -> list[SearchableField]: filterable=True, facetable=True, ), + ComplexField( + name="Figures", + collection=True, + fields=[ + SimpleField( + name="FigureId", + type=SearchFieldDataType.String, + collection=True, + ), + SimpleField( + name="FigureUri", + type=SearchFieldDataType.String, + collection=True, + ), + ], + ), + SimpleField( + name="DateLastModified", + type=SearchFieldDataType.DateTimeOffset, + filterable=True, + ), ] if self.enable_page_by_chunking: @@ -152,11 +174,11 @@ def get_skills(self) -> list: ) key_phrase_extraction_skill = self.get_key_phrase_extraction_skill( - "/document/pages/*", "/document/pages/*/cleaned_chunk" + "/document/pages/*", "/document/pages/*/cleanedChunk" ) embedding_skill = self.get_vector_skill( - "/document/pages/*", "/document/pages/*/cleaned_chunk" + "/document/pages/*", "/document/pages/*/cleanedChunk" ) if self.enable_page_by_chunking: @@ -191,7 +213,22 @@ def get_index_projections(self) -> SearchIndexerIndexProjections: name="Keywords", source="/document/pages/*/keywords" ), InputFieldMappingEntry( - name="Sections", source="/document/pages/*/cleaned_sections" + name="Sections", source="/document/pages/*/cleanedSections" + ), + InputFieldMappingEntry( + name="Figures", + source_context="/document/pages/*/figures/*", + inputs=[ + InputFieldMappingEntry( + name="FigureId", source="/document/pages/*/figures/*/figureId" + ), + InputFieldMappingEntry( + name="FigureUri", source="/document/pages/*/figures/*/figureUri" + ), + ], + ), + InputFieldMappingEntry( + name="DateLastModified", source="/document/DateLastModified" ), ] @@ -199,7 +236,7 @@ def get_index_projections(self) -> SearchIndexerIndexProjections: mappings.extend( [ InputFieldMappingEntry( - name="PageNumber", source="/document/pages/*/page_number" + name="PageNumber", source="/document/pages/*/pageNumber" ) ] ) @@ -269,6 +306,10 @@ def get_indexer(self) -> SearchIndexer: source_field_name="metadata_storage_path", target_field_name="SourceUri", ), + FieldMapping( + source_field_name="metadata_storage_last_modified", + target_field_name="DateLastModified", + ), ], parameters=indexer_parameters, ) diff --git a/deploy_ai_search/text_2_sql.py b/deploy_ai_search/text_2_sql.py index 38a592e..614aea4 100644 --- a/deploy_ai_search/text_2_sql.py +++ b/deploy_ai_search/text_2_sql.py @@ -94,6 +94,11 @@ def get_index_fields(self) -> list[SearchableField]: collection=True, hidden=True, ), # This is needed to enable semantic searching against the column names as complex field types are not used. + SimpleField( + name="DateLastModified", + type=SearchFieldDataType.DateTimeOffset, + filterable=True, + ), ] return fields @@ -177,6 +182,12 @@ def get_indexer(self) -> SearchIndexer: target_index_name=self.index_name, data_source_name=self.data_source_name, schedule=schedule, + field_mappings=[ + FieldMapping( + source_field_name="metadata_storage_last_modified", + target_field_name="DateLastModified", + ) + ], output_field_mappings=[ FieldMapping( source_field_name="/document/Entity", target_field_name="Entity" @@ -201,6 +212,9 @@ def get_indexer(self) -> SearchIndexer: source_field_name="/document/Columns/*/Name", target_field_name="ColumnNames", ), + FieldMapping( + name="DateLastModified", source="/document/DateLastModified" + ), ], parameters=indexer_parameters, ) diff --git a/deploy_ai_search/text_2_sql_query_cache.py b/deploy_ai_search/text_2_sql_query_cache.py index 9dc0c9c..26cb0c4 100644 --- a/deploy_ai_search/text_2_sql_query_cache.py +++ b/deploy_ai_search/text_2_sql_query_cache.py @@ -97,6 +97,11 @@ def get_index_fields(self) -> list[SearchableField]: ), ], ), + SimpleField( + name="DateLastModified", + type=SearchFieldDataType.DateTimeOffset, + filterable=True, + ), ] return fields diff --git a/text_2_sql/ai_search.py b/text_2_sql/ai_search.py index 201158c..c63022a 100644 --- a/text_2_sql/ai_search.py +++ b/text_2_sql/ai_search.py @@ -9,6 +9,7 @@ import os import logging import base64 +from datetime import datetime, timezone async def run_ai_search_query( @@ -87,6 +88,8 @@ async def add_entry_to_index(document: dict, vector_fields: dict, index_name: st fields_to_embed = {field: document[field] for field in vector_fields} + document["DateLastModified"] = datetime.now(timezone.utc) + async with AsyncAzureOpenAI( # This is the default and can be omitted api_key=os.environ["OpenAI__ApiKey"],