Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ class FilterOptionEnum(Enum):
ANY_EQUAL = "any_eq"
ALL_EQUAL = "all_eq"
IS_NONE = "is_none"
CONTAINS = "contains"


class FilterParam(BaseParam[T]):
Expand Down Expand Up @@ -366,6 +367,13 @@ def to_orm(self, select: Select) -> Select:
return select.where(self.attribute.is_not(None))
if self.value is True:
return select.where(self.attribute.is_(None))
if self.filter_option == FilterOptionEnum.CONTAINS:
# For JSON/JSONB columns, convert to text before applying LIKE
from sqlalchemy import Text, cast

if str(self.attribute.type).upper() in ("JSON", "JSONB"):
return select.where(cast(self.attribute, Text).contains(self.value))
return select.where(self.attribute.contains(self.value))
raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}")

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,10 @@ class DAGRunsBatchBody(StrictBaseModel):
end_date_gt: AwareDatetime | None = None
end_date_lte: AwareDatetime | None = None
end_date_lt: AwareDatetime | None = None

duration_gte: float | None = None
duration_gt: float | None = None
duration_lte: float | None = None
duration_lt: float | None = None

conf_contains: str | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,38 @@ paths:
format: date-time
- type: 'null'
title: End Date Lt
- name: duration_gte
in: query
required: false
schema:
anyOf:
- type: number
- type: 'null'
title: Duration Gte
- name: duration_gt
in: query
required: false
schema:
anyOf:
- type: number
- type: 'null'
title: Duration Gt
- name: duration_lte
in: query
required: false
schema:
anyOf:
- type: number
- type: 'null'
title: Duration Lte
- name: duration_lt
in: query
required: false
schema:
anyOf:
- type: number
- type: 'null'
title: Duration Lt
- name: updated_at_gte
in: query
required: false
Expand Down Expand Up @@ -2213,6 +2245,12 @@ paths:
format: date-time
- type: 'null'
title: Updated At Lt
- name: conf_contains
in: query
required: false
schema:
type: string
title: Conf Contains
- name: run_type
in: query
required: false
Expand Down Expand Up @@ -10474,6 +10512,31 @@ components:
format: date-time
- type: 'null'
title: End Date Lt
duration_gte:
anyOf:
- type: number
- type: 'null'
title: Duration Gte
duration_gt:
anyOf:
- type: number
- type: 'null'
title: Duration Gt
duration_lte:
anyOf:
- type: number
- type: 'null'
title: Duration Lte
duration_lt:
anyOf:
- type: number
- type: 'null'
title: Duration Lt
conf_contains:
anyOf:
- type: string
- type: 'null'
title: Conf Contains
additionalProperties: false
type: object
title: DAGRunsBatchBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
SortParam,
_SearchParam,
datetime_range_filter_factory,
filter_param_factory,
float_range_filter_factory,
search_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
Expand Down Expand Up @@ -317,7 +319,12 @@ def get_dag_runs(
logical_date: Annotated[RangeFilter, Depends(datetime_range_filter_factory("logical_date", DagRun))],
start_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("start_date", DagRun))],
end_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))],
duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", DagRun))],
update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", DagRun))],
conf_contains: Annotated[
FilterParam[str],
Depends(filter_param_factory(DagRun.conf, str, FilterOptionEnum.CONTAINS, "conf_contains")),
],
run_type: QueryDagRunRunTypesFilter,
state: QueryDagRunStateFilter,
dag_version: QueryDagRunVersionFilter,
Expand Down Expand Up @@ -376,6 +383,8 @@ def get_dag_runs(
start_date_range,
end_date_range,
update_at_range,
duration_range,
conf_contains,
state,
run_type,
dag_version,
Expand Down Expand Up @@ -565,6 +574,16 @@ def get_list_dag_runs_batch(
),
attribute=DagRun.end_date,
)
duration = RangeFilter(
Range(
lower_bound_gte=body.duration_gte,
lower_bound_gt=body.duration_gt,
upper_bound_lte=body.duration_lte,
upper_bound_lt=body.duration_lt,
),
attribute=DagRun.duration,
)
conf_contains = FilterParam(DagRun.conf, body.conf_contains, FilterOptionEnum.CONTAINS)
state = FilterParam(DagRun.state, body.states, FilterOptionEnum.ANY_EQUAL)

offset = OffsetFilter(body.page_offset)
Expand All @@ -590,7 +609,17 @@ def get_list_dag_runs_batch(
base_query = select(DagRun).options(joinedload(DagRun.dag_model))
dag_runs_select, total_entries = paginated_select(
statement=base_query,
filters=[dag_ids, logical_date, run_after, start_date, end_date, state, readable_dag_runs_filter],
filters=[
dag_ids,
logical_date,
run_after,
start_date,
end_date,
duration,
conf_contains,
state,
readable_dag_runs_filter,
],
order_by=order_by,
offset=offset,
limit=limit,
Expand Down
8 changes: 7 additions & 1 deletion airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,15 @@ def duration(cls, session: Session = NEW_SESSION) -> Case:
if dialect_name == "mysql":
return func.timestampdiff(text("SECOND"), cls.start_date, cls.end_date)

if dialect_name == "sqlite":
duration_expr = (func.julianday(cls.end_date) - func.julianday(cls.start_date)) * 86400

else:
duration_expr = func.extract("epoch", cls.end_date - cls.start_date)

when_condition = (
(cls.end_date != None) & (cls.start_date != None), # noqa: E711
func.extract("epoch", cls.end_date - cls.start_date),
duration_expr,
)

return case(when_condition, else_=None)
Expand Down
9 changes: 7 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,15 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }:
export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof DagRunService.getDagRuns>>;
export type DagRunServiceGetDagRunsQueryResult<TData = DagRunServiceGetDagRunsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const UseDagRunServiceGetDagRunsKeyFn = ({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
confContains?: string;
dagId: string;
dagIdPattern?: string;
dagVersion?: number[];
durationGt?: number;
durationGte?: number;
durationLt?: number;
durationLte?: number;
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -174,7 +179,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagIdPattern, dagVersio
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited<ReturnType<typeof DagRunService.waitDagRunUntilFinished>>;
export type DagRunServiceWaitDagRunUntilFinishedQueryResult<TData = DagRunServiceWaitDagRunUntilFinishedDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,15 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu
* @param data.endDateGt
* @param data.endDateLte
* @param data.endDateLt
* @param data.durationGte
* @param data.durationGt
* @param data.durationLte
* @param data.durationLt
* @param data.updatedAtGte
* @param data.updatedAtGt
* @param data.updatedAtLte
* @param data.updatedAtLt
* @param data.confContains
* @param data.runType
* @param data.state
* @param data.dagVersion
Expand All @@ -295,10 +300,15 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
confContains?: string;
dagId: string;
dagIdPattern?: string;
dagVersion?: number[];
durationGt?: number;
durationGte?: number;
durationLt?: number;
durationLte?: number;
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -326,7 +336,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, {
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
Expand Down
14 changes: 12 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,15 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer
* @param data.endDateGt
* @param data.endDateLte
* @param data.endDateLt
* @param data.durationGte
* @param data.durationGt
* @param data.durationLte
* @param data.durationLt
* @param data.updatedAtGte
* @param data.updatedAtGt
* @param data.updatedAtLte
* @param data.updatedAtLt
* @param data.confContains
* @param data.runType
* @param data.state
* @param data.dagVersion
Expand All @@ -295,10 +300,15 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
confContains?: string;
dagId: string;
dagIdPattern?: string;
dagVersion?: number[];
durationGt?: number;
durationGte?: number;
durationLt?: number;
durationLte?: number;
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -326,7 +336,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagIdPattern, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
Expand Down
Loading
Loading