diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index d27530f7c871d..b1ce26098fb34 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -128,16 +128,24 @@ def write( self.log.exception("Could not verify previous log to append") return False + bucket, key = self.hook.parse_s3_url(remote_log_location) + # Upload the log via the boto3 client directly instead of S3Hook.load_string. The hook's + # upload helpers report the object to the hook lineage collector, which would make task + # logs show up as task outputs in OpenLineage events. Logs are not task data assets. + extra_args = {} + if conf.getboolean("logging", "ENCRYPT_S3_LOGS"): + extra_args["ServerSideEncryption"] = "AES256" + # Default to a single retry attempt because s3 upload failures are # rare but occasionally occur. Multiple retry attempts are unlikely # to help as they usually indicate non-ephemeral errors. for try_num in range(1 + max_retry): try: - self.hook.load_string( - log, - key=remote_log_location, - replace=True, - encrypt=conf.getboolean("logging", "ENCRYPT_S3_LOGS"), + self.hook.get_conn().put_object( + Bucket=bucket, + Key=key, + Body=log.encode("utf-8"), + **extra_args, ) break except Exception: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 211a213b3b892..01614ff732b52 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -184,6 +184,21 @@ def test_write_existing(self): assert body == b"previous \ntext" + def test_write_does_not_expose_lineage(self, hook_lineage_collector): + # Remote task logs are not task data assets, so uploading them must not add the S3 object + # as a task output in OpenLineage events. + self.subject.write("text", self.remote_log_location) + assert hook_lineage_collector.collected_assets.outputs == [] + assert hook_lineage_collector.collected_assets.inputs == [] + + @conf_vars({("logging", "encrypt_s3_logs"): "True"}) + def test_write_with_encryption(self): + self.subject.write("text", self.remote_log_location) + resp = self.conn.head_object(Bucket="bucket", Key=self.remote_log_key) + assert resp["ServerSideEncryption"] == "AES256" + body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() + assert body == b"text" + def test_upload_repeated_appends_no_duplication(self): """Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.