Skip to content

Commit c19d0c3

Browse files
x46085shanecelis
andauthored
Add pre_step & ctx.truncate to Elodin Sim (#400)
* Add elodin-py simulation pre-step * Add pre/post step truncate() to clear Simulation DB * chore: Make Clippy happy. * doc: Update release.md. * style: Reformat. --------- Co-authored-by: Shane Celis <shane.celis@gmail.com> Co-authored-by: Shane Celis <scelis@singularityus.com>
1 parent 30ea087 commit c19d0c3

File tree

12 files changed

+186
-39
lines changed

12 files changed

+186
-39
lines changed

docs/internal/release.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ Doing a pre-release should be quick-and-dirty on-demand operation without any of
3939
1. Run `just tag v$VERSION` on the updated `main` branch.
4040
2. Run `just promote v$VERSION` to promote the artifacts from GitHub to S3 and PyPi.
4141
```sh
42-
export VERSION=0.15.5-alpha.0
42+
export PYPI_TOKEN=pypi-...; # You'll need this token to publish the pypi package.
43+
export VERSION=0.16.0-alpha.1
44+
# Start release. Wait 40 minutes. Then publish pypi package.
4345
just tag v$VERSION origin/main && sleep $((40 * 60)) && just promote v$VERSION
4446
```
4547
3. Once released, we want to bump the version of alpha so that what was just released and what's on the 'main' branch will not be easily confused. If the version released was `0.1.2-alpha.0` then the next version would be `0.1.2-alpha.1`.

examples/betaflight-sitl/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ def __post_init__(self):
184184
_rc_channels_buffer = np.full(MAX_RC_CHANNELS, 1500, dtype=np.uint16)
185185

186186

187-
def sitl_post_step(tick: int, ctx: el.PostStepContext):
187+
def sitl_post_step(tick: int, ctx: el.StepContext):
188188
"""
189189
Post-step callback for lockstep SITL synchronization.
190190
@@ -229,6 +229,7 @@ def sitl_post_step(tick: int, ctx: el.PostStepContext):
229229
pass # Expected during initial warmup
230230
print(f"[SITL] Warmup complete ({warmup_count} responses at {config.pid_rate:.0f}Hz)")
231231
print("[SITL] Bridge ready")
232+
ctx.truncate() # Clears all data, resets tick to 0
232233

233234
if start_time[0] is None:
234235
start_time[0] = time.time()
@@ -305,7 +306,7 @@ def sitl_post_step(tick: int, ctx: el.PostStepContext):
305306
s.max_motor = max(s.max_motor, np.max(s.motors))
306307

307308
# Write motor commands back to Elodin-DB for physics simulation
308-
# This uses the PostStepContext for direct DB access (no TCP overhead)
309+
# This uses the StepContext for direct DB access (no TCP overhead)
309310
ctx.write_component("drone.motor_command", s.motors)
310311
except TimeoutError:
311312
pass # Timeouts expected during bootgrace

libs/db/src/append_log.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,14 @@ impl<E: IntoBytes + Immutable> AppendLog<E> {
175175
drop(guard);
176176
Ok(end - size_of::<Header<E>>())
177177
}
178+
179+
/// Truncate the append log, clearing all data while preserving the header.
180+
///
181+
/// This resets the committed_len back to just the header size, effectively
182+
/// removing all stored data without deallocating the memory-mapped file.
183+
pub fn truncate(&self) {
184+
let _guard = self.write_lock.lock().unwrap();
185+
self.committed_len()
186+
.store(size_of::<Header<E>>() as u64, Ordering::SeqCst);
187+
}
178188
}

libs/db/src/lib.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,18 @@ impl DB {
394394
Ok(())
395395
}
396396

397+
/// Truncate all component data and message logs, clearing all data while preserving schemas and metadata.
398+
///
399+
/// This effectively resets the database to an empty state, ready for fresh data.
400+
/// The vtable generation is incremented to signal that clients should refresh their views.
401+
pub fn truncate(&self) {
402+
self.with_state(|state| {
403+
state.truncate_all();
404+
});
405+
self.last_updated.store(Timestamp(i64::MIN));
406+
self.vtable_gen.fetch_add(1, atomic::Ordering::SeqCst);
407+
}
408+
397409
pub fn copy_native(&self, target_db_path: impl AsRef<Path>) -> Result<PathBuf, Error> {
398410
let final_db_dir = target_db_path.as_ref().to_path_buf();
399411
let parent_dir = final_db_dir
@@ -751,6 +763,16 @@ impl State {
751763
msg_log.set_metadata(metadata)?;
752764
Ok(())
753765
}
766+
767+
/// Truncate all components and message logs, clearing all data while preserving schemas and metadata.
768+
pub fn truncate_all(&self) {
769+
for component in self.components.values() {
770+
component.truncate();
771+
}
772+
for msg_log in self.msg_logs.values() {
773+
msg_log.truncate();
774+
}
775+
}
754776
}
755777

756778
#[derive(Clone, Debug, PartialEq)]
@@ -954,6 +976,11 @@ impl Component {
954976
fn sync_all(&self) -> Result<(), Error> {
955977
self.time_series.sync_all()
956978
}
979+
980+
/// Truncate the component, clearing all time-series data while preserving the schema.
981+
pub fn truncate(&self) {
982+
self.time_series.truncate();
983+
}
957984
}
958985

959986
struct DBSink<'a> {

libs/db/src/msg_log.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,15 @@ impl MsgLog {
136136
Ok(())
137137
}
138138

139+
/// Truncate the message log, clearing all messages while preserving metadata.
140+
///
141+
/// This resets all underlying append logs, effectively removing all stored
142+
/// messages without deallocating the underlying files.
143+
pub fn truncate(&self) {
144+
self.timestamps.truncate();
145+
self.bufs.truncate();
146+
}
147+
139148
pub fn set_metadata(&mut self, metadata: MsgMetadata) -> Result<(), Error> {
140149
let metadata = self.metadata.insert(metadata);
141150
let metadata_path = self.path.join("metadata");
@@ -165,6 +174,11 @@ impl BufLog {
165174
Ok(())
166175
}
167176

177+
fn truncate(&self) {
178+
self.offsets.truncate();
179+
self.data_log.truncate();
180+
}
181+
168182
pub fn get_msg(&self, index: usize) -> Option<&[u8]> {
169183
let buf = self.bufs().get(index)?;
170184
let data = match buf.len as usize {

libs/db/src/time_series.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,15 @@ impl TimeSeries {
142142
Ok(())
143143
}
144144

145+
/// Truncate the time series, clearing all data while preserving the schema.
146+
///
147+
/// This resets both the index and data append logs, effectively removing
148+
/// all stored time-series data without deallocating the underlying files.
149+
pub fn truncate(&self) {
150+
self.index.truncate();
151+
self.data.truncate();
152+
}
153+
145154
pub fn push_buf(&self, timestamp: Timestamp, buf: &[u8]) -> Result<(), Error> {
146155
let len = self.index.len() as usize;
147156

libs/nox-ecs/src/impeller2_server.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use impeller2_wkt::{ComponentMetadata, EntityMetadata};
55
use nox_ecs::Error;
66
use std::{
77
collections::HashSet,
8-
sync::{Arc, atomic},
8+
sync::{
9+
Arc,
10+
atomic::{self, AtomicU64, Ordering},
11+
},
912
time::{Duration, Instant},
1013
};
1114
use stellarator::struc_con::{Joinable, Thread};
@@ -25,14 +28,15 @@ impl Server {
2528

2629
pub async fn run(self) -> Result<(), Error> {
2730
tracing::info!("running server");
28-
self.run_with_cancellation(|| false, |_, _, _| {}, false)
31+
self.run_with_cancellation(|| false, |_, _, _, _| {}, |_, _, _, _| {}, false)
2932
.await
3033
}
3134

3235
pub async fn run_with_cancellation(
3336
self,
3437
is_cancelled: impl Fn() -> bool + 'static,
35-
post_step: impl Fn(u64, &Arc<DB>, Timestamp) + 'static,
38+
pre_step: impl Fn(u64, &Arc<DB>, &Arc<AtomicU64>, Timestamp) + 'static,
39+
post_step: impl Fn(u64, &Arc<DB>, &Arc<AtomicU64>, Timestamp) + 'static,
3640
interactive: bool,
3741
) -> Result<(), Error> {
3842
tracing::info!("running server with cancellation");
@@ -41,6 +45,8 @@ impl Server {
4145
let start_time = Timestamp::now();
4246
init_db(&db, &mut world.world, start_time)?;
4347
let tick_db = db.clone();
48+
// Shared tick counter that can be reset by StepContext::truncate()
49+
let tick_counter = Arc::new(AtomicU64::new(0));
4450
let stream: Thread<Option<Result<(), Error>>> =
4551
stellarator::struc_con::stellar(move || async move {
4652
let mut handles = vec![];
@@ -51,8 +57,10 @@ impl Server {
5157
});
5258
let tick = stellarator::spawn(tick(
5359
tick_db,
60+
tick_counter,
5461
world,
5562
is_cancelled,
63+
pre_step,
5664
post_step,
5765
start_time,
5866
interactive,
@@ -167,7 +175,10 @@ pub fn copy_db_to_world(state: &State, world: &mut WorldExec<Compiled>) {
167175
let Some(component) = state.get_component(pair_id) else {
168176
continue;
169177
};
170-
let (_, head) = component.time_series.latest().unwrap();
178+
// After truncate(), time series may be empty - skip if no data
179+
let Some((_, head)) = component.time_series.latest() else {
180+
continue;
181+
};
171182

172183
// Check if the value has changed
173184
let current_value = &column.buffer[offset..offset + size];
@@ -262,16 +273,18 @@ pub fn commit_world_head(
262273
Ok(())
263274
}
264275

276+
#[allow(clippy::too_many_arguments)]
265277
async fn tick(
266278
db: Arc<DB>,
279+
tick_counter: Arc<AtomicU64>,
267280
mut world: WorldExec<Compiled>,
268281
is_cancelled: impl Fn() -> bool + 'static,
269-
post_step: impl Fn(u64, &Arc<DB>, Timestamp) + 'static,
270-
mut timestamp: Timestamp,
282+
pre_step: impl Fn(u64, &Arc<DB>, &Arc<AtomicU64>, Timestamp) + 'static,
283+
post_step: impl Fn(u64, &Arc<DB>, &Arc<AtomicU64>, Timestamp) + 'static,
284+
start_timestamp: Timestamp,
271285
interactive: bool,
272286
) {
273287
// XXX This is what world.run ultimately calls.
274-
let mut tick = 0;
275288
let external_controls: HashSet<ComponentId> = external_controls(&world).collect();
276289
let wait_for_write: Vec<ComponentId> = wait_for_write(&world).collect();
277290
let wait_for_write_pair_ids: Vec<PairId> = get_pair_ids(&world, &wait_for_write).unwrap();
@@ -281,15 +294,22 @@ async fn tick(
281294
.metadata
282295
.run_time_step
283296
.map(|time_step| time_step.0);
297+
let time_step = world.world.sim_time_step().0;
284298
while db.recording_cell.wait().await {
285299
let start = Instant::now();
300+
// Read tick from shared counter (can be reset by StepContext::truncate())
301+
let tick = tick_counter.load(Ordering::SeqCst);
302+
// Calculate timestamp based on current tick
303+
let timestamp = start_timestamp + time_step * (tick as u32);
286304
if tick >= world.world.max_tick() {
287305
db.recording_cell.set_playing(false);
288306
world.world.metadata.max_tick = u64::MAX;
289307
if !interactive {
290308
return;
291309
}
292310
}
311+
// Python pre_step func runs (before copy_db_to_world so writes are picked up).
312+
pre_step(tick, &db, &tick_counter, timestamp);
293313
db.with_state(|state| copy_db_to_world(state, &mut world));
294314
// JAX runs.
295315
if let Err(err) = world.run() {
@@ -314,16 +334,15 @@ async fn tick(
314334
if is_cancelled() {
315335
return;
316336
}
317-
// Python func runs.
318-
post_step(tick, &db, timestamp);
337+
// Python post_step func runs.
338+
post_step(tick, &db, &tick_counter, timestamp);
319339
// We only wait if there is a run_time_step set and it's >= the time elapsed.
320340
if let Some(run_time_step) = run_time_step.as_ref()
321341
&& let Some(sleep_time) = run_time_step.checked_sub(start.elapsed())
322342
{
323343
stellarator::sleep(sleep_time).await;
324344
}
325-
tick += 1;
326-
timestamp += world.world.sim_time_step().0;
345+
tick_counter.fetch_add(1, Ordering::SeqCst);
327346
}
328347
}
329348

libs/nox-py/python/elodin/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ def run(
403403
max_ticks: Optional[int] = None,
404404
optimize: bool = False,
405405
is_canceled: Optional[callable] = None,
406+
pre_step: Optional[callable] = None,
406407
post_step: Optional[callable] = None,
407408
db_path: Optional[str] = None,
408409
interactive: bool = True,
@@ -421,6 +422,7 @@ def run(
421422
max_ticks,
422423
optimize,
423424
is_canceled,
425+
pre_step,
424426
post_step,
425427
db_path,
426428
interactive,

libs/nox-py/python/elodin/elodin.pyi

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ class Integrator:
2222
Rk4: Integrator
2323
SemiImplicit: Integrator
2424

25-
class PostStepContext:
26-
"""Context object passed to post_step callbacks, providing direct DB read/write access.
25+
class StepContext:
26+
"""Context object passed to pre_step and post_step callbacks, providing direct DB read/write access.
2727
2828
This enables SITL workflows to read sensor data and write component data (like motor
29-
commands from Betaflight) back to the database within the same process.
29+
commands from Betaflight) directly to the database within the same process.
3030
"""
3131
@property
3232
def tick(self) -> int:
@@ -87,6 +87,15 @@ class PostStepContext:
8787
ValueError: If any write data size doesn't match the component schema
8888
"""
8989
...
90+
def truncate(self) -> None:
91+
"""Truncate all component data and message logs in the database, resetting tick to 0.
92+
93+
This clears all stored time-series data while preserving component schemas and metadata.
94+
The simulation tick will be reset to 0, effectively starting fresh.
95+
96+
Use this to control the freshness of the database and ensure reliable data from a known tick.
97+
"""
98+
...
9099

91100
class ComponentType:
92101
def __init__(self, ty: PrimitiveType, shape: Tuple[int, ...]): ...
@@ -120,7 +129,8 @@ class WorldBuilder:
120129
max_ticks: Optional[int] = None,
121130
optimize: bool = False,
122131
is_canceled: Optional[Callable[[], bool]] = None,
123-
post_step: Optional[Callable[[int, PostStepContext], None]] = None,
132+
pre_step: Optional[Callable[[int, StepContext], None]] = None,
133+
post_step: Optional[Callable[[int, StepContext], None]] = None,
124134
db_path: Optional[str] = None,
125135
interactive: bool = True,
126136
): ...

libs/nox-py/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ mod error;
2121
mod exec;
2222
mod graph;
2323
mod linalg;
24-
mod post_step_context;
2524
mod query;
2625
mod s10;
2726
mod spatial;
27+
mod step_context;
2828
mod system;
2929
mod ukf;
3030
mod world_builder;
@@ -36,9 +36,9 @@ pub use error::*;
3636
pub use exec::*;
3737
pub use graph::*;
3838
pub use linalg::*;
39-
pub use post_step_context::*;
4039
pub use query::*;
4140
pub use spatial::*;
41+
pub use step_context::*;
4242
pub use system::*;
4343
pub use world_builder::*;
4444

@@ -143,7 +143,7 @@ pub fn elodin(m: &Bound<'_, PyModule>) -> PyResult<()> {
143143
m.add_class::<QueryMetadata>()?;
144144
m.add_class::<SystemBuilder>()?;
145145
m.add_class::<System>()?;
146-
m.add_class::<PostStepContext>()?;
146+
m.add_class::<StepContext>()?;
147147
m.add_function(wrap_pyfunction!(six_dof, m)?)?;
148148
m.add_function(wrap_pyfunction!(skew, m)?)?;
149149
m.add_function(wrap_pyfunction!(_get_cache_dir, m)?)?;

0 commit comments

Comments
 (0)