diff --git a/text_2_sql/data_dictionary/.env b/text_2_sql/data_dictionary/.env index 4ecd14c..e5cca6f 100644 --- a/text_2_sql/data_dictionary/.env +++ b/text_2_sql/data_dictionary/.env @@ -6,5 +6,9 @@ OpenAI__ApiVersion= Text2Sql__DatabaseEngine= Text2Sql__DatabaseName= Text2Sql__DatabaseConnectionString= +Text2Sql__Snowflake__User= +Text2Sql__Snowflake__Password= +Text2Sql__Snowflake__Account= +Text2Sql__Snowflake__Warehouse= IdentityType= # system_assigned or user_assigned or key -ClientId= +ClientId= diff --git a/text_2_sql/data_dictionary/README.md b/text_2_sql/data_dictionary/README.md index d6d6dc4..9ebba79 100644 --- a/text_2_sql/data_dictionary/README.md +++ b/text_2_sql/data_dictionary/README.md @@ -93,8 +93,13 @@ A full data dictionary must be built for all the views / tables you which to exp Manually creating the `entities.json` is a time consuming exercise. To speed up generation, a mixture of SQL Queries and an LLM can be used to generate a initial version. Existing comments and descriptions in the database, can be combined with sample values to generate the necessary descriptions. Manual input can then be used to tweak it for the use case and any improvements. -`data_dictionary_creator.py` contains a utility class that handles the automatic generation and selection of schemas from the source SQL database. It must be subclassed to the appropriate engine. - -`sql_server_data_dictionary_creator.py` contains a subclassed version of `data_dictionary_creator.py` that implements the SQL Server specific functionality to extract the entities. +`data_dictionary_creator.py` contains a utility class that handles the automatic generation and selection of schemas from the source SQL database. It must be subclassed to the appropriate engine to handle engine specific queries and connection details. See `./generated_samples/` for an example output of the script. This can then be automatically indexed with the provided indexer for the **Vector-Based Approach**. 
+ +The following databases have pre-built scripts: + +- **Microsoft SQL Server:** `sql_sever_data_dictionary_creator.py` +- **Snowflake:** `snowflake_data_dictionary_creator.py` + +If there is no pre-built script for your database engine, take one of the above as a starting point and adjust it. diff --git a/text_2_sql/data_dictionary/data_dictionary_creator.py b/text_2_sql/data_dictionary/data_dictionary_creator.py index 8de5e9e..24212a7 100644 --- a/text_2_sql/data_dictionary/data_dictionary_creator.py +++ b/text_2_sql/data_dictionary/data_dictionary_creator.py @@ -161,6 +161,7 @@ def __init__( self, entities: list[str] = None, excluded_entities: list[str] = None, + excluded_schemas: list[str] = None, single_file: bool = False, generate_definitions: bool = True, ): @@ -169,12 +170,14 @@ def __init__( Args: entities (list[str], optional): A list of entities to extract. Defaults to None. If None, all entities are extracted. excluded_entities (list[str], optional): A list of entities to exclude. Defaults to None. + excluded_schemas (list[str], optional): A list of schemas to exclude. Defaults to None. single_file (bool, optional): A flag to indicate if the data dictionary should be saved to a single file. Defaults to False. generate_definitions (bool, optional): A flag to indicate if definitions should be generated. Defaults to True. 
""" self.entities = entities self.excluded_entities = excluded_entities + self.excluded_schemas = excluded_schemas self.single_file = single_file self.generate_definitions = generate_definitions @@ -381,6 +384,7 @@ async def extract_entities_with_definitions(self) -> list[EntityItem]: entity for entity in all_entities if entity.entity not in self.excluded_entities + and entity.entity_schema not in self.excluded_schemas ] # Add warehouse and database to entities diff --git a/text_2_sql/data_dictionary/requirements.txt b/text_2_sql/data_dictionary/requirements.txt index becad18..903dd0c 100644 --- a/text_2_sql/data_dictionary/requirements.txt +++ b/text_2_sql/data_dictionary/requirements.txt @@ -3,4 +3,5 @@ azure-identity python-dotenv pydantic openai +snowflake-connector-python networkx diff --git a/text_2_sql/data_dictionary/snowflake_data_dictionary_creator.py b/text_2_sql/data_dictionary/snowflake_data_dictionary_creator.py new file mode 100644 index 0000000..e01c0c2 --- /dev/null +++ b/text_2_sql/data_dictionary/snowflake_data_dictionary_creator.py @@ -0,0 +1,140 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from data_dictionary_creator import DataDictionaryCreator, EntityItem +import asyncio +import snowflake.connector +import logging +import os + + +class SnowflakeDataDictionaryCreator(DataDictionaryCreator): + def __init__( + self, + entities: list[str] = None, + excluded_entities: list[str] = None, + single_file: bool = False, + ): + """A method to initialize the DataDictionaryCreator class. + + Args: + entities (list[str], optional): A list of entities to extract. Defaults to None. If None, all entities are extracted. + excluded_entities (list[str], optional): A list of entities to exclude. Defaults to None. + single_file (bool, optional): A flag to indicate if the data dictionary should be saved to a single file. Defaults to False. 
+ """ + if excluded_entities is None: + excluded_entities = [] + + excluded_schemas = ["INFORMATION_SCHEMA"] + return super().__init__( + entities, excluded_entities, excluded_schemas, single_file + ) + + """A class to extract data dictionary information from a Snowflake database.""" + + @property + def extract_table_entities_sql_query(self) -> str: + """A property to extract table entities from a Snowflake database.""" + return """SELECT + t.TABLE_NAME AS Entity, + t.TABLE_SCHEMA AS EntitySchema, + t.COMMENT AS Definition + FROM + INFORMATION_SCHEMA.TABLES t""" + + @property + def extract_view_entities_sql_query(self) -> str: + """A property to extract view entities from a Snowflake database.""" + return """SELECT + v.TABLE_NAME AS Entity, + v.TABLE_SCHEMA AS EntitySchema, + v.COMMENT AS Definition + FROM + INFORMATION_SCHEMA.VIEWS v""" + + def extract_columns_sql_query(self, entity: EntityItem) -> str: + """A method to extract column information from a Snowflake database.""" + return f"""SELECT + COLUMN_NAME AS Name, + DATA_TYPE AS Type, + COMMENT AS Definition + FROM + INFORMATION_SCHEMA.COLUMNS + WHERE + TABLE_SCHEMA = '{entity.entity_schema}' + AND TABLE_NAME = '{entity.name}';""" + + @property + def extract_entity_relationships_sql_query(self) -> str: + """A property to extract entity relationships from a Snowflake database.""" + return """SELECT + tc.table_schema AS EntitySchema, + tc.table_name AS Entity, + rc.unique_constraint_schema AS ForeignEntitySchema, + rc.unique_constraint_name AS ForeignEntityConstraint, + rc.constraint_name AS ForeignKeyConstraint + FROM + information_schema.referential_constraints rc + JOIN + information_schema.table_constraints tc + ON rc.constraint_schema = tc.constraint_schema + AND rc.constraint_name = tc.constraint_name + WHERE + tc.constraint_type = 'FOREIGN KEY' + ORDER BY + EntitySchema, Entity, ForeignEntitySchema, ForeignEntityConstraint; + """ + + async def query_entities( + self, sql_query: str, cast_to: any = None 
+ ) -> list[EntityItem]: + """A method to query a database for entities using Snowflake Connector. Overrides the base class method. + + Args: + sql_query (str): The SQL query to run. + cast_to (any, optional): The class to cast the results to. Defaults to None. + + Returns: + list[EntityItem]: The list of entities. + """ + logging.info(f"Running query: {sql_query}") + results = [] + + # Create a connection to Snowflake, without specifying a schema + conn = snowflake.connector.connect( + user=os.environ["Text2Sql__Snowflake__User"], + password=os.environ["Text2Sql__Snowflake__Password"], + account=os.environ["Text2Sql__Snowflake__Account"], + warehouse=os.environ["Text2Sql__Snowflake__Warehouse"], + database=os.environ["Text2Sql__DatabaseName"], + ) + + try: + # Using the connection to create a cursor + cursor = conn.cursor() + + # Execute the query + await asyncio.to_thread(cursor.execute, sql_query) + + # Fetch column names + columns = [col[0] for col in cursor.description] + + # Fetch rows + rows = await asyncio.to_thread(cursor.fetchall) + + # Process rows + for row in rows: + if cast_to: + results.append(cast_to.from_sql_row(row, columns)) + else: + results.append(dict(zip(columns, row))) + + finally: + cursor.close() + conn.close() + + return results + + +if __name__ == "__main__": + data_dictionary_creator = SnowflakeDataDictionaryCreator() + asyncio.run(data_dictionary_creator.create_data_dictionary()) diff --git a/text_2_sql/data_dictionary/sql_sever_data_dictionary_creator.py b/text_2_sql/data_dictionary/sql_sever_data_dictionary_creator.py index ed7a9c8..235ff00 100644 --- a/text_2_sql/data_dictionary/sql_sever_data_dictionary_creator.py +++ b/text_2_sql/data_dictionary/sql_sever_data_dictionary_creator.py @@ -22,10 +22,8 @@ def __init__( if excluded_entities is None: excluded_entities = [] - excluded_entities.extend( - ["dbo.BuildVersion", "dbo.ErrorLog", "sys.database_firewall_rules"] - ) - super().__init__(entities, excluded_entities, single_file) + 
excluded_schemas = ["dbo", "sys"] + super().__init__(entities, excluded_entities, excluded_schemas, single_file) self.database = os.environ["Text2Sql__DatabaseName"] """A class to extract data dictionary information from a SQL Server database.""" @@ -34,53 +32,53 @@ def __init__( def extract_table_entities_sql_query(self) -> str: """A property to extract table entities from a SQL Server database.""" return """SELECT - t.TABLE_NAME AS Entity, - t.TABLE_SCHEMA AS EntitySchema, - CAST(ep.value AS NVARCHAR(500)) AS Definition -FROM - INFORMATION_SCHEMA.TABLES t -LEFT JOIN - sys.extended_properties ep - ON ep.major_id = OBJECT_ID(t.TABLE_SCHEMA + '.' + t.TABLE_NAME) - AND ep.minor_id = 0 - AND ep.class = 1 - AND ep.name = 'MS_Description' -WHERE - t.TABLE_TYPE = 'BASE TABLE';""" + t.TABLE_NAME AS Entity, + t.TABLE_SCHEMA AS EntitySchema, + CAST(ep.value AS NVARCHAR(500)) AS Definition + FROM + INFORMATION_SCHEMA.TABLES t + LEFT JOIN + sys.extended_properties ep + ON ep.major_id = OBJECT_ID(t.TABLE_SCHEMA + '.' + t.TABLE_NAME) + AND ep.minor_id = 0 + AND ep.class = 1 + AND ep.name = 'MS_Description' + WHERE + t.TABLE_TYPE = 'BASE TABLE';""" @property def extract_view_entities_sql_query(self) -> str: """A property to extract view entities from a SQL Server database.""" return """SELECT - v.TABLE_NAME AS Entity, - v.TABLE_SCHEMA AS EntitySchema, - CAST(ep.value AS NVARCHAR(500)) AS Definition -FROM - INFORMATION_SCHEMA.VIEWS v -LEFT JOIN - sys.extended_properties ep - ON ep.major_id = OBJECT_ID(v.TABLE_SCHEMA + '.' + v.TABLE_NAME) - AND ep.minor_id = 0 - AND ep.class = 1 + v.TABLE_NAME AS Entity, + v.TABLE_SCHEMA AS EntitySchema, + CAST(ep.value AS NVARCHAR(500)) AS Definition + FROM + INFORMATION_SCHEMA.VIEWS v + LEFT JOIN + sys.extended_properties ep + ON ep.major_id = OBJECT_ID(v.TABLE_SCHEMA + '.' 
+ v.TABLE_NAME) + AND ep.minor_id = 0 + AND ep.class = 1 AND ep.name = 'MS_Description';""" def extract_columns_sql_query(self, entity: EntityItem) -> str: """A property to extract column information from a SQL Server database.""" return f"""SELECT - c.COLUMN_NAME AS Name, - c.DATA_TYPE AS DataType, - CAST(ep.value AS NVARCHAR(500)) AS Definition -FROM - INFORMATION_SCHEMA.COLUMNS c -LEFT JOIN - sys.extended_properties ep - ON ep.major_id = OBJECT_ID(c.TABLE_SCHEMA + '.' + c.TABLE_NAME) - AND ep.minor_id = c.ORDINAL_POSITION - AND ep.class = 1 - AND ep.name = 'MS_Description' -WHERE - c.TABLE_SCHEMA = '{entity.entity_schema}' - AND c.TABLE_NAME = '{entity.name}';""" + c.COLUMN_NAME AS Name, + c.DATA_TYPE AS DataType, + CAST(ep.value AS NVARCHAR(500)) AS Definition + FROM + INFORMATION_SCHEMA.COLUMNS c + LEFT JOIN + sys.extended_properties ep + ON ep.major_id = OBJECT_ID(c.TABLE_SCHEMA + '.' + c.TABLE_NAME) + AND ep.minor_id = c.ORDINAL_POSITION + AND ep.class = 1 + AND ep.name = 'MS_Description' + WHERE + c.TABLE_SCHEMA = '{entity.entity_schema}' + AND c.TABLE_NAME = '{entity.name}';""" @property def extract_entity_relationships_sql_query(self) -> str: