diff --git a/.github/requirements-docs.txt b/.github/requirements-docs.txt new file mode 100644 index 00000000..4df33c5c --- /dev/null +++ b/.github/requirements-docs.txt @@ -0,0 +1 @@ +mkdocs-material>=9.5 diff --git a/.github/workflows/pages.yml b/.github/workflows/pages.yml index c926b2fb..6fa00322 100644 --- a/.github/workflows/pages.yml +++ b/.github/workflows/pages.yml @@ -14,7 +14,7 @@ on: - 'docs/**' - 'overrides/**' - 'mkdocs.yml' - - 'requirements-docs.txt' + - '.github/requirements-docs.txt' - '.github/workflows/pages.yml' workflow_dispatch: @@ -36,9 +36,9 @@ jobs: with: python-version: '3.12' cache: pip - cache-dependency-path: requirements-docs.txt + cache-dependency-path: .github/requirements-docs.txt - name: Install MkDocs Material - run: pip install -r requirements-docs.txt + run: pip install -r .github/requirements-docs.txt - name: Build site (MkDocs) run: mkdocs build --strict - uses: actions/configure-pages@v4 diff --git a/docs/docs/language/control-flow.md b/docs/docs/language/control-flow.md index 4975751b..a1e7111e 100644 --- a/docs/docs/language/control-flow.md +++ b/docs/docs/language/control-flow.md @@ -79,6 +79,20 @@ Recursive lambdas use `self` to refer to the enclosing function: If no error is raised, `try` returns the result of the body expression normally. Works inside lambdas compiled to bytecode. +### Fallback value + +If the second argument is **not** a function, it is returned as-is as the fallback value on error (evaluated only when the body fails). Because lambdas do not capture closures, this is the only way to surface an outer binding from the failure branch: + +```lisp +‣ (try (raise "boom") 0) +0 + +‣ ((fn [data] (try (raise "boom") data)) 123) +123 +``` + +A handler must accept the single error argument, so only a lambda or a unary builtin is *called* with the error; any other value (including a multi-argument builtin) is treated as a fallback value. + ## Early Return: return `return` exits the innermost enclosing compiled lambda early with the given value: diff --git a/docs/docs/language/functions.md b/docs/docs/language/functions.md index 2b87d46b..c5b1653b 100644 --- a/docs/docs/language/functions.md +++ b/docs/docs/language/functions.md @@ -316,7 +316,7 @@ Cross-temporal comparisons are supported: dates, times, and timestamps are all c | `if` | variadic, special | Conditional (if/then/else) | `(if (> x 0) "pos" "neg")` | | `do` | variadic, special | Sequential execution, returns last | `(do (set x 1) (set y 2) (+ x y))` | | `fn` | variadic, special | Create lambda function | `(fn [x] (* x x))` | -| `try` | binary, special | Error handling (expr handler) | `(try (/ 1 0) (fn [e] 0))` | +| `try` | binary, special | Error handling (expr handler-or-fallback) | `(try (/ 1 0) (fn [e] 0))` | | `raise` | unary | Throw an error | `(raise "bad input")` | | `return` | variadic | Early return from compiled lambda (0 args → null) | `(return 42)` | | `quote` | variadic, special | Return argument unevaluated; a bare name becomes a literal symbol (`(quote x)` ≡ `'x`) | `(quote (+ 1 2))` → `(+ 1 2)` | diff --git a/docs/docs/namespaces/ipc.md b/docs/docs/namespaces/ipc.md index 4f411c21..8e64bc34 100644 --- a/docs/docs/namespaces/ipc.md +++ b/docs/docs/namespaces/ipc.md @@ -12,7 +12,7 @@ Connect to a Rayforce server (`./rayforce -p `) and exchange messages over | Function | Arity | Flags | Description | |---|---|---|---| -| [`.ipc.open`](#ipc-open) | unary | restricted | Open a TCP connection; return an i64 handle. | +| [`.ipc.open`](#ipc-open) | variadic | restricted | Open a TCP connection (optional connect timeout); return an i64 handle. | | [`.ipc.send`](#ipc-send) | binary | restricted | Send a message synchronously; return the server's result. | | [`.ipc.post`](#ipc-post) | binary | restricted | Send a message asynchronously (fire-and-forget); return the null object. | | [`.ipc.close`](#ipc-close) | unary | restricted | Close a connection handle. | @@ -20,14 +20,17 @@ Connect to a Rayforce server (`./rayforce -p `) and exchange messages over ## `.ipc.open` { #ipc-open } -Signature: `(.ipc.open "host:port")` or `(.ipc.open "host:port:user:password")`. +Signature: `(.ipc.open "host:port")` or `(.ipc.open "host:port:user:password")`, with an optional trailing connect timeout in milliseconds: `(.ipc.open "host:port" 2000)`. Returns: an `i64` handle. Negative handles never escape — errors are surfaced as Rayfall error objects: -- `type` — argument is not a string. -- `domain` — malformed address (missing port, port out of `(0, 65535]`, oversized host/user/password). +- `type` — address argument is not a string, or the timeout argument is not an integer. +- `rank` — called with fewer than 1 or more than 2 arguments. +- `domain` — malformed address (missing port, port out of `(0, 65535]`, oversized host/user/password), or a negative timeout. - `access` — server requires auth and you didn't supply credentials, **or** the password is wrong. -- `io` — connection refused / network error. +- `io` — connection refused / network error, or `connection timed out` when the connect did not complete within the timeout. + +The optional timeout bounds **both** the TCP connect and the handshake I/O. A blocking `connect()` ignores socket send/receive timeouts, so without an explicit bound a dead or packet-filtered peer would otherwise hang for the operating-system default (often minutes). When omitted, a default budget of 5 seconds applies. The handshake exchanges a 2-byte `{wire_version, auth_flag}` greeting. A wire-version mismatch closes the connection before any payload is exchanged. @@ -37,6 +40,9 @@ The handshake exchanges a 2-byte `{wire_version, auth_flag}` greeting. A wire-ve ;; With credentials (server started with -u or -U) (set h (.ipc.open "127.0.0.1:5000:admin:secret123")) + +;; Fail fast if the peer doesn't answer within 2 seconds +(set h (.ipc.open "127.0.0.1:5000" 2000)) ``` ## `.ipc.send` { #ipc-send } diff --git a/docs/docs/reference/all-functions.md b/docs/docs/reference/all-functions.md index 649b37da..54f38b14 100644 --- a/docs/docs/reference/all-functions.md +++ b/docs/docs/reference/all-functions.md @@ -231,7 +231,7 @@ Special forms receive their arguments unevaluated. These are the core language p | `if` | variadic | special | Conditional: (if cond then else) | `(if (> x 0) "pos" "neg")` | | `do` | variadic | special | Sequential execution, returns last value | `(do (set x 1) (set y 2) (+ x y))` | | `fn` | variadic | special | Create lambda function | `(fn [x y] (+ x y))` | -| `try` | binary | special | Error handling: (try expr handler-fn) | `(try (/ 1 0) (fn [e] 0))` | +| `try` | binary | special | Error handling: (try expr handler-fn-or-fallback-value) | `(try (/ 1 0) (fn [e] 0))` | | `raise` | unary | — | Throw an error with message | `(raise "bad input")` | | `return` | unary | — | Early return from function body | `(return 42)` | | `quote` | variadic | special | Return argument unevaluated; a bare name yields a literal symbol (`(quote x)` ≡ `'x`) | `(quote (+ 1 2))` → `(+ 1 2)` | @@ -530,7 +530,7 @@ TCP-based IPC for connecting to remote Rayforce instances. Uses binary serializa | Function | Type | Flags | Description | Example | |---|---|---|---|---| -| `.ipc.open` | unary | restricted | Open TCP connection to host:port, returns handle | `(.ipc.open "localhost:5000")` | +| `.ipc.open` | variadic | restricted | Open TCP connection to host:port (optional connect timeout in ms), returns handle | `(.ipc.open "localhost:5000" 2000)` | | `.ipc.close` | unary | restricted | Close an IPC connection handle | `(.ipc.close h)` | | `.ipc.send` | binary | restricted | Send a value over an IPC handle (sync request) | `(.ipc.send h "(sum (til 100))")` | | `.ipc.handle` | variadic | — | Current connection handle inside any `.ipc.on.*` hook, `-1` outside | `(.ipc.handle)` | diff --git a/include/rayforce.h b/include/rayforce.h index fafb7a89..0ca15cea 100644 --- a/include/rayforce.h +++ b/include/rayforce.h @@ -647,7 +647,8 @@ ray_t* ray_fmt(ray_t* obj, int mode); * exchange; ray_ipc_send_async sends a fire-and-forget frame. */ int64_t ray_ipc_connect(const char* host, uint16_t port, - const char* user, const char* password); + const char* user, const char* password, + int timeout_ms); void ray_ipc_close(int64_t handle); ray_t* ray_ipc_send(int64_t handle, ray_t* msg); ray_err_t ray_ipc_send_async(int64_t handle, ray_t* msg); diff --git a/src/app/repl.c b/src/app/repl.c index 2f51b79d..ff7dcde5 100644 --- a/src/app/repl.c +++ b/src/app/repl.c @@ -573,7 +573,7 @@ ray_t* ray_repl_connect_fn(ray_t* host_port_str) { * and the connect-error-to-rayfall-error mapping; reusing it * keeps .repl.connect a one-line wrapper instead of a parallel * implementation that drifts. */ - ray_t* opened = ray_hopen_fn(host_port_str); + ray_t* opened = ray_hopen_fn(&host_port_str, 1); if (!opened || RAY_IS_ERR(opened)) return opened; if (!ray_is_atom(opened) || (opened->type != -RAY_I64 && opened->type != -RAY_I32)) { diff --git a/src/core/ipc.c b/src/core/ipc.c index ec9e43b8..890e91a6 100644 --- a/src/core/ipc.c +++ b/src/core/ipc.c @@ -339,10 +339,24 @@ static int hook_call_auth(ray_poll_t* poll, int64_t handle, static void send_response(ray_sock_t fd, ray_t* result) { int64_t ser_size = ray_serde_size(result); - if (ser_size <= 0) return; + + /* A result we cannot serialize must never leave the client waiting on + * a reply that never arrives. Substitute a serializable error so the + * caller observes a clean failure instead of a silent infinite hang + * (issue #285). ray_error frames serialize, so the substitute always + * goes out unless even that fails — in which case there is nothing we + * can put on the wire and we drop as before. */ + ray_t* fallback = NULL; + if (ser_size <= 0) { + fallback = ray_error("type", "result of type %s is not serializable over IPC", + result ? ray_type_name(result->type) : "null"); + result = fallback; + ser_size = ray_serde_size(result); + if (ser_size <= 0) { if (fallback) ray_error_free(fallback); return; } + } uint8_t* payload = (uint8_t*)ray_sys_alloc((size_t)ser_size); - if (!payload) return; + if (!payload) { if (fallback) ray_error_free(fallback); return; } ray_ser_raw(payload, result); uint8_t* send_buf = NULL; @@ -387,6 +401,7 @@ static void send_response(ray_sock_t fd, ray_t* result) ray_sys_free(send_buf); if (payload) ray_sys_free(payload); + if (fallback) ray_error_free(fallback); } /* Decompress (when flagged) + de-serialize one framed payload into an @@ -501,6 +516,15 @@ static ray_t* eval_payload_core(uint8_t* payload, size_t payload_len, ray_release(msg); } } + /* A lazy result is an internal deferred-DAG representation that cannot + * be serialized — force it to a concrete value before it reaches the + * wire. The direct ray_eval(msg) path (non-STR payloads, e.g. an + * expression list `(first v)`) returns lazy chains verbatim; the + * ray_eval_str path already materializes, so this is a no-op there. + * Without this, send_response cannot serialize the result and the + * client blocks forever waiting for a reply (issue #285). */ + if (result && ray_is_lazy(result)) + result = ray_lazy_materialize(result); /* consumes the retain */ return result ? result : RAY_NULL_OBJ; } @@ -1401,7 +1425,8 @@ static int64_t conn_write_msg(ray_sock_t fd, ray_t* msg, uint8_t msgtype, } int64_t ray_ipc_connect(const char* host, uint16_t port, - const char* user, const char* password) + const char* user, const char* password, + int timeout_ms) { /* The connection lives in the active poll's selector table — its * selector id IS the handle. No poll, no handle namespace: refuse @@ -1409,8 +1434,11 @@ int64_t ray_ipc_connect(const char* host, uint16_t port, ray_poll_t* poll = ipc_active_poll(); if (!poll) return -1; - ray_sock_t fd = ray_sock_connect(host, port, 5000); - if (fd == RAY_INVALID_SOCK) return -1; + /* Default the connect/handshake budget to 5s when the caller gives + * no explicit timeout, matching the long-standing handshake timeout. */ + int connect_to = timeout_ms > 0 ? timeout_ms : 5000; + ray_sock_t fd = ray_sock_connect(host, port, connect_to); + if (fd == RAY_INVALID_SOCK) return (errno == ETIMEDOUT) ? -5 : -1; uint8_t hs[2] = { RAY_SERDE_WIRE_VERSION, 0x00 }; if (ray_sock_send(fd, hs, 2) < 0) { diff --git a/src/core/ipc.h b/src/core/ipc.h index 2c746921..f880c2ad 100644 --- a/src/core/ipc.h +++ b/src/core/ipc.h @@ -114,8 +114,13 @@ int ray_ipc_poll(ray_ipc_server_t* srv, int timeout_ms); * the connection's rx machinery while waiting, dispatching any * interleaved async/sync frames from the peer. */ +/* timeout_ms > 0 bounds the TCP connect and the handshake I/O; <= 0 uses + * the default budget. Returns the handle (>= 0) or a negative code: + * -1 refused/error, -2 auth required, -3 auth failed, -4 wire mismatch, + * -5 connect timed out. */ int64_t ray_ipc_connect(const char* host, uint16_t port, - const char* user, const char* password); + const char* user, const char* password, + int timeout_ms); void ray_ipc_close(int64_t handle); ray_t* ray_ipc_send(int64_t handle, ray_t* msg); ray_err_t ray_ipc_send_async(int64_t handle, ray_t* msg); diff --git a/src/core/sock.c b/src/core/sock.c index c1f8dd9c..a13b0263 100644 --- a/src/core/sock.c +++ b/src/core/sock.c @@ -106,8 +106,57 @@ ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms) return RAY_INVALID_SOCK; } - /* Set send/recv timeout if requested */ if (timeout_ms > 0) { + /* Bounded connect. A blocking connect() ignores SO_*TIMEO, so to + * honor the requested timeout we connect non-blocking and wait at + * most timeout_ms for the socket to become writable (= connected + * or failed), then restore blocking mode for the handshake. */ + ray_sock_set_nonblocking(fd); + int rc = connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen); + if (rc < 0) { +#ifdef RAY_OS_WINDOWS + int werr = WSAGetLastError(); + int in_progress = (werr == WSAEWOULDBLOCK || werr == WSAEINPROGRESS); +#else + int in_progress = (errno == EINPROGRESS); +#endif + if (!in_progress) { + ray_sock_close(fd); + freeaddrinfo(res); + return RAY_INVALID_SOCK; + } + struct pollfd pfd = { .fd = fd, .events = POLLOUT }; + int pr; +#ifdef RAY_OS_WINDOWS + pr = WSAPoll(&pfd, 1, timeout_ms); +#else + do { pr = poll(&pfd, 1, timeout_ms); } while (pr < 0 && errno == EINTR); +#endif + if (pr == 0) { /* timed out — distinct from refused */ + ray_sock_close(fd); + freeaddrinfo(res); + errno = ETIMEDOUT; + return RAY_INVALID_SOCK; + } + if (pr < 0) { + ray_sock_close(fd); + freeaddrinfo(res); + return RAY_INVALID_SOCK; + } + /* Writable: harvest the pending connect result via SO_ERROR. */ + int soerr = 0; + socklen_t soerr_len = sizeof(soerr); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&soerr, &soerr_len) < 0 + || soerr != 0) { + ray_sock_close(fd); + freeaddrinfo(res); + if (soerr != 0) errno = soerr; + return RAY_INVALID_SOCK; + } + } + ray_sock_set_blocking(fd); + + /* Apply the same budget as the handshake send/recv timeout. */ #ifdef RAY_OS_WINDOWS DWORD tv = (DWORD)timeout_ms; setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv)); @@ -119,9 +168,7 @@ ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms) setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); #endif - } - - if (connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) { + } else if (connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) { ray_sock_close(fd); freeaddrinfo(res); return RAY_INVALID_SOCK; @@ -218,3 +265,18 @@ ray_err_t ray_sock_set_nonblocking(ray_sock_t s) #endif return RAY_OK; } + +ray_err_t ray_sock_set_blocking(ray_sock_t s) +{ +#ifdef RAY_OS_WINDOWS + u_long mode = 0; + if (ioctlsocket(s, FIONBIO, &mode) != 0) + return RAY_ERR_IO; +#else + int flags = fcntl(s, F_GETFL, 0); + if (flags < 0) return RAY_ERR_IO; + if (fcntl(s, F_SETFL, flags & ~O_NONBLOCK) < 0) + return RAY_ERR_IO; +#endif + return RAY_OK; +} diff --git a/src/core/sock.h b/src/core/sock.h index d32d3f3d..1b4b7b23 100644 --- a/src/core/sock.h +++ b/src/core/sock.h @@ -38,6 +38,13 @@ ray_sock_t ray_sock_listen(uint16_t port); ray_sock_t ray_sock_accept(ray_sock_t srv); +/* Connect to host:port. timeout_ms > 0 bounds the connect: the socket + * connects non-blocking and waits at most timeout_ms for completion (a + * blocking connect() ignores SO_*TIMEO and would otherwise hang for the + * OS default), then the same value is applied as SO_RCVTIMEO/SO_SNDTIMEO + * for the subsequent handshake I/O. timeout_ms <= 0 = blocking connect, + * no I/O timeout. On a connect timeout, errno is set to ETIMEDOUT and + * RAY_INVALID_SOCK is returned. */ ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms); int64_t ray_sock_send(ray_sock_t s, const void* buf, size_t len); int64_t ray_sock_recv(ray_sock_t s, void* buf, size_t len); @@ -46,5 +53,6 @@ int64_t ray_sock_recv(ray_sock_t s, void* buf, size_t len); int ray_sock_wait_readable(ray_sock_t s, int timeout_ms); void ray_sock_close(ray_sock_t s); ray_err_t ray_sock_set_nonblocking(ray_sock_t s); +ray_err_t ray_sock_set_blocking(ray_sock_t s); #endif /* RAY_SOCK_H */ diff --git a/src/lang/compile.c b/src/lang/compile.c index f9b8ccd0..680dd896 100644 --- a/src/lang/compile.c +++ b/src/lang/compile.c @@ -381,11 +381,10 @@ static void compile_list(compiler_t *c, ray_t *ast) { * Stash it, compile handler fn, reload err_val, call. */ emit(c, OP_STOREENV); emit(c, (uint8_t)err_slot); - compile_expr(c, elems[2]); /* handler fn */ + compile_expr(c, elems[2]); /* handler (fn or fallback value) */ emit(c, OP_LOADENV); emit(c, (uint8_t)err_slot); - emit(c, OP_CALLF); - emit(c, 1); /* call handler(err_val) */ + emit(c, OP_TRYH); /* callable → call(err); else value */ patch_jump(c, jmp_pos); /* end */ return; } diff --git a/src/lang/eval.c b/src/lang/eval.c index 96b9e36e..bb54a512 100644 --- a/src/lang/eval.c +++ b/src/lang/eval.c @@ -267,7 +267,27 @@ ray_t* ray_raise_fn(ray_t* val) { return ray_error("domain", NULL); } -/* (try expr handler) — evaluate expr, if error call handler with error value. +/* Dispatch a `try` handler value against the error value. A *callable* + * handler (a lambda or a unary builtin) is invoked with the error and its + * result returned; ANY other value is returned as-is as a fallback result. + * + * The fallback form exists because Rayfall lambdas do not capture closures, + * so a handler lambda `(fn [e] outer)` cannot see an outer binding. Passing + * the value directly — evaluated in the current scope — is the only way to + * surface an outer variable from the failure branch: + * ((fn [data] (try (raise "x") data)) 123) -> 123 + * + * Borrows both `handler` and `err_val`; returns a new owned ref. */ +ray_t* ray_try_handle(ray_t* handler, ray_t* err_val) { + if (handler->type == RAY_LAMBDA || handler->type == RAY_UNARY) + return call_fn1(handler, err_val); /* borrows err_val */ + ray_retain(handler); + return handler; /* fallback value */ +} + +/* (try expr handler) — evaluate expr; on error, dispatch the (evaluated) + * handler via ray_try_handle (callable → invoked with the error value; + * otherwise returned as a fallback value). * Special form: receives unevaluated args. */ ray_t* ray_try_fn(ray_t* expr, ray_t* handler_expr) { ray_t* result = ray_eval(expr); @@ -278,25 +298,14 @@ ray_t* ray_try_fn(ray_t* expr, ray_t* handler_expr) { __VM->raise_val = NULL; if (!err_val) err_val = make_i64(0); - /* Evaluate handler expression */ + /* Evaluate handler expression (in the current scope) */ ray_t* handler = ray_eval(handler_expr); if (RAY_IS_ERR(handler)) { ray_release(err_val); return handler; } - /* Call handler with error value */ - ray_t* handler_result; - if (handler->type == RAY_LAMBDA) { - ray_t* args[1] = { err_val }; - handler_result = call_lambda(handler, args, 1); - } else if (handler->type == RAY_UNARY) { - ray_unary_fn fn = (ray_unary_fn)(uintptr_t)handler->i64; - handler_result = fn(err_val); - } else { - handler_result = ray_error("type", "try: handler must be a function, got %s", ray_type_name(handler->type)); - } - + ray_t* handler_result = ray_try_handle(handler, err_val); ray_release(err_val); ray_release(handler); return handler_result; @@ -1833,6 +1842,7 @@ static ray_t* vm_exec(ray_t* lambda, ray_t** call_args, int64_t argc) { [OP_STOREGLOBAL_W] = &&op_storeglobal_w, [OP_SCOPE_BEGIN] = &&op_scope_begin, [OP_SCOPE_END] = &&op_scope_end, + [OP_TRYH] = &&op_tryh, }; /* Arity check before allocating VM state */ @@ -2353,6 +2363,19 @@ op_trap_end: { DISPATCH(); } +op_tryh: { + /* Stack: [.., handler, err_val] (err_val on top). A callable handler + * is invoked with the error; any other value is the fallback result. */ + ray_t* err_val = POP(); + ray_t* handler = POP(); + ray_t* result = ray_try_handle(handler, err_val); /* borrows both */ + ray_release(err_val); + ray_release(handler); + if (RAY_IS_ERR(result)) { vm_err_obj = result; goto vm_error; } + PUSH(result); + DISPATCH(); +} + op_scope_begin: { /* Stack: [.., syms_vec] — materialize the live locals (slot i holds * the value of syms[i]) plus self into a fresh scope frame so the @@ -2884,7 +2907,7 @@ static void ray_register_builtins(void) { register_unary( ".os.list", RAY_FN_NONE, ray_os_list_fn); /* IPC client primitives under `.ipc.*` */ - register_unary( ".ipc.open", RAY_FN_RESTRICTED, ray_hopen_fn); + register_vary( ".ipc.open", RAY_FN_RESTRICTED, ray_hopen_fn); register_unary( ".ipc.close", RAY_FN_RESTRICTED, ray_hclose_fn); register_binary(".ipc.send", RAY_FN_RESTRICTED, ray_hsend_fn); register_binary(".ipc.post", RAY_FN_RESTRICTED, ray_hpost_fn); diff --git a/src/lang/eval.h b/src/lang/eval.h index 7d7139d0..8d7f0059 100644 --- a/src/lang/eval.h +++ b/src/lang/eval.h @@ -86,6 +86,9 @@ enum { OP_SCOPE_END, /* pop sym-id vec; sync frame values back into the * local slots, pop the frame (leaves the CALLD result * on the stack) */ + OP_TRYH, /* try-handler dispatch: pop err_val + handler; if + * handler is callable, call it with err_val, else + * push handler as a fallback value */ OP__COUNT }; diff --git a/src/lang/internal.h b/src/lang/internal.h index 60399199..fec5005d 100644 --- a/src/lang/internal.h +++ b/src/lang/internal.h @@ -405,6 +405,7 @@ ray_t* to_boxed_list(ray_t* x); ray_t* unbox_vec_arg(ray_t* x, ray_t** _bx); ray_t* call_lambda(ray_t* lambda, ray_t** call_args, int64_t argc); ray_t* call_fn1(ray_t* fn, ray_t* arg); +ray_t* ray_try_handle(ray_t* handler, ray_t* err_val); ray_t* call_fn2(ray_t* fn, ray_t* a, ray_t* b); ray_t* gather_by_idx(ray_t* vec, int64_t* idx, int64_t n); ray_t* ray_sort(ray_t** cols, uint8_t* descs, uint8_t* nulls_first, @@ -557,7 +558,7 @@ ray_t* ray_sysinfo_fn(ray_t** args, int64_t n); ray_t* ray_sys_args_fn(ray_t** args, int64_t n); ray_t* ray_ser_fn(ray_t* val); ray_t* ray_de_fn(ray_t* val); -ray_t* ray_hopen_fn(ray_t* x); +ray_t* ray_hopen_fn(ray_t** args, int64_t n); ray_t* ray_hclose_fn(ray_t* x); ray_t* ray_hsend_fn(ray_t* handle, ray_t* msg); ray_t* ray_hpost_fn(ray_t* handle, ray_t* msg); diff --git a/src/ops/query.c b/src/ops/query.c index 21b9ef76..68ef38f7 100644 --- a/src/ops/query.c +++ b/src/ops/query.c @@ -1135,6 +1135,61 @@ ray_op_t* compile_expr_dag(ray_graph_t* g, ray_t* expr) { return ray_cast(g, col, tgt); } + /* (within col [lo hi]) — inclusive range membership. Lowers to + * `(and (>= col lo) (<= col hi))`, reusing the comparison + * executors so a `within` WHERE predicate inherits range-index + * rowsel, partition pruning, null handling and type promotion + * for free. The range must compile to a 2-element constant + * vector — the same literal-operand constraint OP_IN's set has; + * a non-const or wrong-length range returns NULL so the select + * falls back to the eval-level `within` builtin. */ + if (fname_len == 6 && memcmp(fname, "within", 6) == 0) { + if (n != 3) return NULL; + ray_op_t* col = compile_expr_dag(g, elems[1]); + if (!col) return NULL; + uint32_t col_id = col->id; + ray_op_t* rng = compile_expr_dag(g, elems[2]); + if (!rng || rng->opcode != OP_CONST) return NULL; + ray_op_ext_t* rext = find_ext(g, rng->id); + if (!rext || !rext->literal) return NULL; + ray_t* rv = rext->literal; + if (!ray_is_vec(rv) || rv->len != 2) return NULL; + /* Extract the two bounds as typed atoms via ray_at_fn so a + * DATE/TIME/I32 range keeps its element type for the + * comparison (ray_const_atom retains, so release ours). */ + ray_t* lo_idx = ray_i64(0); + ray_t* hi_idx = ray_i64(1); + ray_t* lo_atom = ray_at_fn(rv, lo_idx); + ray_t* hi_atom = ray_at_fn(rv, hi_idx); + ray_release(lo_idx); + ray_release(hi_idx); + if (!lo_atom || RAY_IS_ERR(lo_atom) || + !hi_atom || RAY_IS_ERR(hi_atom)) { + if (lo_atom) ray_release(lo_atom); + if (hi_atom) ray_release(hi_atom); + return NULL; + } + ray_op_t* lo = ray_const_atom(g, lo_atom); + ray_op_t* hi = ray_const_atom(g, hi_atom); + ray_release(lo_atom); + ray_release(hi_atom); + if (!lo || !hi) return NULL; + /* Snapshot IDs and re-resolve after each graph alloc — the + * node array may realloc between constructor calls. */ + uint32_t lo_id = lo->id, hi_id = hi->id; + col = &g->nodes[col_id]; + lo = &g->nodes[lo_id]; + ray_op_t* ge = ray_ge(g, col, lo); + if (!ge) return NULL; + uint32_t ge_id = ge->id; + col = &g->nodes[col_id]; + hi = &g->nodes[hi_id]; + ray_op_t* le = ray_le(g, col, hi); + if (!le) return NULL; + ge = &g->nodes[ge_id]; + return ray_and(g, ge, le); + } + /* Temporal extract: (year col), (month col), (day col), ... */ if (n == 2) { int64_t field = -1; diff --git a/src/ops/system.c b/src/ops/system.c index 19abd9c5..7b0358f8 100644 --- a/src/ops/system.c +++ b/src/ops/system.c @@ -42,6 +42,7 @@ void ray_runtime_set_sys_args(void* dict); void* ray_runtime_get_sys_args(void); #include #include +#include #include #include #if !defined(RAY_OS_WINDOWS) @@ -845,11 +846,28 @@ ray_t* ray_sys_args_fn(ray_t** args, int64_t n) { * IPC builtins * ══════════════════════════════════════════ */ -/* (hopen "host:port[:user:password]") → i64 handle */ -ray_t* ray_hopen_fn(ray_t* x) { +/* (hopen "host:port[:user:password]" [timeout-ms]) → i64 handle. + * The optional second argument bounds the TCP connect and handshake in + * milliseconds; omitted leaves the default budget. */ +ray_t* ray_hopen_fn(ray_t** args, int64_t n) { + if (n < 1 || n > 2) + return ray_error("rank", ".ipc.open expects 1 or 2 arguments: \"host:port[:user:password]\" [timeout-ms]"); + + ray_t* x = args[0]; if (!ray_is_atom(x) || x->type != -RAY_STR) return ray_error("type", ".ipc.open expects a string \"host:port[:user:password]\", got %s", ray_type_name(x->type)); + /* Optional connect timeout in milliseconds (0 = use default). */ + int timeout_ms = 0; + if (n == 2) { + ray_t* t = args[1]; + if (!ray_is_atom(t) || (t->type != -RAY_I64 && t->type != -RAY_I32)) + return ray_error("type", ".ipc.open timeout must be an integer (milliseconds), got %s", ray_type_name(t->type)); + int64_t tv = (t->type == -RAY_I64) ? t->i64 : t->i32; + if (tv < 0) return ray_error("domain", ".ipc.open timeout must be >= 0, got %lld", (long long)tv); + timeout_ms = (tv > INT_MAX) ? INT_MAX : (int)tv; + } + const char* s = ray_str_ptr(x); size_t slen = ray_str_len(x); @@ -896,9 +914,10 @@ ray_t* ray_hopen_fn(ray_t* x) { const char* pw_ptr = (n_parts >= 4) ? password : NULL; const char* us_ptr = (n_parts >= 4) ? user : NULL; - int64_t h = ray_ipc_connect(host, (uint16_t)port, us_ptr, pw_ptr); + int64_t h = ray_ipc_connect(host, (uint16_t)port, us_ptr, pw_ptr, timeout_ms); if (h == -2) return ray_error("access", "server requires authentication"); if (h == -3) return ray_error("access", "authentication failed"); + if (h == -5) return ray_error("io", "connection timed out: %s:%d", host, port); if (h < 0) return ray_error("io", "connection refused: %s:%d", host, port); return make_i64(h); diff --git a/test/rfl/hof/eval_branch_cov.rfl b/test/rfl/hof/eval_branch_cov.rfl index 0c45a829..e78d3a47 100644 --- a/test/rfl/hof/eval_branch_cov.rfl +++ b/test/rfl/hof/eval_branch_cov.rfl @@ -305,15 +305,15 @@ captured_x -- 77 (nil? null) -- true ;; ═══════════════════════════════════════════════════════════════════ -;; 13. ray_try_fn handler dispatch (eval.c 289-301). -;; Handler can be LAMBDA (line 289), UNARY (line 292), else type. -;; The LAMBDA path is heavily covered; UNARY exercise via builtin -;; unary fn as handler (also covered by try_raise.rfl §3). The -;; `else` arm (line 295-297) requires a non-fn handler value. -;; A VARY (list) handler reaches the else arm. -;; ═══════════════════════════════════════════════════════════════════ -(try (raise 5) list) !- type -(try (raise 5) +) !- type +;; 13. ray_try_handle dispatch (eval.c). +;; A 1-arg-callable handler (LAMBDA or UNARY builtin) is invoked with +;; the error; ANY other 2nd arg — including multi-arg builtins (VARY +;; `list`, BINARY `+`) and plain values — is returned as a fallback +;; value (a function is a first-class value). This is the fallback +;; arm; the LAMBDA/UNARY call arms are covered by try_raise.rfl. +;; ═══════════════════════════════════════════════════════════════════ +(type (try (raise 5) list)) -- (type list) +(type (try (raise 5) +)) -- (type +) ;; Handler that itself raises → recursive error path inside the handler. (try (raise 5) (fn [e] (raise (* e 10)))) !- domain diff --git a/test/rfl/hof/try_raise.rfl b/test/rfl/hof/try_raise.rfl index 5f541c71..7f33e510 100644 --- a/test/rfl/hof/try_raise.rfl +++ b/test/rfl/hof/try_raise.rfl @@ -27,8 +27,31 @@ ;; ── handler expression itself fails → propagates that error ───────────────── (try (raise 1) no_such_fn) !- name -;; ── type error handler: invalid handler type ──────────────────────────────── -(try (raise 1) "not-a-fn") !- type +;; ── non-callable handler → returned as a fallback VALUE ───────────────────── +;; A 2nd arg that is not a function is the value to return on failure. This +;; is the only way to surface an outer binding from the failure branch, since +;; Rayfall lambdas do not capture closures. (Interpreter path.) +(try (raise 1) "not-a-fn") -- "not-a-fn" +(try (raise 1) 42) -- 42 +(try (raise 1) [1 2 3]) -- [1 2 3] +(try (raise 1) "") -- "" +;; A list expression in the handler slot is evaluated; its value is the +;; fallback (empty list, list ctor, and an arbitrary value-expression). +(try (raise 1) ()) -- () +(try (raise 1) (list 1 2 3)) -- (list 1 2 3) +(try (raise 1) (+ 2 3)) -- 5 +;; The fallback is only evaluated on failure; the success result wins. +(try 7 "fallback") -- 7 +(try 7 ()) -- 7 +;; The fallback expression is evaluated in the current scope. +(set outer 123) +(try (raise "x") outer) -- 123 + +;; ── compiled path: fallback returns the outer lambda parameter ────────────── +;; The reported case — `data` is the lambda's bound param, returned on failure. +((fn [data] (try (raise "Oops") data)) 123) -- 123 +((fn [data] (try (+ data 1) data)) 5) -- 6 +((fn [a b] (try (raise 0) b)) 1 2) -- 2 ;; ── nested try ────────────────────────────────────────────────────────────── (try (try (raise 5) (fn [e] (raise (+ e 10)))) (fn [e] e)) -- 15 diff --git a/test/rfl/query/where_within.rfl b/test/rfl/query/where_within.rfl new file mode 100644 index 00000000..c832d4a5 --- /dev/null +++ b/test/rfl/query/where_within.rfl @@ -0,0 +1,41 @@ +;; `within` as a select WHERE predicate (issue #284). +;; +;; `within` works as a plain vector op but used to be rejected by the DAG +;; WHERE compiler. It is now lowered to `(and (>= col lo) (<= col hi))` +;; inside `compile_expr_dag` (src/ops/query.c), so it compiles to the same +;; comparison opcodes any hand-written range predicate would — inheriting +;; range-index rowsel, partition pruning, null handling and type promotion. +;; +;; The range operand must be a compile-time-constant 2-element vector, the +;; same literal constraint OP_IN's set operand has. + +;; -------------------------------------------------------------------- +;; I64 column — inclusive on both ends. +;; -------------------------------------------------------------------- +(set t (table [sn] (list [1 2 3 4 5]))) +;; sn in [2 4] → {2,3,4}. +(at (select {from: t where: (within sn [2 4]) c: sn}) 'c) -- [2 3 4] +;; Equivalent hand-written range predicate produces the same rows. +(at (select {from: t where: (and (>= sn 2) (<= sn 4)) c: sn}) 'c) -- [2 3 4] +;; Lower bound inclusive: [1 1] keeps exactly row 1. +(at (select {from: t where: (within sn [1 1]) c: sn}) 'c) -- [1] +;; Whole range keeps everything. +(count (select {from: t where: (within sn [1 5])})) -- 5 +;; Empty range keeps nothing. +(count (select {from: t where: (within sn [4 2])})) -- 0 + +;; -------------------------------------------------------------------- +;; F64 column — promotion path (F64 col vs integer-literal bounds). +;; -------------------------------------------------------------------- +(set tf (table [p] (list [1.0 2.5 3.5 4.5 9.0]))) +(at (select {from: tf where: (within p [2 5]) c: p}) 'c) -- [2.5 3.5 4.5] + +;; -------------------------------------------------------------------- +;; Inside an AND chain alongside other conjuncts (chained-filter path). +;; -------------------------------------------------------------------- +(set Nrow 1000) +(set T0 (table [k v] (list (take ['A 'B 'C] Nrow) (til Nrow)))) +;; (and (within v [100 500]) (!= k 'C)) — v in {100..500} minus r%3==2. +;; Oracle via explicit comparisons must match (single line per the DSL). +(count (select {from: T0 where: (and (within v [100 500]) (!= k 'C))})) -- (count (select {from: T0 where: (and (>= v 100) (<= v 500) (!= k 'C))})) +(sum (at (select {from: T0 where: (and (within v [100 500]) (!= k 'C))}) 'v)) -- (sum (at (select {from: T0 where: (and (>= v 100) (<= v 500) (!= k 'C))}) 'v)) diff --git a/test/rfl/system/ipc_first_last.rfl b/test/rfl/system/ipc_first_last.rfl new file mode 100644 index 00000000..fcd36642 --- /dev/null +++ b/test/rfl/system/ipc_first_last.rfl @@ -0,0 +1,54 @@ +;; Regression: `first`/`last` returned over IPC used to hang the client +;; forever (issue #285). +;; +;; Root cause: `(first v)` / `(last v)` over an I64/F64 vector build a +;; deferred-DAG *lazy* result (AGG_VEC_VIA_DAG → ray_lazy_wrap). The +;; string-payload eval path (ray_eval_str) materializes it, but the +;; expression-list payload path (ray_eval) returned the lazy object +;; verbatim. send_response could not serialize a lazy object +;; (ray_serde_size <= 0) and silently sent nothing, so the client blocked +;; forever waiting for a reply. +;; +;; Fix: eval_payload_core now materializes a lazy result before it reaches +;; the wire, and send_response substitutes a serializable error rather than +;; dropping an unserializable reply (so a future unserializable value +;; surfaces as a clean client-side error instead of a hang). +;; +;; Port 19996 is fixed and unprivileged; distinct from the other IPC tests. + +;; ── lifecycle: spawn server ──────────────────────────────────────── +(.sys.exec "pkill -TERM -f 'rayforce -p 19996' 2>/dev/null; true") +(.sys.exec "./rayforce -p 19996 /dev/null 2>&1 &") +(.sys.exec "for i in $(seq 30); do bash -c '(echo > /dev/tcp/127.0.0.1/19996) 2>/dev/null' && exit 0; sleep 0.1; done; exit 1") -- 0 + +(set h (.ipc.open "127.0.0.1:19996")) +(>= h 0) -- true + +;; ── expression-list payload returning first/last — the hang repro ── +;; `(list 'first [..])` is sent as an expression list; the server evals it +;; directly via ray_eval (the path that returned the un-materialized lazy). +(.ipc.send h (list 'first [10 20 30])) -- 10 +(.ipc.send h (list 'last [10 20 30])) -- 30 +;; Float vectors take the same DAG/lazy path. +(.ipc.send h (list 'first [1.5 2.5 3.5])) -- 1.5 +(.ipc.send h (list 'last [1.5 2.5 3.5])) -- 3.5 + +;; ── via a server-side lambda, exactly as reported in the issue ───── +(.ipc.send h "(set f1 (fn [] (first [10 20 30])))") +(.ipc.send h "(set f2 (fn [] (last [10 20 30])))") +(.ipc.send h (list 'f1)) -- 10 +(.ipc.send h (list 'f2)) -- 30 + +;; ── differential oracle: wire result equals local result ─────────── +(first [10 20 30]) -- (.ipc.send h (list 'first [10 20 30])) +(last [10 20 30]) -- (.ipc.send h (list 'last [10 20 30])) + +;; ── sum/min/max take the same lazy path; confirm they round-trip too +(.ipc.send h (list 'sum [10 20 30])) -- 60 +(.ipc.send h (list 'max [10 20 30])) -- 30 +(.ipc.send h (list 'min [10 20 30])) -- 10 + +;; ── teardown ─────────────────────────────────────────────────────── +(try (.ipc.send h "(exit 0)") (fn [e] 0)) +(.ipc.close h) +(.sys.exec "sleep 0.1; pkill -KILL -f 'rayforce -p 19996' 2>/dev/null; true") diff --git a/test/rfl/system/ipc_open_timeout.rfl b/test/rfl/system/ipc_open_timeout.rfl new file mode 100644 index 00000000..0df1f448 --- /dev/null +++ b/test/rfl/system/ipc_open_timeout.rfl @@ -0,0 +1,51 @@ +;; Connect timeout argument for `.ipc.open` (issue #286). +;; +;; `.ipc.open` is now a variadic builtin accepting an optional second +;; argument — a connect/handshake timeout in milliseconds: +;; +;; (.ipc.open "host:port") +;; (.ipc.open "host:port" 2000) +;; +;; The timeout bounds the TCP connect (driven non-blocking + poll in +;; src/core/sock.c) and the handshake I/O. A blocking connect() ignores +;; SO_*TIMEO, so without this a dead/filtered peer would hang for the OS +;; default (~minutes). +;; +;; Port 19998 is fixed and unprivileged; distinct from ipc_diff.rfl's +;; 19999 so the two never collide. `make test` is sequential anyway. + +;; ── argument validation (no server needed) ───────────────────────── +;; The harness `!-` operator matches the error tag (ray_fmt(err, 0)), +;; so assert on the tag; the human-readable detail is exercised by the +;; REPL. Wrong arity → rank. +(.ipc.open "127.0.0.1:19998" 1 2) !- rank +;; Non-integer timeout → type. +(.ipc.open "127.0.0.1:19998" "soon") !- type +;; Negative timeout → domain. +(.ipc.open "127.0.0.1:19998" -5) !- domain +;; A refused peer reports an error promptly rather than hanging — port +;; 19997 has no listener, so the connect is refused (RST) → io. +(.ipc.open "127.0.0.1:19997" 500) !- io + +;; ── lifecycle: spawn server ──────────────────────────────────────── +(.sys.exec "pkill -TERM -f 'rayforce -p 19998' 2>/dev/null; true") +(.sys.exec "./rayforce -p 19998 /dev/null 2>&1 &") +(.sys.exec "for i in $(seq 30); do bash -c '(echo > /dev/tcp/127.0.0.1/19998) 2>/dev/null' && exit 0; sleep 0.1; done; exit 1") -- 0 + +;; ── happy path: timeout arg does not break a real connect ────────── +(set h (.ipc.open "127.0.0.1:19998" 2000)) +(>= h 0) -- true +;; Round-trip over the wire to prove the handshake completed normally. +(+ 3 4) -- (.ipc.send h "(+ 3 4)") +(sum [1 2 3 4 5]) -- (.ipc.send h "(sum [1 2 3 4 5])") +(.ipc.close h) + +;; ── backward compatibility: 1-arg form still works ──────────────── +(set h2 (.ipc.open "127.0.0.1:19998")) +(>= h2 0) -- true +(* 6 7) -- (.ipc.send h2 "(* 6 7)") + +;; ── teardown ─────────────────────────────────────────────────────── +(try (.ipc.send h2 "(exit 0)") (fn [e] 0)) +(.ipc.close h2) +(.sys.exec "sleep 0.1; pkill -KILL -f 'rayforce -p 19998' 2>/dev/null; true") diff --git a/test/test_ipc.c b/test/test_ipc.c index 71267a97..8fb46814 100644 --- a/test/test_ipc.c +++ b/test/test_ipc.c @@ -188,7 +188,7 @@ static test_result_t test_ipc_send_verbose(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Send via verbose path — server captures stdout/stderr and returns @@ -245,7 +245,7 @@ static test_result_t test_ipc_send_verbose_captures_output(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Use (println 42) — writes "42\n" to stdout via fwrite/fflush. @@ -403,7 +403,7 @@ static test_result_t test_ipc_send_list_select_msg(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); const char* setup_src = "(set t (table [sym px] (list [AAPL GOOG] [10.0 20.0])))"; @@ -443,7 +443,7 @@ static test_result_t test_ipc_send_list_select_msg(void) { */ static test_result_t test_ipc_connect_fail_no_server(void) { /* Connect to port 1 (reserved, always refused) */ - int64_t bad_h = ray_ipc_connect("127.0.0.1", 1, NULL, NULL); + int64_t bad_h = ray_ipc_connect("127.0.0.1", 1, NULL, NULL, 0); TEST_ASSERT_EQ_I(bad_h, -1); PASS(); } @@ -470,7 +470,7 @@ static test_result_t test_ipc_connect_auth_no_user(void) { ray_thread_create(&tid, server_thread_fn, &ctx); /* Connect with NULL user but valid password */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, "mypass"); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, "mypass", 0); TEST_ASSERT((h) >= (0), "h >= 0 (auth with no user)"); ray_t* msg = ray_str("(+ 1 1)", 7); @@ -561,7 +561,7 @@ static test_result_t test_ipc_poll_based_listen(void) { sleep_ms(20); /* Client: connect and send a query */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "poll client h >= 0"); ray_t* msg = ray_str("(+ 3 4)", 7); @@ -619,7 +619,7 @@ static test_result_t test_ipc_poll_auth_creds_path(void) { ray_thread_create(&tid, (void(*)(void*))poll_server_thread_fn, &pctx); sleep_ms(20); - int64_t h = ray_ipc_connect("127.0.0.1", port, "user", "pollpass"); + int64_t h = ray_ipc_connect("127.0.0.1", port, "user", "pollpass", 0); TEST_ASSERT((h) >= (0), "connect with correct password should succeed"); if (h >= 0) ray_ipc_close(h); @@ -658,11 +658,11 @@ static test_result_t test_ipc_poll_auth_reject(void) { sleep_ms(20); /* Connect with wrong password: should get -3 (auth rejected) */ - int64_t h = ray_ipc_connect("127.0.0.1", port, "user", "wrongpass"); + int64_t h = ray_ipc_connect("127.0.0.1", port, "user", "wrongpass", 0); TEST_ASSERT_EQ_I(h, -3); /* Connect with no password: should get -2 (auth required but no creds) */ - int64_t h2 = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h2 = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT_EQ_I(h2, -2); poll_stop(poll, port); @@ -710,7 +710,7 @@ static test_result_t test_ipc_poll_handshake_version_mismatch(void) { ray_sock_close(s); /* A correct client should still work after the bad handshake */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "well-behaved client still connects"); ray_ipc_close(h); @@ -743,7 +743,7 @@ static test_result_t test_ipc_send_large_compressible(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Build a large string with many repeated chars so it serializes large. */ @@ -817,7 +817,7 @@ static test_result_t test_ipc_journal_path(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); ray_t* msg = ray_str("(+ 10 5)", 8); @@ -925,7 +925,7 @@ static test_result_t test_ipc_poll_async_send(void) { ray_thread_create(&tid, (void(*)(void*))poll_server_thread_fn, &pctx); sleep_ms(20); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); ray_t* msg = ray_str("(+ 1 1)", 7); @@ -969,7 +969,7 @@ static test_result_t test_ipc_poll_multiple_requests(void) { ray_thread_create(&tid, (void(*)(void*))poll_server_thread_fn, &pctx); sleep_ms(20); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); for (int i = 1; i <= 5; i++) { @@ -1051,7 +1051,7 @@ static test_result_t test_ipc_poll_bad_header(void) { ray_sock_close(s); /* Server should still be running for next client */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "server still running after bad header"); ray_ipc_close(h); @@ -1087,7 +1087,7 @@ static test_result_t test_ipc_send_large_result(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Build an expression that generates a large result. @@ -1144,7 +1144,7 @@ static test_result_t test_ipc_send_large_msg_client_compress(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Build a 300-element i64 vector with sequential values 0..299. @@ -1205,7 +1205,7 @@ static test_result_t test_ipc_send_verbose_large_result(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* (til 1000) returns a 1000-element i64 vector (~8000 bytes serialized). @@ -1258,9 +1258,9 @@ static test_result_t test_ipc_server_destroy_active_conns(void) { ray_thread_create(&tid, server_thread_fn, &ctx); /* Connect two clients */ - int64_t h1 = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h1 = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h1) >= (0), "h1 >= 0"); - int64_t h2 = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h2 = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h2) >= (0), "h2 >= 0"); /* Do one round-trip to ensure the server has accepted the connections */ @@ -1354,7 +1354,7 @@ static test_result_t test_ipc_server_conn_swap(void) { ray_sock_close(s1); /* s2 should still work; do a proper round-trip on it */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); if (h >= 0) { ray_t* msg = ray_str("(+ 1 1)", 7); ray_t* r = ray_ipc_send(h, msg); @@ -1409,7 +1409,7 @@ static test_result_t test_ipc_journal_restricted(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* SYNC message → eval_payload_core sets restricted flag on log header */ @@ -1458,7 +1458,7 @@ static test_result_t test_ipc_send_lazy_msg(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Build a lazy that materialises to int 15 (sum of 1..5). */ @@ -1567,7 +1567,7 @@ static test_result_t test_ipc_hooks_lifecycle(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* One SYNC round-trip — drives on.open (after handshake), on.sync @@ -1661,11 +1661,11 @@ static test_result_t test_ipc_hooks_auth_narrow(void) { /* "ban":secret → password is correct, but the hook returns false * → handshake rejected with the same 0x01 byte the wrong-password * path uses, so the client surfaces -3 (auth rejected). */ - int64_t h_banned = ray_ipc_connect("127.0.0.1", port, "ban", "secret"); + int64_t h_banned = ray_ipc_connect("127.0.0.1", port, "ban", "secret", 0); TEST_ASSERT_EQ_I(h_banned, -3); /* "ok":secret → both checks pass, connection succeeds. */ - int64_t h_ok = ray_ipc_connect("127.0.0.1", port, "ok", "secret"); + int64_t h_ok = ray_ipc_connect("127.0.0.1", port, "ok", "secret", 0); TEST_ASSERT((h_ok) >= (0), "h_ok >= 0"); if (h_ok >= 0) ray_ipc_close(h_ok); @@ -1710,7 +1710,7 @@ static test_result_t test_ipc_post_delivery(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Post async via the BUILTIN — build the call string with the live @@ -1848,7 +1848,7 @@ static test_result_t test_ipc_server_push(void) { ray_thread_create(&tid, (void(*)(void*))poll_server_thread_fn, &pctx); sleep_ms(20); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Barrier 1: drains the on.open push before returning. */ diff --git a/test/test_public_api.c b/test/test_public_api.c index 0709e77c..6026aa61 100644 --- a/test/test_public_api.c +++ b/test/test_public_api.c @@ -44,7 +44,7 @@ static void public_api_setup(void) { ray_runtime_create(0, NULL); } static void public_api_teardown(void) { ray_runtime_destroy(__RUNTIME); } static test_result_t test_public_ipc_client_symbols(void) { - int64_t (*connect_fn)(const char*, uint16_t, const char*, const char*) = ray_ipc_connect; + int64_t (*connect_fn)(const char*, uint16_t, const char*, const char*, int) = ray_ipc_connect; void (*close_fn)(int64_t) = ray_ipc_close; ray_t* (*send_fn)(int64_t, ray_t*) = ray_ipc_send; ray_err_t (*async_fn)(int64_t, ray_t*) = ray_ipc_send_async; diff --git a/test/test_store.c b/test/test_store.c index 7c18b777..fa6f6788 100644 --- a/test/test_store.c +++ b/test/test_store.c @@ -3280,7 +3280,7 @@ static test_result_t test_ipc_sync_roundtrip(void) { ray_thread_create(&tid, server_thread_fn, &ctx); /* Client: connect */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Client: send sync query "(+ 1 2)" — expects result 3 */ @@ -3331,7 +3331,7 @@ static test_result_t test_ipc_async_send(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Send async — should not block or error */ @@ -3376,7 +3376,7 @@ static test_result_t test_ipc_auth_success(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, "admin", "secret123"); + int64_t h = ray_ipc_connect("127.0.0.1", port, "admin", "secret123", 0); TEST_ASSERT((h) >= (0), "h >= 0"); ray_t* msg = ray_str("(+ 10 20)", 9); @@ -3421,7 +3421,7 @@ static test_result_t test_ipc_auth_reject(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, "admin", "wrong"); + int64_t h = ray_ipc_connect("127.0.0.1", port, "admin", "wrong", 0); TEST_ASSERT_EQ_I(h, -3); srv.running = false; @@ -3456,7 +3456,7 @@ static test_result_t test_ipc_auth_no_creds(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT_EQ_I(h, -2); srv.running = false; @@ -3492,7 +3492,7 @@ static test_result_t test_ipc_restricted(void) { ray_thread_t tid; ray_thread_create(&tid, server_thread_fn, &ctx); - int64_t h = ray_ipc_connect("127.0.0.1", port, "admin", "secret123"); + int64_t h = ray_ipc_connect("127.0.0.1", port, "admin", "secret123", 0); TEST_ASSERT((h) >= (0), "h >= 0"); /* Arithmetic should work */ @@ -3589,7 +3589,7 @@ static test_result_t test_ipc_handshake_version_mismatch(void) { /* A subsequent well-behaved client must still succeed, proving the * server is still running and only the bad handshake was rejected. */ - int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL); + int64_t h = ray_ipc_connect("127.0.0.1", port, NULL, NULL, 0); TEST_ASSERT((h) >= (0), "h >= 0"); ray_ipc_close(h);