diff --git a/Cargo.lock b/Cargo.lock index b744955b9c..6b131a692a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2375,6 +2375,7 @@ dependencies = [ "serde_derive 1.0.130", "serde_json 1.0.72", "sgx_crypto_helper", + "thiserror 1.0.30", "url 2.2.2", "ws", ] @@ -2525,6 +2526,17 @@ dependencies = [ "sp-std", ] +[[package]] +name = "itp-primitives-cache" +version = "0.8.0" +dependencies = [ + "lazy_static", + "log 0.4.14 (registry+https://github.com/rust-lang/crates.io-index)", + "sgx_tstd", + "thiserror 1.0.30", + "thiserror 1.0.9", +] + [[package]] name = "itp-settings" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 53e79be04c..7627ee0f68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "core-primitives/extrinsics-factory", "core-primitives/nonce-cache", "core-primitives/ocall-api", + "core-primitives/primitives-cache", "core-primitives/settings", "core-primitives/sgx/crypto", "core-primitives/sgx/io", diff --git a/cli/demo_direct_call.sh b/cli/demo_direct_call.sh index 0cb33decd4..30849a4bc3 100755 --- a/cli/demo_direct_call.sh +++ b/cli/demo_direct_call.sh @@ -42,7 +42,7 @@ NPORT=${NPORT:-9944} RPORT=${RPORT:-2000} echo "Using node-port ${NPORT}" -echo "Using worker-rpc-port ${RPORT}" +echo "Using trusted-worker-port ${RPORT}" AMOUNTSHIELD=50000000000 AMOUNTTRANSFER=40000000000 diff --git a/cli/demo_private_tx.sh b/cli/demo_private_tx.sh index d4614f86b9..e0b2f6f1e8 100755 --- a/cli/demo_private_tx.sh +++ b/cli/demo_private_tx.sh @@ -22,7 +22,7 @@ NPORT=${1:-9944} RPORT=${3:-2000} echo "Using node-port ${NPORT}" -echo "Using worker-rpc-port ${RPORT}" +echo "Using trusted-worker-port ${RPORT}" echo "" CLIENT="./../bin/integritee-cli -p ${NPORT} -P ${RPORT}" diff --git a/cli/demo_shielding_unshielding.sh b/cli/demo_shielding_unshielding.sh index eff7916cd6..5266f6a106 100755 --- a/cli/demo_shielding_unshielding.sh +++ b/cli/demo_shielding_unshielding.sh @@ -42,7 +42,7 @@ NPORT=${NPORT:-9944} RPORT=${RPORT:-2000} echo "Using node-port ${NPORT}" -echo "Using worker-rpc-port ${RPORT}" +echo "Using trusted-worker-port ${RPORT}" echo "" AMOUNTSHIELD=50000000000 diff --git a/cli/src/main.rs b/cli/src/main.rs index 8995c378a4..bf13048e4b 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -102,9 +102,9 @@ fn main() { .help("worker url"), ) .arg( - Arg::with_name("worker-rpc-port") + Arg::with_name("trusted-worker-port") .short("P") - .long("worker-rpc-port") + .long("trusted-worker-port") .global(true) .takes_value(true) .value_name("STRING") @@ -457,10 +457,7 @@ fn get_state(matches: &ArgMatches<'_>, getter: TrustedOperation) -> Option {}, - Err(_) => panic!("Error when sending direct invocation call"), - } + direct_api.watch(jsonrpc_call, sender); loop { match receiver.recv() { @@ -494,7 +491,7 @@ fn encode_encrypt( let worker_api_direct = get_worker_api_direct(matches); let shielding_pubkey: Rsa3072PubKey = match worker_api_direct.get_rsa_pubkey() { Ok(key) => key, - Err(err_msg) => return Err(err_msg), + Err(err_msg) => return Err(err_msg.to_string()), }; let encoded = to_encrypt.encode(); @@ -555,7 +552,7 @@ fn get_worker_api_direct(matches: &ArgMatches<'_>) -> DirectWorkerApi { let url = format!( "{}:{}", matches.value_of("worker-url").unwrap(), - matches.value_of("worker-rpc-port").unwrap() + matches.value_of("trusted-worker-port").unwrap() ); info!("Connecting to integritee-service-direct-port on '{}'", url); DirectWorkerApi::new(url) @@ -606,10 +603,7 @@ fn send_direct_request( debug!("setup sender and receiver"); let (sender, receiver) = channel(); - match direct_api.watch(jsonrpc_call, sender) { - Ok(_) => {}, - Err(_) => panic!("Error when sending direct invocation call"), - } + direct_api.watch(jsonrpc_call, sender); debug!("waiting for rpc response"); loop { diff --git a/core-primitives/enclave-api/ffi/src/lib.rs b/core-primitives/enclave-api/ffi/src/lib.rs index 0cc41456a4..8751beaa27 100644 --- a/core-primitives/enclave-api/ffi/src/lib.rs +++ b/core-primitives/enclave-api/ffi/src/lib.rs @@ -4,7 +4,14 @@ use sgx_types::{c_int, sgx_enclave_id_t, sgx_quote_sign_type_t, sgx_status_t}; extern "C" { - pub fn init(eid: sgx_enclave_id_t, retval: *mut sgx_status_t) -> sgx_status_t; + pub fn init( + eid: sgx_enclave_id_t, + retval: *mut sgx_status_t, + mu_ra_addr: *const u8, + mu_ra_addr_size: u32, + untrusted_worker_addr: *const u8, + untrusted_worker_addr_size: u32, + ) -> sgx_status_t; pub fn get_state( eid: sgx_enclave_id_t, diff --git a/core-primitives/enclave-api/src/enclave_base.rs b/core-primitives/enclave-api/src/enclave_base.rs index d2b050075a..7ec6ecb9a4 100644 --- a/core-primitives/enclave-api/src/enclave_base.rs +++ b/core-primitives/enclave-api/src/enclave_base.rs @@ -33,7 +33,7 @@ use sp_runtime::traits::Header; /// Trait for base/common Enclave API functions pub trait EnclaveBase: Send + Sync + 'static { /// Initialize the enclave (needs to be called once at application startup). - fn init(&self) -> EnclaveResult<()>; + fn init(&self, mu_ra_addr: &str, untrusted_worker_addr: &str) -> EnclaveResult<()>; /// Initialize the direct invocation RPC server. fn init_direct_invocation_server(&self, rpc_server_addr: String) -> EnclaveResult<()>; @@ -63,10 +63,22 @@ pub trait EnclaveBase: Send + Sync + 'static { /// EnclaveApi implementation for Enclave struct impl EnclaveBase for Enclave { - fn init(&self) -> EnclaveResult<()> { + fn init(&self, mu_ra_addr: &str, untrusted_worker_addr: &str) -> EnclaveResult<()> { let mut retval = sgx_status_t::SGX_SUCCESS; - let result = unsafe { ffi::init(self.eid, &mut retval) }; + let encoded_mu_ra_addr = mu_ra_addr.encode(); + let encoded_untrusted_worker_addr = untrusted_worker_addr.encode(); + + let result = unsafe { + ffi::init( + self.eid, + &mut retval, + encoded_mu_ra_addr.as_ptr(), + encoded_mu_ra_addr.len() as u32, + encoded_untrusted_worker_addr.as_ptr(), + encoded_untrusted_worker_addr.len() as u32, + ) + }; ensure!(result == sgx_status_t::SGX_SUCCESS, Error::Sgx(result)); ensure!(retval == sgx_status_t::SGX_SUCCESS, Error::Sgx(retval)); diff --git a/core-primitives/primitives-cache/Cargo.toml b/core-primitives/primitives-cache/Cargo.toml new file mode 100644 index 0000000000..4270ff0341 --- /dev/null +++ b/core-primitives/primitives-cache/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "itp-primitives-cache" +version = "0.8.0" +authors = ["Integritee AG "] +edition = "2018" +resolver = "2" + + +[features] +default = ["std"] +std = [ + "log/std", + "thiserror", +] +sgx = [ + "sgx_tstd", + "thiserror_sgx", +] + +[dependencies] +# sgx dependencies +sgx_tstd = { branch = "master", git = "https://github.com/apache/teaclave-sgx-sdk.git", optional = true } + +# local dependencies + +# sgx enabled external libraries +thiserror_sgx = { package = "thiserror", git = "https://github.com/mesalock-linux/thiserror-sgx", tag = "sgx_1.1.3", optional = true } + +# std compatible external libraries (make sure these versions match with the sgx-enabled ones above) +thiserror = { version = "1.0", optional = true } + +# no-std dependencies +log = { version = "0.4", default-features = false } +lazy_static = { version = "1.1.0", features = ["spin_no_std"] } diff --git a/core-primitives/primitives-cache/src/error.rs b/core-primitives/primitives-cache/src/error.rs new file mode 100644 index 0000000000..2873dd8156 --- /dev/null +++ b/core-primitives/primitives-cache/src/error.rs @@ -0,0 +1,31 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +#[cfg(all(not(feature = "std"), feature = "sgx"))] +use crate::sgx_reexport_prelude::*; + +use std::boxed::Box; + +pub type Result = core::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Primitives lock is poisoned")] + LockPoisoning, + #[error(transparent)] + Other(#[from] Box), +} diff --git a/core-primitives/primitives-cache/src/lib.rs b/core-primitives/primitives-cache/src/lib.rs new file mode 100644 index 0000000000..781a082b6e --- /dev/null +++ b/core-primitives/primitives-cache/src/lib.rs @@ -0,0 +1,120 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +//! Stores all primitives of the enclave that do need to be accessed often, but are +//! not be frequently mutated, such as keys and server urls. +//! +//! TODO: For now only the mu-ra server and untrusted wokrer url is stored here. Keys and such could also be stored here. + +#![cfg_attr(not(feature = "std"), no_std)] +#![feature(assert_matches)] + +#[cfg(all(feature = "std", feature = "sgx"))] +compile_error!("feature \"std\" and feature \"sgx\" cannot be enabled at the same time"); + +#[cfg(all(not(feature = "std"), feature = "sgx"))] +extern crate sgx_tstd as std; + +// Re-export module to properly feature gate sgx and regular std environment. +#[cfg(all(not(feature = "std"), feature = "sgx"))] +pub mod sgx_reexport_prelude { + pub use thiserror_sgx as thiserror; +} + +#[cfg(feature = "std")] +use std::sync::RwLockReadGuard; +#[cfg(feature = "std")] +use std::sync::RwLockWriteGuard; + +#[cfg(all(not(feature = "std"), feature = "sgx"))] +use std::sync::SgxRwLockReadGuard as RwLockReadGuard; +#[cfg(all(not(feature = "std"), feature = "sgx"))] +use std::sync::SgxRwLockWriteGuard as RwLockWriteGuard; + +use crate::error::Result; +use lazy_static::lazy_static; +use std::{ + string::{String, ToString}, + sync::Arc, +}; + +pub use primitives_cache::PrimitivesCache; + +lazy_static! { + /// Global instance of the primitves cache. + /// + /// Concurrent access is managed internally, using RW locks. + pub static ref GLOBAL_PRIMITIVES_CACHE: Arc = Default::default(); +} + +pub mod error; +pub mod primitives_cache; + +#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +pub struct Primitives { + mu_ra_url: String, + untrusted_worker_url: String, +} + +impl Primitives { + pub fn new(mu_ra_url: &str, untrusted_worker_url: &str) -> Primitives { + Primitives { + mu_ra_url: mu_ra_url.to_string(), + untrusted_worker_url: untrusted_worker_url.to_string(), + } + } + + pub fn mu_ra_url(&self) -> &str { + &self.mu_ra_url + } + + pub fn untrusted_worker_url(&self) -> &str { + &self.untrusted_worker_url + } +} + +/// Trait to mutate the primitives. +/// +/// Used in a combination of loading a lock and then writing the updated +/// value back, returning the lock again. +pub trait MutatePrimitives { + fn load_for_mutation(&self) -> Result>; +} + +/// Trait to get the primitives. +pub trait GetPrimitives { + /// Returns a clone of the full Primitives struct. + fn get_primitives(&self) -> Result>; + + fn get_mu_ra_url(&self) -> Result; + + fn get_untrusted_worker_url(&self) -> Result; +} + +// Helper function to set primitives of a given cache. +pub fn set_primitives( + cache: &E, + mu_ra_url: &str, + untrusted_worker_url: &str, +) -> Result<()> { + let primitives = Primitives::new(mu_ra_url, untrusted_worker_url); + let mut rw_lock = cache.load_for_mutation()?; + + *rw_lock = primitives; + + Ok(()) +} diff --git a/core-primitives/primitives-cache/src/primitives_cache.rs b/core-primitives/primitives-cache/src/primitives_cache.rs new file mode 100644 index 0000000000..569cc22d4b --- /dev/null +++ b/core-primitives/primitives-cache/src/primitives_cache.rs @@ -0,0 +1,117 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +#[cfg(all(not(feature = "std"), feature = "sgx"))] +use std::sync::SgxRwLock as RwLock; +#[cfg(all(not(feature = "std"), feature = "sgx"))] +use std::sync::SgxRwLockReadGuard as RwLockReadGuard; +#[cfg(all(not(feature = "std"), feature = "sgx"))] +use std::sync::SgxRwLockWriteGuard as RwLockWriteGuard; + +#[cfg(feature = "std")] +use std::sync::RwLock; +#[cfg(feature = "std")] +use std::sync::RwLockReadGuard; +#[cfg(feature = "std")] +use std::sync::RwLockWriteGuard; + +use std::string::{String, ToString}; + +use crate::{ + error::{Error, Result}, + GetPrimitives, MutatePrimitives, Primitives, +}; + +/// Local primitives cache. +/// +/// Stores the primitives internally, protected by a RW lock for concurrent access. +#[derive(Default)] +pub struct PrimitivesCache { + primitives_lock: RwLock, +} + +impl PrimitivesCache { + pub fn new(primitives_lock: RwLock) -> Self { + PrimitivesCache { primitives_lock } + } +} + +impl MutatePrimitives for PrimitivesCache { + fn load_for_mutation(&self) -> Result> { + self.primitives_lock.write().map_err(|_| Error::LockPoisoning) + } +} + +impl GetPrimitives for PrimitivesCache { + fn get_primitives(&self) -> Result> { + self.primitives_lock.read().map_err(|_| Error::LockPoisoning) + } + + fn get_mu_ra_url(&self) -> Result { + let primitives_lock = self.primitives_lock.read().map_err(|_| Error::LockPoisoning)?; + Ok(primitives_lock.mu_ra_url().to_string()) + } + + fn get_untrusted_worker_url(&self) -> Result { + let primitives_lock = self.primitives_lock.read().map_err(|_| Error::LockPoisoning)?; + Ok(primitives_lock.untrusted_worker_url().to_string()) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use std::{sync::Arc, thread}; + + #[test] + pub fn set_primitives_works() { + let cache = PrimitivesCache::default(); + let mut lock = cache.load_for_mutation().unwrap(); + let mu_ra_url = "hello"; + let untrusted_url = "world"; + let primitives = Primitives::new(mu_ra_url, untrusted_url); + *lock = primitives.clone(); + std::mem::drop(lock); + assert_eq!(primitives, *cache.get_primitives().unwrap()); + } + + #[test] + pub fn concurrent_read_access_blocks_until_write_is_done() { + let cache = Arc::new(PrimitivesCache::default()); + let mu_ra_url = "hello"; + let untrusted_url = "world"; + let primitives = Primitives::new(mu_ra_url, untrusted_url); + + let mut write_lock = cache.load_for_mutation().unwrap(); + + // Spawn a new thread that reads the primitives. + // This thread should be blocked until the write lock is released, i.e. until + // the new primitves are written. We can verify this, by trying to read the primitives variable + // that will be inserted further down below. + let new_thread_cache = cache.clone(); + let primitives_one = primitives.clone(); + let join_handle = thread::spawn(move || { + let read = new_thread_cache.get_primitives().unwrap(); + assert_eq!(primitives_one, *read); + }); + + *write_lock = primitives; + std::mem::drop(write_lock); + + join_handle.join().unwrap(); + } +} diff --git a/core-primitives/types/src/rpc.rs b/core-primitives/types/src/rpc.rs index 6b81171952..412af7ceb3 100644 --- a/core-primitives/types/src/rpc.rs +++ b/core-primitives/types/src/rpc.rs @@ -5,7 +5,7 @@ use alloc::{borrow::ToOwned, string::String, vec::Vec}; use codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; -#[derive(Encode, Decode)] +#[derive(Encode, Decode, Debug)] pub struct RpcReturnValue { pub value: Vec, pub do_watch: bool, @@ -31,7 +31,7 @@ impl RpcReturnValue { } } -#[derive(Clone, Encode, Decode, Serialize, Deserialize)] +#[derive(Clone, Encode, Decode, Debug, Serialize, Deserialize)] // Todo: result should not be Vec, but `T: Serialize` pub struct RpcResponse { pub jsonrpc: String, diff --git a/core/rpc-client/Cargo.toml b/core/rpc-client/Cargo.toml index 1325e86b30..ca6e044dae 100644 --- a/core/rpc-client/Cargo.toml +++ b/core/rpc-client/Cargo.toml @@ -5,14 +5,15 @@ authors = ["Integritee AG "] edition = "2018" [dependencies] -ws = { version = "0.9.1", features = ["ssl"] } -openssl = { version = "0.10" } -url = { version = "2.0.0" } +codec = { package = "parity-scale-codec", version = "2.0.0", features = ["derive"] } log = "0.4" -serde_json = "1.0" +openssl = { version = "0.10" } serde_derive = "1.0" +serde_json = "1.0" sgx_crypto_helper = { branch = "master", git = "https://github.com/apache/teaclave-sgx-sdk.git" } -codec = { package = "parity-scale-codec", version = "2.0.0", default-features = false, features = ["derive"] } +thiserror = "1.0" +url = { version = "2.0.0" } +ws = { version = "0.9.1", features = ["ssl"] } -[dependencies.itp-types] -path = "../../core-primitives/types" +# local dependencies +itp-types = { path = "../../core-primitives/types" } diff --git a/core/rpc-client/src/direct_client.rs b/core/rpc-client/src/direct_client.rs index 5464d36daf..df75e727db 100644 --- a/core/rpc-client/src/direct_client.rs +++ b/core/rpc-client/src/direct_client.rs @@ -1,197 +1,119 @@ -///! Interface for direct access to a workers rpc. -/// -/// This should be replaced with the `jsonrpsee::WsClient`. It is async an removes a lot of -/// boilerplate code. Example usage in worker/worker.rs. -/// +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +//! Interface for direct access to a workers rpc. + +use crate::ws_client::WsClient; use codec::Decode; +use itp_types::{DirectRequestStatus, RpcRequest, RpcResponse, RpcReturnValue}; use log::*; -use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode}; +use sgx_crypto_helper::rsa3072::Rsa3072PubKey; use std::{ sync::mpsc::{channel, Sender as MpscSender}, thread, + thread::JoinHandle, }; -use url; -use ws::{ - connect, util::TcpStream, CloseCode, Handler, Handshake, Message, Result as ClientResult, - Sender, -}; - -use itp_types::{DirectRequestStatus, RpcRequest, RpcResponse, RpcReturnValue}; -use sgx_crypto_helper::rsa3072::Rsa3072PubKey; - -pub struct WsClient { - pub out: Sender, - pub request: String, - pub result: MpscSender, - pub do_watch: bool, -} - -impl Handler for WsClient { - fn on_open(&mut self, _: Handshake) -> ClientResult<()> { - debug!("sending request: {:?}", self.request.clone()); - match self.out.send(self.request.clone()) { - Ok(_) => Ok(()), - Err(e) => Err(e), - } - } - - fn on_message(&mut self, msg: Message) -> ClientResult<()> { - info!("got message"); - debug!("{}", msg); - info!("sending result to MpscSender.."); - self.result.send(msg.to_string()).unwrap(); - if !self.do_watch { - info!("do_watch is false, closing connection"); - self.out.close(CloseCode::Normal).unwrap(); - info!("connection is closed"); - } - info!("on_message successful, returning"); - Ok(()) - } - - /// we are overriding the `upgrade_ssl_client` method in order to disable hostname verification - /// this is taken from https://github.com/housleyjk/ws-rs/blob/master/examples/unsafe-ssl-client.rs - /// TODO: hostname verification should probably be enabled again for production? - fn upgrade_ssl_client( - &mut self, - sock: TcpStream, - _: &url::Url, - ) -> ws::Result> { - let mut builder = SslConnector::builder(SslMethod::tls_client()).map_err(|e| { - ws::Error::new( - ws::ErrorKind::Internal, - format!("Failed to upgrade client to SSL: {}", e), - ) - })?; - builder.set_verify(SslVerifyMode::empty()); - - let connector = builder.build(); - connector - .configure() - .unwrap() - .use_server_name_indication(false) - .verify_hostname(false) - .connect("", sock) - .map_err(From::from) - } -} +pub use crate::error::{Error, Result}; #[derive(Clone)] pub struct DirectClient { url: String, } - pub trait DirectApi { - // will remove unit err in refactoring process - #[allow(clippy::result_unit_err)] - fn watch(&self, request: String, sender: MpscSender) -> Result<(), ()>; - fn get_rsa_pubkey(&self) -> Result; + /// Server connection with only one response. + fn get(&self, request: &str) -> Result; + /// Server connection with more than one response. + fn watch(&self, request: String, sender: MpscSender) -> JoinHandle<()>; + fn get_rsa_pubkey(&self) -> Result; + fn get_mu_ra_url(&self) -> Result; + fn get_untrusted_worker_url(&self) -> Result; } impl DirectClient { pub fn new(url: String) -> Self { Self { url } } +} - /// server connection with only one response - #[allow(clippy::result_unit_err)] - pub fn get(&self, request: String) -> Result { - let url = self.url.clone(); +impl DirectApi for DirectClient { + fn get(&self, request: &str) -> Result { let (port_in, port_out) = channel(); info!("[WorkerApi Direct]: (get) Sending request: {:?}", request); - let client = thread::spawn(move || { - match connect(url, |out| WsClient { - out, - request: request.clone(), - result: port_in.clone(), - do_watch: false, - }) { - Ok(c) => c, - Err(_) => { - error!("Could not connect to direct invocation server"); - }, - } - }); - client.join().unwrap(); - - match port_out.recv() { - Ok(p) => Ok(p), - Err(_) => { - error!("[-] [WorkerApi Direct]: error while handling request, returning"); - Err(()) - }, - } + WsClient::connect(&self.url, request, &port_in, false)?; + port_out.recv().map_err(Error::MspcReceiver) } -} -impl DirectApi for DirectClient { - /// server connection with more than one response - #[allow(clippy::result_unit_err)] - fn watch(&self, request: String, sender: MpscSender) -> Result<(), ()> { + fn watch(&self, request: String, sender: MpscSender) -> JoinHandle<()> { + info!("[WorkerApi Direct]: (watch) Sending request: {:?}", request); let url = self.url.clone(); - info!("[WorkerApi Direct]: (watch) Sending request: {:?}", request); - thread::spawn(move || { - info!("attempting to connect to RPC"); - match connect(url, |out| WsClient { - out, - request: request.clone(), - result: sender.clone(), - do_watch: true, - }) { - Ok(c) => { - info!("connect was successful"); - c - }, - Err(_) => { - error!("Could not connect to direct invocation server"); - }, - } - }); - Ok(()) + // Unwrap is fine here, because JoinHandle can be used to handle a Thread panic. + thread::spawn(move || WsClient::connect(&url, &request, &sender, true).unwrap()) } - fn get_rsa_pubkey(&self) -> Result { - // compose jsonrpc call - let method = "author_getShieldingKey".to_owned(); - let jsonrpc_call: String = RpcRequest::compose_jsonrpc_call(method, vec![]); - - let response_str = match Self::get(self, jsonrpc_call) { - Ok(resp) => resp, - Err(err_msg) => - return Err(format! {"Could not retrieve shielding pubkey: {:?}", err_msg}), - }; - - // decode result - let response: RpcResponse = match serde_json::from_str(&response_str) { - Ok(resp) => resp, - Err(err_msg) => - return Err(format! {"Could not retrieve shielding pubkey: {:?}", err_msg}), - }; - let return_value = match RpcReturnValue::decode(&mut response.result.as_slice()) { - Ok(val) => val, - Err(err_msg) => - return Err(format! {"Could not retrieve shielding pubkey: {:?}", err_msg}), - }; - let shielding_pubkey_string: String = match return_value.status { - DirectRequestStatus::Ok => match String::decode(&mut return_value.value.as_slice()) { - Ok(key) => key, - Err(err) => return Err(format! {"Could not retrieve shielding pubkey: {:?}", err}), - }, - _ => match String::decode(&mut return_value.value.as_slice()) { - Ok(err_msg) => - return Err(format! {"Could not retrieve shielding pubkey: {}", err_msg}), - Err(err) => return Err(format! {"Could not retrieve shielding pubkey: {:?}", err}), - }, - }; - let shielding_pubkey: Rsa3072PubKey = match serde_json::from_str(&shielding_pubkey_string) { - Ok(key) => key, - Err(err) => return Err(format! {"Could not retrieve shielding pubkey: {:?}", err}), - }; + fn get_rsa_pubkey(&self) -> Result { + let jsonrpc_call: String = + RpcRequest::compose_jsonrpc_call("author_getShieldingKey".to_string(), vec![]); + + // Send json rpc call to ws server. + let response_str = Self::get(self, &jsonrpc_call)?; + + let shielding_pubkey_string = decode_from_rpc_response(&response_str)?; + let shielding_pubkey: Rsa3072PubKey = serde_json::from_str(&shielding_pubkey_string)?; info!("[+] Got RSA public key of enclave"); Ok(shielding_pubkey) } + + fn get_mu_ra_url(&self) -> Result { + let jsonrpc_call: String = + RpcRequest::compose_jsonrpc_call("author_getMuRaUrl".to_string(), vec![]); + + // Send json rpc call to ws server. + let response_str = Self::get(self, &jsonrpc_call)?; + + let mu_ra_url: String = decode_from_rpc_response(&response_str)?; + + info!("[+] Got mutual remote attestation url of enclave: {}", mu_ra_url); + Ok(mu_ra_url) + } + + fn get_untrusted_worker_url(&self) -> Result { + let jsonrpc_call: String = + RpcRequest::compose_jsonrpc_call("author_getUntrustedUrl".to_string(), vec![]); + + // Send json rpc call to ws server. + let response_str = Self::get(self, &jsonrpc_call)?; + + let untrusted_url: String = decode_from_rpc_response(&response_str)?; + + info!("[+] Got untrusted websocket url of worker: {}", untrusted_url); + Ok(untrusted_url) + } +} + +fn decode_from_rpc_response(json_rpc_response: &str) -> Result { + let rpc_response: RpcResponse = serde_json::from_str(json_rpc_response)?; + let rpc_return_value = RpcReturnValue::decode(&mut rpc_response.result.as_slice())?; + let response_message = String::decode(&mut rpc_return_value.value.as_slice())?; + match rpc_return_value.status { + DirectRequestStatus::Ok => Ok(response_message), + _ => Err(Error::Status(response_message)), + } } diff --git a/core/rpc-client/src/error.rs b/core/rpc-client/src/error.rs new file mode 100644 index 0000000000..e750c859ed --- /dev/null +++ b/core/rpc-client/src/error.rs @@ -0,0 +1,39 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ +use codec::Error as CodecError; +use serde_json::Error as JsonError; +use std::{boxed::Box, result::Result as StdResult, sync::mpsc::RecvError}; +use thiserror; +use ws::Error as WsClientError; + +pub type Result = StdResult; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("{0}")] + Codec(#[from] CodecError), + #[error("{0}")] + SerdeJson(#[from] JsonError), + #[error("Validateer returned the following error message: {0}")] + Status(String), + #[error("Websocket error: {0}")] + WsClientError(#[from] WsClientError), + #[error("Faulty channel: {0}")] + MspcReceiver(#[from] RecvError), + #[error("Custom Error: {0}")] + Other(Box), +} diff --git a/core/rpc-client/src/lib.rs b/core/rpc-client/src/lib.rs index 7fdbab8f18..0cbe4c4384 100644 --- a/core/rpc-client/src/lib.rs +++ b/core/rpc-client/src/lib.rs @@ -15,3 +15,7 @@ */ pub mod direct_client; +pub mod error; +#[cfg(test)] +pub mod mock; +pub mod ws_client; diff --git a/core/rpc-client/src/mock.rs b/core/rpc-client/src/mock.rs new file mode 100644 index 0000000000..b16b9b3b47 --- /dev/null +++ b/core/rpc-client/src/mock.rs @@ -0,0 +1,72 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +//! Interface for direct access to a workers rpc. + +use crate::{direct_client::DirectApi, error::Result}; +use sgx_crypto_helper::rsa3072::Rsa3072PubKey; +use std::{sync::mpsc::Sender as MpscSender, thread::JoinHandle}; + +#[derive(Clone, Default)] +pub struct DirectClientMock { + rsa_pubkey: Rsa3072PubKey, + mu_ra_url: String, + untrusted_worker_url: String, +} + +impl DirectClientMock { + pub fn new(rsa_pubkey: Rsa3072PubKey, mu_ra_url: String, untrusted_worker_url: String) -> Self { + Self { rsa_pubkey, mu_ra_url, untrusted_worker_url } + } + + pub fn with_rsa_pubkey(mut self, key: Rsa3072PubKey) -> Self { + self.rsa_pubkey = key; + self + } + + pub fn with_mu_ra_url(mut self, url: &str) -> Self { + self.mu_ra_url = url.to_string(); + self + } + + pub fn with_untrusted_worker_url(mut self, url: &str) -> Self { + self.untrusted_worker_url = url.to_string(); + self + } +} + +impl DirectApi for DirectClientMock { + fn get(&self, _request: &str) -> Result { + Ok("Hello_world".to_string()) + } + + fn watch(&self, _request: String, _sender: MpscSender) -> JoinHandle<()> { + unimplemented!() + } + + fn get_rsa_pubkey(&self) -> Result { + Ok(self.rsa_pubkey) + } + + fn get_mu_ra_url(&self) -> Result { + Ok(self.mu_ra_url.clone()) + } + + fn get_untrusted_worker_url(&self) -> Result { + Ok(self.untrusted_worker_url.clone()) + } +} diff --git a/core/rpc-client/src/ws_client.rs b/core/rpc-client/src/ws_client.rs new file mode 100644 index 0000000000..3b33d5c3bd --- /dev/null +++ b/core/rpc-client/src/ws_client.rs @@ -0,0 +1,89 @@ +///! Websocket client implementation to access the direct-rpc-server running inside an enclave. +/// +/// This should be replaced with the `jsonrpsee::WsClient`as soon as available in no-std: +/// https://github.com/paritytech/jsonrpsee/issues/1 +use log::*; +use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode}; +use std::sync::mpsc::Sender as MpscSender; +use url; +use ws::{connect, util::TcpStream, CloseCode, Handler, Handshake, Message, Result, Sender}; + +#[derive(Clone)] +pub struct WsClient { + pub out: Sender, + pub request: String, + pub result: MpscSender, + pub do_watch: bool, +} + +impl WsClient { + pub fn new( + out: Sender, + request: String, + result: MpscSender, + do_watch: bool, + ) -> WsClient { + WsClient { out, request, result, do_watch } + } + + pub fn connect( + url: &str, + request: &str, + result: &MpscSender, + do_watch: bool, + ) -> Result<()> { + connect(url.to_string(), |out| { + WsClient::new(out, request.to_string(), result.clone(), do_watch) + }) + } +} + +impl Handler for WsClient { + fn on_open(&mut self, _: Handshake) -> Result<()> { + debug!("sending request: {:?}", self.request.clone()); + match self.out.send(self.request.clone()) { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + + fn on_message(&mut self, msg: Message) -> Result<()> { + info!("got message"); + debug!("{}", msg); + info!("sending result to MpscSender.."); + self.result.send(msg.to_string()).unwrap(); + if !self.do_watch { + info!("do_watch is false, closing connection"); + self.out.close(CloseCode::Normal).unwrap(); + info!("connection is closed"); + } + info!("on_message successful, returning"); + Ok(()) + } + + /// we are overriding the `upgrade_ssl_client` method in order to disable hostname verification + /// this is taken from https://github.com/housleyjk/ws-rs/blob/master/examples/unsafe-ssl-client.rs + /// TODO: hostname verification should probably be enabled again for production? + fn upgrade_ssl_client( + &mut self, + sock: TcpStream, + _: &url::Url, + ) -> Result> { + let mut builder = SslConnector::builder(SslMethod::tls_client()).map_err(|e| { + ws::Error::new( + ws::ErrorKind::Internal, + format!("Failed to upgrade client to SSL: {}", e), + ) + })?; + builder.set_verify(SslVerifyMode::empty()); + + let connector = builder.build(); + connector + .configure() + .unwrap() + .use_server_name_indication(false) + .verify_hostname(false) + .connect("", sock) + .map_err(From::from) + } +} diff --git a/core/rpc-server/src/lib.rs b/core/rpc-server/src/lib.rs index ed0f975d6c..2fe6c2e45e 100644 --- a/core/rpc-server/src/lib.rs +++ b/core/rpc-server/src/lib.rs @@ -47,6 +47,7 @@ where let mut module = RpcModule::new(enclave); + // FIXME: import block should be moved to trusted side. module.register_method("sidechain_importBlock", |params, enclave| { debug!("sidechain_importBlock params: {:?}", params); diff --git a/enclave-runtime/Cargo.lock b/enclave-runtime/Cargo.lock index 4c8c8901b6..35b690c78b 100644 --- a/enclave-runtime/Cargo.lock +++ b/enclave-runtime/Cargo.lock @@ -513,6 +513,7 @@ dependencies = [ "itp-extrinsics-factory", "itp-nonce-cache", "itp-ocall-api", + "itp-primitives-cache", "itp-settings", "itp-sgx-crypto", "itp-sgx-io", @@ -1381,6 +1382,16 @@ dependencies = [ "sp-std", ] +[[package]] +name = "itp-primitives-cache" +version = "0.8.0" +dependencies = [ + "lazy_static", + "log 0.4.14 (git+https://github.com/mesalock-linux/log-sgx)", + "sgx_tstd", + "thiserror 1.0.9", +] + [[package]] name = "itp-settings" version = "0.8.0" diff --git a/enclave-runtime/Cargo.toml b/enclave-runtime/Cargo.toml index d044e72696..2bd65160cd 100644 --- a/enclave-runtime/Cargo.toml +++ b/enclave-runtime/Cargo.toml @@ -87,8 +87,9 @@ itc-direct-rpc-server = { path = "../core/direct-rpc-server", default-features = itc-parentchain = { path = "../core/parentchain/parentchain-crate", default-features = false, features = ["sgx"] } itp-component-container = { path = "../core-primitives/component-container", default-features = false, features = ["sgx"] } itp-extrinsics-factory = { path = "../core-primitives/extrinsics-factory", default-features = false, features = ["sgx"]} -itp-ocall-api = { path = "../core-primitives/ocall-api", default-features = false } itp-nonce-cache = { path = "../core-primitives/nonce-cache", default-features = false, features = ["sgx"] } +itp-ocall-api = { path = "../core-primitives/ocall-api", default-features = false } +itp-primitives-cache = { path = "../core-primitives/primitives-cache", default-features = false, features = ["sgx"] } itp-time-utils = { path = "../core-primitives/time-utils", default-features = false, features = ["sgx"] } itp-settings = { path = "../core-primitives/settings" } itp-sgx-io = { path = "../core-primitives/sgx/io", default-features = false, features = ["sgx"] } diff --git a/enclave-runtime/Enclave.edl b/enclave-runtime/Enclave.edl index 10a68f1f4e..c70678c168 100644 --- a/enclave-runtime/Enclave.edl +++ b/enclave-runtime/Enclave.edl @@ -32,7 +32,10 @@ enclave { trusted { /* define ECALLs here. */ - public sgx_status_t init(); + public sgx_status_t init( + [in, size=mu_ra_addr_size] uint8_t* mu_ra_addr, uint32_t mu_ra_addr_size, + [in, size=untrusted_worker_addr_size] uint8_t* untrusted_worker_addr, uint32_t untrusted_worker_addr_size + ); public sgx_status_t get_state( [in, size=cyphertext_size] uint8_t* cyphertext, uint32_t cyphertext_size, diff --git a/enclave-runtime/src/error.rs b/enclave-runtime/src/error.rs index c3bee2a093..f5e03a3713 100644 --- a/enclave-runtime/src/error.rs +++ b/enclave-runtime/src/error.rs @@ -37,6 +37,7 @@ pub enum Error { StfStateHandler(itp_stf_state_handler::error::Error), StfExecution(itp_stf_executor::error::Error), ParentchainBlockImportDispatch(itc_parentchain::block_import_dispatcher::error::Error), + PrimitivesAccess(itp_primitives_cache::error::Error), MutexAccess, Other(Box), } @@ -47,7 +48,7 @@ impl From for sgx_status_t { match error { Error::Sgx(status) => status, _ => { - log::warn!("Tried extracting sgx_status from non-sgx error: {:?}", error); + log::error!("Returning error {:?} as sgx unexpeted.", error); sgx_status_t::SGX_ERROR_UNEXPECTED }, } diff --git a/enclave-runtime/src/lib.rs b/enclave-runtime/src/lib.rs index 29f75be6b9..caf3183f88 100644 --- a/enclave-runtime/src/lib.rs +++ b/enclave-runtime/src/lib.rs @@ -65,6 +65,7 @@ use itp_component_container::{ComponentGetter, ComponentInitializer}; use itp_extrinsics_factory::ExtrinsicsFactory; use itp_nonce_cache::{MutateNonce, Nonce, GLOBAL_NONCE_CACHE}; use itp_ocall_api::EnclaveAttestationOCallApi; +use itp_primitives_cache::GLOBAL_PRIMITIVES_CACHE; use itp_settings::node::{ REGISTER_ENCLAVE, RUNTIME_SPEC_VERSION, RUNTIME_TRANSACTION_VERSION, TEEREX_MODULE, }; @@ -121,14 +122,18 @@ pub type Hash = sp_core::H256; pub type AuthorityPair = sp_core::ed25519::Pair; #[no_mangle] -pub unsafe extern "C" fn init() -> sgx_status_t { - // initialize the logging environment in the enclave +pub unsafe extern "C" fn init( + mu_ra_addr: *const u8, + mu_ra_addr_size: u32, + untrusted_worker_addr: *const u8, + untrusted_worker_addr_size: u32, +) -> sgx_status_t { + // Initialize the logging environment in the enclave. env_logger::init(); if let Err(e) = ed25519::create_sealed_if_absent().map_err(Error::Crypto) { return e.into() } - let signer = match Ed25519Seal::unseal().map_err(Error::Crypto) { Ok(pair) => pair, Err(e) => return e.into(), @@ -139,21 +144,48 @@ pub unsafe extern "C" fn init() -> sgx_status_t { return e.into() } - // create the aes key that is used for state encryption such that a key is always present in tests. - // It will be overwritten anyway if mutual remote attastation is performed with the primary worker + // Create the aes key that is used for state encryption such that a key is always present in tests. + // It will be overwritten anyway if mutual remote attastation is performed with the primary worker. if let Err(e) = aes::create_sealed_if_absent().map_err(Error::Crypto) { return e.into() } let state_handler = GlobalFileStateHandler; - // for debug purposes, list shards. no problem to panic if fails + // For debug purposes, list shards. no problem to panic if fails. let shards = state_handler.list_shards().unwrap(); debug!("found the following {} shards on disk:", shards.len()); for s in shards { debug!("{}", s.encode().to_base58()) } - //shards.into_iter().map(|s| debug!("{}", s.encode().to_base58())); + + let mu_ra_url = + match String::decode(&mut slice::from_raw_parts(mu_ra_addr, mu_ra_addr_size as usize)) + .map_err(Error::Codec) + { + Ok(addr) => addr, + Err(e) => return e.into(), + }; + + let untrusted_worker_url = match String::decode(&mut slice::from_raw_parts( + untrusted_worker_addr, + untrusted_worker_addr_size as usize, + )) + .map_err(Error::Codec) + { + Ok(addr) => addr, + Err(e) => return e.into(), + }; + + if let Err(e) = itp_primitives_cache::set_primitives( + GLOBAL_PRIMITIVES_CACHE.as_ref(), + &mu_ra_url, + &untrusted_worker_url, + ) + .map_err(Error::PrimitivesAccess) + { + return e.into() + } sgx_status_t::SGX_SUCCESS } diff --git a/enclave-runtime/src/rpc/worker_api_direct.rs b/enclave-runtime/src/rpc/worker_api_direct.rs index d15f14ebd2..5a0e032712 100644 --- a/enclave-runtime/src/rpc/worker_api_direct.rs +++ b/enclave-runtime/src/rpc/worker_api_direct.rs @@ -17,6 +17,7 @@ use codec::Encode; use core::result::Result; +use itp_primitives_cache::{GetPrimitives, GLOBAL_PRIMITIVES_CACHE}; use itp_sgx_crypto::Rsa3072Seal; use itp_types::{DirectRequestStatus, RpcReturnValue, H256}; use its_sidechain::{ @@ -74,6 +75,34 @@ where Ok(json!(json_value.encode())) }); + let mu_ra_url_name: &str = "author_getMuRaUrl"; + io.add_sync_method(mu_ra_url_name, move |_: Params| { + let url = match GLOBAL_PRIMITIVES_CACHE.get_mu_ra_url() { + Ok(url) => url, + Err(status) => { + let error_msg: String = format!("Could not get mu ra url due to: {}", status); + return Ok(json!(compute_encoded_return_error(error_msg.as_str()))) + }, + }; + + let json_value = RpcReturnValue::new(url.encode(), false, DirectRequestStatus::Ok); + Ok(json!(json_value.encode())) + }); + + let untrusted_url_name: &str = "author_getUntrustedUrl"; + io.add_sync_method(untrusted_url_name, move |_: Params| { + let url = match GLOBAL_PRIMITIVES_CACHE.get_untrusted_worker_url() { + Ok(url) => url, + Err(status) => { + let error_msg: String = format!("Could not get untrusted url due to: {}", status); + return Ok(json!(compute_encoded_return_error(error_msg.as_str()))) + }, + }; + + let json_value = RpcReturnValue::new(url.encode(), false, DirectRequestStatus::Ok); + Ok(json!(json_value.encode())) + }); + // chain_subscribeAllHeads let chain_subscribe_all_heads_name: &str = "chain_subscribeAllHeads"; io.add_sync_method(chain_subscribe_all_heads_name, |_: Params| { diff --git a/local-setup/github-action-config.json b/local-setup/github-action-config.json index 58b008ce49..c1668721f8 100644 --- a/local-setup/github-action-config.json +++ b/local-setup/github-action-config.json @@ -12,7 +12,9 @@ "source": "bin", "flags": [ "-P", - "2000" + "2000", + "-w", + "2001" ], "subcommand_flags": [ "--skip-ra", @@ -23,7 +25,9 @@ "source": "bin", "flags": [ "-P", - "3000" + "3000", + "-w", + "3001" ], "subcommand_flags": [ "--skip-ra", @@ -31,4 +35,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/local-setup/launch.py b/local-setup/launch.py index 5ca3d7bb4a..69b2d5d4e9 100755 --- a/local-setup/launch.py +++ b/local-setup/launch.py @@ -38,22 +38,16 @@ def run_node(config): print(f'Run node with command: {node_cmd}') return Popen(node_cmd, stdout=node_log, stderr=STDOUT, bufsize=1) - -def key_provider_addr(config): - key_provider = config.get('mu-ra-port', '3443') - return f'localhost:{key_provider}' - - -def run_worker(config, i: int, provider_addr): +def run_worker(config, i: int): log = open(f'{log_dir}/worker{i}.log', 'w+') w = setup_worker(f'/tmp/w{i}', config["source"], log) if i > 1: - print(f'Worker {i} fetching keys from first worker at {provider_addr}.') + print(f'Worker {i} fetching keys from registered worker.') skip_ra = "--skip-ra" in config["subcommand_flags"] print(f'Skip remote attestation: {skip_ra}') - w.request_keys(provider_addr, skip_ra=skip_ra) + w.request_keys(flags=config["flags"], skip_ra=skip_ra) print(f'Starting worker {i} in background') w.run_in_background(log_file=log, flags=config["flags"], subcommand_flags=config["subcommand_flags"]) @@ -67,13 +61,14 @@ def main(processes, config_path): processes.append(run_node(config)) - provider_addr = key_provider_addr(config["workers"][0]) - i = 1 for w_conf in config["workers"]: - processes.append(run_worker(w_conf, i, provider_addr)) + processes.append(run_worker(w_conf, i)) # sleep to prevent nonce clash when bootstrapping the enclave's account sleep(6) + if i == 1: + # Give worker 1 some time to register itself, otherwise key & state sharing will not work. + sleep(60) i += 1 diff --git a/local-setup/py/worker.py b/local-setup/py/worker.py index fd0aa73767..203f7526b7 100644 --- a/local-setup/py/worker.py +++ b/local-setup/py/worker.py @@ -143,15 +143,15 @@ def write_shielding_pub(self): def write_signer_pub(self): return run_subprocess(self.cli + ['signing-key'], stdout=subprocess.PIPE, stderr=self.std_err, cwd=self.cwd) - def request_keys(self, provider_addr: str, skip_ra: bool = False): + def request_keys(self, flags: [str] = None, skip_ra: bool = False): """ Returns the keys from another worker. """ if skip_ra: - flags = ['request-keys', '--skip-ra', provider_addr] + subcommand_flags = ['request-keys', '--skip-ra'] else: - flags = ['request-keys', provider_addr] + subcommand_flags = ['request-keys'] - return run_subprocess(self.cli + flags, stdout=subprocess.PIPE, stderr=self.std_err, cwd=self.cwd) + return run_subprocess(self.cli + flags + subcommand_flags, stdout=subprocess.PIPE, stderr=self.std_err, cwd=self.cwd) def _shard_path(self, shard): return pathlib.Path(f'{self.cwd}/shards/{shard}') diff --git a/local-setup/simple-config.json b/local-setup/simple-config.json index 749a40f06f..229abd831a 100644 --- a/local-setup/simple-config.json +++ b/local-setup/simple-config.json @@ -22,7 +22,9 @@ "-p", "9990", "-r", - "3490" + "3490", + "-w", + "2091" ], "subcommand_flags": [ "--skip-ra", @@ -37,7 +39,9 @@ "-p", "9990", "-r", - "3590" + "3590", + "-w", + "3091" ], "subcommand_flags": [ "--skip-ra", diff --git a/service/src/cli.yml b/service/src/cli.yml index 405d5e9042..ab9f902e01 100644 --- a/service/src/cli.yml +++ b/service/src/cli.yml @@ -26,20 +26,44 @@ args: default_value: "9944" - ws-external: long: ws-external - help: let worker api listen to external requests too + help: Set this flag in case the worker should listen to external requests. - mu-ra-port: short: r long: mu-ra-port help: Set the websocket port to listen for mu-ra requests takes_value: true default_value: "3443" - - worker-rpc-port: + - trusted-worker-port: short: P - long: worker-rpc-port - help: Set the websocket port where the worker rpc direct invocations port listens + long: trusted-worker-port + help: Set the trusted websocket port of the worker, running directly in the enclave. takes_value: true default_value: "2000" - + - untrusted-worker-port: + short: w + long: untrusted-worker-port + help: Set the untrusted websocket port of the worker + takes_value: true + default_value: "2001" + - trusted-external-address: + short: T + long: trusted-external-address + help: Set the trusted worker address to be advertised on the parentchain. If no port is given, the same as in `trusted-worker-port` will be used. + takes_value: true + required: false + - untrusted-external-address: + short: U + long: untrusted-external-address + help: Set the untrusted worker address to be retrieved by a trusted rpc call. If no port is given, the same as in `untrusted-worker-port` will be used. + takes_value: true + required: false + - mu-ra-external-address: + short: M + long: mu-ra-external-address + help: Set the mutual remote attestation worker address to be retrieved by a trusted rpc call. If no port is given, the same as in `mu-ra-port` will be used. + takes_value: true + required: false + subcommands: - run: about: Start the integritee-service @@ -48,11 +72,6 @@ subcommands: long: skip-ra short: s help: skip remote attestation. Set this flag if running enclave in SW mode - - w-server: - short: U - long: worker-url - help: Set the worker's IP address to be advertised in onchain registry. Include port if not 443 - takes_value: true - shard: required: false index: 1 @@ -64,10 +83,6 @@ subcommands: - request-keys: about: join a shard by requesting key provisioning from another worker args: - - provider: - required: true - index: 1 - help: URL and port of a peer worker providing provisioning (i.e. 'my.server.io:3443') - shard: long: shard short: s diff --git a/service/src/config.rs b/service/src/config.rs index 65ef1656ad..3511abf57d 100644 --- a/service/src/config.rs +++ b/service/src/config.rs @@ -1,61 +1,312 @@ use clap::ArgMatches; use serde::{Deserialize, Serialize}; +static DEFAULT_NODE_SERVER: &str = "ws://127.0.0.1"; +static DEFAULT_NODE_PORT: &str = "9944"; +static DEFAULT_TRUSTED_PORT: &str = "2000"; +static DEFAULT_UNTRUSTED_PORT: &str = "2001"; +static DEFAULT_MU_RA_PORT: &str = "3443"; + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Config { pub node_ip: String, pub node_port: String, pub worker_ip: String, - pub worker_rpc_port: String, - /// listening port for the mutual-remote attestation requests - pub worker_mu_ra_port: String, - /// Todo: Is this deprecated? I can only see it in `enclave_perform_ra` - pub ext_api_url: Option, + /// Trusted worker address that will be advertised on the parentchain. + pub trusted_external_worker_address: Option, + /// Port to directly communicate with the trusted tls server inside the enclave. + pub trusted_worker_port: String, + /// Untrusted worker address that will be returned by the dedicated trusted ws rpc call. + pub untrusted_external_worker_address: Option, + /// Port to the untrusted ws of the validateer. + pub untrusted_worker_port: String, + /// Mututal remote attestion address that will be returned by the dedicated trusted ws rpc call. + pub mu_ra_external_address: Option, + /// Port for mutual-remote attestation requests. + pub mu_ra_port: String, } +#[allow(clippy::too_many_arguments)] impl Config { pub fn new( node_ip: String, node_port: String, worker_ip: String, - worker_rpc_port: String, - worker_mu_ra_port: String, + trusted_external_worker_address: Option, + trusted_worker_port: String, + untrusted_external_worker_address: Option, + untrusted_worker_port: String, + mu_ra_external_address: Option, + mu_ra_port: String, ) -> Self { Self { node_ip, node_port, worker_ip, - worker_rpc_port, - worker_mu_ra_port, - ext_api_url: None, + trusted_external_worker_address, + trusted_worker_port, + untrusted_external_worker_address, + untrusted_worker_port, + mu_ra_external_address, + mu_ra_port, } } + /// Returns the client url of the node (including ws://). pub fn node_url(&self) -> String { format!("{}:{}", self.node_ip, self.node_port) } - pub fn worker_url(&self) -> String { - format!("{}:{}", self.worker_ip, self.worker_rpc_port) + pub fn trusted_worker_url_internal(&self) -> String { + format!("{}:{}", self.worker_ip, self.trusted_worker_port) + } + + /// Returns the trusted worker url that should be addressed by external clients. + pub fn trusted_worker_url_external(&self) -> String { + match &self.trusted_external_worker_address { + Some(external_address) => external_address.to_string(), + None => format!("wss://{}:{}", self.worker_ip, self.trusted_worker_port), + } + } + + pub fn untrusted_worker_url(&self) -> String { + format!("{}:{}", self.worker_ip, self.untrusted_worker_port) + } + + /// Returns the untrusted worker url that should be addressed by external clients. + pub fn untrusted_worker_url_external(&self) -> String { + match &self.untrusted_external_worker_address { + Some(external_address) => external_address.to_string(), + None => format!("ws://{}:{}", self.worker_ip, self.untrusted_worker_port), + } } pub fn mu_ra_url(&self) -> String { - format!("{}:{}", self.worker_ip, self.worker_mu_ra_port) + format!("{}:{}", self.worker_ip, self.mu_ra_port) } - pub fn set_ext_api_url(&mut self, url: String) { - self.ext_api_url = Some(url) + /// Returns the mutual remote attestion worker url that should be addressed by external workers. + pub fn mu_ra_url_external(&self) -> String { + match &self.mu_ra_external_address { + Some(external_address) => external_address.to_string(), + None => format!("{}:{}", self.worker_ip, self.mu_ra_port), + } } } impl From<&ArgMatches<'_>> for Config { fn from(m: &ArgMatches<'_>) -> Self { + let trusted_port = m.value_of("trusted-worker-port").unwrap_or(DEFAULT_TRUSTED_PORT); + let untrusted_port = m.value_of("untrusted-worker-port").unwrap_or(DEFAULT_UNTRUSTED_PORT); + let mu_ra_port = m.value_of("mu-ra-port").unwrap_or(DEFAULT_MU_RA_PORT); + Self::new( - m.value_of("node-server").unwrap_or("ws://127.0.0.1").into(), - m.value_of("node-port").unwrap_or("9944").into(), + m.value_of("node-server").unwrap_or(DEFAULT_NODE_SERVER).into(), + m.value_of("node-port").unwrap_or(DEFAULT_NODE_PORT).into(), if m.is_present("ws-external") { "0.0.0.0".into() } else { "127.0.0.1".into() }, - m.value_of("worker-rpc-port").unwrap_or("2000").into(), - m.value_of("mu-ra-port").unwrap_or("3443").into(), + m.value_of("trusted-external-address") + .map(|url| add_port_if_necessary(url, trusted_port)), + trusted_port.to_string(), + m.value_of("untrusted-external-address") + .map(|url| add_port_if_necessary(url, untrusted_port)), + untrusted_port.to_string(), + m.value_of("mu-ra-external-address") + .map(|url| add_port_if_necessary(url, mu_ra_port)), + mu_ra_port.to_string(), ) } } + +fn add_port_if_necessary(url: &str, port: &str) -> String { + // [Option("ws(s)"), ip, Option(port)] + match url.split(':').count() { + 3 => url.to_string(), + 2 => { + if url.contains("ws") { + // url is of format ws://127.0.0.1, no port added + format!("{}:{}", url, port) + } else { + // url is of format 127.0.0.1:4000, port was added + url.to_string() + } + }, + 1 => format!("{}:{}", url, port), + _ => panic!("Invalid worker url format in url input {:?}", url), + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::collections::HashMap; + + #[test] + fn check_correct_config_assignment_for_empty_input() { + let empty_args = ArgMatches::default(); + let config = Config::from(&empty_args); + let expected_worker_ip = "127.0.0.1"; + + assert_eq!(config.node_ip, DEFAULT_NODE_SERVER); + assert_eq!(config.node_port, DEFAULT_NODE_PORT); + assert_eq!(config.trusted_worker_port, DEFAULT_TRUSTED_PORT); + assert_eq!(config.untrusted_worker_port, DEFAULT_UNTRUSTED_PORT); + assert_eq!(config.mu_ra_port, DEFAULT_MU_RA_PORT); + assert_eq!(config.worker_ip, expected_worker_ip); + assert!(config.trusted_external_worker_address.is_none()); + assert!(config.untrusted_external_worker_address.is_none()); + assert!(config.mu_ra_external_address.is_none()); + } + + #[test] + fn worker_ip_is_set_correcty_for_set_ws_external_flag() { + let expected_worker_ip = "0.0.0.0"; + + let mut args = ArgMatches::default(); + args.args = HashMap::from([("ws-external", Default::default())]); + let config = Config::from(&args); + + assert_eq!(config.worker_ip, expected_worker_ip); + } + + #[test] + fn check_correct_config_assignment_for_given_input() { + let node_ip = "ws://12.1.58.1"; + let node_port = "111111"; + let trusted_ext_addr = "wss://1.1.1.2:700"; + let trusted_port = "7119"; + let untrusted_ext_addr = "ws://1.723.3.1:11"; + let untrusted_port = "9119"; + let mu_ra_ext_addr = "1.1.3.1:1000"; + let mu_ra_port = "99"; + + let mut args = ArgMatches::default(); + args.args = HashMap::from([ + ("node-server", Default::default()), + ("node-port", Default::default()), + ("ws-external", Default::default()), + ("trusted-external-address", Default::default()), + ("untrusted-external-address", Default::default()), + ("mu-ra-external-address", Default::default()), + ("mu-ra-port", Default::default()), + ("untrusted-worker-port", Default::default()), + ("trusted-worker-port", Default::default()), + ]); + // Workaround because MatchedArg is private. + args.args.get_mut("node-server").unwrap().vals = vec![node_ip.into()]; + args.args.get_mut("node-port").unwrap().vals = vec![node_port.into()]; + args.args.get_mut("trusted-external-address").unwrap().vals = vec![trusted_ext_addr.into()]; + args.args.get_mut("untrusted-external-address").unwrap().vals = + vec![untrusted_ext_addr.into()]; + args.args.get_mut("mu-ra-external-address").unwrap().vals = vec![mu_ra_ext_addr.into()]; + args.args.get_mut("mu-ra-port").unwrap().vals = vec![mu_ra_port.into()]; + args.args.get_mut("untrusted-worker-port").unwrap().vals = vec![untrusted_port.into()]; + args.args.get_mut("trusted-worker-port").unwrap().vals = vec![trusted_port.into()]; + + let config = Config::from(&args); + + assert_eq!(config.node_ip, node_ip); + assert_eq!(config.node_port, node_port); + assert_eq!(config.trusted_worker_port, trusted_port); + assert_eq!(config.untrusted_worker_port, untrusted_port); + assert_eq!(config.mu_ra_port, mu_ra_port); + assert_eq!(config.trusted_external_worker_address, Some(trusted_ext_addr.to_string())); + assert_eq!(config.untrusted_external_worker_address, Some(untrusted_ext_addr.to_string())); + assert_eq!(config.mu_ra_external_address, Some(mu_ra_ext_addr.to_string())); + } + + #[test] + fn external_addresses_are_returned_correctly_if_not_set() { + let trusted_port = "7119"; + let untrusted_port = "9119"; + let mu_ra_port = "99"; + let expected_worker_ip = "127.0.0.1"; + + let mut args = ArgMatches::default(); + args.args = HashMap::from([ + ("mu-ra-port", Default::default()), + ("untrusted-worker-port", Default::default()), + ("trusted-worker-port", Default::default()), + ]); + // Workaround because MatchedArg is private. + args.args.get_mut("mu-ra-port").unwrap().vals = vec![mu_ra_port.into()]; + args.args.get_mut("untrusted-worker-port").unwrap().vals = vec![untrusted_port.into()]; + args.args.get_mut("trusted-worker-port").unwrap().vals = vec![trusted_port.into()]; + + let config = Config::from(&args); + + assert_eq!( + config.trusted_worker_url_external(), + format!("wss://{}:{}", expected_worker_ip, trusted_port) + ); + assert_eq!( + config.untrusted_worker_url_external(), + format!("ws://{}:{}", expected_worker_ip, untrusted_port) + ); + assert_eq!(config.mu_ra_url_external(), format!("{}:{}", expected_worker_ip, mu_ra_port)); + } + + #[test] + fn external_addresses_are_returned_correctly_if_set() { + let trusted_ext_addr = "wss://1.1.1.2:700"; + let untrusted_ext_addr = "ws://1.723.3.1:11"; + let mu_ra_ext_addr = "1.1.3.1:1000"; + + let mut args = ArgMatches::default(); + args.args = HashMap::from([ + ("trusted-external-address", Default::default()), + ("untrusted-external-address", Default::default()), + ("mu-ra-external-address", Default::default()), + ]); + // Workaround because MatchedArg is private. + args.args.get_mut("trusted-external-address").unwrap().vals = vec![trusted_ext_addr.into()]; + args.args.get_mut("untrusted-external-address").unwrap().vals = + vec![untrusted_ext_addr.into()]; + args.args.get_mut("mu-ra-external-address").unwrap().vals = vec![mu_ra_ext_addr.into()]; + + let config = Config::from(&args); + + assert_eq!(config.trusted_worker_url_external(), trusted_ext_addr); + assert_eq!(config.untrusted_worker_url_external(), untrusted_ext_addr); + assert_eq!(config.mu_ra_url_external(), mu_ra_ext_addr); + } + + #[test] + fn ensure_no_port_is_added_to_url_with_port() { + let url = "ws://hello:4000"; + let port = "0"; + + let resulting_url = add_port_if_necessary(url, port); + + assert_eq!(resulting_url, url); + } + + #[test] + fn ensure_port_is_added_to_url_without_port() { + let url = "wss://hello"; + let port = "0"; + + let resulting_url = add_port_if_necessary(url, port); + + assert_eq!(resulting_url, format!("{}:{}", url, port)); + } + + #[test] + fn ensure_no_port_is_added_to_url_with_port_without_prefix() { + let url = "hello:10001"; + let port = "012"; + + let resulting_url = add_port_if_necessary(url, port); + + assert_eq!(resulting_url, url); + } + + #[test] + fn ensure_port_is_added_to_url_without_port_without_prefix() { + let url = "hello_world"; + let port = "10"; + + let resulting_url = add_port_if_necessary(url, port); + + assert_eq!(resulting_url, format!("{}:{}", url, port)); + } +} diff --git a/service/src/enclave/api.rs b/service/src/enclave/api.rs index b315f09637..48a900c89b 100644 --- a/service/src/enclave/api.rs +++ b/service/src/enclave/api.rs @@ -15,6 +15,7 @@ */ +use crate::config::Config; use itp_enclave_api::{ enclave_base::EnclaveBase, error::Error as EnclaveApiError, Enclave, EnclaveResult, }; @@ -26,7 +27,7 @@ use sgx_urts::SgxEnclave; use std::io::{Read, Write}; use std::{fs::File, path::PathBuf}; -pub fn enclave_init() -> EnclaveResult { +pub fn enclave_init(config: &Config) -> EnclaveResult { const LEN: usize = 1024; let mut launch_token = [0; LEN]; let mut launch_token_updated = 0; @@ -102,7 +103,7 @@ pub fn enclave_init() -> EnclaveResult { // create an enclave API and initialize it let enclave_api = Enclave::new(enclave); - enclave_api.init()?; + enclave_api.init(&config.mu_ra_url_external(), &config.untrusted_worker_url_external())?; Ok(enclave_api) } diff --git a/service/src/error.rs b/service/src/error.rs index a039b510ca..cf51065c4d 100644 --- a/service/src/error.rs +++ b/service/src/error.rs @@ -1,6 +1,8 @@ use codec::Error as CodecError; use substrate_api_client::ApiClientError; +pub type ServiceResult = Result; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("{0}")] @@ -11,6 +13,8 @@ pub enum Error { ApiSubscriptionDisconnected(#[from] std::sync::mpsc::RecvError), #[error("Enclave API error: {0}")] EnclaveApi(#[from] itp_enclave_api::error::Error), + #[error("Trusted Rpc Client error: {0}")] + TrustedRpcClient(#[from] itc_rpc_client::error::Error), #[error("{0}")] JsonRpSeeClient(#[from] jsonrpsee::types::Error), #[error("{0}")] diff --git a/service/src/global_peer_updater.rs b/service/src/global_peer_updater.rs new file mode 100644 index 0000000000..50c67686c4 --- /dev/null +++ b/service/src/global_peer_updater.rs @@ -0,0 +1,64 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + Copyright (C) 2017-2019 Baidu, Inc. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +*/ + +use crate::{ + error::Error, + globals::worker::GetMutWorker, + worker::{UpdatePeers, WorkerResult}, +}; +use log::*; +use std::sync::Arc; + +#[cfg(test)] +use mockall::predicate::*; +#[cfg(test)] +use mockall::*; + +/// Updates the peers of the global worker. +#[cfg_attr(test, automock)] +pub trait UpdateWorkerPeers { + fn update_peers(&self) -> WorkerResult<()>; +} + +pub struct GlobalPeerUpdater { + worker: Arc, +} + +impl GlobalPeerUpdater { + pub fn new(worker: Arc) -> Self { + GlobalPeerUpdater { worker } + } +} + +// FIXME: We should write unit tests for this one here - but with the global worker struct, which is not yet made to be mocked, +// this would require a lot of changes. +impl UpdateWorkerPeers for GlobalPeerUpdater +where + Worker: GetMutWorker, +{ + fn update_peers(&self) -> WorkerResult<()> { + let maybe_worker = &mut *self.worker.get_mut_worker(); + match maybe_worker { + Some(w) => w.update_peers(), + None => { + error!("Failed to get worker instance"); + Err(Error::ApplicationSetup) + }, + } + } +} diff --git a/service/src/globals/tokio_handle.rs b/service/src/globals/tokio_handle.rs index 780c992cdf..e1ab5bfb1b 100644 --- a/service/src/globals/tokio_handle.rs +++ b/service/src/globals/tokio_handle.rs @@ -77,7 +77,7 @@ mod tests { let handle = GlobalTokioHandle.get_handle(); - let result = handle.spawn_blocking(|| format!("now running on a worker thread")).await; + let result = handle.spawn_blocking(|| "now running on a worker thread").await; assert!(result.is_ok()); assert!(!result.unwrap().is_empty()) diff --git a/service/src/globals/worker.rs b/service/src/globals/worker.rs index 49ff664b2e..2acc93f525 100644 --- a/service/src/globals/worker.rs +++ b/service/src/globals/worker.rs @@ -17,29 +17,32 @@ */ use crate::{config::Config, worker::Worker as WorkerGen}; -use itc_rpc_client::direct_client::DirectClient; use itp_enclave_api::Enclave; use lazy_static::lazy_static; -use parking_lot::{RwLock, RwLockReadGuard}; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use sp_core::sr25519; use substrate_api_client::{rpc::WsRpcClient, Api}; -pub type Worker = WorkerGen, Enclave, DirectClient>; +pub type Worker = WorkerGen, Enclave>; lazy_static! { static ref WORKER: RwLock> = RwLock::new(None); } -/// Trait for accessing a worker instance -/// Prefer injecting this trait instead of using the associated functions of WorkerAccessorImpl +/// Trait for accessing a worker instance. pub trait GetWorker { fn get_worker<'a>(&self) -> RwLockReadGuard<'a, Option>; } +/// Trait for accessing a muteable worker instance. +pub trait GetMutWorker { + fn get_mut_worker<'a>(&self) -> RwLockWriteGuard<'a, Option>; +} + pub struct GlobalWorker; -/// these are the static (global) accessors -/// reduce their usage where possible and use an instance of WorkerAccessorImpl or the trait +/// These are the static (global) accessors. +/// Reduce their usage where possible and use an instance of WorkerAccessorImpl or the trait. impl GlobalWorker { pub fn reset_worker(worker: Worker) { *WORKER.write() = Some(worker); @@ -48,6 +51,10 @@ impl GlobalWorker { fn read_worker<'a>() -> RwLockReadGuard<'a, Option> { WORKER.read() } + + fn write_worker<'a>() -> RwLockWriteGuard<'a, Option> { + WORKER.write() + } } impl GetWorker for GlobalWorker { @@ -55,3 +62,9 @@ impl GetWorker for GlobalWorker { GlobalWorker::read_worker() } } + +impl GetMutWorker for GlobalWorker { + fn get_mut_worker<'a>(&self) -> RwLockWriteGuard<'a, Option> { + GlobalWorker::write_worker() + } +} diff --git a/service/src/main.rs b/service/src/main.rs index 2b146cf14a..504fae5954 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -16,6 +16,7 @@ */ use crate::{ error::Error, + global_peer_updater::GlobalPeerUpdater, globals::{ tokio_handle::{GetTokioHandle, GlobalTokioHandle}, worker::{GlobalWorker, Worker}, @@ -27,7 +28,6 @@ use crate::{ parentchain_block_syncer::{ParentchainBlockSyncer, SyncParentchainBlocks}, sync_block_gossiper::SyncBlockGossiper, utils::{check_files, extract_shard}, - worker::worker_url_into_async_rpc_url, }; use base58::ToBase58; use clap::{load_yaml, App}; @@ -38,7 +38,6 @@ use enclave::{ tls_ra::{enclave_request_key_provisioning, enclave_run_key_provisioning_server}, }; use futures::executor::block_on; -use itc_rpc_client::direct_client::DirectClient; use itp_api_client_extensions::{AccountApi, ChainApi, PalletTeerexApi}; use itp_enclave_api::{ direct_request::DirectRequest, @@ -87,10 +86,12 @@ use teerex_primitives::ShardIdentifier; mod config; mod enclave; mod error; +mod global_peer_updater; mod globals; mod node_api_factory; mod ocall_bridge; mod parentchain_block_syncer; +mod request_keys; mod sync_block_gossiper; mod tests; mod utils; @@ -106,7 +107,7 @@ fn main() { let yml = load_yaml!("cli.yml"); let matches = App::from_yaml(yml).get_matches(); - let mut config = Config::from(&matches); + let config = Config::from(&matches); GlobalTokioHandle::initialize(); @@ -120,13 +121,15 @@ fn main() { // build the entire dependency tree let worker = Arc::new(GlobalWorker {}); let tokio_handle = Arc::new(GlobalTokioHandle {}); - let sync_block_gossiper = Arc::new(SyncBlockGossiper::new(tokio_handle.clone(), worker)); + let sync_block_gossiper = + Arc::new(SyncBlockGossiper::new(tokio_handle.clone(), worker.clone())); + let peer_updater = Arc::new(GlobalPeerUpdater::new(worker)); let sidechain_blockstorage = Arc::new( SidechainStorageLock::::new(PathBuf::from(&SIDECHAIN_STORAGE_PATH)) .unwrap(), ); let node_api_factory = Arc::new(GlobalUrlNodeApiFactory::new(config.node_url())); - let enclave = Arc::new(enclave_init().unwrap()); + let enclave = Arc::new(enclave_init(&config).unwrap()); // initialize o-call bridge with a concrete factory implementation OCallBridge::initialize(Arc::new(OCallBridgeComponentFactory::new( @@ -134,19 +137,12 @@ fn main() { sync_block_gossiper, enclave.clone(), sidechain_blockstorage.clone(), + peer_updater, ))); if let Some(smatches) = matches.subcommand_matches("run") { let shard = extract_shard(smatches, enclave.as_ref()); - // Todo: Is this deprecated?? It is only used in remote attestation. - config.set_ext_api_url( - smatches - .value_of("w-server") - .map(ToString::to_string) - .unwrap_or_else(|| format!("ws://127.0.0.1:{}", config.worker_rpc_port)), - ); - println!("Worker Config: {:?}", config); let skip_ra = smatches.is_present("skip-ra"); let dev = smatches.is_present("dev"); @@ -157,7 +153,7 @@ fn main() { config.clone(), node_api.clone(), enclave.clone(), - DirectClient::new(config.worker_url()), + Vec::new(), )); start_worker( @@ -171,9 +167,14 @@ fn main() { tokio_handle, ); } else if let Some(smatches) = matches.subcommand_matches("request-keys") { - let shard = extract_shard(smatches, enclave.as_ref()); - let provider_url = smatches.value_of("provider").expect("provider must be specified"); - request_keys(provider_url, &shard, enclave.as_ref(), smatches.is_present("skip-ra")); + println!("*** Requesting keys from a registered worker \n"); + let node_api = node_api_factory.create_api().set_signer(AccountKeyring::Alice.pair()); + request_keys::request_keys( + &node_api, + &extract_shard(smatches, enclave.as_ref()), + enclave.as_ref(), + smatches.is_present("skip-ra"), + ); } else if matches.is_present("shielding-key") { info!("*** Get the public key from the TEE\n"); let pubkey = enclave.get_rsa_shielding_pubkey().unwrap(); @@ -212,7 +213,7 @@ fn main() { enclave_run_key_provisioning_server( enclave.as_ref(), sgx_quote_sign_type_t::SGX_UNLINKABLE_SIGNATURE, - &format!("localhost:{}", config.worker_mu_ra_port), + &config.mu_ra_url(), _matches.is_present("skip-ra"), ); println!("[+] Done!"); @@ -221,7 +222,7 @@ fn main() { enclave_request_key_provisioning( enclave.as_ref(), sgx_quote_sign_type_t::SGX_UNLINKABLE_SIGNATURE, - &format!("localhost:{}", config.worker_mu_ra_port), + &config.mu_ra_url_external(), _matches.is_present("skip-ra"), ) .unwrap(); @@ -268,7 +269,7 @@ fn start_worker( // ------------------------------------------------------------------------ // let new workers call us for key provisioning - println!("MU-RA server listening on ws://{}", config.mu_ra_url()); + println!("MU-RA server listening on {}", config.mu_ra_url()); let ra_url = config.mu_ra_url(); let enclave_api_key_prov = enclave.clone(); thread::spawn(move || { @@ -281,12 +282,12 @@ fn start_worker( }); // ------------------------------------------------------------------------ - // start worker api direct invocation server - let direct_invocation_server_addr = config.worker_url(); + // Start trusted worker rpc server. + let direct_invocation_server_addr = config.trusted_worker_url_internal(); let enclave_for_direct_invocation = enclave.clone(); thread::spawn(move || { println!( - "[+] RPC direction invocation server listening on wss://{}", + "[+] Trusted RPC direction invocation server listening on {}", direct_invocation_server_addr ); enclave_for_direct_invocation @@ -295,43 +296,43 @@ fn start_worker( println!("[+] RPC direction invocation server shut down"); }); - // listen for sidechain_block import request. Later the `start_worker_api_direct_server` - // should be merged into this one. - let url = worker_url_into_async_rpc_url(&config.worker_url()).unwrap(); - + // ------------------------------------------------------------------------ + // Start untrusted worker rpc server. let handle = tokio_handle.get_handle(); + // FIXME: this should be removed - this server should only handle untrusted things. + // i.e move sidechain block importing to trusted worker. let enclave_for_block_gossip_rpc_server = enclave.clone(); + let untrusted_url = config.untrusted_worker_url(); + println!("[+] Untrusted RPC server listening on {}", &untrusted_url); handle.spawn(async move { - itc_rpc_server::run_server(&url, enclave_for_block_gossip_rpc_server) + itc_rpc_server::run_server(&untrusted_url, enclave_for_block_gossip_rpc_server) .await .unwrap() }); + // ------------------------------------------------------------------------ - // start the substrate-api-client to communicate with the node + // Start the substrate-api-client to communicate with the node. let genesis_hash = node_api.genesis_hash.as_bytes().to_vec(); let tee_accountid = enclave_account(enclave.as_ref()); // ------------------------------------------------------------------------ - // perform a remote attestation and get an unchecked extrinsic back + // Perform a remote attestation and get an unchecked extrinsic back. - // get enclaves's account nonce let nonce = node_api.get_nonce_of(&tee_accountid).unwrap(); info!("Enclave nonce = {:?}", nonce); enclave .set_nonce(nonce) .expect("Could not set nonce of enclave. Returning here..."); - + let trusted_url = config.trusted_worker_url_external(); let uxt = if skip_ra { println!( "[!] skipping remote attestation. Registering enclave without attestation report." ); - enclave - .mock_register_xt(node_api.genesis_hash, nonce, &config.ext_api_url.unwrap()) - .unwrap() + enclave.mock_register_xt(node_api.genesis_hash, nonce, &trusted_url).unwrap() } else { enclave - .perform_ra(genesis_hash, nonce, config.ext_api_url.unwrap().as_bytes().to_vec()) + .perform_ra(genesis_hash, nonce, trusted_url.as_bytes().to_vec()) .unwrap() }; @@ -489,32 +490,6 @@ where } } -fn request_keys( - provider_url: &str, - _shard: &ShardIdentifier, - enclave_api: &E, - skip_ra: bool, -) { - // FIXME: we now assume that keys are equal for all shards - - // initialize the enclave - #[cfg(feature = "production")] - println!("*** Starting enclave in production mode"); - #[cfg(not(feature = "production"))] - println!("*** Starting enclave in development mode"); - - println!("Requesting key provisioning from worker at {}", provider_url); - - enclave_request_key_provisioning( - enclave_api, - sgx_quote_sign_type_t::SGX_UNLINKABLE_SIGNATURE, - provider_url, - skip_ra, - ) - .unwrap(); - println!("key provisioning successfully performed"); -} - type Events = Vec>; fn parse_events(event: String) -> Result { @@ -771,7 +746,7 @@ fn bootstrap_funds_from_alice( if funding_amount > alice_free { println!( - "funding amount is to high: please change MIN_FUND_INCREASE_FACTOR ({:?})", + "funding amount is to high: please change EXISTENTIAL_DEPOSIT_FACTOR_FOR_INIT_FUNDS ({:?})", funding_amount ); return Err(Error::ApplicationSetup) diff --git a/service/src/ocall_bridge/component_factory.rs b/service/src/ocall_bridge/component_factory.rs index 46a2a5692d..91f41620f7 100644 --- a/service/src/ocall_bridge/component_factory.rs +++ b/service/src/ocall_bridge/component_factory.rs @@ -17,6 +17,7 @@ */ use crate::{ + global_peer_updater::UpdateWorkerPeers, node_api_factory::CreateNodeApi, ocall_bridge::{ bridge_api::{ @@ -38,37 +39,53 @@ use std::sync::Arc; /// Concrete implementation, should be moved out of the OCall Bridge, into the worker /// since the OCall bridge itself should not know any concrete types to ensure /// our dependency graph is worker -> ocall bridge -pub struct OCallBridgeComponentFactory { - node_api_factory: Arc, - block_gossiper: Arc, - enclave_api: Arc, - block_storage: Arc, +pub struct OCallBridgeComponentFactory { + node_api_factory: Arc, + block_gossiper: Arc, + enclave_api: Arc, + block_storage: Arc, + peer_updater: Arc, } -impl OCallBridgeComponentFactory { +impl + OCallBridgeComponentFactory +{ pub fn new( - node_api_factory: Arc, - block_gossiper: Arc, - enclave_api: Arc, - block_storage: Arc, + node_api_factory: Arc, + block_gossiper: Arc, + enclave_api: Arc, + block_storage: Arc, + peer_updater: Arc, ) -> Self { - OCallBridgeComponentFactory { node_api_factory, block_gossiper, enclave_api, block_storage } + OCallBridgeComponentFactory { + node_api_factory, + block_gossiper, + enclave_api, + block_storage, + peer_updater, + } } } -impl GetOCallBridgeComponents for OCallBridgeComponentFactory +impl GetOCallBridgeComponents + for OCallBridgeComponentFactory where - N: CreateNodeApi + 'static, - B: GossipBlocks + 'static, - E: RemoteAttestationCallBacks + 'static, - D: BlockStorage + 'static, + NodeApi: CreateNodeApi + 'static, + Gossiper: GossipBlocks + 'static, + EnclaveApi: RemoteAttestationCallBacks + 'static, + Storage: BlockStorage + 'static, + PeerUpdater: UpdateWorkerPeers + 'static, { fn get_ra_api(&self) -> Arc { Arc::new(RemoteAttestationOCall::new(self.enclave_api.clone())) } fn get_sidechain_api(&self) -> Arc { - Arc::new(SidechainOCall::new(self.block_gossiper.clone(), self.block_storage.clone())) + Arc::new(SidechainOCall::new( + self.block_gossiper.clone(), + self.block_storage.clone(), + self.peer_updater.clone(), + )) } fn get_oc_api(&self) -> Arc { diff --git a/service/src/ocall_bridge/sidechain_ocall.rs b/service/src/ocall_bridge/sidechain_ocall.rs index 0354b80349..8cd26d96c8 100644 --- a/service/src/ocall_bridge/sidechain_ocall.rs +++ b/service/src/ocall_bridge/sidechain_ocall.rs @@ -17,6 +17,7 @@ */ use crate::{ + global_peer_updater::UpdateWorkerPeers, ocall_bridge::bridge_api::{OCallBridgeError, OCallBridgeResult, SidechainBridge}, sync_block_gossiper::GossipBlocks, }; @@ -26,21 +27,28 @@ use its_storage::BlockStorage; use log::*; use std::sync::Arc; -pub struct SidechainOCall { - block_gossiper: Arc, - block_storage: Arc, +pub struct SidechainOCall { + block_gossiper: Arc, + block_storage: Arc, + peer_updater: Arc, } -impl SidechainOCall { - pub fn new(block_gossiper: Arc, block_storage: Arc) -> Self { - SidechainOCall { block_gossiper, block_storage } +impl SidechainOCall { + pub fn new( + block_gossiper: Arc, + block_storage: Arc, + peer_updater: Arc, + ) -> Self { + SidechainOCall { block_gossiper, block_storage, peer_updater } } } -impl SidechainBridge for SidechainOCall +impl SidechainBridge + for SidechainOCall where - S: GossipBlocks, - D: BlockStorage, + BlockGossiper: GossipBlocks, + Storage: BlockStorage, + PeerUpdater: UpdateWorkerPeers, { fn propose_sidechain_blocks(&self, signed_blocks_encoded: Vec) -> OCallBridgeResult<()> { // TODO: improve error handling, using a mut status is not good design? @@ -67,6 +75,12 @@ where debug!("Enclave did not produce sidechain blocks"); } + if let Err(e) = self.peer_updater.update_peers() { + error!("Error updating peers: {:?}", e); + // Fixme: returning an error here results in a `HeaderAncestryMismatch` error. + // status = sgx_status_t::SGX_ERROR_UNEXPECTED; + } + if let Err(e) = self.block_gossiper.gossip_blocks(signed_blocks) { error!("Error gossiping blocks: {:?}", e); // Fixme: returning an error here results in a `HeaderAncestryMismatch` error. diff --git a/service/src/request_keys.rs b/service/src/request_keys.rs new file mode 100644 index 0000000000..93c66c3ca1 --- /dev/null +++ b/service/src/request_keys.rs @@ -0,0 +1,73 @@ +/* + Copyright 2021 Integritee AG and Supercomputing Systems AG + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +//! Request state keys from a fellow validateer. + +use crate::{ + enclave::tls_ra::enclave_request_key_provisioning, + error::{Error, ServiceResult as Result}, +}; +use futures::executor; +use itc_rpc_client::direct_client::{DirectApi, DirectClient as DirectWorkerApi}; +use itp_api_client_extensions::PalletTeerexApi; +use itp_enclave_api::remote_attestation::TlsRemoteAttestation; +use itp_types::ShardIdentifier; +use sgx_types::sgx_quote_sign_type_t; +use std::string::String; + +pub(crate) fn request_keys( + node_api: &NodeApi, + shard: &ShardIdentifier, + enclave_api: &E, + skip_ra: bool, +) { + // FIXME: we now assume that keys are equal for all shards + + // Initialize the enclave. + #[cfg(feature = "production")] + println!("*** Starting enclave in production mode"); + #[cfg(not(feature = "production"))] + println!("*** Starting enclave in development mode"); + + let provider_url = + executor::block_on(get_author_url_of_last_finalized_sidechain_block(node_api, shard)) + .unwrap(); + println!("Requesting key provisioning from worker at {}", &provider_url); + + enclave_request_key_provisioning( + enclave_api, + sgx_quote_sign_type_t::SGX_UNLINKABLE_SIGNATURE, + &provider_url, + skip_ra, + ) + .unwrap(); + println!("[+] Key provisioning successfully performed."); +} + +/// Returns the url of the last sidechain block author that has been stored +/// in the parentchain state as "worker for shard". +/// +/// Note: The sidechainblock author will only change whenever a new parentchain block is +/// produced. And even then, it might be the same as the last block. So if several workers +/// are started in a timely manner, they will all get the same url. +async fn get_author_url_of_last_finalized_sidechain_block( + node_api: &NodeApi, + shard: &ShardIdentifier, +) -> Result { + let enclave = node_api.worker_for_shard(shard, None)?.ok_or(Error::EmptyValue)?; + let worker_api_direct = DirectWorkerApi::new(enclave.url); + Ok(worker_api_direct.get_mu_ra_url()?) +} diff --git a/service/src/sync_block_gossiper.rs b/service/src/sync_block_gossiper.rs index 541d636600..d1fde63f38 100644 --- a/service/src/sync_block_gossiper.rs +++ b/service/src/sync_block_gossiper.rs @@ -19,7 +19,7 @@ use crate::{ error::Error, globals::{tokio_handle::GetTokioHandle, worker::GetWorker}, - worker::{WorkerResult, WorkerT}, + worker::{AsyncBlockGossiper, WorkerResult}, }; use its_primitives::types::SignedBlock as SignedSidechainBlock; use log::*; diff --git a/service/src/tests/commons.rs b/service/src/tests/commons.rs index ff59285304..ec4414fef4 100644 --- a/service/src/tests/commons.rs +++ b/service/src/tests/commons.rs @@ -36,16 +36,22 @@ pub fn test_trusted_getter_signed(who: AccountKeyring) -> Getter { Getter::trusted(getter.sign(&KeyPair::Sr25519(who.pair()))) } -/// Local Worker config. Fields are the default values except for -/// the worker's rpc server. #[cfg(test)] -pub fn local_worker_config(worker_url: String) -> Config { - let mut url = worker_url.split(":"); +pub fn local_worker_config( + worker_url: String, + untrusted_worker_port: String, + mu_ra_port: String, +) -> Config { + let mut url = worker_url.split(':'); Config::new( Default::default(), Default::default(), url.next().unwrap().into(), + None, url.next().unwrap().into(), - Default::default(), + None, + untrusted_worker_port, + None, + mu_ra_port, ) } diff --git a/service/src/tests/mock.rs b/service/src/tests/mock.rs index 83fae20368..b89d41d896 100644 --- a/service/src/tests/mock.rs +++ b/service/src/tests/mock.rs @@ -8,8 +8,8 @@ pub const W2_URL: &str = "127.0.0.1:3333"; pub fn enclaves() -> Vec { vec![ - Enclave::new([0; 32].into(), [1; 32], 1, format!("ws://{}", W1_URL)), - Enclave::new([2; 32].into(), [3; 32], 2, format!("ws://{}", W2_URL)), + Enclave::new([0; 32].into(), [1; 32], 1, format!("wss://{}", W1_URL)), + Enclave::new([2; 32].into(), [3; 32], 2, format!("wss://{}", W2_URL)), ] } diff --git a/service/src/tests/mocks/enclave_api_mock.rs b/service/src/tests/mocks/enclave_api_mock.rs index b883cf1922..3e6751a197 100644 --- a/service/src/tests/mocks/enclave_api_mock.rs +++ b/service/src/tests/mocks/enclave_api_mock.rs @@ -26,7 +26,7 @@ use sp_runtime::traits::Header; pub struct EnclaveBaseMock; impl EnclaveBase for EnclaveBaseMock { - fn init(&self) -> EnclaveResult<()> { + fn init(&self, _mu_ra_url: &str, _untrusted_url: &str) -> EnclaveResult<()> { Ok(()) } diff --git a/service/src/tests/mocks/parentchain_api_mock.rs b/service/src/tests/mocks/parentchain_api_mock.rs index 3d21d4c9bf..d746b63e18 100644 --- a/service/src/tests/mocks/parentchain_api_mock.rs +++ b/service/src/tests/mocks/parentchain_api_mock.rs @@ -37,8 +37,7 @@ impl ParentchainApiMock { self.parentchain = (1..=number_of_blocks) .map(|n| { let header = ParentchainHeaderBuilder::default().with_number(n).build(); - let block = ParentchainBlockBuilder::default().with_header(header).build_signed(); - block + ParentchainBlockBuilder::default().with_header(header).build_signed() }) .collect(); self diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index 4e8e814021..c296fe6459 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -17,7 +17,7 @@ use clap::ArgMatches; -use crate::enclave::api::*; +use crate::{config::Config, enclave::api::*}; use self::ecalls::*; use itp_enclave_api::enclave_test::EnclaveTest; @@ -37,7 +37,8 @@ pub mod parentchain_block_syncer_test; pub fn run_enclave_tests(matches: &ArgMatches) { println!("*** Starting Test enclave"); - let enclave = enclave_init().unwrap(); + let config = Config::from(matches); + let enclave = enclave_init(&config).unwrap(); if matches.is_present("all") || matches.is_present("unit") { println!("Running unit Tests"); diff --git a/service/src/tests/worker.rs b/service/src/tests/worker.rs index e2c53e86a8..44f8dd642b 100644 --- a/service/src/tests/worker.rs +++ b/service/src/tests/worker.rs @@ -12,7 +12,7 @@ use crate::{ }; use std::sync::Arc; -type TestWorker = WorkerGen; +type TestWorker = WorkerGen; lazy_static! { static ref WORKER: RwLock> = RwLock::new(None); @@ -23,10 +23,10 @@ fn worker_rw_lock_works() { { let mut w = WORKER.write(); *w = Some(TestWorker::new( - local_worker_config(W2_URL.into()), + local_worker_config(W2_URL.into(), "10".to_string(), "20".to_string()), TestNodeApi, Arc::new(()), - (), + Vec::new(), )); } diff --git a/service/src/worker.rs b/service/src/worker.rs index 9eade158d2..9eae2c5042 100644 --- a/service/src/worker.rs +++ b/service/src/worker.rs @@ -4,36 +4,36 @@ /// from the main.rs should be covered by the worker struct here - hidden and split across /// multiple traits. use async_trait::async_trait; +use itc_rpc_client::direct_client::{DirectApi, DirectClient as DirectWorkerApi}; use itp_api_client_extensions::PalletTeerexApi; -use itp_types::Enclave as EnclaveMetadata; use its_primitives::types::SignedBlock as SignedSidechainBlock; use jsonrpsee::{ types::{to_json_value, traits::Client}, ws_client::WsClientBuilder, }; use log::*; -use std::num::ParseIntError; use crate::{config::Config, error::Error}; use std::sync::Arc; pub type WorkerResult = Result; -pub struct Worker { +pub type Url = String; +pub struct Worker { _config: Config, node_api: NodeApi, // todo: Depending on system design, all the api fields should be Arc // unused yet, but will be used when more methods are migrated to the worker _enclave_api: Arc, - _worker_api_direct: WorkerApiDirect, + peers: Vec, } -impl Worker { +impl Worker { pub fn new( _config: Config, node_api: NodeApi, _enclave_api: Arc, - _worker_api_direct: WorkerApiDirect, + peers: Vec, ) -> Self { - Self { _config, node_api, _enclave_api, _worker_api_direct } + Self { _config, node_api, _enclave_api, peers } } // will soon be used. @@ -44,21 +44,16 @@ impl Worker) -> WorkerResult<()>; - - /// Returns all enclave urls registered on the parentchain. - fn peers(&self) -> WorkerResult>; } #[async_trait] -impl WorkerT - for Worker +impl AsyncBlockGossiper for Worker where NodeApi: PalletTeerexApi + Send + Sync, Enclave: Send + Sync, - WorkerApiDirect: Send + Sync, { async fn gossip_blocks(&self, blocks: Vec) -> WorkerResult<()> { if blocks.is_empty() { @@ -66,16 +61,12 @@ where return Ok(()) } - let peers = self.peers()?; - debug!("Gossiping sidechain blocks to peers: {:?}", peers); let blocks_json = vec![to_json_value(blocks)?]; - for p in peers.iter() { - // Todo: once the two direct servers are merged, remove this. - let url = worker_url_into_async_rpc_url(&p.url)?; + for url in self.peers.iter() { trace!("Gossiping block to peer with address: {:?}", url); - // FIXME: Websocket connectionto a worker should stay once etablished. - let client = WsClientBuilder::default().build(&url).await?; + // FIXME: Websocket connection to a worker should stay, once etablished. + let client = WsClientBuilder::default().build(url).await?; let blocks = blocks_json.clone(); if let Err(e) = client.request::>("sidechain_importBlock", blocks.into()).await { @@ -84,35 +75,38 @@ where } Ok(()) } +} - fn peers(&self) -> WorkerResult> { - Ok(self.node_api.all_enclaves(None)?) +/// Looks for new peers and updates them. +pub trait UpdatePeers { + fn search_peers(&self) -> WorkerResult>; + fn set_peers(&mut self, peers: Vec) -> WorkerResult<()>; + fn update_peers(&mut self) -> WorkerResult<()> { + let peers = self.search_peers()?; + self.set_peers(peers) } } -/// Temporary method that transforms the workers rpc port of the direct api defined in rpc/direct_client -/// to the new version in rpc/server. Remove this, when all the methods have been migrated to the new one -/// in rpc/server. -pub fn worker_url_into_async_rpc_url(url: &str) -> WorkerResult { - // [Option("ws(s)"), //ip, port] - let mut url_vec: Vec<&str> = url.split(':').collect(); - match url_vec.len() { - 3 | 2 => (), - _ => return Err(Error::Custom("Invalid worker url format".into())), - }; - - let ip = if url_vec.len() == 3 { - format!("{}:{}", url_vec.remove(0), url_vec.remove(0)) - } else { - url_vec.remove(0).into() - }; - - let port: i32 = - url_vec.remove(0).parse().map_err(|e: ParseIntError| Error::Custom(e.into()))?; +impl UpdatePeers for Worker +where + NodeApi: PalletTeerexApi + Send + Sync, +{ + fn search_peers(&self) -> WorkerResult> { + let enclaves = self.node_api.all_enclaves(None)?; + let mut peer_urls = Vec::::new(); + for enclave in enclaves { + // FIXME: This is temporary only, as block gossiping should be moved to trusted ws server. + let worker_api_direct = DirectWorkerApi::new(enclave.url); + peer_urls.push(worker_api_direct.get_untrusted_worker_url()?); + } + Ok(peer_urls) + } - Ok(format!("{}:{}", ip, (port + 1))) + fn set_peers(&mut self, peers: Vec) -> WorkerResult<()> { + self.peers = peers; + Ok(()) + } } - #[cfg(test)] mod tests { use frame_support::assert_ok; @@ -128,7 +122,7 @@ mod tests { commons::local_worker_config, mock::{TestNodeApi, W1_URL, W2_URL}, }, - worker::{worker_url_into_async_rpc_url, Worker, WorkerT}, + worker::{AsyncBlockGossiper, Worker}, }; use std::sync::Arc; @@ -156,10 +150,17 @@ mod tests { #[tokio::test] async fn gossip_blocks_works() { init(); - run_server(worker_url_into_async_rpc_url(W1_URL).unwrap()).await.unwrap(); - run_server(worker_url_into_async_rpc_url(W2_URL).unwrap()).await.unwrap(); - - let worker = Worker::new(local_worker_config(W1_URL.into()), TestNodeApi, Arc::new(()), ()); + run_server(W1_URL).await.unwrap(); + run_server(W2_URL).await.unwrap(); + let untrusted_worker_port = "4000".to_string(); + let peers = vec![format!("ws://{}", W1_URL), format!("ws://{}", W2_URL)]; + + let worker = Worker::new( + local_worker_config(W1_URL.into(), untrusted_worker_port.clone(), "30".to_string()), + TestNodeApi, + Arc::new(()), + peers, + ); let resp = worker .gossip_blocks(vec![SidechainBlockBuilder::default().build_signed()])