Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,24 @@ def write(
self.log.exception("Could not verify previous log to append")
return False

bucket, key = self.hook.parse_s3_url(remote_log_location)
# Upload the log via the boto3 client directly instead of S3Hook.load_string. The hook's
# upload helpers report the object to the hook lineage collector, which would make task
# logs show up as task outputs in OpenLineage events. Logs are not task data assets.
extra_args = {}
if conf.getboolean("logging", "ENCRYPT_S3_LOGS"):
extra_args["ServerSideEncryption"] = "AES256"

# Default to a single retry attempt because S3 upload failures are
# rare but occasionally occur. Multiple retry attempts are unlikely
# to help as they usually indicate non-ephemeral errors.
for try_num in range(1 + max_retry):
try:
self.hook.load_string(
log,
key=remote_log_location,
replace=True,
encrypt=conf.getboolean("logging", "ENCRYPT_S3_LOGS"),
self.hook.get_conn().put_object(
Bucket=bucket,
Key=key,
Body=log.encode("utf-8"),
**extra_args,
)
break
except Exception:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,21 @@ def test_write_existing(self):

assert body == b"previous \ntext"

def test_write_does_not_expose_lineage(self, hook_lineage_collector):
    """Writing a remote task log must leave the hook lineage collector empty.

    Task logs are not data assets produced or consumed by the task, so the
    upload must register neither an output nor an input asset.
    """
    self.subject.write("text", self.remote_log_location)

    # Check outputs first, then inputs; each is read from the collector separately.
    reported_outputs = hook_lineage_collector.collected_assets.outputs
    assert reported_outputs == []
    reported_inputs = hook_lineage_collector.collected_assets.inputs
    assert reported_inputs == []

@conf_vars({("logging", "encrypt_s3_logs"): "True"})
def test_write_with_encryption(self):
    """With ``[logging] encrypt_s3_logs`` enabled, the log is uploaded with AES256 SSE.

    Verifies both the encryption metadata reported by ``head_object`` and that
    the stored body is the UTF-8 encoded log text.
    """
    self.subject.write("text", self.remote_log_location)

    # Object metadata must show server-side encryption was requested.
    head = self.conn.head_object(Bucket="bucket", Key=self.remote_log_key)
    assert head["ServerSideEncryption"] == "AES256"

    # The payload itself must be unchanged by the encryption option.
    stored_object = boto3.resource("s3").Object("bucket", self.remote_log_key)
    payload = stored_object.get()["Body"].read()
    assert payload == b"text"

def test_upload_repeated_appends_no_duplication(self):
"""Simulate reschedule-mode sensor: each cycle appends to the local log, then uploads.

Expand Down
Loading