From 587820264d9687b6362125f018d07d003f257318 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 16 Jun 2026 19:23:08 +0900 Subject: [PATCH] fix(s3_task_handler.py): upload log with put_object Signed-off-by: PoAn Yang --- .../amazon/aws/log/s3_task_handler.py | 18 +++++++++++++----- .../amazon/aws/log/test_s3_task_handler.py | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index d27530f7c871d..b1ce26098fb34 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -128,16 +128,24 @@ def write( self.log.exception("Could not verify previous log to append") return False + bucket, key = self.hook.parse_s3_url(remote_log_location) + # Upload the log via the boto3 client directly instead of S3Hook.load_string. The hook's + # upload helpers report the object to the hook lineage collector, which would make task + # logs show up as task outputs in OpenLineage events. Logs are not task data assets. + extra_args = {} + if conf.getboolean("logging", "ENCRYPT_S3_LOGS"): + extra_args["ServerSideEncryption"] = "AES256" + # Default to a single retry attempt because s3 upload failures are # rare but occasionally occur. Multiple retry attempts are unlikely # to help as they usually indicate non-ephemeral errors. for try_num in range(1 + max_retry): try: - self.hook.load_string( - log, - key=remote_log_location, - replace=True, - encrypt=conf.getboolean("logging", "ENCRYPT_S3_LOGS"), + self.hook.get_conn().put_object( + Bucket=bucket, + Key=key, + Body=log.encode("utf-8"), + **extra_args, ) break except Exception: diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 211a213b3b892..01614ff732b52 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -184,6 +184,21 @@ def test_write_existing(self): assert body == b"previous \ntext" + def test_write_does_not_expose_lineage(self, hook_lineage_collector): + # Remote task logs are not task data assets, so uploading them must not add the S3 object + # as a task output in OpenLineage events. + self.subject.write("text", self.remote_log_location) + assert hook_lineage_collector.collected_assets.outputs == [] + assert hook_lineage_collector.collected_assets.inputs == [] + + @conf_vars({("logging", "encrypt_s3_logs"): "True"}) + def test_write_with_encryption(self): + self.subject.write("text", self.remote_log_location) + resp = self.conn.head_object(Bucket="bucket", Key=self.remote_log_key) + assert resp["ServerSideEncryption"] == "AES256" + body = boto3.resource("s3").Object("bucket", self.remote_log_key).get()["Body"].read() + assert body == b"text" + def test_upload_repeated_appends_no_duplication(self): """Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.