From cac5a4d97805200203136b0ee618218ad87d4461 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Thu, 26 May 2022 18:28:47 +0200 Subject: [PATCH 01/14] Add RdsCreateDbInstanceOperator --- airflow/providers/amazon/aws/operators/rds.py | 312 +++++++++++++++++- .../amazon/aws/operators/test_rds.py | 34 ++ 2 files changed, 344 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index a527107e80a46..5649199b13ba7 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -18,9 +18,9 @@ import json import time -from typing import TYPE_CHECKING, List, Optional, Sequence +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence -from mypy_boto3_rds.type_defs import TagTypeDef +from mypy_boto3_rds.type_defs import ProcessorFeatureTypeDef, TagTypeDef from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -551,6 +551,313 @@ def execute(self, context: 'Context') -> str: return json.dumps(delete_subscription, default=str) +class RdsCreateDbInstanceOperator(RdsBaseOperator): + """ + Creates an RDS DB instance + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RdsCreateDbInstanceOperator` + + :param db_instance_identifier: The DB instance identifier, must start with a letter and + contain from 1 to 63 letters, numbers, or hyphens + :param db_instance_class: The compute and memory capacity of the DB instance, for example db.m5.large + :param engine: The name of the database engine to be used for this instance + :param db_name: The name of the database to create when the DB instance is created + :param allocated_storage: The amount of storage in GiB to allocate for the DB instance + :param master_username: The name for the master user + :param master_user_password: The password for the master user + :param db_security_groups: A list of DB security groups to associate with this DB instance + :param vpc_security_group_ids: A list of Amazon EC2 VPC security groups to associate with this DB instance + :param availability_zone: The Availability Zone (AZ) where the database will be created + :param db_subnet_group_name: A DB subnet group to associate with this DB instance + :param preferred_maintenance_window: The time range each week in format during + which system maintenance can occur + :param db_parameter_group_name: The name of the DB parameter group to associate with this DB instance + :param backup_retention_period: The number of days for which automated backups are retained + :param preferred_backup_window: The daily time range during which automated backups are created + if automated backups are enabled + :param port: The port number on which the database accepts connections + :param multi_az: A value that indicates whether the DB instance is a Multi-AZ deployment + :param engine_version: The version number of the database engine to use + :param auto_minor_version_upgrade: A value that indicates whether + minor engine upgrades are applied automatically to the DB instance during the maintenance window + :param license_model: License model information for this DB instance + :param iops: The amount of Provisioned IOPS to be initially allocated for the DB instance + :param option_group_name: A value that indicates that + the DB instance should be associated with the specified option group + :param character_set_name: For supported engines, this value indicates + that the DB instance should be associated with the specified CharacterSet + :param nchar_character_set_name: The name of the NCHAR character set for the Oracle DB instance + :param publicly_accessible: A value that indicates whether the DB instance is publicly accessible + :param tags: Tags to assign to the DB instance + :param db_cluster_identifier: The identifier of the DB cluster that the instance will belong to + :param storage_type: Specifies the storage type to be associated with the DB instance + :param tde_credential_arn: The ARN from the key store + with which to associate the instance for TDE encryption + :param tde_credential_password: The password for the given ARN from the key store + in order to access the device + :param storage_encrypted: A value that indicates whether the DB instance is encrypted + :param kms_key_id: The AWS KMS key identifier for an encrypted DB instance + :param domain: The Active Directory directory ID to create the DB instance in + :param copy_tags_to_snapshot: A value that indicates whether to copy tags + from the DB instance to snapshots of the DB instance + :param monitoring_interval: The interval, in seconds, + between points when Enhanced Monitoring metrics are collected for the DB instance + :param monitoring_role_arn: The ARN for the IAM role + that permits RDS to send enhanced monitoring metrics to Amazon CloudWatch Logs + :param domain_iam_role_name: Specify the name of the IAM role to be used + when making API calls to the Directory Service + :param promotion_tier: A value that specifies the order in which + an Aurora Replica is promoted to the primary instance after a failure of the existing primary instance + :param timezone: The time zone of the DB instance + :param enable_iam_database_authentication: A value that indicates whether to enable + mapping of IAM accounts to database accounts + :param enable_performance_insights: A value that indicates whether to enable + Performance Insights for the DB instance + :param performance_insights_kms_key_id: The AWS KMS key identifier for + encryption of Performance Insights data + :param performance_insights_retention_period: The amount of time, in days, to retain + Performance Insights data, valid values are 7 or 731 (2 years) + :param enable_cloudwatch_logs_exports: The list of log types that need to be enabled + for exporting to CloudWatch Logs + :param processor_features: The number of CPU cores and the number of threads + per core for the DB instance class of the DB instance + :param deletion_protection: A value that indicates whether the DB instance has deletion protection enabled + :param max_allocated_storage: The upper limit in GiB to + which Amazon RDS can automatically scale the storage of the DB instance + :param enable_customer_owned_ip: A value that indicates whether to + enable a customer-owned IP address (CoIP) for an RDS on Outposts DB instance + :param custom_iam_instance_profile: The instance profile associated with + the underlying Amazon EC2 instance of an RDS Custom DB instance + :param backup_target: Specifies where automated backups and manual snapshots are stored + :param network_type: The network type of the DB instance + """ + + def __init__( + self, + *, + db_instance_identifier: str, + db_instance_class: str, + engine: str, + db_name: Optional[str] = None, + allocated_storage: Optional[int] = None, + master_username: Optional[str] = None, + master_user_password: Optional[str] = None, + db_security_groups: Optional[Sequence[str]] = None, + vpc_security_group_ids: Optional[Sequence[str]] = None, + availability_zone: Optional[str] = None, + db_subnet_group_name: Optional[str] = None, + preferred_maintenance_window: Optional[str] = None, + db_parameter_group_name: Optional[str] = None, + backup_retention_period: Optional[int] = None, + preferred_backup_window: Optional[str] = None, + port: Optional[int] = None, + multi_az: Optional[bool] = None, + engine_version: Optional[str] = None, + auto_minor_version_upgrade: Optional[bool] = None, + license_model: Optional[str] = None, + iops: Optional[int] = None, + option_group_name: Optional[str] = None, + character_set_name: Optional[str] = None, + nchar_character_set_name: Optional[str] = None, + publicly_accessible: Optional[bool] = None, + tags: Optional[Sequence[TagTypeDef]] = None, + db_cluster_identifier: Optional[str] = None, + storage_type: Optional[str] = None, + tde_credential_arn: Optional[str] = None, + tde_credential_password: Optional[str] = None, + storage_encrypted: Optional[bool] = None, + kms_key_id: Optional[str] = None, + domain: Optional[str] = None, + copy_tags_to_snapshot: Optional[bool] = None, + monitoring_interval: Optional[int] = None, + monitoring_role_arn: Optional[str] = None, + domain_iam_role_name: Optional[str] = None, + promotion_tier: Optional[int] = None, + timezone: Optional[str] = None, + enable_iam_database_authentication: Optional[bool] = None, + enable_performance_insights: Optional[bool] = None, + performance_insights_kms_key_id: Optional[str] = None, + performance_insights_retention_period: Optional[int] = None, + enable_cloudwatch_logs_exports: Optional[Sequence[str]] = None, + processor_features: Optional[Sequence[ProcessorFeatureTypeDef]] = None, + deletion_protection: Optional[bool] = None, + max_allocated_storage: Optional[int] = None, + enable_customer_owned_ip: Optional[bool] = None, + custom_iam_instance_profile: Optional[str] = None, + backup_target: Optional[str] = None, + network_type: Optional[str] = None, + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(aws_conn_id=aws_conn_id, **kwargs) + + self.db_instance_identifier = db_instance_identifier + self.db_instance_class = db_instance_class + self.engine = engine + self.db_name = db_name + self.allocated_storage = allocated_storage + self.master_username = master_username + self.master_user_password = master_user_password + self.db_security_groups = db_security_groups or [] + self.vpc_security_group_ids = vpc_security_group_ids or [] + self.availability_zone = availability_zone + self.db_subnet_group_name = db_subnet_group_name + self.preferred_maintenance_window = preferred_maintenance_window + self.db_parameter_group_name = db_parameter_group_name + self.backup_retention_period = backup_retention_period + self.preferred_backup_window = preferred_backup_window + self.port = port + self.multi_az = multi_az + self.engine_version = engine_version + self.auto_minor_version_upgrade = auto_minor_version_upgrade + self.license_model = license_model + self.iops = iops + self.option_group_name = option_group_name + self.character_set_name = character_set_name + self.nchar_character_set_name = nchar_character_set_name + self.publicly_accessible = publicly_accessible + self.tags = tags or [] + self.db_cluster_identifier = db_cluster_identifier + self.storage_type = storage_type + self.tde_credential_arn = tde_credential_arn + self.tde_credential_password = tde_credential_password + self.storage_encrypted = storage_encrypted + self.kms_key_id = kms_key_id + self.domain = domain + self.copy_tags_to_snapshot = copy_tags_to_snapshot + self.monitoring_interval = monitoring_interval + self.monitoring_role_arn = monitoring_role_arn + self.domain_iam_role_name = domain_iam_role_name + self.promotion_tier = promotion_tier + self.timezone = timezone + self.enable_iam_database_authentication = enable_iam_database_authentication + self.enable_performance_insights = enable_performance_insights + self.performance_insights_kms_key_id = performance_insights_kms_key_id + self.performance_insights_retention_period = performance_insights_retention_period + self.enable_cloudwatch_logs_exports = enable_cloudwatch_logs_exports or [] + self.processor_features = processor_features or [] + self.deletion_protection = deletion_protection + self.max_allocated_storage = max_allocated_storage + self.enable_customer_owned_ip = enable_customer_owned_ip + self.custom_iam_instance_profile = custom_iam_instance_profile + self.backup_target = backup_target + self.network_type = network_type + + def execute(self, context: 'Context') -> str: + self.log.info(f"Creating new DB instance {self.db_instance_identifier}") + params: Dict[str, Any] = {} + if self.db_name: + params["DBName"] = self.db_name + if self.allocated_storage: + params["AllocatedStorage"] = self.allocated_storage + if self.master_username: + params["MasterUsername"] = self.master_username + if self.master_user_password: + params["MasterUserPassword"] = self.master_user_password + if self.db_security_groups: + params["DBSecurityGroups"] = self.db_security_groups + if self.vpc_security_group_ids: + params["VpcSecurityGroupIds"] = self.vpc_security_group_ids + if self.availability_zone: + params["AvailabilityZone"] = self.availability_zone + if self.db_subnet_group_name: + params["DBSubnetGroupName"] = self.db_subnet_group_name + if self.preferred_maintenance_window: + params["PreferredMaintenanceWindow"] = self.preferred_maintenance_window + if self.db_parameter_group_name: + params["DBParameterGroupName"] = self.db_parameter_group_name + if self.backup_retention_period: + params["BackupRetentionPeriod"] = self.backup_retention_period + if self.preferred_backup_window: + params["PreferredBackupWindow"] = self.preferred_backup_window + if self.port: + params["Port"] = self.port + if self.multi_az: + params["MultiAZ"] = self.multi_az + if self.engine_version: + params["EngineVersion"] = self.engine_version + if self.auto_minor_version_upgrade: + params["AutoMinorVersionUpgrade"] = self.auto_minor_version_upgrade + if self.license_model: + params["LicenseModel"] = self.license_model + if self.iops: + params["Iops"] = self.iops + if self.option_group_name: + params["OptionGroupName"] = self.option_group_name + if self.character_set_name: + params["CharacterSetName"] = self.character_set_name + if self.nchar_character_set_name: + params["NcharCharacterSetName"] = self.nchar_character_set_name + if self.publicly_accessible: + params["PubliclyAccessible"] = self.publicly_accessible + if self.tags: + params["Tags"] = self.tags + if self.db_cluster_identifier: + params["DBClusterIdentifier"] = self.db_cluster_identifier + if self.storage_type: + params["StorageType"] = self.storage_type + if self.tde_credential_arn: + params["TdeCredentialArn"] = self.tde_credential_arn + if self.tde_credential_password: + params["TdeCredentialPassword"] = self.tde_credential_password + if self.storage_encrypted: + params["StorageEncrypted"] = self.storage_encrypted + if self.kms_key_id: + params["KmsKeyId"] = self.kms_key_id + if self.domain: + params["Domain"] = self.domain + if self.copy_tags_to_snapshot: + params["CopyTagsToSnapshot"] = self.copy_tags_to_snapshot + if self.monitoring_interval: + params["MonitoringInterval"] = self.monitoring_interval + if self.monitoring_role_arn: + params["MonitoringRoleArn"] = self.monitoring_role_arn + if self.domain_iam_role_name: + params["DomainIAMRoleName"] = self.domain_iam_role_name + if self.promotion_tier: + params["PromotionTier"] = self.promotion_tier + if self.timezone: + params["Timezone"] = self.timezone + if self.enable_iam_database_authentication: + params["EnableIAMDatabaseAuthentication"] = self.enable_iam_database_authentication + if self.enable_performance_insights: + params["EnablePerformanceInsights"] = self.enable_performance_insights + if self.performance_insights_kms_key_id: + params["PerformanceInsightsKMSKeyId"] = self.performance_insights_kms_key_id + if self.performance_insights_retention_period: + params["PerformanceInsightsRetentionPeriod"] = self.performance_insights_retention_period + if self.enable_cloudwatch_logs_exports: + params["EnableCloudwatchLogsExports"] = self.enable_cloudwatch_logs_exports + if self.processor_features: + params["ProcessorFeatures"] = self.processor_features + if self.deletion_protection: + params["DeletionProtection"] = self.deletion_protection + if self.max_allocated_storage: + params["MaxAllocatedStorage"] = self.max_allocated_storage + if self.enable_customer_owned_ip: + params["EnableCustomerOwnedIp"] = self.enable_customer_owned_ip + if self.custom_iam_instance_profile: + params["CustomIamInstanceProfile"] = self.custom_iam_instance_profile + if self.backup_target: + params["BackupTarget"] = self.backup_target + if self.network_type: + params["NetworkType"] = self.network_type + + create_db_instance = self.hook.conn.create_db_instance( + DBInstanceIdentifier=self.db_instance_identifier, + DBInstanceClass=self.db_instance_class, + Engine=self.engine, + **params, + ) + self.hook.conn.get_waiter("db_instance_available").wait( + DBInstanceIdentifier=self.db_instance_identifier + ) + + return json.dumps(create_db_instance, default=str) + + __all__ = [ "RdsCreateDbSnapshotOperator", "RdsCopyDbSnapshotOperator", @@ -559,4 +866,5 @@ def execute(self, context: 'Context') -> str: "RdsDeleteEventSubscriptionOperator", "RdsStartExportTaskOperator", "RdsCancelExportTaskOperator", + "RdsCreateDbInstanceOperator", ] diff --git a/tests/providers/amazon/aws/operators/test_rds.py b/tests/providers/amazon/aws/operators/test_rds.py index d952fbc11a93c..ca1510bb707a3 100644 --- a/tests/providers/amazon/aws/operators/test_rds.py +++ b/tests/providers/amazon/aws/operators/test_rds.py @@ -26,6 +26,7 @@ RdsBaseOperator, RdsCancelExportTaskOperator, RdsCopyDbSnapshotOperator, + RdsCreateDbInstanceOperator, RdsCreateDbSnapshotOperator, RdsCreateEventSubscriptionOperator, RdsDeleteDbSnapshotOperator, @@ -450,3 +451,36 @@ def test_delete_event_subscription(self): with pytest.raises(self.hook.conn.exceptions.ClientError): self.hook.conn.describe_event_subscriptions(SubscriptionName=EXPORT_TASK_NAME) + + +@pytest.mark.skipif(mock_rds is None, reason='mock_rds package not present') +class TestRdsCreateDbInstanceOperator: + @classmethod + def setup_class(cls): + cls.dag = DAG('test_dag', default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE}) + cls.hook = RdsHook(aws_conn_id=AWS_CONN, region_name='us-east-1') + + @classmethod + def teardown_class(cls): + del cls.dag + del cls.hook + + @mock_rds + def test_create_db_instance(self): + create_db_instance_operator = RdsCreateDbInstanceOperator( + task_id='test_create_db_instance', + db_name=DB_INSTANCE_NAME, + db_instance_identifier=DB_INSTANCE_NAME, + db_instance_class="db.m5.large", + engine="postgres", + aws_conn_id=AWS_CONN, + dag=self.dag, + ) + create_db_instance_operator.execute(None) + + result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME) + db_instances = result.get("DBInstances") + + assert db_instances + assert len(db_instances) == 1 + assert db_instances[0]['DBInstanceStatus'] == 'available' From e9355c32946593c36fc36c7a89426c0d33df7fb8 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Mon, 30 May 2022 12:17:39 +0200 Subject: [PATCH 02/14] Add RdsDeleteDbInstanceOperator --- airflow/providers/amazon/aws/operators/rds.py | 55 +++++++++++++++++++ .../amazon/aws/operators/test_rds.py | 29 ++++++++++ 2 files changed, 84 insertions(+) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 5649199b13ba7..1f98f48468e7d 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -858,6 +858,60 @@ def execute(self, context: 'Context') -> str: return json.dumps(create_db_instance, default=str) +class RdsDeleteDbInstanceOperator(RdsBaseOperator): + """ + Deletes an RDS DB Instance + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RdsDeleteDbInstanceOperator` + + :param db_instance_identifier: The DB instance identifier for the DB instance to be deleted + :param skip_final_snapshot: A value that indicates whether to + skip the creation of a final DB snapshot before deleting the instance + :param final_db_snapshot_identifier: The DBSnapshotIdentifier of the new DBSnapshot created + when the SkipFinalSnapshot parameter is disabled + :param delete_automated_backups: A value that indicates whether to + remove automated backups immediately after the DB instance is deleted + """ + + def __init__( + self, + *, + db_instance_identifier: str, + skip_final_snapshot: Optional[bool] = None, + final_db_snapshot_identifier: Optional[str] = None, + delete_automated_backups: Optional[bool] = None, + aws_conn_id: str = "aws_default", + **kwargs, + ): + super().__init__(aws_conn_id=aws_conn_id, **kwargs) + self.db_instance_identifier = db_instance_identifier + self.skip_final_snapshot = skip_final_snapshot + self.final_db_snapshot_identifier = final_db_snapshot_identifier + self.delete_automated_backups = delete_automated_backups + + def execute(self, context: 'Context') -> str: + self.log.info(f"Deleting DB instance {self.db_instance_identifier}") + params: Dict[str, Any] = {} + if self.skip_final_snapshot: + params["SkipFinalSnapshot"] = self.skip_final_snapshot + if self.final_db_snapshot_identifier: + params["FinalDBSnapshotIdentifier"] = self.final_db_snapshot_identifier + if self.delete_automated_backups: + params["DeleteAutomatedBackups"] = self.delete_automated_backups + + delete_db_instance = self.hook.conn.delete_db_instance( + DBInstanceIdentifier=self.db_instance_identifier, + **params, + ) + self.hook.conn.get_waiter("db_instance_deleted").wait( + DBInstanceIdentifier=self.db_instance_identifier + ) + + return json.dumps(delete_db_instance, default=str) + + __all__ = [ "RdsCreateDbSnapshotOperator", "RdsCopyDbSnapshotOperator", @@ -867,4 +921,5 @@ def execute(self, context: 'Context') -> str: "RdsStartExportTaskOperator", "RdsCancelExportTaskOperator", "RdsCreateDbInstanceOperator", + "RdsDeleteDbInstanceOperator", ] diff --git a/tests/providers/amazon/aws/operators/test_rds.py b/tests/providers/amazon/aws/operators/test_rds.py index ca1510bb707a3..7aa64856d4c54 100644 --- a/tests/providers/amazon/aws/operators/test_rds.py +++ b/tests/providers/amazon/aws/operators/test_rds.py @@ -29,6 +29,7 @@ RdsCreateDbInstanceOperator, RdsCreateDbSnapshotOperator, RdsCreateEventSubscriptionOperator, + RdsDeleteDbInstanceOperator, RdsDeleteDbSnapshotOperator, RdsDeleteEventSubscriptionOperator, RdsStartExportTaskOperator, @@ -484,3 +485,31 @@ def test_create_db_instance(self): assert db_instances assert len(db_instances) == 1 assert db_instances[0]['DBInstanceStatus'] == 'available' + + +@pytest.mark.skipif(mock_rds is None, reason='mock_rds package not present') +class TestRdsDeleteDbInstanceOperator: + @classmethod + def setup_class(cls): + cls.dag = DAG('test_dag', default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE}) + cls.hook = RdsHook(aws_conn_id=AWS_CONN, region_name='us-east-1') + + @classmethod + def teardown_class(cls): + del cls.dag + del cls.hook + + @mock_rds + def test_delete_event_subscription(self): + _create_db_instance(self.hook) + + delete_db_instance_operator = RdsDeleteDbInstanceOperator( + task_id='test_delete_db_instance', + db_instance_identifier=DB_INSTANCE_NAME, + aws_conn_id=AWS_CONN, + dag=self.dag, + ) + delete_db_instance_operator.execute(None) + + with pytest.raises(self.hook.conn.exceptions.ClientError): + self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME) From af0226d5daa8956b7b0b14dae3eb1c9db2b3cdab Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Mon, 30 May 2022 12:18:11 +0200 Subject: [PATCH 03/14] Add example DAG for RDS db instance create and delete operators --- .../example_dags/example_rds_db_instance.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py diff --git a/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py b/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py new file mode 100644 index 0000000000000..c5d78529f8a6a --- /dev/null +++ b/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from os import getenv + +from airflow import DAG +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.rds import ( + RdsCreateDbInstanceOperator, + RdsDeleteDbInstanceOperator, +) + +RDS_DB_IDENTIFIER = getenv("RDS_DB_IDENTIFIER", "database-identifier") + +with DAG( + dag_id='example_rds_instance', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=['example'], + catchup=False, +) as dag: + # [START howto_operator_rds_create_db_instance] + create_db_instance = RdsCreateDbInstanceOperator( + task_id='create_db_instance', + db_name=RDS_DB_IDENTIFIER, + db_instance_identifier=RDS_DB_IDENTIFIER, + db_instance_class="db.m5.large", + engine="postgres", + ) + # [END howto_operator_rds_create_db_instance] + + # [START howto_operator_rds_delete_db_instance] + delete_db_instance = RdsDeleteDbInstanceOperator( + task_id='delete_db_instance', + db_instance_identifier=RDS_DB_IDENTIFIER, + ) + # [END howto_operator_rds_delete_db_instance] + + chain(create_db_instance, delete_db_instance) From 7828e94628770a8a9712fd4ceee4bef8e027068c Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Mon, 30 May 2022 14:55:49 +0200 Subject: [PATCH 04/14] Add documentation for RDSCreateDBSnapshotOperator, RDSDeleteDbInstanceOperator --- .../operators/rds.rst | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docs/apache-airflow-providers-amazon/operators/rds.rst b/docs/apache-airflow-providers-amazon/operators/rds.rst index 7e48a983f4d0a..1a2af2cdf7e0a 100644 --- a/docs/apache-airflow-providers-amazon/operators/rds.rst +++ b/docs/apache-airflow-providers-amazon/operators/rds.rst @@ -138,6 +138,34 @@ To delete an Amazon RDS event subscription you can use :start-after: [START howto_operator_rds_delete_event_subscription] :end-before: [END howto_operator_rds_delete_event_subscription] +.. _howto/operator:RdsCreateDbInstanceOperator: + +Create a database instance +========================== + +To create a AWS DB instance you can use +:class:`~airflow.providers.amazon.aws.operators.rds.RdsCreateDbInstanceOperator`. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_rds_create_db_instance] + :end-before: [END howto_operator_rds_create_db_instance] + +.. _howto/operator:RDSDeleteDbInstanceOperator: + +Delete a database instance +========================== + +To delete a AWS DB instance you can use +:class:`~airflow.providers.amazon.aws.operators.rds.RDSDeleteDbInstanceOperator`. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_rds_delete_db_instance] + :end-before: [END howto_operator_rds_delete_db_instance] + Sensors ------- From b61a8497d29ec5c15f413b8400d415f494dba75b Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Thu, 2 Jun 2022 12:03:29 +0200 Subject: [PATCH 05/14] Adjust input arguments handling for RdsCreateDbInstanceOperator, RdsDeleteDbInstanceOperator --- .../example_dags/example_rds_db_instance.py | 13 +- airflow/providers/amazon/aws/operators/rds.py | 295 +----------------- .../amazon/aws/operators/test_rds.py | 7 +- 3 files changed, 28 insertions(+), 287 deletions(-) diff --git a/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py b/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py index c5d78529f8a6a..80bb26a688407 100644 --- a/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py +++ b/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py @@ -27,6 +27,10 @@ ) RDS_DB_IDENTIFIER = getenv("RDS_DB_IDENTIFIER", "database-identifier") +RDS_USERNAME = getenv("RDS_USERNAME", "database_username") +# NEVER store your production password in plaintext in a DAG like this. +# Use Airflow Secrets or a secret manager for this in production. +RDS_PASSWORD = getenv("RDS_PASSWORD", "database_password") with DAG( dag_id='example_rds_instance', @@ -38,10 +42,14 @@ # [START howto_operator_rds_create_db_instance] create_db_instance = RdsCreateDbInstanceOperator( task_id='create_db_instance', - db_name=RDS_DB_IDENTIFIER, db_instance_identifier=RDS_DB_IDENTIFIER, db_instance_class="db.m5.large", engine="postgres", + rds_kwargs={ + "MasterUsername": RDS_USERNAME, + "MasterUserPassword": RDS_PASSWORD, + "AllocatedStorage": 20, + }, ) # [END howto_operator_rds_create_db_instance] @@ -49,6 +57,9 @@ delete_db_instance = RdsDeleteDbInstanceOperator( task_id='delete_db_instance', db_instance_identifier=RDS_DB_IDENTIFIER, + rds_kwargs={ + "SkipFinalSnapshot": True, + }, ) # [END howto_operator_rds_delete_db_instance] diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 1f98f48468e7d..c5b71acd39b59 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -18,9 +18,9 @@ import json import time -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence +from typing import TYPE_CHECKING, Dict, List, Optional, Sequence -from mypy_boto3_rds.type_defs import ProcessorFeatureTypeDef, TagTypeDef +from mypy_boto3_rds.type_defs import TagTypeDef from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -563,75 +563,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator): contain from 1 to 63 letters, numbers, or hyphens :param db_instance_class: The compute and memory capacity of the DB instance, for example db.m5.large :param engine: The name of the database engine to be used for this instance - :param db_name: The name of the database to create when the DB instance is created - :param allocated_storage: The amount of storage in GiB to allocate for the DB instance - :param master_username: The name for the master user - :param master_user_password: The password for the master user - :param db_security_groups: A list of DB security groups to associate with this DB instance - :param vpc_security_group_ids: A list of Amazon EC2 VPC security groups to associate with this DB instance - :param availability_zone: The Availability Zone (AZ) where the database will be created - :param db_subnet_group_name: A DB subnet group to associate with this DB instance - :param preferred_maintenance_window: The time range each week in format during - which system maintenance can occur - :param db_parameter_group_name: The name of the DB parameter group to associate with this DB instance - :param backup_retention_period: The number of days for which automated backups are retained - :param preferred_backup_window: The daily time range during which automated backups are created - if automated backups are enabled - :param port: The port number on which the database accepts connections - :param multi_az: A value that indicates whether the DB instance is a Multi-AZ deployment - :param engine_version: The version number of the database engine to use - :param auto_minor_version_upgrade: A value that indicates whether - minor engine upgrades are applied automatically to the DB instance during the maintenance window - :param license_model: License model information for this DB instance - :param iops: The amount of Provisioned IOPS to be initially allocated for the DB instance - :param option_group_name: A value that indicates that - the DB instance should be associated with the specified option group - :param character_set_name: For supported engines, this value indicates - that the DB instance should be associated with the specified CharacterSet - :param nchar_character_set_name: The name of the NCHAR character set for the Oracle DB instance - :param publicly_accessible: A value that indicates whether the DB instance is publicly accessible - :param tags: Tags to assign to the DB instance - :param db_cluster_identifier: The identifier of the DB cluster that the instance will belong to - :param storage_type: Specifies the storage type to be associated with the DB instance - :param tde_credential_arn: The ARN from the key store - with which to associate the instance for TDE encryption - :param tde_credential_password: The password for the given ARN from the key store - in order to access the device - :param storage_encrypted: A value that indicates whether the DB instance is encrypted - :param kms_key_id: The AWS KMS key identifier for an encrypted DB instance - :param domain: The Active Directory directory ID to create the DB instance in - :param copy_tags_to_snapshot: A value that indicates whether to copy tags - from the DB instance to snapshots of the DB instance - :param monitoring_interval: The interval, in seconds, - between points when Enhanced Monitoring metrics are collected for the DB instance - :param monitoring_role_arn: The ARN for the IAM role - that permits RDS to send enhanced monitoring metrics to Amazon CloudWatch Logs - :param domain_iam_role_name: Specify the name of the IAM role to be used - when making API calls to the Directory Service - :param promotion_tier: A value that specifies the order in which - an Aurora Replica is promoted to the primary instance after a failure of the existing primary instance - :param timezone: The time zone of the DB instance - :param enable_iam_database_authentication: A value that indicates whether to enable - mapping of IAM accounts to database accounts - :param enable_performance_insights: A value that indicates whether to enable - Performance Insights for the DB instance - :param performance_insights_kms_key_id: The AWS KMS key identifier for - encryption of Performance Insights data - :param performance_insights_retention_period: The amount of time, in days, to retain - Performance Insights data, valid values are 7 or 731 (2 years) - :param enable_cloudwatch_logs_exports: The list of log types that need to be enabled - for exporting to CloudWatch Logs - :param processor_features: The number of CPU cores and the number of threads - per core for the DB instance class of the DB instance - :param deletion_protection: A value that indicates whether the DB instance has deletion protection enabled - :param max_allocated_storage: The upper limit in GiB to - which Amazon RDS can automatically scale the storage of the DB instance - :param enable_customer_owned_ip: A value that indicates whether to - enable a customer-owned IP address (CoIP) for an RDS on Outposts DB instance - :param custom_iam_instance_profile: The instance profile associated with - the underlying Amazon EC2 instance of an RDS Custom DB instance - :param backup_target: Specifies where automated backups and manual snapshots are stored - :param network_type: The network type of the DB instance + :rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` """ def __init__( @@ -640,54 +572,7 @@ def __init__( db_instance_identifier: str, db_instance_class: str, engine: str, - db_name: Optional[str] = None, - allocated_storage: Optional[int] = None, - master_username: Optional[str] = None, - master_user_password: Optional[str] = None, - db_security_groups: Optional[Sequence[str]] = None, - vpc_security_group_ids: Optional[Sequence[str]] = None, - availability_zone: Optional[str] = None, - db_subnet_group_name: Optional[str] = None, - preferred_maintenance_window: Optional[str] = None, - db_parameter_group_name: Optional[str] = None, - backup_retention_period: Optional[int] = None, - preferred_backup_window: Optional[str] = None, - port: Optional[int] = None, - multi_az: Optional[bool] = None, - engine_version: Optional[str] = None, - auto_minor_version_upgrade: Optional[bool] = None, - license_model: Optional[str] = None, - iops: Optional[int] = None, - option_group_name: Optional[str] = None, - character_set_name: Optional[str] = None, - nchar_character_set_name: Optional[str] = None, - publicly_accessible: Optional[bool] = None, - tags: Optional[Sequence[TagTypeDef]] = None, - db_cluster_identifier: Optional[str] = None, - storage_type: Optional[str] = None, - tde_credential_arn: Optional[str] = None, - tde_credential_password: Optional[str] = None, - storage_encrypted: Optional[bool] = None, - kms_key_id: Optional[str] = None, - domain: Optional[str] = None, - copy_tags_to_snapshot: Optional[bool] = None, - monitoring_interval: Optional[int] = None, - monitoring_role_arn: Optional[str] = None, - domain_iam_role_name: Optional[str] = None, - promotion_tier: Optional[int] = None, - timezone: Optional[str] = None, - enable_iam_database_authentication: Optional[bool] = None, - enable_performance_insights: Optional[bool] = None, - performance_insights_kms_key_id: Optional[str] = None, - performance_insights_retention_period: Optional[int] = None, - enable_cloudwatch_logs_exports: Optional[Sequence[str]] = None, - processor_features: Optional[Sequence[ProcessorFeatureTypeDef]] = None, - deletion_protection: Optional[bool] = None, - max_allocated_storage: Optional[int] = None, - enable_customer_owned_ip: Optional[bool] = None, - custom_iam_instance_profile: Optional[str] = None, - backup_target: Optional[str] = None, - network_type: Optional[str] = None, + rds_kwargs: Optional[Dict] = None, aws_conn_id: str = "aws_default", **kwargs, ): @@ -696,160 +581,16 @@ def __init__( self.db_instance_identifier = db_instance_identifier self.db_instance_class = db_instance_class self.engine = engine - self.db_name = db_name - self.allocated_storage = allocated_storage - self.master_username = master_username - self.master_user_password = master_user_password - self.db_security_groups = db_security_groups or [] - self.vpc_security_group_ids = vpc_security_group_ids or [] - self.availability_zone = availability_zone - self.db_subnet_group_name = db_subnet_group_name - self.preferred_maintenance_window = preferred_maintenance_window - self.db_parameter_group_name = db_parameter_group_name - self.backup_retention_period = backup_retention_period - self.preferred_backup_window = preferred_backup_window - self.port = port - self.multi_az = multi_az - self.engine_version = engine_version - self.auto_minor_version_upgrade = auto_minor_version_upgrade - self.license_model = license_model - self.iops = iops - self.option_group_name = option_group_name - self.character_set_name = character_set_name - self.nchar_character_set_name = nchar_character_set_name - self.publicly_accessible = publicly_accessible - self.tags = tags or [] - self.db_cluster_identifier = db_cluster_identifier - self.storage_type = storage_type - self.tde_credential_arn = tde_credential_arn - self.tde_credential_password = tde_credential_password - self.storage_encrypted = storage_encrypted - self.kms_key_id = kms_key_id - self.domain = domain - self.copy_tags_to_snapshot = copy_tags_to_snapshot - self.monitoring_interval = monitoring_interval - self.monitoring_role_arn = monitoring_role_arn - self.domain_iam_role_name = domain_iam_role_name - self.promotion_tier = promotion_tier - self.timezone = timezone - self.enable_iam_database_authentication = enable_iam_database_authentication - self.enable_performance_insights = enable_performance_insights - self.performance_insights_kms_key_id = performance_insights_kms_key_id - self.performance_insights_retention_period = performance_insights_retention_period - self.enable_cloudwatch_logs_exports = enable_cloudwatch_logs_exports or [] - self.processor_features = processor_features or [] - self.deletion_protection = deletion_protection - self.max_allocated_storage = max_allocated_storage - self.enable_customer_owned_ip = enable_customer_owned_ip - self.custom_iam_instance_profile = custom_iam_instance_profile - self.backup_target = backup_target - self.network_type = network_type + self.rds_kwargs = rds_kwargs or {} def execute(self, context: 'Context') -> str: self.log.info(f"Creating new DB instance {self.db_instance_identifier}") - params: Dict[str, Any] = {} - if self.db_name: - params["DBName"] = self.db_name - if self.allocated_storage: - params["AllocatedStorage"] = self.allocated_storage - if self.master_username: - params["MasterUsername"] = self.master_username - if self.master_user_password: - params["MasterUserPassword"] = self.master_user_password - if self.db_security_groups: - params["DBSecurityGroups"] = self.db_security_groups - if self.vpc_security_group_ids: - params["VpcSecurityGroupIds"] = self.vpc_security_group_ids - if self.availability_zone: - params["AvailabilityZone"] = self.availability_zone - if self.db_subnet_group_name: - params["DBSubnetGroupName"] = self.db_subnet_group_name - if self.preferred_maintenance_window: - params["PreferredMaintenanceWindow"] = self.preferred_maintenance_window - if self.db_parameter_group_name: - params["DBParameterGroupName"] = self.db_parameter_group_name - if self.backup_retention_period: - params["BackupRetentionPeriod"] = self.backup_retention_period - if self.preferred_backup_window: - params["PreferredBackupWindow"] = self.preferred_backup_window - if self.port: - params["Port"] = self.port - if self.multi_az: - params["MultiAZ"] = self.multi_az - if self.engine_version: - params["EngineVersion"] = self.engine_version - if self.auto_minor_version_upgrade: - params["AutoMinorVersionUpgrade"] = self.auto_minor_version_upgrade - if self.license_model: - params["LicenseModel"] = self.license_model - if self.iops: - params["Iops"] = self.iops - if self.option_group_name: - params["OptionGroupName"] = self.option_group_name - if self.character_set_name: - params["CharacterSetName"] = self.character_set_name - if self.nchar_character_set_name: - params["NcharCharacterSetName"] = self.nchar_character_set_name - if self.publicly_accessible: - params["PubliclyAccessible"] = self.publicly_accessible - if self.tags: - params["Tags"] = self.tags - if self.db_cluster_identifier: - params["DBClusterIdentifier"] = self.db_cluster_identifier - if self.storage_type: - params["StorageType"] = self.storage_type - if self.tde_credential_arn: - params["TdeCredentialArn"] = self.tde_credential_arn - if self.tde_credential_password: - params["TdeCredentialPassword"] = self.tde_credential_password - if self.storage_encrypted: - params["StorageEncrypted"] = self.storage_encrypted - if self.kms_key_id: - params["KmsKeyId"] = self.kms_key_id - if self.domain: - params["Domain"] = self.domain - if self.copy_tags_to_snapshot: - params["CopyTagsToSnapshot"] = self.copy_tags_to_snapshot - if self.monitoring_interval: - params["MonitoringInterval"] = self.monitoring_interval - if self.monitoring_role_arn: - params["MonitoringRoleArn"] = self.monitoring_role_arn - if self.domain_iam_role_name: - params["DomainIAMRoleName"] = self.domain_iam_role_name - if self.promotion_tier: - params["PromotionTier"] = self.promotion_tier - if self.timezone: - params["Timezone"] = self.timezone - if self.enable_iam_database_authentication: - params["EnableIAMDatabaseAuthentication"] = self.enable_iam_database_authentication - if self.enable_performance_insights: - params["EnablePerformanceInsights"] = self.enable_performance_insights - if self.performance_insights_kms_key_id: - params["PerformanceInsightsKMSKeyId"] = self.performance_insights_kms_key_id - if self.performance_insights_retention_period: - params["PerformanceInsightsRetentionPeriod"] = self.performance_insights_retention_period - if self.enable_cloudwatch_logs_exports: - params["EnableCloudwatchLogsExports"] = self.enable_cloudwatch_logs_exports - if self.processor_features: - params["ProcessorFeatures"] = self.processor_features - if self.deletion_protection: - params["DeletionProtection"] = self.deletion_protection - if self.max_allocated_storage: - params["MaxAllocatedStorage"] = self.max_allocated_storage - if self.enable_customer_owned_ip: - params["EnableCustomerOwnedIp"] = self.enable_customer_owned_ip - if self.custom_iam_instance_profile: - params["CustomIamInstanceProfile"] = self.custom_iam_instance_profile - if self.backup_target: - params["BackupTarget"] = self.backup_target - if self.network_type: - params["NetworkType"] = self.network_type create_db_instance = self.hook.conn.create_db_instance( DBInstanceIdentifier=self.db_instance_identifier, DBInstanceClass=self.db_instance_class, Engine=self.engine, - **params, + **self.rds_kwargs, ) self.hook.conn.get_waiter("db_instance_available").wait( DBInstanceIdentifier=self.db_instance_identifier @@ -867,43 +608,27 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator): :ref:`howto/operator:RdsDeleteDbInstanceOperator` :param db_instance_identifier: The DB instance identifier for the DB instance to be deleted - :param skip_final_snapshot: A value that indicates whether to - skip the creation of a final DB snapshot before deleting the instance - :param final_db_snapshot_identifier: The DBSnapshotIdentifier of the new DBSnapshot created - when the SkipFinalSnapshot parameter is disabled - :param delete_automated_backups: A value that indicates whether to - remove automated backups immediately after the DB instance is deleted + :rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` """ def __init__( self, *, db_instance_identifier: str, - skip_final_snapshot: Optional[bool] = None, - final_db_snapshot_identifier: Optional[str] = None, - delete_automated_backups: Optional[bool] = None, + rds_kwargs: Optional[Dict] = None, aws_conn_id: str = "aws_default", **kwargs, ): super().__init__(aws_conn_id=aws_conn_id, **kwargs) self.db_instance_identifier = db_instance_identifier - self.skip_final_snapshot = skip_final_snapshot - self.final_db_snapshot_identifier = final_db_snapshot_identifier - self.delete_automated_backups = delete_automated_backups + self.rds_kwargs = rds_kwargs or {} def execute(self, context: 'Context') -> str: self.log.info(f"Deleting DB instance {self.db_instance_identifier}") - params: Dict[str, Any] = {} - if self.skip_final_snapshot: - params["SkipFinalSnapshot"] = self.skip_final_snapshot - if self.final_db_snapshot_identifier: - params["FinalDBSnapshotIdentifier"] = self.final_db_snapshot_identifier - if self.delete_automated_backups: - params["DeleteAutomatedBackups"] = self.delete_automated_backups delete_db_instance = self.hook.conn.delete_db_instance( DBInstanceIdentifier=self.db_instance_identifier, - **params, + **self.rds_kwargs, ) self.hook.conn.get_waiter("db_instance_deleted").wait( DBInstanceIdentifier=self.db_instance_identifier diff --git a/tests/providers/amazon/aws/operators/test_rds.py b/tests/providers/amazon/aws/operators/test_rds.py index 7aa64856d4c54..355ff529c992f 100644 --- a/tests/providers/amazon/aws/operators/test_rds.py +++ b/tests/providers/amazon/aws/operators/test_rds.py @@ -470,10 +470,12 @@ def teardown_class(cls): def test_create_db_instance(self): create_db_instance_operator = RdsCreateDbInstanceOperator( task_id='test_create_db_instance', - db_name=DB_INSTANCE_NAME, db_instance_identifier=DB_INSTANCE_NAME, db_instance_class="db.m5.large", engine="postgres", + rds_kwargs={ + "DBName": DB_INSTANCE_NAME, + }, aws_conn_id=AWS_CONN, dag=self.dag, ) @@ -506,6 +508,9 @@ def test_delete_event_subscription(self): delete_db_instance_operator = RdsDeleteDbInstanceOperator( task_id='test_delete_db_instance', db_instance_identifier=DB_INSTANCE_NAME, + rds_kwargs={ + "SkipFinalSnapshot": True, + }, aws_conn_id=AWS_CONN, dag=self.dag, ) From 37d3a8b60990219de1ef05e9178ddbcfc9a39c85 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Tue, 7 Jun 2022 22:41:20 +0200 Subject: [PATCH 06/14] Add `aws_conn_id` into docstring of `RdsCreateDbInstanceOperator` and `RdsDeleteDbInstanceOperator` --- airflow/providers/amazon/aws/operators/rds.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index c5b71acd39b59..fee6df83992bb 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -564,6 +564,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator): :param db_instance_class: The compute and memory capacity of the DB instance, for example db.m5.large :param engine: The name of the database engine to be used for this instance :rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` + :param aws_conn_id: The Airflow connection used for AWS credentials. """ def __init__( @@ -609,6 +610,7 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator): :param db_instance_identifier: The DB instance identifier for the DB instance to be deleted :rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` + :param aws_conn_id: The Airflow connection used for AWS credentials. """ def __init__( From 4ea330e13e39c772f42bdc2679dfc70261b97666 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Tue, 7 Jun 2022 22:54:28 +0200 Subject: [PATCH 07/14] Make examples for AWS RDS create and delete operators AIP-47 compatible --- .../operators/rds.rst | 4 ++-- .../system/providers/amazon/aws/rds/__init__.py | 17 +++++++++++++++++ .../amazon/aws/rds}/example_rds_db_instance.py | 11 ++++++++++- 3 files changed, 29 insertions(+), 3 deletions(-) create mode 100644 tests/system/providers/amazon/aws/rds/__init__.py rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws/rds}/example_rds_db_instance.py (88%) diff --git a/docs/apache-airflow-providers-amazon/operators/rds.rst b/docs/apache-airflow-providers-amazon/operators/rds.rst index 1a2af2cdf7e0a..82292e3d1142b 100644 --- a/docs/apache-airflow-providers-amazon/operators/rds.rst +++ b/docs/apache-airflow-providers-amazon/operators/rds.rst @@ -146,7 +146,7 @@ Create a database instance To create a AWS DB instance you can use :class:`~airflow.providers.amazon.aws.operators.rds.RdsCreateDbInstanceOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/rds/example_rds_db_instance.py :language: python :dedent: 4 :start-after: [START howto_operator_rds_create_db_instance] @@ -160,7 +160,7 @@ Delete a database instance To delete a AWS DB instance you can use :class:`~airflow.providers.amazon.aws.operators.rds.RDSDeleteDbInstanceOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/rds/example_rds_db_instance.py :language: python :dedent: 4 :start-after: [START howto_operator_rds_delete_db_instance] diff --git a/tests/system/providers/amazon/aws/rds/__init__.py b/tests/system/providers/amazon/aws/rds/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/tests/system/providers/amazon/aws/rds/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py b/tests/system/providers/amazon/aws/rds/example_rds_db_instance.py similarity index 88% rename from airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py rename to tests/system/providers/amazon/aws/rds/example_rds_db_instance.py index 80bb26a688407..aa777c094d21c 100644 --- a/airflow/providers/amazon/aws/example_dags/example_rds_db_instance.py +++ b/tests/system/providers/amazon/aws/rds/example_rds_db_instance.py @@ -25,6 +25,10 @@ RdsCreateDbInstanceOperator, RdsDeleteDbInstanceOperator, ) +from tests.system.providers.amazon.aws.utils import set_env_id + +ENV_ID = set_env_id() +DAG_ID = "example_rds_instance" RDS_DB_IDENTIFIER = getenv("RDS_DB_IDENTIFIER", "database-identifier") RDS_USERNAME = getenv("RDS_USERNAME", "database_username") @@ -33,7 +37,7 @@ RDS_PASSWORD = getenv("RDS_PASSWORD", "database_password") with DAG( - dag_id='example_rds_instance', + dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2021, 1, 1), tags=['example'], @@ -64,3 +68,8 @@ # [END howto_operator_rds_delete_db_instance] chain(create_db_instance, delete_db_instance) + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 8588af3d275eb20cf072502e7ae4dc04bded9151 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Wed, 8 Jun 2022 08:56:04 +0200 Subject: [PATCH 08/14] Fix missing `param` in docstring --- airflow/providers/amazon/aws/operators/rds.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index fee6df83992bb..396fcc3c952c6 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -563,7 +563,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator): contain from 1 to 63 letters, numbers, or hyphens :param db_instance_class: The compute and memory capacity of the DB instance, for example db.m5.large :param engine: The name of the database engine to be used for this instance - :rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` + :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` :param aws_conn_id: The Airflow connection used for AWS credentials. """ @@ -609,7 +609,7 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator): :ref:`howto/operator:RdsDeleteDbInstanceOperator` :param db_instance_identifier: The DB instance identifier for the DB instance to be deleted - :rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` + :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` :param aws_conn_id: The Airflow connection used for AWS credentials. """ From ee096c5e3677280841e771cde016cdff72caba9e Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Wed, 8 Jun 2022 17:19:14 +0200 Subject: [PATCH 09/14] Add links to boto docs --- airflow/providers/amazon/aws/operators/rds.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index 396fcc3c952c6..d9394457cf85f 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -564,6 +564,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator): :param db_instance_class: The compute and memory capacity of the DB instance, for example db.m5.large :param engine: The name of the database engine to be used for this instance :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance :param aws_conn_id: The Airflow connection used for AWS credentials. """ @@ -610,6 +611,7 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator): :param db_instance_identifier: The DB instance identifier for the DB instance to be deleted :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.delete_db_instance :param aws_conn_id: The Airflow connection used for AWS credentials. """ From d8fa2f587ce7becc3632a530bafbacf1bbfc3852 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Tue, 14 Jun 2022 15:10:06 +0200 Subject: [PATCH 10/14] Rename file with example rds instance DAG --- docs/apache-airflow-providers-amazon/operators/rds.rst | 4 ++-- .../{example_rds_db_instance.py => example_rds_instance.py} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename tests/system/providers/amazon/aws/rds/{example_rds_db_instance.py => example_rds_instance.py} (100%) diff --git a/docs/apache-airflow-providers-amazon/operators/rds.rst b/docs/apache-airflow-providers-amazon/operators/rds.rst index 82292e3d1142b..434804d910be4 100644 --- a/docs/apache-airflow-providers-amazon/operators/rds.rst +++ b/docs/apache-airflow-providers-amazon/operators/rds.rst @@ -146,7 +146,7 @@ Create a database instance To create a AWS DB instance you can use :class:`~airflow.providers.amazon.aws.operators.rds.RdsCreateDbInstanceOperator`. -.. exampleinclude:: /../../tests/system/providers/amazon/aws/rds/example_rds_db_instance.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/rds/example_rds_instance.py :language: python :dedent: 4 :start-after: [START howto_operator_rds_create_db_instance] @@ -160,7 +160,7 @@ Delete a database instance To delete a AWS DB instance you can use :class:`~airflow.providers.amazon.aws.operators.rds.RDSDeleteDbInstanceOperator`. -.. exampleinclude:: /../../tests/system/providers/amazon/aws/rds/example_rds_db_instance.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/rds/example_rds_instance.py :language: python :dedent: 4 :start-after: [START howto_operator_rds_delete_db_instance] diff --git a/tests/system/providers/amazon/aws/rds/example_rds_db_instance.py b/tests/system/providers/amazon/aws/rds/example_rds_instance.py similarity index 100% rename from tests/system/providers/amazon/aws/rds/example_rds_db_instance.py rename to tests/system/providers/amazon/aws/rds/example_rds_instance.py From 72b30ca2d2d835669f359bf343ddd0be32e191f9 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Tue, 14 Jun 2022 15:20:59 +0200 Subject: [PATCH 11/14] Add flag `wait_for_completion` to `RdsCreateDbInstanceOperator` and `RdsDeleteDbInstanceOperator` --- airflow/providers/amazon/aws/operators/rds.py | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index d9394457cf85f..e4fbfbe161760 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -566,6 +566,7 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator): :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance :param aws_conn_id: The Airflow connection used for AWS credentials. + :param wait_for_completion: Whether or not wait for creation of the DB instance complete. (default: True) """ def __init__( @@ -576,6 +577,7 @@ def __init__( engine: str, rds_kwargs: Optional[Dict] = None, aws_conn_id: str = "aws_default", + wait_for_completion: bool = True, **kwargs, ): super().__init__(aws_conn_id=aws_conn_id, **kwargs) @@ -584,6 +586,7 @@ def __init__( self.db_instance_class = db_instance_class self.engine = engine self.rds_kwargs = rds_kwargs or {} + self.wait_for_completion = wait_for_completion def execute(self, context: 'Context') -> str: self.log.info(f"Creating new DB instance {self.db_instance_identifier}") @@ -594,9 +597,11 @@ def execute(self, context: 'Context') -> str: Engine=self.engine, **self.rds_kwargs, ) - self.hook.conn.get_waiter("db_instance_available").wait( - DBInstanceIdentifier=self.db_instance_identifier - ) + + if self.wait_for_completion: + self.hook.conn.get_waiter("db_instance_available").wait( + DBInstanceIdentifier=self.db_instance_identifier + ) return json.dumps(create_db_instance, default=str) @@ -613,6 +618,7 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator): :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.delete_db_instance :param aws_conn_id: The Airflow connection used for AWS credentials. + :param wait_for_completion: Whether or not wait for deletion of the DB instance complete. (default: True) """ def __init__( @@ -621,11 +627,13 @@ def __init__( db_instance_identifier: str, rds_kwargs: Optional[Dict] = None, aws_conn_id: str = "aws_default", + wait_for_completion: bool = True, **kwargs, ): super().__init__(aws_conn_id=aws_conn_id, **kwargs) self.db_instance_identifier = db_instance_identifier self.rds_kwargs = rds_kwargs or {} + self.wait_for_completion = wait_for_completion def execute(self, context: 'Context') -> str: self.log.info(f"Deleting DB instance {self.db_instance_identifier}") @@ -634,9 +642,11 @@ def execute(self, context: 'Context') -> str: DBInstanceIdentifier=self.db_instance_identifier, **self.rds_kwargs, ) - self.hook.conn.get_waiter("db_instance_deleted").wait( - DBInstanceIdentifier=self.db_instance_identifier - ) + + if self.wait_for_completion: + self.hook.conn.get_waiter("db_instance_deleted").wait( + DBInstanceIdentifier=self.db_instance_identifier + ) return json.dumps(delete_db_instance, default=str) From 7f145a756ecaa2af635baf4ded481878a91353b0 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Tue, 14 Jun 2022 16:31:18 +0200 Subject: [PATCH 12/14] Commit proposed changes in example_rds_instance DAG --- .../providers/amazon/aws/rds/example_rds_instance.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/system/providers/amazon/aws/rds/example_rds_instance.py b/tests/system/providers/amazon/aws/rds/example_rds_instance.py index aa777c094d21c..da157dbdf914b 100644 --- a/tests/system/providers/amazon/aws/rds/example_rds_instance.py +++ b/tests/system/providers/amazon/aws/rds/example_rds_instance.py @@ -17,7 +17,6 @@ # under the License. from datetime import datetime -from os import getenv from airflow import DAG from airflow.models.baseoperator import chain @@ -30,11 +29,11 @@ ENV_ID = set_env_id() DAG_ID = "example_rds_instance" -RDS_DB_IDENTIFIER = getenv("RDS_DB_IDENTIFIER", "database-identifier") -RDS_USERNAME = getenv("RDS_USERNAME", "database_username") +RDS_DB_IDENTIFIER = f'{ENV_ID}-database' +RDS_USERNAME = 'database_username' # NEVER store your production password in plaintext in a DAG like this. # Use Airflow Secrets or a secret manager for this in production. -RDS_PASSWORD = getenv("RDS_PASSWORD", "database_password") +RDS_PASSWORD = 'database_password' with DAG( dag_id=DAG_ID, From e68d304d9438338416d35f4e8bda32eb902a1ec5 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Wed, 15 Jun 2022 14:17:03 +0200 Subject: [PATCH 13/14] Refactor example_dms DAG working with new RDS operators --- .../amazon/aws/example_dags/example_dms.py | 115 ++++++++---------- 1 file changed, 51 insertions(+), 64 deletions(-) diff --git a/airflow/providers/amazon/aws/example_dags/example_dms.py b/airflow/providers/amazon/aws/example_dags/example_dms.py index caffe44353fda..d897a44b2544f 100644 --- a/airflow/providers/amazon/aws/example_dags/example_dms.py +++ b/airflow/providers/amazon/aws/example_dags/example_dms.py @@ -38,6 +38,10 @@ DmsStartTaskOperator, DmsStopTaskOperator, ) +from airflow.providers.amazon.aws.operators.rds import ( + RdsCreateDbInstanceOperator, + RdsDeleteDbInstanceOperator, +) from airflow.providers.amazon.aws.sensors.dms import DmsTaskBaseSensor, DmsTaskCompletedSensor S3_BUCKET = os.getenv('S3_BUCKET', 's3_bucket_name') @@ -109,29 +113,20 @@ } -def _create_rds_instance(): - print('Creating RDS Instance.') - +def _get_rds_instance_endpoint(): + print('Retrieving RDS instance endpoint.') rds_client = boto3.client('rds') - rds_client.create_db_instance( - DBName=RDS_DB_NAME, - DBInstanceIdentifier=RDS_INSTANCE_NAME, - AllocatedStorage=20, - DBInstanceClass='db.t3.micro', - Engine=RDS_ENGINE, - MasterUsername=RDS_USERNAME, - MasterUserPassword=RDS_PASSWORD, - ) - - rds_client.get_waiter('db_instance_available').wait(DBInstanceIdentifier=RDS_INSTANCE_NAME) response = rds_client.describe_db_instances(DBInstanceIdentifier=RDS_INSTANCE_NAME) - return response['DBInstances'][0]['Endpoint'] + rds_instance_endpoint = response['DBInstances'][0]['Endpoint'] + return rds_instance_endpoint -def _create_rds_table(rds_endpoint): - print('Creating table.') +@task +def create_sample_table(): + print('Creating sample table.') + rds_endpoint = _get_rds_instance_endpoint() hostname = rds_endpoint['Address'] port = rds_endpoint['Port'] rds_url = f'{RDS_PROTOCOL}://{RDS_USERNAME}:{RDS_PASSWORD}@{hostname}:{port}/{RDS_DB_NAME}' @@ -154,7 +149,13 @@ def _create_rds_table(rds_endpoint): connection.execute(table.select()) -def _create_dms_replication_instance(ti, dms_client): +@task +def create_dms_assets(): + print('Creating DMS assets.') + ti = get_current_context()['ti'] + dms_client = boto3.client('dms') + rds_instance_endpoint = _get_rds_instance_endpoint() + print('Creating replication instance.') instance_arn = dms_client.create_replication_instance( ReplicationInstanceIdentifier=DMS_REPLICATION_INSTANCE_NAME, @@ -162,10 +163,7 @@ def _create_dms_replication_instance(ti, dms_client): )['ReplicationInstance']['ReplicationInstanceArn'] ti.xcom_push(key='replication_instance_arn', value=instance_arn) - return instance_arn - -def _create_dms_endpoints(ti, dms_client, rds_instance_endpoint): print('Creating DMS source endpoint.') source_endpoint_arn = dms_client.create_endpoint( EndpointIdentifier=SOURCE_ENDPOINT_IDENTIFIER, @@ -194,28 +192,16 @@ def _create_dms_endpoints(ti, dms_client, rds_instance_endpoint): ti.xcom_push(key='source_endpoint_arn', value=source_endpoint_arn) ti.xcom_push(key='target_endpoint_arn', value=target_endpoint_arn) - -def _await_setup_assets(dms_client, instance_arn): - print("Awaiting asset provisioning.") + print("Awaiting replication instance provisioning.") dms_client.get_waiter('replication_instance_available').wait( Filters=[{'Name': 'replication-instance-arn', 'Values': [instance_arn]}] ) -def _delete_rds_instance(): - print('Deleting RDS Instance.') - - rds_client = boto3.client('rds') - rds_client.delete_db_instance( - DBInstanceIdentifier=RDS_INSTANCE_NAME, - SkipFinalSnapshot=True, - ) - - rds_client.get_waiter('db_instance_deleted').wait(DBInstanceIdentifier=RDS_INSTANCE_NAME) - - -def _delete_dms_assets(dms_client): +@task(trigger_rule='all_done') +def delete_dms_assets(): ti = get_current_context()['ti'] + dms_client = boto3.client('dms') replication_instance_arn = ti.xcom_pull(key='replication_instance_arn') source_arn = ti.xcom_pull(key='source_endpoint_arn') target_arn = ti.xcom_pull(key='target_endpoint_arn') @@ -225,13 +211,10 @@ def _delete_dms_assets(dms_client): dms_client.delete_endpoint(EndpointArn=source_arn) dms_client.delete_endpoint(EndpointArn=target_arn) - -def _await_all_teardowns(dms_client): - print('Awaiting tear-down.') + print('Awaiting DMS assets tear-down.') dms_client.get_waiter('replication_instance_deleted').wait( Filters=[{'Name': 'replication-instance-id', 'Values': [DMS_REPLICATION_INSTANCE_NAME]}] ) - dms_client.get_waiter('endpoint_deleted').wait( Filters=[ { @@ -242,27 +225,6 @@ def _await_all_teardowns(dms_client): ) -@task -def set_up(): - ti = get_current_context()['ti'] - dms_client = boto3.client('dms') - - rds_instance_endpoint = _create_rds_instance() - _create_rds_table(rds_instance_endpoint) - instance_arn = _create_dms_replication_instance(ti, dms_client) - _create_dms_endpoints(ti, dms_client, rds_instance_endpoint) - _await_setup_assets(dms_client, instance_arn) - - -@task(trigger_rule='all_done') -def clean_up(): - dms_client = boto3.client('dms') - - _delete_rds_instance() - _delete_dms_assets(dms_client) - _await_all_teardowns(dms_client) - - with DAG( dag_id='example_dms', schedule_interval=None, @@ -271,6 +233,19 @@ def clean_up(): catchup=False, ) as dag: + create_db_instance = RdsCreateDbInstanceOperator( + task_id="create_db_instance", + db_instance_identifier=RDS_INSTANCE_NAME, + db_instance_class='db.t3.micro', + engine=RDS_ENGINE, + rds_kwargs={ + "DBName": RDS_DB_NAME, + "AllocatedStorage": 20, + "MasterUsername": RDS_USERNAME, + "MasterUserPassword": RDS_PASSWORD, + }, + ) + # [START howto_operator_dms_create_task] create_task = DmsCreateTaskOperator( task_id='create_task', @@ -334,8 +309,19 @@ def clean_up(): ) # [END howto_operator_dms_delete_task] + delete_db_instance = RdsDeleteDbInstanceOperator( + task_id='delete_db_instance', + db_instance_identifier=RDS_INSTANCE_NAME, + rds_kwargs={ + "SkipFinalSnapshot": True, + }, + trigger_rule='all_done', + ) + chain( - set_up() + create_db_instance + >> create_sample_table() + >> create_dms_assets() >> create_task >> start_task >> describe_tasks @@ -343,5 +329,6 @@ def clean_up(): >> stop_task >> await_task_stop >> delete_task - >> clean_up() + >> delete_dms_assets() + >> delete_db_instance ) From 206e884512871118b4047c0e5292ff73985ecd86 Mon Sep 17 00:00:00 2001 From: Eugene Karimov Date: Mon, 27 Jun 2022 10:31:07 +0200 Subject: [PATCH 14/14] Minor fixes in RDS Create-Delete operators and DMS example DAG --- .../amazon/aws/example_dags/example_dms.py | 24 +++++++++---------- airflow/providers/amazon/aws/operators/rds.py | 10 ++++---- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/airflow/providers/amazon/aws/example_dags/example_dms.py b/airflow/providers/amazon/aws/example_dags/example_dms.py index d897a44b2544f..46e97d92a50d1 100644 --- a/airflow/providers/amazon/aws/example_dags/example_dms.py +++ b/airflow/providers/amazon/aws/example_dags/example_dms.py @@ -319,16 +319,16 @@ def delete_dms_assets(): ) chain( - create_db_instance - >> create_sample_table() - >> create_dms_assets() - >> create_task - >> start_task - >> describe_tasks - >> await_task_start - >> stop_task - >> await_task_stop - >> delete_task - >> delete_dms_assets() - >> delete_db_instance + create_db_instance, + create_sample_table(), + create_dms_assets(), + create_task, + start_task, + describe_tasks, + await_task_start, + stop_task, + await_task_stop, + delete_task, + delete_dms_assets(), + delete_db_instance, ) diff --git a/airflow/providers/amazon/aws/operators/rds.py b/airflow/providers/amazon/aws/operators/rds.py index e4fbfbe161760..fe38bfed69548 100644 --- a/airflow/providers/amazon/aws/operators/rds.py +++ b/airflow/providers/amazon/aws/operators/rds.py @@ -566,7 +566,8 @@ class RdsCreateDbInstanceOperator(RdsBaseOperator): :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``create_db_instance`` https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.create_db_instance :param aws_conn_id: The Airflow connection used for AWS credentials. - :param wait_for_completion: Whether or not wait for creation of the DB instance complete. (default: True) + :param wait_for_completion: Whether or not wait for creation of the DB instance to + complete. (default: True) """ def __init__( @@ -589,7 +590,7 @@ def __init__( self.wait_for_completion = wait_for_completion def execute(self, context: 'Context') -> str: - self.log.info(f"Creating new DB instance {self.db_instance_identifier}") + self.log.info("Creating new DB instance %s", self.db_instance_identifier) create_db_instance = self.hook.conn.create_db_instance( DBInstanceIdentifier=self.db_instance_identifier, @@ -618,7 +619,8 @@ class RdsDeleteDbInstanceOperator(RdsBaseOperator): :param rds_kwargs: Named arguments to pass to boto3 RDS client function ``delete_db_instance`` https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds.html#RDS.Client.delete_db_instance :param aws_conn_id: The Airflow connection used for AWS credentials. - :param wait_for_completion: Whether or not wait for deletion of the DB instance complete. (default: True) + :param wait_for_completion: Whether or not wait for deletion of the DB instance to + complete. (default: True) """ def __init__( @@ -636,7 +638,7 @@ def __init__( self.wait_for_completion = wait_for_completion def execute(self, context: 'Context') -> str: - self.log.info(f"Deleting DB instance {self.db_instance_identifier}") + self.log.info("Deleting DB instance %s", self.db_instance_identifier) delete_db_instance = self.hook.conn.delete_db_instance( DBInstanceIdentifier=self.db_instance_identifier,