diff --git a/airflow/providers/common/io/xcom/backend.py b/airflow/providers/common/io/xcom/backend.py index 0bb5dd286f33f..6e995c30e1a24 100644 --- a/airflow/providers/common/io/xcom/backend.py +++ b/airflow/providers/common/io/xcom/backend.py @@ -39,7 +39,10 @@ def _is_relative_to(o: ObjectStoragePath, other: ObjectStoragePath) -> bool: - """This is a port of the pathlib.Path.is_relative_to method. It is not available in python 3.8.""" + """Return whether or not this path is relative to the other path. + + This is a port of the pathlib.Path.is_relative_to method. It is not available in python 3.8. + """ if hasattr(o, "is_relative_to"): return o.is_relative_to(other) @@ -51,7 +54,7 @@ def _is_relative_to(o: ObjectStoragePath, other: ObjectStoragePath) -> bool: def _get_compression_suffix(compression: str) -> str: - """This returns the compression suffix for the given compression. + """Return the compression suffix for the given compression. :raises ValueError: if the compression is not supported """ @@ -73,7 +76,7 @@ class XComObjectStoreBackend(BaseXCom): @staticmethod def _get_key(data: str) -> str: - """This gets the key from the url and normalizes it to be relative to the configured path. + """Get the key from the url and normalize it to be relative to the configured path. :raises ValueError: if the key is not relative to the configured path :raises TypeError: if the url is not a valid url or cannot be split diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index bc3bd902095ef..10390e7e88134 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -196,8 +196,7 @@ def __init__( super().__init__(databricks_conn_id, timeout_seconds, retry_limit, retry_delay, retry_args, caller) def create_job(self, json: dict) -> int: - """ - Utility function to call the ``api/2.1/jobs/create`` endpoint. + """Call the ``api/2.1/jobs/create`` endpoint. 
:param json: The data used in the body of the request to the ``create`` endpoint. :return: the job_id as an int @@ -206,8 +205,7 @@ def create_job(self, json: dict) -> int: return response["job_id"] def reset_job(self, job_id: str, json: dict) -> None: - """ - Utility function to call the ``api/2.1/jobs/reset`` endpoint. + """Call the ``api/2.1/jobs/reset`` endpoint. :param json: The data used in the new_settings of the request to the ``reset`` endpoint. """ diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index 4fd0f839d4a7e..79d6b74704952 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -257,7 +257,7 @@ def __init__( databricks_retry_args: dict[Any, Any] | None = None, **kwargs, ) -> None: - """Creates a new ``DatabricksCreateJobsOperator``.""" + """Create a new ``DatabricksCreateJobsOperator``.""" super().__init__(**kwargs) self.json = json or {} self.databricks_conn_id = databricks_conn_id diff --git a/airflow/providers/hashicorp/_internal_client/vault_client.py b/airflow/providers/hashicorp/_internal_client/vault_client.py index e170eda7870a6..aa2991f736aa6 100644 --- a/airflow/providers/hashicorp/_internal_client/vault_client.py +++ b/airflow/providers/hashicorp/_internal_client/vault_client.py @@ -388,7 +388,7 @@ def get_secret(self, secret_path: str, secret_version: int | None = None) -> dic def get_secret_metadata(self, secret_path: str) -> dict | None: """ - Reads secret metadata (including versions) from the engine. It is only valid for KV version 2. + Read secret metadata (including versions) from the engine. It is only valid for KV version 2. :param secret_path: The path of the secret. :return: secret metadata. This is a Dict containing metadata for the secret. 
@@ -410,7 +410,7 @@ def get_secret_including_metadata( self, secret_path: str, secret_version: int | None = None ) -> dict | None: """ - Reads secret including metadata. It is only valid for KV version 2. + Read secret including metadata. It is only valid for KV version 2. See https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for details. @@ -444,7 +444,7 @@ def create_or_update_secret( self, secret_path: str, secret: dict, method: str | None = None, cas: int | None = None ) -> Response: """ - Creates or updates secret. + Create or update secret. :param secret_path: The path of the secret. :param secret: Secret to create or update for the path specified diff --git a/airflow/providers/hashicorp/hooks/vault.py b/airflow/providers/hashicorp/hooks/vault.py index 4edf616652e12..115cdcf5fb95e 100644 --- a/airflow/providers/hashicorp/hooks/vault.py +++ b/airflow/providers/hashicorp/hooks/vault.py @@ -290,7 +290,7 @@ def _get_radius_parameters_from_connection( def get_conn(self) -> hvac.Client: """ - Retrieves connection to Vault. + Retrieve connection to Vault. :return: connection used. """ @@ -313,7 +313,7 @@ def get_secret(self, secret_path: str, secret_version: int | None = None) -> dic def get_secret_metadata(self, secret_path: str) -> dict | None: """ - Reads secret metadata (including versions) from the engine. It is only valid for KV version 2. + Read secret metadata (including versions) from the engine. It is only valid for KV version 2. :param secret_path: Path to read from :return: secret metadata. This is a Dict containing metadata for the secret. @@ -327,7 +327,7 @@ def get_secret_including_metadata( self, secret_path: str, secret_version: int | None = None ) -> dict | None: """ - Reads secret including metadata. It is only valid for KV version 2. + Read secret including metadata. It is only valid for KV version 2. See https://hvac.readthedocs.io/en/stable/usage/secrets_engines/kv_v2.html for details. 
@@ -345,7 +345,7 @@ def create_or_update_secret( self, secret_path: str, secret: dict, method: str | None = None, cas: int | None = None ) -> Response: """ - Creates or updates secret. + Create or update secret. :param secret_path: Path to read from :param secret: Secret to create or update for the path specified @@ -368,7 +368,7 @@ def create_or_update_secret( @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import BooleanField, IntegerField, StringField @@ -405,7 +405,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["extra"], "relabeling": {}, diff --git a/airflow/providers/imap/hooks/imap.py b/airflow/providers/imap/hooks/imap.py index 1ff2c7154b7f8..f67b81e862412 100644 --- a/airflow/providers/imap/hooks/imap.py +++ b/airflow/providers/imap/hooks/imap.py @@ -120,7 +120,7 @@ def has_mail_attachment( self, name: str, *, check_regex: bool = False, mail_folder: str = "INBOX", mail_filter: str = "All" ) -> bool: """ - Checks the mail folder for mails containing attachments with the given name. + Check the mail folder for mails containing attachments with the given name. :param name: The name of the attachment that will be searched for. :param check_regex: Checks the name for a regular expression. @@ -145,7 +145,7 @@ def retrieve_mail_attachments( not_found_mode: str = "raise", ) -> list[tuple]: """ - Retrieves mail's attachments in the mail folder by its name. + Retrieve mail's attachments in the mail folder by its name. :param name: The name of the attachment that will be downloaded. 
:param check_regex: Checks the name for a regular expression. @@ -181,7 +181,7 @@ def download_mail_attachments( not_found_mode: str = "raise", ) -> None: """ - Downloads mail's attachments in the mail folder by its name to the local directory. + Download mail's attachments in the mail folder by its name to the local directory. :param name: The name of the attachment that will be downloaded. :param local_output_directory: The output directory on the local machine @@ -304,7 +304,7 @@ def __init__(self, mail_body: str) -> None: def has_attachments(self) -> bool: """ - Checks the mail for a attachments. + Check the mail for attachments. :returns: True if it has attachments and False if not. """ @@ -314,7 +314,7 @@ def get_attachments_by_name( self, name: str, check_regex: bool, find_first: bool = False ) -> list[tuple[Any, Any]]: """ - Gets all attachments by name for the mail. + Get all attachments by name for the mail. :param name: The name of the attachment to look for. :param check_regex: Checks the name for a regular expression. @@ -356,7 +356,7 @@ def __init__(self, part: Any) -> None: def is_attachment(self) -> bool: """ - Checks if the part is a valid mail attachment. + Check if the part is a valid mail attachment. :returns: True if it is an attachment and False if not. """ @@ -364,7 +364,7 @@ def is_attachment(self) -> bool: def has_matching_name(self, name: str) -> tuple[Any, Any] | None: """ - Checks if the given name matches the part's name. + Check if the given name matches the part's name. :param name: The name to look for. :returns: True if it matches the name (including regular expression). @@ -373,7 +373,7 @@ def has_matching_name(self, name: str) -> tuple[Any, Any] | None: def has_equal_name(self, name: str) -> bool: """ - Checks if the given name is equal to the part's name. + Check if the given name is equal to the part's name. :param name: The name to look for. :returns: True if it is equal to the given name. 
@@ -382,7 +382,7 @@ def has_equal_name(self, name: str) -> bool: def get_file(self) -> tuple: """ - Gets the file including name and payload. + Get the file including name and payload. :returns: the part's name and payload. """ diff --git a/airflow/providers/mongo/hooks/mongo.py b/airflow/providers/mongo/hooks/mongo.py index e776f0e3eff17..7a968ca9b21e1 100644 --- a/airflow/providers/mongo/hooks/mongo.py +++ b/airflow/providers/mongo/hooks/mongo.py @@ -131,7 +131,7 @@ def __exit__( self.client = None def get_conn(self) -> MongoClient: - """Fetches PyMongo Client.""" + """Fetch PyMongo Client.""" if self.client is not None: return self.client @@ -167,7 +167,7 @@ def get_collection( self, mongo_collection: str, mongo_db: str | None = None ) -> pymongo.collection.Collection: """ - Fetches a mongo collection object for querying. + Fetch a mongo collection object for querying. Uses connection schema as DB unless specified. """ @@ -180,7 +180,7 @@ def aggregate( self, mongo_collection: str, aggregate_query: list, mongo_db: str | None = None, **kwargs ) -> pymongo.command_cursor.CommandCursor: """ - Runs an aggregation pipeline and returns the results. + Run an aggregation pipeline and return the results. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.aggregate https://pymongo.readthedocs.io/en/stable/examples/aggregation.html @@ -223,7 +223,7 @@ def find( **kwargs, ) -> pymongo.cursor.Cursor | Any | None: """ - Runs a mongo find query and returns the results. + Run a mongo find query and return the results. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find """ @@ -238,7 +238,7 @@ def insert_one( self, mongo_collection: str, doc: dict, mongo_db: str | None = None, **kwargs ) -> pymongo.results.InsertOneResult: """ - Inserts a single document into a mongo collection. + Insert a single document into a mongo collection. 
https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_one """ @@ -250,7 +250,7 @@ def insert_many( self, mongo_collection: str, docs: Iterable[dict], mongo_db: str | None = None, **kwargs ) -> pymongo.results.InsertManyResult: """ - Inserts many docs into a mongo collection. + Insert many docs into a mongo collection. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.insert_many """ @@ -267,7 +267,7 @@ def update_one( **kwargs, ) -> pymongo.results.UpdateResult: """ - Updates a single document in a mongo collection. + Update a single document in a mongo collection. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.update_one @@ -291,7 +291,7 @@ def update_many( **kwargs, ) -> pymongo.results.UpdateResult: """ - Updates one or more documents in a mongo collection. + Update one or more documents in a mongo collection. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.update_many @@ -314,7 +314,7 @@ def replace_one( **kwargs, ) -> pymongo.results.UpdateResult: """ - Replaces a single document in a mongo collection. + Replace a single document in a mongo collection. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.replace_one @@ -347,7 +347,7 @@ def replace_many( **kwargs, ) -> pymongo.results.BulkWriteResult: """ - Replaces many documents in a mongo collection. + Replace many documents in a mongo collection. Uses bulk_write with multiple ReplaceOne operations https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write @@ -384,7 +384,7 @@ def delete_one( self, mongo_collection: str, filter_doc: dict, mongo_db: str | None = None, **kwargs ) -> pymongo.results.DeleteResult: """ - Deletes a single document in a mongo collection. + Delete a single document in a mongo collection. 
https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.delete_one @@ -401,7 +401,7 @@ def delete_many( self, mongo_collection: str, filter_doc: dict, mongo_db: str | None = None, **kwargs ) -> pymongo.results.DeleteResult: """ - Deletes one or more documents in a mongo collection. + Delete one or more documents in a mongo collection. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.delete_many @@ -423,7 +423,7 @@ def distinct( **kwargs, ) -> list[Any]: """ - Returns a list of distinct values for the given key across a collection. + Return a list of distinct values for the given key across a collection. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py index 00ff92b62cf01..8176653fdd0a8 100644 --- a/airflow/providers/mysql/hooks/mysql.py +++ b/airflow/providers/mysql/hooks/mysql.py @@ -173,7 +173,7 @@ def _get_conn_config_mysql_connector_python(self, conn: Connection) -> dict: def get_conn(self) -> MySQLConnectionTypes: """ - Connection to a MySQL database. + Get connection to a MySQL database. Establishes a connection to a mysql database by extracting the connection configuration from the Airflow connection. @@ -268,7 +268,7 @@ def bulk_load_custom( self, table: str, tmp_file: str, duplicate_key_handling: str = "IGNORE", extra_options: str = "" ) -> None: """ - A more configurable way to load local data from a file into the database. + Load local data from a file into the database in a more configurable way. .. warning:: According to the mysql docs using this function is a `security risk `_. 
@@ -299,7 +299,7 @@ def bulk_load_custom( conn.close() # type: ignore[misc] def get_openlineage_database_info(self, connection): - """Returns MySQL specific information for OpenLineage.""" + """Return MySQL specific information for OpenLineage.""" from airflow.providers.openlineage.sqlparser import DatabaseInfo return DatabaseInfo( @@ -316,7 +316,7 @@ def get_openlineage_database_info(self, connection): ) def get_openlineage_database_dialect(self, _): - """Returns database dialect.""" + """Return database dialect.""" return "mysql" def get_openlineage_default_schema(self): diff --git a/airflow/providers/mysql/transfers/s3_to_mysql.py b/airflow/providers/mysql/transfers/s3_to_mysql.py index ab5b5c9b63c6c..98d96f7c9c6c4 100644 --- a/airflow/providers/mysql/transfers/s3_to_mysql.py +++ b/airflow/providers/mysql/transfers/s3_to_mysql.py @@ -75,7 +75,7 @@ def __init__( def execute(self, context: Context) -> None: """ - Executes the transfer operation from S3 to MySQL. + Execute the transfer operation from S3 to MySQL. :param context: The context that is being provided when executing. """ diff --git a/airflow/providers/neo4j/hooks/neo4j.py b/airflow/providers/neo4j/hooks/neo4j.py index c1e1b8639325f..59c00e08aa5fc 100644 --- a/airflow/providers/neo4j/hooks/neo4j.py +++ b/airflow/providers/neo4j/hooks/neo4j.py @@ -50,7 +50,7 @@ def __init__(self, conn_id: str = default_conn_name, *args, **kwargs) -> None: self.client: Driver | None = None def get_conn(self) -> Driver: - """Function that initiates a new Neo4j connection with username, password and database schema.""" + """Initiate a new Neo4j connection with username, password and database schema.""" if self.client is not None: return self.client @@ -67,7 +67,7 @@ def get_conn(self) -> Driver: def get_client(self, conn: Connection, encrypted: bool, uri: str) -> Driver: """ - Function to determine that relevant driver based on extras. + Determine the relevant driver based on extras. :param conn: Connection object. 
:param encrypted: boolean if encrypted connection or not. @@ -111,7 +111,7 @@ def get_uri(self, conn: Connection) -> str: def run(self, query) -> list[Any]: """ - Function to create a neo4j session and execute the query in the session. + Create a neo4j session and execute the query in the session. :param query: Neo4j query :return: Result diff --git a/airflow/providers/openfaas/hooks/openfaas.py b/airflow/providers/openfaas/hooks/openfaas.py index b1ae643707163..492128946710c 100644 --- a/airflow/providers/openfaas/hooks/openfaas.py +++ b/airflow/providers/openfaas/hooks/openfaas.py @@ -68,7 +68,7 @@ def deploy_function(self, overwrite_function_if_exist: bool, body: dict[str, Any self.log.info("Function deployed %s", self.function_name) def invoke_async_function(self, body: dict[str, Any]) -> None: - """Invoking function asynchronously.""" + """Invoke function asynchronously.""" url = self.get_conn().host + self.INVOKE_ASYNC_FUNCTION + self.function_name self.log.info("Invoking function asynchronously %s", url) response = requests.post(url, body) @@ -79,7 +79,7 @@ def invoke_async_function(self, body: dict[str, Any]) -> None: raise AirflowException("failed to invoke function") def invoke_function(self, body: dict[str, Any]) -> None: - """Invoking function synchronously, will block until function completes and returns.""" + """Invoke function synchronously. 
This will block until function completes and returns.""" url = self.get_conn().host + self.INVOKE_FUNCTION + self.function_name self.log.info("Invoking function synchronously %s", url) response = requests.post(url, body) diff --git a/airflow/providers/opensearch/hooks/opensearch.py b/airflow/providers/opensearch/hooks/opensearch.py index d0bdc3ff868d9..793741d1507df 100644 --- a/airflow/providers/opensearch/hooks/opensearch.py +++ b/airflow/providers/opensearch/hooks/opensearch.py @@ -107,7 +107,7 @@ def delete(self, index_name: str, query: dict | None = None, doc_id: int | None @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom UI field behaviour for OpenSearch Connection.""" + """Return custom UI field behaviour for OpenSearch Connection.""" return { "hidden_fields": ["schema"], "relabeling": { diff --git a/airflow/providers/opensearch/operators/opensearch.py b/airflow/providers/opensearch/operators/opensearch.py index f362271ae31da..cc12b6e8b0fde 100644 --- a/airflow/providers/opensearch/operators/opensearch.py +++ b/airflow/providers/opensearch/operators/opensearch.py @@ -32,7 +32,7 @@ class OpenSearchQueryOperator(BaseOperator): """ - Runs a query search against a given index on an OpenSearch cluster and returns results. + Run a query search against a given index on an OpenSearch cluster and return results. .. 
seealso:: For more information on how to use this operator, take a look at the guide: @@ -66,11 +66,11 @@ def __init__( @cached_property def hook(self) -> OpenSearchHook: - """Gets an instance of an OpenSearchHook.""" + """Get an instance of an OpenSearchHook.""" return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=self.log_query) def execute(self, context: Context) -> Any: - """Executes a search against a given index or a Search object on an OpenSearch Cluster.""" + """Execute a search against a given index or a Search object on an OpenSearch Cluster.""" result = None if self.query is not None: @@ -123,11 +123,11 @@ def __init__( @cached_property def hook(self) -> OpenSearchHook: - """Gets an instance of an OpenSearchHook.""" + """Get an instance of an OpenSearchHook.""" return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False) def execute(self, context: Context) -> Any: - """Creates an index on an OpenSearch cluster.""" + """Create an index on an OpenSearch cluster.""" try: self.hook.client.indices.create(index=self.index_name, body=self.index_body) except OpenSearchException as e: @@ -168,11 +168,11 @@ def __init__( @cached_property def hook(self) -> OpenSearchHook: - """Gets an instance of an OpenSearchHook.""" + """Get an instance of an OpenSearchHook.""" return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False) def execute(self, context: Context) -> Any: - """Saves a document to a given index on an OpenSearch cluster.""" + """Save a document to a given index on an OpenSearch cluster.""" if self.doc_class is not None: try: doc = self.doc_class.init(using=self.hook.client) diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index de78ab726ff71..541d0eec721a8 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -321,7 +321,7 @@ def bulk_insert_rows( target_fields: list[str] | None = None, 
commit_every: int = 5000, ): - """A performant bulk insert for Oracle DB. + """Perform bulk inserts efficiently for Oracle DB. This uses prepared statements via `executemany()`. For best performance, pass in `rows` as an iterator. diff --git a/airflow/providers/pagerduty/hooks/pagerduty.py b/airflow/providers/pagerduty/hooks/pagerduty.py index d2a67828dc23e..244314fa3461e 100644 --- a/airflow/providers/pagerduty/hooks/pagerduty.py +++ b/airflow/providers/pagerduty/hooks/pagerduty.py @@ -52,7 +52,7 @@ class PagerdutyHook(BaseHook): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["port", "login", "schema", "host", "extra"], "relabeling": { @@ -62,7 +62,7 @@ def get_ui_field_behaviour(cls) -> dict[str, Any]: @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget from flask_babel import lazy_gettext from wtforms import PasswordField @@ -98,7 +98,7 @@ def __init__(self, token: str | None = None, pagerduty_conn_id: str | None = Non def get_session(self) -> pdpyras.APISession: """ - Returns `pdpyras.APISession` for use with sending or receiving data through the PagerDuty REST API. + Return `pdpyras.APISession` for use with sending or receiving data through the PagerDuty REST API. The `pdpyras` library supplies a class `pdpyras.APISession` extending `requests.Session` from the Requests HTTP library. 
diff --git a/airflow/providers/pagerduty/hooks/pagerduty_events.py b/airflow/providers/pagerduty/hooks/pagerduty_events.py index 97a3d6912e564..dbbfcfe4c6b32 100644 --- a/airflow/providers/pagerduty/hooks/pagerduty_events.py +++ b/airflow/providers/pagerduty/hooks/pagerduty_events.py @@ -49,7 +49,7 @@ class PagerdutyEventsHook(BaseHook): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["port", "login", "schema", "host", "extra"], "relabeling": { diff --git a/airflow/providers/papermill/hooks/kernel.py b/airflow/providers/papermill/hooks/kernel.py index 9e6bb12a80e93..60c4edc6fd7f6 100644 --- a/airflow/providers/papermill/hooks/kernel.py +++ b/airflow/providers/papermill/hooks/kernel.py @@ -87,7 +87,7 @@ def get_conn(self) -> KernelConnection: def register_remote_kernel_engine(): - """Registers ``RemoteKernelEngine`` papermill engine.""" + """Register ``RemoteKernelEngine`` papermill engine.""" from papermill.engines import papermill_engines papermill_engines.register(REMOTE_KERNEL_ENGINE, RemoteKernelEngine) @@ -142,7 +142,7 @@ def execute_managed_notebook( execution_timeout=None, **kwargs, ): - """Performs the actual execution of the parameterized notebook locally.""" + """Perform the actual execution of the parameterized notebook locally.""" km = RemoteKernelManager() km.ip = kwargs["kernel_ip"] km.shell_port = kwargs["kernel_shell_port"] diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index 6b076ff6baa3c..0afb7740fe58f 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -126,7 +126,7 @@ def _get_cursor(self, raw_cursor: str) -> CursorType: raise ValueError(f"Invalid cursor passed {_cursor}. 
Valid options are: {valid_cursors}") def get_conn(self) -> connection: - """Establishes a connection to a postgres database.""" + """Establish a connection to a postgres database.""" conn_id = getattr(self, self.conn_name_attr) conn = deepcopy(self.connection or self.get_connection(conn_id)) @@ -162,7 +162,7 @@ def get_conn(self) -> connection: return self.conn def copy_expert(self, sql: str, filename: str) -> None: - """Executes SQL using psycopg2's ``copy_expert`` method. + """Execute SQL using psycopg2's ``copy_expert`` method. Necessary to execute COPY command without access to a superuser. @@ -193,11 +193,11 @@ def get_uri(self) -> str: return uri def bulk_load(self, table: str, tmp_file: str) -> None: - """Loads a tab-delimited file into a database table.""" + """Load a tab-delimited file into a database table.""" self.copy_expert(f"COPY {table} FROM STDIN", tmp_file) def bulk_dump(self, table: str, tmp_file: str) -> None: - """Dumps a database table into a tab-delimited file.""" + """Dump a database table into a tab-delimited file.""" self.copy_expert(f"COPY {table} TO STDOUT", tmp_file) @staticmethod @@ -326,7 +326,7 @@ def _generate_insert_sql( return sql def get_openlineage_database_info(self, connection) -> DatabaseInfo: - """Returns Postgres/Redshift specific information for OpenLineage.""" + """Return Postgres/Redshift specific information for OpenLineage.""" from airflow.providers.openlineage.sqlparser import DatabaseInfo is_redshift = connection.extra_dejson.get("redshift", False) @@ -363,11 +363,11 @@ def _get_openlineage_redshift_authority_part(self, connection) -> str: return f"{cluster_identifier}.{region_name}:{port}" def get_openlineage_database_dialect(self, connection) -> str: - """Returns postgres/redshift dialect.""" + """Return postgres/redshift dialect.""" return "redshift" if connection.extra_dejson.get("redshift", False) else "postgres" def get_openlineage_default_schema(self) -> str | None: - """Returns current schema. 
This is usually changed with ``SEARCH_PATH`` parameter.""" + """Return current schema. This is usually changed with ``SEARCH_PATH`` parameter.""" return self.get_first("SELECT CURRENT_SCHEMA;")[0] @classmethod diff --git a/airflow/providers/presto/hooks/presto.py b/airflow/providers/presto/hooks/presto.py index 611bac37c59ac..bd2436d5828a4 100644 --- a/airflow/providers/presto/hooks/presto.py +++ b/airflow/providers/presto/hooks/presto.py @@ -90,7 +90,7 @@ def __init__(self, *args, **kwargs): self._placeholder: str = "?" def get_conn(self) -> Connection: - """Returns a connection object.""" + """Return a connection object.""" db = self.get_connection(self.presto_conn_id) # type: ignore[attr-defined] extra = db.extra_dejson auth = None @@ -135,7 +135,7 @@ def get_conn(self) -> Connection: return presto_conn def get_isolation_level(self) -> Any: - """Returns an isolation level.""" + """Return an isolation level.""" db = self.get_connection(self.presto_conn_id) # type: ignore[attr-defined] isolation_level = db.extra_dejson.get("isolation_level", "AUTOCOMMIT").upper() return getattr(IsolationLevel, isolation_level, IsolationLevel.AUTOCOMMIT) @@ -189,7 +189,7 @@ def insert_rows( **kwargs, ) -> None: """ - A generic way to insert a set of tuples into a table. + Insert a set of tuples into a table. 
:param table: Name of the target table :param rows: The rows to insert into the table diff --git a/airflow/providers/qdrant/hooks/qdrant.py b/airflow/providers/qdrant/hooks/qdrant.py index 494e4d57ed9d0..31aa0b5e1d428 100644 --- a/airflow/providers/qdrant/hooks/qdrant.py +++ b/airflow/providers/qdrant/hooks/qdrant.py @@ -41,7 +41,7 @@ class QdrantHook(BaseHook): @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import BooleanField, IntegerField, StringField @@ -80,7 +80,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["schema", "login", "extra"], "relabeling": {"password": "API Key"}, diff --git a/airflow/providers/salesforce/hooks/salesforce.py b/airflow/providers/salesforce/hooks/salesforce.py index af3ef1292b437..61678e2a6c81f 100644 --- a/airflow/providers/salesforce/hooks/salesforce.py +++ b/airflow/providers/salesforce/hooks/salesforce.py @@ -93,7 +93,7 @@ def _get_field(self, extras: dict, field_name: str): @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import PasswordField, StringField @@ -116,7 +116,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["schema", "port", "extra", 
"host"], "relabeling": { @@ -154,7 +154,7 @@ def conn(self) -> api.Salesforce: return conn def get_conn(self) -> api.Salesforce: - """Returns a Salesforce instance. (cached).""" + """Return a Salesforce instance. (cached).""" return self.conn def make_query(self, query: str, include_deleted: bool = False, query_params: dict | None = None) -> dict: diff --git a/airflow/providers/salesforce/operators/bulk.py b/airflow/providers/salesforce/operators/bulk.py index 98f718cb02429..a467601565660 100644 --- a/airflow/providers/salesforce/operators/bulk.py +++ b/airflow/providers/salesforce/operators/bulk.py @@ -81,7 +81,7 @@ def _validate_inputs(self) -> None: def execute(self, context: Context): """ - Makes an HTTP request to Salesforce Bulk API. + Make an HTTP request to Salesforce Bulk API. :param context: The task context during execution. :return: API response if do_xcom_push is True diff --git a/airflow/providers/salesforce/operators/salesforce_apex_rest.py b/airflow/providers/salesforce/operators/salesforce_apex_rest.py index f41c36c728500..8411e2320d240 100644 --- a/airflow/providers/salesforce/operators/salesforce_apex_rest.py +++ b/airflow/providers/salesforce/operators/salesforce_apex_rest.py @@ -56,7 +56,7 @@ def __init__( def execute(self, context: Context) -> dict: """ - Makes an HTTP request to an APEX REST endpoint and pushes results to xcom. + Make an HTTP request to an APEX REST endpoint and pushes results to xcom. :param context: The task context during execution. 
:return: Apex response diff --git a/airflow/providers/samba/hooks/samba.py b/airflow/providers/samba/hooks/samba.py index e4745be6cefd8..535ec267ccf42 100644 --- a/airflow/providers/samba/hooks/samba.py +++ b/airflow/providers/samba/hooks/samba.py @@ -252,7 +252,7 @@ def push_from_local(self, destination_filepath: str, local_filepath: str): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["extra"], "relabeling": {"schema": "Share"}, diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py b/airflow/providers/samba/transfers/gcs_to_samba.py index a645c25352b98..da18849430ee5 100644 --- a/airflow/providers/samba/transfers/gcs_to_samba.py +++ b/airflow/providers/samba/transfers/gcs_to_samba.py @@ -176,7 +176,7 @@ def _copy_single_object( source_object: str, destination_path: str, ) -> None: - """Helper function to copy single object.""" + """Copy single object.""" self.log.info( "Executing copy of gs://%s/%s to %s", self.source_bucket, diff --git a/airflow/providers/segment/hooks/segment.py b/airflow/providers/segment/hooks/segment.py index 258716387d603..3d93f51fa8fc7 100644 --- a/airflow/providers/segment/hooks/segment.py +++ b/airflow/providers/segment/hooks/segment.py @@ -80,6 +80,6 @@ def get_conn(self) -> analytics: return analytics def on_error(self, error: str, items: str) -> None: - """Handles error callbacks when using Segment with segment_debug_mode set to True.""" + """Handle error callbacks when using Segment with segment_debug_mode set to True.""" self.log.error("Encountered Segment error: %s with items: %s", error, items) raise AirflowException(f"Segment error: {error}") diff --git a/airflow/providers/telegram/hooks/telegram.py b/airflow/providers/telegram/hooks/telegram.py index 984f18a1bade6..3d67d3bea7fbf 100644 --- a/airflow/providers/telegram/hooks/telegram.py +++ b/airflow/providers/telegram/hooks/telegram.py @@ -71,7 
+71,7 @@ def __init__( def get_conn(self) -> telegram.Bot: """ - Returns the telegram bot client. + Return the telegram bot client. :return: telegram bot client """ @@ -79,7 +79,7 @@ def get_conn(self) -> telegram.Bot: def __get_token(self, token: str | None, telegram_conn_id: str | None) -> str: """ - Returns the telegram API token. + Return the telegram API token. :param token: telegram API token :param telegram_conn_id: telegram connection name @@ -100,7 +100,7 @@ def __get_token(self, token: str | None, telegram_conn_id: str | None) -> str: def __get_chat_id(self, chat_id: str | None, telegram_conn_id: str | None) -> str | None: """ - Returns the telegram chat ID for a chat/channel/group. + Return the telegram chat ID for a chat/channel/group. :param chat_id: optional chat ID :param telegram_conn_id: telegram connection name @@ -122,7 +122,7 @@ def __get_chat_id(self, chat_id: str | None, telegram_conn_id: str | None) -> st ) def send_message(self, api_params: dict) -> None: """ - Sends the message to a telegram channel or chat. + Send the message to a telegram channel or chat. :param api_params: params for telegram_instance.send_message. 
It can also be used to override chat_id """ diff --git a/airflow/providers/telegram/operators/telegram.py b/airflow/providers/telegram/operators/telegram.py index 4c85487e2c0f2..363b3433dc8ef 100644 --- a/airflow/providers/telegram/operators/telegram.py +++ b/airflow/providers/telegram/operators/telegram.py @@ -72,7 +72,7 @@ def __init__( super().__init__(**kwargs) def execute(self, context: Context) -> None: - """Calls the TelegramHook to post the provided Telegram message.""" + """Call the TelegramHook to post the provided Telegram message.""" if self.text: self.telegram_kwargs["text"] = self.text diff --git a/airflow/providers/trino/hooks/trino.py b/airflow/providers/trino/hooks/trino.py index 7547b6bf953ec..9e776361c1478 100644 --- a/airflow/providers/trino/hooks/trino.py +++ b/airflow/providers/trino/hooks/trino.py @@ -94,7 +94,7 @@ def __init__(self, *args, **kwargs): self._placeholder: str = "?" def get_conn(self) -> Connection: - """Returns a connection object.""" + """Return a connection object.""" db = self.get_connection(self.trino_conn_id) # type: ignore[attr-defined] extra = db.extra_dejson auth = None @@ -165,7 +165,7 @@ def get_conn(self) -> Connection: return trino_conn def get_isolation_level(self) -> Any: - """Returns an isolation level.""" + """Return an isolation level.""" db = self.get_connection(self.trino_conn_id) # type: ignore[attr-defined] isolation_level = db.extra_dejson.get("isolation_level", "AUTOCOMMIT").upper() return getattr(IsolationLevel, isolation_level, IsolationLevel.AUTOCOMMIT) @@ -219,7 +219,7 @@ def insert_rows( **kwargs, ) -> None: """ - A generic way to insert a set of tuples into a table. + Insert a set of tuples into a table in a generic way. 
:param table: Name of the target table :param rows: The rows to insert into the table @@ -250,7 +250,7 @@ def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any: return cell def get_openlineage_database_info(self, connection): - """Returns Trino specific information for OpenLineage.""" + """Return Trino specific information for OpenLineage.""" from airflow.providers.openlineage.sqlparser import DatabaseInfo return DatabaseInfo( @@ -271,9 +271,9 @@ def get_openlineage_database_info(self, connection): ) def get_openlineage_database_dialect(self, _): - """Returns Trino dialect.""" + """Return Trino dialect.""" return "trino" def get_openlineage_default_schema(self): - """Returns Trino default schema.""" + """Return Trino default schema.""" return trino.constants.DEFAULT_SCHEMA diff --git a/airflow/providers/weaviate/hooks/weaviate.py b/airflow/providers/weaviate/hooks/weaviate.py index 13f55fa83a508..649cacfc12bba 100644 --- a/airflow/providers/weaviate/hooks/weaviate.py +++ b/airflow/providers/weaviate/hooks/weaviate.py @@ -85,7 +85,7 @@ def __init__( @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget from flask_babel import lazy_gettext from wtforms import PasswordField @@ -96,7 +96,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["port", "schema"], "relabeling": { @@ -144,7 +144,7 @@ def conn(self) -> WeaviateClient: category=AirflowProviderDeprecationWarning, ) def get_client(self) -> WeaviateClient: - """Returns a Weaviate client.""" + """Return a Weaviate client.""" # Keeping this for backwards compatibility return self.conn @@ -183,7 +183,7 @@ def 
create_schema(self, schema_json: dict[str, Any] | str) -> None: @staticmethod def _convert_dataframe_to_list(data: list[dict[str, Any]] | pd.DataFrame | None) -> list[dict[str, Any]]: - """Helper function to convert dataframe to list of dicts. + """Convert dataframe to list of dicts. In scenario where Pandas isn't installed and we pass data as a list of dictionaries, importing Pandas will fail, which is invalid. This function handles this scenario. @@ -213,7 +213,7 @@ def get_schema(self, class_name: str | None = None): return client.schema.get(class_name) def delete_classes(self, class_names: list[str] | str, if_error: str = "stop") -> list[str] | None: - """Deletes all or specific classes if class_names are provided. + """Delete all or specific classes if class_names are provided. :param class_names: list of class names to be deleted. :param if_error: define the actions to be taken if there is an error while deleting a class, possible @@ -334,7 +334,7 @@ def _compare_schema_subset(self, subset_object: Any, superset_object: Any) -> bo @staticmethod def _convert_properties_to_dict(classes_objects, key_property: str = "name"): """ - Helper function to convert list of class properties into dict by using a `key_property` as key. + Convert list of class properties into dict by using a `key_property` as key. This is done to avoid class properties comparison as list of properties. @@ -410,7 +410,7 @@ def _process_batch_errors( verbose: bool = True, ) -> None: """ - Helper function to processes the results from insert or delete batch operation and collects any errors. + Process the results from insert or delete batch operation and collect any errors. :param results: Results from the batch operation. :param verbose: Flag to enable verbose logging. 
@@ -674,7 +674,8 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> bool: return client.data_object.exists(uuid, **kwargs) def _delete_objects(self, uuids: Collection, class_name: str, retry_attempts_per_object: int = 5): - """ + """Delete multiple objects. + Helper function for `create_or_replace_objects()` to delete multiple objects. :param uuids: Collection of uuids. @@ -711,7 +712,7 @@ def _generate_uuids( uuid_column: str | None = None, ) -> tuple[pd.DataFrame, str]: """ - Adds UUIDs to a DataFrame, useful for replace operations where UUIDs must be known before ingestion. + Add UUIDs to a DataFrame, useful for replace operations where UUIDs must be known before ingestion. By default, UUIDs are generated using a custom function if 'uuid_column' is not specified. The function can potentially ingest the same data multiple times with different UUIDs. @@ -762,7 +763,7 @@ def _get_documents_to_uuid_map( offset: int = 0, limit: int = 2000, ) -> dict[str, set]: - """Helper function to get the document to uuid map of existing objects in db. + """Get the document to uuid map of existing objects in db. :param data: A single pandas DataFrame. :param document_column: The name of the property to query. @@ -806,7 +807,7 @@ def _get_documents_to_uuid_map( def _prepare_document_to_uuid_map( data: list[dict], group_key: str, get_value: Callable[[dict], str] ) -> dict[str, set]: - """Helper function to prepare the map of grouped_key to set.""" + """Prepare the map of grouped_key to set.""" grouped_key_to_set: dict = {} for item in data: document_url = item[group_key] diff --git a/pyproject.toml b/pyproject.toml index 584a000c7e14c..a58f05a9446e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1299,9 +1299,6 @@ extend-select = [ "D2", "D3", "D400", - # We add modules that do not follow the rule `First line should be in imperative mood` - # into the `tool.ruff.per-file-ignores`, and should remove it from that list as soon as it follows. 
- # See: https://github.com/apache/airflow/issues/10742 "D401", "D402", "D403", @@ -1370,38 +1367,6 @@ combine-as-imports = true "tests/providers/qdrant/hooks/test_qdrant.py" = ["E402"] "tests/providers/qdrant/operators/test_qdrant.py" = ["E402"] -# All the modules which do not follow D401 yet, please remove as soon as it becomes compatible -"airflow/providers/common/io/xcom/backend.py" = ["D401"] -"airflow/providers/databricks/hooks/databricks.py" = ["D401"] -"airflow/providers/databricks/operators/databricks.py" = ["D401"] -"airflow/providers/hashicorp/_internal_client/vault_client.py" = ["D401"] -"airflow/providers/hashicorp/hooks/vault.py" = ["D401"] -"airflow/providers/imap/hooks/imap.py" = ["D401"] -"airflow/providers/mongo/hooks/mongo.py" = ["D401"] -"airflow/providers/mysql/hooks/mysql.py" = ["D401"] -"airflow/providers/mysql/transfers/s3_to_mysql.py" = ["D401"] -"airflow/providers/neo4j/hooks/neo4j.py" = ["D401"] -"airflow/providers/openfaas/hooks/openfaas.py" = ["D401"] -"airflow/providers/opensearch/hooks/opensearch.py" = ["D401"] -"airflow/providers/opensearch/operators/opensearch.py" = ["D401"] -"airflow/providers/oracle/hooks/oracle.py" = ["D401"] -"airflow/providers/pagerduty/hooks/pagerduty.py" = ["D401"] -"airflow/providers/pagerduty/hooks/pagerduty_events.py" = ["D401"] -"airflow/providers/papermill/hooks/kernel.py" = ["D401"] -"airflow/providers/postgres/hooks/postgres.py" = ["D401"] -"airflow/providers/presto/hooks/presto.py" = ["D401"] -"airflow/providers/qdrant/hooks/qdrant.py" = ["D401"] -"airflow/providers/salesforce/hooks/salesforce.py" = ["D401"] -"airflow/providers/salesforce/operators/bulk.py" = ["D401"] -"airflow/providers/salesforce/operators/salesforce_apex_rest.py" = ["D401"] -"airflow/providers/samba/hooks/samba.py" = ["D401"] -"airflow/providers/samba/transfers/gcs_to_samba.py" = ["D401"] -"airflow/providers/segment/hooks/segment.py" = ["D401"] -"airflow/providers/telegram/hooks/telegram.py" = ["D401"] 
-"airflow/providers/telegram/operators/telegram.py" = ["D401"] -"airflow/providers/trino/hooks/trino.py" = ["D401"] -"airflow/providers/weaviate/hooks/weaviate.py" = ["D401"] - [tool.ruff.lint.flake8-tidy-imports] # Ban certain modules from being imported at module level, instead requiring