Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
ignore = E501, E203, W503, F401
31 changes: 31 additions & 0 deletions .github/workflows/flake8.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# This is a basic workflow to help you get started with Actions

name: flake8

# Controls when the workflow will run
on:
  pull_request:
    branches: [ master ]
  push:
    branches: [ master ]

  # Allows you to run this workflow manually from the Actions tab
  workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
  # This workflow contains a single job called "build"
  build:
    # The type of runner that the job will run on
    runs-on: ubuntu-latest

    # Steps represent a sequence of tasks that will be executed as part of the job
    steps:
      # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0
      - uses: actions/setup-python@v2
      - run: pip install --upgrade pip
      - run: pip install flake8
      - run: python3 -m flake8 --count
71 changes: 42 additions & 29 deletions pytabular/__init__.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,57 @@
# flake8: noqa
import logging
import os
import sys
import platform

# Configure logging once. The "PyTabular" logger created here is the same
# object that package modules such as basic_checks obtain through
# logging.getLogger("PyTabular").
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s::%(module)s::%(funcName)s::%(levelname)s::%(message)s",
    datefmt="%y/%m/%d %H:%M:%S %z",
)
logger = logging.getLogger("PyTabular")
logger.setLevel(logging.DEBUG)
logger.info("Logging configured...")
logger.info("To update PyTabular logger...")
logger.info(">>> import logging")
logger.info(">>> pytabular.logger.setLevel(level=logging.INFO)")
logger.info("See https://docs.python.org/3/library/logging.html#logging-levels")

# Environment details, logged at DEBUG level to help diagnose installs.
logger.debug(f"Python Version::{sys.version}")
logger.debug(f"Python Location::{sys.exec_prefix}")
logger.debug(f"Package Location::{__file__}")
logger.debug(f"Working Directory::{os.getcwd()}")
logger.debug(f"Platform::{sys.platform}-{platform.release()}")

# Put the package's "dll" folder and the package folder itself on sys.path.
# The package folder entry is what lets sibling modules use top-level imports
# such as `from logic_utils import ...`.
# NOTE(review): the dll folder is presumably where the Microsoft.AnalysisServices
# assemblies referenced below are found - confirm.
dll = os.path.join(os.path.dirname(__file__), "dll")
sys.path.append(dll)
sys.path.append(os.path.dirname(__file__))

# `clr` is imported only after the sys.path changes above; keep this ordering.
logger.debug("Beginning CLR references...")
import clr

logger.debug("Adding Reference Microsoft.AnalysisServices.AdomdClient")
clr.AddReference("Microsoft.AnalysisServices.AdomdClient")
logger.debug("Adding Reference Microsoft.AnalysisServices.Tabular")
clr.AddReference("Microsoft.AnalysisServices.Tabular")
logger.debug("Adding Reference Microsoft.AnalysisServices")
clr.AddReference("Microsoft.AnalysisServices")

# Re-export the package's public names; these imports follow the
# clr.AddReference calls above, so keep them last.
logger.debug("Importing specifics in module...")
from .pytabular import Tabular
from .basic_checks import (
    Return_Zero_Row_Tables,
    Table_Last_Refresh_Times,
    BPA_Violations_To_DF,
    Last_X_Interval,
)
from .logic_utils import (
    pd_dataframe_to_m_expression,
    pandas_datatype_to_tabular_datatype,
)
from .tabular_tracing import Base_Trace, Refresh_Trace
from .tabular_editor import Tabular_Editor
from .best_practice_analyzer import BPA

logger.info("Import successful...")
223 changes: 124 additions & 99 deletions pytabular/basic_checks.py
Original file line number Diff line number Diff line change
@@ -1,108 +1,133 @@
import logging
logger = logging.getLogger('PyTabular')
from typing import List, Union
import pytabular
from logic_utils import ticks_to_datetime
import sys
import pandas as pd

def Return_Zero_Row_Tables(model: pytabular.Tabular) -> List[str]:
    """List the tables whose row-count query comes back blank.

    Runs ``COUNTROWS(_)`` through ``model.Query_Every_Table`` and keeps the
    rows whose count column is reported as NaN by ``isna()``.

    Args:
        model (pytabular.Tabular): Tabular Model

    Returns:
        List[str]: Values of the ``[Table]`` column for every row whose
            ``[COUNTROWS(_)]`` value is NaN.
    """
    function_name = sys._getframe(0).f_code.co_name
    logger.info(f"Executing Basic Function {function_name}")

    query_function: str = "COUNTROWS(_)"
    count_column = f"[{query_function}]"
    df: pd.DataFrame = model.Query_Every_Table(query_function)

    # Boolean mask of the rows whose count is missing.
    is_blank = df[count_column].isna()
    return df[is_blank]["[Table]"].to_list()

def Table_Last_Refresh_Times(
    model: pytabular.Tabular, group_partition: bool = True
) -> pd.DataFrame:
    """Build a DataFrame of refresh timestamps for the model's partitions.

    One row is produced per entry in ``model.Partitions``. With
    ``group_partition`` left at True the rows are collapsed to one per table,
    keeping the maximum ``RefreshedTime`` string; with False the
    partition-level rows (including the ``Partitions`` column) are returned.
    Example to add to model:
    ``model.Create_Table(p.Table_Last_Refresh_Times(model), 'RefreshTimes')``

    Args:
        model (pytabular.Tabular): Tabular Model
        group_partition (bool, optional): Whether or not you want the grain of the dataframe to be by table or by partition. Defaults to True.

    Returns:
        pd.DataFrame: Columns ``Tables`` and ``RefreshedTime`` (plus ``Partitions`` when not grouped). ``RefreshedTime`` is derived from the partition property: https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.partition.refreshedtime?view=analysisservices-dotnet#microsoft-analysisservices-tabular-partition-refreshedtime
    """
    logger.info(f"Executing Basic Function {sys._getframe(0).f_code.co_name}")

    def _format_refreshed(partition) -> str:
        # NOTE(review): the slice removes the last three characters of the
        # formatted text, i.e. two microsecond digits AND the trailing "Z".
        # Confirm this is the intended precision/format.
        stamp = ticks_to_datetime(partition.RefreshedTime.Ticks)
        return stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-3]

    df = pd.DataFrame(
        {
            "Tables": [part.Table.Name for part in model.Partitions],
            "Partitions": [part.Name for part in model.Partitions],
            "RefreshedTime": [_format_refreshed(part) for part in model.Partitions],
        }
    )

    if not group_partition:
        logger.debug("Returning DF")
        return df

    logger.debug("Grouping together to grain of Table")
    per_table = df[["Tables", "RefreshedTime"]].groupby(by=["Tables"]).max()
    return per_table.reset_index(drop=False)

def BPA_Violations_To_DF(model: pytabular.Tabular, te2: str, bpa: str) -> pd.DataFrame:
    """Run the BPA Analyzer from TE2 and output the result as a DataFrame.

    Each line returned by ``model.Analyze_BPA`` is split at the first
    occurrence of the text `` violates rule ``; double quotes are removed
    from both halves. A line that does not contain the marker ends up whole
    in ``Object`` with an empty ``Violation``.

    Args:
        model (pytabular.Tabular): Tabular Model Class
        te2 (str): TE2 Exe File Path (Can use TE2().EXE_path)
        bpa (str): BPA File Location (Can use BPA().Location)

    Returns:
        pd.DataFrame: Two string columns, ``Object`` (the object in violation) and ``Violation`` (the rule).
    """
    separator = " violates rule "
    results = model.Analyze_BPA(te2, bpa)
    # partition() always yields (head, separator, tail); taking [::2] keeps
    # exactly two fields per row. Splitting on a substitute character instead
    # would produce ragged rows whenever a name itself contains that character.
    rows = [
        [field.replace('"', "") for field in line.partition(separator)[::2]]
        for line in results
    ]
    return pd.DataFrame(rows, columns=["Object", "Violation"])
logger = logging.getLogger("PyTabular")


def Return_Zero_Row_Tables(model: pytabular.Tabular) -> List[str]:
    """List the tables whose row-count query comes back blank.

    Runs ``COUNTROWS(_)`` through ``model.Query_Every_Table`` and keeps the
    rows whose count column is reported as NaN by ``isna()``.

    Args:
        model (pytabular.Tabular): Tabular Model

    Returns:
        List[str]: Values of the ``[Table]`` column for every row whose
            ``[COUNTROWS(_)]`` value is NaN.
    """
    function_name = sys._getframe(0).f_code.co_name
    logger.info(f"Executing Basic Function {function_name}")

    query_function: str = "COUNTROWS(_)"
    count_column = f"[{query_function}]"
    df: pd.DataFrame = model.Query_Every_Table(query_function)

    # Boolean mask of the rows whose count is missing.
    is_blank = df[count_column].isna()
    return df[is_blank]["[Table]"].to_list()


def Table_Last_Refresh_Times(
    model: pytabular.Tabular, group_partition: bool = True
) -> pd.DataFrame:
    """Build a DataFrame of refresh timestamps for the model's partitions.

    One row is produced per entry in ``model.Partitions``. With
    ``group_partition`` left at True the rows are collapsed to one per table,
    keeping the maximum ``RefreshedTime`` string; with False the
    partition-level rows (including the ``Partitions`` column) are returned.
    Example to add to model:
    ``model.Create_Table(p.Table_Last_Refresh_Times(model), 'RefreshTimes')``

    Args:
        model (pytabular.Tabular): Tabular Model
        group_partition (bool, optional): Whether or not you want the grain of the dataframe to be by table or by partition. Defaults to True.

    Returns:
        pd.DataFrame: Columns ``Tables`` and ``RefreshedTime`` (plus ``Partitions`` when not grouped). ``RefreshedTime`` is derived from the partition property: https://docs.microsoft.com/en-us/dotnet/api/microsoft.analysisservices.tabular.partition.refreshedtime?view=analysisservices-dotnet#microsoft-analysisservices-tabular-partition-refreshedtime
    """
    logger.info(f"Executing Basic Function {sys._getframe(0).f_code.co_name}")

    def _format_refreshed(partition) -> str:
        # NOTE(review): the slice removes the last three characters of the
        # formatted text, i.e. two microsecond digits AND the trailing "Z".
        # Confirm this is the intended precision/format.
        stamp = ticks_to_datetime(partition.RefreshedTime.Ticks)
        return stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")[:-3]

    df = pd.DataFrame(
        {
            "Tables": [part.Table.Name for part in model.Partitions],
            "Partitions": [part.Name for part in model.Partitions],
            "RefreshedTime": [_format_refreshed(part) for part in model.Partitions],
        }
    )

    if not group_partition:
        logger.debug("Returning DF")
        return df

    logger.debug("Grouping together to grain of Table")
    per_table = df[["Tables", "RefreshedTime"]].groupby(by=["Tables"]).max()
    return per_table.reset_index(drop=False)


def BPA_Violations_To_DF(model: pytabular.Tabular, te2: str, bpa: str) -> pd.DataFrame:
    """Run the BPA Analyzer from TE2 and output the result as a DataFrame.

    Each line returned by ``model.Analyze_BPA`` is split at the first
    occurrence of the text `` violates rule ``; double quotes are removed
    from both halves. A line that does not contain the marker ends up whole
    in ``Object`` with an empty ``Violation``.

    Args:
        model (pytabular.Tabular): Tabular Model Class
        te2 (str): TE2 Exe File Path (Can use TE2().EXE_path)
        bpa (str): BPA File Location (Can use BPA().Location)

    Returns:
        pd.DataFrame: Two string columns, ``Object`` (the object in violation) and ``Violation`` (the rule).
    """
    separator = " violates rule "
    results = model.Analyze_BPA(te2, bpa)
    # partition() always yields (head, separator, tail); taking [::2] keeps
    # exactly two fields per row. Splitting on a substitute character instead
    # would produce ragged rows whenever a name itself contains that character.
    rows = [
        [field.replace('"', "") for field in line.partition(separator)[::2]]
        for line in results
    ]
    return pd.DataFrame(rows, columns=["Object", "Violation"])


def Last_X_Interval(
    Model: pytabular.Tabular,
    Measure: Union[str, pytabular.pytabular.Measure],
    Column_Name: Union[str, None] = None,
    Date_Column_Identifier: str = "'Date'[DATE_DTE_KEY]",
    Number_Of_Intervals: int = 90,
    Interval: str = "DAY",
) -> pd.DataFrame:
    """Pulls the Last X Interval (Ex Last 90 Days) of a specific measure.

    Args:
        Model (pytabular.Tabular): Tabular Model to perform query on.
        Measure (Union[str,pytabular.pytabular.Measure]): Measure to query. If string, will first check for a measure in the model with that name (the last match wins), otherwise will assume it is a DAX Expression (Ex SUM(FactTable[ColumnValue]) ) and perform that as expression
        Column_Name (Union[str,None], optional): Column Name to be outputted in DataFrame. Defaults to None, in which case the measure's name is used, or "Result" when Measure is a plain DAX expression.
        Date_Column_Identifier (str, optional): Date column dax identifier. Defaults to "'Date'[DATE_DTE_KEY]".
        Number_Of_Intervals (int, optional): This is used to plug in the variables for [DATESINPERIOD](https://docs.microsoft.com/en-us/dax/datesinperiod-function-dax). Defaults to 90.
        Interval (str, optional): Same as Number_Of_Intervals. Used to plug in parameters of DAX function [DATESINPERIOD](https://docs.microsoft.com/en-us/dax/datesinperiod-function-dax). Defaults to "DAY". Possible options are "DAY", "MONTH", "QUARTER", and "YEAR"

    Returns:
        pd.DataFrame: Whatever ``Model.Query`` returns for the generated DAX query.
    """
    if isinstance(Measure, str):
        matches = [measure for measure in Model.Measures if measure.Name == Measure]
        if matches:
            measure_name = matches[-1].Name
            default_column, Expression = measure_name, f"[{measure_name}]"
        else:
            # No measure with that name: treat the string as a raw DAX expression.
            logger.debug("Measure is string but unable to find Measure...")
            default_column, Expression = "Result", Measure
    else:
        # The expression must reference the measure itself; Column_Name is
        # only the output label and may differ from the measure's name.
        default_column, Expression = Measure.Name, f"[{Measure.Name}]"

    if Column_Name is None:
        Column_Name = default_column

    Query_Str = f"""
EVALUATE
SUMMARIZECOLUMNS(
{Date_Column_Identifier},
KEEPFILTERS( DATESINPERIOD ( {Date_Column_Identifier}, UTCTODAY(), -{Number_Of_Intervals}, {Interval} ) ),
"{Column_Name}", {Expression}
)
"""
    logger.info(
        "Running query for %s in the last %s %ss...",
        Column_Name,
        Number_Of_Intervals,
        Interval,
    )
    return Model.Query(Query_Str)
Loading