1717use crate :: kvdb:: IntrospectorKvdb ;
1818use clap:: Parser ;
1919use color_eyre:: Result ;
20- use log:: { error, info} ;
20+ use log:: { debug , error, info} ;
2121use prometheus_endpoint:: { prometheus:: IntGaugeVec , Opts , Registry } ;
22+ use rand:: { thread_rng, Rng } ;
2223
2324#[ derive( Clone , Debug , Parser , Default ) ]
2425#[ clap( rename_all = "kebab-case" ) ]
@@ -29,6 +30,12 @@ pub struct KvdbPrometheusOptions {
2930 /// Database poll timeout (default, once per 5 minutes).
3031 #[ clap( long, default_value = "300.0" ) ]
3132 poll_timeout : f32 ,
33+ /// Probability to delay in the iteration to reduce instant DB load (default - around 10000 iterations)
34+ #[ clap( long, default_value = "0.00001" ) ]
35+ sleep_probability : f32 ,
36+ /// Sleep time in seconds (default 10ms)
37+ #[ clap( long, default_value = "0.01" ) ]
38+ sleep_time : f32 ,
3239}
3340
3441struct KvdbPrometheusMetrics {
@@ -77,10 +84,13 @@ async fn update_db<D: IntrospectorKvdb>(
7784 Ok ( columns) => {
7885 let mut update_results: Vec < UpdateResult > = vec ! [ ] ;
7986
80- let all_done = columns. iter ( ) . all ( |col| {
87+ let mut all_done = true ;
88+ for col in columns {
8189 let mut keys_space = 0_i64 ;
8290 let mut keys_count = 0_i64 ;
8391 let mut values_space = 0_i64 ;
92+ // Used to sleep more frequently on large keys
93+ let mut size_factor: f32 = 1.0 ;
8494
8595 info ! ( "Iterating over column {}" , col. as_str( ) ) ;
8696 match db. iter_values ( col. as_str ( ) ) {
@@ -89,20 +99,33 @@ async fn update_db<D: IntrospectorKvdb>(
8999 keys_space += key. len ( ) as i64 ;
90100 keys_count += 1 ;
91101 values_space += value. len ( ) as i64 ;
102+
103+ if prometheus_opts. sleep_probability > 0.0 && prometheus_opts. sleep_time > 0.0 {
104+ let dice: f32 = thread_rng ( ) . gen_range ( 0.0 ..1.0 ) ;
105+ size_factor += ( key. len ( ) + value. len ( ) ) as f32 / 10240.0 ;
106+ if dice < prometheus_opts. sleep_probability * size_factor {
107+ debug ! ( "sleeping to unload database" ) ;
108+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs_f32 (
109+ prometheus_opts. sleep_time ,
110+ ) )
111+ . await ;
112+ size_factor = 1.0 ;
113+ }
114+ }
92115 } ,
93116 Err ( e) => {
94117 error ! (
95118 "Failed to get iterator for column {} in database: {:?}, trying to reopen database" ,
96119 col. as_str( ) ,
97120 e
98121 ) ;
99- return false
122+ all_done = false ;
123+ break
100124 } ,
101125 }
102126
103127 update_results. push ( UpdateResult { column : col. clone ( ) , keys_count, keys_space, values_space } ) ;
104- true
105- } ) ;
128+ }
106129
107130 if all_done {
108131 for res in & update_results {
0 commit comments