Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 14 additions & 14 deletions packages/api/auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,44 @@ license.workspace = true
edition.workspace = true

[dependencies]
rivet-convert.workspace = true
api-helper.workspace = true
async-trait = "0.1"
captcha-verify.workspace = true
chirp-client.workspace = true
rivet-operation.workspace = true
chirp-workflow.workspace = true
chrono = "0.4"
email-verification-complete.workspace = true
email-verification-create.workspace = true
headers = "0.3"
http = "0.2"
hyper = { version = "0.14", features = ["server", "http1", "stream", "tcp"] }
lazy_static = "1.4"
prost = "0.10"
rivet-api.workspace = true
rivet-auth-server.workspace = true
rivet-cache.workspace = true
rivet-claims.workspace = true
rivet-config.workspace = true
rivet-convert.workspace = true
rivet-env.workspace = true
rivet-health-checks.workspace = true
rivet-operation.workspace = true
rivet-pools.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
token-create.workspace = true
token-revoke.workspace = true
tokio = { version = "1.40" }
tracing = "0.1"
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] }
url = "2.2.2"
uuid = { version = "1", features = ["v4"] }
rivet-api.workspace = true

user-get.workspace = true
token-revoke.workspace = true
captcha-verify.workspace = true
email-verification-create.workspace = true
email-verification-complete.workspace = true
user-identity-create.workspace = true
user-resolve-email.workspace = true
user-token-create.workspace = true
user-identity-create.workspace = true
token-create.workspace = true
rivet-config.workspace = true
rivet-env.workspace = true
user.workspace = true
chirp-workflow.workspace = true
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
rivet-auth.workspace = true
Expand All @@ -54,3 +53,4 @@ faker-user.workspace = true
user-token-create.workspace = true
debug-email-res.workspace = true
user-identity-get.workspace = true

3 changes: 2 additions & 1 deletion packages/api/provision/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ edition.workspace = true
api-helper.workspace = true
async-trait = "0.1"
chirp-client.workspace = true
rivet-operation.workspace = true
chrono = "0.4"
http = "0.2"
hyper = { version = "0.14", features = ["server", "http1", "stream", "tcp"] }
Expand All @@ -18,7 +17,9 @@ prost = "0.10"
rivet-api.workspace = true
rivet-cache.workspace = true
rivet-claims.workspace = true
rivet-convert.workspace = true
rivet-health-checks.workspace = true
rivet-operation.workspace = true
rivet-pools.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
39 changes: 39 additions & 0 deletions packages/api/provision/src/route/datacenters.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
use rivet_api::models;
use rivet_convert::ApiInto;
use rivet_operation::prelude::*;
use serde::Deserialize;

use crate::auth::Auth;

Expand Down Expand Up @@ -30,3 +32,40 @@ pub async fn tls(
job_private_key_pem: job_private_key_pem.clone(),
})
}

#[derive(Deserialize)]
pub struct ServerFilterQuery {
pools: Vec<models::ProvisionPoolType>,
}

// MARK: GET /datacenters/{}/servers
pub async fn servers(
ctx: Ctx<Auth>,
datacenter_id: Uuid,
_watch_index: WatchIndexQuery,
query: ServerFilterQuery,
) -> GlobalResult<models::ProvisionDatacentersGetServersResponse> {
ctx.auth().server()?;

// Find server based on public ip
let servers_res = ctx
.op(cluster::ops::server::list::Input {
filter: cluster::types::Filter {
pool_types: query
.pools
.is_empty()
.then(|| query.pools.into_iter().map(ApiInto::api_into).collect()),
..Default::default()
},
include_destroyed: false,
})
.await?;

Ok(models::ProvisionDatacentersGetServersResponse {
servers: servers_res
.servers
.into_iter()
.map(ApiInto::api_into)
.collect(),
})
}
7 changes: 7 additions & 0 deletions packages/api/provision/src/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ define_router! {
),
},

"datacenters" / Uuid / "servers": {
GET: datacenters::servers(
query: datacenters::ServerFilterQuery,
internal_endpoint: true,
),
},

"servers" / Ipv4Addr / "info": {
GET: servers::info(
internal_endpoint: true,
Expand Down
1 change: 1 addition & 0 deletions packages/common/convert/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod cloud;
pub mod group;
pub mod identity;
pub mod portal;
pub mod provision;

impl ApiFrom<common::ValidationError> for new_models::ValidationError {
fn api_from(value: common::ValidationError) -> new_models::ValidationError {
Expand Down
42 changes: 42 additions & 0 deletions packages/common/convert/src/impls/provision.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use rivet_api::models;
use rivet_operation::prelude::*;

use crate::{ApiFrom, ApiInto};

impl ApiFrom<models::ProvisionPoolType> for cluster::types::PoolType {
fn api_from(value: models::ProvisionPoolType) -> cluster::types::PoolType {
match value {
models::ProvisionPoolType::Job => cluster::types::PoolType::Job,
models::ProvisionPoolType::Gg => cluster::types::PoolType::Gg,
models::ProvisionPoolType::Ats => cluster::types::PoolType::Ats,
models::ProvisionPoolType::Pegboard => cluster::types::PoolType::Pegboard,
models::ProvisionPoolType::PegboardIsolate => cluster::types::PoolType::PegboardIsolate,
models::ProvisionPoolType::Fdb => cluster::types::PoolType::Fdb,
}
}
}

impl ApiFrom<cluster::types::PoolType> for models::ProvisionPoolType {
fn api_from(value: cluster::types::PoolType) -> models::ProvisionPoolType {
match value {
cluster::types::PoolType::Job => models::ProvisionPoolType::Job,
cluster::types::PoolType::Gg => models::ProvisionPoolType::Gg,
cluster::types::PoolType::Ats => models::ProvisionPoolType::Ats,
cluster::types::PoolType::Pegboard => models::ProvisionPoolType::Pegboard,
cluster::types::PoolType::PegboardIsolate => models::ProvisionPoolType::PegboardIsolate,
cluster::types::PoolType::Fdb => models::ProvisionPoolType::Fdb,
}
}
}

impl ApiFrom<cluster::types::Server> for models::ProvisionServer {
fn api_from(value: cluster::types::Server) -> models::ProvisionServer {
models::ProvisionServer {
server_id: value.server_id,
datacenter_id: value.datacenter_id,
pool_type: value.pool_type.api_into(),
vlan_ip: value.vlan_ip.map(|ip| ip.to_string()),
public_ip: value.public_ip.map(|ip| ip.to_string()),
}
}
}
10 changes: 8 additions & 2 deletions packages/infra/client/isolate-v8-runner/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::collections::HashMap;
use std::{collections::HashMap, path::PathBuf};

use deno_runtime::deno_permissions;
use serde::Deserialize;

#[derive(Clone, Deserialize)]
pub struct Config {
pub actors_path: PathBuf,
pub fdb_cluster_path: PathBuf,
}

/// Config for running an isolate. Similar to runc config.
#[derive(Deserialize)]
pub struct Config {
pub struct ActorConfig {
pub resources: Resources,
pub ports: Vec<Port>,
pub env: HashMap<String, String>,
Expand Down
65 changes: 36 additions & 29 deletions packages/infra/client/isolate-v8-runner/src/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ use nix::{libc, unistd::pipe};
use tokio::{fs, sync::mpsc};
use uuid::Uuid;

use crate::{config::Config, ext, log_shipper, utils};
use crate::{
config::{ActorConfig, Config},
ext, log_shipper, utils,
};

pub fn run(
actors_path: PathBuf,
config: Config,
actor_id: Uuid,
terminate_tx: mpsc::Sender<MainWorkerTerminateHandle>,
) -> Result<()> {
let actor_path = actors_path.join(actor_id.to_string());
let actor_path = config.actors_path.join(actor_id.to_string());

// Write PID to file
std::fs::write(
Expand All @@ -40,38 +43,39 @@ pub fn run(
// Read config
let config_data = std::fs::read_to_string(actor_path.join("config.json"))
.context("Failed to read config file")?;
let config =
serde_json::from_str::<Config>(&config_data).context("Failed to parse config file")?;
let actor_config =
serde_json::from_str::<ActorConfig>(&config_data).context("Failed to parse config file")?;

let (shutdown_tx, shutdown_rx) = smpsc::sync_channel(1);

// Start log shipper
let (msg_tx, log_shipper_thread) = if let Some(vector_socket_addr) = &config.vector_socket_addr
{
let (msg_tx, msg_rx) = smpsc::sync_channel::<log_shipper::ReceivedMessage>(
log_shipper::MAX_BUFFER_BYTES / log_shipper::MAX_LINE_BYTES,
);
let log_shipper = log_shipper::LogShipper {
actor_id,
shutdown_rx,
msg_rx,
vector_socket_addr: vector_socket_addr.clone(),
owner: config.owner.clone(),
let (msg_tx, log_shipper_thread) =
if let Some(vector_socket_addr) = &actor_config.vector_socket_addr {
let (msg_tx, msg_rx) = smpsc::sync_channel::<log_shipper::ReceivedMessage>(
log_shipper::MAX_BUFFER_BYTES / log_shipper::MAX_LINE_BYTES,
);
let log_shipper = log_shipper::LogShipper {
actor_id,
shutdown_rx,
msg_rx,
vector_socket_addr: vector_socket_addr.clone(),
owner: actor_config.owner.clone(),
};
let log_shipper_thread = log_shipper.spawn();

(Some(msg_tx), Some(log_shipper_thread))
} else {
(None, None)
};
let log_shipper_thread = log_shipper.spawn();

(Some(msg_tx), Some(log_shipper_thread))
} else {
(None, None)
};

// Run the isolate
let exit_code = match create_and_run_current_thread(run_inner(
config,
actor_path.clone(),
actor_id,
terminate_tx,
msg_tx.clone(),
config,
actor_config,
))? {
Result::Ok(exit_code) => exit_code,
Err(err) => {
Expand Down Expand Up @@ -119,18 +123,21 @@ pub fn run(
}

pub async fn run_inner(
config: Config,
actor_path: PathBuf,
actor_id: Uuid,
terminate_tx: mpsc::Sender<MainWorkerTerminateHandle>,
msg_tx: Option<smpsc::SyncSender<log_shipper::ReceivedMessage>>,
config: Config,
actor_config: ActorConfig,
) -> Result<i32> {
tracing::info!(?actor_id, "Starting isolate");

// Init KV store (create or open)
let mut kv = ActorKv::new(utils::fdb_handle()?, actor_id);
let mut kv = ActorKv::new(utils::fdb_handle(&config)?, actor_id);
kv.init().await?;

tracing::info!(?actor_id, "Isolate KV initialized");

// Load script into a static module loader. No dynamic scripts can be loaded this way.
let script_content = fs::read_to_string(actor_path.join("index.js"))
.await
Expand All @@ -151,7 +158,7 @@ pub async fn run_inner(
let loopback = Ipv4Addr::new(0, 0, 0, 0);
permissions.net_listen = Permissions::new_unary::<NetListenDescriptor>(
Some(
config
actor_config
.ports
.iter()
.map(|port| {
Expand Down Expand Up @@ -227,8 +234,8 @@ pub async fn run_inner(

// Memory must be aligned with PAGESIZE
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) }.try_into()?;
let mem = floor_align(config.resources.memory.try_into()?, page_size);
let mem_max = floor_align(config.resources.memory_max.try_into()?, page_size);
let mem = floor_align(actor_config.resources.memory.try_into()?, page_size);
let mem_max = floor_align(actor_config.resources.memory_max.try_into()?, page_size);

Some(CreateParams::default().heap_limits(mem, mem_max))
},
Expand All @@ -238,7 +245,7 @@ pub async fn run_inner(
stdout: StdioPipe::file(stdout_writer),
stderr: StdioPipe::file(stderr_writer),
},
env: config.env,
env: actor_config.env,
..Default::default()
},
);
Expand Down
Loading