Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion text_2_sql/data_dictionary/.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ OpenAI__ApiVersion=<openAIApiVersion>
Text2Sql__DatabaseEngine=<databaseEngine>
Text2Sql__DatabaseName=<databaseName>
Text2Sql__DatabaseConnectionString=<databaseConnectionString>
Text2Sql__Snowflake__User=<snowflakeUser if using Snowflake Data Source>
Text2Sql__Snowflake__Password=<snowflakePassword if using Snowflake Data Source>
Text2Sql__Snowflake__Account=<snowflakeAccount if using Snowflake Data Source>
Text2Sql__Snowflake__Warehouse=<snowflakeWarehouse if using Snowflake Data Source>
IdentityType=<identityType> # system_assigned or user_assigned or key
ClientId=<clientId if using user assigned identity>
ClientId=<clientId if using user assigned identity>
11 changes: 8 additions & 3 deletions text_2_sql/data_dictionary/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,13 @@ A full data dictionary must be built for all the views / tables you which to exp

Manually creating the `entities.json` is a time consuming exercise. To speed up generation, a mixture of SQL Queries and an LLM can be used to generate a initial version. Existing comments and descriptions in the database, can be combined with sample values to generate the necessary descriptions. Manual input can then be used to tweak it for the use case and any improvements.

`data_dictionary_creator.py` contains a utility class that handles the automatic generation and selection of schemas from the source SQL database. It must be subclassed to the appropriate engine.

`sql_server_data_dictionary_creator.py` contains a subclassed version of `data_dictionary_creator.py` that implements the SQL Server specific functionality to extract the entities.
`data_dictionary_creator.py` contains a utility class that handles the automatic generation and selection of schemas from the source SQL database. It must be subclassed to the appropriate engine to handle engine specific queries and connection details.

See `./generated_samples/` for an example output of the script. This can then be automatically indexed with the provided indexer for the **Vector-Based Approach**.

The following Databases have pre-built scripts for them:

- **Microsoft SQL Server:** `sql_server_data_dictionary_creator.py`
- **Snowflake:** `snowflake_data_dictionary_creator.py`

If there is no pre-built script for your database engine, take one of the above as a starting point and adjust it.
4 changes: 4 additions & 0 deletions text_2_sql/data_dictionary/data_dictionary_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def __init__(
self,
entities: list[str] = None,
excluded_entities: list[str] = None,
excluded_schemas: list[str] = None,
single_file: bool = False,
generate_definitions: bool = True,
):
Expand All @@ -169,12 +170,14 @@ def __init__(
Args:
entities (list[str], optional): A list of entities to extract. Defaults to None. If None, all entities are extracted.
excluded_entities (list[str], optional): A list of entities to exclude. Defaults to None.
excluded_schemas (list[str], optional): A list of schemas to exclude. Defaults to None.
single_file (bool, optional): A flag to indicate if the data dictionary should be saved to a single file. Defaults to False.
generate_definitions (bool, optional): A flag to indicate if definitions should be generated. Defaults to True.
"""

self.entities = entities
self.excluded_entities = excluded_entities
self.excluded_schemas = excluded_schemas
self.single_file = single_file
self.generate_definitions = generate_definitions

Expand Down Expand Up @@ -381,6 +384,7 @@ async def extract_entities_with_definitions(self) -> list[EntityItem]:
entity
for entity in all_entities
if entity.entity not in self.excluded_entities
and entity.entity_schema not in self.excluded_schemas
]

# Add warehouse and database to entities
Expand Down
1 change: 1 addition & 0 deletions text_2_sql/data_dictionary/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ azure-identity
python-dotenv
pydantic
openai
snowflake-connector-python
networkx
140 changes: 140 additions & 0 deletions text_2_sql/data_dictionary/snowflake_data_dictionary_creator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from data_dictionary_creator import DataDictionaryCreator, EntityItem
import asyncio
import snowflake.connector
import logging
import os


class SnowflakeDataDictionaryCreator(DataDictionaryCreator):
def __init__(
self,
entities: list[str] = None,
excluded_entities: list[str] = None,
single_file: bool = False,
):
"""A method to initialize the DataDictionaryCreator class.

Args:
entities (list[str], optional): A list of entities to extract. Defaults to None. If None, all entities are extracted.
excluded_entities (list[str], optional): A list of entities to exclude. Defaults to None.
single_file (bool, optional): A flag to indicate if the data dictionary should be saved to a single file. Defaults to False.
"""
if excluded_entities is None:
excluded_entities = []

excluded_schemas = ["INFORMATION_SCHEMA"]
return super().__init__(
entities, excluded_entities, excluded_schemas, single_file
)

"""A class to extract data dictionary information from a Snowflake database."""

@property
def extract_table_entities_sql_query(self) -> str:
"""A property to extract table entities from a Snowflake database."""
return """SELECT
t.TABLE_NAME AS Entity,
t.TABLE_SCHEMA AS EntitySchema,
t.COMMENT AS Definition
FROM
INFORMATION_SCHEMA.TABLES t"""

@property
def extract_view_entities_sql_query(self) -> str:
"""A property to extract view entities from a Snowflake database."""
return """SELECT
v.TABLE_NAME AS Entity,
v.TABLE_SCHEMA AS EntitySchema,
v.COMMENT AS Definition
FROM
INFORMATION_SCHEMA.VIEWS v"""

def extract_columns_sql_query(self, entity: EntityItem) -> str:
"""A property to extract column information from a Snowflake database."""
return f"""SELECT
COLUMN_NAME AS Name,
DATA_TYPE AS Type,
COMMENT AS Definition
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_SCHEMA = '{entity.entity_schema}'
AND TABLE_NAME = '{entity.name}';"""

@property
def extract_entity_relationships_sql_query(self) -> str:
"""A property to extract entity relationships from a SQL Server database."""
return """SELECT
tc.table_schema AS EntitySchema,
tc.table_name AS Entity,
rc.unique_constraint_schema AS ForeignEntitySchema,
rc.unique_constraint_name AS ForeignEntityConstraint,
rc.constraint_name AS ForeignKeyConstraint
FROM
information_schema.referential_constraints rc
JOIN
information_schema.table_constraints tc
ON rc.constraint_schema = tc.constraint_schema
AND rc.constraint_name = tc.constraint_name
WHERE
tc.constraint_type = 'FOREIGN KEY'
ORDER BY
EntitySchema, Entity, ForeignEntitySchema, ForeignEntityConstraint;
"""

async def query_entities(
self, sql_query: str, cast_to: any = None
) -> list[EntityItem]:
"""A method to query a database for entities using Snowflake Connector. Overrides the base class method.

Args:
sql_query (str): The SQL query to run.
cast_to (any, optional): The class to cast the results to. Defaults to None.

Returns:
list[EntityItem]: The list of entities.
"""
logging.info(f"Running query: {sql_query}")
results = []

# Create a connection to Snowflake, without specifying a schema
conn = snowflake.connector.connect(
user=os.environ["Text2Sql__Snowflake__User"],
password=os.environ["Text2Sql__Snowflake__Password"],
account=os.environ["Text2Sql__Snowflake__Account"],
warehouse=os.environ["Text2Sql__Snowflake__Warehouse"],
database=os.environ["Text2Sql__DatabaseName"],
)

try:
# Using the connection to create a cursor
cursor = conn.cursor()

# Execute the query
await asyncio.to_thread(cursor.execute, sql_query)

# Fetch column names
columns = [col[0] for col in cursor.description]

# Fetch rows
rows = await asyncio.to_thread(cursor.fetchall)

# Process rows
for row in rows:
if cast_to:
results.append(cast_to.from_sql_row(row, columns))
else:
results.append(dict(zip(columns, row)))

finally:
cursor.close()
conn.close()

return results


if __name__ == "__main__":
data_dictionary_creator = SnowflakeDataDictionaryCreator()
asyncio.run(data_dictionary_creator.create_data_dictionary())
80 changes: 39 additions & 41 deletions text_2_sql/data_dictionary/sql_sever_data_dictionary_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ def __init__(
if excluded_entities is None:
excluded_entities = []

excluded_entities.extend(
["dbo.BuildVersion", "dbo.ErrorLog", "sys.database_firewall_rules"]
)
super().__init__(entities, excluded_entities, single_file)
excluded_schemas = ["dbo", "sys"]
super().__init__(entities, excluded_entities, excluded_schemas, single_file)
self.database = os.environ["Text2Sql__DatabaseName"]

"""A class to extract data dictionary information from a SQL Server database."""
Expand All @@ -34,53 +32,53 @@ def __init__(
def extract_table_entities_sql_query(self) -> str:
"""A property to extract table entities from a SQL Server database."""
return """SELECT
t.TABLE_NAME AS Entity,
t.TABLE_SCHEMA AS EntitySchema,
CAST(ep.value AS NVARCHAR(500)) AS Definition
FROM
INFORMATION_SCHEMA.TABLES t
LEFT JOIN
sys.extended_properties ep
ON ep.major_id = OBJECT_ID(t.TABLE_SCHEMA + '.' + t.TABLE_NAME)
AND ep.minor_id = 0
AND ep.class = 1
AND ep.name = 'MS_Description'
WHERE
t.TABLE_TYPE = 'BASE TABLE';"""
t.TABLE_NAME AS Entity,
t.TABLE_SCHEMA AS EntitySchema,
CAST(ep.value AS NVARCHAR(500)) AS Definition
FROM
INFORMATION_SCHEMA.TABLES t
LEFT JOIN
sys.extended_properties ep
ON ep.major_id = OBJECT_ID(t.TABLE_SCHEMA + '.' + t.TABLE_NAME)
AND ep.minor_id = 0
AND ep.class = 1
AND ep.name = 'MS_Description'
WHERE
t.TABLE_TYPE = 'BASE TABLE';"""

@property
def extract_view_entities_sql_query(self) -> str:
"""A property to extract view entities from a SQL Server database."""
return """SELECT
v.TABLE_NAME AS Entity,
v.TABLE_SCHEMA AS EntitySchema,
CAST(ep.value AS NVARCHAR(500)) AS Definition
FROM
INFORMATION_SCHEMA.VIEWS v
LEFT JOIN
sys.extended_properties ep
ON ep.major_id = OBJECT_ID(v.TABLE_SCHEMA + '.' + v.TABLE_NAME)
AND ep.minor_id = 0
AND ep.class = 1
v.TABLE_NAME AS Entity,
v.TABLE_SCHEMA AS EntitySchema,
CAST(ep.value AS NVARCHAR(500)) AS Definition
FROM
INFORMATION_SCHEMA.VIEWS v
LEFT JOIN
sys.extended_properties ep
ON ep.major_id = OBJECT_ID(v.TABLE_SCHEMA + '.' + v.TABLE_NAME)
AND ep.minor_id = 0
AND ep.class = 1
AND ep.name = 'MS_Description';"""

def extract_columns_sql_query(self, entity: EntityItem) -> str:
"""A property to extract column information from a SQL Server database."""
return f"""SELECT
c.COLUMN_NAME AS Name,
c.DATA_TYPE AS DataType,
CAST(ep.value AS NVARCHAR(500)) AS Definition
FROM
INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN
sys.extended_properties ep
ON ep.major_id = OBJECT_ID(c.TABLE_SCHEMA + '.' + c.TABLE_NAME)
AND ep.minor_id = c.ORDINAL_POSITION
AND ep.class = 1
AND ep.name = 'MS_Description'
WHERE
c.TABLE_SCHEMA = '{entity.entity_schema}'
AND c.TABLE_NAME = '{entity.name}';"""
c.COLUMN_NAME AS Name,
c.DATA_TYPE AS DataType,
CAST(ep.value AS NVARCHAR(500)) AS Definition
FROM
INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN
sys.extended_properties ep
ON ep.major_id = OBJECT_ID(c.TABLE_SCHEMA + '.' + c.TABLE_NAME)
AND ep.minor_id = c.ORDINAL_POSITION
AND ep.class = 1
AND ep.name = 'MS_Description'
WHERE
c.TABLE_SCHEMA = '{entity.entity_schema}'
AND c.TABLE_NAME = '{entity.name}';"""

@property
def extract_entity_relationships_sql_query(self) -> str:
Expand Down