🪐 [NexusClient] (Rust)

The [NexusClient] provides a high-level interface for interacting with Nexus. It wraps key functionality including transaction signing and execution, gas management, and DAG workflow execution.


✨ Overview

The [NexusClient] provides access to:

  • [GasActions]: manage gas coins, budgets and tickets
  • [WorkflowActions]: publish and execute workflows (DAGs)
  • [SchedulerActions]: create and manage scheduler tasks, occurrences, and periodic schedules
  • [NetworkAuthActions]: manage message-signing key bindings for Tools/Leader nodes

You can initialize a NexusClient via [NexusClient::builder()] with:

  • a Sui ed25519 private key
  • an RPC URL
  • one or more gas coins + a gas budget
  • the on-chain [NexusObjects]

⚙️ Building a NexusClient

use nexus_sdk::nexus::client::NexusClient;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Build the Nexus client (named `nexus_client` to match the snippets below)
    let nexus_client = NexusClient::builder()
        .with_rpc_url(/* your Sui RPC URL */)
        .with_private_key(/* your wallet private key */)
        .with_gas(vec![/* your gas coins */], 10_000_000) // gas coins + gas budget
        .with_nexus_objects(/* your `nexus_sdk::types::NexusObjects` */)
        .build()
        .await?;

    println!("✅ Nexus client initialized!");

    Ok(())
}

🔏 Network Auth (signed HTTP)

NexusClient exposes network_auth() for Tool/Leader node message-signing key operations:

  • register or rotate a Tool message-signing key on-chain,
  • export a local allowlist file of permitted Leader nodes for Tool-side verification (no RPC at runtime), and
  • continuously sync an allowlist file from on-chain network_auth (polling).

This is the same functionality exposed via the CLI under nexus tool auth ....


🔑 Signer

Signing Mechanism

The Signer struct accepts a [sui::crypto::Ed25519PrivateKey] and is responsible for signing and executing transactions on behalf of the active wallet address.

Key Public Behaviors

  • Automatically signs and executes transactions.
  • Keeps track of the active wallet address.
  • Keeps track of gas coins and updates the gas coin references after a transaction.
  • Handles errors as [NexusError].

⛽ Nexus Gas Budget Management

Nexus gas budget is managed through the [GasActions] struct.

Add Budget

use nexus_sdk::{nexus::client::NexusClient, sui};

let coin_object_id: sui::types::Address = /* your coin object ID */;

let result = nexus_client.gas().add_budget(coin_object_id).await?;

println!("Gas budget added in tx: {:?}", result.tx_digest);

What it does:

  • Fetches the given coin object.
  • Adds it to Nexus as available gas budget for workflows.
  • Returns the transaction digest.

Returns:

[AddBudgetResult]: includes the transaction digest.


⚡ Workflow Actions

The [WorkflowActions] API allows you to publish, execute, and inspect DAG-based workflows on Nexus.

1. Publish a DAG

use nexus_sdk::types::Dag;

let dag = serde_json::from_str::<Dag>(include_str!(/* path to your DAG JSON file */))?;

let publish_result = nexus_client.workflow().publish(dag).await?;

println!("Published DAG ID: {:?}", publish_result.dag_object_id);

What it does:

  • Builds and submits a programmable transaction that creates and publishes the DAG object.
  • Returns the transaction digest and created DAG object ID.

Returns:

[PublishResult]: includes the transaction digest and DAG object ID.


2. Execute a Workflow

use nexus_sdk::types::{PortsData, StorageConf};
use std::collections::HashMap;

// Prepare input ports data
let entry_data: HashMap<String, PortsData> = /* your input data */;

// Prepare storage configuration
let storage_conf = StorageConf::default();

let execute_result = nexus_client
    .workflow()
    .execute(
        publish_result.dag_object_id,
        entry_data,
        None, // use default entry group
        &storage_conf,
    )
    .await?;

println!("Execution object ID: {:?}", execute_result.execution_object_id);

What it does:

  1. Commits input data to the appropriate storage (inline or Walrus).
  2. Constructs and executes a transaction to start the workflow execution.
  3. Returns the transaction digest and new DAG execution object ID.

Returns:

[ExecuteResult]: includes the transaction digest and execution object ID.


3. Inspect Workflow Execution

use tokio::time::Duration;

let inspect = nexus_client
    .workflow()
    .inspect_execution(
        execute_result.execution_object_id,
        execute_result.tx_digest,
        Some(Duration::from_secs(180)), // timeout
    )
    .await?;

let mut event_stream = inspect.next_event;

// Listen for events
while let Some(event) = event_stream.recv().await {
    println!("Event: {:?}", event);
}

// Await the poller completion
inspect.poller.await??;

println!("✅ Execution finished successfully!");

What it does:

  • Polls Nexus events on Sui to stream live workflow execution updates.
  • Automatically stops when an ExecutionFinished event is detected or the timeout is reached.

Returns:

[InspectExecutionResult]: includes an event stream and a poller handle.
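
The stop conditions described above (a finishing event or an overall timeout) can be sketched without the SDK. The example below is a minimal, standalone illustration that swaps in a standard-library `std::sync::mpsc` channel for the SDK's async event stream. `DemoEvent`, `StopReason`, and `drain_until_finished` are hypothetical names introduced here and are not part of `nexus_sdk`.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical event type for illustration; the real SDK events are richer.
#[derive(Debug, Clone, PartialEq)]
pub enum DemoEvent {
    Progress(String),
    ExecutionFinished,
}

/// Why the drain loop stopped.
#[derive(Debug, PartialEq)]
pub enum StopReason {
    Finished,
    TimedOut,
    StreamClosed,
}

/// Collects events until `ExecutionFinished` arrives, the overall timeout
/// elapses, or the sending side of the channel is dropped.
pub fn drain_until_finished(
    rx: &Receiver<DemoEvent>,
    timeout: Duration,
) -> (Vec<DemoEvent>, StopReason) {
    let deadline = Instant::now() + timeout;
    let mut seen = Vec::new();
    loop {
        // Wait only for the time left until the overall deadline.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(DemoEvent::ExecutionFinished) => {
                seen.push(DemoEvent::ExecutionFinished);
                return (seen, StopReason::Finished);
            }
            Ok(event) => seen.push(event),
            Err(RecvTimeoutError::Timeout) => return (seen, StopReason::TimedOut),
            Err(RecvTimeoutError::Disconnected) => return (seen, StopReason::StreamClosed),
        }
    }
}
```

Returning the stop reason alongside the collected events lets a caller tell a finished execution apart from one that merely ran out of time.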


⏱️ Scheduler Actions

The [SchedulerActions] API allows you to create and manage on-chain scheduler tasks.

A scheduler task is split into:

  • an execution policy (“what to run”): today this is “begin DAG execution”, but tasks are designed to support additional execution types in the future
  • a constraints policy (“when it may run”): defines when the task is eligible to execute. In the current scheduler this eligibility is time-based and expressed via occurrences (start + optional deadline windows) produced by either queue-based scheduling or periodic scheduling

The scheduler APIs are task/schedule/occurrence oriented; starting DAG executions is just the current default execution policy.

Tasks also carry metadata and lifecycle state (active/paused/canceled), which you can update via update_metadata and set_task_state.

An occurrence is an eligibility window for a single task run (start time + optional deadline + priority_fee_per_gas_unit). When multiple occurrences are eligible, ordering is deterministic: earlier start wins; ties break on higher priority_fee_per_gas_unit; then FIFO.
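
The ordering rule above maps naturally onto a composite sort key. The following is a minimal sketch, assuming a simplified `Occurrence` struct with a `seq` field standing in for insertion order. Both are hypothetical and do not mirror the on-chain representation.

```rust
use std::cmp::Reverse;

/// Hypothetical stand-in for a scheduled occurrence; not an SDK type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Occurrence {
    pub start_ms: u64,
    pub priority_fee_per_gas_unit: u64,
    /// Insertion order, used as the FIFO tie-breaker (an assumption of this sketch).
    pub seq: u64,
}

/// Sorts occurrences so the next one to run comes first:
/// earlier start, then higher priority fee, then FIFO.
pub fn order_occurrences(occurrences: &mut [Occurrence]) {
    occurrences.sort_by_key(|o| {
        (o.start_ms, Reverse(o.priority_fee_per_gas_unit), o.seq)
    });
}
```

Wrapping the fee in `Reverse` flips only that component of the key, so start time and insertion order still sort ascending.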

Each eligible (consumed) occurrence triggers one run of the task’s execution policy. This means the same execution definition can run multiple times:

  • Periodic tasks: the scheduler generates occurrences automatically from a (first_start_ms, period_ms, …) config, so the same execution runs periodically.
  • Queue tasks: you can enqueue any number of occurrences; the task’s execution runs once per occurrence.
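
To make the periodic case concrete, here is a minimal sketch of how start times could follow from such a config, assuming the n-th occurrence starts at `first_start_ms + n * period_ms`. `PeriodicConfig` and `nth_start_ms` are hypothetical names. The actual on-chain generator may differ, for example in how it treats missed windows.

```rust
/// Hypothetical mirror of the (first_start_ms, period_ms, ...) config; not the SDK type.
pub struct PeriodicConfig {
    pub first_start_ms: u64,
    pub period_ms: u64,
    pub max_iterations: Option<u64>,
}

/// Returns the start time of the `n`-th occurrence (0-based), or `None`
/// once `max_iterations` is exhausted or the timestamp would overflow.
pub fn nth_start_ms(cfg: &PeriodicConfig, n: u64) -> Option<u64> {
    if let Some(max) = cfg.max_iterations {
        if n >= max {
            return None;
        }
    }
    cfg.period_ms.checked_mul(n)?.checked_add(cfg.first_start_ms)
}
```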

Each run is independent: the scheduler does not automatically pass outputs/data from one run to the next. If you need stateful behavior across runs (e.g., chaining results, retries with state, counters/backoff), persist and manage that state externally.

Queue-based scheduling is intentionally generic: by enqueueing occurrences at different times (and with different priorities), you can implement delayed runs, retries, backoff, and other custom strategies.
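
As one illustration of a custom strategy, the sketch below computes absolute start times for retry occurrences with capped exponential backoff. `backoff_start_times` is a hypothetical helper, not an SDK function. Each returned value could plausibly serve as the start time of one enqueued occurrence, though that wiring is left out here.

```rust
/// Computes absolute start times (in ms) for retry occurrences.
/// Delays grow as base, 2*base, 4*base, ... and are capped at `max_delay_ms`.
/// Every delay is measured from `now_ms`, not from the previous retry.
pub fn backoff_start_times(
    now_ms: u64,
    base_delay_ms: u64,
    max_delay_ms: u64,
    attempts: u32,
) -> Vec<u64> {
    (0..attempts)
        .map(|attempt| {
            // 2^attempt, saturating once the shift would overflow a u64.
            let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
            let delay = base_delay_ms.saturating_mul(factor).min(max_delay_ms);
            now_ms.saturating_add(delay)
        })
        .collect()
}
```

Keeping this arithmetic on the client side matches the point above that retry state lives outside the scheduler.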

1. Create a Queue Task

use nexus_sdk::{
    nexus::scheduler::{CreateTaskParams, GeneratorKind},
    types::{NexusData, DEFAULT_ENTRY_GROUP},
};
use std::collections::HashMap;

let input_data = HashMap::from([(
    "entry_vertex".to_string(),
    HashMap::from([(
        "input_port".to_string(),
        NexusData::new_inline(serde_json::json!({"hello": "world"})).commit_inline_plain(),
    )]),
)]);

let queue_task = nexus_client
    .scheduler()
    .create_task(CreateTaskParams {
        dag_id: publish_result.dag_object_id,
        entry_group: DEFAULT_ENTRY_GROUP.to_string(),
        input_data,
        metadata: vec![("env".into(), "demo".into())],
        execution_priority_fee_per_gas_unit: 0,
        initial_schedule: None,
        generator: GeneratorKind::Queue,
    })
    .await?;

println!("Queue task ID: {:?}", queue_task.task_id);

Queue-based tasks can enqueue the first occurrence at creation time by passing initial_schedule: Some(OccurrenceRequest::new(...)).

2. Enqueue a One-Off Occurrence

use nexus_sdk::nexus::scheduler::OccurrenceRequest;

let occurrence = OccurrenceRequest::new(
    Some(/* start_ms */),
    None, // deadline_ms
    None, // start_offset_ms
    Some(/* deadline_offset_ms */),
    0,    // priority_fee_per_gas_unit
    true, // require_start
)?;

let scheduled = nexus_client
    .scheduler()
    .add_occurrence(queue_task.task_id, occurrence)
    .await?;

println!("Occurrence queued in tx: {:?}", scheduled.tx_digest);

3. Create a Periodic Task and Configure Scheduling

use nexus_sdk::{
    nexus::scheduler::{CreateTaskParams, GeneratorKind, PeriodicScheduleConfig},
    types::{NexusData, DEFAULT_ENTRY_GROUP},
};
use std::collections::HashMap;

let input_data = HashMap::from([(
    "entry_vertex".to_string(),
    HashMap::from([(
        "input_port".to_string(),
        NexusData::new_inline(serde_json::json!({"hello": "world"})).commit_inline_plain(),
    )]),
)]);

let periodic_task = nexus_client
    .scheduler()
    .create_task(CreateTaskParams {
        dag_id: publish_result.dag_object_id,
        entry_group: DEFAULT_ENTRY_GROUP.to_string(),
        input_data,
        metadata: vec![("env".into(), "demo".into())],
        execution_priority_fee_per_gas_unit: 0,
        initial_schedule: None,
        generator: GeneratorKind::Periodic,
    })
    .await?;

// Configure the periodic schedule for the task
nexus_client
    .scheduler()
    .configure_periodic(
        periodic_task.task_id,
        PeriodicScheduleConfig {
            first_start_ms: /* absolute timestamp */,
            period_ms: /* period in milliseconds */,
            deadline_offset_ms: None,
            max_iterations: None,
            priority_fee_per_gas_unit: 0,
        },
    )
    .await?;

// Later, disable the periodic schedule when it is no longer needed
nexus_client
    .scheduler()
    .disable_periodic(periodic_task.task_id)
    .await?;

4. Update Task Metadata

let updated = nexus_client
    .scheduler()
    .update_metadata(queue_task.task_id, vec![("key".into(), "value".into())])
    .await?;

println!("Metadata updated in tx: {:?}", updated.tx_digest);

5. Pause / Resume / Cancel a Task

use nexus_sdk::nexus::scheduler::TaskStateAction;

nexus_client
    .scheduler()
    .set_task_state(queue_task.task_id, TaskStateAction::Pause)
    .await?;

🧱 Module Summary

Module             Purpose
nexus::client      Core client, builder, signer, gas management
nexus::gas         Gas management for Nexus workflows
nexus::workflow    DAG workflow publishing, execution, and event streaming
nexus::scheduler   Scheduler tasks, occurrences, and periodic schedules

🧭 Error Handling

All methods return a Result<T, NexusError>. The NexusError enum categorizes issues from configuration errors to RPC and transaction issues.
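
One way a caller might act on those categories is to retry only failures that look transient. The sketch below is illustrative only: `DemoError` and its three variants are hypothetical placeholders loosely based on the categories named above, not the real `NexusError` variants. Treating RPC errors as the only retryable kind is likewise an assumption.

```rust
use std::fmt;

/// Hypothetical error categories for illustration; not the real `NexusError`.
#[derive(Debug)]
pub enum DemoError {
    Configuration(String),
    Rpc(String),
    Transaction(String),
}

impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DemoError::Configuration(msg) => write!(f, "configuration error: {msg}"),
            DemoError::Rpc(msg) => write!(f, "RPC error: {msg}"),
            DemoError::Transaction(msg) => write!(f, "transaction error: {msg}"),
        }
    }
}

impl std::error::Error for DemoError {}

/// Treats only RPC failures as transient (an assumption of this sketch).
pub fn is_retryable(err: &DemoError) -> bool {
    matches!(err, DemoError::Rpc(_))
}

/// Runs `op` at least once and at most `max_attempts` times,
/// retrying only when the error is considered retryable.
pub fn with_retries<T>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, DemoError>,
) -> Result<T, DemoError> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) if attempt < max_attempts && is_retryable(&err) => attempt += 1,
            Err(err) => return Err(err),
        }
    }
}
```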


🪶 Summary

The [NexusClient] aims to make building, publishing, and executing Nexus workflows simple, safe, and async-ready. It abstracts away Sui transaction signing and gas management while providing a clean modular interface.