From e206854cd59e284c3e66cc51aeedf3fb523534d9 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Tue, 27 Jun 2023 13:16:49 -0700 Subject: [PATCH 1/7] EKS Create Cluster deferrable --- airflow/providers/amazon/aws/operators/eks.py | 102 +++++++++++++++++- airflow/providers/amazon/aws/triggers/eks.py | 60 +++++++++++ .../example_eks_with_fargate_in_one_step.py | 3 + 3 files changed, 161 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 6858f801121bd..8e9c6c5ae5bc8 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -30,6 +30,7 @@ from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.eks import EksHook from airflow.providers.amazon.aws.triggers.eks import ( + EksClusterTrigger, EksCreateFargateProfileTrigger, EksCreateNodegroupTrigger, EksDeleteFargateProfileTrigger, @@ -225,6 +226,7 @@ def __init__( wait_for_completion: bool = False, aws_conn_id: str = DEFAULT_CONN_ID, region: str | None = None, + deferrable: bool = False, waiter_delay: int = 30, waiter_max_attempts: int = 40, **kwargs, @@ -237,7 +239,7 @@ def __init__( self.nodegroup_role_arn = nodegroup_role_arn self.fargate_pod_execution_role_arn = fargate_pod_execution_role_arn self.create_fargate_profile_kwargs = create_fargate_profile_kwargs or {} - self.wait_for_completion = wait_for_completion + self.wait_for_completion = False if deferrable else wait_for_completion self.waiter_delay = waiter_delay self.waiter_max_attempts = waiter_max_attempts self.aws_conn_id = aws_conn_id @@ -246,6 +248,7 @@ def __init__( self.create_nodegroup_kwargs = create_nodegroup_kwargs or {} self.fargate_selectors = fargate_selectors or [{"namespace": DEFAULT_NAMESPACE_NAME}] self.fargate_profile_name = fargate_profile_name + self.deferrable = deferrable; super().__init__( **kwargs, ) @@ -274,11 +277,25 @@ def execute(self, context: Context): # Short circuit early if we don't need to wait to attach compute # and the caller hasn't requested to wait for the cluster either. - if not self.compute and not self.wait_for_completion: + if not self.compute and not self.wait_for_completion and not self.deferrable: return None self.log.info("Waiting for EKS Cluster to provision. This will take some time.") client = self.eks_hook.conn + + if self.deferrable: + self.defer( + trigger=EksClusterTrigger( + waiter_name="cluster_active", + cluster_name=self.cluster_name, + aws_conn_id=self.aws_conn_id, + region=self.region, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + ), + method_name="deferrable_create_cluster_next", + timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), + ) try: client.get_waiter("cluster_active").wait( @@ -310,7 +327,73 @@ def execute(self, context: Context): create_fargate_profile_kwargs=self.create_fargate_profile_kwargs, subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), ) - + def deferrable_create_cluster_next(self, context, event=None): + if event["status"] == "failed": + self.log.error("Cluster failed to start and will be torn down.") + self.eks_hook.delete_cluster(name=self.cluster_name) + self.defer( + trigger=EksClusterTrigger( + cluster_name=self.cluster_name, + aws_conn_id=self.aws_conn_id, + region=self.region, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + ), + method_name="execute_complete", + timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), + ) + elif event["status"] == "success": + self.log.info("Cluster is ready to provision compute.") + _create_compute( + compute=self.compute, + cluster_name=self.cluster_name, + aws_conn_id=self.aws_conn_id, + region=self.region, + wait_for_completion=self.wait_for_completion, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + nodegroup_name=self.nodegroup_name, + nodegroup_role_arn=self.nodegroup_role_arn, + create_nodegroup_kwargs=self.create_nodegroup_kwargs, + fargate_profile_name=self.fargate_profile_name, + fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn, + fargate_selectors=self.fargate_selectors, + create_fargate_profile_kwargs=self.create_fargate_profile_kwargs, + subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), + ) + if self.compute == "fargate": + self.defer( + trigger=EksCreateFargateProfileTrigger( + cluster_name=self.cluster_name, + fargate_profile_name=self.fargate_profile_name, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + aws_conn_id=self.aws_conn_id, + region=self.region, + ), + method_name="execute_complete", + timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), + ) + else: + self.defer( + trigger=EksNodegroupTrigger( + waiter_name="nodegroup_active", + cluster_name=self.cluster_name, + aws_conn_id=self.aws_conn_id, + region=self.region, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + ), + method_name="execute_complete", + timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), + ) + def execute_complete(self, context, event=None): + resource = "fargate profile" if self.compute == "fargate" else self.compute + if event["status"] != "success": + raise AirflowException(f"Error creating {resource}: {event}") + else: + self.log.info(f"{resource} created successfully") + return class EksCreateNodegroupOperator(BaseOperator): """ @@ -564,6 +647,11 @@ class EksDeleteClusterOperator(BaseOperator): maintained on each worker node). :param region: Which AWS region the connection should use. (templated) If this is None or empty then the default boto3 behaviour is used. + :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check cluster state + :param waiter_max_attempts: The maximum number of attempts to check cluster state + :param deferrable: If True, the operator will wait asynchronously for the cluster to be deleted. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) """ @@ -582,13 +670,19 @@ def __init__( wait_for_completion: bool = False, aws_conn_id: str = DEFAULT_CONN_ID, region: str | None = None, + deferrable: bool = False, + waiter_delay: int = 30, + waiter_max_attempts: int = 40, **kwargs, ) -> None: self.cluster_name = cluster_name self.force_delete_compute = force_delete_compute - self.wait_for_completion = wait_for_completion + self.wait_for_completion = False if deferrable else wait_for_completion self.aws_conn_id = aws_conn_id self.region = region + self.deferrable = deferrable + self.waiter_delay = waiter_delay + self.waiter_max_attempts = waiter_max_attempts super().__init__(**kwargs) def execute(self, context: Context): diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index a6fb75eb80fa2..ea8e9c420218e 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -24,6 +24,66 @@ from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger +class EksClusterTrigger(BaseTrigger): + """ + Trigger for EksCreateClusterOperator. + The trigger will asynchronously wait for the cluster to be created. + + :param cluster_name: The name of the EKS cluster + :param waiter_delay: The amount of time in seconds to wait between attempts. + :param waiter_max_attempts: The maximum number of attempts to be made. + :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region: Which AWS region the connection should use. + If this is None or empty then the default boto3 behaviour is used. + """ + + def __init__( + self, + waiter_name: str, + cluster_name: str, + waiter_delay: int, + waiter_max_attempts: int, + aws_conn_id: str, + region: str, + ): + self.waiter_name = waiter_name + self.cluster_name = cluster_name + self.waiter_delay = waiter_delay + self.waiter_max_attempts = waiter_max_attempts + self.aws_conn_id = aws_conn_id + self.region = region + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + self.__class__.__module__ + "." + self.__class__.__qualname__, + { + "waiter_name": self.waiter_name, + "cluster_name": self.cluster_name, + "waiter_delay": str(self.waiter_delay), + "waiter_max_attempts": str(self.waiter_max_attempts), + "aws_conn_id": self.aws_conn_id, + "region": self.region, + }, + ) + + async def run(self): + self.hook = EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) + async with self.hook.async_conn as client: + waiter = client.get_waiter(self.waiter_name) + await async_wait( + waiter=waiter, + waiter_max_attempts=int(self.waiter_max_attempts), + waiter_delay=int(self.waiter_delay), + args={"name": self.cluster_name}, + failure_message="Error checking Eks cluster", + status_message="Eks cluster status is", + status_args=["cluster.status"], + ) + yield TriggerEvent( + {"status": "success"} + ) + + class EksCreateFargateProfileTrigger(AwsBaseWaiterTrigger): """ Asynchronously wait for the fargate profile to be created. diff --git a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py index ae67a26588bdc..6b9907d836108 100644 --- a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py +++ b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py @@ -81,6 +81,9 @@ # Opting to use the same ARN for the cluster and the pod here, # but a different ARN could be configured and passed if desired. fargate_pod_execution_role_arn=fargate_pod_role_arn, + deferrable=True, + waiter_delay=30, + wait_for_completion=399, ) # [END howto_operator_eks_create_cluster_with_fargate_profile] From 20e0b67d565679236ddd0d722a4a31513b184f4d Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Tue, 4 Jul 2023 14:04:32 -0700 Subject: [PATCH 2/7] Add unit tests Add documentation --- airflow/providers/amazon/aws/operators/eks.py | 58 +++++-- airflow/providers/amazon/aws/triggers/eks.py | 142 ++++++++++++++++-- .../operators/eks.rst | 2 + .../amazon/aws/operators/test_eks.py | 33 ++++ .../providers/amazon/aws/triggers/test_eks.py | 1 + 5 files changed, 213 insertions(+), 23 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 8e9c6c5ae5bc8..4c976a66c0df7 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -30,9 +30,10 @@ from airflow.models import BaseOperator from airflow.providers.amazon.aws.hooks.eks import EksHook from airflow.providers.amazon.aws.triggers.eks import ( - EksClusterTrigger, + EksCreateClusterTrigger, EksCreateFargateProfileTrigger, EksCreateNodegroupTrigger, + EksDeleteClusterTrigger, EksDeleteFargateProfileTrigger, EksDeleteNodegroupTrigger, ) @@ -188,6 +189,9 @@ class EksCreateClusterOperator(BaseOperator): (templated) :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check cluster state :param waiter_max_attempts: The maximum number of attempts to check cluster state + :param deferrable: If True, the operator will wait asynchronously for the job to complete. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) """ @@ -248,7 +252,7 @@ def __init__( self.create_nodegroup_kwargs = create_nodegroup_kwargs or {} self.fargate_selectors = fargate_selectors or [{"namespace": DEFAULT_NAMESPACE_NAME}] self.fargate_profile_name = fargate_profile_name - self.deferrable = deferrable; + self.deferrable = deferrable super().__init__( **kwargs, ) @@ -277,15 +281,17 @@ def execute(self, context: Context): # Short circuit early if we don't need to wait to attach compute # and the caller hasn't requested to wait for the cluster either. - if not self.compute and not self.wait_for_completion and not self.deferrable: + # if not self.compute and not self.wait_for_completion and not self.deferrable: + # return None + if not any([self.compute, self.wait_for_completion, self.deferrable]): return None self.log.info("Waiting for EKS Cluster to provision. This will take some time.") client = self.eks_hook.conn - + if self.deferrable: self.defer( - trigger=EksClusterTrigger( + trigger=EksCreateClusterTrigger( waiter_name="cluster_active", cluster_name=self.cluster_name, aws_conn_id=self.aws_conn_id, @@ -294,7 +300,7 @@ def execute(self, context: Context): waiter_max_attempts=self.waiter_max_attempts, ), method_name="deferrable_create_cluster_next", - timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), + timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), ) try: @@ -327,20 +333,22 @@ def execute(self, context: Context): create_fargate_profile_kwargs=self.create_fargate_profile_kwargs, subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), ) + def deferrable_create_cluster_next(self, context, event=None): if event["status"] == "failed": self.log.error("Cluster failed to start and will be torn down.") self.eks_hook.delete_cluster(name=self.cluster_name) self.defer( - trigger=EksClusterTrigger( + trigger=EksDeleteClusterTrigger( cluster_name=self.cluster_name, - aws_conn_id=self.aws_conn_id, - region=self.region, waiter_delay=self.waiter_delay, waiter_max_attempts=self.waiter_max_attempts, + aws_conn_id=self.aws_conn_id, + region=self.region, + delete_resources=False, ), - method_name="execute_complete", - timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), + method_name="execute_failed", + timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), ) elif event["status"] == "success": self.log.info("Cluster is ready to provision compute.") @@ -378,6 +386,7 @@ def deferrable_create_cluster_next(self, context, event=None): self.defer( trigger=EksNodegroupTrigger( waiter_name="nodegroup_active", + nodegroup_name=self.nodegroup_name, cluster_name=self.cluster_name, aws_conn_id=self.aws_conn_id, region=self.region, @@ -387,14 +396,21 @@ def deferrable_create_cluster_next(self, context, event=None): method_name="execute_complete", timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), ) + + def execute_failed(self, context, event=None): + if event["status"] == "delteted": + self.log.info("Cluster deleted") + raise event["exception"] + def execute_complete(self, context, event=None): resource = "fargate profile" if self.compute == "fargate" else self.compute if event["status"] != "success": raise AirflowException(f"Error creating {resource}: {event}") else: - self.log.info(f"{resource} created successfully") + self.log.info("%s created successfully", resource) return + class EksCreateNodegroupOperator(BaseOperator): """ Creates an Amazon EKS managed node group for an existing Amazon EKS Cluster. @@ -690,7 +706,19 @@ def execute(self, context: Context): aws_conn_id=self.aws_conn_id, region_name=self.region, ) - + if self.deferrable: + self.defer( + trigger=EksDeleteClusterTrigger( + cluster_name=self.cluster_name, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + aws_conn_id=self.aws_conn_id, + region=self.region, + force_delete_compute=self.force_delete_compute, + ), + method_name="execute_complete", + timeout=timedelta(seconds=self.waiter_delay * self.waiter_max_attempts), + ) if self.force_delete_compute: self.delete_any_nodegroups(eks_hook) self.delete_any_fargate_profiles(eks_hook) @@ -739,6 +767,10 @@ def delete_any_fargate_profiles(self, eks_hook) -> None: ) self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME)) + def execute_complete(self, context, event=None): + if event["status"] == "success": + self.log.info("Cluster deleted successfully.") + class EksDeleteNodegroupOperator(BaseOperator): """ diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index ea8e9c420218e..42483433b01e7 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -17,6 +17,9 @@ from __future__ import annotations import warnings +import asyncio +from functools import cached_property +from typing import Any from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook @@ -24,11 +27,12 @@ from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger -class EksClusterTrigger(BaseTrigger): +class EksCreateClusterTrigger(BaseTrigger): """ Trigger for EksCreateClusterOperator. The trigger will asynchronously wait for the cluster to be created. + :param waiter_name: The name of the waiter to use. :param cluster_name: The name of the EKS cluster :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. @@ -44,7 +48,7 @@ def __init__( waiter_delay: int, waiter_max_attempts: int, aws_conn_id: str, - region: str, + region: str | None, ): self.waiter_name = waiter_name self.cluster_name = cluster_name @@ -65,24 +69,142 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "region": self.region, }, ) - + async def run(self): + failure_message = "Error checking Eks cluster" self.hook = EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) async with self.hook.async_conn as client: waiter = client.get_waiter(self.waiter_name) + try: + await async_wait( + waiter=waiter, + waiter_max_attempts=int(self.waiter_max_attempts), + waiter_delay=int(self.waiter_delay), + args={"name": self.cluster_name}, + failure_message=failure_message, + status_message="Eks cluster status is", + status_args=["cluster.status"], + ) + except AirflowException as exc: + if failure_message in str(exc): + yield TriggerEvent({"status": "failed", "exception": exc}) + raise + yield TriggerEvent({"status": "success"}) + + +class EksDeleteClusterTrigger(BaseTrigger): + """ + Trigger for EksDeleteClusterOperator. + The trigger will asynchronously wait for the cluster to be deleted. If there are + any nodegroups or fargate profiles associated with the cluster, they will be deleted + before the cluster is deleted. + + :param cluster_name: The name of the EKS cluster + :param waiter_delay: The amount of time in seconds to wait between attempts. + :param waiter_max_attempts: The maximum number of attempts to be made. + :param aws_conn_id: The Airflow connection used for AWS credentials. + :param region: Which AWS region the connection should use. + If this is None or empty then the default boto3 behaviour is used. + :param force_delete_compute: If True, any nodegroups or fargate profiles associated + with the cluster will be deleted before the cluster is deleted. + """ + + def __init__( + self, + cluster_name, + waiter_delay: int, + waiter_max_attempts: int, + aws_conn_id: str, + region: str | None, + force_delete_compute: bool, + ): + self.cluster_name = cluster_name + self.waiter_delay = waiter_delay + self.waiter_max_attempts = waiter_max_attempts + self.aws_conn_id = aws_conn_id + self.region = region + self.force_delete_compute = force_delete_compute + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + self.__class__.__module__ + "." + self.__class__.__qualname__, + { + "cluster_name": self.cluster_name, + "waiter_delay": str(self.waiter_delay), + "waiter_max_attempts": str(self.waiter_max_attempts), + "aws_conn_id": self.aws_conn_id, + "region": self.region, + "force_delete_compute": self.force_delete_compute, + }, + ) + + @cached_property + def hook(self) -> EksHook: + return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) + + async def run(self): + async with self.hook.async_conn as client: + waiter = client.get_waiter("cluster_deleted") + if self.force_delete_compute: + await self.delete_any_nodegroups(client=client) + await self.delete_any_fargate_profiles(client=client) + await client.delete_cluster(name=self.cluster_name) await async_wait( waiter=waiter, - waiter_max_attempts=int(self.waiter_max_attempts), waiter_delay=int(self.waiter_delay), + waiter_max_attempts=int(self.waiter_max_attempts), args={"name": self.cluster_name}, - failure_message="Error checking Eks cluster", - status_message="Eks cluster status is", + failure_message="Error deleting cluster", + status_message="Status of cluster is", status_args=["cluster.status"], ) - yield TriggerEvent( - {"status": "success"} - ) - + + yield TriggerEvent({"status": "deleted"}) + + async def delete_any_nodegroups(self, client): + nodegroups = await client.list_nodegroups(clusterName=self.cluster_name) + if nodegroups.get("nodegroups", None): + self.log.info("Deleting nodegroups") + for group in nodegroups["nodegroups"]: + await client.delete_nodegroup(clusterName=self.cluster_name, nodegroupName=group) + await async_wait( + waiter=self.hook.get_waiter("all_nodegroups_deleted", deferrable=True, client=client), + waiter_delay=int(self.waiter_delay), + waiter_max_attempts=int(self.waiter_max_attempts), + args={"clusterName": self.cluster_name}, + failure_message=f"Error deleting nodegroup for cluster {self.cluster_name}", + status_message="Deleting nodegroups associated with the cluster", + status_args=["nodegroups"], + ) + self.log.info("All nodegroups deleted") + else: + self.log.info("No nodegroups associated with cluster %s", self.cluster_name) + + async def delete_any_fargate_profiles(self, client) -> None: + """ + Deletes all EKS Fargate profiles for a provided Amazon EKS Cluster. + + EKS Fargate profiles must be deleted one at a time, so we must wait + for one to be deleted before sending the next delete command. + """ + fargate_profiles = await client.list_fargate_profiles(clusterName=self.cluster_name) + if fargate_profiles.get("fargateProfileNames"): + self.log.info("Waiting for Fargate profiles to delete. This will take some time.") + for profile in fargate_profiles["fargateProfileNames"]: + await client.delete_fargate_profile(clusterName=self.cluster_name, fargateProfileName=profile) + await async_wait( + waiter=client.get_waiter("fargate_profile_deleted"), + waiter_delay=int(self.waiter_delay), + waiter_max_attempts=int(self.waiter_max_attempts), + args={"clusterName": self.cluster_name, "fargateProfileName": profile}, + failure_message=f"Error deleting fargate profile for cluster {self.cluster_name}", + status_message="Status of fargate profile is", + status_args=["fargateProfile.status"], + ) + self.log.info("All Fargate profiles deleted") + else: + self.log.info(f"No Fargate profiles associated with cluster {self.cluster_name}") + class EksCreateFargateProfileTrigger(AwsBaseWaiterTrigger): """ diff --git a/docs/apache-airflow-providers-amazon/operators/eks.rst b/docs/apache-airflow-providers-amazon/operators/eks.rst index cff682db2aa8d..9f1cc9df61eec 100644 --- a/docs/apache-airflow-providers-amazon/operators/eks.rst +++ b/docs/apache-airflow-providers-amazon/operators/eks.rst @@ -76,6 +76,7 @@ Create an Amazon EKS cluster and AWS Fargate profile in one step To create an Amazon EKS cluster and an AWS Fargate profile in one command, you can use :class:`~airflow.providers.amazon.aws.operators.eks.EksCreateClusterOperator`. +You can also run this operator in deferrable mode by setting ``deferrable`` param to ``True``. Note: An AWS IAM role with the following permissions is required: ``ec2.amazon.aws.com`` must be in the Trusted Relationships @@ -97,6 +98,7 @@ Delete an Amazon EKS Cluster To delete an existing Amazon EKS Cluster you can use :class:`~airflow.providers.amazon.aws.operators.eks.EksDeleteClusterOperator`. +You can also run this operator in deferrable mode by setting ``deferrable`` param to ``True``. .. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python diff --git a/tests/providers/amazon/aws/operators/test_eks.py b/tests/providers/amazon/aws/operators/test_eks.py index 8381a26ef151a..4410105b74717 100644 --- a/tests/providers/amazon/aws/operators/test_eks.py +++ b/tests/providers/amazon/aws/operators/test_eks.py @@ -337,6 +337,34 @@ def test_fargate_compute_missing_fargate_pod_execution_role_arn(self): ): missing_fargate_pod_execution_role_arn.execute({}) + @mock.patch.object(EksHook, "create_cluster") + def test_eks_create_cluster_short_circuit_early(self, mock_create_cluster, caplog): + mock_create_cluster.return_value = None + eks_create_cluster_operator = EksCreateClusterOperator( + task_id=TASK_ID, + **self.create_cluster_params, + compute=None, + wait_for_completion=False, + deferrable=False, + ) + eks_create_cluster_operator.execute({}) + assert len(caplog.records) == 0 + + @mock.patch.object(EksHook, "create_cluster") + def test_eks_create_cluster_with_deferrable(self, mock_create_cluster, caplog): + mock_create_cluster.return_value = None + + eks_create_cluster_operator = EksCreateClusterOperator( + task_id=TASK_ID, + **self.create_cluster_params, + compute=None, + wait_for_completion=False, + deferrable=True, + ) + with pytest.raises(TaskDeferred): + eks_create_cluster_operator.execute({}) + assert "Waiting for EKS Cluster to provision. This will take some time." in caplog.messages + class TestEksCreateFargateProfileOperator: def setup_method(self) -> None: @@ -542,6 +570,11 @@ def test_existing_cluster_not_in_use_with_wait( mock_waiter.assert_called_with(mock.ANY, name=CLUSTER_NAME) assert_expected_waiter_type(mock_waiter, "ClusterDeleted") + def test_eks_delete_cluster_operator_with_deferrable(self): + self.delete_cluster_operator.deferrable = True + with pytest.raises(TaskDeferred): + self.delete_cluster_operator.execute({}) + class TestEksDeleteNodegroupOperator: def setup_method(self) -> None: diff --git a/tests/providers/amazon/aws/triggers/test_eks.py b/tests/providers/amazon/aws/triggers/test_eks.py index 045519aea57e8..de39c2e9370ac 100644 --- a/tests/providers/amazon/aws/triggers/test_eks.py +++ b/tests/providers/amazon/aws/triggers/test_eks.py @@ -19,6 +19,7 @@ import pytest from airflow.providers.amazon.aws.triggers.eks import ( + EksCreateClusterTrigger, EksCreateFargateProfileTrigger, EksCreateNodegroupTrigger, EksDeleteFargateProfileTrigger, From fe183e61f60cd7e421d8b0a1f08fe961895fb1aa Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Thu, 6 Jul 2023 16:03:29 -0700 Subject: [PATCH 3/7] D205 Doc style changes --- airflow/providers/amazon/aws/operators/eks.py | 4 +--- airflow/providers/amazon/aws/triggers/eks.py | 2 ++ tests/providers/amazon/aws/operators/test_eks.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 4c976a66c0df7..837d18a6ad827 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -281,12 +281,10 @@ def execute(self, context: Context): # Short circuit early if we don't need to wait to attach compute # and the caller hasn't requested to wait for the cluster either. - # if not self.compute and not self.wait_for_completion and not self.deferrable: - # return None if not any([self.compute, self.wait_for_completion, self.deferrable]): return None - self.log.info("Waiting for EKS Cluster to provision. This will take some time.") + self.log.info("Waiting for EKS Cluster to provision. This will take some time.") client = self.eks_hook.conn if self.deferrable: diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index 42483433b01e7..e327ff525d5bf 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -30,6 +30,7 @@ class EksCreateClusterTrigger(BaseTrigger): """ Trigger for EksCreateClusterOperator. + The trigger will asynchronously wait for the cluster to be created. :param waiter_name: The name of the waiter to use. @@ -95,6 +96,7 @@ async def run(self): class EksDeleteClusterTrigger(BaseTrigger): """ Trigger for EksDeleteClusterOperator. + The trigger will asynchronously wait for the cluster to be deleted. If there are any nodegroups or fargate profiles associated with the cluster, they will be deleted before the cluster is deleted. diff --git a/tests/providers/amazon/aws/operators/test_eks.py b/tests/providers/amazon/aws/operators/test_eks.py index 4410105b74717..e537ab20a3336 100644 --- a/tests/providers/amazon/aws/operators/test_eks.py +++ b/tests/providers/amazon/aws/operators/test_eks.py @@ -363,7 +363,7 @@ def test_eks_create_cluster_with_deferrable(self, mock_create_cluster, caplog): ) with pytest.raises(TaskDeferred): eks_create_cluster_operator.execute({}) - assert "Waiting for EKS Cluster to provision. This will take some time." in caplog.messages + assert "Waiting for EKS Cluster to provision. This will take some time." in caplog.messages class TestEksCreateFargateProfileOperator: From a79c229c7940baacd345cdb82602224314cf3bb8 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Tue, 11 Jul 2023 11:40:00 -0700 Subject: [PATCH 4/7] Rebase to use AwsBaseWaiterTrigger --- airflow/providers/amazon/aws/operators/eks.py | 16 ++-- airflow/providers/amazon/aws/triggers/eks.py | 95 ++++++++----------- .../providers/amazon/aws/triggers/test_eks.py | 16 ++++ 3 files changed, 62 insertions(+), 65 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 837d18a6ad827..5146da9de17b6 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -230,7 +230,7 @@ def __init__( wait_for_completion: bool = False, aws_conn_id: str = DEFAULT_CONN_ID, region: str | None = None, - deferrable: bool = False, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), waiter_delay: int = 30, waiter_max_attempts: int = 40, **kwargs, @@ -290,10 +290,9 @@ def execute(self, context: Context): if self.deferrable: self.defer( trigger=EksCreateClusterTrigger( - waiter_name="cluster_active", cluster_name=self.cluster_name, aws_conn_id=self.aws_conn_id, - region=self.region, + region_name=self.region, waiter_delay=self.waiter_delay, waiter_max_attempts=self.waiter_max_attempts, ), @@ -342,7 +341,7 @@ def deferrable_create_cluster_next(self, context, event=None): waiter_delay=self.waiter_delay, waiter_max_attempts=self.waiter_max_attempts, aws_conn_id=self.aws_conn_id, - region=self.region, + region_name=self.region, delete_resources=False, ), method_name="execute_failed", @@ -382,12 +381,11 @@ def deferrable_create_cluster_next(self, context, event=None): ) else: self.defer( - trigger=EksNodegroupTrigger( - waiter_name="nodegroup_active", + trigger=EksCreateNodegroupTrigger( nodegroup_name=self.nodegroup_name, cluster_name=self.cluster_name, aws_conn_id=self.aws_conn_id, - region=self.region, + region_name=self.region, waiter_delay=self.waiter_delay, waiter_max_attempts=self.waiter_max_attempts, ), @@ -684,7 +682,7 @@ def __init__( wait_for_completion: bool = False, aws_conn_id: str = DEFAULT_CONN_ID, region: str | None = None, - deferrable: bool = False, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), waiter_delay: int = 30, waiter_max_attempts: int = 40, **kwargs, @@ -711,7 +709,7 @@ def execute(self, context: Context): waiter_delay=self.waiter_delay, waiter_max_attempts=self.waiter_max_attempts, aws_conn_id=self.aws_conn_id, - region=self.region, + region_name=self.region, force_delete_compute=self.force_delete_compute, ), method_name="execute_complete", diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index e327ff525d5bf..576286d209c6a 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -17,86 +17,60 @@ from __future__ import annotations import warnings -import asyncio -from functools import cached_property from typing import Any from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook from airflow.providers.amazon.aws.hooks.eks import EksHook from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger +from airflow.providers.amazon.aws.utils.waiter_with_logging import async_wait +from airflow.triggers.base import TriggerEvent -class EksCreateClusterTrigger(BaseTrigger): +class EksCreateClusterTrigger(AwsBaseWaiterTrigger): """ Trigger for EksCreateClusterOperator. - + The trigger will asynchronously wait for the cluster to be created. - :param waiter_name: The name of the waiter to use. :param cluster_name: The name of the EKS cluster :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. - :param region: Which AWS region the connection should use. + :param region_name: Which AWS region the connection should use. If this is None or empty then the default boto3 behaviour is used. """ def __init__( self, - waiter_name: str, cluster_name: str, waiter_delay: int, waiter_max_attempts: int, aws_conn_id: str, - region: str | None, + region_name: str | None, ): - self.waiter_name = waiter_name - self.cluster_name = cluster_name - self.waiter_delay = waiter_delay - self.waiter_max_attempts = waiter_max_attempts - self.aws_conn_id = aws_conn_id - self.region = region - - def serialize(self) -> tuple[str, dict[str, Any]]: - return ( - self.__class__.__module__ + "." + self.__class__.__qualname__, - { - "waiter_name": self.waiter_name, - "cluster_name": self.cluster_name, - "waiter_delay": str(self.waiter_delay), - "waiter_max_attempts": str(self.waiter_max_attempts), - "aws_conn_id": self.aws_conn_id, - "region": self.region, - }, + super().__init__( + serialized_fields={"cluster_name": cluster_name, "region_name": region_name}, + waiter_name="cluster_active", + waiter_args={"name": cluster_name}, + failure_message="Error checking Eks cluster", + status_message="Eks cluster status is", + status_queries=["cluster.status"], + return_value=None, + waiter_delay=waiter_delay, + waiter_max_attempts=waiter_max_attempts, + aws_conn_id=aws_conn_id, + region_name=region_name, ) - async def run(self): - failure_message = "Error checking Eks cluster" - self.hook = EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) - async with self.hook.async_conn as client: - waiter = client.get_waiter(self.waiter_name) - try: - await async_wait( - waiter=waiter, - waiter_max_attempts=int(self.waiter_max_attempts), - waiter_delay=int(self.waiter_delay), - args={"name": self.cluster_name}, - failure_message=failure_message, - status_message="Eks cluster status is", - status_args=["cluster.status"], - ) - except AirflowException as exc: - if failure_message in str(exc): - yield TriggerEvent({"status": "failed", "exception": exc}) - raise - yield TriggerEvent({"status": "success"}) + def hook(self) -> AwsGenericHook: + return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) -class EksDeleteClusterTrigger(BaseTrigger): +class EksDeleteClusterTrigger(AwsBaseWaiterTrigger): """ Trigger for EksDeleteClusterOperator. - + The trigger will asynchronously wait for the cluster to be deleted. If there are any nodegroups or fargate profiles associated with the cluster, they will be deleted before the cluster is deleted. @@ -105,7 +79,7 @@ class EksDeleteClusterTrigger(BaseTrigger): :param waiter_delay: The amount of time in seconds to wait between attempts. :param waiter_max_attempts: The maximum number of attempts to be made. :param aws_conn_id: The Airflow connection used for AWS credentials. - :param region: Which AWS region the connection should use. + :param region_name: Which AWS region the connection should use. If this is None or empty then the default boto3 behaviour is used. :param force_delete_compute: If True, any nodegroups or fargate profiles associated with the cluster will be deleted before the cluster is deleted. @@ -117,14 +91,14 @@ def __init__( waiter_delay: int, waiter_max_attempts: int, aws_conn_id: str, - region: str | None, + region_name: str | None, force_delete_compute: bool, ): self.cluster_name = cluster_name self.waiter_delay = waiter_delay self.waiter_max_attempts = waiter_max_attempts self.aws_conn_id = aws_conn_id - self.region = region + self.region_name = region_name self.force_delete_compute = force_delete_compute def serialize(self) -> tuple[str, dict[str, Any]]: @@ -135,14 +109,13 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "waiter_delay": str(self.waiter_delay), "waiter_max_attempts": str(self.waiter_max_attempts), "aws_conn_id": self.aws_conn_id, - "region": self.region, + "region_name": self.region_name, "force_delete_compute": self.force_delete_compute, }, ) - @cached_property - def hook(self) -> EksHook: - return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) + def hook(self) -> AwsGenericHook: + return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) async def run(self): async with self.hook.async_conn as client: @@ -164,6 +137,12 @@ async def run(self): yield TriggerEvent({"status": "deleted"}) async def delete_any_nodegroups(self, client): + """ + Deletes all EKS Nodegroups for a provided Amazon EKS Cluster. + + All the EKS Nodegroups are deleted simultaneously. We wait for + all Nodegroups to be deleted before returning. + """ nodegroups = await client.list_nodegroups(clusterName=self.cluster_name) if nodegroups.get("nodegroups", None): self.log.info("Deleting nodegroups") @@ -329,7 +308,11 @@ def __init__( region_name: str | None, ): super().__init__( - serialized_fields={"cluster_name": cluster_name, "nodegroup_name": nodegroup_name}, + serialized_fields={ + "cluster_name": cluster_name, + "nodegroup_name": nodegroup_name, + "region_name": region_name, + }, waiter_name="nodegroup_active", waiter_args={"clusterName": cluster_name, "nodegroupName": nodegroup_name}, failure_message="Error creating nodegroup", diff --git a/tests/providers/amazon/aws/triggers/test_eks.py b/tests/providers/amazon/aws/triggers/test_eks.py index de39c2e9370ac..023f8d2a97d46 100644 --- a/tests/providers/amazon/aws/triggers/test_eks.py +++ b/tests/providers/amazon/aws/triggers/test_eks.py @@ -22,6 +22,7 @@ EksCreateClusterTrigger, EksCreateFargateProfileTrigger, EksCreateNodegroupTrigger, + EksDeleteClusterTrigger, EksDeleteFargateProfileTrigger, EksDeleteNodegroupTrigger, ) @@ -69,6 +70,21 @@ class TestEksTriggers: waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS, region_name=TEST_REGION, ), + EksCreateClusterTrigger( + cluster_name=TEST_CLUSTER_IDENTIFIER, + waiter_delay=TEST_WAITER_DELAY, + waiter_max_attempts=TEST_WAITER_DELAY, + aws_conn_id=TEST_AWS_CONN_ID, + region_name=TEST_REGION, + ), + EksDeleteClusterTrigger( + cluster_name=TEST_CLUSTER_IDENTIFIER, + waiter_delay=TEST_WAITER_DELAY, + waiter_max_attempts=TEST_WAITER_DELAY, + aws_conn_id=TEST_AWS_CONN_ID, + region_name=TEST_REGION, + force_delete_compute=True, + ), ], ) def test_serialize_recreate(self, trigger): From 9660412a52e66639ee0f2005d3896e621fef75cb Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Tue, 11 Jul 2023 12:41:07 -0700 Subject: [PATCH 5/7] Add type hints to execute_* functions --- airflow/providers/amazon/aws/operators/eks.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 5146da9de17b6..e0a3ce78c4d42 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -21,7 +21,7 @@ import warnings from ast import literal_eval from datetime import timedelta -from typing import TYPE_CHECKING, List, Sequence, cast +from typing import TYPE_CHECKING, Any, List, Sequence, cast from botocore.exceptions import ClientError, WaiterError @@ -331,7 +331,7 @@ def execute(self, context: Context): subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), ) - def deferrable_create_cluster_next(self, context, event=None): + def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any] = {}) -> None: if event["status"] == "failed": self.log.error("Cluster failed to start and will be torn down.") self.eks_hook.delete_cluster(name=self.cluster_name) @@ -342,7 +342,7 @@ def deferrable_create_cluster_next(self, context, event=None): waiter_max_attempts=self.waiter_max_attempts, aws_conn_id=self.aws_conn_id, region_name=self.region, - delete_resources=False, + force_delete_compute=False, ), method_name="execute_failed", timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), @@ -393,18 +393,17 @@ def deferrable_create_cluster_next(self, context, event=None): timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), ) - def execute_failed(self, context, event=None): + def execute_failed(self, context: Context, event: dict[str, Any] = {}) -> None: if event["status"] == "delteted": self.log.info("Cluster deleted") raise event["exception"] - def execute_complete(self, context, event=None): + def execute_complete(self, context: Context, event: dict[str, Any] = {}) -> None: resource = "fargate profile" if self.compute == "fargate" else self.compute if event["status"] != "success": raise AirflowException(f"Error creating {resource}: {event}") - else: - self.log.info("%s created successfully", resource) - return + + self.log.info("%s created successfully", resource) class EksCreateNodegroupOperator(BaseOperator): @@ -715,7 +714,7 @@ def execute(self, context: Context): method_name="execute_complete", timeout=timedelta(seconds=self.waiter_delay * self.waiter_max_attempts), ) - if self.force_delete_compute: + elif self.force_delete_compute: self.delete_any_nodegroups(eks_hook) self.delete_any_fargate_profiles(eks_hook) @@ -763,7 +762,7 @@ def delete_any_fargate_profiles(self, eks_hook) -> None: ) self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME)) - def execute_complete(self, context, event=None): + def execute_complete(self, context: Context, event: dict[str, Any] = {}) -> None: if event["status"] == "success": self.log.info("Cluster deleted successfully.") From 0befff82b9c7a576da19da6328678247da8988bc Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Wed, 12 Jul 2023 12:40:59 -0700 Subject: [PATCH 6/7] Change function definition of execute_* method to set default value of event to None --- airflow/providers/amazon/aws/operators/eks.py | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index e0a3ce78c4d42..e7931cc07a075 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -331,8 +331,11 @@ def execute(self, context: Context): subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), ) - def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any] = {}) -> None: - if event["status"] == "failed": + def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any] | None = None) -> None: + if event is None: + self.log.error("Trigger error: event is None") + raise AirflowException("Trigger error: event is None") + elif event["status"] == "failed": self.log.error("Cluster failed to start and will be torn down.") self.eks_hook.delete_cluster(name=self.cluster_name) self.defer( @@ -393,14 +396,20 @@ def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any] timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), ) - def execute_failed(self, context: Context, event: dict[str, Any] = {}) -> None: - if event["status"] == "delteted": + def execute_failed(self, context: Context, event: dict[str, Any] | None = None) -> None: + if event is None: + self.log.info("Trigger error: event is None") + raise AirflowException("Trigger error: event is None") + elif event["status"] == "delteted": self.log.info("Cluster deleted") raise event["exception"] - def execute_complete(self, context: Context, event: dict[str, Any] = {}) -> None: + def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: resource = "fargate profile" if self.compute == "fargate" else self.compute - if event["status"] != "success": + if event is None: + self.log.info("Trigger error: event is None") + raise AirflowException("Trigger error: event is None") + elif event["status"] != "success": raise AirflowException(f"Error creating {resource}: {event}") self.log.info("%s created successfully", resource) @@ -762,8 +771,11 @@ def delete_any_fargate_profiles(self, eks_hook) -> None: ) self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME)) - def execute_complete(self, context: Context, event: dict[str, Any] = {}) -> None: - if event["status"] == "success": + def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: + if event is None: + self.log.error("Trigger error. Event is None") + raise AirflowException("Trigger error. Event is None") + elif event["status"] == "success": self.log.info("Cluster deleted successfully.") From 0cd3069818f0215620ae7a44df0fd2fbf4a6d487 Mon Sep 17 00:00:00 2001 From: Syed Hussain Date: Fri, 14 Jul 2023 09:37:59 -0700 Subject: [PATCH 7/7] Minor Refactor --- airflow/providers/amazon/aws/triggers/eks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index 576286d209c6a..ff99b512001ea 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -136,7 +136,7 @@ async def run(self): yield TriggerEvent({"status": "deleted"}) - async def delete_any_nodegroups(self, client): + async def delete_any_nodegroups(self, client) -> None: """ Deletes all EKS Nodegroups for a provided Amazon EKS Cluster. @@ -146,10 +146,14 @@ async def delete_any_nodegroups(self, client): nodegroups = await client.list_nodegroups(clusterName=self.cluster_name) if nodegroups.get("nodegroups", None): self.log.info("Deleting nodegroups") + # ignoring attr-defined here because aws_base hook defines get_waiter for all hooks + waiter = self.hook.get_waiter( # type: ignore[attr-defined] + "all_nodegroups_deleted", deferrable=True, client=client + ) for group in nodegroups["nodegroups"]: await client.delete_nodegroup(clusterName=self.cluster_name, nodegroupName=group) await async_wait( - waiter=self.hook.get_waiter("all_nodegroups_deleted", deferrable=True, client=client), + waiter=waiter, waiter_delay=int(self.waiter_delay), waiter_max_attempts=int(self.waiter_max_attempts), args={"clusterName": self.cluster_name},