diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py index d4da248a84fa4..15c45fbc5b848 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_tests/test_remote_logging.py @@ -54,16 +54,8 @@ def test_remote_logging_s3(self): f"DAG {TestRemoteLogging.dag_id} did not complete successfully. Final state: {state}" ) - task_logs = self.airflow_client.get_task_logs( - dag_id=TestRemoteLogging.dag_id, - task_id="bash_pull", - run_id=resp["dag_run_id"], - ) - - task_log_sources = [ - source for content in task_logs.get("content", [{}]) for source in content.get("sources", []) - ] - + # This bucket will be created part of the docker-compose setup in + bucket_name = "test-airflow-logs" s3_client = boto3.client( "s3", endpoint_url="http://localhost:4566", @@ -72,11 +64,7 @@ def test_remote_logging_s3(self): region_name="us-east-1", ) - # This bucket will be created part of the docker-compose setup in - bucket_name = "test-airflow-logs" - response = s3_client.list_objects_v2(Bucket=bucket_name) - - # Wait for logs to be available in S3 + # Wait for logs to be available in S3 before we call `get_task_logs` for _ in range(self.max_retries): response = s3_client.list_objects_v2(Bucket=bucket_name) contents = response.get("Contents", []) @@ -86,15 +74,26 @@ def test_remote_logging_s3(self): print(f"Expected at least {self.task_count} log files, found {len(contents)}. Retrying...") time.sleep(self.retry_interval_in_seconds) - if "Contents" not in response: - pytest.fail("No objects found in S3 bucket %s", bucket_name) - if len(response["Contents"]) < self.task_count: pytest.fail( f"Expected at least {self.task_count} log files in S3 bucket {bucket_name}, " f"but found {len(response['Contents'])} objects: {[obj.get('Key') for obj in response.get('Contents', [])]}" ) + task_logs = self.airflow_client.get_task_logs( + dag_id=TestRemoteLogging.dag_id, + task_id="bash_pull", + run_id=resp["dag_run_id"], + ) + + task_log_sources = [ + source for content in task_logs.get("content", [{}]) for source in content.get("sources", []) + ] + response = s3_client.list_objects_v2(Bucket=bucket_name) + + if "Contents" not in response: + pytest.fail("No objects found in S3 bucket %s", bucket_name) + # s3 key format: dag_id=example_xcom/run_id=manual__2025-09-29T23:32:09.457215+00:00/task_id=bash_pull/attempt=1.log log_files = [f"s3://{bucket_name}/{obj['Key']}" for obj in response["Contents"]]