From a11f913ecb9034343f816ef0a0205718b1520345 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Tue, 16 Sep 2025 12:07:22 -0400 Subject: [PATCH 1/9] feat(ui): implement filterbar for Dag run feat: add filter for dag Version feat(api): Add duration filter for Dag runs feat(ui): implement duration filter for Dag runs feat(ui,api): implement conf_contains filter refactor(ui): move trigering user filter into filterbar refactor: Simplify JSON type detection for filtering fix(i18n): modify triggering user translation --- .../airflow/api_fastapi/common/parameters.py | 8 ++ .../core_api/datamodels/dag_run.py | 7 ++ .../openapi/v2-rest-api-generated.yaml | 63 ++++++++++++++ .../core_api/routes/public/dag_run.py | 31 ++++++- .../airflow/ui/openapi-gen/queries/common.ts | 9 +- .../ui/openapi-gen/queries/ensureQueryData.ts | 14 +++- .../ui/openapi-gen/queries/prefetch.ts | 14 +++- .../airflow/ui/openapi-gen/queries/queries.ts | 14 +++- .../ui/openapi-gen/queries/suspense.ts | 14 +++- .../ui/openapi-gen/requests/schemas.gen.ts | 55 +++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 10 +++ .../ui/openapi-gen/requests/types.gen.ts | 10 +++ .../ui/public/i18n/locales/en/common.json | 2 +- .../ui/public/i18n/locales/en/dags.json | 3 +- .../ui/src/constants/filterConfigs.tsx | 35 +++++++- .../airflow/ui/src/constants/searchParams.ts | 4 + .../src/airflow/ui/src/pages/DagRuns.tsx | 62 +++++--------- .../airflow/ui/src/pages/DagRunsFilters.tsx | 82 +++++++++++++++++++ .../airflow/ui/src/utils/useFiltersHandler.ts | 5 ++ .../airflowctl/api/datamodels/generated.py | 5 ++ 20 files changed, 390 insertions(+), 57 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index e801d2d9e79df..4c68c610fe4ae 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -307,6 +307,7 @@ class FilterOptionEnum(Enum): ANY_EQUAL = "any_eq" ALL_EQUAL = "all_eq" IS_NONE = "is_none" + CONTAINS = "contains" class FilterParam(BaseParam[T]): @@ -364,6 +365,13 @@ def to_orm(self, select: Select) -> Select: return select.where(self.attribute.is_not(None)) if self.value is True: return select.where(self.attribute.is_(None)) + if self.filter_option == FilterOptionEnum.CONTAINS: + # For JSON/JSONB columns, convert to text before applying LIKE + from sqlalchemy import Text, cast + + if str(self.attribute.type).upper() in ("JSON", "JSONB"): + return select.where(cast(self.attribute, Text).contains(self.value)) + return select.where(self.attribute.contains(self.value)) raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index ed7aac2ae000f..acfa5547a50bf 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -180,3 +180,10 @@ class DAGRunsBatchBody(StrictBaseModel): end_date_gt: AwareDatetime | None = None end_date_lte: AwareDatetime | None = None end_date_lt: AwareDatetime | None = None + + duration_gte: float | None = None + duration_gt: float | None = None + duration_lte: float | None = None + duration_lt: float | None = None + + conf_contains: str | None = None diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 59ab6146c5cbd..352bb24040dc9 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2187,6 +2187,44 @@ paths: format: date-time - type: 'null' title: Updated At Lt + - name: duration_gte + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Gte + - name: duration_gt + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Gt + - name: duration_lte + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Lte + - name: duration_lt + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Lt + - name: conf_contains + in: query + required: false + schema: + type: string + title: Conf Contains - name: run_type in: query required: false @@ -10366,6 +10404,31 @@ components: format: date-time - type: 'null' title: End Date Lt + duration_gte: + anyOf: + - type: number + - type: 'null' + title: Duration Gte + duration_gt: + anyOf: + - type: number + - type: 'null' + title: Duration Gt + duration_lte: + anyOf: + - type: number + - type: 'null' + title: Duration Lte + duration_lt: + anyOf: + - type: number + - type: 'null' + title: Duration Lt + conf_contains: + anyOf: + - type: string + - type: 'null' + title: Conf Contains additionalProperties: false type: object title: DAGRunsBatchBody diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 0aa4be0997901..3aba4b78bde92 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -51,6 +51,8 @@ SortParam, _SearchParam, datetime_range_filter_factory, + filter_param_factory, + float_range_filter_factory, search_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter @@ -318,6 +320,11 @@ def get_dag_runs( start_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("start_date", DagRun))], end_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))], update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", DagRun))], + duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", DagRun))], + conf_contains: Annotated[ + FilterParam[str], + Depends(filter_param_factory(DagRun.conf, str, FilterOptionEnum.CONTAINS, "conf_contains")), + ], run_type: QueryDagRunRunTypesFilter, state: QueryDagRunStateFilter, dag_version: QueryDagRunVersionFilter, @@ -376,6 +383,8 @@ def get_dag_runs( start_date_range, end_date_range, update_at_range, + duration_range, + conf_contains, state, run_type, dag_version, @@ -565,6 +574,16 @@ def get_list_dag_runs_batch( ), attribute=DagRun.end_date, ) + duration = RangeFilter( + Range( + lower_bound_gte=body.duration_gte, + lower_bound_gt=body.duration_gt, + upper_bound_lte=body.duration_lte, + upper_bound_lt=body.duration_lt, + ), + attribute=DagRun.duration, + ) + conf_contains = FilterParam(DagRun.conf, body.conf_contains, FilterOptionEnum.CONTAINS) state = FilterParam(DagRun.state, body.states, FilterOptionEnum.ANY_EQUAL) offset = OffsetFilter(body.page_offset) @@ -590,7 +609,17 @@ def get_list_dag_runs_batch( base_query = select(DagRun).options(joinedload(DagRun.dag_model)) dag_runs_select, total_entries = paginated_select( statement=base_query, - filters=[dag_ids, logical_date, run_after, start_date, end_date, state, readable_dag_runs_filter], + filters=[ + dag_ids, + logical_date, + run_after, + start_date, + end_date, + duration, + conf_contains, + state, + readable_dag_runs_filter, + ], order_by=order_by, offset=offset, limit=limit, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 64d1234171f04..02f4060a7d47b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -143,10 +143,15 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }: export type DagRunServiceGetDagRunsDefaultResponse = Awaited>; export type DagRunServiceGetDagRunsQueryResult = UseQueryResult; export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns"; -export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const UseDagRunServiceGetDagRunsKeyFn = ({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -174,7 +179,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagIdPattern, dagVersio updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; +}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index f6f354d707c13..3450028eaab89 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -285,6 +285,11 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @param data.updatedAtGt * @param data.updatedAtLte * @param data.updatedAtLt +* @param data.durationGte +* @param data.durationGt +* @param data.durationLte +* @param data.durationLt +* @param data.confContains * @param data.runType * @param data.state * @param data.dagVersion @@ -295,10 +300,15 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 35acd1db35c74..0654c66ba42d4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -285,6 +285,11 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @param data.updatedAtGt * @param data.updatedAtLte * @param data.updatedAtLt +* @param data.durationGte +* @param data.durationGt +* @param data.durationLte +* @param data.durationLt +* @param data.confContains * @param data.runType * @param data.state * @param data.dagVersion @@ -295,10 +300,15 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 4ead6f2369806..64185e962c8fa 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -285,6 +285,11 @@ export const useDagRunServiceGetUpstreamAssetEvents = = unknown[]>({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRuns = = unknown[]>({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 4967a3cb498df..7e508342e2e1a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -285,6 +285,11 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense = = unknown[]>({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRunsSuspense = = unknown[]>({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 36273f8818adc..a00c346261c35 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2777,6 +2777,61 @@ export const $DAGRunsBatchBody = { } ], title: 'End Date Lt' + }, + duration_gte: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Gte' + }, + duration_gt: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Gt' + }, + duration_lte: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Lte' + }, + duration_lt: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Lt' + }, + conf_contains: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Conf Contains' } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 90ec7766f8c6f..63735684e6abd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -996,6 +996,11 @@ export class DagRunService { * @param data.updatedAtGt * @param data.updatedAtLte * @param data.updatedAtLt + * @param data.durationGte + * @param data.durationGt + * @param data.durationLte + * @param data.durationLt + * @param data.confContains * @param data.runType * @param data.state * @param data.dagVersion @@ -1036,6 +1041,11 @@ export class DagRunService { updated_at_gt: data.updatedAtGt, updated_at_lte: data.updatedAtLte, updated_at_lt: data.updatedAtLt, + duration_gte: data.durationGte, + duration_gt: data.durationGt, + duration_lte: data.durationLte, + duration_lt: data.durationLt, + conf_contains: data.confContains, run_type: data.runType, state: data.state, dag_version: data.dagVersion, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index ed113c27ebed8..9c40c2332a72e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -691,6 +691,11 @@ export type DAGRunsBatchBody = { end_date_gt?: string | null; end_date_lte?: string | null; end_date_lt?: string | null; + duration_gte?: number | null; + duration_gt?: number | null; + duration_lte?: number | null; + duration_lt?: number | null; + conf_contains?: string | null; }; /** @@ -2254,12 +2259,17 @@ export type ClearDagRunData = { export type ClearDagRunResponse = TaskInstanceCollectionResponse | DAGRunResponse; export type GetDagRunsData = { + confContains?: string; dagId: string; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ dagIdPattern?: string | null; dagVersion?: Array<(number)>; + durationGt?: number | null; + durationGte?: number | null; + durationLt?: number | null; + durationLte?: number | null; endDateGt?: string | null; endDateGte?: string | null; endDateLt?: string | null; diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 111a3250704dc..b0a5fbdbbf23f 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -64,7 +64,7 @@ "runType": "Run Type", "sourceAssetEvent": "Source Asset Event", "triggeredBy": "Triggered By", - "triggeringUser": "Triggering User Name" + "triggeringUser": "Triggering User" }, "dagRun_one": "Dag Run", "dagRun_other": "Dag Runs", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json index 1367ac6a929f8..6705e10196206 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json @@ -20,8 +20,7 @@ "all": "All", "paused": "Paused" }, - "runIdPatternFilter": "Search Dag Runs", - "triggeringUserNameFilter": "Search by Triggering User" + "runIdPatternFilter": "Search Dag Runs" }, "ownerLink": "Owner link for {{owner}}", "runAndTaskActions": { diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 8c83a75b84dc5..3b037bcaaacfc 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -19,7 +19,7 @@ import { useTranslation } from "react-i18next"; import { FiBarChart, FiUser } from "react-icons/fi"; import { LuBrackets } from "react-icons/lu"; -import { MdDateRange, MdSearch } from "react-icons/md"; +import { MdDateRange, MdSearch, MdHistory, MdHourglassEmpty, MdCode } from "react-icons/md"; import { DagIcon } from "src/assets/DagIcon"; import { TaskIcon } from "src/assets/TaskIcon"; @@ -36,7 +36,7 @@ export enum FilterTypes { } export const useFilterConfigs = () => { - const { t: translate } = useTranslation(["browse", "common", "admin", "hitl"]); + const { t: translate } = useTranslation(["browse", "common", "admin", "hitl", "dagRun"]); const filterConfigMap = { [SearchParamsKeys.AFTER]: { @@ -49,6 +49,12 @@ export const useFilterConfigs = () => { label: translate("common:table.to"), type: FilterTypes.DATE, }, + [SearchParamsKeys.CONF_CONTAINS]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagRun.conf"), + type: FilterTypes.TEXT, + }, [SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN]: { hotkeyDisabled: true, icon: , @@ -61,6 +67,25 @@ export const useFilterConfigs = () => { label: translate("common:dagId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.DAG_VERSION]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagRun.dagVersions"), + min: 1, + type: FilterTypes.NUMBER, + }, + [SearchParamsKeys.DURATION_GTE]: { + icon: , + label: translate("common:filters.durationFrom", "Duration From"), // To do: add translation key and remove default value + min: 0, + type: FilterTypes.NUMBER, + }, + [SearchParamsKeys.DURATION_LTE]: { + icon: , + label: translate("common:filters.durationTo", "Duration To"), // To do: add translation key and remove default value + min: 0, + type: FilterTypes.NUMBER, + }, [SearchParamsKeys.END_DATE]: { icon: , label: translate("common:table.to"), @@ -146,6 +171,12 @@ export const useFilterConfigs = () => { label: translate("common:taskId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN]: { + hotkeyDisabled: true, + icon: , + label: translate("dagRun.triggeringUser"), + type: FilterTypes.TEXT, + }, [SearchParamsKeys.TRY_NUMBER]: { label: translate("common:tryNumber"), min: 1, diff --git a/airflow-core/src/airflow/ui/src/constants/searchParams.ts b/airflow-core/src/airflow/ui/src/constants/searchParams.ts index cebc3d074ee0b..8ed9e3dd5b3c7 100644 --- a/airflow-core/src/airflow/ui/src/constants/searchParams.ts +++ b/airflow-core/src/airflow/ui/src/constants/searchParams.ts @@ -19,10 +19,14 @@ export enum SearchParamsKeys { AFTER = "after", BEFORE = "before", + CONF_CONTAINS = "conf_contains", DAG_DISPLAY_NAME_PATTERN = "dag_display_name_pattern", DAG_ID = "dag_id", DAG_ID_PATTERN = "dag_id_pattern", + DAG_VERSION = "dag_version", DEPENDENCIES = "dependencies", + DURATION_GTE = "duration_gte", + DURATION_LTE = "duration_lte", END_DATE = "end_date", EVENT_TYPE = "event_type", EXCLUDED_EVENTS = "excluded_events", diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index 792e42054efb1..b55404d016917 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -43,13 +43,20 @@ import { TruncatedText } from "src/components/TruncatedText"; import { Select } from "src/components/ui"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { dagRunTypeOptions, dagRunStateOptions as stateOptions } from "src/constants/stateOptions"; +import { DagRunsFilters } from "src/pages/DagRunsFilters"; import DeleteRunButton from "src/pages/DeleteRunButton"; import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; type DagRunRow = { row: { original: DAGRunResponse } }; const { + CONF_CONTAINS: CONF_CONTAINS_PARAM, DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM, + DAG_VERSION: DAG_VERSION_PARAM, + DURATION_GTE: DURATION_GTE_PARAM, + DURATION_LTE: DURATION_LTE_PARAM, END_DATE: END_DATE_PARAM, + RUN_AFTER_GTE: RUN_AFTER_GTE_PARAM, + RUN_AFTER_LTE: RUN_AFTER_LTE_PARAM, RUN_ID_PATTERN: RUN_ID_PATTERN_PARAM, RUN_TYPE: RUN_TYPE_PARAM, START_DATE: START_DATE_PARAM, @@ -197,19 +204,32 @@ export const DagRuns = () => { const filteredRunIdPattern = searchParams.get(RUN_ID_PATTERN_PARAM); const filteredTriggeringUserNamePattern = searchParams.get(TRIGGERING_USER_NAME_PATTERN_PARAM); const filteredDagIdPattern = searchParams.get(DAG_ID_PATTERN_PARAM); + const filteredDagVersion = searchParams.get(DAG_VERSION_PARAM); const startDate = searchParams.get(START_DATE_PARAM); const endDate = searchParams.get(END_DATE_PARAM); + const runAfterGte = searchParams.get(RUN_AFTER_GTE_PARAM); + const runAfterLte = searchParams.get(RUN_AFTER_LTE_PARAM); + const durationGte = searchParams.get(DURATION_GTE_PARAM); + const durationLte = searchParams.get(DURATION_LTE_PARAM); + const confContains = searchParams.get(CONF_CONTAINS_PARAM); const refetchInterval = useAutoRefresh({}); const { data, error, isLoading } = useDagRunServiceGetDagRuns( { + confContains: confContains !== null && confContains !== "" ? confContains : undefined, dagId: dagId ?? "~", dagIdPattern: filteredDagIdPattern ?? undefined, + dagVersion: + filteredDagVersion !== null && filteredDagVersion !== "" ? [Number(filteredDagVersion)] : undefined, + durationGte: durationGte !== null && durationGte !== "" ? Number(durationGte) : undefined, + durationLte: durationLte !== null && durationLte !== "" ? Number(durationLte) : undefined, endDateLte: endDate ?? undefined, limit: pageSize, offset: pageIndex * pageSize, orderBy, + runAfterGte: runAfterGte ?? undefined, + runAfterLte: runAfterLte ?? undefined, runIdPattern: filteredRunIdPattern ?? undefined, runType: filteredType === null ? undefined : [filteredType], startDateGte: startDate ?? undefined, @@ -275,38 +295,6 @@ export const DagRuns = () => { [pagination, searchParams, setSearchParams, setTableURLState, sorting], ); - const handleTriggeringUserNamePatternChange = useCallback( - (value: string) => { - if (value === "") { - searchParams.delete(TRIGGERING_USER_NAME_PATTERN_PARAM); - } else { - searchParams.set(TRIGGERING_USER_NAME_PATTERN_PARAM, value); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleDagIdPatternChange = useCallback( - (value: string) => { - if (value === "") { - searchParams.delete(DAG_ID_PATTERN_PARAM); - } else { - searchParams.set(DAG_ID_PATTERN_PARAM, value); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - return ( <> @@ -330,15 +318,6 @@ export const DagRuns = () => { placeHolder={translate("dags:filters.runIdPatternFilter")} /> - - - { + { + const searchParamKeys = useMemo((): Array => { + const keys: Array = [ + SearchParamsKeys.START_DATE, + SearchParamsKeys.END_DATE, + SearchParamsKeys.RUN_AFTER_GTE, + SearchParamsKeys.RUN_AFTER_LTE, + SearchParamsKeys.DURATION_GTE, + SearchParamsKeys.DURATION_LTE, + SearchParamsKeys.CONF_CONTAINS, + SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN, + SearchParamsKeys.DAG_VERSION, + ]; + + if (dagId === undefined) { + keys.unshift(SearchParamsKeys.DAG_ID); + } + + return keys; + }, [dagId]); + + const { filterConfigs, handleFiltersChange, searchParams } = useFiltersHandler(searchParamKeys); + + const initialValues = useMemo(() => { + const values: Record = {}; + + filterConfigs.forEach((config) => { + const value = searchParams.get(config.key); + + if (value !== null && value !== "") { + if (config.type === "number") { + const parsedValue = Number(value); + + values[config.key] = isNaN(parsedValue) ? value : parsedValue; + } else { + values[config.key] = value; + } + } + }); + + return values; + }, [searchParams, filterConfigs]); + + return ( + + + + ); +}; diff --git a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts index d3543228d94eb..e63846543a423 100644 --- a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts +++ b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts @@ -27,8 +27,12 @@ import { SearchParamsKeys } from "src/constants/searchParams"; export type FilterableSearchParamsKeys = | SearchParamsKeys.AFTER | SearchParamsKeys.BEFORE + | SearchParamsKeys.CONF_CONTAINS | SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN | SearchParamsKeys.DAG_ID + | SearchParamsKeys.DAG_VERSION + | SearchParamsKeys.DURATION_GTE + | SearchParamsKeys.DURATION_LTE | SearchParamsKeys.END_DATE | SearchParamsKeys.EVENT_TYPE | SearchParamsKeys.KEY_PATTERN @@ -43,6 +47,7 @@ export type FilterableSearchParamsKeys = | SearchParamsKeys.START_DATE | SearchParamsKeys.TASK_ID | SearchParamsKeys.TASK_ID_PATTERN + | SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN | SearchParamsKeys.TRY_NUMBER | SearchParamsKeys.USER; diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 6ea61b24cf85c..f460e6661461d 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1392,6 +1392,11 @@ class DAGRunsBatchBody(BaseModel): end_date_gt: Annotated[datetime | None, Field(title="End Date Gt")] = None end_date_lte: Annotated[datetime | None, Field(title="End Date Lte")] = None end_date_lt: Annotated[datetime | None, Field(title="End Date Lt")] = None + duration_gte: Annotated[float | None, Field(title="Duration Gte")] = None + duration_gt: Annotated[float | None, Field(title="Duration Gt")] = None + duration_lte: Annotated[float | None, Field(title="Duration Lte")] = None + duration_lt: Annotated[float | None, Field(title="Duration Lt")] = None + conf_contains: Annotated[str | None, Field(title="Conf Contains")] = None class DAGVersionCollectionResponse(BaseModel): From deb34cdb3b685e7d366b850a8f4cd9b90cfd6681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Tue, 23 Sep 2025 12:55:37 -0400 Subject: [PATCH 2/9] fix: resolve conflicts & move all filters into filterBar --- .../ui/src/constants/filterConfigs.tsx | 37 ++++- .../src/airflow/ui/src/pages/DagRuns.tsx | 155 +----------------- .../airflow/ui/src/pages/DagRunsFilters.tsx | 5 +- .../airflow/ui/src/utils/useFiltersHandler.ts | 3 + 4 files changed, 46 insertions(+), 154 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 3b037bcaaacfc..4dd8b279bffcb 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -19,12 +19,21 @@ import { useTranslation } from "react-i18next"; import { FiBarChart, FiUser } from "react-icons/fi"; import { LuBrackets } from "react-icons/lu"; -import { MdDateRange, MdSearch, MdHistory, MdHourglassEmpty, MdCode } from "react-icons/md"; +import { + MdDateRange, + MdSearch, + MdHistory, + MdHourglassEmpty, + MdCode, + MdPlayArrow, + MdCheckCircle, +} from "react-icons/md"; import { DagIcon } from "src/assets/DagIcon"; import { TaskIcon } from "src/assets/TaskIcon"; import type { FilterConfig } from "src/components/FilterBar"; import { StateBadge } from "src/components/StateBadge"; +import { dagRunStateOptions, dagRunTypeOptions } from "src/constants/stateOptions"; import { SearchParamsKeys } from "./searchParams"; @@ -67,6 +76,12 @@ export const useFilterConfigs = () => { label: translate("common:dagId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.DAG_ID_PATTERN]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagId"), + type: FilterTypes.TEXT, + }, [SearchParamsKeys.DAG_VERSION]: { hotkeyDisabled: true, icon: , @@ -154,11 +169,29 @@ export const useFilterConfigs = () => { label: translate("common:runId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.RUN_TYPE]: { + icon: , + label: translate("common:dagRun.runType"), + options: dagRunTypeOptions.items.map((option) => ({ + label: option.value === "all" ? translate(option.label) : translate(option.label), + value: option.value, + })), + type: FilterTypes.SELECT, + }, [SearchParamsKeys.START_DATE]: { icon: , label: translate("common:table.from"), type: FilterTypes.DATE, }, + [SearchParamsKeys.STATE]: { + icon: , + label: translate("common:state"), + options: dagRunStateOptions.items.map((option) => ({ + label: option.value === "all" ? translate(option.label) : translate(option.label), + value: option.value, + })), + type: FilterTypes.SELECT, + }, [SearchParamsKeys.TASK_ID]: { hotkeyDisabled: true, icon: , @@ -174,7 +207,7 @@ export const useFilterConfigs = () => { [SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN]: { hotkeyDisabled: true, icon: , - label: translate("dagRun.triggeringUser"), + label: translate("common:dagRun.triggeringUser"), type: FilterTypes.TEXT, }, [SearchParamsKeys.TRY_NUMBER]: { diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index b55404d016917..f0917e971862e 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -1,5 +1,3 @@ -/* eslint-disable max-lines */ - /*! * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,15 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -import { Flex, HStack, Link, type SelectValueChangeDetails, Text, Box } from "@chakra-ui/react"; +import { Flex, HStack, Link, Text } from "@chakra-ui/react"; import type { ColumnDef } from "@tanstack/react-table"; import type { TFunction } from "i18next"; -import { useCallback } from "react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; import { useDagRunServiceGetDagRuns } from "openapi/queries"; -import type { DAGRunResponse, DagRunState, DagRunType } from "openapi/requests/types.gen"; +import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ClearRunButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; @@ -36,13 +33,10 @@ import { LimitedItemsList } from "src/components/LimitedItemsList"; import { MarkRunAsButton } from "src/components/MarkAs"; import RenderedJsonField from "src/components/RenderedJsonField"; import { RunTypeIcon } from "src/components/RunTypeIcon"; -import { SearchBar } from "src/components/SearchBar"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; -import { Select } from "src/components/ui"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; -import { dagRunTypeOptions, dagRunStateOptions as stateOptions } from "src/constants/stateOptions"; import { DagRunsFilters } from "src/pages/DagRunsFilters"; import DeleteRunButton from "src/pages/DeleteRunButton"; import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; @@ -185,7 +179,7 @@ const runColumns = (translate: TFunction, dagId?: string): Array { const { t: translate } = useTranslation(); const { dagId } = useParams(); - const [searchParams, setSearchParams] = useSearchParams(); + const [searchParams] = useSearchParams(); const { setTableURLState, tableURLState } = useTableURLState({ columnVisibility: { @@ -217,7 +211,7 @@ export const DagRuns = () => { const { data, error, isLoading } = useDagRunServiceGetDagRuns( { - confContains: confContains !== null && confContains !== "" ? confContains : undefined, + confContains: confContains !== null && confContains !== "" ? confContains : undefined, dagId: dagId ?? "~", dagIdPattern: filteredDagIdPattern ?? undefined, dagVersion: @@ -243,149 +237,8 @@ export const DagRuns = () => { }, ); - const handleStateChange = useCallback( - ({ value }: SelectValueChangeDetails) => { - const [val] = value; - - if (val === undefined || val === "all") { - searchParams.delete(STATE_PARAM); - } else { - searchParams.set(STATE_PARAM, val); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleTypeChange = useCallback( - ({ value }: SelectValueChangeDetails) => { - const [val] = value; - - if (val === undefined || val === "all") { - searchParams.delete(RUN_TYPE_PARAM); - } else { - searchParams.set(RUN_TYPE_PARAM, val); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleRunIdPatternChange = useCallback( - (value: string) => { - if (value === "") { - searchParams.delete(RUN_ID_PATTERN_PARAM); - } else { - searchParams.set(RUN_ID_PATTERN_PARAM, value); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - return ( <> - - {dagId === undefined && ( - - - - )} - - - - - - - {() => - filteredState === null ? ( - translate("dags:filters.allStates") - ) : ( - - {translate(`common:states.${filteredState}`)} - - ) - } - - - - {stateOptions.items.map((option) => ( - - {option.value === "all" ? ( - translate(option.label) - ) : ( - {translate(option.label)} - )} - - ))} - - - - - - - {() => - filteredType === null ? ( - translate("dags:filters.allRunTypes") - ) : ( - - - {translate(`common:runTypes.${filteredType}`)} - - ) - } - - - - {dagRunTypeOptions.items.map((option) => ( - - {option.value === "all" ? ( - translate(option.label) - ) : ( - - - {translate(option.label)} - - )} - - ))} - - - { const searchParamKeys = useMemo((): Array => { const keys: Array = [ + SearchParamsKeys.RUN_ID_PATTERN, + SearchParamsKeys.STATE, + SearchParamsKeys.RUN_TYPE, SearchParamsKeys.START_DATE, SearchParamsKeys.END_DATE, SearchParamsKeys.RUN_AFTER_GTE, @@ -42,7 +45,7 @@ export const DagRunsFilters = ({ dagId }: DagRunsFiltersProps) => { ]; if (dagId === undefined) { - keys.unshift(SearchParamsKeys.DAG_ID); + keys.unshift(SearchParamsKeys.DAG_ID_PATTERN); } return keys; diff --git a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts index e63846543a423..53d5cece02949 100644 --- a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts +++ b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts @@ -30,6 +30,7 @@ export type FilterableSearchParamsKeys = | SearchParamsKeys.CONF_CONTAINS | SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN | SearchParamsKeys.DAG_ID + | SearchParamsKeys.DAG_ID_PATTERN | SearchParamsKeys.DAG_VERSION | SearchParamsKeys.DURATION_GTE | SearchParamsKeys.DURATION_LTE @@ -44,7 +45,9 @@ export type FilterableSearchParamsKeys = | SearchParamsKeys.RUN_AFTER_LTE | SearchParamsKeys.RUN_ID | SearchParamsKeys.RUN_ID_PATTERN + | SearchParamsKeys.RUN_TYPE | SearchParamsKeys.START_DATE + | SearchParamsKeys.STATE | SearchParamsKeys.TASK_ID | SearchParamsKeys.TASK_ID_PATTERN | SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN From da96dbfc206d130bf8ece8df76518bd15a594a78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Tue, 23 Sep 2025 13:27:03 -0400 Subject: [PATCH 3/9] test: Add test for duration and conf_contains filters test: modify duration test --- .../core_api/routes/public/test_dag_run.py | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 11ef75674be3c..df6616b2c94f8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -162,6 +162,26 @@ def setup(request, dag_maker, session=None): # Leave triggering_user_name as None for testing dag_run4.triggering_user_name = None + # Set start_date and end_date to create durations for testing duration filters + base_start = dag_run1.start_date or datetime(2021, 1, 1, tzinfo=timezone.utc) + dag_run1.start_date = base_start + dag_run1.end_date = base_start + timedelta(seconds=100.5) # 100.5 seconds duration + + dag_run2.start_date = base_start + dag_run2.end_date = base_start + timedelta(seconds=200.75) # 200.75 seconds duration + + dag_run3.start_date = base_start + dag_run3.end_date = base_start + timedelta(seconds=50.25) # 50.25 seconds duration + + dag_run4.start_date = base_start + dag_run4.end_date = base_start + timedelta(seconds=150.0) # 150 seconds duration + + # Set conf for testing conf_contains filter + dag_run1.conf = {"env": "production", "version": "1.0"} + dag_run2.conf = {"env": "staging", "debug": True} + dag_run3.conf = {"env": "development", "test_mode": True} + dag_run4.conf = {"env": "production", "version": "2.0"} + dag_maker.sync_dagbag_to_db() dag_maker.dag_model.has_task_concurrency_limits = True session.merge(ti1) @@ -629,6 +649,35 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): {"dag_version": [1, 999]}, [DAG1_RUN1_ID, DAG1_RUN2_ID], ), # Multiple versions, only existing ones match + # Test duration filters + ("~", {"duration_gte": 100}, [DAG1_RUN1_ID, DAG1_RUN2_ID, DAG2_RUN2_ID]), # >= 100 seconds + ("~", {"duration_lte": 100}, [DAG1_RUN1_ID, DAG2_RUN1_ID]), # <= 100 seconds + ("~", {"duration_gte": 50, "duration_lte": 150}, [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID]), + ("~", {"duration_gt": 100}, [DAG1_RUN2_ID, DAG2_RUN2_ID]), # > 100 seconds + ("~", {"duration_lt": 100}, [DAG2_RUN1_ID]), # < 100 seconds + (DAG1_ID, {"duration_gte": 150}, [DAG1_RUN2_ID]), # DAG1 runs >= 150 seconds + (DAG2_ID, {"duration_lte": 100}, [DAG2_RUN1_ID]), # DAG2 runs <= 100 seconds + # Test conf_contains filter + ("~", {"conf_contains": "production"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "production" + ("~", {"conf_contains": "staging"}, [DAG1_RUN2_ID]), # Contains "staging" + ("~", {"conf_contains": "development"}, [DAG2_RUN1_ID]), # Contains "development" + ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "version" + ("~", {"conf_contains": "debug"}, [DAG1_RUN2_ID]), # Contains "debug" + ("~", {"conf_contains": "test_mode"}, [DAG2_RUN1_ID]), # Contains "test_mode" + ("~", {"conf_contains": "nonexistent"}, []), # Contains non-existent string + (DAG1_ID, {"conf_contains": "production"}, [DAG1_RUN1_ID]), # DAG1 with "production" + (DAG2_ID, {"conf_contains": "env"}, [DAG2_RUN1_ID, DAG2_RUN2_ID]), # DAG2 with "env" + # Test combined filters + ( + "~", + {"duration_gte": 100, "conf_contains": "production"}, + [DAG1_RUN1_ID, DAG2_RUN2_ID], + ), # Duration >= 100 AND contains "production" + ( + "~", + {"state": DagRunState.SUCCESS.value, "duration_lte": 150}, + [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID], + ), # Success state AND duration <= 150 ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -648,6 +697,8 @@ def test_bad_filters(self, test_client): "start_date_lte": "invalid", "end_date_lte": "invalid", "run_after_lte": "invalid", + "duration_gte": "invalid", + "duration_lte": "invalid", } expected_detail = [ { @@ -706,6 +757,18 @@ def test_bad_filters(self, test_client): "input": "invalid", "ctx": {"error": "input is too short"}, }, + { + "type": "float_parsing", + "loc": ["query", "duration_gte"], + "msg": "Input should be a valid number, unable to parse string as a number", + "input": "invalid", + }, + { + "type": "float_parsing", + "loc": ["query", "duration_lte"], + "msg": "Input should be a valid number, unable to parse string as a number", + "input": "invalid", + }, ] response = test_client.get(f"/dags/{DAG1_ID}/dagRuns", params=query_params) assert response.status_code == 422 @@ -945,6 +1008,27 @@ def test_bad_limit_and_offset(self, test_client, post_body, expected_detail): }, [DAG1_RUN2_ID], ), + # Test duration filters for batch API + ({"duration_gte": 100}, [DAG1_RUN1_ID, DAG1_RUN2_ID, DAG2_RUN2_ID]), # >= 100 seconds + ({"duration_lte": 100}, [DAG1_RUN1_ID, DAG2_RUN1_ID]), # <= 100 seconds + ({"duration_gte": 50, "duration_lte": 150}, [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID]), + ({"duration_gt": 100}, [DAG1_RUN2_ID, DAG2_RUN2_ID]), # > 100 seconds + ({"duration_lt": 100}, [DAG2_RUN1_ID]), # < 100 seconds + # Test conf_contains filter for batch API + ({"conf_contains": "production"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "production" + ({"conf_contains": "staging"}, [DAG1_RUN2_ID]), # Contains "staging" + ({"conf_contains": "development"}, [DAG2_RUN1_ID]), # Contains "development" + ({"conf_contains": "version"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "version" + ({"conf_contains": "nonexistent"}, []), # Contains non-existent string + # Test combined filters for batch API + ( + {"duration_gte": 100, "conf_contains": "production"}, + [DAG1_RUN1_ID, DAG2_RUN2_ID], + ), # Duration >= 100 AND contains "production" + ( + {"states": [DagRunState.SUCCESS.value], "duration_lte": 150}, + [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID], + ), # Success state AND duration <= 150 ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -962,6 +1046,8 @@ def test_bad_filters(self, test_client): "logical_date_lte": "invalid", "start_date_lte": "invalid", "end_date_lte": "invalid", + "duration_gte": "invalid", + "duration_lte": "invalid", "dag_ids": "invalid", } expected_detail = [ @@ -1013,6 +1099,18 @@ def test_bad_filters(self, test_client): "input": "invalid", "ctx": {"error": "input is too short"}, }, + { + "type": "float_parsing", + "loc": ["body", "duration_gte"], + "msg": "Input should be a valid number, unable to parse string as a number", + "input": "invalid", + }, + { + "type": "float_parsing", + "loc": ["body", "duration_lte"], + "msg": "Input should be a valid number, unable to parse string as a number", + "input": "invalid", + }, ] response = test_client.post("/dags/~/dagRuns/list", json=post_body) assert response.status_code == 422 From f0a17d157424c19f0009c05c2bdc30095c34751e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Tue, 23 Sep 2025 15:01:36 -0400 Subject: [PATCH 4/9] fix(ui): Add icons and styled options to Dag runs select type filters fix(tests): Correct assignment in fixture fix: Correct tests for duration and conf_contains filters fix(test): using from_datetime_to_zulu_without_ms for end date fix(tests): Correct tuple assignment for in test setup --- .../ui/src/constants/filterConfigs.tsx | 20 ++- .../core_api/routes/public/test_dag_run.py | 121 ++++-------------- 2 files changed, 46 insertions(+), 95 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 4dd8b279bffcb..1210f094107ff 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +import { Flex } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { FiBarChart, FiUser } from "react-icons/fi"; import { LuBrackets } from "react-icons/lu"; @@ -29,9 +30,11 @@ import { MdCheckCircle, } from "react-icons/md"; +import type { DagRunState, DagRunType } from "openapi/requests/types.gen"; import { DagIcon } from "src/assets/DagIcon"; import { TaskIcon } from "src/assets/TaskIcon"; import type { FilterConfig } from "src/components/FilterBar"; +import { RunTypeIcon } from "src/components/RunTypeIcon"; import { StateBadge } from "src/components/StateBadge"; import { dagRunStateOptions, dagRunTypeOptions } from "src/constants/stateOptions"; @@ -173,7 +176,15 @@ export const useFilterConfigs = () => { icon: , label: translate("common:dagRun.runType"), options: dagRunTypeOptions.items.map((option) => ({ - label: option.value === "all" ? translate(option.label) : translate(option.label), + label: + option.value === "all" ? ( + translate(option.label) + ) : ( + + + {translate(option.label)} + + ), value: option.value, })), type: FilterTypes.SELECT, @@ -187,7 +198,12 @@ export const useFilterConfigs = () => { icon: , label: translate("common:state"), options: dagRunStateOptions.items.map((option) => ({ - label: option.value === "all" ? translate(option.label) : translate(option.label), + label: + option.value === "all" ? ( + translate(option.label) + ) : ( + {translate(option.label)} + ), value: option.value, })), type: FilterTypes.SELECT, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index df6616b2c94f8..fee585ea8dbeb 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -112,7 +112,11 @@ def setup(request, dag_maker, session=None): ) # Set triggering_user_name for testing dag_run1.triggering_user_name = "alice_admin" - dag_run1.note = (DAG1_RUN1_NOTE, "not_test") + dag_run1.note = DAG1_RUN1_NOTE + # Set end_date for testing duration filter + dag_run1.end_date = dag_run1.start_date + timedelta(seconds=100.5) + # Set conf for testing conf_contains filter + dag_run1.conf = {"env": "production", "version": "1.0"} for i, task in enumerate([task1, task2], start=1): ti = dag_run1.get_task_instance(task_id=task.task_id) @@ -130,6 +134,10 @@ def setup(request, dag_maker, session=None): ) # Set triggering_user_name for testing dag_run2.triggering_user_name = "bob_service" + # Set end_date for testing duration filter + dag_run2.end_date = dag_run2.start_date + timedelta(seconds=200.75) + # Set conf for testing conf_contains filter + dag_run2.conf = {"env": "staging", "debug": True} ti1 = dag_run2.get_task_instance(task_id=task1.task_id) ti1.task = task1 @@ -151,6 +159,10 @@ def setup(request, dag_maker, session=None): ) # Set triggering_user_name for testing dag_run3.triggering_user_name = "service_account" + # Set end_date for testing duration filter + dag_run3.end_date = dag_run3.start_date + timedelta(seconds=50.25) + # Set conf for testing conf_contains filter + dag_run3.conf = {"env": "development", "test_mode": True} dag_run4 = dag_maker.create_dagrun( run_id=DAG2_RUN2_ID, @@ -161,26 +173,10 @@ def setup(request, dag_maker, session=None): ) # Leave triggering_user_name as None for testing dag_run4.triggering_user_name = None - - # Set start_date and end_date to create durations for testing duration filters - base_start = dag_run1.start_date or datetime(2021, 1, 1, tzinfo=timezone.utc) - dag_run1.start_date = base_start - dag_run1.end_date = base_start + timedelta(seconds=100.5) # 100.5 seconds duration - - dag_run2.start_date = base_start - dag_run2.end_date = base_start + timedelta(seconds=200.75) # 200.75 seconds duration - - dag_run3.start_date = base_start - dag_run3.end_date = base_start + timedelta(seconds=50.25) # 50.25 seconds duration - - dag_run4.start_date = base_start - dag_run4.end_date = base_start + timedelta(seconds=150.0) # 150 seconds duration - + # Set end_date for testing duration filter + dag_run4.end_date = dag_run4.start_date + timedelta(seconds=150.0) # Set conf for testing conf_contains filter - dag_run1.conf = {"env": "production", "version": "1.0"} - dag_run2.conf = {"env": "staging", "debug": True} - dag_run3.conf = {"env": "development", "test_mode": True} - dag_run4.conf = {"env": "production", "version": "2.0"} + dag_run4.conf = None dag_maker.sync_dagbag_to_db() dag_maker.dag_model.has_task_concurrency_limits = True @@ -208,7 +204,7 @@ def get_dag_run_dict(run: DagRun): "queued_at": from_datetime_to_zulu(run.queued_at) if run.queued_at else None, "run_after": from_datetime_to_zulu_without_ms(run.run_after), "start_date": from_datetime_to_zulu_without_ms(run.start_date), - "end_date": from_datetime_to_zulu(run.end_date), + "end_date": from_datetime_to_zulu_without_ms(run.end_date), "duration": run.duration, "data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start), "data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end), @@ -650,34 +646,22 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): [DAG1_RUN1_ID, DAG1_RUN2_ID], ), # Multiple versions, only existing ones match # Test duration filters - ("~", {"duration_gte": 100}, [DAG1_RUN1_ID, DAG1_RUN2_ID, DAG2_RUN2_ID]), # >= 100 seconds - ("~", {"duration_lte": 100}, [DAG1_RUN1_ID, DAG2_RUN1_ID]), # <= 100 seconds - ("~", {"duration_gte": 50, "duration_lte": 150}, [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID]), - ("~", {"duration_gt": 100}, [DAG1_RUN2_ID, DAG2_RUN2_ID]), # > 100 seconds - ("~", {"duration_lt": 100}, [DAG2_RUN1_ID]), # < 100 seconds - (DAG1_ID, {"duration_gte": 150}, [DAG1_RUN2_ID]), # DAG1 runs >= 150 seconds - (DAG2_ID, {"duration_lte": 100}, [DAG2_RUN1_ID]), # DAG2 runs <= 100 seconds - # Test conf_contains filter - ("~", {"conf_contains": "production"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "production" - ("~", {"conf_contains": "staging"}, [DAG1_RUN2_ID]), # Contains "staging" - ("~", {"conf_contains": "development"}, [DAG2_RUN1_ID]), # Contains "development" - ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "version" - ("~", {"conf_contains": "debug"}, [DAG1_RUN2_ID]), # Contains "debug" - ("~", {"conf_contains": "test_mode"}, [DAG2_RUN1_ID]), # Contains "test_mode" - ("~", {"conf_contains": "nonexistent"}, []), # Contains non-existent string - (DAG1_ID, {"conf_contains": "production"}, [DAG1_RUN1_ID]), # DAG1 with "production" - (DAG2_ID, {"conf_contains": "env"}, [DAG2_RUN1_ID, DAG2_RUN2_ID]), # DAG2 with "env" - # Test combined filters + ("~", {"duration_gte": 200}, [DAG1_RUN2_ID]), # Test >= 200 seconds + ("~", {"duration_lt": 100}, [DAG2_RUN1_ID]), # Test < 100 seconds ( "~", - {"duration_gte": 100, "conf_contains": "production"}, + {"duration_gte": 100, "duration_lte": 150}, [DAG1_RUN1_ID, DAG2_RUN2_ID], - ), # Duration >= 100 AND contains "production" + ), # Test between 100 and 150 (inclusive) + # Test conf_contains filter + ("~", {"conf_contains": '"env": "production"'}, [DAG1_RUN1_ID]), # Test for "production" env ( "~", - {"state": DagRunState.SUCCESS.value, "duration_lte": 150}, - [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID], - ), # Success state AND duration <= 150 + {"conf_contains": '"debug": true'}, + [DAG1_RUN2_ID], + ), # Test for debug flag (note JSON true is lowercase) + ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" + ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -697,8 +681,6 @@ def test_bad_filters(self, test_client): "start_date_lte": "invalid", "end_date_lte": "invalid", "run_after_lte": "invalid", - "duration_gte": "invalid", - "duration_lte": "invalid", } expected_detail = [ { @@ -757,18 +739,6 @@ def test_bad_filters(self, test_client): "input": "invalid", "ctx": {"error": "input is too short"}, }, - { - "type": "float_parsing", - "loc": ["query", "duration_gte"], - "msg": "Input should be a valid number, unable to parse string as a number", - "input": "invalid", - }, - { - "type": "float_parsing", - "loc": ["query", "duration_lte"], - "msg": "Input should be a valid number, unable to parse string as a number", - "input": "invalid", - }, ] response = test_client.get(f"/dags/{DAG1_ID}/dagRuns", params=query_params) assert response.status_code == 422 @@ -1008,27 +978,6 @@ def test_bad_limit_and_offset(self, test_client, post_body, expected_detail): }, [DAG1_RUN2_ID], ), - # Test duration filters for batch API - ({"duration_gte": 100}, [DAG1_RUN1_ID, DAG1_RUN2_ID, DAG2_RUN2_ID]), # >= 100 seconds - ({"duration_lte": 100}, [DAG1_RUN1_ID, DAG2_RUN1_ID]), # <= 100 seconds - ({"duration_gte": 50, "duration_lte": 150}, [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID]), - ({"duration_gt": 100}, [DAG1_RUN2_ID, DAG2_RUN2_ID]), # > 100 seconds - ({"duration_lt": 100}, [DAG2_RUN1_ID]), # < 100 seconds - # Test conf_contains filter for batch API - ({"conf_contains": "production"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "production" - ({"conf_contains": "staging"}, [DAG1_RUN2_ID]), # Contains "staging" - ({"conf_contains": "development"}, [DAG2_RUN1_ID]), # Contains "development" - ({"conf_contains": "version"}, [DAG1_RUN1_ID, DAG2_RUN2_ID]), # Contains "version" - ({"conf_contains": "nonexistent"}, []), # Contains non-existent string - # Test combined filters for batch API - ( - {"duration_gte": 100, "conf_contains": "production"}, - [DAG1_RUN1_ID, DAG2_RUN2_ID], - ), # Duration >= 100 AND contains "production" - ( - {"states": [DagRunState.SUCCESS.value], "duration_lte": 150}, - [DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID], - ), # Success state AND duration <= 150 ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -1046,8 +995,6 @@ def test_bad_filters(self, test_client): "logical_date_lte": "invalid", "start_date_lte": "invalid", "end_date_lte": "invalid", - "duration_gte": "invalid", - "duration_lte": "invalid", "dag_ids": "invalid", } expected_detail = [ @@ -1099,18 +1046,6 @@ def test_bad_filters(self, test_client): "input": "invalid", "ctx": {"error": "input is too short"}, }, - { - "type": "float_parsing", - "loc": ["body", "duration_gte"], - "msg": "Input should be a valid number, unable to parse string as a number", - "input": "invalid", - }, - { - "type": "float_parsing", - "loc": ["body", "duration_lte"], - "msg": "Input should be a valid number, unable to parse string as a number", - "input": "invalid", - }, ] response = test_client.post("/dags/~/dagRuns/list", json=post_body) assert response.status_code == 422 From aa067a184cc7b64b8c05f5cc22936916a64e8c6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Wed, 24 Sep 2025 00:53:04 -0400 Subject: [PATCH 5/9] fix(tests): Correct datetime formatting and test expectations in dag_run API tests --- .../core_api/routes/public/test_dag_run.py | 80 +++++++++++++------ 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index fee585ea8dbeb..eef50affcb2fa 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -112,11 +112,11 @@ def setup(request, dag_maker, session=None): ) # Set triggering_user_name for testing dag_run1.triggering_user_name = "alice_admin" - dag_run1.note = DAG1_RUN1_NOTE + dag_run1.note = (DAG1_RUN1_NOTE, "not_test") # Set end_date for testing duration filter dag_run1.end_date = dag_run1.start_date + timedelta(seconds=100.5) - # Set conf for testing conf_contains filter - dag_run1.conf = {"env": "production", "version": "1.0"} + # Set conf for testing conf_contains filter (values ordered for predictable sorting) + dag_run1.conf = {"env": "development", "version": "1.0"} for i, task in enumerate([task1, task2], start=1): ti = dag_run1.get_task_instance(task_id=task.task_id) @@ -137,7 +137,7 @@ def setup(request, dag_maker, session=None): # Set end_date for testing duration filter dag_run2.end_date = dag_run2.start_date + timedelta(seconds=200.75) # Set conf for testing conf_contains filter - dag_run2.conf = {"env": "staging", "debug": True} + dag_run2.conf = {"env": "production", "debug": True} ti1 = dag_run2.get_task_instance(task_id=task1.task_id) ti1.task = task1 @@ -162,7 +162,7 @@ def setup(request, dag_maker, session=None): # Set end_date for testing duration filter dag_run3.end_date = dag_run3.start_date + timedelta(seconds=50.25) # Set conf for testing conf_contains filter - dag_run3.conf = {"env": "development", "test_mode": True} + dag_run3.conf = {"env": "staging", "test_mode": True} dag_run4 = dag_maker.create_dagrun( run_id=DAG2_RUN2_ID, @@ -176,7 +176,7 @@ def setup(request, dag_maker, session=None): # Set end_date for testing duration filter dag_run4.end_date = dag_run4.start_date + timedelta(seconds=150.0) # Set conf for testing conf_contains filter - dag_run4.conf = None + dag_run4.conf = {"env": "testing", "mode": "ci"} dag_maker.sync_dagbag_to_db() dag_maker.dag_model.has_task_concurrency_limits = True @@ -194,23 +194,45 @@ def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]: ] +def format_datetime_like_api(dt: datetime | None) -> str | None: + """ + Format datetime to match API output behavior. + + The API serializes datetimes with microseconds only when they are non-zero: + - 2024-01-15T00:00:00Z (for times with zero microseconds) + - 2024-01-15T00:01:40.500000Z (for times with non-zero microseconds) + + This function ensures test expectations match the actual API behavior. + If API datetime serialization changes in the future, only this function needs updating. + + Args: + dt: datetime object or None + + Returns: + ISO 8601 formatted string with conditional microseconds, or None + """ + if dt is None: + return None + if dt.microsecond == 0: + return from_datetime_to_zulu_without_ms(dt) + return from_datetime_to_zulu(dt) + + def get_dag_run_dict(run: DagRun): return { "bundle_version": None, "dag_display_name": run.dag_model.dag_display_name, "dag_run_id": run.run_id, "dag_id": run.dag_id, - "logical_date": from_datetime_to_zulu_without_ms(run.logical_date), - "queued_at": from_datetime_to_zulu(run.queued_at) if run.queued_at else None, - "run_after": from_datetime_to_zulu_without_ms(run.run_after), - "start_date": from_datetime_to_zulu_without_ms(run.start_date), - "end_date": from_datetime_to_zulu_without_ms(run.end_date), + "logical_date": format_datetime_like_api(run.logical_date), + "queued_at": format_datetime_like_api(run.queued_at), + "run_after": format_datetime_like_api(run.run_after), + "start_date": format_datetime_like_api(run.start_date), + "end_date": format_datetime_like_api(run.end_date), "duration": run.duration, - "data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start), - "data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end), - "last_scheduling_decision": ( - from_datetime_to_zulu(run.last_scheduling_decision) if run.last_scheduling_decision else None - ), + "data_interval_start": format_datetime_like_api(run.data_interval_start), + "data_interval_end": format_datetime_like_api(run.data_interval_end), + "last_scheduling_decision": format_datetime_like_api(run.last_scheduling_decision), "run_type": run.run_type, "state": run.state, "triggered_by": run.triggered_by.value, @@ -466,26 +488,30 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): ( DAG1_ID, { - "end_date_gte": START_DATE2.isoformat(), + "end_date_gte": START_DATE2.isoformat(), # 2024-04-15 "end_date_lte": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - [DAG1_RUN1_ID, DAG1_RUN2_ID], + # DAG1 runs have end_date based on START_DATE1 (2024-01-15), so all < 2024-04-15 + [], ), ( DAG1_ID, { - "end_date_gt": START_DATE2.isoformat(), + "end_date_gt": START_DATE2.isoformat(), # 2024-04-15 "end_date_lt": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - [DAG1_RUN1_ID, DAG1_RUN2_ID], + # DAG1 runs have end_date based on START_DATE1 (2024-01-15), so all < 2024-04-15 + [], ), ( DAG1_ID, { - "end_date_gt": (START_DATE2 - timedelta(days=1)).isoformat(), + "end_date_gt": ( + START_DATE1 + timedelta(seconds=50) + ).isoformat(), # Between the two end dates "end_date_lt": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - [DAG1_RUN1_ID, DAG1_RUN2_ID], + [DAG1_RUN1_ID, DAG1_RUN2_ID], # Both should match as their end_date > start + 50s ), ( DAG1_ID, @@ -654,12 +680,12 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): [DAG1_RUN1_ID, DAG2_RUN2_ID], ), # Test between 100 and 150 (inclusive) # Test conf_contains filter - ("~", {"conf_contains": '"env": "production"'}, [DAG1_RUN1_ID]), # Test for "production" env + ("~", {"conf_contains": '"env": "development"'}, [DAG1_RUN1_ID]), # Test for "development" env ( "~", {"conf_contains": '"debug": true'}, [DAG1_RUN2_ID], - ), # Test for debug flag (note JSON true is lowercase) + ), # Test for debug flag ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist ], @@ -940,10 +966,12 @@ def test_bad_limit_and_offset(self, test_client, post_body, expected_detail): ), ( { - "end_date_gte": START_DATE2.isoformat(), + "end_date_gte": START_DATE2.isoformat(), # 2024-04-15 "end_date_lte": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - DAG_RUNS_LIST, + # Only DAG2 runs match: their start_date is 2024-04-15, so end_date >= 2024-04-15 + # DAG1 runs have start_date 2024-01-15, so end_date < 2024-04-15 + [DAG2_RUN1_ID, DAG2_RUN2_ID], ), ( { From 5ce31e60ab081070d4a7fe39137605f5f907df9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Wed, 24 Sep 2025 10:01:33 -0400 Subject: [PATCH 6/9] fix(tests): Simplify conf_contains filter test patterns --- .../unit/api_fastapi/core_api/routes/public/test_dag_run.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index eef50affcb2fa..c368b7f32b587 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -680,12 +680,12 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): [DAG1_RUN1_ID, DAG2_RUN2_ID], ), # Test between 100 and 150 (inclusive) # Test conf_contains filter - ("~", {"conf_contains": '"env": "development"'}, [DAG1_RUN1_ID]), # Test for "development" env + ("~", {"conf_contains": "development"}, [DAG1_RUN1_ID]), # Test for "development" env ( "~", - {"conf_contains": '"debug": true'}, + {"conf_contains": "debug"}, [DAG1_RUN2_ID], - ), # Test for debug flag + ), # Test for debug key ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist ], From 99662f7dfd9c79ee5a0fae532791182790d7ad86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Wed, 24 Sep 2025 13:21:18 -0400 Subject: [PATCH 7/9] test: using int for testing duration --- .../core_api/routes/public/test_dag_run.py | 50 ++++++------------- 1 file changed, 14 insertions(+), 36 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index c368b7f32b587..4f36edd1550fe 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -114,7 +114,7 @@ def setup(request, dag_maker, session=None): dag_run1.triggering_user_name = "alice_admin" dag_run1.note = (DAG1_RUN1_NOTE, "not_test") # Set end_date for testing duration filter - dag_run1.end_date = dag_run1.start_date + timedelta(seconds=100.5) + dag_run1.end_date = dag_run1.start_date + timedelta(seconds=101) # Set conf for testing conf_contains filter (values ordered for predictable sorting) dag_run1.conf = {"env": "development", "version": "1.0"} @@ -135,7 +135,7 @@ def setup(request, dag_maker, session=None): # Set triggering_user_name for testing dag_run2.triggering_user_name = "bob_service" # Set end_date for testing duration filter - dag_run2.end_date = dag_run2.start_date + timedelta(seconds=200.75) + dag_run2.end_date = dag_run2.start_date + timedelta(seconds=201) # Set conf for testing conf_contains filter dag_run2.conf = {"env": "production", "debug": True} @@ -160,7 +160,7 @@ def setup(request, dag_maker, session=None): # Set triggering_user_name for testing dag_run3.triggering_user_name = "service_account" # Set end_date for testing duration filter - dag_run3.end_date = dag_run3.start_date + timedelta(seconds=50.25) + dag_run3.end_date = dag_run3.start_date + timedelta(seconds=51) # Set conf for testing conf_contains filter dag_run3.conf = {"env": "staging", "test_mode": True} @@ -174,7 +174,7 @@ def setup(request, dag_maker, session=None): # Leave triggering_user_name as None for testing dag_run4.triggering_user_name = None # Set end_date for testing duration filter - dag_run4.end_date = dag_run4.start_date + timedelta(seconds=150.0) + dag_run4.end_date = dag_run4.start_date + timedelta(seconds=150) # Set conf for testing conf_contains filter dag_run4.conf = {"env": "testing", "mode": "ci"} @@ -194,45 +194,23 @@ def get_dag_versions_dict(dag_versions: list[DagVersion]) -> list[dict]: ] -def format_datetime_like_api(dt: datetime | None) -> str | None: - """ - Format datetime to match API output behavior. - - The API serializes datetimes with microseconds only when they are non-zero: - - 2024-01-15T00:00:00Z (for times with zero microseconds) - - 2024-01-15T00:01:40.500000Z (for times with non-zero microseconds) - - This function ensures test expectations match the actual API behavior. - If API datetime serialization changes in the future, only this function needs updating. - - Args: - dt: datetime object or None - - Returns: - ISO 8601 formatted string with conditional microseconds, or None - """ - if dt is None: - return None - if dt.microsecond == 0: - return from_datetime_to_zulu_without_ms(dt) - return from_datetime_to_zulu(dt) - - def get_dag_run_dict(run: DagRun): return { "bundle_version": None, "dag_display_name": run.dag_model.dag_display_name, "dag_run_id": run.run_id, "dag_id": run.dag_id, - "logical_date": format_datetime_like_api(run.logical_date), - "queued_at": format_datetime_like_api(run.queued_at), - "run_after": format_datetime_like_api(run.run_after), - "start_date": format_datetime_like_api(run.start_date), - "end_date": format_datetime_like_api(run.end_date), + "logical_date": from_datetime_to_zulu_without_ms(run.logical_date), + "queued_at": from_datetime_to_zulu(run.queued_at) if run.queued_at else None, + "run_after": from_datetime_to_zulu_without_ms(run.run_after), + "start_date": from_datetime_to_zulu_without_ms(run.start_date), + "end_date": from_datetime_to_zulu_without_ms(run.end_date), "duration": run.duration, - "data_interval_start": format_datetime_like_api(run.data_interval_start), - "data_interval_end": format_datetime_like_api(run.data_interval_end), - "last_scheduling_decision": format_datetime_like_api(run.last_scheduling_decision), + "data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start), + "data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end), + "last_scheduling_decision": ( + from_datetime_to_zulu(run.last_scheduling_decision) if run.last_scheduling_decision else None + ), "run_type": run.run_type, "state": run.state, "triggered_by": run.triggered_by.value, From 232baf22aca35cbc39f48ce5b211ca5d9c19646b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=96=86=E5=AE=B8?= Date: Thu, 25 Sep 2025 04:03:48 -0400 Subject: [PATCH 8/9] fix(models): Make DagRun.duration property SQLite-compatible --- .../openapi/v2-rest-api-generated.yaml | 56 +++++++++---------- .../core_api/routes/public/dag_run.py | 2 +- airflow-core/src/airflow/models/dagrun.py | 8 ++- .../ui/openapi-gen/queries/ensureQueryData.ts | 8 +-- .../ui/openapi-gen/queries/prefetch.ts | 8 +-- .../airflow/ui/openapi-gen/queries/queries.ts | 8 +-- .../ui/openapi-gen/queries/suspense.ts | 8 +-- .../ui/openapi-gen/requests/services.gen.ts | 16 +++--- 8 files changed, 60 insertions(+), 54 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index c24ed0782fb13..db3cb4858e064 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2177,74 +2177,74 @@ paths: format: date-time - type: 'null' title: End Date Lt - - name: updated_at_gte + - name: duration_gte in: query required: false schema: anyOf: - - type: string - format: date-time + - type: number - type: 'null' - title: Updated At Gte - - name: updated_at_gt + title: Duration Gte + - name: duration_gt in: query required: false schema: anyOf: - - type: string - format: date-time + - type: number - type: 'null' - title: Updated At Gt - - name: updated_at_lte + title: Duration Gt + - name: duration_lte in: query required: false schema: anyOf: - - type: string - format: date-time + - type: number - type: 'null' - title: Updated At Lte - - name: updated_at_lt + title: Duration Lte + - name: duration_lt in: query required: false schema: anyOf: - - type: string - format: date-time + - type: number - type: 'null' - title: Updated At Lt - - name: duration_gte + title: Duration Lt + - name: updated_at_gte in: query required: false schema: anyOf: - - type: number + - type: string + format: date-time - type: 'null' - title: Duration Gte - - name: duration_gt + title: Updated At Gte + - name: updated_at_gt in: query required: false schema: anyOf: - - type: number + - type: string + format: date-time - type: 'null' - title: Duration Gt - - name: duration_lte + title: Updated At Gt + - name: updated_at_lte in: query required: false schema: anyOf: - - type: number + - type: string + format: date-time - type: 'null' - title: Duration Lte - - name: duration_lt + title: Updated At Lte + - name: updated_at_lt in: query required: false schema: anyOf: - - type: number + - type: string + format: date-time - type: 'null' - title: Duration Lt + title: Updated At Lt - name: conf_contains in: query required: false diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 3aba4b78bde92..fa1b0780519c0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -319,8 +319,8 @@ def get_dag_runs( logical_date: Annotated[RangeFilter, Depends(datetime_range_filter_factory("logical_date", DagRun))], start_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("start_date", DagRun))], end_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))], - update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", DagRun))], duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", DagRun))], + update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", DagRun))], conf_contains: Annotated[ FilterParam[str], Depends(filter_param_factory(DagRun.conf, str, FilterOptionEnum.CONTAINS, "conf_contains")), diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index ec7dff2ba71df..ab1867d1458e7 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -396,9 +396,15 @@ def duration(cls, session: Session = NEW_SESSION) -> Case: if dialect_name == "mysql": return func.timestampdiff(text("SECOND"), cls.start_date, cls.end_date) + if dialect_name == "sqlite": + duration_expr = (func.julianday(cls.end_date) - func.julianday(cls.start_date)) * 86400 + + else: + duration_expr = func.extract("epoch", cls.end_date - cls.start_date) + when_condition = ( (cls.end_date != None) & (cls.start_date != None), # noqa: E711 - func.extract("epoch", cls.end_date - cls.start_date), + duration_expr, ) return case(when_condition, else_=None) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 072c3036977c5..1b82a78ba6627 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -281,14 +281,14 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @param data.endDateGt * @param data.endDateLte * @param data.endDateLt -* @param data.updatedAtGte -* @param data.updatedAtGt -* @param data.updatedAtLte -* @param data.updatedAtLt * @param data.durationGte * @param data.durationGt * @param data.durationLte * @param data.durationLt +* @param data.updatedAtGte +* @param data.updatedAtGt +* @param data.updatedAtLte +* @param data.updatedAtLt * @param data.confContains * @param data.runType * @param data.state diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 8f0b5af4ed45f..14bfa4489c7bd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -281,14 +281,14 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @param data.endDateGt * @param data.endDateLte * @param data.endDateLt -* @param data.updatedAtGte -* @param data.updatedAtGt -* @param data.updatedAtLte -* @param data.updatedAtLt * @param data.durationGte * @param data.durationGt * @param data.durationLte * @param data.durationLt +* @param data.updatedAtGte +* @param data.updatedAtGt +* @param data.updatedAtLte +* @param data.updatedAtLt * @param data.confContains * @param data.runType * @param data.state diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 8bfa70450c4e5..f8ddf3baf325a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -281,14 +281,14 @@ export const useDagRunServiceGetUpstreamAssetEvents = Date: Fri, 26 Sep 2025 10:57:37 -0400 Subject: [PATCH 9/9] fix(i18n): add duration translation keys --- .../src/airflow/ui/public/i18n/locales/en/common.json | 2 ++ airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index b0a5fbdbbf23f..8622fb6a0fc7e 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -101,6 +101,8 @@ }, "filter": "Filter", "filters": { + "durationFrom": "Duration From", + "durationTo": "Duration To", "logicalDateFrom": "Logical Date From", "logicalDateTo": "Logical Date To", "runAfterFrom": "Run After From", diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 1210f094107ff..4bcf04088a9e4 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -94,13 +94,13 @@ export const useFilterConfigs = () => { }, [SearchParamsKeys.DURATION_GTE]: { icon: , - label: translate("common:filters.durationFrom", "Duration From"), // To do: add translation key and remove default value + label: translate("common:filters.durationFrom"), min: 0, type: FilterTypes.NUMBER, }, [SearchParamsKeys.DURATION_LTE]: { icon: , - label: translate("common:filters.durationTo", "Duration To"), // To do: add translation key and remove default value + label: translate("common:filters.durationTo"), min: 0, type: FilterTypes.NUMBER, },