Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/language/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ Cross-temporal comparisons are supported: dates, times, and timestamps are all c
| `.db.splayed.set` | variadic | Save table as splayed columns to directory | `(.db.splayed.set "db/trades" trades)` |
| `.db.splayed.get` | variadic | Load splayed table from directory | `(.db.splayed.get "db/trades")` |
| `.db.parted.get` | variadic | Load partitioned table by name from root directory | `(.db.parted.get "db" 'trades)` |
| `.db.parted.tables` | variadic | List table names under a parted root | `(.db.parted.tables "db")` |
| `.db.parted.fill` | variadic | Backfill missing tables across partitions | `(.db.parted.fill "db")` |

## EAV (Entity-Attribute-Value)

Expand Down
22 changes: 21 additions & 1 deletion docs/docs/namespaces/db.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The `get` builtins memory-map every column file — load is constant-time regard
| [`.db.splayed.get`](#db-splayed-get) | variadic | — | Load a splayed table (columns mmap'd). |
| [`.db.parted.get`](#db-parted-get) | variadic | — | Load a partitioned table by name from a db root. |
| [`.db.parted.tables`](#db-parted-tables) | variadic | — | List the table names available under a parted db root. |
| [`.db.parted.fill`](#db-parted-fill) | variadic | restricted | Fill missing tables across a parted db's partitions. |

## `.db.splayed.set` { #db-splayed-set }

Expand Down Expand Up @@ -70,7 +71,7 @@ Errors: `domain` (arity != 2 or `tbl_name` invalid), `type` (root not a string o

Signature: `(.db.parted.tables "db_root")`.

Returns a sorted `sym` vector of the table names available under a parted `db_root` — the splayed-table subdirectories (those with a `.d` schema) of the first partition. Each name can be passed straight to `.db.parted.get`; nothing is loaded or bound by this call.
Returns a sorted `sym` vector of the table names available under a parted `db_root` — the splayed-table subdirectories (those with a `.d` schema) of the **most recent** (last, sorted) partition, which reflects the current table set. Each name can be passed straight to `.db.parted.get`; nothing is loaded or bound by this call.

```lisp
(.db.parted.tables "/data/db")
Expand All @@ -82,6 +83,25 @@ Returns a sorted `sym` vector of the table names available under a parted `db_ro

Errors: `domain` (arity != 1), `type` (root not a string), `io` (root unreadable or not a parted root — no partition directories).

## `.db.parted.fill` { #db-parted-fill }

Signature: `(.db.parted.fill "db_root")`.

For every table that appears in **any** partition, ensures **every** partition has it: a partition missing the table gets an **empty** copy whose schema is taken from the most recent partition that does have it. This keeps `select`s that span partitions from failing on a partition where a table is absent — the typical case being a table added partway through the database's life, or a partition written before that table existed.

Returns a sorted `sym` vector of the partition names that were filled (an **empty** vector when nothing needed fixing, so a repeat call is a no-op). Requires write permission on the db root.

```lisp
;; trades exists in every day, but `news` was only added from 2024.01.10 on.
(.db.parted.fill "/data/db")
;; => [`2024.01.01 `2024.01.02 … `2024.01.09] ; days that gained an empty `news`

;; now every partition has every table; cross-partition queries are safe.
(select {from: (.db.parted.get "/data/db" 'news)})
```

The filled copies are empty, so aggregate results across the db are unchanged — only the on-disk uniformity is. Errors: `domain` (arity != 1), `type` (root not a string), `io` (not a parted root), plus any `oom`/`corrupt` surfaced while reading a template or writing a copy.

## See also

- [`.csv.splayed`](csv.md#csv-splayed) / [`.csv.parted`](csv.md#csv-parted) — stream CSV directly into splayed / parted layouts.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/namespaces/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Rayfall's builtins are organised under dotted namespaces. Names beginning with `
When the server is started with `-U <password>`, the following dotted builtins are blocked (return an `access` error) for IPC peers — see [IPC restricted mode](../storage/ipc.md). Page-level admonitions on each namespace flag the exact builtins that carry the `RAY_FN_RESTRICTED` attribute.

- `.csv.read`, `.csv.write`, `.csv.splayed`, `.csv.parted`
- `.db.splayed.set`
- `.db.splayed.set`, `.db.parted.fill`
- `.ipc.open`, `.ipc.close`, `.ipc.send`
- `.log.open`, `.log.replay`, `.log.roll`, `.log.snapshot`, `.log.close`
- `.os.getenv`, `.os.setenv`
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/reference/all-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,8 @@ Persistent columnar storage — splayed (one file per column) and partitioned ta
| `.db.splayed.set` | variadic | restricted | Save table as splayed columns to a directory | `(.db.splayed.set "db/trades" trades)` |
| `.db.splayed.get` | variadic | — | Load splayed table from a directory | `(.db.splayed.get "db/trades")` |
| `.db.parted.get` | variadic | — | Load partitioned table from root directory | `(.db.parted.get "db" 'trades)` |
| `.db.parted.tables` | variadic | — | List table names under a parted root (from the most recent partition) | `(.db.parted.tables "db")` |
| `.db.parted.fill` | variadic | restricted | Backfill missing tables across a parted db's partitions | `(.db.parted.fill "db")` |

```lisp
; Save and reload a splayed table
Expand Down
13 changes: 12 additions & 1 deletion docs/docs/storage/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,25 @@ This scans all date-named subdirectories under `db/trades/`, memory-maps every c

### `.db.parted.tables` — List a Root's Table Names

Returns a sorted `sym` vector of the table names available under a partitioned database root — the table subdirectories of the first partition. Each name can be passed straight to `.db.parted.get`; nothing is loaded by this call.
Returns a sorted `sym` vector of the table names available under a partitioned database root — the table subdirectories of the **most recent** (last, sorted) partition, which reflects the current table set. Each name can be passed straight to `.db.parted.get`; nothing is loaded by this call.

```lisp
; Discover which tables live under db/, then load each one
(.db.parted.tables "db") ; => [`quotes `trades]
(map (fn [t] (.db.parted.get "db" t)) (.db.parted.tables "db"))
```

### `.db.parted.fill` — Backfill Missing Tables Across Partitions

For every table present in **any** partition, ensures **every** partition has it: a partition missing the table gets an **empty** copy whose schema is taken from the most recent partition that has it. This keeps queries that span partitions from failing on a partition where a table is absent — typically a table added partway through the database's life. Returns a sorted `sym` vector of the partition names it filled (empty when nothing needed fixing, so a repeat call is a no-op). Requires write permission on the root.

```lisp
; `news` was only added from 2024.01.10 onward; backfill the earlier days.
(.db.parted.fill "db") ; => [`2024.01.01 … `2024.01.09]
; now every partition has every table — the empty copies add no rows.
(count (.db.parted.get "db" 'news)) ; unchanged
```

## Symbol Table Management

Symbol tables are persisted automatically when you use `.db.splayed.set`. A `.sym` file is written into the table directory containing all interned symbol strings. When loading with `.db.splayed.get` or `.db.parted.get`, the symbol table is loaded first so that symbol columns decode correctly.
Expand Down
148 changes: 76 additions & 72 deletions src/core/sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,95 +86,99 @@ ray_sock_t ray_sock_accept(ray_sock_t srv)
return fd;
}

ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms)
/* Connect an already-created socket `fd` to one resolved address. With
* timeout_ms > 0 the connect is driven non-blocking + poll (a blocking
* connect() ignores SO_*TIMEO) and the same budget is then applied as the
* handshake SO_RCVTIMEO/SO_SNDTIMEO; otherwise it is a plain blocking
* connect. Returns 0 on success, -1 on failure with errno set
* (ETIMEDOUT on a connect timeout). */
static int sock_connect_one(ray_sock_t fd, const struct sockaddr* addr,
socklen_t addrlen, int timeout_ms)
{
struct addrinfo hints, *res = NULL;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;

char port_str[8];
snprintf(port_str, sizeof(port_str), "%u", (unsigned)port);

if (getaddrinfo(host, port_str, &hints, &res) != 0 || !res)
return RAY_INVALID_SOCK;
if (timeout_ms <= 0)
return connect(fd, addr, addrlen) < 0 ? -1 : 0;

ray_sock_t fd = (ray_sock_t)socket(res->ai_family, res->ai_socktype,
res->ai_protocol);
if (fd == RAY_INVALID_SOCK) {
freeaddrinfo(res);
return RAY_INVALID_SOCK;
}

if (timeout_ms > 0) {
/* Bounded connect. A blocking connect() ignores SO_*TIMEO, so to
* honor the requested timeout we connect non-blocking and wait at
* most timeout_ms for the socket to become writable (= connected
* or failed), then restore blocking mode for the handshake. */
ray_sock_set_nonblocking(fd);
int rc = connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen);
if (rc < 0) {
ray_sock_set_nonblocking(fd);
int rc = connect(fd, addr, addrlen);
if (rc < 0) {
#ifdef RAY_OS_WINDOWS
int werr = WSAGetLastError();
int in_progress = (werr == WSAEWOULDBLOCK || werr == WSAEINPROGRESS);
int werr = WSAGetLastError();
int in_progress = (werr == WSAEWOULDBLOCK || werr == WSAEINPROGRESS);
#else
int in_progress = (errno == EINPROGRESS);
int in_progress = (errno == EINPROGRESS);
#endif
if (!in_progress) {
ray_sock_close(fd);
freeaddrinfo(res);
return RAY_INVALID_SOCK;
}
struct pollfd pfd = { .fd = fd, .events = POLLOUT };
int pr;
if (!in_progress) return -1;
struct pollfd pfd = { .fd = fd, .events = POLLOUT };
int pr;
#ifdef RAY_OS_WINDOWS
pr = WSAPoll(&pfd, 1, timeout_ms);
pr = WSAPoll(&pfd, 1, timeout_ms);
#else
do { pr = poll(&pfd, 1, timeout_ms); } while (pr < 0 && errno == EINTR);
do { pr = poll(&pfd, 1, timeout_ms); } while (pr < 0 && errno == EINTR);
#endif
if (pr == 0) { /* timed out — distinct from refused */
ray_sock_close(fd);
freeaddrinfo(res);
errno = ETIMEDOUT;
return RAY_INVALID_SOCK;
}
if (pr < 0) {
ray_sock_close(fd);
freeaddrinfo(res);
return RAY_INVALID_SOCK;
}
/* Writable: harvest the pending connect result via SO_ERROR. */
int soerr = 0;
socklen_t soerr_len = sizeof(soerr);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&soerr, &soerr_len) < 0
|| soerr != 0) {
ray_sock_close(fd);
freeaddrinfo(res);
if (soerr != 0) errno = soerr;
return RAY_INVALID_SOCK;
}
if (pr == 0) { errno = ETIMEDOUT; return -1; }
if (pr < 0) return -1;
/* Writable: harvest the pending connect result via SO_ERROR. */
int soerr = 0;
socklen_t soerr_len = sizeof(soerr);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&soerr, &soerr_len) < 0
|| soerr != 0) {
if (soerr != 0) errno = soerr;
return -1;
}
ray_sock_set_blocking(fd);
}
ray_sock_set_blocking(fd);

/* Apply the same budget as the handshake send/recv timeout. */
/* Apply the same budget as the handshake send/recv timeout. */
#ifdef RAY_OS_WINDOWS
DWORD tv = (DWORD)timeout_ms;
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv));
DWORD tv = (DWORD)timeout_ms;
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv));
#else
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif
} else if (connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) {
ray_sock_close(fd);
freeaddrinfo(res);
return 0;
}

ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms)
{
struct addrinfo hints, *res = NULL, *rp;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;

char port_str[8];
snprintf(port_str, sizeof(port_str), "%u", (unsigned)port);

if (getaddrinfo(host, port_str, &hints, &res) != 0 || !res)
return RAY_INVALID_SOCK;

/* Try every resolved address in turn, not just the first. `localhost`
* commonly resolves to BOTH ::1 (IPv6, often first) and 127.0.0.1; a
* server bound IPv4-only refuses the ::1 attempt, so we must fall
* through to the next candidate rather than give up. */
ray_sock_t fd = RAY_INVALID_SOCK;
int saved_errno = 0;
for (rp = res; rp != NULL; rp = rp->ai_next) {
fd = (ray_sock_t)socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (fd == RAY_INVALID_SOCK) { saved_errno = errno; continue; }
if (sock_connect_one(fd, rp->ai_addr, (socklen_t)rp->ai_addrlen,
timeout_ms) == 0)
break; /* connected */
saved_errno = errno;
ray_sock_close(fd);
fd = RAY_INVALID_SOCK;
}
freeaddrinfo(res);

if (fd == RAY_INVALID_SOCK) {
errno = saved_errno; /* preserve ETIMEDOUT / refused */
return RAY_INVALID_SOCK;
}

int yes = 1;
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const char*)&yes, sizeof(yes));
return fd;
Expand Down
7 changes: 7 additions & 0 deletions src/lang/compile.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,13 @@ static void compile_list(compiler_t *c, ray_t *ast) {
return;
}
compile_expr(c, elems[2]);
/* Materialize a lazy value before binding — the interpreter's
* ray_let_fn does the same. A lazy handle is single-use
* (materialization consumes its deferred graph), so a local
* that aliased one would break on its SECOND read (e.g.
* `(let v (first xs)) (if (> v 0) v 0)`: the compare consumes
* the lazy, the branch then reloads a dead handle). */
emit(c, OP_FORCE);
emit(c, OP_DUP);
int32_t slot = find_local(c, name_obj->i64);
if (slot < 0) slot = add_local(c, name_obj->i64);
Expand Down
29 changes: 29 additions & 0 deletions src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,7 @@ static ray_t* vm_exec(ray_t* lambda, ray_t** call_args, int64_t argc) {
[OP_SCOPE_BEGIN] = &&op_scope_begin,
[OP_SCOPE_END] = &&op_scope_end,
[OP_TRYH] = &&op_tryh,
[OP_FORCE] = &&op_force,
};

/* Arity check before allocating VM state */
Expand Down Expand Up @@ -1914,6 +1915,21 @@ op_loadconst_w: {
op_loadenv: {
uint8_t slot = code[ip++];
ray_t *val = LOCAL(slot);
if (val && ray_is_lazy(val)) {
/* Force a lazy local to a concrete, reusable value on first read,
* storing it back into the slot. A lazy handle is single-use
* (materialization consumes its deferred graph), so a second read
* would otherwise see a dead handle — e.g. a lazy first/last bound
* to a lambda PARAM (which, unlike `let`, is not forced at bind)
* and then used twice: `((fn [v] (if (> v 0) v 0)) (first xs))`. */
val = ray_lazy_materialize(val); /* consumes the slot's ref */
if (!val || RAY_IS_ERR(val)) {
vm_err_obj = val ? val : ray_error("type", NULL);
LOCAL(slot) = NULL; /* ref already consumed */
goto vm_error;
}
LOCAL(slot) = val; /* slot now owns the concrete */
}
if (val) ray_retain(val);
else val = make_i64(0);
PUSH(val);
Expand Down Expand Up @@ -2376,6 +2392,18 @@ op_tryh: {
DISPATCH();
}

op_force: {
/* Materialize a lazy TOS so a let-bound local holds a concrete,
* reusable value (a lazy handle is single-use). */
ray_t* v = POP();
if (v && ray_is_lazy(v)) {
v = ray_lazy_materialize(v); /* consumes; concrete or error */
if (!v || RAY_IS_ERR(v)) { vm_err_obj = v ? v : ray_error("type", NULL); goto vm_error; }
}
PUSH(v);
DISPATCH();
}

op_scope_begin: {
/* Stack: [.., syms_vec] — materialize the live locals (slot i holds
* the value of syms[i]) plus self into a fresh scope frame so the
Expand Down Expand Up @@ -2847,6 +2875,7 @@ static void ray_register_builtins(void) {
register_vary(".db.splayed.get", RAY_FN_NONE, ray_get_splayed_fn);
register_vary(".db.parted.get", RAY_FN_NONE, ray_get_parted_fn);
register_vary(".db.parted.tables", RAY_FN_NONE, ray_get_parted_tables_fn);
register_vary(".db.parted.fill", RAY_FN_RESTRICTED, ray_fill_parted_fn);

/* GUID generation */
register_unary("guid", RAY_FN_NONE, ray_guid_fn);
Expand Down
2 changes: 2 additions & 0 deletions src/lang/eval.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ enum {
OP_TRYH, /* try-handler dispatch: pop err_val + handler; if
* handler is callable, call it with err_val, else
* push handler as a fallback value */
OP_FORCE, /* materialize TOS if it is a lazy handle (so a
* let-bound local holds a concrete, reusable value) */
OP__COUNT
};

Expand Down
1 change: 1 addition & 0 deletions src/lang/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ ray_t* ray_set_splayed_fn(ray_t** args, int64_t n);
ray_t* ray_get_splayed_fn(ray_t** args, int64_t n);
ray_t* ray_get_parted_fn(ray_t** args, int64_t n);
ray_t* ray_get_parted_tables_fn(ray_t** args, int64_t n);
ray_t* ray_fill_parted_fn(ray_t** args, int64_t n);
ray_t* ray_guid_fn(ray_t* n_arg);

/* Transaction-log journaling (.log.*) — the -l/-L feature.
Expand Down
Loading
Loading