diff --git a/docs/docs/language/functions.md b/docs/docs/language/functions.md index c5b1653b..7a0b9598 100644 --- a/docs/docs/language/functions.md +++ b/docs/docs/language/functions.md @@ -352,6 +352,8 @@ Cross-temporal comparisons are supported: dates, times, and timestamps are all c | `.db.splayed.set` | variadic | Save table as splayed columns to directory | `(.db.splayed.set "db/trades" trades)` | | `.db.splayed.get` | variadic | Load splayed table from directory | `(.db.splayed.get "db/trades")` | | `.db.parted.get` | variadic | Load partitioned table by name from root directory | `(.db.parted.get "db" 'trades)` | +| `.db.parted.tables` | variadic | List table names under a parted root | `(.db.parted.tables "db")` | +| `.db.parted.fill` | variadic | Backfill missing tables across partitions | `(.db.parted.fill "db")` | ## EAV (Entity-Attribute-Value) diff --git a/docs/docs/namespaces/db.md b/docs/docs/namespaces/db.md index 9328bafb..9de840fd 100644 --- a/docs/docs/namespaces/db.md +++ b/docs/docs/namespaces/db.md @@ -18,6 +18,7 @@ The `get` builtins memory-map every column file — load is constant-time regard | [`.db.splayed.get`](#db-splayed-get) | variadic | — | Load a splayed table (columns mmap'd). | | [`.db.parted.get`](#db-parted-get) | variadic | — | Load a partitioned table by name from a db root. | | [`.db.parted.tables`](#db-parted-tables) | variadic | — | List the table names available under a parted db root. | +| [`.db.parted.fill`](#db-parted-fill) | variadic | restricted | Fill missing tables across a parted db's partitions. | ## `.db.splayed.set` { #db-splayed-set } @@ -70,7 +71,7 @@ Errors: `domain` (arity != 2 or `tbl_name` invalid), `type` (root not a string o Signature: `(.db.parted.tables "db_root")`. -Returns a sorted `sym` vector of the table names available under a parted `db_root` — the splayed-table subdirectories (those with a `.d` schema) of the first partition. Each name can be passed straight to `.db.parted.get`; nothing is loaded or bound by this call. +Returns a sorted `sym` vector of the table names available under a parted `db_root` — the splayed-table subdirectories (those with a `.d` schema) of the **most recent** (last, sorted) partition, which reflects the current table set. Each name can be passed straight to `.db.parted.get`; nothing is loaded or bound by this call. ```lisp (.db.parted.tables "/data/db") @@ -82,6 +83,25 @@ Returns a sorted `sym` vector of the table names available under a parted `db_ro Errors: `domain` (arity != 1), `type` (root not a string), `io` (root unreadable or not a parted root — no partition directories). +## `.db.parted.fill` { #db-parted-fill } + +Signature: `(.db.parted.fill "db_root")`. + +For every table that appears in **any** partition, ensures **every** partition has it: a partition missing the table gets an **empty** copy whose schema is taken from the most recent partition that does have it. This keeps `select`s that span partitions from failing on a partition where a table is absent — the typical case being a table added partway through the database's life, or a partition written before that table existed. + +Returns a sorted `sym` vector of the partition names that were filled (an **empty** vector when nothing needed fixing, so a repeat call is a no-op). Requires write permission on the db root. + +```lisp +;; trades exists in every day, but `news` was only added from 2024.01.10 on. +(.db.parted.fill "/data/db") +;; => [`2024.01.01 `2024.01.02 … `2024.01.09] ; days that gained an empty `news` + +;; now every partition has every table; cross-partition queries are safe. +(select {from: (.db.parted.get "/data/db" 'news)}) +``` + +The filled copies are empty, so aggregate results across the db are unchanged — only the on-disk uniformity is. Errors: `domain` (arity != 1), `type` (root not a string), `io` (not a parted root), plus any `oom`/`corrupt` surfaced while reading a template or writing a copy. + ## See also - [`.csv.splayed`](csv.md#csv-splayed) / [`.csv.parted`](csv.md#csv-parted) — stream CSV directly into splayed / parted layouts. diff --git a/docs/docs/namespaces/index.md b/docs/docs/namespaces/index.md index 8ff212f7..d7e07605 100644 --- a/docs/docs/namespaces/index.md +++ b/docs/docs/namespaces/index.md @@ -23,7 +23,7 @@ Rayfall's builtins are organised under dotted namespaces. Names beginning with ` When the server is started with `-U `, the following dotted builtins are blocked (return an `access` error) for IPC peers — see [IPC restricted mode](../storage/ipc.md). Page-level admonitions on each namespace flag the exact builtins that carry the `RAY_FN_RESTRICTED` attribute. - `.csv.read`, `.csv.write`, `.csv.splayed`, `.csv.parted` -- `.db.splayed.set` +- `.db.splayed.set`, `.db.parted.fill` - `.ipc.open`, `.ipc.close`, `.ipc.send` - `.log.open`, `.log.replay`, `.log.roll`, `.log.snapshot`, `.log.close` - `.os.getenv`, `.os.setenv` diff --git a/docs/docs/reference/all-functions.md b/docs/docs/reference/all-functions.md index 54f38b14..877f3c92 100644 --- a/docs/docs/reference/all-functions.md +++ b/docs/docs/reference/all-functions.md @@ -513,6 +513,8 @@ Persistent columnar storage — splayed (one file per column) and partitioned ta | `.db.splayed.set` | variadic | restricted | Save table as splayed columns to a directory | `(.db.splayed.set "db/trades" trades)` | | `.db.splayed.get` | variadic | — | Load splayed table from a directory | `(.db.splayed.get "db/trades")` | | `.db.parted.get` | variadic | — | Load partitioned table from root directory | `(.db.parted.get "db" 'trades)` | +| `.db.parted.tables` | variadic | — | List table names under a parted root (from the most recent partition) | `(.db.parted.tables "db")` | +| `.db.parted.fill` | variadic | restricted | Backfill missing tables across a parted db's partitions | `(.db.parted.fill "db")` | ```lisp ; Save and reload a splayed table diff --git a/docs/docs/storage/index.md b/docs/docs/storage/index.md index 7e444cad..0c611e18 100644 --- a/docs/docs/storage/index.md +++ b/docs/docs/storage/index.md @@ -286,7 +286,7 @@ This scans all date-named subdirectories under `db/trades/`, memory-maps every c ### `.db.parted.tables` — List a Root's Table Names -Returns a sorted `sym` vector of the table names available under a partitioned database root — the table subdirectories of the first partition. Each name can be passed straight to `.db.parted.get`; nothing is loaded by this call. +Returns a sorted `sym` vector of the table names available under a partitioned database root — the table subdirectories of the **most recent** (last, sorted) partition, which reflects the current table set. Each name can be passed straight to `.db.parted.get`; nothing is loaded by this call. ```lisp ; Discover which tables live under db/, then load each one @@ -294,6 +294,17 @@ Returns a sorted `sym` vector of the table names available under a partitioned d (map (fn [t] (.db.parted.get "db" t)) (.db.parted.tables "db")) ``` +### `.db.parted.fill` — Backfill Missing Tables Across Partitions + +For every table present in **any** partition, ensures **every** partition has it: a partition missing the table gets an **empty** copy whose schema is taken from the most recent partition that has it. This keeps queries that span partitions from failing on a partition where a table is absent — typically a table added partway through the database's life. Returns a sorted `sym` vector of the partition names it filled (empty when nothing needed fixing, so a repeat call is a no-op). Requires write permission on the root. + +```lisp +; `news` was only added from 2024.01.10 onward; backfill the earlier days. +(.db.parted.fill "db") ; => [`2024.01.01 … `2024.01.09] +; now every partition has every table — the empty copies add no rows. +(count (.db.parted.get "db" 'news)) ; unchanged +``` + ## Symbol Table Management Symbol tables are persisted automatically when you use `.db.splayed.set`. A `.sym` file is written into the table directory containing all interned symbol strings. When loading with `.db.splayed.get` or `.db.parted.get`, the symbol table is loaded first so that symbol columns decode correctly. diff --git a/src/core/sock.c b/src/core/sock.c index a13b0263..35bd96eb 100644 --- a/src/core/sock.c +++ b/src/core/sock.c @@ -86,95 +86,99 @@ ray_sock_t ray_sock_accept(ray_sock_t srv) return fd; } -ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms) +/* Connect an already-created socket `fd` to one resolved address. With + * timeout_ms > 0 the connect is driven non-blocking + poll (a blocking + * connect() ignores SO_*TIMEO) and the same budget is then applied as the + * handshake SO_RCVTIMEO/SO_SNDTIMEO; otherwise it is a plain blocking + * connect. Returns 0 on success, -1 on failure with errno set + * (ETIMEDOUT on a connect timeout). */ +static int sock_connect_one(ray_sock_t fd, const struct sockaddr* addr, + socklen_t addrlen, int timeout_ms) { - struct addrinfo hints, *res = NULL; - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - - char port_str[8]; - snprintf(port_str, sizeof(port_str), "%u", (unsigned)port); - - if (getaddrinfo(host, port_str, &hints, &res) != 0 || !res) - return RAY_INVALID_SOCK; + if (timeout_ms <= 0) + return connect(fd, addr, addrlen) < 0 ? -1 : 0; - ray_sock_t fd = (ray_sock_t)socket(res->ai_family, res->ai_socktype, - res->ai_protocol); - if (fd == RAY_INVALID_SOCK) { - freeaddrinfo(res); - return RAY_INVALID_SOCK; - } - - if (timeout_ms > 0) { - /* Bounded connect. A blocking connect() ignores SO_*TIMEO, so to - * honor the requested timeout we connect non-blocking and wait at - * most timeout_ms for the socket to become writable (= connected - * or failed), then restore blocking mode for the handshake. */ - ray_sock_set_nonblocking(fd); - int rc = connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen); - if (rc < 0) { + ray_sock_set_nonblocking(fd); + int rc = connect(fd, addr, addrlen); + if (rc < 0) { #ifdef RAY_OS_WINDOWS - int werr = WSAGetLastError(); - int in_progress = (werr == WSAEWOULDBLOCK || werr == WSAEINPROGRESS); + int werr = WSAGetLastError(); + int in_progress = (werr == WSAEWOULDBLOCK || werr == WSAEINPROGRESS); #else - int in_progress = (errno == EINPROGRESS); + int in_progress = (errno == EINPROGRESS); #endif - if (!in_progress) { - ray_sock_close(fd); - freeaddrinfo(res); - return RAY_INVALID_SOCK; - } - struct pollfd pfd = { .fd = fd, .events = POLLOUT }; - int pr; + if (!in_progress) return -1; + struct pollfd pfd = { .fd = fd, .events = POLLOUT }; + int pr; #ifdef RAY_OS_WINDOWS - pr = WSAPoll(&pfd, 1, timeout_ms); + pr = WSAPoll(&pfd, 1, timeout_ms); #else - do { pr = poll(&pfd, 1, timeout_ms); } while (pr < 0 && errno == EINTR); + do { pr = poll(&pfd, 1, timeout_ms); } while (pr < 0 && errno == EINTR); #endif - if (pr == 0) { /* timed out — distinct from refused */ - ray_sock_close(fd); - freeaddrinfo(res); - errno = ETIMEDOUT; - return RAY_INVALID_SOCK; - } - if (pr < 0) { - ray_sock_close(fd); - freeaddrinfo(res); - return RAY_INVALID_SOCK; - } - /* Writable: harvest the pending connect result via SO_ERROR. */ - int soerr = 0; - socklen_t soerr_len = sizeof(soerr); - if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&soerr, &soerr_len) < 0 - || soerr != 0) { - ray_sock_close(fd); - freeaddrinfo(res); - if (soerr != 0) errno = soerr; - return RAY_INVALID_SOCK; - } + if (pr == 0) { errno = ETIMEDOUT; return -1; } + if (pr < 0) return -1; + /* Writable: harvest the pending connect result via SO_ERROR. */ + int soerr = 0; + socklen_t soerr_len = sizeof(soerr); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&soerr, &soerr_len) < 0 + || soerr != 0) { + if (soerr != 0) errno = soerr; + return -1; } - ray_sock_set_blocking(fd); + } + ray_sock_set_blocking(fd); - /* Apply the same budget as the handshake send/recv timeout. */ + /* Apply the same budget as the handshake send/recv timeout. */ #ifdef RAY_OS_WINDOWS - DWORD tv = (DWORD)timeout_ms; - setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)); - setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv)); + DWORD tv = (DWORD)timeout_ms; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv)); #else - struct timeval tv; - tv.tv_sec = timeout_ms / 1000; - tv.tv_usec = (timeout_ms % 1000) * 1000; - setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + struct timeval tv; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); #endif - } else if (connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) { - ray_sock_close(fd); - freeaddrinfo(res); + return 0; +} + +ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms) +{ + struct addrinfo hints, *res = NULL, *rp; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + char port_str[8]; + snprintf(port_str, sizeof(port_str), "%u", (unsigned)port); + + if (getaddrinfo(host, port_str, &hints, &res) != 0 || !res) return RAY_INVALID_SOCK; + + /* Try every resolved address in turn, not just the first. `localhost` + * commonly resolves to BOTH ::1 (IPv6, often first) and 127.0.0.1; a + * server bound IPv4-only refuses the ::1 attempt, so we must fall + * through to the next candidate rather than give up. */ + ray_sock_t fd = RAY_INVALID_SOCK; + int saved_errno = 0; + for (rp = res; rp != NULL; rp = rp->ai_next) { + fd = (ray_sock_t)socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (fd == RAY_INVALID_SOCK) { saved_errno = errno; continue; } + if (sock_connect_one(fd, rp->ai_addr, (socklen_t)rp->ai_addrlen, + timeout_ms) == 0) + break; /* connected */ + saved_errno = errno; + ray_sock_close(fd); + fd = RAY_INVALID_SOCK; } freeaddrinfo(res); + if (fd == RAY_INVALID_SOCK) { + errno = saved_errno; /* preserve ETIMEDOUT / refused */ + return RAY_INVALID_SOCK; + } + int yes = 1; setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const char*)&yes, sizeof(yes)); return fd; diff --git a/src/lang/compile.c b/src/lang/compile.c index 680dd896..7358e0c0 100644 --- a/src/lang/compile.c +++ b/src/lang/compile.c @@ -313,6 +313,13 @@ static void compile_list(compiler_t *c, ray_t *ast) { return; } compile_expr(c, elems[2]); + /* Materialize a lazy value before binding — the interpreter's + * ray_let_fn does the same. A lazy handle is single-use + * (materialization consumes its deferred graph), so a local + * that aliased one would break on its SECOND read (e.g. + * `(let v (first xs)) (if (> v 0) v 0)`: the compare consumes + * the lazy, the branch then reloads a dead handle). */ + emit(c, OP_FORCE); emit(c, OP_DUP); int32_t slot = find_local(c, name_obj->i64); if (slot < 0) slot = add_local(c, name_obj->i64); diff --git a/src/lang/eval.c b/src/lang/eval.c index bb54a512..708de14b 100644 --- a/src/lang/eval.c +++ b/src/lang/eval.c @@ -1843,6 +1843,7 @@ static ray_t* vm_exec(ray_t* lambda, ray_t** call_args, int64_t argc) { [OP_SCOPE_BEGIN] = &&op_scope_begin, [OP_SCOPE_END] = &&op_scope_end, [OP_TRYH] = &&op_tryh, + [OP_FORCE] = &&op_force, }; /* Arity check before allocating VM state */ @@ -1914,6 +1915,21 @@ op_loadconst_w: { op_loadenv: { uint8_t slot = code[ip++]; ray_t *val = LOCAL(slot); + if (val && ray_is_lazy(val)) { + /* Force a lazy local to a concrete, reusable value on first read, + * storing it back into the slot. A lazy handle is single-use + * (materialization consumes its deferred graph), so a second read + * would otherwise see a dead handle — e.g. a lazy first/last bound + * to a lambda PARAM (which, unlike `let`, is not forced at bind) + * and then used twice: `((fn [v] (if (> v 0) v 0)) (first xs))`. */ + val = ray_lazy_materialize(val); /* consumes the slot's ref */ + if (!val || RAY_IS_ERR(val)) { + vm_err_obj = val ? val : ray_error("type", NULL); + LOCAL(slot) = NULL; /* ref already consumed */ + goto vm_error; + } + LOCAL(slot) = val; /* slot now owns the concrete */ + } if (val) ray_retain(val); else val = make_i64(0); PUSH(val); @@ -2376,6 +2392,18 @@ op_tryh: { DISPATCH(); } +op_force: { + /* Materialize a lazy TOS so a let-bound local holds a concrete, + * reusable value (a lazy handle is single-use). */ + ray_t* v = POP(); + if (v && ray_is_lazy(v)) { + v = ray_lazy_materialize(v); /* consumes; concrete or error */ + if (!v || RAY_IS_ERR(v)) { vm_err_obj = v ? v : ray_error("type", NULL); goto vm_error; } + } + PUSH(v); + DISPATCH(); +} + op_scope_begin: { /* Stack: [.., syms_vec] — materialize the live locals (slot i holds * the value of syms[i]) plus self into a fresh scope frame so the @@ -2847,6 +2875,7 @@ static void ray_register_builtins(void) { register_vary(".db.splayed.get", RAY_FN_NONE, ray_get_splayed_fn); register_vary(".db.parted.get", RAY_FN_NONE, ray_get_parted_fn); register_vary(".db.parted.tables", RAY_FN_NONE, ray_get_parted_tables_fn); + register_vary(".db.parted.fill", RAY_FN_RESTRICTED, ray_fill_parted_fn); /* GUID generation */ register_unary("guid", RAY_FN_NONE, ray_guid_fn); diff --git a/src/lang/eval.h b/src/lang/eval.h index 8d7f0059..a87ebf77 100644 --- a/src/lang/eval.h +++ b/src/lang/eval.h @@ -89,6 +89,8 @@ enum { OP_TRYH, /* try-handler dispatch: pop err_val + handler; if * handler is callable, call it with err_val, else * push handler as a fallback value */ + OP_FORCE, /* materialize TOS if it is a lazy handle (so a + * let-bound local holds a concrete, reusable value) */ OP__COUNT }; diff --git a/src/lang/internal.h b/src/lang/internal.h index fec5005d..25625ab7 100644 --- a/src/lang/internal.h +++ b/src/lang/internal.h @@ -567,6 +567,7 @@ ray_t* ray_set_splayed_fn(ray_t** args, int64_t n); ray_t* ray_get_splayed_fn(ray_t** args, int64_t n); ray_t* ray_get_parted_fn(ray_t** args, int64_t n); ray_t* ray_get_parted_tables_fn(ray_t** args, int64_t n); +ray_t* ray_fill_parted_fn(ray_t** args, int64_t n); ray_t* ray_guid_fn(ray_t* n_arg); /* Transaction-log journaling (.log.*) — the -l/-L feature. diff --git a/src/ops/query.c b/src/ops/query.c index 68ef38f7..ad7375ca 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -1135,59 +1135,89 @@ ray_op_t* compile_expr_dag(ray_graph_t* g, ray_t* expr) { return ray_cast(g, col, tgt); } - /* (within col [lo hi]) — inclusive range membership. Lowers to + /* (within col range) — inclusive range membership. Lowers to * `(and (>= col lo) (<= col hi))`, reusing the comparison * executors so a `within` WHERE predicate inherits range-index * rowsel, partition pruning, null handling and type promotion - * for free. The range must compile to a 2-element constant - * vector — the same literal-operand constraint OP_IN's set has; - * a non-const or wrong-length range returns NULL so the select - * falls back to the eval-level `within` builtin. */ + * for free. Two range forms are accepted: + * 1. a 2-element constructor — `(list lo hi)` / `(enlist lo hi)` + * — whose element EXPRESSIONS are compiled individually, so + * variable / runtime bounds work (e.g. `(list lo hi)` with + * `lo`,`hi` bound by `set`); + * 2. a constant 2-element vector literal — e.g. `[2 4]`. + * Anything else returns NULL so the select falls back to the + * eval-level `within` builtin. */ if (fname_len == 6 && memcmp(fname, "within", 6) == 0) { if (n != 3) return NULL; ray_op_t* col = compile_expr_dag(g, elems[1]); if (!col) return NULL; uint32_t col_id = col->id; - ray_op_t* rng = compile_expr_dag(g, elems[2]); - if (!rng || rng->opcode != OP_CONST) return NULL; - ray_op_ext_t* rext = find_ext(g, rng->id); - if (!rext || !rext->literal) return NULL; - ray_t* rv = rext->literal; - if (!ray_is_vec(rv) || rv->len != 2) return NULL; - /* Extract the two bounds as typed atoms via ray_at_fn so a - * DATE/TIME/I32 range keeps its element type for the - * comparison (ray_const_atom retains, so release ours). */ - ray_t* lo_idx = ray_i64(0); - ray_t* hi_idx = ray_i64(1); - ray_t* lo_atom = ray_at_fn(rv, lo_idx); - ray_t* hi_atom = ray_at_fn(rv, hi_idx); - ray_release(lo_idx); - ray_release(hi_idx); - if (!lo_atom || RAY_IS_ERR(lo_atom) || - !hi_atom || RAY_IS_ERR(hi_atom)) { - if (lo_atom) ray_release(lo_atom); - if (hi_atom) ray_release(hi_atom); - return NULL; + + uint32_t lo_id = 0, hi_id = 0; + ray_t* range_ast = elems[2]; + int handled = 0; + + /* Form 1: 2-element (list …) / (enlist …) constructor. */ + if (range_ast->type == RAY_LIST && ray_len(range_ast) == 3) { + ray_t** re = (ray_t**)ray_data(range_ast); + if (re[0]->type == -RAY_SYM) { + ray_t* hs = ray_sym_str(re[0]->i64); + size_t hl = hs ? ray_str_len(hs) : 0; + const char* hp = hs ? ray_str_ptr(hs) : NULL; + if ((hl == 4 && memcmp(hp, "list", 4) == 0) || + (hl == 6 && memcmp(hp, "enlist", 6) == 0)) { + ray_op_t* lo = compile_expr_dag(g, re[1]); + if (!lo) return NULL; + lo_id = lo->id; + ray_op_t* hi = compile_expr_dag(g, re[2]); + if (!hi) return NULL; + hi_id = hi->id; + handled = 1; + } + } + } + + /* Form 2: constant 2-element vector literal. Extract the two + * bounds as typed atoms via ray_at_fn so a DATE/TIME/I32 range + * keeps its element type for the comparison (ray_const_atom + * retains, so release ours). */ + if (!handled) { + ray_op_t* rng = compile_expr_dag(g, range_ast); + if (!rng || rng->opcode != OP_CONST) return NULL; + ray_op_ext_t* rext = find_ext(g, rng->id); + if (!rext || !rext->literal) return NULL; + ray_t* rv = rext->literal; + if (!ray_is_vec(rv) || rv->len != 2) return NULL; + ray_t* lo_idx = ray_i64(0); + ray_t* hi_idx = ray_i64(1); + ray_t* lo_atom = ray_at_fn(rv, lo_idx); + ray_t* hi_atom = ray_at_fn(rv, hi_idx); + ray_release(lo_idx); + ray_release(hi_idx); + if (!lo_atom || RAY_IS_ERR(lo_atom) || + !hi_atom || RAY_IS_ERR(hi_atom)) { + if (lo_atom) ray_release(lo_atom); + if (hi_atom) ray_release(hi_atom); + return NULL; + } + ray_op_t* lo = ray_const_atom(g, lo_atom); + ray_op_t* hi = ray_const_atom(g, hi_atom); + ray_release(lo_atom); + ray_release(hi_atom); + if (!lo || !hi) return NULL; + lo_id = lo->id; + hi_id = hi->id; } - ray_op_t* lo = ray_const_atom(g, lo_atom); - ray_op_t* hi = ray_const_atom(g, hi_atom); - ray_release(lo_atom); - ray_release(hi_atom); - if (!lo || !hi) return NULL; - /* Snapshot IDs and re-resolve after each graph alloc — the - * node array may realloc between constructor calls. */ - uint32_t lo_id = lo->id, hi_id = hi->id; - col = &g->nodes[col_id]; - lo = &g->nodes[lo_id]; - ray_op_t* ge = ray_ge(g, col, lo); + + /* Build (and (>= col lo) (<= col hi)). Snapshot IDs and + * re-resolve after each graph alloc — the node array may + * realloc between constructor calls. */ + ray_op_t* ge = ray_ge(g, &g->nodes[col_id], &g->nodes[lo_id]); if (!ge) return NULL; uint32_t ge_id = ge->id; - col = &g->nodes[col_id]; - hi = &g->nodes[hi_id]; - ray_op_t* le = ray_le(g, col, hi); + ray_op_t* le = ray_le(g, &g->nodes[col_id], &g->nodes[hi_id]); if (!le) return NULL; - ge = &g->nodes[ge_id]; - return ray_and(g, ge, le); + return ray_and(g, &g->nodes[ge_id], le); } /* Temporal extract: (year col), (month col), (day col), ... */ diff --git a/src/ops/system.c b/src/ops/system.c index 7b0358f8..f0bd828f 100644 --- a/src/ops/system.c +++ b/src/ops/system.c @@ -222,6 +222,20 @@ ray_t* ray_get_parted_tables_fn(ray_t** args, int64_t n) { return ray_parted_tables(root); } +/* (.db.parted.fill "db_root") → sym vector of partition names that were + * filled. Writes an empty copy of every table into the partitions that lack + * it (schema from the most recent partition with it). */ +ray_t* ray_fill_parted_fn(ray_t** args, int64_t n) { + if (n != 1) + return ray_error("domain", ".db.parted.fill expects 1 argument, got %lld", (long long)n); + + char root[1024]; + if (!str_to_cpath(args[0], root, sizeof(root))) + return ray_error("type", ".db.parted.fill expects a string db root path, got %s", ray_type_name(args[0]->type)); + + return ray_parted_fill(root); +} + /* stat/dirent used by the .os.* filesystem metadata builtins below. */ #include #include diff --git a/src/store/part.c b/src/store/part.c index 876ecc9a..8b936793 100644 --- a/src/store/part.c +++ b/src/store/part.c @@ -466,36 +466,18 @@ ray_t* ray_read_parted(const char* db_root, const char* table_name) { } /* -------------------------------------------------------------------------- - * ray_parted_tables — list the table names under a parted db root - * - * Table names are the splayed-table subdirectories of a partition (those - * carrying a `.d` schema). We read them from the FIRST (sorted) partition, - * the same canonical-schema convention ray_read_parted uses for columns. - * Returns a sorted RAY_SYM vector suitable for ray_read_parted, or an error. + * collect_table_dirs — list the splayed-table subdirectories (those carrying + * a `.d` schema) of ONE partition directory `pdir`, as a sorted array of + * malloc'd names. Caller frees each name and the array. Returns RAY_OK + * (count may be 0), RAY_ERR_IO if pdir can't be opened, or RAY_ERR_OOM. * -------------------------------------------------------------------------- */ -ray_t* ray_parted_tables(const char* db_root) { - if (!db_root || !*db_root) return ray_error("io", NULL); - - char** part_dirs = NULL; - int64_t part_count = 0; - ray_err_t e = collect_part_dirs(db_root, &part_dirs, &part_count); - if (e != RAY_OK) - return ray_error(ray_err_code_str(e), - "parted %s: cannot enumerate partition directories", db_root); - - /* Only the first partition is needed; release the rest. */ - char pdir[1024]; - int pn = snprintf(pdir, sizeof(pdir), "%s/%s", db_root, part_dirs[0]); - for (int64_t p = 0; p < part_count; p++) free(part_dirs[p]); - free(part_dirs); - if (pn < 0 || (size_t)pn >= sizeof(pdir)) - return ray_error("range", "parted %s: partition path too long", db_root); - +static ray_err_t collect_table_dirs(const char* pdir, char*** out_names, + int64_t* out_count) { + *out_names = NULL; + *out_count = 0; DIR* d = opendir(pdir); - if (!d) - return ray_error("io", "parted %s: cannot read partition %s", db_root, pdir); + if (!d) return RAY_ERR_IO; - /* Collect subdirectories that are splayed tables (contain a `.d`). */ char** names = NULL; int64_t count = 0, cap = 0; ray_err_t oom = RAY_OK; @@ -524,7 +506,7 @@ ray_t* ray_parted_tables(const char* db_root) { if (oom != RAY_OK) { for (int64_t i = 0; i < count; i++) free(names[i]); free(names); - return ray_error("oom", NULL); + return RAY_ERR_OOM; } /* Deterministic order. */ @@ -534,6 +516,78 @@ ray_t* ray_parted_tables(const char* db_root) { char* t = names[i]; names[i] = names[j]; names[j] = t; } + *out_names = names; + *out_count = count; + return RAY_OK; +} + +/* Does partition `part` contain splayed table `tname` (i.e. a `.d` schema)? */ +static bool partition_has_table(const char* db_root, const char* part, + const char* tname) { + char dpath[1024]; + int n = snprintf(dpath, sizeof(dpath), "%s/%s/%s/.d", db_root, part, tname); + if (n < 0 || (size_t)n >= sizeof(dpath)) return false; + struct stat st; + return stat(dpath, &st) == 0 && S_ISREG(st.st_mode); +} + +/* Build a 0-row table with the same column names and types as `tmpl`. */ +static ray_t* empty_table_like(ray_t* tmpl) { + int64_t ncols = ray_table_ncols(tmpl); + ray_t* out = ray_table_new(ncols); + if (!out || RAY_IS_ERR(out)) return out ? out : ray_error("oom", NULL); + for (int64_t c = 0; c < ncols; c++) { + ray_t* col = ray_table_get_col_idx(tmpl, c); + if (!col) { ray_release(out); return ray_error("type", "empty_table_like: null column"); } + ray_t* ecol = ray_vec_new(col->type, 0); + if (!ecol || RAY_IS_ERR(ecol)) { ray_release(out); return ecol ? ecol : ray_error("oom", NULL); } + ray_t* nout = ray_table_add_col(out, ray_table_col_name(tmpl, c), ecol); + ray_release(ecol); + if (!nout || RAY_IS_ERR(nout)) { ray_release(out); return nout ? nout : ray_error("oom", NULL); } + out = nout; + } + return out; +} + +/* -------------------------------------------------------------------------- + * ray_parted_tables — list the table names under a parted db root + * + * Table names are the splayed-table subdirectories of a partition (those + * carrying a `.d` schema). We read them from the LAST (sorted) partition: + * partition dirs sort ascending (e.g. by date), so the last is the most + * recent and reflects the current table set — a table added in a later + * partition would be invisible if we only looked at the first/oldest. + * Returns a sorted RAY_SYM vector suitable for ray_read_parted, or an error. + * -------------------------------------------------------------------------- */ +ray_t* ray_parted_tables(const char* db_root) { + if (!db_root || !*db_root) return ray_error("io", NULL); + + char** part_dirs = NULL; + int64_t part_count = 0; + ray_err_t e = collect_part_dirs(db_root, &part_dirs, &part_count); + if (e != RAY_OK) + return ray_error(ray_err_code_str(e), + "parted %s: cannot enumerate partition directories", db_root); + if (part_count <= 0) { + free(part_dirs); + return ray_error("io", "parted %s: no partition directories", db_root); + } + + /* Only the last (most recent) partition is needed; release the rest. */ + char pdir[1024]; + int pn = snprintf(pdir, sizeof(pdir), "%s/%s", db_root, part_dirs[part_count - 1]); + for (int64_t p = 0; p < part_count; p++) free(part_dirs[p]); + free(part_dirs); + if (pn < 0 || (size_t)pn >= sizeof(pdir)) + return ray_error("range", "parted %s: partition path too long", db_root); + + char** names = NULL; + int64_t count = 0; + ray_err_t te = collect_table_dirs(pdir, &names, &count); + if (te != RAY_OK) + return ray_error(ray_err_code_str(te), + "parted %s: cannot read partition %s", db_root, pdir); + ray_t* result = ray_vec_new(RAY_SYM, count); if (!result || RAY_IS_ERR(result)) { for (int64_t i = 0; i < count; i++) free(names[i]); @@ -549,3 +603,142 @@ ray_t* ray_parted_tables(const char* db_root) { free(names); return result; } + +/* -------------------------------------------------------------------------- + * ray_parted_fill — fill missing tables across a parted db's partitions. + * + * For every table that appears in ANY partition, ensure every partition has + * it: a partition missing the table gets an EMPTY copy whose schema is taken + * from the most recent partition that does have it (the same canonical-schema + * convention ray_parted_tables / ray_read_parted use). This keeps queries + * that span partitions from failing on a partition where a table is absent. + * + * Returns a sorted RAY_SYM vector of the partition names that were filled + * (empty vector when nothing needed fixing), or an error. + * -------------------------------------------------------------------------- */ +ray_t* ray_parted_fill(const char* db_root) { + if (!db_root || !*db_root) return ray_error("io", NULL); + + char** part_dirs = NULL; + int64_t part_count = 0; + ray_err_t e = collect_part_dirs(db_root, &part_dirs, &part_count); + if (e != RAY_OK) + return ray_error(ray_err_code_str(e), + "parted %s: cannot enumerate partition directories", db_root); + if (part_count <= 0) { + free(part_dirs); + return ray_error("io", "parted %s: no partition directories", db_root); + } + + /* Shared root symfile — pass to load/save so SYM columns intern against + * the one vocabulary. NULL when the DB is symbol-free (no /.sym). */ + char sym_buf[1024]; + const char* sym_path = NULL; + int sn = snprintf(sym_buf, sizeof(sym_buf), "%s/.sym", db_root); + if (sn > 0 && (size_t)sn < sizeof(sym_buf)) { + struct stat sst; + if (stat(sym_buf, &sst) == 0) sym_path = sym_buf; + } + + /* Union of table names across all partitions. */ + char** all = NULL; + int64_t all_count = 0, all_cap = 0; + uint8_t* fixed = (uint8_t*)calloc((size_t)part_count, 1); + ray_err_t err = fixed ? RAY_OK : RAY_ERR_OOM; + + for (int64_t p = 0; p < part_count && err == RAY_OK; p++) { + char pdir[1024]; + int pn = snprintf(pdir, sizeof(pdir), "%s/%s", db_root, part_dirs[p]); + if (pn < 0 || (size_t)pn >= sizeof(pdir)) { err = RAY_ERR_RANGE; break; } + char** names = NULL; + int64_t cnt = 0; + ray_err_t te = collect_table_dirs(pdir, &names, &cnt); + if (te != RAY_OK) { err = te; break; } + for (int64_t i = 0; i < cnt; i++) { + bool seen = false; + for (int64_t k = 0; k < all_count; k++) + if (strcmp(all[k], names[i]) == 0) { seen = true; break; } + if (seen) { free(names[i]); continue; } + if (all_count >= all_cap) { + int64_t nc = all_cap == 0 ? 16 : all_cap * 2; + char** tmp = (char**)realloc(all, (size_t)nc * sizeof(char*)); + if (!tmp) { err = RAY_ERR_OOM; free(names[i]); continue; } + all = tmp; all_cap = nc; + } + all[all_count++] = names[i]; /* transfer ownership */ + } + free(names); + } + + /* For each table: template = last partition with it; fill the rest. */ + for (int64_t t = 0; t < all_count && err == RAY_OK; t++) { + const char* tname = all[t]; + int64_t templ = -1; + for (int64_t p = part_count - 1; p >= 0; p--) + if (partition_has_table(db_root, part_dirs[p], tname)) { templ = p; break; } + if (templ < 0) continue; /* unreachable (it's in the union) */ + + ray_t* empty_tbl = NULL; /* built lazily on first miss */ + for (int64_t p = 0; p < part_count && err == RAY_OK; p++) { + if (partition_has_table(db_root, part_dirs[p], tname)) continue; + + if (!empty_tbl) { + char tdir[1024]; + int tn = snprintf(tdir, sizeof(tdir), "%s/%s/%s", + db_root, part_dirs[templ], tname); + if (tn < 0 || (size_t)tn >= sizeof(tdir)) { err = RAY_ERR_RANGE; break; } + ray_t* full = ray_read_splayed(tdir, sym_path); + if (!full || RAY_IS_ERR(full)) { + if (full) ray_error_free(full); + err = RAY_ERR_IO; + break; + } + empty_tbl = empty_table_like(full); + ray_release(full); + if (!empty_tbl || RAY_IS_ERR(empty_tbl)) { + if (empty_tbl) ray_error_free(empty_tbl); + empty_tbl = NULL; + err = RAY_ERR_IO; + break; + } + } + + char ptdir[1024]; + int ptn = snprintf(ptdir, sizeof(ptdir), "%s/%s/%s", + db_root, part_dirs[p], tname); + if (ptn < 0 || (size_t)ptn >= sizeof(ptdir)) { err = RAY_ERR_RANGE; break; } + ray_err_t se = ray_splay_save(empty_tbl, ptdir, sym_path); + if (se != RAY_OK) { err = se; break; } + fixed[p] = 1; + } + if (empty_tbl) ray_release(empty_tbl); + } + + /* Build the sorted SYM result of fixed partition names. */ + ray_t* result = NULL; + if (err == RAY_OK) { + int64_t nfixed = 0; + for (int64_t p = 0; p < part_count; p++) if (fixed[p]) nfixed++; + result = ray_vec_new(RAY_SYM, nfixed); + if (!result || RAY_IS_ERR(result)) { err = RAY_ERR_OOM; } + else { + result->len = nfixed; + int64_t* out = (int64_t*)ray_data(result); + int64_t w = 0; + for (int64_t p = 0; p < part_count; p++) + if (fixed[p]) out[w++] = ray_sym_intern(part_dirs[p], strlen(part_dirs[p])); + } + } + + for (int64_t i = 0; i < all_count; i++) free(all[i]); + free(all); + for (int64_t p = 0; p < part_count; p++) free(part_dirs[p]); + free(part_dirs); + free(fixed); + + if (err != RAY_OK) { + if (result && !RAY_IS_ERR(result)) ray_release(result); + return ray_error(ray_err_code_str(err), "parted %s: fill failed", db_root); + } + return result; +} diff --git a/src/store/part.h b/src/store/part.h index f96af4ab..f6755829 100644 --- a/src/store/part.h +++ b/src/store/part.h @@ -35,4 +35,11 @@ ray_t* ray_read_parted(const char* db_root, const char* table_name); * or an error object (e.g. `io` when no partitions exist). */ ray_t* ray_parted_tables(const char* db_root); +/* Fill missing tables across a parted db: for every table present in any + * partition, write an empty copy — schema taken from the most recent + * partition that has it — into every partition that lacks it. Returns a + * sorted RAY_SYM vector of the partition names that were filled (empty when + * nothing needed fixing), or an error. */ +ray_t* ray_parted_fill(const char* db_root); + #endif /* RAY_PART_H */ diff --git a/test/rfl/agg/first.rfl b/test/rfl/agg/first.rfl index 092673a7..b4e1ae62 100644 --- a/test/rfl/agg/first.rfl +++ b/test/rfl/agg/first.rfl @@ -52,3 +52,16 @@ (set Tfgu (table [g] (list Gf))) (== (first Gf) (at (at (select {r: (first g) from: Tfgu}) 'r) 0)) -- true (type (at (select {r: (first g) from: Tfgu}) 'r)) -- 'GUID + +;; ── first must return a CONCRETE element, never a lazy DAG wrap, even for +;; ── I64/F64. A lazy result bound to a variable and used more than once +;; ── broke on the second use (it consumed itself on first materialization). +(type (first [10 20 30])) -- 'i64 +((fn [] (let v (first [10 20 30])) (if (> v 0) v 0))) -- 10 +((fn [] (let v (first [1.0 2.0 3.0])) (if (> v 0.0) v 0.0))) -- 1.0 +;; Reported case: first of a select-aggregate column, bound via let, reused. +(set sesslog (table [sym sn] (list ['c1 'c1] [1 2]))) +((fn [] (let v (first (at (select {from: sesslog max_sn: (max sn)}) 'max_sn))) (if (> v 0) v 0))) -- 2 +;; Same hazard via a lambda PARAMETER (forced on first load, not at bind). +((fn [v] (if (> v 0) v 0)) (first [10 20 30])) -- 10 +((fn [v] (+ v v)) (last [10 20 30])) -- 60 diff --git a/test/rfl/agg/last.rfl b/test/rfl/agg/last.rfl index 2161969f..f711e009 100644 --- a/test/rfl/agg/last.rfl +++ b/test/rfl/agg/last.rfl @@ -46,3 +46,9 @@ (set Gl (guid 6)) (set Tlgu (table [g] (list Gl))) (== (last Gl) (at (at (select {r: (last g) from: Tlgu}) 'r) 0)) -- true + +;; ── last must return a CONCRETE element (never a lazy DAG wrap) for I64/F64 +;; ── too, so a value bound via let and used twice survives (see first.rfl). +(type (last [10 20 30])) -- 'i64 +((fn [] (let v (last [10 20 30])) (if (> v 0) v 0))) -- 30 +((fn [] (let v (last [1.0 2.0 3.0])) (if (> v 0.0) v 0.0))) -- 3.0 diff --git a/test/rfl/query/where_within.rfl b/test/rfl/query/where_within.rfl index c832d4a5..b1736759 100644 --- a/test/rfl/query/where_within.rfl +++ b/test/rfl/query/where_within.rfl @@ -6,8 +6,9 @@ ;; comparison opcodes any hand-written range predicate would — inheriting ;; range-index rowsel, partition pruning, null handling and type promotion. ;; -;; The range operand must be a compile-time-constant 2-element vector, the -;; same literal constraint OP_IN's set operand has. +;; The range is either a constant 2-element vector literal (e.g. [2 4]) or +;; a 2-element `(list …)` / `(enlist …)` constructor — the latter lets the +;; bounds be variables / runtime expressions (issue follow-up). ;; -------------------------------------------------------------------- ;; I64 column — inclusive on both ends. @@ -15,6 +16,14 @@ (set t (table [sn] (list [1 2 3 4 5]))) ;; sn in [2 4] → {2,3,4}. (at (select {from: t where: (within sn [2 4]) c: sn}) 'c) -- [2 3 4] +;; Variable bounds via (list lo hi) and (enlist lo hi) — NOT a const vec, +;; so these exercise the constructor-decomposition path. (`[lo hi]` is a +;; symbol-vector literal by design and is intentionally NOT supported.) +(set lo 2) (set hi 4) +(at (select {from: t where: (within sn (list lo hi)) c: sn}) 'c) -- [2 3 4] +(at (select {from: t where: (within sn (enlist lo hi)) c: sn}) 'c) -- [2 3 4] +;; Literal args inside the constructor work too. +(at (select {from: t where: (within sn (list 2 4)) c: sn}) 'c) -- [2 3 4] ;; Equivalent hand-written range predicate produces the same rows. (at (select {from: t where: (and (>= sn 2) (<= sn 4)) c: sn}) 'c) -- [2 3 4] ;; Lower bound inclusive: [1 1] keeps exactly row 1. diff --git a/test/rfl/system/db_get.rfl b/test/rfl/system/db_get.rfl index 0a3606f5..e491d10d 100644 --- a/test/rfl/system/db_get.rfl +++ b/test/rfl/system/db_get.rfl @@ -7,7 +7,7 @@ ;; .db.splayed.set sweeps stale files and commits .d last, so re-setting ;; over a previous run is safe; the wipe below just isolates fixtures. -(.sys.exec "rm -rf /tmp/rfl_get_splayed /tmp/rfl_get_parted") +(.sys.exec "rm -rf /tmp/rfl_get_splayed /tmp/rfl_get_parted /tmp/rfl_get_parted2") ;; ────────────── several splayed tables under one root ────────────── (set Trades (table [s p] (list ['AAPL 'GOOG] [100.0 200.0]))) @@ -83,8 +83,8 @@ (max (at B 'v)) -- 8 ;; ────────────── .db.parted.tables: discover the table names ────────────── -;; Returns the splayed-table subdirectories of the first partition as a -;; sorted SYM vector — usable directly with .db.parted.get, no binding. +;; Returns the splayed-table subdirectories of the LAST (most recent) +;; partition as a sorted SYM vector — usable directly with .db.parted.get. (.db.parted.tables "/tmp/rfl_get_parted/") -- ['A 'B] (count (.db.parted.tables "/tmp/rfl_get_parted/")) -- 2 ;; a discovered name feeds straight back into .db.parted.get. @@ -93,6 +93,16 @@ (.db.parted.tables "/tmp/rfl_get_splayed/") !- io (.db.parted.tables "/tmp/rfl_get_does_not_exist/") !- io +;; .db.parted.tables reads the LAST partition, so a table added in a later +;; partition is discovered, and one present only in the OLDEST partition is +;; not listed. Root: 01.01 = {A, OLD}; 01.02 (last) = {A, NEW}. +(set TX (table [s v] (list ['a] [1]))) +(.db.splayed.set "/tmp/rfl_get_parted2/2024.01.01/A/" TX) +(.db.splayed.set "/tmp/rfl_get_parted2/2024.01.01/OLD/" TX) +(.db.splayed.set "/tmp/rfl_get_parted2/2024.01.02/A/" TX) +(.db.splayed.set "/tmp/rfl_get_parted2/2024.01.02/NEW/" TX) +(.db.parted.tables "/tmp/rfl_get_parted2/") -- ['A 'NEW] + ;; ────────────── error path: not a parted root ────────────── ;; The splayed root from the first test has no partition directories, ;; so .db.parted.get on it must fail rather than returning an empty diff --git a/test/rfl/system/db_parted_fill.rfl b/test/rfl/system/db_parted_fill.rfl new file mode 100644 index 00000000..a1466b05 --- /dev/null +++ b/test/rfl/system/db_parted_fill.rfl @@ -0,0 +1,46 @@ +;; .db.parted.fill — fill missing tables across a parted db's partitions. +;; +;; For every table present in ANY partition, write an empty copy (schema +;; from the most recent partition that has it) into every partition that +;; lacks it, so cross-partition queries never fail on a missing table. +;; Returns a sorted SYM vector of the partition names that were filled. + +(.sys.exec "rm -rf /tmp/rfl_fill") + +;; Layout (A everywhere; B only in the oldest; C only in the newest): +;; 2024.01.01 : {A, B} +;; 2024.01.02 : {A} +;; 2024.01.03 : {A, C} +(set TA (table [s v] (list ['x 'y] [1 2]))) +(set TB (table [s v] (list ['p 'q] [10 20]))) +(set TC (table [s v] (list ['m] [99]))) +(.db.splayed.set "/tmp/rfl_fill/2024.01.01/A/" TA) +(.db.splayed.set "/tmp/rfl_fill/2024.01.01/B/" TB) +(.db.splayed.set "/tmp/rfl_fill/2024.01.02/A/" TA) +(.db.splayed.set "/tmp/rfl_fill/2024.01.03/A/" TA) +(.db.splayed.set "/tmp/rfl_fill/2024.01.03/C/" TC) + +;; Before fill, the newest partition only knows A and C. +(.db.parted.tables "/tmp/rfl_fill/") -- ['A 'C] + +;; Fill: B is missing from 01.02 and 01.03; C is missing from 01.01 and +;; 01.02 — so all three partitions get at least one table created. +(.db.parted.fill "/tmp/rfl_fill/") -- ['2024.01.01 '2024.01.02 '2024.01.03] + +;; The newest partition now carries every table. +(.db.parted.tables "/tmp/rfl_fill/") -- ['A 'B 'C] + +;; Filled copies are EMPTY, so cross-partition totals are unchanged: +;; B has only the 2 rows from 01.01, C only the 1 row from 01.03, A all 6. +(count (.db.parted.get "/tmp/rfl_fill/" 'B)) -- 2 +(sum (at (.db.parted.get "/tmp/rfl_fill/" 'B) 'v)) -- 30 +(count (.db.parted.get "/tmp/rfl_fill/" 'C)) -- 1 +(count (.db.parted.get "/tmp/rfl_fill/" 'A)) -- 6 + +;; Idempotent: a second fill finds nothing missing → empty result. +(.db.parted.fill "/tmp/rfl_fill/") -- [] + +;; Error paths: missing root and a non-parted (splayed) root. +(.db.parted.fill "/tmp/rfl_fill_nope/") !- io + +(.sys.exec "rm -rf /tmp/rfl_fill") diff --git a/test/rfl/system/ipc_open_timeout.rfl b/test/rfl/system/ipc_open_timeout.rfl index 0df1f448..6a525b1b 100644 --- a/test/rfl/system/ipc_open_timeout.rfl +++ b/test/rfl/system/ipc_open_timeout.rfl @@ -44,8 +44,16 @@ (set h2 (.ipc.open "127.0.0.1:19998")) (>= h2 0) -- true (* 6 7) -- (.ipc.send h2 "(* 6 7)") +(.ipc.close h2) + +;; ── "localhost" must connect even though the server is IPv4-only ── +;; localhost often resolves to ::1 (IPv6) first; ray_sock_connect now tries +;; every resolved address, so it falls through to 127.0.0.1. +(set h3 (.ipc.open "localhost:19998")) +(>= h3 0) -- true +(- 9 4) -- (.ipc.send h3 "(- 9 4)") ;; ── teardown ─────────────────────────────────────────────────────── -(try (.ipc.send h2 "(exit 0)") (fn [e] 0)) -(.ipc.close h2) +(try (.ipc.send h3 "(exit 0)") (fn [e] 0)) +(.ipc.close h3) (.sys.exec "sleep 0.1; pkill -KILL -f 'rayforce -p 19998' 2>/dev/null; true") diff --git a/test/rfl/system/reserved_namespace.rfl b/test/rfl/system/reserved_namespace.rfl index 8f5b318c..2bbcabc8 100644 --- a/test/rfl/system/reserved_namespace.rfl +++ b/test/rfl/system/reserved_namespace.rfl @@ -37,11 +37,12 @@ (type .db.parted) -- 'DICT (count .db) -- 2 ;; splayed, parted (count .db.splayed) -- 2 ;; set, get -(count .db.parted) -- 2 ;; get, tables +(count .db.parted) -- 3 ;; get, tables, fill (nil? (resolve '.db.splayed.set)) -- false (nil? (resolve '.db.splayed.get)) -- false (nil? (resolve '.db.parted.get)) -- false (nil? (resolve '.db.parted.tables)) -- false +(nil? (resolve '.db.parted.fill)) -- false ;; REPL completion + syntax highlighting go through ;; ray_env_lookup_prefix scanning flat g_env keys. The namespace ;; dicts alone don't satisfy that path, so every reserved builtin