Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0b6774a
replace gunicorm with uvicorn.run()
vatsrahul1001 Dec 20, 2024
85305fc
Merge branch 'main' of github.com:astronomer/airflow into replace-gun…
vatsrahul1001 Dec 31, 2024
9d4a1ec
fixing tests
vatsrahul1001 Dec 31, 2024
4b60ca7
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Dec 31, 2024
a167f94
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 21, 2025
7ceea79
Daemonized fastapi server
vatsrahul1001 Jan 21, 2025
eb85115
Merge branch 'replace-gunicorn-with-uvicorn-run' of github.com:astron…
vatsrahul1001 Jan 21, 2025
1332d3d
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 21, 2025
613d466
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 21, 2025
3dcf8f2
fixing setproctitle format
vatsrahul1001 Jan 22, 2025
fc3535e
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 22, 2025
57e3cfe
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 22, 2025
d928ddb
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 22, 2025
0ab6d7b
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 22, 2025
49642e3
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 23, 2025
d667103
updating setproctitle
vatsrahul1001 Jan 23, 2025
410cf5a
Merge branch 'replace-gunicorn-with-uvicorn-run' of github.com:astron…
vatsrahul1001 Jan 23, 2025
0b3696e
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 24, 2025
b381f7d
Merge branch 'main' into replace-gunicorn-with-uvicorn-run
vatsrahul1001 Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 22 additions & 127 deletions airflow/cli/commands/local_commands/fastapi_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,16 @@

import logging
import os
import signal
import subprocess
import sys
import textwrap
from contextlib import suppress
from pathlib import Path
from time import sleep
from typing import NoReturn

import psutil
from lockfile.pidlockfile import read_pid_from_pidfile
from uvicorn.workers import UvicornWorker
import uvicorn
from gunicorn.util import daemonize
from setproctitle import setproctitle

from airflow import settings
from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.commands.local_commands.webserver_command import GunicornMonitor
from airflow.exceptions import AirflowConfigException
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

log = logging.getLogger(__name__)
Expand All @@ -47,8 +38,6 @@
# This shouldn't be necessary but there seems to be an issue in uvloop that causes bad file descriptor
# errors when shutting down workers. Despite the 'closed' status of the issue it is not solved,
# more info here: https://github.com/benoitc/gunicorn/issues/1877#issuecomment-1911136399
AirflowUvicornWorker = UvicornWorker
AirflowUvicornWorker.CONFIG_KWARGS = {"loop": "asyncio", "http": "auto"}


@cli_utils.action_cli
Expand All @@ -59,18 +48,13 @@ def fastapi_api(args):

apps = args.apps
access_logfile = args.access_logfile or "-"
error_logfile = args.error_logfile or "-"
access_logformat = args.access_logformat
num_workers = args.workers
worker_timeout = args.worker_timeout

worker_class = "airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker"

from airflow.api_fastapi.app import create_app

if args.debug:
print(f"Starting the FastAPI API server on port {args.port} and host {args.hostname} debug.")
log.warning("Running in dev mode, ignoring gunicorn args")
log.warning("Running in dev mode, ignoring uvicorn args")

run_args = [
"fastapi",
Expand All @@ -93,124 +77,35 @@ def fastapi_api(args):
process.wait()
os.environ.pop("AIRFLOW_API_APPS")
else:
if args.daemon:
daemonize()
log.info("Daemonized the FastAPI API server process PID: %s", os.getpid())

log.info(
textwrap.dedent(
f"""\
Running the Gunicorn Server with:
Running the uvicorn with:
Apps: {apps}
Workers: {num_workers} {worker_class}
Workers: {num_workers}
Host: {args.hostname}:{args.port}
Timeout: {worker_timeout}
Logfiles: {access_logfile} {error_logfile}
Logfiles: {access_logfile}
Access Logformat: {access_logformat}
================================================================="""
)
)

pid_file, _, _, _ = setup_locations("fastapi-api", pid=args.pid)

run_args = [
sys.executable,
"-m",
"gunicorn",
"--workers",
str(num_workers),
"--worker-class",
str(worker_class),
"--timeout",
str(worker_timeout),
"--bind",
args.hostname + ":" + str(args.port),
"--name",
"airflow-fastapi-api",
"--pid",
pid_file,
"--access-logfile",
str(access_logfile),
"--error-logfile",
str(error_logfile),
"--config",
"python:airflow.api_fastapi.gunicorn_config",
]

ssl_cert, ssl_key = _get_ssl_cert_and_key_filepaths(args)
if ssl_cert and ssl_key:
run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]

if args.access_logformat and args.access_logformat.strip():
run_args += ["--access-logformat", str(args.access_logformat)]

if args.daemon:
run_args += ["--daemon"]

run_args += [f"airflow.api_fastapi.app:cached_app(apps='{apps}')"]

# To prevent different workers creating the web app and
# all writing to the database at the same time, we use the --preload option.
# With the preload option, the app is loaded before the workers are forked, and each worker will
# then have a copy of the app
run_args += ["--preload"]

def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
log.info("Received signal: %s. Closing gunicorn.", signum)
gunicorn_master_proc.terminate()
with suppress(TimeoutError):
gunicorn_master_proc.wait(timeout=30)
if isinstance(gunicorn_master_proc, subprocess.Popen):
still_running = gunicorn_master_proc.poll() is not None
else:
still_running = gunicorn_master_proc.is_running()
if still_running:
gunicorn_master_proc.kill()
sys.exit(0)

def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
# Register signal handlers
signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))

# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
GunicornMonitor(
gunicorn_master_pid=gunicorn_master_proc.pid,
num_workers_expected=num_workers,
master_timeout=120,
worker_refresh_interval=30,
worker_refresh_batch_size=1,
reload_on_plugin_change=False,
).start()

def start_and_monitor_gunicorn(args):
if args.daemon:
subprocess.Popen(run_args, close_fds=True)

# Reading pid of gunicorn master as it will be different that
# the one of process spawned above.
gunicorn_master_proc_pid = None
while not gunicorn_master_proc_pid:
sleep(0.1)
gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)

# Run Gunicorn monitor
gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)
else:
with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
monitor_gunicorn(gunicorn_master_proc)

if args.daemon:
# This makes possible errors get reported before daemonization
os.environ["SKIP_DAGS_PARSING"] = "True"
create_app(apps)
os.environ.pop("SKIP_DAGS_PARSING")

pid_file_path = Path(pid_file)
monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
run_command_with_daemon_option(
args=args,
process_name="fastapi-api",
callback=lambda: start_and_monitor_gunicorn(args),
should_setup_logging=True,
pid_file=monitor_pid_file,
Comment thread
pierrejeambrun marked this conversation as resolved.
setproctitle(f"airflow fastapi_api -- host:{args.hostname} port:{args.port}")
uvicorn.run(
"airflow.api_fastapi.main:app",
host=args.hostname,
port=args.port,
workers=num_workers,
timeout_keep_alive=worker_timeout,
timeout_graceful_shutdown=worker_timeout,
ssl_keyfile=ssl_key,
ssl_certfile=ssl_cert,
access_log=access_logfile,
Comment thread
pierrejeambrun marked this conversation as resolved.
)


Expand Down
115 changes: 11 additions & 104 deletions tests/cli/commands/local_commands/test_fastapi_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
# under the License.
from __future__ import annotations

import os
import subprocess
import sys
import time
from unittest import mock

import pytest
Expand All @@ -37,73 +33,9 @@
class TestCliFastAPI(_CommonCLIGunicornTestClass):
main_process_regexp = r"airflow fastapi-api"

@pytest.mark.execution_timeout(210)
def test_cli_fastapi_api_background(self, tmp_path):
parent_path = tmp_path / "gunicorn"
parent_path.mkdir()
pidfile_fastapi_api = parent_path / "pidflow-fastapi-api.pid"
pidfile_monitor = parent_path / "pidflow-fastapi-api-monitor.pid"
stdout = parent_path / "airflow-fastapi-api.out"
stderr = parent_path / "airflow-fastapi-api.err"
logfile = parent_path / "airflow-fastapi-api.log"
try:
# Run fastapi-api as daemon in background. Note that the wait method is not called.
console.print("[magenta]Starting airflow fastapi-api --daemon")
env = os.environ.copy()
proc = subprocess.Popen(
[
"airflow",
"fastapi-api",
"--daemon",
"--pid",
os.fspath(pidfile_fastapi_api),
"--stdout",
os.fspath(stdout),
"--stderr",
os.fspath(stderr),
"--log-file",
os.fspath(logfile),
],
env=env,
)
assert proc.poll() is None

pid_monitor = self._wait_pidfile(pidfile_monitor)
console.print(f"[blue]Monitor started at {pid_monitor}")
pid_fastapi_api = self._wait_pidfile(pidfile_fastapi_api)
console.print(f"[blue]FastAPI API started at {pid_fastapi_api}")
console.print("[blue]Running airflow fastapi-api process:")
# Assert that the fastapi-api and gunicorn processes are running (by name rather than pid).
assert self._find_process(r"airflow fastapi-api --daemon", print_found_process=True)
console.print("[blue]Waiting for gunicorn processes:")
# wait for gunicorn to start
for _ in range(30):
if self._find_process(r"^gunicorn"):
break
console.print("[blue]Waiting for gunicorn to start ...")
time.sleep(1)
console.print("[blue]Running gunicorn processes:")
assert self._find_all_processes("^gunicorn", print_found_process=True)
console.print("[magenta]fastapi-api process started successfully.")
console.print(
"[magenta]Terminating monitor process and expect "
"fastapi-api and gunicorn processes to terminate as well"
)
self._terminate_multiple_process([pid_fastapi_api, pid_monitor])
self._check_processes(ignore_running=False)
console.print("[magenta]All fastapi-api and gunicorn processes are terminated.")
except Exception:
console.print("[red]Exception occurred. Dumping all logs.")
# Dump all logs
for file in parent_path.glob("*"):
console.print(f"Dumping {file} (size: {file.stat().st_size})")
console.print(file.read_text())
raise

def test_cli_fastapi_api_debug(self, app):
with (
mock.patch("subprocess.Popen") as Popen,
mock.patch.object(fastapi_api_command, "GunicornMonitor"),
):
port = "9092"
hostname = "somehost"
Expand All @@ -130,7 +62,6 @@ def test_cli_fastapi_api_env_var_set_unset(self, app):
"""
with (
mock.patch("subprocess.Popen") as Popen,
mock.patch.object(fastapi_api_command, "GunicornMonitor"),
mock.patch("os.environ", autospec=True) as mock_environ,
):
apps_value = "core,execution"
Expand Down Expand Up @@ -172,8 +103,7 @@ def test_cli_fastapi_api_args(self, ssl_cert_and_key):
cert_path, key_path = ssl_cert_and_key

with (
mock.patch("subprocess.Popen") as Popen,
mock.patch.object(fastapi_api_command, "GunicornMonitor"),
mock.patch("uvicorn.run") as mock_run,
):
args = self.parser.parse_args(
[
Expand All @@ -192,39 +122,16 @@ def test_cli_fastapi_api_args(self, ssl_cert_and_key):
)
fastapi_api_command.fastapi_api(args)

Popen.assert_called_with(
[
sys.executable,
"-m",
"gunicorn",
"--workers",
"4",
"--worker-class",
"airflow.cli.commands.local_commands.fastapi_api_command.AirflowUvicornWorker",
"--timeout",
"120",
"--bind",
"0.0.0.0:9091",
"--name",
"airflow-fastapi-api",
"--pid",
"/tmp/x.pid",
"--access-logfile",
"-",
"--error-logfile",
"-",
"--config",
"python:airflow.api_fastapi.gunicorn_config",
"--certfile",
str(cert_path),
"--keyfile",
str(key_path),
"--access-logformat",
"custom_log_format",
"airflow.api_fastapi.app:cached_app(apps='core')",
"--preload",
],
close_fds=True,
mock_run.assert_called_with(
"airflow.api_fastapi.main:app",
host="0.0.0.0",
port=9091,
workers=4,
timeout_keep_alive=120,
timeout_graceful_shutdown=120,
ssl_keyfile=str(key_path),
ssl_certfile=str(cert_path),
access_log="-",
)

@pytest.mark.parametrize(
Expand Down