Skip to content

Commit cff9b6b

Browse files
Add timeout to health check (#1961)
Co-authored-by: Marcus Eagan <marcuseagan@gmail.com>
1 parent 4b77932 commit cff9b6b

File tree

13 files changed

+280
-22
lines changed

13 files changed

+280
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-config/examples/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,9 @@ The `private` server consists of a `listener` object and a `services` object. Th
362362
"scheduler": "MAIN_SCHEDULER",
363363
},
364364
"admin": {},
365-
"health": {},
365+
"health": {
366+
"timeout_seconds": 2
367+
},
366368
}
367369
}],
368370
"global": {},

nativelink-config/src/cas_server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,10 @@ pub struct HealthConfig {
246246
/// Default: "/status"
247247
#[serde(default)]
248248
pub path: String,
249+
250+
// Timeout on health checks. Defaults to 5s.
251+
#[serde(default)]
252+
pub timeout_seconds: u64,
249253
}
250254

251255
#[derive(Deserialize, Serialize, Debug)]

nativelink-service/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ rust_test_suite(
6262
"tests/cas_server_test.rs",
6363
"tests/execution_server_test.rs",
6464
"tests/fetch_server_test.rs",
65+
"tests/health_server_test.rs",
6566
"tests/push_server_test.rs",
6667
"tests/worker_api_server_test.rs",
6768
],
@@ -79,6 +80,7 @@ rust_test_suite(
7980
"//nativelink-store",
8081
"//nativelink-util",
8182
"@crates//:async-lock",
83+
"@crates//:axum",
8284
"@crates//:bytes",
8385
"@crates//:futures",
8486
"@crates//:hex",
@@ -88,6 +90,7 @@ rust_test_suite(
8890
"@crates//:pretty_assertions",
8991
"@crates//:prost",
9092
"@crates//:prost-types",
93+
"@crates//:serde_json",
9194
"@crates//:sha2",
9295
"@crates//:tokio",
9396
"@crates//:tokio-stream",

nativelink-service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ hyper = "1.6.0"
6565
hyper-util = "0.1.11"
6666
pretty_assertions = { version = "1.4.1", features = ["std"] }
6767
prost-types = { version = "0.13.5", default-features = false }
68+
serde_json = { version = "1.0.140", default-features = false, features = [
69+
"std",
70+
] }
6871
sha2 = { version = "0.10.8", default-features = false }
6972
tracing-test = { version = "0.2.5", default-features = false, features = [
7073
"no-env-filter",

nativelink-service/src/health_server.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ use core::convert::Infallible;
1616
use core::future::Future;
1717
use core::pin::Pin;
1818
use core::task::{Context, Poll};
19+
use core::time::Duration;
1920

2021
use axum::body::Body;
2122
use bytes::Bytes;
2223
use futures::StreamExt;
2324
use http_body_util::Full;
2425
use hyper::header::{CONTENT_TYPE, HeaderValue};
2526
use hyper::{Request, Response, StatusCode};
27+
use nativelink_config::cas_server::HealthConfig;
2628
use nativelink_util::health_utils::{
2729
HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter,
2830
};
@@ -32,14 +34,27 @@ use tracing::error_span;
3234
/// Content type header value for JSON.
3335
const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
3436

37+
// Note: This must be kept in sync with the documentation in
38+
// `HealthConfig::timeout_seconds`.
39+
const DEFAULT_HEALTH_CHECK_TIMEOUT_SECONDS: u64 = 5;
40+
3541
#[derive(Debug, Clone)]
3642
pub struct HealthServer {
3743
health_registry: HealthRegistry,
44+
timeout: Duration,
3845
}
3946

4047
impl HealthServer {
41-
pub const fn new(health_registry: HealthRegistry) -> Self {
42-
Self { health_registry }
48+
pub const fn new(health_registry: HealthRegistry, health_cfg: &HealthConfig) -> Self {
49+
let timeout = Duration::from_secs(if health_cfg.timeout_seconds == 0 {
50+
DEFAULT_HEALTH_CHECK_TIMEOUT_SECONDS
51+
} else {
52+
health_cfg.timeout_seconds
53+
});
54+
Self {
55+
health_registry,
56+
timeout,
57+
}
4358
}
4459
}
4560

@@ -54,15 +69,19 @@ impl Service<Request<Body>> for HealthServer {
5469

5570
fn call(&mut self, _req: Request<Body>) -> Self::Future {
5671
let health_registry = self.health_registry.clone();
72+
let local_timeout = self.timeout;
5773
Box::pin(error_span!("health_server_call").in_scope(|| async move {
58-
let health_status_descriptions: Vec<HealthStatusDescription> =
59-
health_registry.health_status_report().collect().await;
74+
let health_status_descriptions: Vec<HealthStatusDescription> = health_registry
75+
.health_status_report(&local_timeout)
76+
.collect()
77+
.await;
6078

6179
match serde_json5::to_string(&health_status_descriptions) {
6280
Ok(body) => {
6381
let contains_failed_report =
6482
health_status_descriptions.iter().any(|description| {
6583
matches!(description.status, HealthStatus::Failed { .. })
84+
| matches!(description.status, HealthStatus::Timeout { .. })
6685
});
6786
let status_code = if contains_failed_report {
6887
StatusCode::SERVICE_UNAVAILABLE
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use core::time::Duration;
2+
use std::borrow::Cow;
3+
use std::sync::Arc;
4+
5+
use axum::http::Request;
6+
use hyper::StatusCode;
7+
use nativelink_config::cas_server::HealthConfig;
8+
use nativelink_macro::nativelink_test;
9+
use nativelink_service::health_server::HealthServer;
10+
use nativelink_util::health_utils::{
11+
HealthRegistry, HealthRegistryBuilder, HealthStatus, HealthStatusIndicator,
12+
};
13+
use pretty_assertions::assert_eq;
14+
use serde_json::Value;
15+
use tonic::async_trait;
16+
use tonic::body::Body;
17+
use tonic::service::Routes;
18+
use tower::{Service, ServiceExt};
19+
20+
async fn health_tester(
21+
health_registry: HealthRegistry,
22+
expected_status_code: StatusCode,
23+
expected_result: &str,
24+
config: HealthConfig,
25+
) -> Result<(), Box<dyn core::error::Error>> {
26+
let health_server = HealthServer::new(health_registry, &config);
27+
28+
let tonic_services = Routes::builder().routes();
29+
30+
let mut svc = tonic_services
31+
.into_axum_router()
32+
.route_service("/status", health_server);
33+
34+
let request = Request::builder()
35+
.method("GET")
36+
.uri("/status")
37+
.body(Body::empty())?;
38+
let response: hyper::Response<axum::body::Body> =
39+
svc.as_service().ready().await?.call(request).await?;
40+
assert_eq!(response.status(), expected_status_code);
41+
42+
let raw_json = String::from_utf8(
43+
axum::body::to_bytes(response.into_body(), usize::MAX)
44+
.await?
45+
.to_vec(),
46+
)?;
47+
let parsed_json: Value = serde_json::from_str(&raw_json)?;
48+
assert_eq!(
49+
serde_json::to_string_pretty(&parsed_json)?,
50+
String::from(expected_result)
51+
);
52+
Ok(())
53+
}
54+
55+
#[nativelink_test]
56+
async fn basic_health_test() -> Result<(), Box<dyn core::error::Error>> {
57+
let health_registry = HealthRegistryBuilder::new("foo").build();
58+
health_tester(
59+
health_registry,
60+
StatusCode::OK,
61+
"[]",
62+
HealthConfig::default(),
63+
)
64+
.await
65+
}
66+
67+
struct TestIndicator {}
68+
69+
#[async_trait]
70+
impl HealthStatusIndicator for TestIndicator {
71+
fn get_name(&self) -> &'static str {
72+
"test_indicator"
73+
}
74+
75+
fn struct_name(&self) -> &'static str {
76+
"TestIndicator"
77+
}
78+
79+
async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
80+
HealthStatus::Ok {
81+
struct_name: self.struct_name(),
82+
message: Cow::Borrowed("all good"),
83+
}
84+
}
85+
}
86+
87+
#[nativelink_test]
88+
async fn health_test_with_item() -> Result<(), Box<dyn core::error::Error>> {
89+
let mut health_registry_builder = HealthRegistryBuilder::new("foo");
90+
health_registry_builder.register_indicator(Arc::new(TestIndicator {}));
91+
let health_registry = health_registry_builder.build();
92+
health_tester(
93+
health_registry,
94+
StatusCode::OK,
95+
r#"[
96+
{
97+
"namespace": "/foo/test_indicator",
98+
"status": {
99+
"Ok": {
100+
"struct_name": "TestIndicator",
101+
"message": "all good"
102+
}
103+
}
104+
}
105+
]"#,
106+
HealthConfig::default(),
107+
)
108+
.await
109+
}
110+
111+
struct TestSleepIndicator {}
112+
113+
#[async_trait]
114+
impl HealthStatusIndicator for TestSleepIndicator {
115+
fn get_name(&self) -> &'static str {
116+
"test_sleep_indicator"
117+
}
118+
119+
async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus {
120+
tokio::time::sleep(Duration::MAX).await;
121+
unreachable!("Because we sleep forever");
122+
}
123+
124+
fn struct_name(&self) -> &'static str {
125+
"TestSleepIndicator"
126+
}
127+
}
128+
129+
#[nativelink_test]
130+
async fn health_test_with_sleep() -> Result<(), Box<dyn core::error::Error>> {
131+
let mut health_registry_builder = HealthRegistryBuilder::new("foo");
132+
health_registry_builder.register_indicator(Arc::new(TestSleepIndicator {}));
133+
let health_registry = health_registry_builder.build();
134+
health_tester(
135+
health_registry,
136+
StatusCode::SERVICE_UNAVAILABLE,
137+
r#"[
138+
{
139+
"namespace": "/foo/test_sleep_indicator",
140+
"status": {
141+
"Timeout": {
142+
"struct_name": "TestSleepIndicator"
143+
}
144+
}
145+
}
146+
]"#,
147+
HealthConfig {
148+
timeout_seconds: 1,
149+
..Default::default()
150+
},
151+
)
152+
.await?;
153+
assert!(logs_contain(
154+
"Timeout during health check struct_name=\"TestSleepIndicator\""
155+
));
156+
Ok(())
157+
}

nativelink-store/tests/redis_store_test.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
3333
use nativelink_store::redis_store::RedisStore;
3434
use nativelink_util::buf_channel::make_buf_channel_pair;
3535
use nativelink_util::common::DigestInfo;
36+
use nativelink_util::health_utils::HealthStatus;
3637
use nativelink_util::store_trait::{StoreKey, StoreLike, UploadSizeInfo};
3738
use pretty_assertions::assert_eq;
3839
use tokio::sync::watch;
@@ -866,3 +867,36 @@ fn test_connection_errors() {
866867
"Connection issue connecting to redis server with hosts: [\"non-existent-server:6379\"], username: None, database: 0"
867868
);
868869
}
870+
871+
#[nativelink_test]
872+
fn test_health() {
873+
let spec = RedisSpec {
874+
addresses: vec!["redis://nativelink.com:6379/".to_string()],
875+
..Default::default()
876+
};
877+
let store = RedisStore::new(spec).expect("Working spec");
878+
match store.check_health(std::borrow::Cow::Borrowed("foo")).await {
879+
HealthStatus::Ok {
880+
struct_name: _,
881+
message: _,
882+
} => {
883+
panic!("Expected failure");
884+
}
885+
HealthStatus::Failed {
886+
struct_name,
887+
message,
888+
} => {
889+
assert_eq!(struct_name, "nativelink_store::redis_store::RedisStore");
890+
assert_eq!(
891+
message,
892+
"Store.update_oneshot() failed: Error { code: DeadlineExceeded, messages: [\"Timeout Error: Request timed out.\", \"Connection issue connecting to redis server with hosts: [\\\"nativelink.com:6379\\\"], username: None, database: 0\"] }"
893+
);
894+
assert!(logs_contain(
895+
"check_health Store.update_oneshot() failed e=Error { code: DeadlineExceeded, messages: [\"Timeout Error: Request timed out.\", \"Connection issue connecting to redis server with hosts: [\\\"nativelink.com:6379\\\"], username: None, database: 0\"] }"
896+
));
897+
}
898+
health_result => {
899+
panic!("Other result: {health_result:?}");
900+
}
901+
}
902+
}

0 commit comments

Comments
 (0)