From 7f2704bc19ea6d6da41ea26232a16986353b090b Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 20 Oct 2025 12:07:49 +0800 Subject: [PATCH 1/2] Fix task_log_sources naming for test_remote_logging_s3 --- .../airflow_e2e_tests/remote_log_tests/test_remote_logging.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index d4da248a84fa4..eb1e5dee76f31 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -63,6 +63,10 @@ def test_remote_logging_s3(self): task_log_sources = [ source for content in task_logs.get("content", [{}]) for source in content.get("sources", []) ] + # remove "/opt/airflow/logs/" prefix from log source paths + # before: /opt/airflow/logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log + # after: dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log + task_log_sources = [source.replace("/opt/airflow/logs/", "") for source in task_log_sources] s3_client = boto3.client( "s3", From 6293d6dbb4f7a603c070a5ee002f1bbbad81f35c Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 20 Oct 2025 15:10:00 +0800 Subject: [PATCH 2/2] Move retry before get_task_logs --- .../remote_log_tests/test_remote_logging.py | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index eb1e5dee76f31..15c45fbc5b848 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -54,20 +54,8 @@ def test_remote_logging_s3(self): f"DAG {TestRemoteLogging.dag_id} did not complete successfully. Final state: {state}" ) - task_logs = self.airflow_client.get_task_logs( - dag_id=TestRemoteLogging.dag_id, - task_id="bash_pull", - run_id=resp["dag_run_id"], - ) - - task_log_sources = [ - source for content in task_logs.get("content", [{}]) for source in content.get("sources", []) - ] - # remove "/opt/airflow/logs/" prefix from log source paths - # before: /opt/airflow/logs/dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log - # after: dag_id=example_xcom_test/run_id=manual__2025-10-20T03:24:32.261538+00:00/task_id=bash_pull/attempt=1.log - task_log_sources = [source.replace("/opt/airflow/logs/", "") for source in task_log_sources] - + # This bucket will be created part of the docker-compose setup in + bucket_name = "test-airflow-logs" s3_client = boto3.client( "s3", endpoint_url="http://localhost:4566", @@ -76,11 +64,7 @@ def test_remote_logging_s3(self): region_name="us-east-1", ) - # This bucket will be created part of the docker-compose setup in - bucket_name = "test-airflow-logs" - response = s3_client.list_objects_v2(Bucket=bucket_name) - - # Wait for logs to be available in S3 + # Wait for logs to be available in S3 before we call `get_task_logs` for _ in range(self.max_retries): response = s3_client.list_objects_v2(Bucket=bucket_name) contents = response.get("Contents", []) @@ -90,15 +74,26 @@ def test_remote_logging_s3(self): print(f"Expected at least {self.task_count} log files, found {len(contents)}. Retrying...") time.sleep(self.retry_interval_in_seconds) - if "Contents" not in response: - pytest.fail("No objects found in S3 bucket %s", bucket_name) - if len(response["Contents"]) < self.task_count: pytest.fail( f"Expected at least {self.task_count} log files in S3 bucket {bucket_name}, " f"but found {len(response['Contents'])} objects: {[obj.get('Key') for obj in response.get('Contents', [])]}" ) + task_logs = self.airflow_client.get_task_logs( + dag_id=TestRemoteLogging.dag_id, + task_id="bash_pull", + run_id=resp["dag_run_id"], + ) + + task_log_sources = [ + source for content in task_logs.get("content", [{}]) for source in content.get("sources", []) + ] + response = s3_client.list_objects_v2(Bucket=bucket_name) + + if "Contents" not in response: + pytest.fail("No objects found in S3 bucket %s", bucket_name) + # s3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log log_files = [f"s3://{bucket_name}/{obj['Key']}" for obj in response["Contents"]]