diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..d24977e --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +ignore = E501, E203, W503, F401 \ No newline at end of file diff --git a/.github/workflows/flake8.yml b/.github/workflows/flake8.yml new file mode 100644 index 0000000..428676d --- /dev/null +++ b/.github/workflows/flake8.yml @@ -0,0 +1,31 @@ +# This is a basic workflow to help you get started with Actions + +name: flake8 + +# Controls when the workflow will run +on: + pull_request: + branches: [ master ] + push: + branches: [ master ] + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +# A workflow run is made up of one or more jobs that can run sequentially or in parallel +jobs: + # This workflow contains a single job called "build" + build: + # The type of runner that the job will run on + runs-on: ubuntu-latest + + # Steps represent a sequence of tasks that will be executed as part of the job + steps: + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - uses: actions/setup-python@v2 + - run: pip install --upgrade pip + - run: pip install flake8 + - run: python3 -m flake8 --count \ No newline at end of file diff --git a/pytabular/__init__.py b/pytabular/__init__.py index 0398f68..e4b0008 100644 --- a/pytabular/__init__.py +++ b/pytabular/__init__.py @@ -1,44 +1,57 @@ +# flake8: noqa import logging import os import sys import platform -logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s::%(module)s::%(funcName)s::%(levelname)s::%(message)s', - datefmt='%y/%m/%d %H:%M:%S %z') -logger = logging.getLogger('PyTabular') +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s::%(module)s::%(funcName)s::%(levelname)s::%(message)s", + datefmt="%y/%m/%d %H:%M:%S %z", +) +logger = logging.getLogger("PyTabular") logger.setLevel(logging.DEBUG) -logger.info('Logging configured...') -logger.info(f'To update PyTabular logger...') -logger.info(f'>>> import logging') -logger.info(f'>>> pytabular.logger.setLevel(level=logging.INFO)') -logger.info(f'See https://docs.python.org/3/library/logging.html#logging-levels') +logger.info("Logging configured...") +logger.info(f"To update PyTabular logger...") +logger.info(f">>> import logging") +logger.info(f">>> pytabular.logger.setLevel(level=logging.INFO)") +logger.info(f"See https://docs.python.org/3/library/logging.html#logging-levels") -logger.debug(f'Python Version::{sys.version}') -logger.debug(f'Python Location::{sys.exec_prefix}') -logger.debug(f'Package Location::{__file__}') -logger.debug(f'Working Directory::{os.getcwd()}') -logger.debug(f'Platform::{sys.platform}-{platform.release()}') +logger.debug(f"Python Version::{sys.version}") +logger.debug(f"Python Location::{sys.exec_prefix}") +logger.debug(f"Package Location::{__file__}") +logger.debug(f"Working Directory::{os.getcwd()}") +logger.debug(f"Platform::{sys.platform}-{platform.release()}") -dll = os.path.join(os.path.dirname(__file__),"dll") +dll = os.path.join(os.path.dirname(__file__), "dll") sys.path.append(dll) sys.path.append(os.path.dirname(__file__)) -logger.debug(f'Beginning CLR references...') +logger.debug(f"Beginning CLR references...") import clr -logger.debug('Adding Reference Microsoft.AnalysisServices.AdomdClient') -clr.AddReference('Microsoft.AnalysisServices.AdomdClient') -logger.debug('Adding Reference Microsoft.AnalysisServices.Tabular') -clr.AddReference('Microsoft.AnalysisServices.Tabular') -logger.debug('Adding Reference Microsoft.AnalysisServices') -clr.AddReference('Microsoft.AnalysisServices') + +logger.debug("Adding Reference Microsoft.AnalysisServices.AdomdClient") +clr.AddReference("Microsoft.AnalysisServices.AdomdClient") +logger.debug("Adding Reference Microsoft.AnalysisServices.Tabular") +clr.AddReference("Microsoft.AnalysisServices.Tabular") +logger.debug("Adding Reference Microsoft.AnalysisServices") +clr.AddReference("Microsoft.AnalysisServices") logger.debug(f"Importing specifics in module...") -from . pytabular import Tabular -from . basic_checks import Return_Zero_Row_Tables, Table_Last_Refresh_Times, BPA_Violations_To_DF, Last_X_Interval -from . logic_utils import pd_dataframe_to_m_expression, pandas_datatype_to_tabular_datatype -from . tabular_tracing import Base_Trace, Refresh_Trace -from . tabular_editor import Tabular_Editor -from . best_practice_analyzer import BPA -logger.info(f'Import successful...') \ No newline at end of file +from .pytabular import Tabular +from .basic_checks import ( + Return_Zero_Row_Tables, + Table_Last_Refresh_Times, + BPA_Violations_To_DF, + Last_X_Interval, +) +from .logic_utils import ( + pd_dataframe_to_m_expression, + pandas_datatype_to_tabular_datatype, +) +from .tabular_tracing import Base_Trace, Refresh_Trace +from .tabular_editor import Tabular_Editor +from .best_practice_analyzer import BPA + +logger.info(f"Import successful...") diff --git a/pytabular/basic_checks.py b/pytabular/basic_checks.py index bc5414b..8a1986d 100644 --- a/pytabular/basic_checks.py +++ b/pytabular/basic_checks.py @@ -1,108 +1,133 @@ import logging -logger = logging.getLogger('PyTabular') from typing import List, Union import pytabular from logic_utils import ticks_to_datetime import sys import pandas as pd -def Return_Zero_Row_Tables(model:pytabular.Tabular) -> List[str]: - ''' Returns list of table names of those that are returning isna() - - Args: - model (pytabular.Tabular): Tabular Model - - Returns: - List[str]: List of table names where DAX COUNTROWS('Table Name') is nan or 0. - ''' - logger.info(f'Executing Basic Function {sys._getframe(0).f_code.co_name}') - query_function: str = 'COUNTROWS(_)' - df: pd.DataFrame = model.Query_Every_Table(query_function) - return df[df[f'[{query_function}]'].isna()]['[Table]'].to_list() - -def Table_Last_Refresh_Times(model:pytabular.Tabular, group_partition:bool = True) -> pd.DataFrame: - ''' Returns pd.DataFrame of tables with their latest refresh time. - Optional 'group_partition' variable, default is True. - If False an extra column will be include to have the last refresh time to the grain of the partition - Example to add to model model.Create_Table(p.Table_Last_Refresh_Times(model),'RefreshTimes') - - Args: - model (pytabular.Tabular): Tabular Model - group_partition (bool, optional): Whether or not you want the grain of the dataframe to be by table or by partition. Defaults to True. - - Returns: - pd.DataFrame: pd dataframe with the RefreshedTime property: https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.partition.refreshedtime?view=analysisservices-dotnet#microsoft-analysisservices-tabular-partition-refreshedtime - If group_partition == True and the table has multiple partitions, then df.groupby(by["tables"]).max() - ''' - logger.info(f'Executing Basic Function {sys._getframe(0).f_code.co_name}') - data = {\ - "Tables":[partition.Table.Name for partition in model.Partitions],\ - "Partitions":[partition.Name for partition in model.Partitions],\ - "RefreshedTime":[ticks_to_datetime(partition.RefreshedTime.Ticks).strftime('%Y-%m-%dT%H:%M:%S.%fZ')[:-3] for partition in model.Partitions]\ - } - df = pd.DataFrame(data) - if group_partition: - logger.debug('Grouping together to grain of Table') - return df[["Tables","RefreshedTime"]].groupby(by=["Tables"]).max().reset_index(drop=False) - else: - logger.debug('Returning DF') - return df - -def BPA_Violations_To_DF(model:pytabular.Tabular,te2:str, bpa:str) -> pd.DataFrame: - '''Runs BPA Analyzer from TE2 and outputs result into a DF. - - Args: - model (pytabular.Tabular): Tabular Model Class - te2 (str): TE2 Exe File Path (Can use TE2().EXE_path) - bpa (str): BPA File Location (Can use BPA().Location) - - Returns: - pd.DataFrame: Super simple right now. Just splits into two columns.. The object in violation and the rule. - ''' - results = model.Analyze_BPA(te2,bpa) - data = [rule.replace(' violates rule ','^').replace('\"','').split('^') for rule in results] - columns = ["Object","Violation"] - return pd.DataFrame(data,columns=columns) +logger = logging.getLogger("PyTabular") + + +def Return_Zero_Row_Tables(model: pytabular.Tabular) -> List[str]: + """Returns list of table names of those that are returning isna() + + Args: + model (pytabular.Tabular): Tabular Model + + Returns: + List[str]: List of table names where DAX COUNTROWS('Table Name') is nan or 0. + """ + logger.info(f"Executing Basic Function {sys._getframe(0).f_code.co_name}") + query_function: str = "COUNTROWS(_)" + df: pd.DataFrame = model.Query_Every_Table(query_function) + return df[df[f"[{query_function}]"].isna()]["[Table]"].to_list() + + +def Table_Last_Refresh_Times( + model: pytabular.Tabular, group_partition: bool = True +) -> pd.DataFrame: + """Returns pd.DataFrame of tables with their latest refresh time. + Optional 'group_partition' variable, default is True. + If False an extra column will be include to have the last refresh time to the grain of the partition + Example to add to model model.Create_Table(p.Table_Last_Refresh_Times(model),'RefreshTimes') + + Args: + model (pytabular.Tabular): Tabular Model + group_partition (bool, optional): Whether or not you want the grain of the dataframe to be by table or by partition. Defaults to True. + + Returns: + pd.DataFrame: pd dataframe with the RefreshedTime property: https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.partition.refreshedtime?view=analysisservices-dotnet#microsoft-analysisservices-tabular-partition-refreshedtime + If group_partition == True and the table has multiple partitions, then df.groupby(by["tables"]).max() + """ + logger.info(f"Executing Basic Function {sys._getframe(0).f_code.co_name}") + data = { + "Tables": [partition.Table.Name for partition in model.Partitions], + "Partitions": [partition.Name for partition in model.Partitions], + "RefreshedTime": [ + ticks_to_datetime(partition.RefreshedTime.Ticks).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ" + )[:-3] + for partition in model.Partitions + ], + } + df = pd.DataFrame(data) + if group_partition: + logger.debug("Grouping together to grain of Table") + return ( + df[["Tables", "RefreshedTime"]] + .groupby(by=["Tables"]) + .max() + .reset_index(drop=False) + ) + else: + logger.debug("Returning DF") + return df + + +def BPA_Violations_To_DF(model: pytabular.Tabular, te2: str, bpa: str) -> pd.DataFrame: + """Runs BPA Analyzer from TE2 and outputs result into a DF. + + Args: + model (pytabular.Tabular): Tabular Model Class + te2 (str): TE2 Exe File Path (Can use TE2().EXE_path) + bpa (str): BPA File Location (Can use BPA().Location) + + Returns: + pd.DataFrame: Super simple right now. Just splits into two columns.. The object in violation and the rule. + """ + results = model.Analyze_BPA(te2, bpa) + data = [ + rule.replace(" violates rule ", "^").replace('"', "").split("^") + for rule in results + ] + columns = ["Object", "Violation"] + return pd.DataFrame(data, columns=columns) + def Last_X_Interval( - Model:pytabular.Tabular, - Measure:Union[str,pytabular.pytabular.Measure], - Column_Name:Union[str,None] = None, - Date_Column_Identifier:str = "'Date'[DATE_DTE_KEY]", - Number_Of_Intervals:int = 90, - Interval:str = "DAY") -> pd.DataFrame: - '''Pulls the Last X Interval (Ex Last 90 Days) of a specific measure. - - Args: - Model (pytabular.Tabular): Tabular Model to perform query on. - Measure (Union[str,pytabular.pytabular.Measure]): Measure to query. If string, will first check for a measure in the model with that name, otherwise will assume it is a DAX Expression (Ex SUM(FactTable[ColumnValue]) ) and perform that as expression - Column_Name (Union[str,None], optional): Column Name to be outputted in DataFrame. You can provide your own otherwise will take from the Measure Name. Defaults to "Result". - Date_Column_Identifier (str, optional): Date column dax identifier. Defaults to "'Date'[DATE_DTE_KEY]". - Number_Of_Intervals (int, optional): This is used to plug in the variables for [DATESINPERIOD](https://docs.microsoft.com/en-us/dax/datesinperiod-function-dax). Defaults to 90. - Interval (str, optional): Sames as Number_Of_Intervals. Used to plug in parameters of DAX function [DATESINPERIOD](https://docs.microsoft.com/en-us/dax/datesinperiod-function-dax). Defaults to "DAY". Possible options are "DAY", "MONTH", "QUARTER", and "YEAR" - - Returns: - pd.DataFrame: Pandas DataFrame of results. - ''' - if isinstance(Measure,str): - try: - Measure = [measure for measure in Model.Measures if measure.Name == Measure][-1] - Column_Name = Measure.Name if Column_Name is None else Column_Name - Expression = f"[{Measure.Name}]" - except: - logging.debug(f'Measure is string but unable to find Measure...') - Column_Name = "Result" if Column_Name is None else Column_Name - Expression = Measure - else: - Column_Name = Measure.Name if Column_Name is None else Column_Name - Expression = f'[{Column_Name}]' - Query_Str = f''' - EVALUATE - SUMMARIZECOLUMNS( - {Date_Column_Identifier}, - KEEPFILTERS( DATESINPERIOD ( {Date_Column_Identifier}, UTCTODAY(), -{Number_Of_Intervals}, {Interval} ) ), - "{Column_Name}", {Expression} - ) - ''' - logging.info(f'Running query for {Column_Name} in the last {Number_Of_Intervals} {Interval}s...') - return Model.Query(Query_Str) \ No newline at end of file + Model: pytabular.Tabular, + Measure: Union[str, pytabular.pytabular.Measure], + Column_Name: Union[str, None] = None, + Date_Column_Identifier: str = "'Date'[DATE_DTE_KEY]", + Number_Of_Intervals: int = 90, + Interval: str = "DAY", +) -> pd.DataFrame: + """Pulls the Last X Interval (Ex Last 90 Days) of a specific measure. + + Args: + Model (pytabular.Tabular): Tabular Model to perform query on. + Measure (Union[str,pytabular.pytabular.Measure]): Measure to query. If string, will first check for a measure in the model with that name, otherwise will assume it is a DAX Expression (Ex SUM(FactTable[ColumnValue]) ) and perform that as expression + Column_Name (Union[str,None], optional): Column Name to be outputted in DataFrame. You can provide your own otherwise will take from the Measure Name. Defaults to "Result". + Date_Column_Identifier (str, optional): Date column dax identifier. Defaults to "'Date'[DATE_DTE_KEY]". + Number_Of_Intervals (int, optional): This is used to plug in the variables for [DATESINPERIOD](https://docs.microsoft.com/en-us/dax/datesinperiod-function-dax). Defaults to 90. + Interval (str, optional): Sames as Number_Of_Intervals. Used to plug in parameters of DAX function [DATESINPERIOD](https://docs.microsoft.com/en-us/dax/datesinperiod-function-dax). Defaults to "DAY". Possible options are "DAY", "MONTH", "QUARTER", and "YEAR" + + Returns: + pd.DataFrame: Pandas DataFrame of results. + """ + if isinstance(Measure, str): + try: + Measure = [ + measure for measure in Model.Measures if measure.Name == Measure + ][-1] + Column_Name = Measure.Name if Column_Name is None else Column_Name + Expression = f"[{Measure.Name}]" + except Exception: + logging.debug("Measure is string but unable to find Measure...") + Column_Name = "Result" if Column_Name is None else Column_Name + Expression = Measure + else: + Column_Name = Measure.Name if Column_Name is None else Column_Name + Expression = f"[{Column_Name}]" + Query_Str = f""" + EVALUATE + SUMMARIZECOLUMNS( + {Date_Column_Identifier}, + KEEPFILTERS( DATESINPERIOD ( {Date_Column_Identifier}, UTCTODAY(), -{Number_Of_Intervals}, {Interval} ) ), + "{Column_Name}", {Expression} + ) + """ + logging.info( + f"Running query for {Column_Name} in the last {Number_Of_Intervals} {Interval}s..." + ) + return Model.Query(Query_Str) diff --git a/pytabular/best_practice_analyzer.py b/pytabular/best_practice_analyzer.py index c63a277..ddcc342 100644 --- a/pytabular/best_practice_analyzer.py +++ b/pytabular/best_practice_analyzer.py @@ -1,44 +1,50 @@ import logging -logger = logging.getLogger('PyTabular') import requests as r import atexit import json import os from logic_utils import remove_folder_and_contents -def Download_BPA_File(Download_Location:str = 'https://raw.githubusercontent.com/microsoft/Analysis-Services/master/BestPracticeRules/BPARules.json', -Folder:str = 'Best_Practice_Analyzer', -Auto_Remove = True) -> str: - '''Runs a request.get() to retrieve the json file from web. Will return and store in directory. Will also register the removal of the new directory and file when exiting program. - - Args: - Download_Location (_type_, optional): F. Defaults to [Microsoft GitHub BPA]'https://raw.githubusercontent.com/microsoft/Analysis-Services/master/BestPracticeRules/BPARules.json'. - Folder (str, optional): New Folder String. Defaults to 'Best_Practice_Analyzer'. - Auto_Remove (bool, optional): If you wish to Auto Remove when script exits. Defaults to True. - - Returns: - str: File Path for the newly downloaded BPA. - ''' - logger.info(f'Downloading BPA from {Download_Location}') - folder_location = os.path.join(os.getcwd(),Folder) - if os.path.exists(folder_location) == False: - os.makedirs(folder_location) - response = r.get(Download_Location) - file_location = os.path.join(folder_location,Download_Location.split('/')[-1]) - with open(file_location, 'w', encoding='utf-8') as bpa: - json.dump(response.json(), bpa, ensure_ascii=False, indent= 4) - if Auto_Remove: - logger.debug(f'Registering removal on termination... For {folder_location}') - atexit.register(remove_folder_and_contents, folder_location) - return file_location + +logger = logging.getLogger("PyTabular") + + +def Download_BPA_File( + Download_Location: str = "https://raw.githubusercontent.com/microsoft/Analysis-Services/master/BestPracticeRules/BPARules.json", + Folder: str = "Best_Practice_Analyzer", + Auto_Remove=True, +) -> str: + """Runs a request.get() to retrieve the json file from web. Will return and store in directory. Will also register the removal of the new directory and file when exiting program. + + Args: + Download_Location (_type_, optional): F. Defaults to [Microsoft GitHub BPA]'https://raw.githubusercontent.com/microsoft/Analysis-Services/master/BestPracticeRules/BPARules.json'. + Folder (str, optional): New Folder String. Defaults to 'Best_Practice_Analyzer'. + Auto_Remove (bool, optional): If you wish to Auto Remove when script exits. Defaults to True. + + Returns: + str: File Path for the newly downloaded BPA. + """ + logger.info(f"Downloading BPA from {Download_Location}") + folder_location = os.path.join(os.getcwd(), Folder) + if os.path.exists(folder_location) is False: + os.makedirs(folder_location) + response = r.get(Download_Location) + file_location = os.path.join(folder_location, Download_Location.split("/")[-1]) + with open(file_location, "w", encoding="utf-8") as bpa: + json.dump(response.json(), bpa, ensure_ascii=False, indent=4) + if Auto_Remove: + logger.debug(f"Registering removal on termination... For {folder_location}") + atexit.register(remove_folder_and_contents, folder_location) + return file_location + class BPA: - '''Setting BPA Class for future work... - ''' - def __init__(self, File_Path:str = 'Default') -> None: - logger.debug(f'Initializing BPA Class:: {File_Path}') - if File_Path == 'Default': - self.Location: str = Download_BPA_File() - else: - self.Location: str = File_Path - pass \ No newline at end of file + """Setting BPA Class for future work...""" + + def __init__(self, File_Path: str = "Default") -> None: + logger.debug(f"Initializing BPA Class:: {File_Path}") + if File_Path == "Default": + self.Location: str = Download_BPA_File() + else: + self.Location: str = File_Path + pass diff --git a/pytabular/logic_utils.py b/pytabular/logic_utils.py index ef29bcd..b0d047b 100644 --- a/pytabular/logic_utils.py +++ b/pytabular/logic_utils.py @@ -1,155 +1,177 @@ import logging -logger = logging.getLogger('PyTabular') import datetime import os from typing import Dict, List import pandas as pd -import clr - -clr.AddReference('Microsoft.AnalysisServices.Tabular') from Microsoft.AnalysisServices.Tabular import DataType -def ticks_to_datetime(ticks:int) -> datetime.datetime: - '''Converts a C# System DateTime Tick into a Python DateTime - - Args: - ticks (int): [C# DateTime Tick](https://docs.microsoft.com/en-us/dotnet/api/system.datetime.ticks?view=net-6.0) - - Returns: - datetime.datetime: [datetime.datetime](https://docs.python.org/3/library/datetime.html) - ''' - return datetime.datetime(1,1,1) + datetime.timedelta(microseconds=ticks//10) - -def pandas_datatype_to_tabular_datatype(df:pd.DataFrame)-> Dict: - '''WiP takes dataframe columns and gets respective tabular column datatype. ([NumPy Datatypes](https://numpy.org/doc/stable/reference/generated/numpy.dtype.kind.html) and [Tabular Datatypes](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.datatype?view=analysisservices-dotnet)) - - Args: - df (pd.DataFrame): Pandas DataFrame - - Returns: - Dict: EX {'col1': , 'col2': , 'col3': } - ''' - logger.info(f'Getting DF Column Dtypes to Tabular Dtypes...') - tabular_datatype_mapping_key = { - 'b':DataType.Boolean, - 'i':DataType.Int64, - 'u':DataType.Int64, - 'f':DataType.Double, - 'c':DataType.Double, - 'm':DataType.DateTime, - 'M':DataType.DateTime, - 'O':DataType.String, - 'S':DataType.String, - 'U':DataType.String, - 'V':DataType.String - } - return {column:tabular_datatype_mapping_key[df[column].dtype.kind] for column in df.columns} - -def pd_dataframe_to_dax_expression(df:pd.DataFrame = pd.DataFrame(data={'col1': [1.0, 2.0], 'col2': [3, 4]})) -> str: - ''' - This will take a pandas dataframe and convert to a dax expression - For example this DF: - col1 col2 - 0 1 3 - 1 2 4 - - | - | - V - - Will convert to this expression string: - DEFINE - TABLE tablename = { ( 1, 3 ), ( 2, 4 ) } - - EVALUATE - SELECTCOLUMNS( - tablename, - "col1", tablename[Value1], - "col2", tablename[Value2] - ) - ''' - def dax_tableconstructor_rows_expression_generator(list_of_strings: list[str]) -> str: - ''' - Converts list[str] to dax table rows for example ['one','two'] -> '('one','two')' - ''' - return - return True -def pd_dataframe_to_m_expression(df:pd.DataFrame) -> str: - '''This will take a pandas dataframe and convert to an m expression - For example this DF: - col1 col2 - 0 1 3 - 1 2 4 - - | - | - V - - Will convert to this expression string: - let - Source=#table({"col1","col2"}, - { - {"1","3"},{"2","4"} - }) - in - Source - - Args: - df (pd.DataFrame): Pandas DataFrame - - Returns: - str: Currently only returning string values in your tabular model. - ''' - def m_list_expression_generator(list_of_strings:List[str]) -> str: - ''' - Takes a python list of strings and converts to power query m expression list format... - Ex: ["item1","item2","item3"] --> {"item1","item2","item3"} - Codepoint reference --> \u007b == { and \u007d == } - ''' - string_components = ','.join([f'\"{string_value}\"' for string_value in list_of_strings]) - return f'\u007b{string_components}\u007d' - logger.debug(f'Executing m_list_generator()... for {df.columns}') - columns = m_list_expression_generator(df.columns) - expression_str = f"let\nSource=#table({columns},\n" - logger.debug(f'Iterating through rows to build expression... df has {len(df)} rows...') - expression_list_rows = [] - for index, row in df.iterrows(): - expression_list_rows += [m_list_expression_generator(row.to_list())] - expression_str += f"\u007b\n{','.join(expression_list_rows)}\n\u007d)\nin\nSource" - return expression_str +logger = logging.getLogger("PyTabular") + + +def ticks_to_datetime(ticks: int) -> datetime.datetime: + """Converts a C# System DateTime Tick into a Python DateTime + + Args: + ticks (int): [C# DateTime Tick](https://docs.microsoft.com/en-us/dotnet/api/system.datetime.ticks?view=net-6.0) + + Returns: + datetime.datetime: [datetime.datetime](https://docs.python.org/3/library/datetime.html) + """ + return datetime.datetime(1, 1, 1) + datetime.timedelta(microseconds=ticks // 10) + + +def pandas_datatype_to_tabular_datatype(df: pd.DataFrame) -> Dict: + """WiP takes dataframe columns and gets respective tabular column datatype. ([NumPy Datatypes](https://numpy.org/doc/stable/reference/generated/numpy.dtype.kind.html) and [Tabular Datatypes](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.datatype?view=analysisservices-dotnet)) + + Args: + df (pd.DataFrame): Pandas DataFrame + + Returns: + Dict: EX {'col1': , 'col2': , 'col3': } + """ + logger.info("Getting DF Column Dtypes to Tabular Dtypes...") + tabular_datatype_mapping_key = { + "b": DataType.Boolean, + "i": DataType.Int64, + "u": DataType.Int64, + "f": DataType.Double, + "c": DataType.Double, + "m": DataType.DateTime, + "M": DataType.DateTime, + "O": DataType.String, + "S": DataType.String, + "U": DataType.String, + "V": DataType.String, + } + return { + column: tabular_datatype_mapping_key[df[column].dtype.kind] + for column in df.columns + } + + +def pd_dataframe_to_dax_expression( + df: pd.DataFrame = pd.DataFrame(data={"col1": [1.0, 2.0], "col2": [3, 4]}) +) -> str: + """ + This will take a pandas dataframe and convert to a dax expression + For example this DF: + col1 col2 + 0 1 3 + 1 2 4 + + | + | + V + + Will convert to this expression string: + DEFINE + TABLE tablename = { ( 1, 3 ), ( 2, 4 ) } + + EVALUATE + SELECTCOLUMNS( + tablename, + "col1", tablename[Value1], + "col2", tablename[Value2] + ) + """ + + def dax_tableconstructor_rows_expression_generator( + list_of_strings: list[str], + ) -> str: + """ + Converts list[str] to dax table rows for example ['one','two'] -> '('one','two')' + """ + return + + return True + + +def pd_dataframe_to_m_expression(df: pd.DataFrame) -> str: + """This will take a pandas dataframe and convert to an m expression + For example this DF: + col1 col2 + 0 1 3 + 1 2 4 + + | + | + V + + Will convert to this expression string: + let + Source=#table({"col1","col2"}, + { + {"1","3"},{"2","4"} + }) + in + Source + + Args: + df (pd.DataFrame): Pandas DataFrame + + Returns: + str: Currently only returning string values in your tabular model. + """ + + def m_list_expression_generator(list_of_strings: List[str]) -> str: + """ + Takes a python list of strings and converts to power query m expression list format... + Ex: ["item1","item2","item3"] --> {"item1","item2","item3"} + Codepoint reference --> \u007b == { and \u007d == } + """ + string_components = ",".join( + [f'"{string_value}"' for string_value in list_of_strings] + ) + return f"\u007b{string_components}\u007d" + + logger.debug(f"Executing m_list_generator()... for {df.columns}") + columns = m_list_expression_generator(df.columns) + expression_str = f"let\nSource=#table({columns},\n" + logger.debug( + f"Iterating through rows to build expression... df has {len(df)} rows..." + ) + expression_list_rows = [] + for index, row in df.iterrows(): + expression_list_rows += [m_list_expression_generator(row.to_list())] + expression_str += f"\u007b\n{','.join(expression_list_rows)}\n\u007d)\nin\nSource" + return expression_str + def remove_folder_and_contents(folder_location): - '''Internal used in tabular_editor.py and best_practice_analyzer.py. + """Internal used in tabular_editor.py and best_practice_analyzer.py. + + Args: + folder_location (str): Folder path to remove directory and contents. + """ + import shutil + + if os.path.exists(folder_location): + logger.info(f"Removing Dir and Contents -> {folder_location}") + shutil.rmtree(folder_location) - Args: - folder_location (str): Folder path to remove directory and contents. - ''' - import shutil - if os.path.exists(folder_location): - logger.info(f'Removing Dir and Contents -> {folder_location}') - shutil.rmtree(folder_location) def remove_suffix(input_string, suffix): - '''Adding for >3.9 compatiblity. (Stackoverflow Answer)[https://stackoverflow.com/questions/66683630/removesuffix-returns-error-str-object-has-no-attribute-removesuffix] + """Adding for >3.9 compatiblity. (Stackoverflow Answer)[https://stackoverflow.com/questions/66683630/removesuffix-returns-error-str-object-has-no-attribute-removesuffix] + + Args: + input_string (str): input string to remove suffix from + suffix (str): suffix to be removed - Args: - input_string (str): input string to remove suffix from - suffix (str): suffix to be removed + Returns: + str: input_str with suffix removed + """ + if suffix and input_string.endswith(suffix): + return input_string[: -len(suffix)] + return input_string - Returns: - str: input_str with suffix removed - ''' - if suffix and input_string.endswith(suffix): - return input_string[:-len(suffix)] - return input_string -def sql_wrap_count_around_query(original_query:str) -> str: - '''Simple string formating to get the total row count of a sql query. +def sql_wrap_count_around_query(original_query: str) -> str: + """Simple string formating to get the total row count of a sql query. - Args: - original_query (str): Regular sql query to get count of. + Args: + original_query (str): Regular sql query to get count of. - Returns: - str: f"SELECT COUNT(1) FROM ({original_query}) temp_table" - ''' - return f"SELECT COUNT(1) FROM ({original_query}) temp_table" \ No newline at end of file + Returns: + str: f"SELECT COUNT(1) FROM ({original_query}) temp_table" + """ + return f"SELECT COUNT(1) FROM ({original_query}) temp_table" diff --git a/pytabular/pytabular.py b/pytabular/pytabular.py index 3011810..d466bae 100644 --- a/pytabular/pytabular.py +++ b/pytabular/pytabular.py @@ -1,495 +1,660 @@ import logging -from pathlib import Path -logger = logging.getLogger('PyTabular') - -logger.debug(f'Importing Microsoft.AnalysisServices.Tabular') -from Microsoft.AnalysisServices.Tabular import Server, RefreshType, ColumnType, Table, DataColumn, Partition, MPartitionSource, Measure -logger.debug(f'Importing Microsoft.AnalysisServices.AdomdClient') -from Microsoft.AnalysisServices.AdomdClient import (AdomdCommand, AdomdConnection) -logger.debug(f'Importing Microsoft.AnalysisServices') -from Microsoft.AnalysisServices import UpdateOptions -logger.debug('Importing Other Packages...') -from typing import Any, Dict, List, Union, TYPE_CHECKING -from collections.abc import Iterable +from Microsoft.AnalysisServices.Tabular import ( + Server, + RefreshType, + ColumnType, + Table, + DataColumn, + Partition, + MPartitionSource, + Measure, +) +from Microsoft.AnalysisServices.AdomdClient import AdomdCommand, AdomdConnection +from Microsoft.AnalysisServices import UpdateOptions +from typing import Any, Dict, List, Union from collections import namedtuple import pandas as pd import os import subprocess import atexit - - -from logic_utils import pd_dataframe_to_m_expression, pandas_datatype_to_tabular_datatype, ticks_to_datetime, remove_suffix +from logic_utils import ( + pd_dataframe_to_m_expression, + pandas_datatype_to_tabular_datatype, + ticks_to_datetime, + remove_suffix, +) from tabular_tracing import Refresh_Trace +logger = logging.getLogger("PyTabular") + class Tabular: - '''Tabular Class to perform operations: [Microsoft.AnalysisServices.Tabular](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular?view=analysisservices-dotnet). You can use this class as your main way to interact with your model. - - Args: - CONNECTION_STR (str): Valid [Connection String](https://docs.microsoft.com/en-us/analysis-services/instances/connection-string-properties-analysis-services?view=asallproducts-allversions) for connecting to a Tabular Model. - Attributes: - Server (Server): See [Server MS Docs](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.server?view=analysisservices-dotnet). - Catalog (str): Name of Database. See [Catalog MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.connectioninfo.catalog?view=analysisservices-dotnet#microsoft-analysisservices-connectioninfo-catalog). - Model (Model): See [Model MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.model?view=analysisservices-dotnet). - AdomdConnection (AdomdConnection): For querying. See [AdomdConnection MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.adomdclient.adomdconnection?view=analysisservices-dotnet). Connection made from parts of the originally provided connection string. - Tables (List[Table]): Easy access list of tables from model. See [Table MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.table?view=analysisservices-dotnet). - Columns (List[Column]): Easy access list of columns from model. See [Column MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.column?view=analysisservices-dotnet). - Partitions (List[Partition]): Easy access list of partitions from model. See [Partition MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.partition?view=analysisservices-dotnet). - Measures (List[Measure]): Easy access list of measures from model. See [Measure MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.table.measures?view=analysisservices-dotnet#microsoft-analysisservices-tabular-table-measures). - ''' - def __init__(self,CONNECTION_STR:str): - logger.debug(f'Initializing Tabular Class') - self.Server = Server() - self.Server.Connect(CONNECTION_STR) - logger.info(f'Connected to Server - {self.Server.Name}') - self.Catalog = self.Server.ConnectionInfo.Catalog - logger.debug(f'Received Catalog - {self.Catalog}') - try: - self.Database = [database for database in self.Server.Databases.GetEnumerator() if database.Name == self.Catalog][0] - except: - err_msg = f'Unable to find Database... {self.Catalog}' - logger.error(err_msg) - raise Exception(err_msg) - logger.info(f'Connected to Database - {self.Database.Name}') - self.CompatibilityLevel: int = self.Database.CompatibilityLevel - self.CompatibilityMode: int = self.Database.CompatibilityMode.value__ - self.Model = self.Database.Model - logger.info(f'Connected to Model - {self.Model.Name}') - self.AdomdConnection = AdomdConnection() - self.AdomdConnection.ConnectionString = f"{self.Server.ConnectionString}Password='{self.Server.ConnectionInfo.Password}'" - self.Reload_Model_Info() - logger.debug(f'Class Initialization Completed') - logger.debug(f'Registering Disconnect on Termination...') - atexit.register(self.Disconnect) - - pass - def __repr__(self) -> str: - return f'{self.Server.Name}::{self.Database.Name}::{self.Model.Name}\nEstimated Size::{self.Database.EstimatedSize}\nTables::{len(self.Tables)}\nColumns::{len(self.Columns)}\nPartitions::{len(self.Partitions)}\nMeasures::{len(self.Measures)}' - def Reload_Model_Info(self) -> bool: - '''Runs on __init__ iterates through details, can be called after any model changes. Called in SaveChanges() - - Returns: - bool: True if successful - ''' - self.Tables = [table for table in self.Model.Tables.GetEnumerator()] - self.Columns = [column for table in self.Tables for column in table.Columns.GetEnumerator()] - self.Partitions = [partition for table in self.Tables for partition in table.Partitions.GetEnumerator()] - self.Measures = [measure for table in self.Tables for measure in table.Measures.GetEnumerator()] - self.Database.Refresh() - return True - def Is_Process(self) -> bool: - '''Run method to check if Processing is occurring. Will query DMV $SYSTEM.DISCOVER_JOBS to see if any processing is happening. - - Returns: - bool: True if DMV shows Process, False if not. - ''' - _jobs_df = self.Query("select * from $SYSTEM.DISCOVER_JOBS") - return True if len(_jobs_df[_jobs_df['JOB_DESCRIPTION']=='Process']) > 0 else False - def Disconnect(self) -> bool: - '''Disconnects from Model - - Returns: - bool: True if successful - ''' - logger.debug(f'Disconnecting from - {self.Server.Name}') - return self.Server.Disconnect() - def Refresh(self, - Object:Union[ - str, Table, Partition, Dict[str, Any] - ], - RefreshType:RefreshType = RefreshType.Full, - Tracing = False) -> None: - '''Refreshes table(s) and partition(s). - - Args: - Object (Union[ str, Table, Partition, Dict[str, Any], Iterable[str, Table, Partition, Dict[str, Any]] ]): Designed to handle a few different ways of selecting a refresh. - str == 'Table_Name' - Table == Table Object - Partition == Partition Object - Dict[str, Any] == A way to specify a partition of group of partitions. For ex. {'Table_Name':'Partition1'} or {'Table_Name':['Partition1','Partition2']}. NOTE you can also change out the strings for partition or tables objects. - RefreshType (RefreshType, optional): See [RefreshType](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.refreshtype?view=analysisservices-dotnet). Defaults to RefreshType.Full. - Tracing (bool, optional): Currently just some basic tracing to track refreshes. Defaults to False. - - Raises: - Exception: Raises exception if unable to find table or partition via string. - - - Returns: - WIP: WIP - ''' - logger.debug(f'Beginning RequestRefresh cadence...') - - def _Refresh_Report(Property_Changes) -> pd.DataFrame: - logger.debug(f'Running Refresh Report...') - refresh_data = [] - for property_change in Property_Changes: - if isinstance(property_change.Object,Partition) and property_change.Property_Name == 'RefreshedTime': - table, partition, refreshed_time = property_change.Object.Table.Name, property_change.Object.Name, ticks_to_datetime(property_change.New_Value.Ticks) - logger.info(f'{table} - {partition} Refreshed! - {refreshed_time.strftime("%m/%d/%Y, %H:%M:%S")}') - refresh_data += [[table, partition, refreshed_time]] - return pd.DataFrame(refresh_data, columns=['Table','Partition','Refreshed Time']) - - def refresh_table(table:Table) -> None: - logging.info(f'Requesting refresh for {table.Name}') - table.RequestRefresh(RefreshType) - - def refresh_partition(partition:Partition) -> None: - logging.info(f'Requesting refresh for {partition.Table.Name}|{partition.Name}') - partition.RequestRefresh(RefreshType) - - def refresh_dict(partition_dict:Dict) -> None: - for table in partition_dict.keys(): - table_object = find_table(table) if isinstance(table,str) else table - def handle_partitions(object): - if isinstance(object,str): - refresh_partition(find_partition(table_object, object)) - elif isinstance(object,Partition): - refresh_partition(object) - else: - [handle_partitions(obj) for obj in object] - handle_partitions(partition_dict[table]) - - - def find_table(table_str:str) -> Table: - result = self.Model.Tables.Find(table_str) - if result is None: - raise Exception(f"Unable to find table! from {table_str}") - logging.debug(f'Found table {result.Name}') - return result - - def find_partition(table:Table, partition_str:str) -> Partition: - result = table.Partitions.Find(partition_str) - if result is None: - raise Exception(f"Unable to find partition! {table.Name}|{partition_str}") - logging.debug(f'Found partition {result.Table.Name}|{result.Name}') - return result - - def refresh(Object): - if isinstance(Object,str): - refresh_table(find_table(Object)) - elif isinstance(Object, Dict): - refresh_dict(Object) - elif isinstance(Object, Table): - refresh_table(Object) - elif isinstance(Object, Partition): - refresh_partition(Object) - else: - [refresh(object) for object in Object] - refresh(Object) - if Tracing: - rt = Refresh_Trace(self) - rt.Start() - - m = self.SaveChanges() - - if Tracing: - rt.Stop() - rt.Drop() - - return _Refresh_Report(m.Property_Changes) - def Update(self, UpdateOptions:UpdateOptions = UpdateOptions.ExpandFull) -> None: - '''[Update Model](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.majorobject.update?view=analysisservices-dotnet#microsoft-analysisservices-majorobject-update(microsoft-analysisservices-updateoptions)) - - Args: - UpdateOptions (UpdateOptions, optional): See above MS Doc link. Defaults to UpdateOptions.ExpandFull. - - Returns: - None: Placeholder to eventually change. - ''' - logger.debug('Running Update Request') - return self.Database.Update(UpdateOptions) - def SaveChanges(self) -> bool: - def property_changes(Property_Changes): - Property_Change = namedtuple("Property_Change","New_Value Object Original_Value Property_Name Property_Type") - return [Property_Change(change.NewValue, change.Object, change.OriginalValue, change.PropertyName, change.PropertyType) for change in Property_Changes.GetEnumerator()] - - - logger.info(f'Executing SaveChanges()...') - Model_Save_Results = self.Model.SaveChanges() - if isinstance(Model_Save_Results.Impact, type(None)): - logger.warning(f'No changes detected on save for {self.Server.Name}') - return None - else: - Property_Changes = Model_Save_Results.Impact.PropertyChanges - Added_Objects = Model_Save_Results.Impact.AddedObjects - Added_Subtree_Roots = Model_Save_Results.Impact.AddedSubtreeRoots - Removed_Objects = Model_Save_Results.Impact.RemovedObjects - Removed_Subtree_Roots = Model_Save_Results.Impact.RemovedSubtreeRoots - Xmla_Results = Model_Save_Results.XmlaResults - Changes = namedtuple("Changes","Property_Changes Added_Objects Added_Subtree_Roots Removed_Objects Removed_Subtree_Roots Xmla_Results") - [property_changes(Property_Changes), Added_Objects, Added_Subtree_Roots, Removed_Objects, Removed_Subtree_Roots, Xmla_Results] - self.Reload_Model_Info() - return Changes(property_changes(Property_Changes), Added_Objects, Added_Subtree_Roots, Removed_Objects, Removed_Subtree_Roots, Xmla_Results) - def Backup_Table(self,table_str:str) -> bool: - '''USE WITH CAUTION, EXPERIMENTAL. Backs up table in memory, brings with it measures, columns, hierarchies, relationships, roles, etc. - It will add suffix '_backup' to all objects. - Refresh is performed from source during backup. - - Args: - table_str (str, optional): Name of Table. - - Returns: - bool: Returns True if Successful, else will return error. - ''' - logger.info('Backup Beginning...') - logger.debug(f'Cloning {table_str}') - table = self.Model.Tables.Find(table_str).Clone() - logger.info(f'Beginning Renames') - def rename(items): - for item in items: - item.RequestRename(f'{item.Name}_backup') - logger.debug(f'Renamed - {item.Name}') - logger.info('Renaming Columns') - rename(table.Columns.GetEnumerator()) - logger.info('Renaming Partitions') - rename(table.Partitions.GetEnumerator()) - logger.info('Renaming Measures') - rename(table.Measures.GetEnumerator()) - logger.info('Renaming Hierarchies') - rename(table.Hierarchies.GetEnumerator()) - logger.info('Renaming Table') - table.RequestRename(f'{table.Name}_backup') - logger.info('Adding Table to Model as backup') - self.Model.Tables.Add(table) - logger.info('Finding Necessary Relationships... Cloning...') - relationships = [relationship.Clone() for relationship in self.Model.Relationships.GetEnumerator() if relationship.ToTable.Name == remove_suffix(table.Name,'_backup') or relationship.FromTable.Name == remove_suffix(table.Name,'_backup')] - logger.info('Renaming Relationships') - rename(relationships) - logger.info('Switching Relationships to Clone Table & Column') - for relationship in relationships: - logger.debug(f'Renaming - {relationship.Name}') - if relationship.ToTable.Name == remove_suffix(table.Name,'_backup'): - relationship.set_ToColumn(table.Columns.Find(f'{relationship.ToColumn.Name}_backup')) - elif relationship.FromTable.Name == remove_suffix(table.Name,'_backup'): - relationship.set_FromColumn(table.Columns.Find(f'{relationship.FromColumn.Name}_backup')) - logger.debug(f'Adding {relationship.Name} to {self.Model.Name}') - self.Model.Relationships.Add(relationship) - def clone_role_permissions(): - logger.info(f'Beginning to handle roles and permissions for table...') - logger.debug(f'Finding Roles...') - roles = [role for role in self.Model.Roles.GetEnumerator() for tablepermission in role.TablePermissions.GetEnumerator() if tablepermission.Name == table_str] - for role in roles: - logger.debug(f'Role {role.Name} matched, looking into it...') - logger.debug(f'Searching for table specific permissions') - tablepermissions = [table.Clone() for table in role.TablePermissions.GetEnumerator() if table.Name == table_str] - for tablepermission in tablepermissions: - logger.debug(f'{tablepermission.Name} found... switching table to clone') - tablepermission.set_Table(table) - for column in tablepermission.ColumnPermissions.GetEnumerator(): - logger.debug(f'Column - {column.Name} copying permissions to clone...') - column.set_Column(self.Model.Tables.Find(table.Name).Columns.Find(f'{column.Name}_backup')) - logger.debug(f'Adding {tablepermission.Name} to {role.Name}') - role.TablePermissions.Add(tablepermission) - return True - clone_role_permissions() - logger.info(f'Refreshing Clone... {table.Name}') - self.Refresh([table]) - logger.info(f'Updating Model {self.Model.Name}') - self.SaveChanges() - return True - def Revert_Table(self, table_str:str) -> bool: - '''USE WITH CAUTION, EXPERIMENTAL. This is used in conjunction with Backup_Table(). - It will take the 'TableName_backup' and replace with the original. - Example scenario -> - 1. model.Backup_Table('TableName') - 2. perform any proposed changes in original 'TableName' - 3. validate changes in 'TableName' - 4. if unsuccessful run model.Revert_Table('TableName') - - Args: - table_str (str): Name of table. - - Returns: - bool: Returns True if Successful, else will return error. - ''' - logger.info(f'Beginning Revert for {table_str}') - logger.debug(f'Finding original {table_str}') - main = self.Model.Tables.Find(table_str) - logger.debug(f'Finding backup {table_str}') - backup = self.Model.Tables.Find(f'{table_str}_backup') - logger.debug(f'Finding original relationships') - main_relationships = [relationship for relationship in self.Model.Relationships.GetEnumerator() if relationship.ToTable.Name == main.Name or relationship.FromTable.Name == main.Name] - logger.debug(f'Finding backup relationships') - backup_relationships = [relationship for relationship in self.Model.Relationships.GetEnumerator() if relationship.ToTable.Name == backup.Name or relationship.FromTable.Name == backup.Name] - - def remove_role_permissions(): - logger.debug(f'Finding table and column permission in roles to remove from {table_str}') - roles = [role for role in self.Model.Roles.GetEnumerator() for tablepermission in role.TablePermissions.GetEnumerator() if tablepermission.Name == table_str] - for role in roles: - logger.debug(f'Role {role.Name} Found') - tablepermissions = [table for table in role.TablePermissions.GetEnumerator() if table.Name == table_str] - for tablepermission in tablepermissions: - logger.debug(f'Removing {tablepermission.Name} from {role.Name}') - role.TablePermissions.Remove(tablepermission) - for relationship in main_relationships: - logger.debug(f'Cleaning relationships...') - if relationship.ToTable.Name == main.Name: - logger.debug(f'Removing {relationship.Name}') - self.Model.Relationships.Remove(relationship) - elif relationship.FromTable.Name == main.Name: - logger.debug(f'Removing {relationship.Name}') - self.Model.Relationships.Remove(relationship) - logger.debug(f'Removing Original Table {main.Name}') - self.Model.Tables.Remove(main) - remove_role_permissions() - def dename(items): - for item in items: - logger.debug(f'Removing Suffix for {item.Name}') - item.RequestRename(remove_suffix(item.Name,'_backup')) - logger.debug(f'Saving Changes... for {item.Name}') - self.Model.SaveChanges() - logger.info(f'Name changes for Columns...') - dename([column for column in backup.Columns.GetEnumerator() if column.Type != ColumnType.RowNumber]) - logger.info(f'Name changes for Partitions...') - dename(backup.Partitions.GetEnumerator()) - logger.info(f'Name changes for Measures...') - dename(backup.Measures.GetEnumerator()) - logger.info(f'Name changes for Hierarchies...') - dename(backup.Hierarchies.GetEnumerator()) - logger.info(f'Name changes for Relationships...') - dename(backup_relationships) - logger.info(f'Name changes for Backup Table...') - backup.RequestRename(remove_suffix(backup.Name,'_backup')) - self.SaveChanges() - return True - def Query(self,Query_Str:str) -> Union[pd.DataFrame,str,int]: - ''' Executes Query on Model and Returns Results in Pandas DataFrame - - Args: - Query_Str (str): Dax Query. Note, needs full syntax (ex: EVALUATE). See (DAX Queries)[https://docs.microsoft.com/en-us/dax/dax-queries]. - Will check if query string is a file. If it is, then it will perform a query on whatever is read from the file. - It is also possible to query DMV. For example. Query("select * from $SYSTEM.DISCOVER_TRACE_EVENT_CATEGORIES"). See (DMVs)[https://docs.microsoft.com/en-us/analysis-services/instances/use-dynamic-management-views-dmvs-to-monitor-analysis-services?view=asallproducts-allversions] - - Returns: - pd.DataFrame: Returns dataframe with results - ''' - - if os.path.isfile(Query_Str): - logging.debug(f'File path detected, reading file... -> {Query_Str}',) - with open(Query_Str,'r') as file: - Query_Str = str(file.read()) - - - try: - logger.debug(f'Setting first initial Adomd Connection...') - self.AdomdConnection.Open() - logger.debug(f'Connected!') - except: - pass - logger.info(f'Querying Model...') - Query = AdomdCommand(Query_Str, self.AdomdConnection).ExecuteReader() - Column_Headers = [(index,Query.GetName(index)) for index in range(0,Query.FieldCount)] - Results = list() - while Query.Read(): - Results.append([Query.GetValue(index) for index in range(0,len(Column_Headers))]) - Query.Close() - logger.debug(f'Data retrieved... reading...') - df = pd.DataFrame(Results,columns=[value for _,value in Column_Headers]) - if len(df) == 1 and len(df.columns) == 1: - return df.iloc[0][df.columns[0]] - return df - def Query_Every_Column(self,query_function:str='COUNTROWS(VALUES(_))') -> pd.DataFrame: - '''This will dynamically create a query to pull all columns from the model and run the query function. -
It will replace the _ with the column to run. - - Args: - query_function (str, optional): Dax query is dynamically building a query with the UNION & ROW DAX Functions. - - Returns: - pd.DataFrame: Returns dataframe with results. - ''' - logger.info(f'Beginning execution of querying every column...') - logger.debug(f'Function to be run: {query_function}') - logger.debug(f'Dynamically creating DAX query...') - query_str = "EVALUATE UNION(\n" - for column in self.Columns: - if column.Type != ColumnType.RowNumber: - table_name = column.Table.get_Name() - column_name = column.get_Name() - dax_identifier = f"'{table_name}'[{column_name}]" - query_str += f"ROW(\"Table\",\"{table_name}\",\"Column\",\"{column_name}\",\"{query_function}\",{query_function.replace('_',dax_identifier)}),\n" - query_str = f'{query_str[:-2]})' - return self.Query(query_str) - def Query_Every_Table(self,query_function:str='COUNTROWS(_)') -> pd.DataFrame: - '''This will dynamically create a query to pull all tables from the model and run the query function. - It will replace the _ with the table to run. - - Args: - query_function (str, optional): Dax query is dynamically building a query with the UNION & ROW DAX Functions. Defaults to 'COUNTROWS(_)'. - - Returns: - pd.DataFrame: Returns dataframe with results - ''' - logger.info(f'Beginning execution of querying every table...') - logger.debug(f'Function to be run: {query_function}') - logger.debug(f'Dynamically creating DAX query...') - query_str = "EVALUATE UNION(\n" - for table in self.Tables: - table_name = table.get_Name() - dax_table_identifier = f'\'{table_name}\'' - query_str += f"ROW(\"Table\",\"{table_name}\",\"{query_function}\",{query_function.replace('_',dax_table_identifier)}),\n" - query_str = f'{query_str[:-2]})' - return self.Query(query_str) - def Analyze_BPA(self,Tabular_Editor_Exe:str,Best_Practice_Analyzer:str) -> List[str]: - '''Takes your Tabular Model and performs TE2s BPA. Runs through Command line. - [Tabular Editor BPA](https://docs.tabulareditor.com/te2/Best-Practice-Analyzer.html) - [Tabular Editor Command Line Options](https://docs.tabulareditor.com/te2/Command-line-Options.html) - - Args: - Tabular_Editor_Exe (str): TE2 Exe File path. Feel free to use class TE2().EXE_Path or provide your own. - Best_Practice_Analyzer (str): BPA json file path. Feel free to use class BPA().Location or provide your own. Defualts to https://raw.githubusercontent.com/microsoft/Analysis-Services/master/BestPracticeRules/BPARules.json - - Returns: - List[str]: Assuming no failure, will return list of BPA violations. Else will return error from command line. - ''' - #Working TE2 Script in Python os.system(f"start /wait {te2.EXE_Path} \"Provider=MSOLAP;{model.AdomdConnection.ConnectionString}\" FINANCE -B \"{os.getcwd()}\\Model.bim\" -A {l.BPA_LOCAL_FILE_PATH} -V/?") - #start /wait - logger.debug(f'Beginning request to talk with TE2 & Find BPA...') - cmd = f"{Tabular_Editor_Exe} \"Provider=MSOLAP;{self.AdomdConnection.ConnectionString}\" {self.Database.Name} -B \"{os.getcwd()}\\Model.bim\" -A {Best_Practice_Analyzer} -V/?" - logger.debug(f'Command Generated') - logger.debug(f'Submitting Command...') - sp = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE,universal_newlines=True) - raw_output,error = sp.communicate() - if len(error) > 0: - return error - else: - return [output for output in raw_output.split('\n') if 'violates rule' in output] - def Create_Table(self,df:pd.DataFrame, table_name:str) -> bool: - '''Creates tables from pd.DataFrame as an M-Partition. - So will convert the dataframe to M-Partition logic via the M query table constructor. - Runs refresh and will update model. - - Args: - df (pd.DataFrame): DataFrame to add to model - table_name (str): _description_ - - Returns: - bool: True if successful - ''' - logger.debug(f'Beginning to create table for {table_name}...') - new_table = Table() - new_table.RequestRename(table_name) - logger.debug(f'Sorting through columns...') - df_column_names = df.columns - dtype_conversion = pandas_datatype_to_tabular_datatype(df) - for df_column_name in df_column_names: - logger.debug(f'Adding {df_column_name} to Table...') - column = DataColumn() - column.RequestRename(df_column_name) - column.set_SourceColumn(df_column_name) - column.set_DataType(dtype_conversion[df_column_name]) - new_table.Columns.Add(column) - logger.debug(f'Expression String Created...') - logger.debug(f'Creating MPartition...') - partition = Partition() - partition.set_Source(MPartitionSource()) - logger.debug(f'Setting MPartition Expression...') - partition.Source.set_Expression(pd_dataframe_to_m_expression(df)) - logger.debug(f'Adding partition: {partition.Name} to {self.Server.Name}::{self.Database.Name}::{self.Model.Name}') - new_table.Partitions.Add(partition) - logger.debug(f'Adding table: {new_table.Name} to {self.Server.Name}::{self.Database.Name}::{self.Model.Name}') - self.Model.Tables.Add(new_table) - self.Refresh([new_table]) - self.SaveChanges() - return True \ No newline at end of file + """Tabular Class to perform operations: [Microsoft.AnalysisServices.Tabular](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular?view=analysisservices-dotnet). You can use this class as your main way to interact with your model. + + Args: + CONNECTION_STR (str): Valid [Connection String](https://docs.microsoft.com/en-us/analysis-services/instances/connection-string-properties-analysis-services?view=asallproducts-allversions) for connecting to a Tabular Model. + Attributes: + Server (Server): See [Server MS Docs](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.server?view=analysisservices-dotnet). + Catalog (str): Name of Database. See [Catalog MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.connectioninfo.catalog?view=analysisservices-dotnet#microsoft-analysisservices-connectioninfo-catalog). + Model (Model): See [Model MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.model?view=analysisservices-dotnet). + AdomdConnection (AdomdConnection): For querying. See [AdomdConnection MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.adomdclient.adomdconnection?view=analysisservices-dotnet). Connection made from parts of the originally provided connection string. + Tables (List[Table]): Easy access list of tables from model. See [Table MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.table?view=analysisservices-dotnet). + Columns (List[Column]): Easy access list of columns from model. See [Column MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.column?view=analysisservices-dotnet). + Partitions (List[Partition]): Easy access list of partitions from model. See [Partition MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.partition?view=analysisservices-dotnet). + Measures (List[Measure]): Easy access list of measures from model. See [Measure MS Docs](https://learn.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.table.measures?view=analysisservices-dotnet#microsoft-analysisservices-tabular-table-measures). + """ + + def __init__(self, CONNECTION_STR: str): + logger.debug("Initializing Tabular Class") + self.Server = Server() + self.Server.Connect(CONNECTION_STR) + logger.info(f"Connected to Server - {self.Server.Name}") + self.Catalog = self.Server.ConnectionInfo.Catalog + logger.debug(f"Received Catalog - {self.Catalog}") + try: + self.Database = [ + database + for database in self.Server.Databases.GetEnumerator() + if database.Name == self.Catalog + ][0] + except Exception: + err_msg = f"Unable to find Database... {self.Catalog}" + logger.error(err_msg) + raise Exception(err_msg) + logger.info(f"Connected to Database - {self.Database.Name}") + self.CompatibilityLevel: int = self.Database.CompatibilityLevel + self.CompatibilityMode: int = self.Database.CompatibilityMode.value__ + self.Model = self.Database.Model + logger.info(f"Connected to Model - {self.Model.Name}") + self.AdomdConnection = AdomdConnection() + self.AdomdConnection.ConnectionString = f"{self.Server.ConnectionString}Password='{self.Server.ConnectionInfo.Password}'" + self.Reload_Model_Info() + logger.debug("Class Initialization Completed") + logger.debug("Registering Disconnect on Termination...") + atexit.register(self.Disconnect) + + pass + + def __repr__(self) -> str: + return f"{self.Server.Name}::{self.Database.Name}::{self.Model.Name}\nEstimated Size::{self.Database.EstimatedSize}\nTables::{len(self.Tables)}\nColumns::{len(self.Columns)}\nPartitions::{len(self.Partitions)}\nMeasures::{len(self.Measures)}" + + def Reload_Model_Info(self) -> bool: + """Runs on __init__ iterates through details, can be called after any model changes. Called in SaveChanges() + + Returns: + bool: True if successful + """ + self.Tables = [table for table in self.Model.Tables.GetEnumerator()] + self.Columns = [ + column for table in self.Tables for column in table.Columns.GetEnumerator() + ] + self.Partitions = [ + partition + for table in self.Tables + for partition in table.Partitions.GetEnumerator() + ] + self.Measures = [ + measure + for table in self.Tables + for measure in table.Measures.GetEnumerator() + ] + self.Database.Refresh() + return True + + def Is_Process(self) -> bool: + """Run method to check if Processing is occurring. Will query DMV $SYSTEM.DISCOVER_JOBS to see if any processing is happening. + + Returns: + bool: True if DMV shows Process, False if not. + """ + _jobs_df = self.Query("select * from $SYSTEM.DISCOVER_JOBS") + return ( + True + if len(_jobs_df[_jobs_df["JOB_DESCRIPTION"] == "Process"]) > 0 + else False + ) + + def Disconnect(self) -> bool: + """Disconnects from Model + + Returns: + bool: True if successful + """ + logger.debug(f"Disconnecting from - {self.Server.Name}") + return self.Server.Disconnect() + + def Refresh( + self, + Object: Union[str, Table, Partition, Dict[str, Any]], + RefreshType: RefreshType = RefreshType.Full, + Tracing=False, + ) -> None: + """Refreshes table(s) and partition(s). + + Args: + Object (Union[ str, Table, Partition, Dict[str, Any], Iterable[str, Table, Partition, Dict[str, Any]] ]): Designed to handle a few different ways of selecting a refresh. + str == 'Table_Name' + Table == Table Object + Partition == Partition Object + Dict[str, Any] == A way to specify a partition of group of partitions. For ex. {'Table_Name':'Partition1'} or {'Table_Name':['Partition1','Partition2']}. NOTE you can also change out the strings for partition or tables objects. + RefreshType (RefreshType, optional): See [RefreshType](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.refreshtype?view=analysisservices-dotnet). Defaults to RefreshType.Full. + Tracing (bool, optional): Currently just some basic tracing to track refreshes. Defaults to False. + + Raises: + Exception: Raises exception if unable to find table or partition via string. + + + Returns: + WIP: WIP + """ + logger.debug("Beginning RequestRefresh cadence...") + + def _Refresh_Report(Property_Changes) -> pd.DataFrame: + logger.debug("Running Refresh Report...") + refresh_data = [] + for property_change in Property_Changes: + if ( + isinstance(property_change.Object, Partition) + and property_change.Property_Name == "RefreshedTime" + ): + table, partition, refreshed_time = ( + property_change.Object.Table.Name, + property_change.Object.Name, + ticks_to_datetime(property_change.New_Value.Ticks), + ) + logger.info( + f'{table} - {partition} Refreshed! - {refreshed_time.strftime("%m/%d/%Y, %H:%M:%S")}' + ) + refresh_data += [[table, partition, refreshed_time]] + return pd.DataFrame( + refresh_data, columns=["Table", "Partition", "Refreshed Time"] + ) + + def refresh_table(table: Table) -> None: + logging.info(f"Requesting refresh for {table.Name}") + table.RequestRefresh(RefreshType) + + def refresh_partition(partition: Partition) -> None: + logging.info( + f"Requesting refresh for {partition.Table.Name}|{partition.Name}" + ) + partition.RequestRefresh(RefreshType) + + def refresh_dict(partition_dict: Dict) -> None: + for table in partition_dict.keys(): + table_object = find_table(table) if isinstance(table, str) else table + + def handle_partitions(object): + if isinstance(object, str): + refresh_partition(find_partition(table_object, object)) + elif isinstance(object, Partition): + refresh_partition(object) + else: + [handle_partitions(obj) for obj in object] + + handle_partitions(partition_dict[table]) + + def find_table(table_str: str) -> Table: + result = self.Model.Tables.Find(table_str) + if result is None: + raise Exception(f"Unable to find table! from {table_str}") + logging.debug(f"Found table {result.Name}") + return result + + def find_partition(table: Table, partition_str: str) -> Partition: + result = table.Partitions.Find(partition_str) + if result is None: + raise Exception( + f"Unable to find partition! {table.Name}|{partition_str}" + ) + logging.debug(f"Found partition {result.Table.Name}|{result.Name}") + return result + + def refresh(Object): + if isinstance(Object, str): + refresh_table(find_table(Object)) + elif isinstance(Object, Dict): + refresh_dict(Object) + elif isinstance(Object, Table): + refresh_table(Object) + elif isinstance(Object, Partition): + refresh_partition(Object) + else: + [refresh(object) for object in Object] + + refresh(Object) + if Tracing: + rt = Refresh_Trace(self) + rt.Start() + + m = self.SaveChanges() + + if Tracing: + rt.Stop() + rt.Drop() + + return _Refresh_Report(m.Property_Changes) + + def Update(self, UpdateOptions: UpdateOptions = UpdateOptions.ExpandFull) -> None: + """[Update Model](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.majorobject.update?view=analysisservices-dotnet#microsoft-analysisservices-majorobject-update(microsoft-analysisservices-updateoptions)) + + Args: + UpdateOptions (UpdateOptions, optional): See above MS Doc link. Defaults to UpdateOptions.ExpandFull. + + Returns: + None: Placeholder to eventually change. + """ + logger.debug("Running Update Request") + return self.Database.Update(UpdateOptions) + + def SaveChanges(self) -> bool: + def property_changes(Property_Changes): + Property_Change = namedtuple( + "Property_Change", + "New_Value Object Original_Value Property_Name Property_Type", + ) + return [ + Property_Change( + change.NewValue, + change.Object, + change.OriginalValue, + change.PropertyName, + change.PropertyType, + ) + for change in Property_Changes.GetEnumerator() + ] + + logger.info("Executing SaveChanges()...") + Model_Save_Results = self.Model.SaveChanges() + if isinstance(Model_Save_Results.Impact, type(None)): + logger.warning(f"No changes detected on save for {self.Server.Name}") + return None + else: + Property_Changes = Model_Save_Results.Impact.PropertyChanges + Added_Objects = Model_Save_Results.Impact.AddedObjects + Added_Subtree_Roots = Model_Save_Results.Impact.AddedSubtreeRoots + Removed_Objects = Model_Save_Results.Impact.RemovedObjects + Removed_Subtree_Roots = Model_Save_Results.Impact.RemovedSubtreeRoots + Xmla_Results = Model_Save_Results.XmlaResults + Changes = namedtuple( + "Changes", + "Property_Changes Added_Objects Added_Subtree_Roots Removed_Objects Removed_Subtree_Roots Xmla_Results", + ) + [ + property_changes(Property_Changes), + Added_Objects, + Added_Subtree_Roots, + Removed_Objects, + Removed_Subtree_Roots, + Xmla_Results, + ] + self.Reload_Model_Info() + return Changes( + property_changes(Property_Changes), + Added_Objects, + Added_Subtree_Roots, + Removed_Objects, + Removed_Subtree_Roots, + Xmla_Results, + ) + + def Backup_Table(self, table_str: str) -> bool: + """USE WITH CAUTION, EXPERIMENTAL. Backs up table in memory, brings with it measures, columns, hierarchies, relationships, roles, etc. + It will add suffix '_backup' to all objects. + Refresh is performed from source during backup. + + Args: + table_str (str, optional): Name of Table. + + Returns: + bool: Returns True if Successful, else will return error. + """ + logger.info("Backup Beginning...") + logger.debug(f"Cloning {table_str}") + table = self.Model.Tables.Find(table_str).Clone() + logger.info("Beginning Renames") + + def rename(items): + for item in items: + item.RequestRename(f"{item.Name}_backup") + logger.debug(f"Renamed - {item.Name}") + + logger.info("Renaming Columns") + rename(table.Columns.GetEnumerator()) + logger.info("Renaming Partitions") + rename(table.Partitions.GetEnumerator()) + logger.info("Renaming Measures") + rename(table.Measures.GetEnumerator()) + logger.info("Renaming Hierarchies") + rename(table.Hierarchies.GetEnumerator()) + logger.info("Renaming Table") + table.RequestRename(f"{table.Name}_backup") + logger.info("Adding Table to Model as backup") + self.Model.Tables.Add(table) + logger.info("Finding Necessary Relationships... Cloning...") + relationships = [ + relationship.Clone() + for relationship in self.Model.Relationships.GetEnumerator() + if relationship.ToTable.Name == remove_suffix(table.Name, "_backup") + or relationship.FromTable.Name == remove_suffix(table.Name, "_backup") + ] + logger.info("Renaming Relationships") + rename(relationships) + logger.info("Switching Relationships to Clone Table & Column") + for relationship in relationships: + logger.debug(f"Renaming - {relationship.Name}") + if relationship.ToTable.Name == remove_suffix(table.Name, "_backup"): + relationship.set_ToColumn( + table.Columns.Find(f"{relationship.ToColumn.Name}_backup") + ) + elif relationship.FromTable.Name == remove_suffix(table.Name, "_backup"): + relationship.set_FromColumn( + table.Columns.Find(f"{relationship.FromColumn.Name}_backup") + ) + logger.debug(f"Adding {relationship.Name} to {self.Model.Name}") + self.Model.Relationships.Add(relationship) + + def clone_role_permissions(): + logger.info("Beginning to handle roles and permissions for table...") + logger.debug("Finding Roles...") + roles = [ + role + for role in self.Model.Roles.GetEnumerator() + for tablepermission in role.TablePermissions.GetEnumerator() + if tablepermission.Name == table_str + ] + for role in roles: + logger.debug(f"Role {role.Name} matched, looking into it...") + logger.debug("Searching for table specific permissions") + tablepermissions = [ + table.Clone() + for table in role.TablePermissions.GetEnumerator() + if table.Name == table_str + ] + for tablepermission in tablepermissions: + logger.debug( + f"{tablepermission.Name} found... switching table to clone" + ) + tablepermission.set_Table(table) + for column in tablepermission.ColumnPermissions.GetEnumerator(): + logger.debug( + f"Column - {column.Name} copying permissions to clone..." + ) + column.set_Column( + self.Model.Tables.Find(table.Name).Columns.Find( + f"{column.Name}_backup" + ) + ) + logger.debug(f"Adding {tablepermission.Name} to {role.Name}") + role.TablePermissions.Add(tablepermission) + return True + + clone_role_permissions() + logger.info(f"Refreshing Clone... {table.Name}") + self.Refresh([table]) + logger.info(f"Updating Model {self.Model.Name}") + self.SaveChanges() + return True + + def Revert_Table(self, table_str: str) -> bool: + """USE WITH CAUTION, EXPERIMENTAL. This is used in conjunction with Backup_Table(). + It will take the 'TableName_backup' and replace with the original. + Example scenario -> + 1. model.Backup_Table('TableName') + 2. perform any proposed changes in original 'TableName' + 3. validate changes in 'TableName' + 4. if unsuccessful run model.Revert_Table('TableName') + + Args: + table_str (str): Name of table. + + Returns: + bool: Returns True if Successful, else will return error. + """ + logger.info(f"Beginning Revert for {table_str}") + logger.debug(f"Finding original {table_str}") + main = self.Model.Tables.Find(table_str) + logger.debug(f"Finding backup {table_str}") + backup = self.Model.Tables.Find(f"{table_str}_backup") + logger.debug("Finding original relationships") + main_relationships = [ + relationship + for relationship in self.Model.Relationships.GetEnumerator() + if relationship.ToTable.Name == main.Name + or relationship.FromTable.Name == main.Name + ] + logger.debug("Finding backup relationships") + backup_relationships = [ + relationship + for relationship in self.Model.Relationships.GetEnumerator() + if relationship.ToTable.Name == backup.Name + or relationship.FromTable.Name == backup.Name + ] + + def remove_role_permissions(): + logger.debug( + f"Finding table and column permission in roles to remove from {table_str}" + ) + roles = [ + role + for role in self.Model.Roles.GetEnumerator() + for tablepermission in role.TablePermissions.GetEnumerator() + if tablepermission.Name == table_str + ] + for role in roles: + logger.debug(f"Role {role.Name} Found") + tablepermissions = [ + table + for table in role.TablePermissions.GetEnumerator() + if table.Name == table_str + ] + for tablepermission in tablepermissions: + logger.debug(f"Removing {tablepermission.Name} from {role.Name}") + role.TablePermissions.Remove(tablepermission) + + for relationship in main_relationships: + logger.debug("Cleaning relationships...") + if relationship.ToTable.Name == main.Name: + logger.debug(f"Removing {relationship.Name}") + self.Model.Relationships.Remove(relationship) + elif relationship.FromTable.Name == main.Name: + logger.debug(f"Removing {relationship.Name}") + self.Model.Relationships.Remove(relationship) + logger.debug(f"Removing Original Table {main.Name}") + self.Model.Tables.Remove(main) + remove_role_permissions() + + def dename(items): + for item in items: + logger.debug(f"Removing Suffix for {item.Name}") + item.RequestRename(remove_suffix(item.Name, "_backup")) + logger.debug(f"Saving Changes... for {item.Name}") + self.Model.SaveChanges() + + logger.info("Name changes for Columns...") + dename( + [ + column + for column in backup.Columns.GetEnumerator() + if column.Type != ColumnType.RowNumber + ] + ) + logger.info("Name changes for Partitions...") + dename(backup.Partitions.GetEnumerator()) + logger.info("Name changes for Measures...") + dename(backup.Measures.GetEnumerator()) + logger.info("Name changes for Hierarchies...") + dename(backup.Hierarchies.GetEnumerator()) + logger.info("Name changes for Relationships...") + dename(backup_relationships) + logger.info("Name changes for Backup Table...") + backup.RequestRename(remove_suffix(backup.Name, "_backup")) + self.SaveChanges() + return True + + def Query(self, Query_Str: str) -> Union[pd.DataFrame, str, int]: + """Executes Query on Model and Returns Results in Pandas DataFrame + + Args: + Query_Str (str): Dax Query. Note, needs full syntax (ex: EVALUATE). See (DAX Queries)[https://docs.microsoft.com/en-us/dax/dax-queries]. + Will check if query string is a file. If it is, then it will perform a query on whatever is read from the file. + It is also possible to query DMV. For example. Query("select * from $SYSTEM.DISCOVER_TRACE_EVENT_CATEGORIES"). See (DMVs)[https://docs.microsoft.com/en-us/analysis-services/instances/use-dynamic-management-views-dmvs-to-monitor-analysis-services?view=asallproducts-allversions] + + Returns: + pd.DataFrame: Returns dataframe with results + """ + + if os.path.isfile(Query_Str): + logging.debug( + f"File path detected, reading file... -> {Query_Str}", + ) + with open(Query_Str, "r") as file: + Query_Str = str(file.read()) + + try: + logger.debug("Setting first initial Adomd Connection...") + self.AdomdConnection.Open() + logger.debug("Connected!") + except Exception: + pass + logger.info("Querying Model...") + Query = AdomdCommand(Query_Str, self.AdomdConnection).ExecuteReader() + Column_Headers = [ + (index, Query.GetName(index)) for index in range(0, Query.FieldCount) + ] + Results = list() + while Query.Read(): + Results.append( + [Query.GetValue(index) for index in range(0, len(Column_Headers))] + ) + Query.Close() + logger.debug("Data retrieved... reading...") + df = pd.DataFrame(Results, columns=[value for _, value in Column_Headers]) + if len(df) == 1 and len(df.columns) == 1: + return df.iloc[0][df.columns[0]] + return df + + def Query_Every_Column( + self, query_function: str = "COUNTROWS(VALUES(_))" + ) -> pd.DataFrame: + """This will dynamically create a query to pull all columns from the model and run the query function. +
It will replace the _ with the column to run. + + Args: + query_function (str, optional): Dax query is dynamically building a query with the UNION & ROW DAX Functions. + + Returns: + pd.DataFrame: Returns dataframe with results. + """ + logger.info("Beginning execution of querying every column...") + logger.debug(f"Function to be run: {query_function}") + logger.debug("Dynamically creating DAX query...") + query_str = "EVALUATE UNION(\n" + for column in self.Columns: + if column.Type != ColumnType.RowNumber: + table_name = column.Table.get_Name() + column_name = column.get_Name() + dax_identifier = f"'{table_name}'[{column_name}]" + query_str += f"ROW(\"Table\",\"{table_name}\",\"Column\",\"{column_name}\",\"{query_function}\",{query_function.replace('_',dax_identifier)}),\n" + query_str = f"{query_str[:-2]})" + return self.Query(query_str) + + def Query_Every_Table(self, query_function: str = "COUNTROWS(_)") -> pd.DataFrame: + """This will dynamically create a query to pull all tables from the model and run the query function. + It will replace the _ with the table to run. + + Args: + query_function (str, optional): Dax query is dynamically building a query with the UNION & ROW DAX Functions. Defaults to 'COUNTROWS(_)'. + + Returns: + pd.DataFrame: Returns dataframe with results + """ + logger.info("Beginning execution of querying every table...") + logger.debug(f"Function to be run: {query_function}") + logger.debug("Dynamically creating DAX query...") + query_str = "EVALUATE UNION(\n" + for table in self.Tables: + table_name = table.get_Name() + dax_table_identifier = f"'{table_name}'" + query_str += f"ROW(\"Table\",\"{table_name}\",\"{query_function}\",{query_function.replace('_',dax_table_identifier)}),\n" + query_str = f"{query_str[:-2]})" + return self.Query(query_str) + + def Analyze_BPA( + self, Tabular_Editor_Exe: str, Best_Practice_Analyzer: str + ) -> List[str]: + """Takes your Tabular Model and performs TE2s BPA. Runs through Command line. + [Tabular Editor BPA](https://docs.tabulareditor.com/te2/Best-Practice-Analyzer.html) + [Tabular Editor Command Line Options](https://docs.tabulareditor.com/te2/Command-line-Options.html) + + Args: + Tabular_Editor_Exe (str): TE2 Exe File path. Feel free to use class TE2().EXE_Path or provide your own. + Best_Practice_Analyzer (str): BPA json file path. Feel free to use class BPA().Location or provide your own. Defualts to https://raw.githubusercontent.com/microsoft/Analysis-Services/master/BestPracticeRules/BPARules.json + + Returns: + List[str]: Assuming no failure, will return list of BPA violations. Else will return error from command line. + """ + # Working TE2 Script in Python os.system(f"start /wait {te2.EXE_Path} \"Provider=MSOLAP;{model.AdomdConnection.ConnectionString}\" FINANCE -B \"{os.getcwd()}\\Model.bim\" -A {l.BPA_LOCAL_FILE_PATH} -V/?") + # start /wait + logger.debug("Beginning request to talk with TE2 & Find BPA...") + cmd = f'{Tabular_Editor_Exe} "Provider=MSOLAP;{self.AdomdConnection.ConnectionString}" {self.Database.Name} -B "{os.getcwd()}\\Model.bim" -A {Best_Practice_Analyzer} -V/?' + logger.debug("Command Generated") + logger.debug("Submitting Command...") + sp = subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + raw_output, error = sp.communicate() + if len(error) > 0: + return error + else: + return [ + output for output in raw_output.split("\n") if "violates rule" in output + ] + + def Create_Table(self, df: pd.DataFrame, table_name: str) -> bool: + """Creates tables from pd.DataFrame as an M-Partition. + So will convert the dataframe to M-Partition logic via the M query table constructor. + Runs refresh and will update model. + + Args: + df (pd.DataFrame): DataFrame to add to model + table_name (str): _description_ + + Returns: + bool: True if successful + """ + logger.debug(f"Beginning to create table for {table_name}...") + new_table = Table() + new_table.RequestRename(table_name) + logger.debug("Sorting through columns...") + df_column_names = df.columns + dtype_conversion = pandas_datatype_to_tabular_datatype(df) + for df_column_name in df_column_names: + logger.debug(f"Adding {df_column_name} to Table...") + column = DataColumn() + column.RequestRename(df_column_name) + column.set_SourceColumn(df_column_name) + column.set_DataType(dtype_conversion[df_column_name]) + new_table.Columns.Add(column) + logger.debug("Expression String Created...") + logger.debug("Creating MPartition...") + partition = Partition() + partition.set_Source(MPartitionSource()) + logger.debug("Setting MPartition Expression...") + partition.Source.set_Expression(pd_dataframe_to_m_expression(df)) + logger.debug( + f"Adding partition: {partition.Name} to {self.Server.Name}::{self.Database.Name}::{self.Model.Name}" + ) + new_table.Partitions.Add(partition) + logger.debug( + f"Adding table: {new_table.Name} to {self.Server.Name}::{self.Database.Name}::{self.Model.Name}" + ) + self.Model.Tables.Add(new_table) + self.Refresh([new_table]) + self.SaveChanges() + return True diff --git a/pytabular/tabular_editor.py b/pytabular/tabular_editor.py index 95a2109..3c87f15 100644 --- a/pytabular/tabular_editor.py +++ b/pytabular/tabular_editor.py @@ -1,48 +1,53 @@ import logging -logger = logging.getLogger('PyTabular') import os import requests as r import zipfile as Z import atexit from logic_utils import remove_folder_and_contents -def Download_Tabular_Editor(Download_Location:str = 'https://github.com/TabularEditor/TabularEditor/releases/download/2.16.7/TabularEditor.Portable.zip', -Folder:str = 'Tabular_Editor_2', -Auto_Remove = True) -> str: - '''Runs a request.get() to retrieve the zip file from web. Will unzip response and store in directory. Will also register the removal of the new directory and files when exiting program. - - Args: - Download_Location (str, optional): File path for zip of Tabular Editor 2. Defaults to [Tabular Editor 2 Github Zip Location]'https://github.com/TabularEditor/TabularEditor/releases/download/2.16.7/TabularEditor.Portable.zip'. - Folder (str, optional): New Folder Location. Defaults to 'Tabular_Editor_2'. - Auto_Remove (bool, optional): Boolean to determine auto removal of files once script exits. Defaults to True. - - Returns: - str: File path of TabularEditor.exe - ''' - logger.info(f'Downloading Tabular Editor 2...') - logger.info(f'From... {Download_Location}') - folder_location = os.path.join(os.getcwd(),Folder) - response = r.get(Download_Location) - file_location = f"{os.getcwd()}\\{Download_Location.split('/')[-1]}" - with open(file_location, 'wb') as te2_zip: - te2_zip.write(response.content) - with Z.ZipFile(file_location) as zipper: - zipper.extractall(path=folder_location) - logger.debug(f'Removing Zip File...') - os.remove(file_location) - logger.info(f'Tabular Editor Downloaded and Extracted to {folder_location}') - if Auto_Remove: - logger.debug(f'Registering removal on termination... For {folder_location}') - atexit.register(remove_folder_and_contents, folder_location) - return f'{folder_location}\\TabularEditor.exe' +logger = logging.getLogger("PyTabular") + + +def Download_Tabular_Editor( + Download_Location: str = "https://github.com/TabularEditor/TabularEditor/releases/download/2.16.7/TabularEditor.Portable.zip", + Folder: str = "Tabular_Editor_2", + Auto_Remove=True, +) -> str: + """Runs a request.get() to retrieve the zip file from web. Will unzip response and store in directory. Will also register the removal of the new directory and files when exiting program. + + Args: + Download_Location (str, optional): File path for zip of Tabular Editor 2. Defaults to [Tabular Editor 2 Github Zip Location]'https://github.com/TabularEditor/TabularEditor/releases/download/2.16.7/TabularEditor.Portable.zip'. + Folder (str, optional): New Folder Location. Defaults to 'Tabular_Editor_2'. + Auto_Remove (bool, optional): Boolean to determine auto removal of files once script exits. Defaults to True. + + Returns: + str: File path of TabularEditor.exe + """ + logger.info("Downloading Tabular Editor 2...") + logger.info(f"From... {Download_Location}") + folder_location = os.path.join(os.getcwd(), Folder) + response = r.get(Download_Location) + file_location = f"{os.getcwd()}\\{Download_Location.split('/')[-1]}" + with open(file_location, "wb") as te2_zip: + te2_zip.write(response.content) + with Z.ZipFile(file_location) as zipper: + zipper.extractall(path=folder_location) + logger.debug("Removing Zip File...") + os.remove(file_location) + logger.info(f"Tabular Editor Downloaded and Extracted to {folder_location}") + if Auto_Remove: + logger.debug(f"Registering removal on termination... For {folder_location}") + atexit.register(remove_folder_and_contents, folder_location) + return f"{folder_location}\\TabularEditor.exe" + class Tabular_Editor: - '''Setting Tabular_Editor Class for future work. - ''' - def __init__(self, EXE_File_Path:str = 'Default') -> None: - logger.debug(f'Initializing Tabular Editor Class:: {EXE_File_Path}') - if EXE_File_Path == 'Default': - self.EXE: str = Download_Tabular_Editor() - else: - self.EXE: str = EXE_File_Path - pass \ No newline at end of file + """Setting Tabular_Editor Class for future work.""" + + def __init__(self, EXE_File_Path: str = "Default") -> None: + logger.debug(f"Initializing Tabular Editor Class:: {EXE_File_Path}") + if EXE_File_Path == "Default": + self.EXE: str = Download_Tabular_Editor() + else: + self.EXE: str = EXE_File_Path + pass diff --git a/pytabular/tabular_tracing.py b/pytabular/tabular_tracing.py index 5a3e6ce..0c8489e 100644 --- a/pytabular/tabular_tracing.py +++ b/pytabular/tabular_tracing.py @@ -1,148 +1,216 @@ import logging -logger = logging.getLogger('PyTabular') import random import xmltodict from typing import List, Callable from Microsoft.AnalysisServices.Tabular import Trace, TraceEvent, TraceEventHandler from Microsoft.AnalysisServices import TraceColumn, TraceEventClass, TraceEventSubclass +logger = logging.getLogger("PyTabular") class Base_Trace: - '''Generates Trace to be run on Server. This is the base class to customize the type of Trace you are looking for. - [Server Traces](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.server.traces?view=analysisservices-dotnet#microsoft-analysisservices-tabular-server-traces) - - Args: - Tabular_Class (Tabular): Tabular Class to retrieve the connected Server and Model. - Trace_Events (List[TraceEvent]): List of Trace Events that you wish to track. [TraceEventClass](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.traceeventclass?view=analysisservices-dotnet) - Trace_Event_Columns (List[TraceColumn]): List of Trace Event Columns you with to track. [TraceEventColumn](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tracecolumn?view=analysisservices-dotnet) - Handler (Callable): Function to call when Trace returns response. Input needs to be two arguments. One is source (Which is currently None... Need to investigate why). Second is [TraceEventArgs](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.traceeventargs?view=analysisservices-dotnet) - ''' - def __init__(self, Tabular_Class, Trace_Events:List[TraceEvent], Trace_Event_Columns:List[TraceColumn], Handler:Callable) -> None: - logger.debug(f'Trace Base Class initializing...') - self.Name = 'PyTabular_'+''.join(random.SystemRandom().choices([str(x) for x in [y for y in range(0,10)]], k=10)) - self.ID = self.Name.replace('PyTabular_', '') - self.Trace = Trace(self.Name, self.ID) - logger.debug(f'Trace {self.Trace.Name} created...') - self.Tabular_Class = Tabular_Class - self.Event_Categories = self._Query_DMV_For_Event_Categories() - - self.Trace_Events = Trace_Events - self.Trace_Event_Columns = Trace_Event_Columns - self.Handler = Handler - - self.Build() - self.Add() - self.Update() - - def Build(self) -> bool: - '''Run on initialization. This will take the inputed arguments for the class and attempt to build the Trace. - - Returns: - bool: True if successful - ''' - logger.info(f'Building Trace {self.Name}') - TE = [TraceEvent(trace_event) for trace_event in self.Trace_Events] - logger.debug(f'Adding Events to... {self.Trace.Name}') - [self.Trace.get_Events().Add(te) for te in TE] - - def add_column(trace_event,trace_event_column): - try: - trace_event.Columns.Add(trace_event_column) - except: - logger.warning(f'{trace_event} - {trace_event_column} Skipped') - - logger.debug(f'Adding Trace Event Columns...') - [add_column(trace_event,trace_event_column) for trace_event_column in self.Trace_Event_Columns for trace_event in TE if str(trace_event_column.value__) in self.Event_Categories[str(trace_event.EventID.value__)] ] - - logger.debug(f'Adding Handler to Trace...') - self.Handler = TraceEventHandler(self.Handler) - self.Trace.OnEvent += self.Handler - return True - - def Arguments(Trace_Events: List[TraceEvent], Trace_Event_Columns: List[TraceColumn], Handler: Callable): - raise NotImplementedError - - def Add(self) -> int: - '''Runs on initialization. Adds built Trace to the Server. - - Returns: - int: Return int of placement in Server.Traces.get_Item(int) - ''' - logger.info(f'Adding {self.Name} to {self.Tabular_Class.Server.Name}') - return self.Tabular_Class.Server.Traces.Add(self.Trace) - - def Update(self) -> None: - '''Runs on initialization. Syncs with Server. - - Returns: - None: Returns None. Unless unsuccessful then it will return the error from Server. - ''' - logger.info(f'Updating {self.Name} in {self.Tabular_Class.Server.Name}') - return self.Trace.Update() - - def Start(self) -> None: - '''Call when you want to start the Trace - - Returns: - None: Returns None. Unless unsuccessful then it will return the error from Server. - ''' - logger.info(f'Starting {self.Name} in {self.Tabular_Class.Server.Name}') - return self.Trace.Start() - - def Stop(self) -> None: - '''Call when you want to stop the Trace - - Returns: - None: Returns None. Unless unsuccessful then it will return the error from Server. - ''' - logger.info(f'Stopping {self.Name} in {self.Tabular_Class.Server.Name}') - return self.Trace.Stop() - - def Drop(self) -> None: - '''Call when you want to drop the Trace - - Returns: - None: Returns None. Unless unsuccessful then it will return the error from Server. - ''' - logger.info(f'Dropping {self.Name} in {self.Tabular_Class.Server.Name}') - return self.Trace.Drop() - - def _Query_DMV_For_Event_Categories(self): - '''Internal use. Called during the building process to locate allowed columns for event categories. This is done by executing a Tabular().Query() on the DISCOVER_EVENT_CATEGORIES table in the DMV. Then the function will parse the results, as it is xml inside of rows. - - Returns: - _type_: _description_ - ''' - Event_Categories = {} - events = [] - logger.debug(f'Querying DMV for columns rules...') - logger.debug(f'select * from $SYSTEM.DISCOVER_TRACE_EVENT_CATEGORIES') - df = self.Tabular_Class.Query("select * from $SYSTEM.DISCOVER_TRACE_EVENT_CATEGORIES") - for index, row in df.iterrows(): - xml_data = xmltodict.parse(row.Data) - if type(xml_data['EVENTCATEGORY']['EVENTLIST']['EVENT']) == list: - events += [event for event in xml_data['EVENTCATEGORY']['EVENTLIST']['EVENT'] ] - else: - events += [xml_data['EVENTCATEGORY']['EVENTLIST']['EVENT']] - for event in events: - Event_Categories[event['ID']] = [column['ID'] for column in event['EVENTCOLUMNLIST']['EVENTCOLUMN']] - return Event_Categories + """Generates Trace to be run on Server. + This is the base class to customize the type of Trace you are looking for. + [Server Traces](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.server.traces?view=analysisservices-dotnet#microsoft-analysisservices-tabular-server-traces) + + Args: + Tabular_Class (Tabular): Tabular Class. + Trace_Events (List[TraceEvent]): List of Trace Events. + [TraceEventClass](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.traceeventclass?view=analysisservices-dotnet) + Trace_Event_Columns (List[TraceColumn]): List of Trace Event Columns. + [TraceEventColumn](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tracecolumn?view=analysisservices-dotnet) + Handler (Callable): Function to call when Trace returns response. + Input needs to be two arguments. + One is source (Which is currently None... Need to investigate why). + Second is + [TraceEventArgs](https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.traceeventargs?view=analysisservices-dotnet) + """ + + def __init__( + self, + Tabular_Class, + Trace_Events: List[TraceEvent], + Trace_Event_Columns: List[TraceColumn], + Handler: Callable, + ) -> None: + logger.debug("Trace Base Class initializing...") + self.Name = "PyTabular_" + "".join( + random.SystemRandom().choices( + [str(x) for x in [y for y in range(0, 10)]], k=10 + ) + ) + self.ID = self.Name.replace("PyTabular_", "") + self.Trace = Trace(self.Name, self.ID) + logger.debug(f"Trace {self.Trace.Name} created...") + self.Tabular_Class = Tabular_Class + self.Event_Categories = self._Query_DMV_For_Event_Categories() + + self.Trace_Events = Trace_Events + self.Trace_Event_Columns = Trace_Event_Columns + self.Handler = Handler + + self.Build() + self.Add() + self.Update() + + def Build(self) -> bool: + """Run on initialization. + This will take the inputed arguments for the class + and attempt to build the Trace. + + Returns: + bool: True if successful + """ + logger.info(f"Building Trace {self.Name}") + TE = [TraceEvent(trace_event) for trace_event in self.Trace_Events] + logger.debug(f"Adding Events to... {self.Trace.Name}") + [self.Trace.get_Events().Add(te) for te in TE] + + def add_column(trace_event, trace_event_column): + try: + trace_event.Columns.Add(trace_event_column) + except Exception: + logger.warning(f"{trace_event} - {trace_event_column} Skipped") + pass + + logger.debug("Adding Trace Event Columns...") + [ + add_column(trace_event, trace_event_column) + for trace_event_column in self.Trace_Event_Columns + for trace_event in TE + if str(trace_event_column.value__) + in self.Event_Categories[str(trace_event.EventID.value__)] + ] + + logger.debug("Adding Handler to Trace...") + self.Handler = TraceEventHandler(self.Handler) + self.Trace.OnEvent += self.Handler + return True + + def Arguments( + Trace_Events: List[TraceEvent], + Trace_Event_Columns: List[TraceColumn], + Handler: Callable, + ): + raise NotImplementedError + + def Add(self) -> int: + """Runs on initialization. Adds built Trace to the Server. + + Returns: + int: Return int of placement in Server.Traces.get_Item(int) + """ + logger.info(f"Adding {self.Name} to {self.Tabular_Class.Server.Name}") + return self.Tabular_Class.Server.Traces.Add(self.Trace) + + def Update(self) -> None: + """Runs on initialization. Syncs with Server. + + Returns: + None: Returns None. + Unless unsuccessful then it will return the error from Server. + """ + logger.info(f"Updating {self.Name} in {self.Tabular_Class.Server.Name}") + return self.Trace.Update() + + def Start(self) -> None: + """Call when you want to start the Trace + + Returns: + None: Returns None. + Unless unsuccessful then it will return the error from Server. + """ + logger.info(f"Starting {self.Name} in {self.Tabular_Class.Server.Name}") + return self.Trace.Start() + + def Stop(self) -> None: + """Call when you want to stop the Trace + + Returns: + None: Returns None. + Unless unsuccessful then it will return the error from Server. + """ + logger.info(f"Stopping {self.Name} in {self.Tabular_Class.Server.Name}") + return self.Trace.Stop() + + def Drop(self) -> None: + """Call when you want to drop the Trace + + Returns: + None: Returns None. Unless unsuccessful, + then it will return the error from Server. + """ + logger.info(f"Dropping {self.Name} in {self.Tabular_Class.Server.Name}") + return self.Trace.Drop() + + def _Query_DMV_For_Event_Categories(self): + """Internal use. + Called during the building process + to locate allowed columns for event categories. + This is done by executing a Tabular().Query() + on the DISCOVER_EVENT_CATEGORIES table in the DMV. + Then the function will parse the results, + as it is xml inside of rows. + + Returns: + _type_: _description_ + """ + Event_Categories = {} + events = [] + logger.debug("Querying DMV for columns rules...") + logger.debug("select * from $SYSTEM.DISCOVER_TRACE_EVENT_CATEGORIES") + df = self.Tabular_Class.Query( + "select * from $SYSTEM.DISCOVER_TRACE_EVENT_CATEGORIES" + ) + for index, row in df.iterrows(): + xml_data = xmltodict.parse(row.Data) + if type(xml_data["EVENTCATEGORY"]["EVENTLIST"]["EVENT"]) == list: + events += [ + event for event in xml_data["EVENTCATEGORY"]["EVENTLIST"]["EVENT"] + ] + else: + events += [xml_data["EVENTCATEGORY"]["EVENTLIST"]["EVENT"]] + for event in events: + Event_Categories[event["ID"]] = [ + column["ID"] for column in event["EVENTCOLUMNLIST"]["EVENTCOLUMN"] + ] + return Event_Categories def refresh_handler(source, args): - if args.EventSubclass == TraceEventSubclass.ReadData: - logger.debug(f'{args.ProgressTotal} - {args.ObjectPath}') - else: - logger.debug(f'{args.EventClass} - {args.EventSubclass} - {args.ObjectName}') + if args.EventSubclass == TraceEventSubclass.ReadData: + logger.debug(f"{args.ProgressTotal} - {args.ObjectPath}") + else: + logger.debug(f"{args.EventClass} - {args.EventSubclass} - {args.ObjectName}") + class Refresh_Trace(Base_Trace): - '''Subclass of Base_Trace. For built-in Refresh Tracing. - - Args: - Base_Trace (_type_): _description_ - ''' - def __init__(self, Tabular_Class, Trace_Events: List[TraceEvent] = [TraceEventClass.ProgressReportBegin,TraceEventClass.ProgressReportCurrent,TraceEventClass.ProgressReportEnd,TraceEventClass.ProgressReportError], - Trace_Event_Columns: List[TraceColumn] = [TraceColumn.EventSubclass,TraceColumn.CurrentTime, TraceColumn.ObjectName, TraceColumn.ObjectPath, TraceColumn.DatabaseName, TraceColumn.SessionID, TraceColumn.TextData, TraceColumn.EventClass, TraceColumn.ProgressTotal], - Handler: Callable = refresh_handler) -> None: - super().__init__(Tabular_Class, Trace_Events, Trace_Event_Columns, Handler) \ No newline at end of file + """Subclass of Base_Trace. For built-in Refresh Tracing. + + Args: + Base_Trace (_type_): _description_ + """ + + def __init__( + self, + Tabular_Class, + Trace_Events: List[TraceEvent] = [ + TraceEventClass.ProgressReportBegin, + TraceEventClass.ProgressReportCurrent, + TraceEventClass.ProgressReportEnd, + TraceEventClass.ProgressReportError, + ], + Trace_Event_Columns: List[TraceColumn] = [ + TraceColumn.EventSubclass, + TraceColumn.CurrentTime, + TraceColumn.ObjectName, + TraceColumn.ObjectPath, + TraceColumn.DatabaseName, + TraceColumn.SessionID, + TraceColumn.TextData, + TraceColumn.EventClass, + TraceColumn.ProgressTotal, + ], + Handler: Callable = refresh_handler, + ) -> None: + super().__init__(Tabular_Class, Trace_Events, Trace_Event_Columns, Handler) diff --git a/test/test_tabular.py b/test/test_tabular.py index a15cd8b..4f27aa0 100644 --- a/test/test_tabular.py +++ b/test/test_tabular.py @@ -1,80 +1,126 @@ import pytabular -import local as l +import local import pytest import pandas as pd from Microsoft.AnalysisServices.Tabular import Database -aas = pytabular.Tabular(l.AAS) -gen2 = pytabular.Tabular(l.GEN2) -testing_parameters = [(aas),(gen2)] +aas = pytabular.Tabular(local.AAS) +gen2 = pytabular.Tabular(local.GEN2) +testing_parameters = [(aas), (gen2)] testingtable = 'PyTestTable' -@pytest.mark.parametrize("model",testing_parameters) + +@pytest.mark.parametrize("model", testing_parameters) def test_sanity_check(model): - assert 1 == 1 + assert 1 == 1 + -@pytest.mark.parametrize("model",testing_parameters) +@pytest.mark.parametrize("model", testing_parameters) def test_connection(model): - ''' - Does a quick check to the Tabular Class - To ensure that it can connnect - ''' - assert model.Server.Connected + ''' + Does a quick check to the Tabular Class + To ensure that it can connnect + ''' + assert model.Server.Connected + -@pytest.mark.parametrize("model",testing_parameters) +@pytest.mark.parametrize("model", testing_parameters) def test_database(model): - assert isinstance(model.Database,Database) + assert isinstance(model.Database, Database) -@pytest.mark.parametrize("model",testing_parameters) + +@pytest.mark.parametrize("model", testing_parameters) def test_basic_query(model): - int_result = model.Query('EVALUATE {1}') - text_result = model.Query('EVALUATE {"Hello World"}') - assert int_result == 1 and text_result == 'Hello World' + int_result = model.Query('EVALUATE {1}') + text_result = model.Query('EVALUATE {"Hello World"}') + assert int_result == 1 and text_result == 'Hello World' -@pytest.mark.parametrize("model",testing_parameters) +@pytest.mark.parametrize("model", testing_parameters) def test_file_query(model): - singlevaltest = l.SINGLEVALTESTPATH - dfvaltest = l.DFVALTESTPATH - dfreplication = pd.DataFrame({'[Value1]':(1,3),'[Value2]':(2,4)}) - assert model.Query(singlevaltest) == 1 and model.Query(dfvaltest).equals(dfreplication) + singlevaltest = local.SINGLEVALTESTPATH + dfvaltest = local.DFVALTESTPATH + dfdupe = pd.DataFrame({'[Value1]': (1, 3), '[Value2]': (2, 4)}) + assert ( + model.Query(singlevaltest) == 1 and model.Query(dfvaltest).equals(dfdupe) + ) def remove_testing_table(model): - table_check = [table for table in model.Model.Tables.GetEnumerator() if testingtable in table.Name] - for table in table_check: - model.Model.Tables.Remove(table) - model.SaveChanges() - -@pytest.mark.parametrize("model",testing_parameters) + table_check = [ + table + for table + in model.Model.Tables.GetEnumerator() + if testingtable in table.Name + ] + for table in table_check: + model.Model.Tables.Remove(table) + model.SaveChanges() + + +@pytest.mark.parametrize("model", testing_parameters) def test_pre_table_checks(model): - remove_testing_table(model) - assert len([table for table in model.Model.Tables.GetEnumerator() if testingtable in table.Name]) == 0 - -@pytest.mark.parametrize("model",testing_parameters) + remove_testing_table(model) + assert len( + [ + table + for table + in model.Model.Tables.GetEnumerator() + if testingtable in table.Name + ] + ) == 0 + + +@pytest.mark.parametrize("model", testing_parameters) def test_create_table(model): - df = pd.DataFrame(data={'col1':[1,2,3],'col2':['four','five','six']}) - model.Create_Table(df,testingtable) - assert len(model.Query(f"EVALUATE {testingtable}")) == 3 + df = pd.DataFrame(data={'col1': [1, 2, 3], 'col2': ['four', 'five', 'six']}) + model.Create_Table(df, testingtable) + assert len( + model.Query(f"EVALUATE {testingtable}") + ) == 3 -@pytest.mark.parametrize("model",testing_parameters) -def test_backingup_table(model): - model.Backup_Table(testingtable) - assert len([table for table in model.Model.Tables.GetEnumerator() if f'{testingtable}_backup' == table.Name]) == 1 -@pytest.mark.parametrize("model",testing_parameters) +@pytest.mark.parametrize("model", testing_parameters) +def test_backingup_table(model): + model.Backup_Table(testingtable) + assert len( + [ + table + for table + in model.Model.Tables.GetEnumerator() + if f'{testingtable}_backup' == table.Name + ] + ) == 1 + + +@pytest.mark.parametrize("model", testing_parameters) def test_revert_table(model): - model.Revert_Table(testingtable) - assert len([table for table in model.Model.Tables.GetEnumerator() if f'{testingtable}' == table.Name]) == 1 - - -@pytest.mark.parametrize("model",testing_parameters) + model.Revert_Table(testingtable) + assert len( + [ + table + for table + in model.Model.Tables.GetEnumerator() + if f'{testingtable}' == table.Name + ] + ) == 1 + + +@pytest.mark.parametrize("model", testing_parameters) def test_table_removal(model): - remove_testing_table(model) - assert len([table for table in model.Model.Tables.GetEnumerator() if testingtable in table.Name]) == 0 - -@pytest.mark.parametrize("model",testing_parameters) + remove_testing_table(model) + assert len( + [ + table + for table + in model.Model.Tables.GetEnumerator() + if testingtable in table.Name + ] + ) == 0 + + +@pytest.mark.parametrize("model", testing_parameters) def test_bpa(model): - te2 = pytabular.Tabular_Editor().EXE - bpa = pytabular.BPA().Location - assert isinstance(model.Analyze_BPA(te2,bpa), list) + te2 = pytabular.Tabular_Editor().EXE + bpa = pytabular.BPA().Location + assert isinstance(model.Analyze_BPA(te2, bpa), list)