diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index b33dffc971456..e905b99432a68 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -309,6 +309,7 @@ class FilterOptionEnum(Enum): ANY_EQUAL = "any_eq" ALL_EQUAL = "all_eq" IS_NONE = "is_none" + CONTAINS = "contains" class FilterParam(BaseParam[T]): @@ -366,6 +367,13 @@ def to_orm(self, select: Select) -> Select: return select.where(self.attribute.is_not(None)) if self.value is True: return select.where(self.attribute.is_(None)) + if self.filter_option == FilterOptionEnum.CONTAINS: + # For JSON/JSONB columns, convert to text before applying LIKE + from sqlalchemy import Text, cast + + if str(self.attribute.type).upper() in ("JSON", "JSONB"): + return select.where(cast(self.attribute, Text).contains(self.value)) + return select.where(self.attribute.contains(self.value)) raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index ed7aac2ae000f..acfa5547a50bf 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -180,3 +180,10 @@ class DAGRunsBatchBody(StrictBaseModel): end_date_gt: AwareDatetime | None = None end_date_lte: AwareDatetime | None = None end_date_lt: AwareDatetime | None = None + + duration_gte: float | None = None + duration_gt: float | None = None + duration_lte: float | None = None + duration_lt: float | None = None + + conf_contains: str | None = None diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 90cfc65b77289..db3cb4858e064 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -2177,6 +2177,38 @@ paths: format: date-time - type: 'null' title: End Date Lt + - name: duration_gte + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Gte + - name: duration_gt + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Gt + - name: duration_lte + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Lte + - name: duration_lt + in: query + required: false + schema: + anyOf: + - type: number + - type: 'null' + title: Duration Lt - name: updated_at_gte in: query required: false @@ -2213,6 +2245,12 @@ paths: format: date-time - type: 'null' title: Updated At Lt + - name: conf_contains + in: query + required: false + schema: + type: string + title: Conf Contains - name: run_type in: query required: false @@ -10474,6 +10512,31 @@ components: format: date-time - type: 'null' title: End Date Lt + duration_gte: + anyOf: + - type: number + - type: 'null' + title: Duration Gte + duration_gt: + anyOf: + - type: number + - type: 'null' + title: Duration Gt + duration_lte: + anyOf: + - type: number + - type: 'null' + title: Duration Lte + duration_lt: + anyOf: + - type: number + - type: 'null' + title: Duration Lt + conf_contains: + anyOf: + - type: string + - type: 'null' + title: Conf Contains additionalProperties: false type: object title: DAGRunsBatchBody diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 0aa4be0997901..fa1b0780519c0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -51,6 +51,8 @@ SortParam, _SearchParam, datetime_range_filter_factory, + filter_param_factory, + float_range_filter_factory, search_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter @@ -317,7 +319,12 @@ def get_dag_runs( logical_date: Annotated[RangeFilter, Depends(datetime_range_filter_factory("logical_date", DagRun))], start_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("start_date", DagRun))], end_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))], + duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", DagRun))], update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", DagRun))], + conf_contains: Annotated[ + FilterParam[str], + Depends(filter_param_factory(DagRun.conf, str, FilterOptionEnum.CONTAINS, "conf_contains")), + ], run_type: QueryDagRunRunTypesFilter, state: QueryDagRunStateFilter, dag_version: QueryDagRunVersionFilter, @@ -376,6 +383,8 @@ def get_dag_runs( start_date_range, end_date_range, update_at_range, + duration_range, + conf_contains, state, run_type, dag_version, @@ -565,6 +574,16 @@ def get_list_dag_runs_batch( ), attribute=DagRun.end_date, ) + duration = RangeFilter( + Range( + lower_bound_gte=body.duration_gte, + lower_bound_gt=body.duration_gt, + upper_bound_lte=body.duration_lte, + upper_bound_lt=body.duration_lt, + ), + attribute=DagRun.duration, + ) + conf_contains = FilterParam(DagRun.conf, body.conf_contains, FilterOptionEnum.CONTAINS) state = FilterParam(DagRun.state, body.states, FilterOptionEnum.ANY_EQUAL) offset = OffsetFilter(body.page_offset) @@ -590,7 +609,17 @@ def get_list_dag_runs_batch( base_query = select(DagRun).options(joinedload(DagRun.dag_model)) dag_runs_select, total_entries = paginated_select( statement=base_query, - filters=[dag_ids, logical_date, run_after, start_date, end_date, state, readable_dag_runs_filter], + filters=[ + dag_ids, + logical_date, + run_after, + start_date, + end_date, + duration, + conf_contains, + state, + readable_dag_runs_filter, + ], order_by=order_by, offset=offset, limit=limit, diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index ec7dff2ba71df..ab1867d1458e7 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -396,9 +396,15 @@ def duration(cls, session: Session = NEW_SESSION) -> Case: if dialect_name == "mysql": return func.timestampdiff(text("SECOND"), cls.start_date, cls.end_date) + if dialect_name == "sqlite": + duration_expr = (func.julianday(cls.end_date) - func.julianday(cls.start_date)) * 86400 + + else: + duration_expr = func.extract("epoch", cls.end_date - cls.start_date) + when_condition = ( (cls.end_date != None) & (cls.start_date != None), # noqa: E711 - func.extract("epoch", cls.end_date - cls.start_date), + duration_expr, ) return case(when_condition, else_=None) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 64d1234171f04..02f4060a7d47b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -143,10 +143,15 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }: export type DagRunServiceGetDagRunsDefaultResponse = Awaited>; export type DagRunServiceGetDagRunsQueryResult = UseQueryResult; export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns"; -export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const UseDagRunServiceGetDagRunsKeyFn = ({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -174,7 +179,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagIdPattern, dagVersio updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; +}, queryKey?: Array) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])]; export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited>; export type DagRunServiceWaitDagRunUntilFinishedQueryResult = UseQueryResult; export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index dd15b40d9d53e..1b82a78ba6627 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -281,10 +281,15 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @param data.endDateGt * @param data.endDateLte * @param data.endDateLt +* @param data.durationGte +* @param data.durationGt +* @param data.durationLte +* @param data.durationLt * @param data.updatedAtGte * @param data.updatedAtGt * @param data.updatedAtLte * @param data.updatedAtLt +* @param data.confContains * @param data.runType * @param data.state * @param data.dagVersion @@ -295,10 +300,15 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 6b5a229359059..14bfa4489c7bd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -281,10 +281,15 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @param data.endDateGt * @param data.endDateLte * @param data.endDateLt +* @param data.durationGte +* @param data.durationGt +* @param data.durationLte +* @param data.durationLt * @param data.updatedAtGte * @param data.updatedAtGt * @param data.updatedAtLte * @param data.updatedAtLt +* @param data.confContains * @param data.runType * @param data.state * @param data.dagVersion @@ -295,10 +300,15 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer * @returns DAGRunCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d updatedAtGte?: string; updatedAtLt?: string; updatedAtLte?: string; -}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); +}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 7f1bd242e4506..f8ddf3baf325a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -281,10 +281,15 @@ export const useDagRunServiceGetUpstreamAssetEvents = = unknown[]>({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRuns = = unknown[]>({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const useDagRunServiceGetDagRuns = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 94f156302c7e3..7cd2bd43da1f7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -281,10 +281,15 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense = = unknown[]>({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { +export const useDagRunServiceGetDagRunsSuspense = = unknown[]>({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: { + confContains?: string; dagId: string; dagIdPattern?: string; dagVersion?: number[]; + durationGt?: number; + durationGte?: number; + durationLt?: number; + durationLte?: number; endDateGt?: string; endDateGte?: string; endDateLt?: string; @@ -326,7 +336,7 @@ export const useDagRunServiceGetDagRunsSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options }); /** * Experimental: Wait for a dag run to complete, and return task results if requested. * 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 36273f8818adc..a00c346261c35 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2777,6 +2777,61 @@ export const $DAGRunsBatchBody = { } ], title: 'End Date Lt' + }, + duration_gte: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Gte' + }, + duration_gt: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Gt' + }, + duration_lte: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Lte' + }, + duration_lt: { + anyOf: [ + { + type: 'number' + }, + { + type: 'null' + } + ], + title: 'Duration Lt' + }, + conf_contains: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Conf Contains' } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index b4f58a096a68d..1d4f3de2d8d88 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -992,10 +992,15 @@ export class DagRunService { * @param data.endDateGt * @param data.endDateLte * @param data.endDateLt + * @param data.durationGte + * @param data.durationGt + * @param data.durationLte + * @param data.durationLt * @param data.updatedAtGte * @param data.updatedAtGt * @param data.updatedAtLte * @param data.updatedAtLt + * @param data.confContains * @param data.runType * @param data.state * @param data.dagVersion @@ -1032,10 +1037,15 @@ export class DagRunService { end_date_gt: data.endDateGt, end_date_lte: data.endDateLte, end_date_lt: data.endDateLt, + duration_gte: data.durationGte, + duration_gt: data.durationGt, + duration_lte: data.durationLte, + duration_lt: data.durationLt, updated_at_gte: data.updatedAtGte, updated_at_gt: data.updatedAtGt, updated_at_lte: data.updatedAtLte, updated_at_lt: data.updatedAtLt, + conf_contains: data.confContains, run_type: data.runType, state: data.state, dag_version: data.dagVersion, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 58a082807a234..4deeac4226abd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -691,6 +691,11 @@ export type DAGRunsBatchBody = { end_date_gt?: string | null; end_date_lte?: string | null; end_date_lt?: string | null; + duration_gte?: number | null; + duration_gt?: number | null; + duration_lte?: number | null; + duration_lt?: number | null; + conf_contains?: string | null; }; /** @@ -2272,12 +2277,17 @@ export type ClearDagRunData = { export type ClearDagRunResponse = TaskInstanceCollectionResponse | DAGRunResponse; export type GetDagRunsData = { + confContains?: string; dagId: string; /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. */ dagIdPattern?: string | null; dagVersion?: Array<(number)>; + durationGt?: number | null; + durationGte?: number | null; + durationLt?: number | null; + durationLte?: number | null; endDateGt?: string | null; endDateGte?: string | null; endDateLt?: string | null; diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 111a3250704dc..8622fb6a0fc7e 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -64,7 +64,7 @@ "runType": "Run Type", "sourceAssetEvent": "Source Asset Event", "triggeredBy": "Triggered By", - "triggeringUser": "Triggering User Name" + "triggeringUser": "Triggering User" }, "dagRun_one": "Dag Run", "dagRun_other": "Dag Runs", @@ -101,6 +101,8 @@ }, "filter": "Filter", "filters": { + "durationFrom": "Duration From", + "durationTo": "Duration To", "logicalDateFrom": "Logical Date From", "logicalDateTo": "Logical Date To", "runAfterFrom": "Run After From", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json index 1367ac6a929f8..6705e10196206 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json @@ -20,8 +20,7 @@ "all": "All", "paused": "Paused" }, - "runIdPatternFilter": "Search Dag Runs", - "triggeringUserNameFilter": "Search by Triggering User" + "runIdPatternFilter": "Search Dag Runs" }, "ownerLink": "Owner link for {{owner}}", "runAndTaskActions": { diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx index 8c83a75b84dc5..4bcf04088a9e4 100644 --- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx +++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx @@ -16,15 +16,27 @@ * specific language governing permissions and limitations * under the License. */ +import { Flex } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { FiBarChart, FiUser } from "react-icons/fi"; import { LuBrackets } from "react-icons/lu"; -import { MdDateRange, MdSearch } from "react-icons/md"; +import { + MdDateRange, + MdSearch, + MdHistory, + MdHourglassEmpty, + MdCode, + MdPlayArrow, + MdCheckCircle, +} from "react-icons/md"; +import type { DagRunState, DagRunType } from "openapi/requests/types.gen"; import { DagIcon } from "src/assets/DagIcon"; import { TaskIcon } from "src/assets/TaskIcon"; import type { FilterConfig } from "src/components/FilterBar"; +import { RunTypeIcon } from "src/components/RunTypeIcon"; import { StateBadge } from "src/components/StateBadge"; +import { dagRunStateOptions, dagRunTypeOptions } from "src/constants/stateOptions"; import { SearchParamsKeys } from "./searchParams"; @@ -36,7 +48,7 @@ export enum FilterTypes { } export const useFilterConfigs = () => { - const { t: translate } = useTranslation(["browse", "common", "admin", "hitl"]); + const { t: translate } = useTranslation(["browse", "common", "admin", "hitl", "dagRun"]); const filterConfigMap = { [SearchParamsKeys.AFTER]: { @@ -49,6 +61,12 @@ export const useFilterConfigs = () => { label: translate("common:table.to"), type: FilterTypes.DATE, }, + [SearchParamsKeys.CONF_CONTAINS]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagRun.conf"), + type: FilterTypes.TEXT, + }, [SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN]: { hotkeyDisabled: true, icon: , @@ -61,6 +79,31 @@ export const useFilterConfigs = () => { label: translate("common:dagId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.DAG_ID_PATTERN]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagId"), + type: FilterTypes.TEXT, + }, + [SearchParamsKeys.DAG_VERSION]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagRun.dagVersions"), + min: 1, + type: FilterTypes.NUMBER, + }, + [SearchParamsKeys.DURATION_GTE]: { + icon: , + label: translate("common:filters.durationFrom"), + min: 0, + type: FilterTypes.NUMBER, + }, + [SearchParamsKeys.DURATION_LTE]: { + icon: , + label: translate("common:filters.durationTo"), + min: 0, + type: FilterTypes.NUMBER, + }, [SearchParamsKeys.END_DATE]: { icon: , label: translate("common:table.to"), @@ -129,11 +172,42 @@ export const useFilterConfigs = () => { label: translate("common:runId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.RUN_TYPE]: { + icon: , + label: translate("common:dagRun.runType"), + options: dagRunTypeOptions.items.map((option) => ({ + label: + option.value === "all" ? ( + translate(option.label) + ) : ( + + + {translate(option.label)} + + ), + value: option.value, + })), + type: FilterTypes.SELECT, + }, [SearchParamsKeys.START_DATE]: { icon: , label: translate("common:table.from"), type: FilterTypes.DATE, }, + [SearchParamsKeys.STATE]: { + icon: , + label: translate("common:state"), + options: dagRunStateOptions.items.map((option) => ({ + label: + option.value === "all" ? ( + translate(option.label) + ) : ( + {translate(option.label)} + ), + value: option.value, + })), + type: FilterTypes.SELECT, + }, [SearchParamsKeys.TASK_ID]: { hotkeyDisabled: true, icon: , @@ -146,6 +220,12 @@ export const useFilterConfigs = () => { label: translate("common:taskId"), type: FilterTypes.TEXT, }, + [SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN]: { + hotkeyDisabled: true, + icon: , + label: translate("common:dagRun.triggeringUser"), + type: FilterTypes.TEXT, + }, [SearchParamsKeys.TRY_NUMBER]: { label: translate("common:tryNumber"), min: 1, diff --git a/airflow-core/src/airflow/ui/src/constants/searchParams.ts b/airflow-core/src/airflow/ui/src/constants/searchParams.ts index cebc3d074ee0b..8ed9e3dd5b3c7 100644 --- a/airflow-core/src/airflow/ui/src/constants/searchParams.ts +++ b/airflow-core/src/airflow/ui/src/constants/searchParams.ts @@ -19,10 +19,14 @@ export enum SearchParamsKeys { AFTER = "after", BEFORE = "before", + CONF_CONTAINS = "conf_contains", DAG_DISPLAY_NAME_PATTERN = "dag_display_name_pattern", DAG_ID = "dag_id", DAG_ID_PATTERN = "dag_id_pattern", + DAG_VERSION = "dag_version", DEPENDENCIES = "dependencies", + DURATION_GTE = "duration_gte", + DURATION_LTE = "duration_lte", END_DATE = "end_date", EVENT_TYPE = "event_type", EXCLUDED_EVENTS = "excluded_events", diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index 792e42054efb1..f0917e971862e 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -1,5 +1,3 @@ -/* eslint-disable max-lines */ - /*! * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,15 +16,14 @@ * specific language governing permissions and limitations * under the License. */ -import { Flex, HStack, Link, type SelectValueChangeDetails, Text, Box } from "@chakra-ui/react"; +import { Flex, HStack, Link, Text } from "@chakra-ui/react"; import type { ColumnDef } from "@tanstack/react-table"; import type { TFunction } from "i18next"; -import { useCallback } from "react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; import { useDagRunServiceGetDagRuns } from "openapi/queries"; -import type { DAGRunResponse, DagRunState, DagRunType } from "openapi/requests/types.gen"; +import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ClearRunButton } from "src/components/Clear"; import { DagVersion } from "src/components/DagVersion"; import { DataTable } from "src/components/DataTable"; @@ -36,20 +33,24 @@ import { LimitedItemsList } from "src/components/LimitedItemsList"; import { MarkRunAsButton } from "src/components/MarkAs"; import RenderedJsonField from "src/components/RenderedJsonField"; import { RunTypeIcon } from "src/components/RunTypeIcon"; -import { SearchBar } from "src/components/SearchBar"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { TruncatedText } from "src/components/TruncatedText"; -import { Select } from "src/components/ui"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; -import { dagRunTypeOptions, dagRunStateOptions as stateOptions } from "src/constants/stateOptions"; +import { DagRunsFilters } from "src/pages/DagRunsFilters"; import DeleteRunButton from "src/pages/DeleteRunButton"; import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; type DagRunRow = { row: { original: DAGRunResponse } }; const { + CONF_CONTAINS: CONF_CONTAINS_PARAM, DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM, + DAG_VERSION: DAG_VERSION_PARAM, + DURATION_GTE: DURATION_GTE_PARAM, + DURATION_LTE: DURATION_LTE_PARAM, END_DATE: END_DATE_PARAM, + RUN_AFTER_GTE: RUN_AFTER_GTE_PARAM, + RUN_AFTER_LTE: RUN_AFTER_LTE_PARAM, RUN_ID_PATTERN: RUN_ID_PATTERN_PARAM, RUN_TYPE: RUN_TYPE_PARAM, START_DATE: START_DATE_PARAM, @@ -178,7 +179,7 @@ const runColumns = (translate: TFunction, dagId?: string): Array { const { t: translate } = useTranslation(); const { dagId } = useParams(); - const [searchParams, setSearchParams] = useSearchParams(); + const [searchParams] = useSearchParams(); const { setTableURLState, tableURLState } = useTableURLState({ columnVisibility: { @@ -197,19 +198,32 @@ export const DagRuns = () => { const filteredRunIdPattern = searchParams.get(RUN_ID_PATTERN_PARAM); const filteredTriggeringUserNamePattern = searchParams.get(TRIGGERING_USER_NAME_PATTERN_PARAM); const filteredDagIdPattern = searchParams.get(DAG_ID_PATTERN_PARAM); + const filteredDagVersion = searchParams.get(DAG_VERSION_PARAM); const startDate = searchParams.get(START_DATE_PARAM); const endDate = searchParams.get(END_DATE_PARAM); + const runAfterGte = searchParams.get(RUN_AFTER_GTE_PARAM); + const runAfterLte = searchParams.get(RUN_AFTER_LTE_PARAM); + const durationGte = searchParams.get(DURATION_GTE_PARAM); + const durationLte = searchParams.get(DURATION_LTE_PARAM); + const confContains = searchParams.get(CONF_CONTAINS_PARAM); const refetchInterval = useAutoRefresh({}); const { data, error, isLoading } = useDagRunServiceGetDagRuns( { + confContains: confContains !== null && confContains !== "" ? confContains : undefined, dagId: dagId ?? "~", dagIdPattern: filteredDagIdPattern ?? undefined, + dagVersion: + filteredDagVersion !== null && filteredDagVersion !== "" ? [Number(filteredDagVersion)] : undefined, + durationGte: durationGte !== null && durationGte !== "" ? Number(durationGte) : undefined, + durationLte: durationLte !== null && durationLte !== "" ? Number(durationLte) : undefined, endDateLte: endDate ?? undefined, limit: pageSize, offset: pageIndex * pageSize, orderBy, + runAfterGte: runAfterGte ?? undefined, + runAfterLte: runAfterLte ?? undefined, runIdPattern: filteredRunIdPattern ?? undefined, runType: filteredType === null ? undefined : [filteredType], startDateGte: startDate ?? undefined, @@ -223,190 +237,9 @@ export const DagRuns = () => { }, ); - const handleStateChange = useCallback( - ({ value }: SelectValueChangeDetails) => { - const [val] = value; - - if (val === undefined || val === "all") { - searchParams.delete(STATE_PARAM); - } else { - searchParams.set(STATE_PARAM, val); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleTypeChange = useCallback( - ({ value }: SelectValueChangeDetails) => { - const [val] = value; - - if (val === undefined || val === "all") { - searchParams.delete(RUN_TYPE_PARAM); - } else { - searchParams.set(RUN_TYPE_PARAM, val); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleRunIdPatternChange = useCallback( - (value: string) => { - if (value === "") { - searchParams.delete(RUN_ID_PATTERN_PARAM); - } else { - searchParams.set(RUN_ID_PATTERN_PARAM, value); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleTriggeringUserNamePatternChange = useCallback( - (value: string) => { - if (value === "") { - searchParams.delete(TRIGGERING_USER_NAME_PATTERN_PARAM); - } else { - searchParams.set(TRIGGERING_USER_NAME_PATTERN_PARAM, value); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - - const handleDagIdPatternChange = useCallback( - (value: string) => { - if (value === "") { - searchParams.delete(DAG_ID_PATTERN_PARAM); - } else { - searchParams.set(DAG_ID_PATTERN_PARAM, value); - } - setTableURLState({ - pagination: { ...pagination, pageIndex: 0 }, - sorting, - }); - setSearchParams(searchParams); - }, - [pagination, searchParams, setSearchParams, setTableURLState, sorting], - ); - return ( <> - - {dagId === undefined && ( - - - - )} - - - - - - - - - - {() => - filteredState === null ? ( - translate("dags:filters.allStates") - ) : ( - - {translate(`common:states.${filteredState}`)} - - ) - } - - - - {stateOptions.items.map((option) => ( - - {option.value === "all" ? ( - translate(option.label) - ) : ( - {translate(option.label)} - )} - - ))} - - - - - - - {() => - filteredType === null ? ( - translate("dags:filters.allRunTypes") - ) : ( - - - {translate(`common:runTypes.${filteredType}`)} - - ) - } - - - - {dagRunTypeOptions.items.map((option) => ( - - {option.value === "all" ? ( - translate(option.label) - ) : ( - - - {translate(option.label)} - - )} - - ))} - - - + { + const searchParamKeys = useMemo((): Array => { + const keys: Array = [ + SearchParamsKeys.RUN_ID_PATTERN, + SearchParamsKeys.STATE, + SearchParamsKeys.RUN_TYPE, + SearchParamsKeys.START_DATE, + SearchParamsKeys.END_DATE, + SearchParamsKeys.RUN_AFTER_GTE, + SearchParamsKeys.RUN_AFTER_LTE, + SearchParamsKeys.DURATION_GTE, + SearchParamsKeys.DURATION_LTE, + SearchParamsKeys.CONF_CONTAINS, + SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN, + SearchParamsKeys.DAG_VERSION, + ]; + + if (dagId === undefined) { + keys.unshift(SearchParamsKeys.DAG_ID_PATTERN); + } + + return keys; + }, [dagId]); + + const { filterConfigs, handleFiltersChange, searchParams } = useFiltersHandler(searchParamKeys); + + const initialValues = useMemo(() => { + const values: Record = {}; + + filterConfigs.forEach((config) => { + const value = searchParams.get(config.key); + + if (value !== null && value !== "") { + if (config.type === "number") { + const parsedValue = Number(value); + + values[config.key] = isNaN(parsedValue) ? value : parsedValue; + } else { + values[config.key] = value; + } + } + }); + + return values; + }, [searchParams, filterConfigs]); + + return ( + + + + ); +}; diff --git a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts index d3543228d94eb..53d5cece02949 100644 --- a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts +++ b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts @@ -27,8 +27,13 @@ import { SearchParamsKeys } from "src/constants/searchParams"; export type FilterableSearchParamsKeys = | SearchParamsKeys.AFTER | SearchParamsKeys.BEFORE + | SearchParamsKeys.CONF_CONTAINS | SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN | SearchParamsKeys.DAG_ID + | SearchParamsKeys.DAG_ID_PATTERN + | SearchParamsKeys.DAG_VERSION + | SearchParamsKeys.DURATION_GTE + | SearchParamsKeys.DURATION_LTE | SearchParamsKeys.END_DATE | SearchParamsKeys.EVENT_TYPE | SearchParamsKeys.KEY_PATTERN @@ -40,9 +45,12 @@ export type FilterableSearchParamsKeys = | SearchParamsKeys.RUN_AFTER_LTE | SearchParamsKeys.RUN_ID | SearchParamsKeys.RUN_ID_PATTERN + | SearchParamsKeys.RUN_TYPE | SearchParamsKeys.START_DATE + | SearchParamsKeys.STATE | SearchParamsKeys.TASK_ID | SearchParamsKeys.TASK_ID_PATTERN + | SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN | SearchParamsKeys.TRY_NUMBER | SearchParamsKeys.USER; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 11ef75674be3c..4f36edd1550fe 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -113,6 +113,10 @@ def setup(request, dag_maker, session=None): # Set triggering_user_name for testing dag_run1.triggering_user_name = "alice_admin" dag_run1.note = (DAG1_RUN1_NOTE, "not_test") + # Set end_date for testing duration filter + dag_run1.end_date = dag_run1.start_date + timedelta(seconds=101) + # Set conf for testing conf_contains filter (values ordered for predictable sorting) + dag_run1.conf = {"env": "development", "version": "1.0"} for i, task in enumerate([task1, task2], start=1): ti = dag_run1.get_task_instance(task_id=task.task_id) @@ -130,6 +134,10 @@ def setup(request, dag_maker, session=None): ) # Set triggering_user_name for testing dag_run2.triggering_user_name = "bob_service" + # Set end_date for testing duration filter + dag_run2.end_date = dag_run2.start_date + timedelta(seconds=201) + # Set conf for testing conf_contains filter + dag_run2.conf = {"env": "production", "debug": True} ti1 = dag_run2.get_task_instance(task_id=task1.task_id) ti1.task = task1 @@ -151,6 +159,10 @@ def setup(request, dag_maker, session=None): ) # Set triggering_user_name for testing dag_run3.triggering_user_name = "service_account" + # Set end_date for testing duration filter + dag_run3.end_date = dag_run3.start_date + timedelta(seconds=51) + # Set conf for testing conf_contains filter + dag_run3.conf = {"env": "staging", "test_mode": True} dag_run4 = dag_maker.create_dagrun( run_id=DAG2_RUN2_ID, @@ -161,6 +173,10 @@ def setup(request, dag_maker, session=None): ) # Leave triggering_user_name as None for testing dag_run4.triggering_user_name = None + # Set end_date for testing duration filter + dag_run4.end_date = dag_run4.start_date + timedelta(seconds=150) + # Set conf for testing conf_contains filter + dag_run4.conf = {"env": "testing", "mode": "ci"} dag_maker.sync_dagbag_to_db() dag_maker.dag_model.has_task_concurrency_limits = True @@ -188,7 +204,7 @@ def get_dag_run_dict(run: DagRun): "queued_at": from_datetime_to_zulu(run.queued_at) if run.queued_at else None, "run_after": from_datetime_to_zulu_without_ms(run.run_after), "start_date": from_datetime_to_zulu_without_ms(run.start_date), - "end_date": from_datetime_to_zulu(run.end_date), + "end_date": from_datetime_to_zulu_without_ms(run.end_date), "duration": run.duration, "data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start), "data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end), @@ -450,26 +466,30 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): ( DAG1_ID, { - "end_date_gte": START_DATE2.isoformat(), + "end_date_gte": START_DATE2.isoformat(), # 2024-04-15 "end_date_lte": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - [DAG1_RUN1_ID, DAG1_RUN2_ID], + # DAG1 runs have end_date based on START_DATE1 (2024-01-15), so all < 2024-04-15 + [], ), ( DAG1_ID, { - "end_date_gt": START_DATE2.isoformat(), + "end_date_gt": START_DATE2.isoformat(), # 2024-04-15 "end_date_lt": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - [DAG1_RUN1_ID, DAG1_RUN2_ID], + # DAG1 runs have end_date based on START_DATE1 (2024-01-15), so all < 2024-04-15 + [], ), ( DAG1_ID, { - "end_date_gt": (START_DATE2 - timedelta(days=1)).isoformat(), + "end_date_gt": ( + START_DATE1 + timedelta(seconds=50) + ).isoformat(), # Between the two end dates "end_date_lt": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - [DAG1_RUN1_ID, DAG1_RUN2_ID], + [DAG1_RUN1_ID, DAG1_RUN2_ID], # Both should match as their end_date > start + 50s ), ( DAG1_ID, @@ -629,6 +649,23 @@ def test_bad_limit_and_offset(self, test_client, query_params, expected_detail): {"dag_version": [1, 999]}, [DAG1_RUN1_ID, DAG1_RUN2_ID], ), # Multiple versions, only existing ones match + # Test duration filters + ("~", {"duration_gte": 200}, [DAG1_RUN2_ID]), # Test >= 200 seconds + ("~", {"duration_lt": 100}, [DAG2_RUN1_ID]), # Test < 100 seconds + ( + "~", + {"duration_gte": 100, "duration_lte": 150}, + [DAG1_RUN1_ID, DAG2_RUN2_ID], + ), # Test between 100 and 150 (inclusive) + # Test conf_contains filter + ("~", {"conf_contains": "development"}, [DAG1_RUN1_ID]), # Test for "development" env + ( + "~", + {"conf_contains": "debug"}, + [DAG1_RUN2_ID], + ), # Test for debug key + ("~", {"conf_contains": "version"}, [DAG1_RUN1_ID]), # Test for the key "version" + ("~", {"conf_contains": "nonexistent_key"}, []), # Test for a key that doesn't exist ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -907,10 +944,12 @@ def test_bad_limit_and_offset(self, test_client, post_body, expected_detail): ), ( { - "end_date_gte": START_DATE2.isoformat(), + "end_date_gte": START_DATE2.isoformat(), # 2024-04-15 "end_date_lte": (datetime.now(tz=timezone.utc) + timedelta(days=1)).isoformat(), }, - DAG_RUNS_LIST, + # Only DAG2 runs match: their start_date is 2024-04-15, so end_date >= 2024-04-15 + # DAG1 runs have start_date 2024-01-15, so end_date < 2024-04-15 + [DAG2_RUN1_ID, DAG2_RUN2_ID], ), ( { diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 6ea61b24cf85c..f460e6661461d 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1392,6 +1392,11 @@ class DAGRunsBatchBody(BaseModel): end_date_gt: Annotated[datetime | None, Field(title="End Date Gt")] = None end_date_lte: Annotated[datetime | None, Field(title="End Date Lte")] = None end_date_lt: Annotated[datetime | None, Field(title="End Date Lt")] = None + duration_gte: Annotated[float | None, Field(title="Duration Gte")] = None + duration_gt: Annotated[float | None, Field(title="Duration Gt")] = None + duration_lte: Annotated[float | None, Field(title="Duration Lte")] = None + duration_lt: Annotated[float | None, Field(title="Duration Lt")] = None + conf_contains: Annotated[str | None, Field(title="Conf Contains")] = None class DAGVersionCollectionResponse(BaseModel):