Skip to content

Commit f8f9098

Browse files
committed
refactor: improve threading model
1 parent 26d60b7 commit f8f9098

File tree

4 files changed

+94
-79
lines changed

4 files changed

+94
-79
lines changed

src/indexer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub mod chains;
22

33
use crate::types::{BitcoinBlockData, BlockIdentifier, StacksBlockData, StacksBlockMetadata};
4-
use crate::utils::stacks::{PoxInfo, StacksRpc};
4+
use crate::utils::stacks::PoxInfo;
55
use rocket::serde::json::Value as JsonValue;
66
use std::collections::{HashMap, VecDeque};
77

src/integrate/events_observer.rs

Lines changed: 78 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::indexer::{chains, BitcoinChainEvent, Indexer, IndexerConfig, StacksCh
33
use crate::integrate::{MempoolAdmissionData, ServiceStatusData, Status};
44
use crate::poke::load_session;
55
use crate::publish::{publish_contract, Network};
6-
use crate::types::{self, DevnetConfig};
6+
use crate::types::{self, DevnetConfig, BlockIdentifier};
77
use crate::utils;
8-
use crate::utils::stacks::{transactions, PoxInfo, StacksRpc};
8+
use crate::utils::stacks::{transactions, StacksRpc, PoxInfo};
99
use base58::FromBase58;
1010
use clarity_repl::clarity::representations::ClarityName;
1111
use clarity_repl::clarity::types::{BuffData, SequenceData, TupleData, Value as ClarityValue};
@@ -109,10 +109,18 @@ impl EventObserverConfig {
109109
}
110110
}
111111

112+
pub enum EventsObserverCommand {
113+
Terminate(bool), // Restart
114+
UpdatePoxInfo,
115+
PublishInitialContracts,
116+
PublishPoxStackingOrders(BlockIdentifier),
117+
}
118+
112119
pub async fn start_events_observer(
113120
config: EventObserverConfig,
114121
devnet_event_tx: Sender<DevnetEvent>,
115-
terminator_rx: Receiver<bool>,
122+
events_observer_commands_rx: Receiver<EventsObserverCommand>,
123+
events_observer_commands_tx: Sender<EventsObserverCommand>,
116124
) -> Result<(), Box<dyn Error>> {
117125
let _ = config.execute_scripts().await;
118126

@@ -137,15 +145,16 @@ pub async fn start_events_observer(
137145
let port = config.devnet_config.orchestrator_port;
138146
let manifest_path = config.manifest_path.clone();
139147

140-
let config_mutex = Arc::new(Mutex::new(config));
148+
let config_mutex = Arc::new(Mutex::new(config.clone()));
141149
let init_status_rw_lock = Arc::new(RwLock::new(init_status));
142150
let indexer_rw_lock = Arc::new(RwLock::new(indexer));
143151

144-
let moved_config_mutex = config_mutex.clone();
145152
let devnet_event_tx_mutex = Arc::new(Mutex::new(devnet_event_tx.clone()));
153+
let background_job_tx_mutex = Arc::new(Mutex::new(events_observer_commands_tx.clone()));
154+
146155
let moved_init_status_rw_lock = init_status_rw_lock.clone();
147156

148-
let config = Config {
157+
let rocket_config = Config {
149158
port: port,
150159
workers: 4,
151160
address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
@@ -156,11 +165,12 @@ pub async fn start_events_observer(
156165
};
157166

158167
let _ = std::thread::spawn(move || {
159-
let future = rocket::custom(config)
168+
let future = rocket::custom(rocket_config)
160169
.manage(indexer_rw_lock)
161170
.manage(devnet_event_tx_mutex)
162-
.manage(moved_config_mutex)
171+
.manage(config_mutex)
163172
.manage(moved_init_status_rw_lock)
173+
.manage(background_job_tx_mutex)
164174
.mount(
165175
"/",
166176
routes![
@@ -176,15 +186,16 @@ pub async fn start_events_observer(
176186
rt.block_on(future).expect("Unable to spawn event observer");
177187
});
178188

189+
// This loop is used for handling background jobs, emitted by HTTP calls.
179190
loop {
180-
match terminator_rx.recv() {
181-
Ok(true) => {
191+
match events_observer_commands_rx.recv() {
192+
Ok(EventsObserverCommand::Terminate(true)) => {
182193
devnet_event_tx
183194
.send(DevnetEvent::info("Terminating event observer".into()))
184195
.expect("Unable to terminate event observer");
185196
break;
186197
}
187-
Ok(false) => {
198+
Ok(EventsObserverCommand::Terminate(false)) => {
188199
// Restart
189200
devnet_event_tx
190201
.send(DevnetEvent::info("Reloading contracts".into()))
@@ -209,9 +220,33 @@ pub async fn start_events_observer(
209220
)))
210221
.expect("Unable to terminate event observer");
211222

212-
if let Ok(mut config_writer) = init_status_rw_lock.write() {
213-
config_writer.contracts_left_to_deploy = contracts_to_deploy;
214-
config_writer.deployer_nonce = 0;
223+
if let Ok(mut init_status) = init_status_rw_lock.write() {
224+
init_status.contracts_left_to_deploy = contracts_to_deploy;
225+
init_status.deployer_nonce = 0;
226+
}
227+
}
228+
Ok(EventsObserverCommand::UpdatePoxInfo) => {
229+
230+
}
231+
Ok(EventsObserverCommand::PublishInitialContracts) => {
232+
if let Ok(mut init_status_writer) = init_status_rw_lock.write() {
233+
let res = publish_initial_contracts(&config.devnet_config, &config.accounts, config.deployment_fee_rate, &mut init_status_writer);
234+
if let Some(tx_count) = res {
235+
let _ = devnet_event_tx.send(DevnetEvent::success(format!(
236+
"Will publish {} contracts",
237+
tx_count
238+
)));
239+
}
240+
}
241+
}
242+
Ok(EventsObserverCommand::PublishPoxStackingOrders(block_identifier)) => {
243+
let bitcoin_block_height = block_identifier.index;
244+
let res = publish_stacking_orders(&config.devnet_config, &config.accounts, config.deployment_fee_rate, bitcoin_block_height as u32).await;
245+
if let Some(tx_count) = res {
246+
let _ = devnet_event_tx.send(DevnetEvent::success(format!(
247+
"Will broadcast {} stacking orders",
248+
tx_count
249+
)));
215250
}
216251
}
217252
Err(_) => {
@@ -283,11 +318,11 @@ pub fn handle_new_burn_block(
283318
}
284319

285320
#[post("/new_block", format = "application/json", data = "<marshalled_block>")]
286-
pub async fn handle_new_block(
287-
config: &State<Arc<Mutex<EventObserverConfig>>>,
321+
pub fn handle_new_block(
288322
init_status: &State<Arc<RwLock<DevnetInitializationStatus>>>,
289323
indexer_rw_lock: &State<Arc<RwLock<Indexer>>>,
290324
devnet_events_tx: &State<Arc<Mutex<Sender<DevnetEvent>>>>,
325+
background_job_tx_mutex: &State<Arc<Mutex<Sender<EventsObserverCommand>>>>,
291326
marshalled_block: Json<JsonValue>,
292327
) -> Json<JsonValue> {
293328
// Standardize the structure of the block, and identify the
@@ -344,22 +379,10 @@ pub async fn handle_new_block(
344379
// - Contracts deployment orchestration during the first blocks (max: 25 / block)
345380
// - PoX stacking order orchestration: enable PoX with some "stack-stx" transactions
346381
// defined in the devnet file config.
347-
if let Ok(mut init_status_writer) = init_status.inner().write() {
382+
if let Ok(init_status_writer) = init_status.inner().read() {
348383
if init_status_writer.contracts_left_to_deploy.len() > 0 {
349-
if let Ok(config_reader) = config.inner().lock() {
350-
if let Some(count) = publish_initial_contracts(
351-
&config_reader.devnet_config,
352-
&config_reader.accounts,
353-
&mut init_status_writer,
354-
config_reader.deployment_fee_rate,
355-
) {
356-
if let Ok(tx) = devnet_events_tx.lock() {
357-
let _ = tx.send(DevnetEvent::success(format!(
358-
"Will publish {} contracts",
359-
count
360-
)));
361-
}
362-
}
384+
if let Ok(background_job_tx) = background_job_tx_mutex.lock() {
385+
let _ = background_job_tx.send(EventsObserverCommand::PublishInitialContracts);
363386
}
364387
}
365388
}
@@ -368,30 +391,10 @@ pub async fn handle_new_block(
368391
// cycle starts.
369392
let pox_cycle_length: u32 =
370393
pox_info.prepare_phase_block_length + pox_info.reward_phase_block_length;
371-
let should_update_pox_info = block.metadata.pox_cycle_position == (pox_cycle_length - 3);
372394
let should_submit_pox_orders = block.metadata.pox_cycle_position == (pox_cycle_length - 2);
373395
if should_submit_pox_orders {
374-
if let Ok(config_reader) = config.inner().lock() {
375-
let bitcoin_block_height = block.metadata.bitcoin_anchor_block_identifier.index;
376-
if let Some(count) = publish_stacking_orders(
377-
&config_reader.devnet_config,
378-
&config_reader.accounts,
379-
&pox_info,
380-
bitcoin_block_height as u32,
381-
config_reader.deployment_fee_rate,
382-
) {
383-
if let Ok(tx) = devnet_events_tx.lock() {
384-
let _ = tx.send(DevnetEvent::success(format!(
385-
"Will broadcast {} stacking orders",
386-
count
387-
)));
388-
}
389-
}
390-
}
391-
}
392-
if should_update_pox_info {
393-
if let Ok(mut indexer) = indexer_rw_lock.inner().write() {
394-
indexer.update_pox_info().await;
396+
if let Ok(background_job_tx) = background_job_tx_mutex.lock() {
397+
let _ = background_job_tx.send(EventsObserverCommand::PublishPoxStackingOrders(block.metadata.bitcoin_anchor_block_identifier.clone()));
395398
}
396399
}
397400

@@ -480,11 +483,20 @@ pub fn handle_drop_mempool_tx() -> Json<JsonValue> {
480483
}))
481484
}
482485

486+
#[get("/ping", format = "application/json")]
487+
pub fn handle_ping() -> Json<JsonValue> {
488+
Json(json!({
489+
"status": 200,
490+
"result": "Ok",
491+
}))
492+
}
493+
494+
483495
pub fn publish_initial_contracts(
484496
devnet_config: &DevnetConfig,
485497
accounts: &Vec<Account>,
486-
init_status: &mut DevnetInitializationStatus,
487498
deployment_fee_rate: u64,
499+
init_status: &mut DevnetInitializationStatus,
488500
) -> Option<usize> {
489501
let contracts_left = init_status.contracts_left_to_deploy.len();
490502
if contracts_left == 0 {
@@ -507,8 +519,6 @@ pub fn publish_initial_contracts(
507519
contracts_to_deploy.push(contract);
508520
}
509521

510-
let moved_node_url = node_url.clone();
511-
512522
let mut deployers_lookup = BTreeMap::new();
513523
for account in accounts.iter() {
514524
if account.name == "deployer" {
@@ -521,15 +531,13 @@ pub fn publish_initial_contracts(
521531
let contract_to_deploy_len = contracts_to_deploy.len();
522532
init_status.deployer_nonce += contract_to_deploy_len as u64;
523533

524-
// Move the transactions submission to another thread, the clock on that thread is ticking,
525-
// and blocking our stacks-node
526534
std::thread::spawn(move || {
527535
for contract in contracts_to_deploy.into_iter() {
528536
match publish_contract(
529537
&contract,
530538
&deployers_lookup,
531539
&mut deployers_nonces,
532-
&moved_node_url,
540+
&node_url,
533541
deployment_fee_rate,
534542
&Network::Devnet,
535543
) {
@@ -550,19 +558,25 @@ pub fn publish_initial_contracts(
550558
Some(contract_to_deploy_len)
551559
}
552560

553-
pub fn publish_stacking_orders(
561+
pub async fn publish_stacking_orders(
554562
devnet_config: &DevnetConfig,
555563
accounts: &Vec<Account>,
556-
pox_info: &PoxInfo,
557-
bitcoin_block_height: u32,
558564
fee_rate: u64,
565+
bitcoin_block_height: u32,
559566
) -> Option<usize> {
560567
if devnet_config.pox_stacking_orders.len() == 0 {
561568
return None;
562569
}
563570

564-
let node_url = format!("http://localhost:{}", devnet_config.stacks_node_rpc_port);
571+
let stacks_node_rpc_url = format!("http://localhost:{}", devnet_config.stacks_node_rpc_port);
572+
565573
let mut transactions = 0;
574+
let pox_info: PoxInfo = reqwest::get(format!("{}/v2/pox", stacks_node_rpc_url))
575+
.await
576+
.expect("Unable to retrieve pox info")
577+
.json()
578+
.await
579+
.expect("Unable to parse contract");
566580

567581
for pox_stacking_order in devnet_config.pox_stacking_orders.iter() {
568582
if pox_stacking_order.start_at_cycle == (pox_info.reward_cycle_id + 1) {
@@ -579,8 +593,8 @@ pub fn publish_stacking_orders(
579593

580594
transactions += 1;
581595

582-
let stacks_rpc = StacksRpc::new(&node_url);
583596
let default_fee = fee_rate * 10;
597+
let stacks_rpc = StacksRpc::new(&stacks_node_rpc_url);
584598
let nonce = stacks_rpc
585599
.get_nonce(&account.address)
586600
.expect("Unable to retrieve nonce");

src/integrate/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tracing_appender;
1010

1111
use crate::types::{BitcoinBlockData, StacksBlockData};
1212
use crate::utils;
13-
use events_observer::start_events_observer;
13+
use events_observer::{start_events_observer, EventsObserverCommand};
1414
pub use orchestrator::DevnetOrchestrator;
1515

1616
use self::events_observer::EventObserverConfig;
@@ -78,9 +78,10 @@ pub async fn do_run_devnet(
7878
let config = EventObserverConfig::new(devnet_config, devnet.manifest_path.clone());
7979
let contracts_to_deploy_len = config.contracts_to_deploy.len();
8080
let events_observer_tx = devnet_events_tx.clone();
81-
let (events_observer_terminator_tx, terminator_rx) = channel();
81+
let (events_observer_commands_tx, events_observer_commands_rx) = channel();
82+
let moved_events_observer_commands_tx = events_observer_commands_tx.clone();
8283
let events_observer_handle = std::thread::spawn(move || {
83-
let future = start_events_observer(config, events_observer_tx, terminator_rx);
84+
let future = start_events_observer(config, events_observer_tx, events_observer_commands_rx, moved_events_observer_commands_tx);
8485
let rt = utils::create_basic_runtime();
8586
let _ = rt.block_on(future);
8687
});
@@ -106,17 +107,17 @@ pub async fn do_run_devnet(
106107
let _ = ui::start_ui(
107108
devnet_events_tx,
108109
devnet_events_rx,
109-
events_observer_terminator_tx,
110+
events_observer_commands_tx,
110111
orchestrator_terminator_tx,
111112
orchestrator_terminated_rx,
112113
&devnet_path,
113114
);
114115
} else {
115116
let moved_orchestrator_terminator_tx = orchestrator_terminator_tx.clone();
116-
let moved_events_observer_terminator_tx = events_observer_terminator_tx.clone();
117+
let moved_events_observer_commands_tx = events_observer_commands_tx.clone();
117118
ctrlc::set_handler(move || {
118-
moved_events_observer_terminator_tx
119-
.send(true)
119+
moved_events_observer_commands_tx
120+
.send(EventsObserverCommand::Terminate(true))
120121
.expect("Unable to terminate devnet");
121122
moved_orchestrator_terminator_tx
122123
.send(true)

src/integrate/ui/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ mod ui;
55
#[allow(dead_code)]
66
mod util;
77

8-
use super::DevnetEvent;
8+
use super::{DevnetEvent, events_observer::EventsObserverCommand};
99
use app::App;
1010
use crossterm::{
1111
event::{self, Event, KeyCode, KeyModifiers},
@@ -24,7 +24,7 @@ use tui::{backend::CrosstermBackend, Terminal};
2424
pub fn start_ui(
2525
devnet_events_tx: Sender<DevnetEvent>,
2626
devnet_events_rx: Receiver<DevnetEvent>,
27-
events_observer_terminator_tx: Sender<bool>,
27+
events_observer_commands_tx: Sender<EventsObserverCommand>,
2828
orchestrator_terminator_tx: Sender<bool>,
2929
orchestrator_terminated_rx: Receiver<bool>,
3030
devnet_path: &str,
@@ -73,7 +73,7 @@ pub fn start_ui(
7373
app.display_log(DevnetEvent::log_warning("Ctrl+C received, initiating termination sequence.".into()));
7474
let _ = trigger_reset(
7575
true,
76-
&events_observer_terminator_tx,
76+
&events_observer_commands_tx,
7777
&orchestrator_terminator_tx);
7878

7979
let _ = terminate(
@@ -87,7 +87,7 @@ pub fn start_ui(
8787
app.display_log(DevnetEvent::log_warning("Reset Devnet...".into()));
8888
let _ = trigger_reset(
8989
false,
90-
&events_observer_terminator_tx,
90+
&events_observer_commands_tx,
9191
&orchestrator_terminator_tx);
9292
},
9393
(_, KeyCode::Left) => app.on_left(),
@@ -131,11 +131,11 @@ pub fn start_ui(
131131

132132
fn trigger_reset(
133133
terminate: bool,
134-
events_observer_terminator_tx: &Sender<bool>,
134+
events_observer_commands_tx: &Sender<EventsObserverCommand>,
135135
orchestrator_terminator_tx: &Sender<bool>,
136136
) -> Result<(), Box<dyn Error>> {
137-
events_observer_terminator_tx
138-
.send(terminate)
137+
events_observer_commands_tx
138+
.send(EventsObserverCommand::Terminate(true))
139139
.expect("Unable to terminate devnet");
140140
orchestrator_terminator_tx
141141
.send(terminate)

0 commit comments

Comments
 (0)