Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,11 @@ IO.puts("Statement expects #{param_count} parameter(s)") # Prints: 1
IO.puts("Result will have #{col_count} column(s)") # Prints: 4

# Get column names
{:ok, col_names} = Enum.map(0..(col_count-1), fn i ->
{:ok, name} = EctoLibSql.Native.statement_column_name(state, stmt_id, i)
name
end)
col_names =
Enum.map(0..(col_count - 1), fn i ->
{:ok, name} = EctoLibSql.Native.statement_column_name(state, stmt_id, i)
name
end)
IO.inspect(col_names) # Prints: ["id", "name", "email", "created_at"]

:ok = EctoLibSql.Native.close_stmt(stmt_id)
Expand Down
7 changes: 3 additions & 4 deletions IMPLEMENTATION_ROADMAP_FOCUSED.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,9 @@ Logger.info("Current replication frame: #{current_frame}")
```

**libsql API**:
- `database.sync_until(frame_no)` - Sync until specific frame
- `database.get_frame_no()` - Get current frame number
- `database.flush_replicator()` - Flush pending replication
- `database.sync_frames(count)` - Sync specific number of frames
- `replication_index()` - Get current frame number
- `sync()` / `sync_until(frame_no)` - Sync replica until specific frame
- `flush_replicator()` - Flush pending replication

**Implementation**:
- [x] Add `sync_until(conn_id, frame_no)` NIF
Expand Down
87 changes: 47 additions & 40 deletions native/ecto_libsql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,45 @@ fn decode_transaction_behavior(atom: Atom) -> Option<TransactionBehavior> {
}
}

/// Helper function to verify transaction ownership.
///
/// Returns an error if the transaction does not belong to the specified connection.
fn verify_transaction_ownership(
entry: &TransactionEntry,
conn_id: &str,
) -> Result<(), rustler::Error> {
if entry.conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Transaction does not belong to this connection",
)));
}
Ok(())
}

/// Helper function to verify statement ownership.
///
/// Returns an error if the statement does not belong to the specified connection.
fn verify_statement_ownership(stmt_conn_id: &str, conn_id: &str) -> Result<(), rustler::Error> {
if stmt_conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Statement does not belong to connection",
)));
}
Ok(())
}

/// Helper function to verify cursor ownership.
///
/// Returns an error if the cursor does not belong to the specified connection.
fn verify_cursor_ownership(cursor: &CursorData, conn_id: &str) -> Result<(), rustler::Error> {
if cursor.conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Cursor does not belong to connection",
)));
}
Ok(())
}

#[rustler::nif(schedule = "DirtyIo")]
pub fn begin_transaction(conn_id: &str) -> NifResult<String> {
let conn_map = safe_lock(&CONNECTION_REGISTRY, "begin_transaction conn_map")?;
Expand Down Expand Up @@ -226,11 +265,7 @@ pub fn execute_with_transaction<'a>(
.ok_or_else(|| rustler::Error::Term(Box::new("Transaction not found")))?;

// Verify transaction belongs to this connection
if entry.conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Transaction does not belong to this connection",
)));
}
verify_transaction_ownership(entry, conn_id)?;

let decoded_args: Vec<Value> = args
.into_iter()
Expand Down Expand Up @@ -260,11 +295,7 @@ pub fn query_with_trx_args<'a>(
.ok_or_else(|| rustler::Error::Term(Box::new("Transaction not found")))?;

// Verify transaction belongs to this connection
if entry.conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Transaction does not belong to this connection",
)));
}
verify_transaction_ownership(entry, conn_id)?;

let decoded_args: Vec<Value> = args
.into_iter()
Expand Down Expand Up @@ -941,11 +972,7 @@ fn query_prepared<'a>(
.ok_or_else(|| rustler::Error::Term(Box::new("Statement not found")))?;

// Verify statement belongs to this connection
if stored_conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Statement does not belong to connection",
)));
}
verify_statement_ownership(stored_conn_id, conn_id)?;

let cached_stmt = cached_stmt.clone();

Expand Down Expand Up @@ -1005,11 +1032,7 @@ fn execute_prepared<'a>(
.ok_or_else(|| rustler::Error::Term(Box::new("Statement not found")))?;

// Verify statement belongs to this connection
if stored_conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Statement does not belong to connection",
)));
}
verify_statement_ownership(stored_conn_id, conn_id)?;

let cached_stmt = cached_stmt.clone();

Expand Down Expand Up @@ -1341,11 +1364,7 @@ fn fetch_cursor<'a>(
.ok_or_else(|| rustler::Error::Term(Box::new("Cursor not found")))?;

// Verify cursor belongs to this connection
if cursor.conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Cursor does not belong to connection",
)));
}
verify_cursor_ownership(cursor, conn_id)?;

let remaining = cursor.rows.len().saturating_sub(cursor.position);
let fetch_count = remaining.min(max_rows);
Expand Down Expand Up @@ -1623,11 +1642,7 @@ fn statement_column_count(conn_id: &str, stmt_id: &str) -> NifResult<usize> {
.ok_or_else(|| rustler::Error::Term(Box::new("Statement not found")))?;

// Verify statement belongs to this connection
if stored_conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Statement does not belong to connection",
)));
}
verify_statement_ownership(stored_conn_id, conn_id)?;

let cached_stmt = cached_stmt.clone();

Expand Down Expand Up @@ -1656,11 +1671,7 @@ fn statement_column_name(conn_id: &str, stmt_id: &str, idx: usize) -> NifResult<
.ok_or_else(|| rustler::Error::Term(Box::new("Statement not found")))?;

// Verify statement belongs to this connection
if stored_conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Statement does not belong to connection",
)));
}
verify_statement_ownership(stored_conn_id, conn_id)?;

let cached_stmt = cached_stmt.clone();

Expand Down Expand Up @@ -1699,11 +1710,7 @@ fn statement_parameter_count(conn_id: &str, stmt_id: &str) -> NifResult<usize> {
.ok_or_else(|| rustler::Error::Term(Box::new("Statement not found")))?;

// Verify statement belongs to this connection
if stored_conn_id != conn_id {
return Err(rustler::Error::Term(Box::new(
"Statement does not belong to connection",
)));
}
verify_statement_ownership(stored_conn_id, conn_id)?;

let cached_stmt = cached_stmt.clone();

Expand Down
Loading