Complete reference documentation for the DataBridge fluent API.
The main orchestration class that provides access to all DataBridge functionality.
from src.database.datamodel_service import DataBridge
bridge = DataBridge(db_conn=None, logger=None)

| Parameter | Type | Description | Required |
|---|---|---|---|
| db_conn | pyodbc.Connection | Database connection object | No |
| logger | logging.Logger | Logger instance for operation tracking | No |
Returns a fluent schema discovery builder for extracting database metadata.
schema = bridge.discover_schema().from_database().build()

Returns a fluent query generation builder for creating optimized SQL.

result = bridge.generate_query().select_all().where({...}).build()

Returns a fluent export builder for multi-format schema serialization.

bridge.export_schema().to_yaml('output.yaml')

Builder class for fluent schema discovery operations.
Extract schema from the connected database.
schema = bridge.discover_schema().from_database().build()

Include only specified tables in the schema.
Parameters:
tables(str|list): Table name(s) to include
# Single table
schema = bridge.discover_schema().only_tables('users').build()
# Multiple tables
schema = bridge.discover_schema().only_tables(['users', 'orders']).build()

Aliases: include_tables()
Exclude specified tables from the schema.
Parameters:
tables(str|list): Table name(s) to exclude
schema = bridge.discover_schema().exclude_tables(['temp_data', 'logs']).build()

Aliases: without_tables()
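To make the `str|list` convention of the table filters concrete, here is a minimal sketch of how include and exclude arguments of this kind could be normalized and applied to a list of table names. It is not DataBridge's implementation: `as_list` and `filter_tables` are hypothetical helpers, and exact, case-sensitive name matching is an assumption.

```python
from typing import Iterable, List, Optional, Union

NameSpec = Union[str, Iterable[str]]


def as_list(value: Optional[NameSpec]) -> List[str]:
    """Normalize None, a single name, or an iterable of names to a list."""
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def filter_tables(
    all_tables: Iterable[str],
    only: Optional[NameSpec] = None,
    exclude: Optional[NameSpec] = None,
) -> List[str]:
    """Keep tables named in `only` (if given), then drop those in `exclude`."""
    included = set(as_list(only))
    excluded = set(as_list(exclude))
    kept = []
    for name in all_tables:
        if included and name not in included:
            continue  # an include list was given and this table is not in it
        if name in excluded:
            continue  # explicitly excluded
        kept.append(name)
    return kept


tables = ["users", "orders", "temp_data", "logs"]
print(filter_tables(tables, only="users"))
print(filter_tables(tables, exclude=["temp_data", "logs"]))
```

Normalizing a bare string up front keeps a single table name from being iterated character by character, which is the usual pitfall with parameters that accept either a string or a list.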
Include only specified database schemas.
Parameters:
schemas(str|list): Schema name(s) to include
schema = bridge.discover_schema().only_schemas(['dbo', 'sales']).build()

Exclude specified database schemas.
Parameters:
schemas(str|list): Schema name(s) to exclude
schema = bridge.discover_schema().exclude_schemas(['test', 'temp']).build()

Aliases: without_schemas()
Include tables matching regex patterns.
Parameters:
patterns(str|list): Regex pattern(s) to match
# Single pattern
schema = bridge.discover_schema().matching_pattern(r'^user_.*').build()
# Multiple patterns
schema = bridge.discover_schema().matching_pattern([r'^user_.*', r'^order_.*']).build()

Exclude tables matching regex patterns.
Parameters:
patterns(str|list): Regex pattern(s) to exclude
schema = bridge.discover_schema().excluding_pattern([r'^temp_.*', r'.*_backup$']).build()

Aliases: without_pattern()
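The pattern filters can be illustrated with Python's standard `re` module. The sketch below is an assumption about the semantics, not the library's code: `select_by_pattern` is a hypothetical helper, a table is kept if it matches any include pattern and no exclude pattern, and `re.search` is used, so anchors such as `^` and `$` must be written explicitly.

```python
import re
from typing import Iterable, List, Union

PatternSpec = Union[str, Iterable[str]]


def compile_patterns(patterns: PatternSpec) -> List[re.Pattern]:
    """Accept one regex or several and return compiled patterns."""
    if isinstance(patterns, str):
        patterns = [patterns]
    return [re.compile(p) for p in patterns]


def select_by_pattern(
    tables: Iterable[str],
    matching: PatternSpec = (),
    excluding: PatternSpec = (),
) -> List[str]:
    """Keep names that match any `matching` regex and no `excluding` regex."""
    include = compile_patterns(matching)
    exclude = compile_patterns(excluding)
    kept = []
    for name in tables:
        if include and not any(p.search(name) for p in include):
            continue  # include patterns were given and none matched
        if any(p.search(name) for p in exclude):
            continue  # at least one exclude pattern matched
        kept.append(name)
    return kept


tables = ["user_profile", "user_roles", "order_items", "temp_import", "order_backup"]
print(select_by_pattern(tables, matching=[r"^user_.*", r"^order_.*"]))
print(select_by_pattern(tables, excluding=[r"^temp_.*", r".*_backup$"]))
```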
Generic include method supporting multiple target types.
Parameters:
target(str|list): Target to focus on
type(str): Type of target ('tables', 'schemas', 'patterns')
schema = bridge.discover_schema().focus_on('customers').build()
schema = bridge.discover_schema().focus_on('dbo', 'schemas').build()
schema = bridge.discover_schema().focus_on(r'^core_.*', 'patterns').build()

Generic exclude method supporting multiple target types.
Parameters:
target(str|list): Target to ignore
type(str): Type of target ('tables', 'schemas', 'patterns')
schema = bridge.discover_schema().ignore(['temp_data', 'logs']).build()
schema = bridge.discover_schema().ignore('test', 'schemas').build()

Include foreign key relationships from the database.

schema = (bridge.discover_schema()
    .from_database()
    .with_relationships_from_database()
    .build())

Include relationships from a CSV file.
Parameters:
file_path(str): Path to CSV relationships file
CSV Format:
table,parent,relation,parent_column,child_column
orders,users,many-to-one,user_id,user_id

schema = (bridge.discover_schema()
    .from_database()
    .with_relationships_from_csv('data/relationships.csv')
    .build())

Include relationships from an XML file.
Parameters:
file_path(str): Path to XML relationships file
schema = (bridge.discover_schema()
    .from_database()
    .with_relationships_from_xml('data/relationships.xml')
    .build())

Execute the schema discovery and return the result.
Returns: SchemaDTO object containing discovered schema
schema = bridge.discover_schema().from_database().build()

Builder class for fluent query generation operations.
Generate SELECT * queries for all relevant tables.
result = bridge.generate_query().select_all().build()

Add WHERE clause filtering to the query.
Parameters:
filter_spec(dict): Filter specification
Filter Formats:
- Table-Column List Format:
  result = bridge.generate_query().where({'users': ['user_id', 'email']}).build()
- Table.Column Value Format:
  result = bridge.generate_query().where({'users.user_id': 'specific_value'}).build()
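The two `filter_spec` shapes can be told apart by the type of each value and the form of each key. The sketch below normalizes both into `(table, column, value)` triples. `normalize_filter_spec` is a hypothetical helper, and treating list-format columns as having no bound value (`None`) is an assumption, since this reference does not state how DataBridge interprets them.

```python
from typing import Any, Dict, List, Optional, Tuple

FilterTriple = Tuple[str, str, Optional[Any]]


def normalize_filter_spec(filter_spec: Dict[str, Any]) -> List[FilterTriple]:
    """Flatten both filter formats into (table, column, value) triples."""
    triples: List[FilterTriple] = []
    for key, value in filter_spec.items():
        if isinstance(value, (list, tuple)):
            # Table-Column List format: {'users': ['user_id', 'email']}
            triples.extend((key, column, None) for column in value)
        elif "." in key:
            # Table.Column Value format: {'users.user_id': 'specific_value'}
            # Split on the last dot so a qualified table name stays intact.
            table, column = key.rsplit(".", 1)
            triples.append((table, column, value))
        else:
            raise ValueError(f"Unrecognized filter entry: {key!r}")
    return triples


print(normalize_filter_spec({"users": ["user_id", "email"]}))
print(normalize_filter_spec({"users.user_id": "specific_value"}))
```

Rejecting entries that fit neither shape makes a malformed specification fail early instead of silently producing an unfiltered query.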
Limit query to specific tables.
Parameters:
tables(str|list): Table name(s) to include
result = (bridge.generate_query()
    .select_all()
    .only_from_tables(['customers', 'orders'])
    .build())

Exclude specific tables from query.
Parameters:
tables(str|list): Table name(s) to exclude
result = (bridge.generate_query()
    .select_all()
    .excluding_tables(['temp_data', 'logs'])
    .build())

Aliases: without_tables()
Include JOIN clauses based on relationships.
result = (bridge.generate_query()
    .select_all()
    .where({'users': ['user_id']})
    .with_joins()
    .build())

Enable query optimization with index analysis.
result = (bridge.generate_query()
    .select_all()
    .where({'orders': ['customer_id', 'order_date']})
    .optimize_with_indexes()
    .build())

Use a specific schema for query generation.
Parameters:
schema(SchemaDTO): Pre-discovered schema to use
custom_schema = bridge.discover_schema().only_tables(['users']).build()
result = (bridge.generate_query()
    .select_all()
    .using_schema(custom_schema)
    .build())

Limit query to tables present in the filtered schema.
result = (bridge.generate_query()
    .select_all()
    .limit_to_filtered_schema()
    .build())

Execute the query generation and return the result.
Returns: Dictionary containing:
query(str): Generated SQL query
parameters(list): Query parameters
tables_used(list): Tables included in query
index_recommendations(list): Index optimization suggestions (if enabled)
result = bridge.generate_query().select_all().where({...}).build()
print(result['query'])

Builder class for fluent schema export operations.
Export schema to YAML format.
Parameters:
file_path(str): Output file path
bridge.export_schema().to_yaml('output/schema.yaml')

Export schema to XML format.
Parameters:
file_path(str): Output file path
**options: Export options (include_indexes, etc.)
bridge.export_schema().to_xml('output/schema.xml', include_indexes=True)

Export schema to JSON format.
Parameters:
file_path(str): Output file path
bridge.export_schema().to_json('output/schema.json')

Container for complete schema information.

| Property | Type | Description |
|---|---|---|
| database_name | str | Name of the source database |
| tables | dict | Dictionary of table name -> TableDTO |
| relationships | list | List of RelationshipDTO objects |
| extraction_timestamp | datetime | When the schema was extracted |
schema = bridge.discover_schema().from_database().build()
print(f"Database: {schema.database_name}")
print(f"Tables: {len(schema.tables)}")
print(f"Relationships: {len(schema.relationships)}")

Container for table metadata.

| Property | Type | Description |
|---|---|---|
| table_name | str | Name of the table |
| columns | dict | Dictionary of column name -> ColumnDTO |
| indexes | list | List of IndexDTO objects |
| primary_keys | list | List of primary key column names |
for table_name, table in schema.tables.items():
    print(f"Table: {table.table_name}")
    print(f"Columns: {len(table.columns)}")
    print(f"Primary Keys: {table.primary_keys}")

Container for column metadata.
| Property | Type | Description |
|---|---|---|
| column_name | str | Name of the column |
| data_type | str | SQL data type |
| max_length | int | Maximum length (for string types) |
| precision | int | Numeric precision |
| scale | int | Numeric scale |
| is_nullable | bool | Whether column allows NULL |
| is_primary_key | bool | Whether column is part of primary key |
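As an illustration of how the column properties fit together, the sketch below renders a column as a short SQL-style description. The `ColumnDTO` here is a stand-in dataclass with the documented property names, not the real class. `describe_column`, the optional defaults, and the choice of which types take a length or a precision and scale are assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ColumnDTO:
    # Stand-in with the documented properties; the real class may differ.
    column_name: str
    data_type: str
    max_length: Optional[int] = None
    precision: Optional[int] = None
    scale: Optional[int] = None
    is_nullable: bool = True
    is_primary_key: bool = False


def describe_column(col: ColumnDTO) -> str:
    """Render a column as 'name type[(args)] NULL|NOT NULL'."""
    type_name = col.data_type.lower()
    if type_name in {"char", "varchar", "nchar", "nvarchar"} and col.max_length:
        # String types carry a maximum length
        sql_type = f"{type_name}({col.max_length})"
    elif type_name in {"decimal", "numeric"} and col.precision is not None:
        # Exact numeric types carry precision and scale
        sql_type = f"{type_name}({col.precision},{col.scale or 0})"
    else:
        sql_type = type_name
    nullability = "NULL" if col.is_nullable else "NOT NULL"
    return f"{col.column_name} {sql_type} {nullability}"


print(describe_column(ColumnDTO("email", "varchar", max_length=255)))
print(describe_column(ColumnDTO("total", "decimal", precision=10, scale=2, is_nullable=False)))
```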
Container for table relationship information.
| Property | Type | Description |
|---|---|---|
| child_table | str | Child table name |
| parent_table | str | Parent table name |
| relationship_type | str | Type of relationship |
| columns | list | List of RelationshipColumnDTO objects |
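Rows of the CSV relationship format (`table,parent,relation,parent_column,child_column`) map naturally onto these properties. The sketch below parses such a file with the standard `csv` module into stand-in dataclasses. `Relationship`, `RelationshipColumn`, and `load_relationships` are hypothetical names, the `table` column is assumed to hold the child table, and grouping several rows into one multi-column relationship is also an assumption.

```python
import csv
import io
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class RelationshipColumn:
    # Hypothetical shape; RelationshipColumnDTO's fields are not documented here.
    parent_column: str
    child_column: str


@dataclass
class Relationship:
    child_table: str
    parent_table: str
    relationship_type: str
    columns: List[RelationshipColumn] = field(default_factory=list)


def load_relationships(csv_text: str) -> List[Relationship]:
    """Parse relationship rows, merging rows that share child, parent, and type."""
    by_key: Dict[Tuple[str, str, str], Relationship] = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        key = (row["table"], row["parent"], row["relation"])
        if key not in by_key:
            by_key[key] = Relationship(*key)
        by_key[key].columns.append(
            RelationshipColumn(row["parent_column"], row["child_column"])
        )
    return list(by_key.values())


SAMPLE = """table,parent,relation,parent_column,child_column
orders,users,many-to-one,user_id,user_id
"""

relationships = load_relationships(SAMPLE)
for rel in relationships:
    print(rel.child_table, "->", rel.parent_table, rel.relationship_type)
```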
Container for index information.
| Property | Type | Description |
|---|---|---|
| index_name | str | Name of the index |
| table_name | str | Table the index belongs to |
| index_type | str | Type of index (CLUSTERED, NONCLUSTERED, etc.) |
| columns | list | List of indexed column names |
| is_unique | bool | Whether index enforces uniqueness |
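Index metadata of this shape is enough for simple checks, such as spotting filter columns that no index leads with. The sketch below is a deliberately simplified heuristic and not DataBridge's index analysis. `IndexDTO` here is a stand-in dataclass with the documented property names, and `unindexed_columns` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class IndexDTO:
    # Stand-in with the documented properties; the real class may differ.
    index_name: str
    table_name: str
    index_type: str
    columns: List[str]
    is_unique: bool = False


def unindexed_columns(
    table: str, filter_columns: Iterable[str], indexes: Iterable[IndexDTO]
) -> List[str]:
    """Return filter columns that are not the leading column of any index on `table`."""
    leading = {
        ix.columns[0] for ix in indexes if ix.table_name == table and ix.columns
    }
    return [col for col in filter_columns if col not in leading]


indexes = [
    IndexDTO("PK_orders", "orders", "CLUSTERED", ["order_id"], is_unique=True),
    IndexDTO("IX_orders_customer", "orders", "NONCLUSTERED", ["customer_id", "order_date"]),
]
print(unindexed_columns("orders", ["customer_id", "order_date"], indexes))
```

Only the leading column is considered because a composite index generally cannot serve a filter on a trailing column alone. A real optimizer would also weigh selectivity and column combinations.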
For quick operations without fluent builders:
One-liner schema discovery.
Parameters:
include_db_relationships(bool): Include database relationships
csv_relationships_path(str): Path to CSV relationships file
xml_relationships_path(str): Path to XML relationships file
table_filter(list): Tables to include
schema_filter(list): Schemas to include
schema = bridge.bridge_schema(
    include_db_relationships=True,
    csv_relationships_path='data/relationships.csv'
)

One-liner query generation.
Parameters:
filter_spec(dict): Query filter specification
optimize(bool): Enable optimization
include_joins(bool): Include JOIN clauses
result = bridge.bridge_query(
    {'users': ['user_id']},
    optimize=True,
    include_joins=True
)

One-liner schema export.
Parameters:
format_type(str): Export format ('yaml', 'xml', 'json')
output_path(str): Output file path
schema(SchemaDTO): Schema to export (optional)
bridge.bridge_to_format('yaml', 'output/schema.yaml')

Raised when database connection fails.
try:
    schema = bridge.discover_schema().from_database().build()
except ConnectionError as e:
    print(f"Database connection failed: {e}")

Raised when relationship files are not found.
try:
    schema = (bridge.discover_schema()
        .with_relationships_from_csv('missing.csv')
        .build())
except FileNotFoundError:
    print("Relationships file not found, using database relationships only")

Raised when schema validation fails.
try:
    validation = bridge.validate_schema()
    if not validation['is_valid']:
        for issue in validation['issues']:
            print(f"Validation issue: {issue}")
except ValidationError as e:
    print(f"Schema validation error: {e}")

- Cache Expensive Operations
  # Cache schema for multiple operations
  cached_schema = bridge.discover_schema().from_database().build()
  # Reuse cached schema
  query1 = bridge.generate_query().using_schema(cached_schema).build()
  query2 = bridge.generate_query().using_schema(cached_schema).build()
- Use Targeted Filtering
  # More efficient than processing all tables
  focused_schema = (bridge.discover_schema()
      .from_database()
      .only_tables(['core_tables'])
      .build())
- Enable Query Optimization
  # Always use optimization for production
  result = (bridge.generate_query()
      .select_all()
      .optimize_with_indexes()
      .build())
- Breaking Changes: None (first stable release)
- New Features: Complete fluent API implementation
- Deprecated: None
If upgrading from pre-1.0 versions, see the Migration Guide for detailed upgrade instructions.
This API reference covers all public methods and classes in DataBridge v1.0. For usage examples, see the User Guide and Example Gallery.