forked from getAlby/ldk-node
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscoring.rs
More file actions
112 lines (103 loc) · 3.38 KB
/
scoring.rs
File metadata and controls
112 lines (103 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::io::Cursor;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use lightning::routing::scoring::ChannelLiquidities;
use lightning::util::ser::Readable;
use lightning::{log_error, log_info, log_trace};
use crate::config::{
EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL, EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS,
};
use crate::io::utils::write_external_pathfinding_scores_to_cache;
use crate::logger::LdkLogger;
use crate::runtime::Runtime;
use crate::{write_node_metrics, DynStore, Logger, NodeMetrics, Scorer};
/// Start a background task that periodically downloads scores via an external url and merges them into the local
/// pathfinding scores.
pub fn setup_background_pathfinding_scores_sync(
url: String, scorer: Arc<Mutex<crate::types::Scorer>>, node_metrics: Arc<RwLock<NodeMetrics>>,
kv_store: Arc<DynStore>, logger: Arc<Logger>, runtime: Arc<Runtime>,
mut stop_receiver: tokio::sync::watch::Receiver<()>,
) {
log_info!(logger, "External scores background syncing enabled from {}", url);
let logger = Arc::clone(&logger);
runtime.spawn_background_processor_task(async move {
let mut interval = tokio::time::interval(EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL);
loop {
tokio::select! {
_ = stop_receiver.changed() => {
log_trace!(
logger,
"Stopping background syncing external scores.",
);
return;
}
_ = interval.tick() => {
log_trace!(
logger,
"Background sync of external scores started.",
);
sync_external_scores(logger.as_ref(), scorer.as_ref(), node_metrics.as_ref(), Arc::clone(&kv_store), &url).await;
}
}
}
});
}
async fn sync_external_scores(
logger: &Logger, scorer: &Mutex<Scorer>, node_metrics: &RwLock<NodeMetrics>,
kv_store: Arc<DynStore>, url: &String,
) -> () {
let response = tokio::time::timeout(
Duration::from_secs(EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS),
reqwest::get(url),
)
.await;
let response = match response {
Ok(resp) => resp,
Err(e) => {
log_error!(logger, "Retrieving external scores timed out: {}", e);
return;
},
};
let response = match response {
Ok(resp) => resp,
Err(e) => {
log_error!(logger, "Failed to retrieve external scores update: {}", e);
return;
},
};
let body = match response.bytes().await {
Ok(bytes) => bytes,
Err(e) => {
log_error!(logger, "Failed to read external scores update: {}", e);
return;
},
};
let mut reader = Cursor::new(body);
match ChannelLiquidities::read(&mut reader) {
Ok(liquidities) => {
if let Err(e) = write_external_pathfinding_scores_to_cache(
Arc::clone(&kv_store),
&liquidities,
logger,
)
.await
{
log_error!(logger, "Failed to persist external scores to cache: {}", e);
}
let duration_since_epoch =
SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
scorer.lock().unwrap().merge(liquidities, duration_since_epoch);
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_pathfinding_scores_sync_timestamp =
Some(duration_since_epoch.as_secs());
write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger)
.unwrap_or_else(|e| {
log_error!(logger, "Persisting node metrics failed: {}", e);
});
log_trace!(logger, "External scores merged successfully");
},
Err(e) => {
log_error!(logger, "Failed to parse external scores update: {}", e);
},
}
}