Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class TableauOperator(BaseOperator):
between each instance state checks until operation is completed
:param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
containing the credentials to authenticate to the Tableau Server.
:param incremental: When set to True triggers an incremental extract refresh instead of a full
refresh. Only applies when ``method="refresh"`` on datasource or workbook resources.
Defaults to False (full refresh).
"""

template_fields: Sequence[str] = (
Expand All @@ -82,6 +85,7 @@ def __init__(
blocking_refresh: bool = True,
check_interval: float = 20,
tableau_conn_id: str = "tableau_default",
incremental: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -93,6 +97,7 @@ def __init__(
self.site_id = site_id
self.blocking_refresh = blocking_refresh
self.tableau_conn_id = tableau_conn_id
self.incremental = incremental

def execute(self, context: Context) -> str:
"""
Expand Down Expand Up @@ -124,6 +129,9 @@ def execute(self, context: Context) -> str:
if not job_items:
raise ValueError("Tableau tasks.run returned no JobItem in response")
job_id = job_items[0].id
elif self.method == "refresh":
response = method(resource_id, incremental=self.incremental)
job_id = response.id
else:
response = method(resource_id)
job_id = response.id
Expand Down
36 changes: 32 additions & 4 deletions providers/tableau/tests/unit/tableau/operators/test_tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_execute_workbooks(self, mock_tableau_hook):

job_id = operator.execute(context={})

mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2, incremental=False)
assert mock_tableau_hook.server.workbooks.refresh.return_value.id == job_id

@patch("airflow.providers.tableau.operators.tableau.TableauHook")
Expand Down Expand Up @@ -106,7 +106,7 @@ def mock_wait_for_state(job_id, target_state, check_interval):

job_id = operator.execute(context={})

mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2)
mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2, incremental=False)
assert mock_tableau_hook.server.workbooks.refresh.return_value.id == job_id
mock_tableau_hook.wait_for_state.assert_called_once_with(
job_id=job_id, check_interval=20, target_state=TableauJobFinishCode.SUCCESS
Expand Down Expand Up @@ -135,7 +135,7 @@ def test_execute_datasources(self, mock_tableau_hook):

job_id = operator.execute(context={})

mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2, incremental=False)
assert mock_tableau_hook.server.datasources.refresh.return_value.id == job_id

@patch("airflow.providers.tableau.operators.tableau.TableauHook")
Expand Down Expand Up @@ -167,7 +167,7 @@ def mock_wait_for_state(job_id, target_state, check_interval):

job_id = operator.execute(context={})

mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2)
mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2, incremental=False)
assert mock_tableau_hook.server.datasources.refresh.return_value.id == job_id
mock_tableau_hook.wait_for_state.assert_called_once_with(
job_id=job_id, check_interval=20, target_state=TableauJobFinishCode.SUCCESS
Expand Down Expand Up @@ -270,6 +270,34 @@ def test_execute_unavailable_resource(self):
with pytest.raises(AirflowException):
operator.execute({})

@patch("airflow.providers.tableau.operators.tableau.TableauHook")
def test_execute_datasources_incremental(self, mock_tableau_hook):
"""incremental=True must be forwarded to datasources.refresh()."""
mock_tableau_hook.get_all = Mock(return_value=self.mock_datasources)
mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
operator = TableauOperator(
blocking_refresh=False, find="ds_2", resource="datasources", incremental=True, **self.kwargs
)

job_id = operator.execute(context={})

mock_tableau_hook.server.datasources.refresh.assert_called_once_with(2, incremental=True)
assert mock_tableau_hook.server.datasources.refresh.return_value.id == job_id

@patch("airflow.providers.tableau.operators.tableau.TableauHook")
def test_execute_workbooks_incremental(self, mock_tableau_hook):
"""incremental=True must be forwarded to workbooks.refresh()."""
mock_tableau_hook.get_all = Mock(return_value=self.mocked_workbooks)
mock_tableau_hook.return_value.__enter__ = Mock(return_value=mock_tableau_hook)
operator = TableauOperator(
blocking_refresh=False, find="wb_2", resource="workbooks", incremental=True, **self.kwargs
)

job_id = operator.execute(context={})

mock_tableau_hook.server.workbooks.refresh.assert_called_once_with(2, incremental=True)
assert mock_tableau_hook.server.workbooks.refresh.return_value.id == job_id

def test_get_resource_id(self):
"""
Test get resource id
Expand Down