Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/requirements-docs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mkdocs-material>=9.5
6 changes: 3 additions & 3 deletions .github/workflows/pages.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ on:
- 'docs/**'
- 'overrides/**'
- 'mkdocs.yml'
- 'requirements-docs.txt'
- '.github/requirements-docs.txt'
- '.github/workflows/pages.yml'
workflow_dispatch:

Expand All @@ -36,9 +36,9 @@ jobs:
with:
python-version: '3.12'
cache: pip
cache-dependency-path: requirements-docs.txt
cache-dependency-path: .github/requirements-docs.txt
- name: Install MkDocs Material
run: pip install -r requirements-docs.txt
run: pip install -r .github/requirements-docs.txt
- name: Build site (MkDocs)
run: mkdocs build --strict
- uses: actions/configure-pages@v4
Expand Down
14 changes: 14 additions & 0 deletions docs/docs/language/control-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ Recursive lambdas use `self` to refer to the enclosing function:

If no error is raised, `try` returns the result of the body expression normally. Works inside lambdas compiled to bytecode.

### Fallback value

If the second argument is **not** a function, it is returned as-is as the fallback value on error (evaluated only when the body fails). Because lambdas do not capture closures, this is the only way to surface an outer binding from the failure branch:

```lisp
‣ (try (raise "boom") 0)
0

‣ ((fn [data] (try (raise "boom") data)) 123)
123
```

A handler must accept the single error argument, so only a lambda or a unary builtin is *called* with the error; any other value (including a multi-argument builtin) is treated as a fallback value.

## Early Return: return

`return` exits the innermost enclosing compiled lambda early with the given value:
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/language/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ Cross-temporal comparisons are supported: dates, times, and timestamps are all c
| `if` | variadic, special | Conditional (if/then/else) | `(if (> x 0) "pos" "neg")` |
| `do` | variadic, special | Sequential execution, returns last | `(do (set x 1) (set y 2) (+ x y))` |
| `fn` | variadic, special | Create lambda function | `(fn [x] (* x x))` |
| `try` | binary, special | Error handling (expr handler) | `(try (/ 1 0) (fn [e] 0))` |
| `try` | binary, special | Error handling (expr handler-or-fallback) | `(try (/ 1 0) (fn [e] 0))` |
| `raise` | unary | Throw an error | `(raise "bad input")` |
| `return` | variadic | Early return from compiled lambda (0 args → null) | `(return 42)` |
| `quote` | variadic, special | Return argument unevaluated; a bare name becomes a literal symbol (`(quote x)` ≡ `'x`) | `(quote (+ 1 2))` → `(+ 1 2)` |
Expand Down
16 changes: 11 additions & 5 deletions docs/docs/namespaces/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@ Connect to a Rayforce server (`./rayforce -p <port>`) and exchange messages over

| Function | Arity | Flags | Description |
|---|---|---|---|
| [`.ipc.open`](#ipc-open) | unary | restricted | Open a TCP connection; return an i64 handle. |
| [`.ipc.open`](#ipc-open) | variadic | restricted | Open a TCP connection (optional connect timeout); return an i64 handle. |
| [`.ipc.send`](#ipc-send) | binary | restricted | Send a message synchronously; return the server's result. |
| [`.ipc.post`](#ipc-post) | binary | restricted | Send a message asynchronously (fire-and-forget); return the null object. |
| [`.ipc.close`](#ipc-close) | unary | restricted | Close a connection handle. |
| [`.ipc.handle`](#ipc-handle) | variadic | — | The current connection's handle inside a server-side hook; `-1` otherwise. |

## `.ipc.open` { #ipc-open }

Signature: `(.ipc.open "host:port")` or `(.ipc.open "host:port:user:password")`.
Signature: `(.ipc.open "host:port")` or `(.ipc.open "host:port:user:password")`, with an optional trailing connect timeout in milliseconds: `(.ipc.open "host:port" 2000)`.

Returns: an `i64` handle. Negative handles never escape — errors are surfaced as Rayfall error objects:

- `type` — argument is not a string.
- `domain` — malformed address (missing port, port out of `(0, 65535]`, oversized host/user/password).
- `type` — address argument is not a string, or the timeout argument is not an integer.
- `rank` — called with fewer than 1 or more than 2 arguments.
- `domain` — malformed address (missing port, port out of `(0, 65535]`, oversized host/user/password), or a negative timeout.
- `access` — server requires auth and you didn't supply credentials, **or** the password is wrong.
- `io` — connection refused / network error.
- `io` — connection refused / network error, or `connection timed out` when the connect did not complete within the timeout.

The optional timeout bounds **both** the TCP connect and the handshake I/O. A blocking `connect()` ignores socket send/receive timeouts, so without an explicit bound a dead or packet-filtered peer would otherwise hang for the operating-system default (often minutes). When omitted, a default budget of 5 seconds applies.

The handshake exchanges a 2-byte `{wire_version, auth_flag}` greeting. A wire-version mismatch closes the connection before any payload is exchanged.

Expand All @@ -37,6 +40,9 @@ The handshake exchanges a 2-byte `{wire_version, auth_flag}` greeting. A wire-ve

;; With credentials (server started with -u or -U)
(set h (.ipc.open "127.0.0.1:5000:admin:secret123"))

;; Fail fast if the peer doesn't answer within 2 seconds
(set h (.ipc.open "127.0.0.1:5000" 2000))
```

## `.ipc.send` { #ipc-send }
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/reference/all-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ Special forms receive their arguments unevaluated. These are the core language p
| `if` | variadic | special | Conditional: (if cond then else) | `(if (> x 0) "pos" "neg")` |
| `do` | variadic | special | Sequential execution, returns last value | `(do (set x 1) (set y 2) (+ x y))` |
| `fn` | variadic | special | Create lambda function | `(fn [x y] (+ x y))` |
| `try` | binary | special | Error handling: (try expr handler-fn) | `(try (/ 1 0) (fn [e] 0))` |
| `try` | binary | special | Error handling: (try expr handler-fn-or-fallback-value) | `(try (/ 1 0) (fn [e] 0))` |
| `raise` | unary | — | Throw an error with message | `(raise "bad input")` |
| `return` | unary | — | Early return from function body | `(return 42)` |
| `quote` | variadic | special | Return argument unevaluated; a bare name yields a literal symbol (`(quote x)` ≡ `'x`) | `(quote (+ 1 2))` → `(+ 1 2)` |
Expand Down Expand Up @@ -530,7 +530,7 @@ TCP-based IPC for connecting to remote Rayforce instances. Uses binary serializa

| Function | Type | Flags | Description | Example |
|---|---|---|---|---|
| `.ipc.open` | unary | restricted | Open TCP connection to host:port, returns handle | `(.ipc.open "localhost:5000")` |
| `.ipc.open` | variadic | restricted | Open TCP connection to host:port (optional connect timeout in ms), returns handle | `(.ipc.open "localhost:5000" 2000)` |
| `.ipc.close` | unary | restricted | Close an IPC connection handle | `(.ipc.close h)` |
| `.ipc.send` | binary | restricted | Send a value over an IPC handle (sync request) | `(.ipc.send h "(sum (til 100))")` |
| `.ipc.handle` | variadic | — | Current connection handle inside any `.ipc.on.*` hook, `-1` outside | `(.ipc.handle)` |
Expand Down
3 changes: 2 additions & 1 deletion include/rayforce.h
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ ray_t* ray_fmt(ray_t* obj, int mode);
* exchange; ray_ipc_send_async sends a fire-and-forget frame. */

int64_t ray_ipc_connect(const char* host, uint16_t port,
const char* user, const char* password);
const char* user, const char* password,
int timeout_ms);
void ray_ipc_close(int64_t handle);
ray_t* ray_ipc_send(int64_t handle, ray_t* msg);
ray_err_t ray_ipc_send_async(int64_t handle, ray_t* msg);
Expand Down
2 changes: 1 addition & 1 deletion src/app/repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ ray_t* ray_repl_connect_fn(ray_t* host_port_str) {
* and the connect-error-to-rayfall-error mapping; reusing it
* keeps .repl.connect a one-line wrapper instead of a parallel
* implementation that drifts. */
ray_t* opened = ray_hopen_fn(host_port_str);
ray_t* opened = ray_hopen_fn(&host_port_str, 1);
if (!opened || RAY_IS_ERR(opened)) return opened;
if (!ray_is_atom(opened) ||
(opened->type != -RAY_I64 && opened->type != -RAY_I32)) {
Expand Down
38 changes: 33 additions & 5 deletions src/core/ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,24 @@ static int hook_call_auth(ray_poll_t* poll, int64_t handle,
static void send_response(ray_sock_t fd, ray_t* result)
{
int64_t ser_size = ray_serde_size(result);
if (ser_size <= 0) return;

/* A result we cannot serialize must never leave the client waiting on
* a reply that never arrives. Substitute a serializable error so the
* caller observes a clean failure instead of a silent infinite hang
* (issue #285). ray_error frames serialize, so the substitute always
* goes out unless even that fails — in which case there is nothing we
* can put on the wire and we drop as before. */
ray_t* fallback = NULL;
if (ser_size <= 0) {
fallback = ray_error("type", "result of type %s is not serializable over IPC",
result ? ray_type_name(result->type) : "null");
result = fallback;
ser_size = ray_serde_size(result);
if (ser_size <= 0) { if (fallback) ray_error_free(fallback); return; }
}

uint8_t* payload = (uint8_t*)ray_sys_alloc((size_t)ser_size);
if (!payload) return;
if (!payload) { if (fallback) ray_error_free(fallback); return; }
ray_ser_raw(payload, result);

uint8_t* send_buf = NULL;
Expand Down Expand Up @@ -387,6 +401,7 @@ static void send_response(ray_sock_t fd, ray_t* result)

ray_sys_free(send_buf);
if (payload) ray_sys_free(payload);
if (fallback) ray_error_free(fallback);
}

/* Decompress (when flagged) + de-serialize one framed payload into an
Expand Down Expand Up @@ -501,6 +516,15 @@ static ray_t* eval_payload_core(uint8_t* payload, size_t payload_len,
ray_release(msg);
}
}
/* A lazy result is an internal deferred-DAG representation that cannot
* be serialized — force it to a concrete value before it reaches the
* wire. The direct ray_eval(msg) path (non-STR payloads, e.g. an
* expression list `(first v)`) returns lazy chains verbatim; the
* ray_eval_str path already materializes, so this is a no-op there.
* Without this, send_response cannot serialize the result and the
* client blocks forever waiting for a reply (issue #285). */
if (result && ray_is_lazy(result))
result = ray_lazy_materialize(result); /* consumes the retain */
return result ? result : RAY_NULL_OBJ;
}

Expand Down Expand Up @@ -1401,16 +1425,20 @@ static int64_t conn_write_msg(ray_sock_t fd, ray_t* msg, uint8_t msgtype,
}

int64_t ray_ipc_connect(const char* host, uint16_t port,
const char* user, const char* password)
const char* user, const char* password,
int timeout_ms)
{
/* The connection lives in the active poll's selector table — its
* selector id IS the handle. No poll, no handle namespace: refuse
* up front rather than hand out an integer nothing can resolve. */
ray_poll_t* poll = ipc_active_poll();
if (!poll) return -1;

ray_sock_t fd = ray_sock_connect(host, port, 5000);
if (fd == RAY_INVALID_SOCK) return -1;
/* Default the connect/handshake budget to 5s when the caller gives
* no explicit timeout, matching the long-standing handshake timeout. */
int connect_to = timeout_ms > 0 ? timeout_ms : 5000;
ray_sock_t fd = ray_sock_connect(host, port, connect_to);
if (fd == RAY_INVALID_SOCK) return (errno == ETIMEDOUT) ? -5 : -1;

uint8_t hs[2] = { RAY_SERDE_WIRE_VERSION, 0x00 };
if (ray_sock_send(fd, hs, 2) < 0) {
Expand Down
7 changes: 6 additions & 1 deletion src/core/ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,13 @@ int ray_ipc_poll(ray_ipc_server_t* srv, int timeout_ms);
* the connection's rx machinery while waiting, dispatching any
* interleaved async/sync frames from the peer. */

/* timeout_ms > 0 bounds the TCP connect and the handshake I/O; <= 0 uses
* the default budget. Returns the handle (>= 0) or a negative code:
* -1 refused/error, -2 auth required, -3 auth failed, -4 wire mismatch,
* -5 connect timed out. */
int64_t ray_ipc_connect(const char* host, uint16_t port,
const char* user, const char* password);
const char* user, const char* password,
int timeout_ms);
void ray_ipc_close(int64_t handle);
ray_t* ray_ipc_send(int64_t handle, ray_t* msg);
ray_err_t ray_ipc_send_async(int64_t handle, ray_t* msg);
Expand Down
70 changes: 66 additions & 4 deletions src/core/sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,57 @@ ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms)
return RAY_INVALID_SOCK;
}

/* Set send/recv timeout if requested */
if (timeout_ms > 0) {
/* Bounded connect. A blocking connect() ignores SO_*TIMEO, so to
* honor the requested timeout we connect non-blocking and wait at
* most timeout_ms for the socket to become writable (= connected
* or failed), then restore blocking mode for the handshake. */
ray_sock_set_nonblocking(fd);
int rc = connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen);
if (rc < 0) {
#ifdef RAY_OS_WINDOWS
int werr = WSAGetLastError();
int in_progress = (werr == WSAEWOULDBLOCK || werr == WSAEINPROGRESS);
#else
int in_progress = (errno == EINPROGRESS);
#endif
if (!in_progress) {
ray_sock_close(fd);
freeaddrinfo(res);
return RAY_INVALID_SOCK;
}
struct pollfd pfd = { .fd = fd, .events = POLLOUT };
int pr;
#ifdef RAY_OS_WINDOWS
pr = WSAPoll(&pfd, 1, timeout_ms);
#else
do { pr = poll(&pfd, 1, timeout_ms); } while (pr < 0 && errno == EINTR);
#endif
if (pr == 0) { /* timed out — distinct from refused */
ray_sock_close(fd);
freeaddrinfo(res);
errno = ETIMEDOUT;
return RAY_INVALID_SOCK;
}
if (pr < 0) {
ray_sock_close(fd);
freeaddrinfo(res);
return RAY_INVALID_SOCK;
}
/* Writable: harvest the pending connect result via SO_ERROR. */
int soerr = 0;
socklen_t soerr_len = sizeof(soerr);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char*)&soerr, &soerr_len) < 0
|| soerr != 0) {
ray_sock_close(fd);
freeaddrinfo(res);
if (soerr != 0) errno = soerr;
return RAY_INVALID_SOCK;
}
}
ray_sock_set_blocking(fd);

/* Apply the same budget as the handshake send/recv timeout. */
#ifdef RAY_OS_WINDOWS
DWORD tv = (DWORD)timeout_ms;
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
Expand All @@ -119,9 +168,7 @@ ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms)
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif
}

if (connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) {
} else if (connect(fd, res->ai_addr, (socklen_t)res->ai_addrlen) < 0) {
ray_sock_close(fd);
freeaddrinfo(res);
return RAY_INVALID_SOCK;
Expand Down Expand Up @@ -218,3 +265,18 @@ ray_err_t ray_sock_set_nonblocking(ray_sock_t s)
#endif
return RAY_OK;
}

ray_err_t ray_sock_set_blocking(ray_sock_t s)
{
#ifdef RAY_OS_WINDOWS
u_long mode = 0;
if (ioctlsocket(s, FIONBIO, &mode) != 0)
return RAY_ERR_IO;
#else
int flags = fcntl(s, F_GETFL, 0);
if (flags < 0) return RAY_ERR_IO;
if (fcntl(s, F_SETFL, flags & ~O_NONBLOCK) < 0)
return RAY_ERR_IO;
#endif
return RAY_OK;
}
8 changes: 8 additions & 0 deletions src/core/sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@

ray_sock_t ray_sock_listen(uint16_t port);
ray_sock_t ray_sock_accept(ray_sock_t srv);
/* Connect to host:port. timeout_ms > 0 bounds the connect: the socket
* connects non-blocking and waits at most timeout_ms for completion (a
* blocking connect() ignores SO_*TIMEO and would otherwise hang for the
* OS default), then the same value is applied as SO_RCVTIMEO/SO_SNDTIMEO
* for the subsequent handshake I/O. timeout_ms <= 0 = blocking connect,
* no I/O timeout. On a connect timeout, errno is set to ETIMEDOUT and
* RAY_INVALID_SOCK is returned. */
ray_sock_t ray_sock_connect(const char* host, uint16_t port, int timeout_ms);
int64_t ray_sock_send(ray_sock_t s, const void* buf, size_t len);
int64_t ray_sock_recv(ray_sock_t s, void* buf, size_t len);
Expand All @@ -46,5 +53,6 @@ int64_t ray_sock_recv(ray_sock_t s, void* buf, size_t len);
int ray_sock_wait_readable(ray_sock_t s, int timeout_ms);
void ray_sock_close(ray_sock_t s);
ray_err_t ray_sock_set_nonblocking(ray_sock_t s);
ray_err_t ray_sock_set_blocking(ray_sock_t s);

#endif /* RAY_SOCK_H */
5 changes: 2 additions & 3 deletions src/lang/compile.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,10 @@ static void compile_list(compiler_t *c, ray_t *ast) {
* Stash it, compile handler fn, reload err_val, call. */
emit(c, OP_STOREENV);
emit(c, (uint8_t)err_slot);
compile_expr(c, elems[2]); /* handler fn */
compile_expr(c, elems[2]); /* handler (fn or fallback value) */
emit(c, OP_LOADENV);
emit(c, (uint8_t)err_slot);
emit(c, OP_CALLF);
emit(c, 1); /* call handler(err_val) */
emit(c, OP_TRYH); /* callable → call(err); else value */
patch_jump(c, jmp_pos); /* end */
return;
}
Expand Down
Loading
Loading