diff --git a/Cargo.lock b/Cargo.lock index ee38b85..c4bb721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,12 +34,12 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" dependencies = [ "addr2line", - "cfg-if", + "cfg-if 1.0.0", "libc", "miniz_oxide", "object", @@ -56,6 +56,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "bank_threads" +version = "0.1.0" +dependencies = [ + "spawned-concurrency", + "spawned-rt", + "tracing", +] + [[package]] name = "base64" version = "0.21.7" @@ -70,9 +79,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" [[package]] name = "bumpalo" @@ -88,13 +97,19 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.21" +version = "1.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8691782945451c1c383942c4874dbe63814f61cb57ef773cda2972682b7bb3c0" +checksum = "5f4ac86a9e5bc1e2b3449ab9d7d3a6a405e3d1bb28d7b9be8614f55846ae3766" dependencies = [ "shlex", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -117,6 +132,78 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" +dependencies = [ + "cfg-if 0.1.10", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-deque" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "crossbeam-utils", + "lazy_static", + "maybe-uninit", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-queue" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" +dependencies = [ + "cfg-if 0.1.10", + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -134,7 +221,7 @@ version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -145,9 +232,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "errno" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" +checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" dependencies = [ "libc", "windows-sys 0.59.0", @@ -280,11 +367,11 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "r-efi", "wasi 0.14.2+wasi-0.2.4", @@ -394,21 +481,22 @@ dependencies = [ [[package]] name = "icu_collections" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47" dependencies = [ "displaydoc", + "potential_utf", "yoke", "zerofrom", "zerovec", ] [[package]] -name = "icu_locid" -version = "1.5.0" +name = "icu_locale_core" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a" dependencies = [ "displaydoc", "litemap", @@ -417,31 +505,11 @@ dependencies = [ "zerovec", ] -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d" - [[package]] name = "icu_normalizer" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979" dependencies = [ "displaydoc", "icu_collections", @@ -449,67 +517,54 @@ dependencies = [ "icu_properties", "icu_provider", "smallvec", - "utf16_iter", - "utf8_iter", - "write16", "zerovec", ] [[package]] name = "icu_normalizer_data" -version = "1.5.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7" +checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" [[package]] name = "icu_properties" -version = "1.5.1" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" dependencies = [ "displaydoc", "icu_collections", - "icu_locid_transform", + "icu_locale_core", "icu_properties_data", "icu_provider", - "tinystr", + "potential_utf", + "zerotrie", "zerovec", ] [[package]] name = "icu_properties_data" -version = "1.5.1" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" [[package]] name = "icu_provider" -version = "1.5.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af" dependencies = [ "displaydoc", - "icu_locid", - "icu_provider_macros", + "icu_locale_core", "stable_deref_trait", "tinystr", "writeable", "yoke", "zerofrom", + "zerotrie", "zerovec", ] -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "idna" version = "1.0.3" @@ -523,9 +578,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" dependencies = [ "icu_normalizer", "icu_properties", @@ -571,9 +626,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.171" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "linux-raw-sys" @@ -583,9 +638,9 @@ checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" [[package]] name = "litemap" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" +checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" [[package]] name = "lock_api" @@ -612,12 +667,27 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memoffset" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -626,9 +696,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.5" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" dependencies = [ "adler2", ] @@ -710,8 +780,8 @@ version = "0.10.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" dependencies = [ - "bitflags 2.9.0", - "cfg-if", + "bitflags 2.9.1", + "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", @@ -770,7 +840,7 @@ version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "redox_syscall", "smallvec", @@ -804,17 +874,35 @@ dependencies = [ "tracing", ] +[[package]] +name = "ping_pong_threads" +version = "0.1.0" +dependencies = [ + "spawned-concurrency", + "spawned-rt", + "tracing", +] + [[package]] name = "pkg-config" version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "potential_utf" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" +dependencies = [ + "zerovec", +] + [[package]] name = "proc-macro2" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" dependencies = [ "unicode-ident", ] @@ -836,11 +924,11 @@ checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" [[package]] name = "redox_syscall" -version = "0.5.10" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", ] [[package]] @@ -939,7 +1027,7 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys", @@ -988,7 +1076,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "core-foundation", "core-foundation-sys", "libc", @@ -1066,9 +1154,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.2" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" dependencies = [ "libc", ] @@ -1084,15 +1172,15 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" +checksum = "8917285742e9f3e1683f0a9c4e6b57960b7314d0b08d30d1ecd426713ee2eee9" [[package]] name = "socket2" -version = "0.5.8" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" dependencies = [ "libc", "windows-sys 0.52.0", @@ -1111,6 +1199,7 @@ dependencies = [ name = "spawned-rt" version = "0.1.0" dependencies = [ + "crossbeam", "tokio", "tracing", "tracing-subscriber", @@ -1124,9 +1213,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] name = "syn" -version = "2.0.100" +version = "2.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" dependencies = [ "proc-macro2", "quote", @@ -1173,9 +1262,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.19.1" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7437ac7763b9b123ccf33c338a5cc1bac6f69b45a136c19bdd8a65e3916435bf" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ "fastrand", "getrandom", @@ -1190,15 +1279,15 @@ version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", ] [[package]] name = "tinystr" -version = "0.7.6" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b" dependencies = [ "displaydoc", "zerovec", @@ -1206,9 +1295,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.44.1" +version = "1.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" +checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" dependencies = [ "backtrace", "bytes", @@ -1347,6 +1436,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "updater_threads" +version = "0.1.0" +dependencies = [ + "futures", + "reqwest", + "spawned-concurrency", + "spawned-rt", + "tracing", +] + [[package]] name = "url" version = "2.5.4" @@ -1358,12 +1458,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - [[package]] name = "utf8_iter" version = "1.0.4" @@ -1412,7 +1506,7 @@ version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", "rustversion", "wasm-bindgen-macro", @@ -1438,7 +1532,7 @@ version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "once_cell", "wasm-bindgen", @@ -1663,7 +1757,7 @@ version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "windows-sys 0.48.0", ] @@ -1673,26 +1767,20 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", ] -[[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - [[package]] name = "writeable" -version = "0.5.5" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" [[package]] name = "yoke" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc" dependencies = [ "serde", "stable_deref_trait", @@ -1702,9 +1790,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.7.5" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6" dependencies = [ "proc-macro2", "quote", @@ -1733,11 +1821,22 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerotrie" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + [[package]] name = "zerovec" -version = "0.10.4" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" dependencies = [ "yoke", "zerofrom", @@ -1746,9 +1845,9 @@ dependencies = [ [[package]] name = "zerovec-derive" -version = "0.10.3" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 2ec1059..a6f68bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,14 @@ [workspace] members = [ "rt", - "examples/ping_pong", + "examples/bank", + "examples/bank_threads", "examples/name_server", "examples/name_server_with_error", + "examples/ping_pong", + "examples/ping_pong_threads", "examples/updater", - "examples/bank", + "examples/updater_threads", ] [workspace.dependencies] diff --git a/README.md b/README.md index 9037298..74cc3ae 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Spawned -Simple library for concurrent Rust. +Library for concurrent Rust. # Goals: @@ -7,10 +7,17 @@ Simple library for concurrent Rust. - Set coding guidelines to apply along LambdaClass repositories and codebase. - Starting point to ideate what we want for Concrete. +# Versions: + +Two versions exist in their own submodules: +- threads: no use of async/await. Just IO threads code. +- tasks: a runtime is required to run async/await code. The runtime is selected in `spawned_rt::tasks` module that abstracts it. + # Inspiration: - [Erlang](https://www.erlang.org/) - [Commonware.xyz](https://commonware.xyz) - [Ractor](https://slawlor.github.io/ractor/) +- [Actors with Tokio](https://ryhl.io/blog/actors-with-tokio/) - [Vale.dev](https://vale.dev/) - [Gleam](https://gleam.run/) \ No newline at end of file diff --git a/concurrency/README.md b/concurrency/README.md index ff74e4d..ef8799e 100644 --- a/concurrency/README.md +++ b/concurrency/README.md @@ -1,3 +1,7 @@ # Spawned concurrency Some basic traits and structs to implement à-la-Erlang concurrent code. +Currently two versions: + +- threads: no use of async/await. Just IO threads code +- tasks: a runtime is required to run async/await code. It uses `spawned_rt::tasks` module that abstracts the runtime. diff --git a/concurrency/src/error.rs b/concurrency/src/error.rs deleted file mode 100644 index a87f300..0000000 --- a/concurrency/src/error.rs +++ /dev/null @@ -1,11 +0,0 @@ -#[derive(Debug)] -pub enum GenServerError { - CallbackError, - ServerError, -} - -impl From> for GenServerError { - fn from(_value: spawned_rt::mpsc::SendError) -> Self { - Self::ServerError - } -} diff --git a/concurrency/src/lib.rs b/concurrency/src/lib.rs index ab6d8b0..28c3ee2 100644 --- a/concurrency/src/lib.rs +++ b/concurrency/src/lib.rs @@ -1,12 +1,5 @@ -//! λ-kit concurrency -//! Some basic traits and structs to implement À-la-Erlang concurrent code. +//! spawned concurrency +//! Some basic traits and structs to implement concurrent code à-la-Erlang. -mod error; -mod gen_server; -mod process; -mod time; - -pub use error::GenServerError; -pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; -pub use process::{Process, ProcessInfo, send}; -pub use time::send_after; +pub mod tasks; +pub mod threads; diff --git a/concurrency/src/tasks/error.rs b/concurrency/src/tasks/error.rs new file mode 100644 index 0000000..05653fa --- /dev/null +++ b/concurrency/src/tasks/error.rs @@ -0,0 +1,11 @@ +#[derive(Debug)] +pub enum GenServerError { + CallbackError, + ServerError, +} + +impl From> for GenServerError { + fn from(_value: spawned_rt::tasks::mpsc::SendError) -> Self { + Self::ServerError + } +} diff --git a/concurrency/src/gen_server.rs b/concurrency/src/tasks/gen_server.rs similarity index 97% rename from concurrency/src/gen_server.rs rename to concurrency/src/tasks/gen_server.rs index ecd63fd..1723d3c 100644 --- a/concurrency/src/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -1,10 +1,10 @@ -//! GernServer trait and structs to create an abstraction similar to Erlang gen_server. +//! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. use futures::future::FutureExt as _; -use spawned_rt::{self as rt, JoinHandle, mpsc, oneshot}; +use spawned_rt::tasks::{self as rt, mpsc, oneshot, JoinHandle}; use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe}; -use crate::error::GenServerError; +use super::error::GenServerError; #[derive(Debug)] pub struct GenServerHandle { diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs new file mode 100644 index 0000000..93bcedb --- /dev/null +++ b/concurrency/src/tasks/mod.rs @@ -0,0 +1,12 @@ +//! spawned concurrency +//! Runtime tasks-based traits and structs to implement concurrent code à-la-Erlang. + +mod error; +mod gen_server; +mod process; +mod time; + +pub use error::GenServerError; +pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; +pub use process::{send, Process, ProcessInfo}; +pub use time::send_after; diff --git a/concurrency/src/process.rs b/concurrency/src/tasks/process.rs similarity index 97% rename from concurrency/src/process.rs rename to concurrency/src/tasks/process.rs index 8c984cd..b623d2b 100644 --- a/concurrency/src/process.rs +++ b/concurrency/src/tasks/process.rs @@ -1,10 +1,9 @@ //! Process trait and struct to create a process abstraction similar to Erlang processes. //! See examples/ping_pong for a usage example. +use spawned_rt::tasks::{self as rt, mpsc, JoinHandle}; use std::future::Future; -use spawned_rt::{self as rt, JoinHandle, mpsc}; - #[derive(Debug)] pub struct ProcessInfo { pub tx: mpsc::Sender, diff --git a/concurrency/src/tasks/time.rs b/concurrency/src/tasks/time.rs new file mode 100644 index 0000000..79752c8 --- /dev/null +++ b/concurrency/src/tasks/time.rs @@ -0,0 +1,21 @@ +use std::time::Duration; + +use spawned_rt::tasks::{self as rt, mpsc::Sender, JoinHandle}; + +use super::{GenServer, GenServerInMsg}; + +// Sends a message after a given period to the specified GenServer. The task terminates +// once the send has completed +pub fn send_after( + period: Duration, + tx: Sender>, + message: T::InMsg, +) -> JoinHandle<()> +where + T: GenServer + 'static, +{ + rt::spawn(async move { + rt::sleep(period).await; + let _ = tx.send(GenServerInMsg::Cast { message }); + }) +} diff --git a/concurrency/src/threads/error.rs b/concurrency/src/threads/error.rs new file mode 100644 index 0000000..735e37e --- /dev/null +++ b/concurrency/src/threads/error.rs @@ -0,0 +1,11 @@ +#[derive(Debug)] +pub enum GenServerError { + CallbackError, + ServerError, +} + +impl From> for GenServerError { + fn from(_value: spawned_rt::threads::mpsc::SendError) -> Self { + Self::ServerError + } +} diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs new file mode 100644 index 0000000..e759a0d --- /dev/null +++ b/concurrency/src/threads/gen_server.rs @@ -0,0 +1,179 @@ +//! GenServer trait and structs to create an abstraction similar to Erlang gen_server. +//! See examples/name_server for a usage example. +use spawned_rt::threads::{self as rt, mpsc, oneshot, JoinHandle}; +use std::{ + fmt::Debug, + panic::{catch_unwind, AssertUnwindSafe}, +}; + +use super::error::GenServerError; + +#[derive(Debug)] +pub struct GenServerHandle { + pub tx: mpsc::Sender>, + #[allow(unused)] + handle: JoinHandle<()>, +} + +impl GenServerHandle { + pub(crate) fn new(mut initial_state: G::State) -> Self { + let (tx, mut rx) = mpsc::channel::>(); + let tx_clone = tx.clone(); + let mut gen_server: G = GenServer::new(); + let handle = rt::spawn(move || { + if gen_server + .run(&tx_clone, &mut rx, &mut initial_state) + .is_err() + { + tracing::trace!("GenServer crashed") + }; + }); + GenServerHandle { tx, handle } + } + + pub fn sender(&self) -> mpsc::Sender> { + self.tx.clone() + } + + pub fn call(&mut self, message: G::InMsg) -> Result { + let (oneshot_tx, oneshot_rx) = oneshot::channel::>(); + self.tx.send(GenServerInMsg::Call { + sender: oneshot_tx, + message, + })?; + match oneshot_rx.recv() { + Ok(result) => result, + Err(_) => Err(GenServerError::ServerError), + } + } + + pub fn cast(&mut self, message: G::InMsg) -> Result<(), GenServerError> { + self.tx + .send(GenServerInMsg::Cast { message }) + .map_err(|_error| GenServerError::ServerError) + } +} + +pub enum GenServerInMsg { + Call { + sender: oneshot::Sender>, + message: A::InMsg, + }, + Cast { + message: A::InMsg, + }, +} + +pub enum CallResponse { + Reply(U), + Stop(U), +} + +pub enum CastResponse { + NoReply, + Stop, +} + +pub trait GenServer +where + Self: Send + Sized, +{ + type InMsg: Send + Sized; + type OutMsg: Send + Sized; + type State: Clone + Send; + type Error: Debug; + + fn new() -> Self; + + fn start(initial_state: Self::State) -> GenServerHandle { + GenServerHandle::new(initial_state) + } + + fn run( + &mut self, + tx: &mpsc::Sender>, + rx: &mut mpsc::Receiver>, + state: &mut Self::State, + ) -> Result<(), GenServerError> { + self.main_loop(tx, rx, state)?; + Ok(()) + } + + fn main_loop( + &mut self, + tx: &mpsc::Sender>, + rx: &mut mpsc::Receiver>, + state: &mut Self::State, + ) -> Result<(), GenServerError> { + loop { + if !self.receive(tx, rx, state)? { + break; + } + } + tracing::trace!("Stopping GenServer"); + Ok(()) + } + + fn receive( + &mut self, + tx: &mpsc::Sender>, + rx: &mut mpsc::Receiver>, + state: &mut Self::State, + ) -> Result { + let message = rx.recv().ok(); + + // Save current state in case of a rollback + let state_clone = state.clone(); + + let (keep_running, error) = match message { + Some(GenServerInMsg::Call { sender, message }) => { + let (keep_running, error, response) = + match catch_unwind(AssertUnwindSafe(|| self.handle_call(message, tx, state))) { + Ok(response) => match response { + CallResponse::Reply(response) => (true, None, Ok(response)), + CallResponse::Stop(response) => (false, None, Ok(response)), + }, + Err(error) => (true, Some(error), Err(GenServerError::CallbackError)), + }; + // Send response back + if sender.send(response).is_err() { + tracing::trace!("GenServer failed to send response back, client must have died") + }; + (keep_running, error) + } + Some(GenServerInMsg::Cast { message }) => { + match catch_unwind(AssertUnwindSafe(|| self.handle_cast(message, tx, state))) { + Ok(response) => match response { + CastResponse::NoReply => (true, None), + CastResponse::Stop => (false, None), + }, + Err(error) => (true, Some(error)), + } + } + None => { + // Channel has been closed; won't receive further messages. Stop the server. + (false, None) + } + }; + if let Some(error) = error { + tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); + // Restore initial state (ie. dismiss any change) + *state = state_clone; + }; + Ok(keep_running) + } + + fn handle_call( + &mut self, + message: Self::InMsg, + tx: &mpsc::Sender>, + state: &mut Self::State, + ) -> CallResponse; + + fn handle_cast( + &mut self, + _message: Self::InMsg, + _tx: &mpsc::Sender>, + state: &mut Self::State, + ) -> CastResponse; +} diff --git a/concurrency/src/threads/mod.rs b/concurrency/src/threads/mod.rs new file mode 100644 index 0000000..858dd52 --- /dev/null +++ b/concurrency/src/threads/mod.rs @@ -0,0 +1,11 @@ +//! spawned concurrency +//! IO threads-based traits and structs to implement concurrent code à-la-Erlang. + +mod error; +mod gen_server; +mod process; +mod time; + +pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; +pub use process::{send, Process, ProcessInfo}; +pub use time::send_after; diff --git a/concurrency/src/threads/process.rs b/concurrency/src/threads/process.rs new file mode 100644 index 0000000..3dfd87d --- /dev/null +++ b/concurrency/src/threads/process.rs @@ -0,0 +1,71 @@ +//! Process trait and struct to create a process abstraction similar to Erlang processes. +//! See examples/ping_pong for a usage example. + +use spawned_rt::threads::{self as rt, mpsc, JoinHandle}; + +#[derive(Debug)] +pub struct ProcessInfo { + pub tx: mpsc::Sender, + pub handle: JoinHandle<()>, +} + +impl ProcessInfo { + pub fn sender(&self) -> mpsc::Sender { + self.tx.clone() + } + + pub fn handle(self) -> JoinHandle<()> { + self.handle + } +} + +pub trait Process +where + Self: Send + Sync + Sized + 'static, +{ + fn spawn(mut self) -> ProcessInfo { + let (tx, mut rx) = mpsc::channel::(); + let tx_clone = tx.clone(); + let handle = rt::spawn(move || self.run(&tx_clone, &mut rx)); + ProcessInfo { tx, handle } + } + + fn run(&mut self, tx: &mpsc::Sender, rx: &mut mpsc::Receiver) { + self.init(tx); + self.main_loop(tx, rx); + } + + fn main_loop(&mut self, tx: &mpsc::Sender, rx: &mut mpsc::Receiver) { + loop { + if self.should_stop() { + break; + } + + self.receive(tx, rx); + } + } + + fn should_stop(&self) -> bool { + false + } + + fn init(&mut self, _tx: &mpsc::Sender) { + {} + } + + fn receive(&mut self, tx: &mpsc::Sender, rx: &mut mpsc::Receiver) -> T { + match rx.recv().ok() { + Some(message) => self.handle(message, tx), + None => todo!(), + } + } + + fn handle(&mut self, message: T, tx: &mpsc::Sender) -> T; +} + +pub fn send(tx: &mpsc::Sender, message: T) +where + T: Send, +{ + let _ = tx.send(message); +} diff --git a/concurrency/src/threads/time.rs b/concurrency/src/threads/time.rs new file mode 100644 index 0000000..034295f --- /dev/null +++ b/concurrency/src/threads/time.rs @@ -0,0 +1,21 @@ +use std::time::Duration; + +use spawned_rt::threads::{self as rt, mpsc::Sender, JoinHandle}; + +use super::gen_server::{GenServer, GenServerInMsg}; + +// Sends a message after a given period to the specified GenServer. The task terminates +// once the send has completed +pub fn send_after( + period: Duration, + tx: Sender>, + message: T::InMsg, +) -> JoinHandle<()> +where + T: GenServer + 'static, +{ + rt::spawn(move || { + rt::sleep(period); + let _ = tx.send(GenServerInMsg::Cast { message }); + }) +} diff --git a/examples/README.md b/examples/README.md index d944f56..97b021d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,7 +1,11 @@ # Spawned examples Some examples to test runtime and concurrency: -- ping_pong: Simple example to test Process abstraction. -- name_server: Simple example to test GenServer abstraction. -- bank: A bit more complex example for GenServer. - +- ping_pong: Simple example to test Process abstraction using `tasks` implementation. +- ping_pong_threads: ping_pong example on `threads` implementation. +- name_server: Simple example to test GenServer abstraction using `tasks` implementation. +- name_server_with_error: Same name_server example with a deliverate error to check catching mechanism to prevent panicking on callback code. +- bank: A bit more complex example for GenServer using `tasks` implementation. +- bank_threads: bank example on `threads` implementation. +- updater: A "live" process that checks an url periodicaly using `tasks` implementation. +- updater_threads: updater example on `threads` implementation. \ No newline at end of file diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index db6e7a0..90f97eb 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -26,8 +26,8 @@ use std::collections::HashMap; use messages::{BankError, BankOutMessage}; use server::Bank; -use spawned_concurrency::GenServer as _; -use spawned_rt as rt; +use spawned_concurrency::tasks::GenServer as _; +use spawned_rt::tasks as rt; fn main() { rt::run(async { diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index d5e43bc..3218e00 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; -use spawned_concurrency::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; -use spawned_rt::mpsc::Sender; +use spawned_concurrency::tasks::{ + CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, +}; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage}; diff --git a/examples/bank_threads/Cargo.toml b/examples/bank_threads/Cargo.toml new file mode 100644 index 0000000..0f4f4e0 --- /dev/null +++ b/examples/bank_threads/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "bank_threads" +version = "0.1.0" +edition = "2021" + +[dependencies] +spawned-rt = { workspace = true } +spawned-concurrency = { workspace = true } +tracing = { workspace = true } + +[[bin]] +name = "bank_threads" +path = "src/main.rs" \ No newline at end of file diff --git a/examples/bank_threads/src/main.rs b/examples/bank_threads/src/main.rs new file mode 100644 index 0000000..c04a6ac --- /dev/null +++ b/examples/bank_threads/src/main.rs @@ -0,0 +1,90 @@ +//! Simple example to test concurrency/Process abstraction. +//! +//! Based on Joe's Armstrong book: Programming Erlang, Second edition +//! Section 22.1 - The Road to the Generic Server +//! +//! Erlang usage example: +//! 1> my_bank:start(). +//! {ok,<0.33.0>} +//! 2> my_bank:deposit("joe", 10). +//! not_a_customer +//! 3> my_bank:new_account("joe"). +//! {welcome,"joe"} +//! 4> my_bank:deposit("joe", 10). +//! {thanks,"joe",your_balance_is,10} +//! 5> my_bank:deposit("joe", 30). +//! {thanks,"joe",your_balance_is,40} +//! 6> my_bank:withdraw("joe", 15). +//! {thanks,"joe",your_balance_is,25} +//! 7> my_bank:withdraw("joe", 45). +//! {sorry,"joe",you_only_have,25,in_the_bank + +mod messages; +mod server; + +use std::collections::HashMap; + +use messages::{BankError, BankOutMessage}; +use server::Bank; +use spawned_concurrency::threads::GenServer as _; +use spawned_rt::threads as rt; + +fn main() { + rt::run(|| { + let mut name_server = Bank::start(HashMap::new()); + + let joe = "Joe".to_string(); + + let result = Bank::deposit(&mut name_server, joe.clone(), 10); + tracing::info!("Deposit result {result:?}"); + assert_eq!(result, Err(BankError::NotACustomer { who: joe.clone() })); + + let result = Bank::new_account(&mut name_server, "Joe".to_string()); + tracing::info!("New account result {result:?}"); + assert_eq!(result, Ok(BankOutMessage::Welcome { who: joe.clone() })); + + let result = Bank::deposit(&mut name_server, "Joe".to_string(), 10); + tracing::info!("Deposit result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::Balance { + who: joe.clone(), + amount: 10 + }) + ); + + let result = Bank::deposit(&mut name_server, "Joe".to_string(), 30); + tracing::info!("Deposit result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::Balance { + who: joe.clone(), + amount: 40 + }) + ); + + let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 15); + tracing::info!("Withdraw result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::WidrawOk { + who: joe.clone(), + amount: 25 + }) + ); + + let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 45); + tracing::info!("Withdraw result {result:?}"); + assert_eq!( + result, + Err(BankError::InsufficientBalance { + who: joe, + amount: 25 + }) + ); + + let result = Bank::stop(&mut name_server); + tracing::info!("Stop result {result:?}"); + assert_eq!(result, Ok(BankOutMessage::Stopped)); + }) +} diff --git a/examples/bank_threads/src/messages.rs b/examples/bank_threads/src/messages.rs new file mode 100644 index 0000000..d58ae9d --- /dev/null +++ b/examples/bank_threads/src/messages.rs @@ -0,0 +1,25 @@ +#[derive(Debug, Clone)] +pub enum BankInMessage { + New { who: String }, + Add { who: String, amount: i32 }, + Remove { who: String, amount: i32 }, + Stop, +} + +#[allow(dead_code)] +#[derive(Debug, Clone, PartialEq)] +pub enum BankOutMessage { + Welcome { who: String }, + Balance { who: String, amount: i32 }, + WidrawOk { who: String, amount: i32 }, + Stopped, +} + +#[allow(dead_code)] +#[derive(Debug, Clone, PartialEq)] +pub enum BankError { + AlreadyACustomer { who: String }, + NotACustomer { who: String }, + InsufficientBalance { who: String, amount: i32 }, + ServerError, +} diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs new file mode 100644 index 0000000..8faf775 --- /dev/null +++ b/examples/bank_threads/src/server.rs @@ -0,0 +1,107 @@ +use std::collections::HashMap; + +use spawned_concurrency::threads::{ + CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, +}; +use spawned_rt::threads::mpsc::Sender; + +use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage}; + +type MsgResult = Result; +type BankHandle = GenServerHandle; +type BankHandleMessage = GenServerInMsg; +type BankState = HashMap; + +pub struct Bank {} + +impl Bank { + pub fn stop(server: &mut BankHandle) -> MsgResult { + server + .call(InMessage::Stop) + .unwrap_or(Err(BankError::ServerError)) + } + + pub fn new_account(server: &mut BankHandle, who: String) -> MsgResult { + server + .call(InMessage::New { who }) + .unwrap_or(Err(BankError::ServerError)) + } + + pub fn deposit(server: &mut BankHandle, who: String, amount: i32) -> MsgResult { + server + .call(InMessage::Add { who, amount }) + .unwrap_or(Err(BankError::ServerError)) + } + + pub fn withdraw(server: &mut BankHandle, who: String, amount: i32) -> MsgResult { + server + .call(InMessage::Remove { who, amount }) + .unwrap_or(Err(BankError::ServerError)) + } +} + +impl GenServer for Bank { + type InMsg = InMessage; + type OutMsg = MsgResult; + type Error = BankError; + type State = BankState; + + fn new() -> Self { + Self {} + } + + fn handle_call( + &mut self, + message: InMessage, + _tx: &Sender, + state: &mut Self::State, + ) -> CallResponse { + match message.clone() { + InMessage::New { who } => match state.get(&who) { + Some(_amount) => CallResponse::Reply(Err(BankError::AlreadyACustomer { who })), + None => { + state.insert(who.clone(), 0); + CallResponse::Reply(Ok(OutMessage::Welcome { who })) + } + }, + InMessage::Add { who, amount } => match state.get(&who) { + Some(current) => { + let new_amount = current + amount; + state.insert(who.clone(), new_amount); + CallResponse::Reply(Ok(OutMessage::Balance { + who, + amount: new_amount, + })) + } + None => CallResponse::Reply(Err(BankError::NotACustomer { who })), + }, + InMessage::Remove { who, amount } => match state.get(&who) { + Some(current) => match current < &amount { + true => CallResponse::Reply(Err(BankError::InsufficientBalance { + who, + amount: *current, + })), + false => { + let new_amount = current - amount; + state.insert(who.clone(), new_amount); + CallResponse::Reply(Ok(OutMessage::WidrawOk { + who, + amount: new_amount, + })) + } + }, + None => CallResponse::Reply(Err(BankError::NotACustomer { who })), + }, + InMessage::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), + } + } + + fn handle_cast( + &mut self, + _message: InMessage, + _tx: &Sender, + _state: &mut Self::State, + ) -> CastResponse { + CastResponse::NoReply + } +} diff --git a/examples/name_server/src/main.rs b/examples/name_server/src/main.rs index 8eb69d8..713d41c 100644 --- a/examples/name_server/src/main.rs +++ b/examples/name_server/src/main.rs @@ -18,8 +18,8 @@ use std::collections::HashMap; use messages::NameServerOutMessage; use server::NameServer; -use spawned_concurrency::GenServer as _; -use spawned_rt as rt; +use spawned_concurrency::tasks::GenServer as _; +use spawned_rt::tasks as rt; fn main() { rt::run(async { diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index d282251..5e77401 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; -use spawned_concurrency::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; -use spawned_rt::mpsc::Sender; +use spawned_concurrency::tasks::{ + CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, +}; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as OutMessage}; diff --git a/examples/name_server_with_error/src/main.rs b/examples/name_server_with_error/src/main.rs index ae4aae2..7cc5f67 100644 --- a/examples/name_server_with_error/src/main.rs +++ b/examples/name_server_with_error/src/main.rs @@ -18,8 +18,8 @@ use std::collections::HashMap; use messages::NameServerOutMessage; use server::NameServer; -use spawned_concurrency::GenServer as _; -use spawned_rt as rt; +use spawned_concurrency::tasks::GenServer as _; +use spawned_rt::tasks as rt; fn main() { rt::run(async { diff --git a/examples/name_server_with_error/src/server.rs b/examples/name_server_with_error/src/server.rs index 134c7c8..c72028d 100644 --- a/examples/name_server_with_error/src/server.rs +++ b/examples/name_server_with_error/src/server.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; -use spawned_concurrency::{ +use spawned_concurrency::tasks::{ CallResponse, CastResponse, GenServer, GenServerError, GenServerHandle, GenServerInMsg, }; -use spawned_rt::mpsc::Sender; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as OutMessage}; diff --git a/examples/ping_pong/src/consumer.rs b/examples/ping_pong/src/consumer.rs index 9a2b57c..8ead269 100644 --- a/examples/ping_pong/src/consumer.rs +++ b/examples/ping_pong/src/consumer.rs @@ -1,5 +1,5 @@ -use spawned_concurrency::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::mpsc::Sender; +use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::Message; diff --git a/examples/ping_pong/src/main.rs b/examples/ping_pong/src/main.rs index 57cc655..1b1599b 100644 --- a/examples/ping_pong/src/main.rs +++ b/examples/ping_pong/src/main.rs @@ -41,7 +41,7 @@ use std::{thread, time::Duration}; use consumer::Consumer; use producer::Producer; -use spawned_rt as rt; +use spawned_rt::tasks as rt; fn main() { rt::run(async { diff --git a/examples/ping_pong/src/messages.rs b/examples/ping_pong/src/messages.rs index e72ccfa..a22ae6c 100644 --- a/examples/ping_pong/src/messages.rs +++ b/examples/ping_pong/src/messages.rs @@ -1,4 +1,4 @@ -use spawned_rt::mpsc::Sender; +use spawned_rt::tasks::mpsc::Sender; #[derive(Debug, Clone)] pub enum Message { diff --git a/examples/ping_pong/src/producer.rs b/examples/ping_pong/src/producer.rs index 6b950a1..71829a1 100644 --- a/examples/ping_pong/src/producer.rs +++ b/examples/ping_pong/src/producer.rs @@ -1,5 +1,5 @@ -use spawned_concurrency::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::mpsc::Sender; +use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::Message; diff --git a/examples/ping_pong_threads/Cargo.toml b/examples/ping_pong_threads/Cargo.toml new file mode 100644 index 0000000..fb2b28a --- /dev/null +++ b/examples/ping_pong_threads/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "ping_pong_threads" +version = "0.1.0" +edition = "2021" + +[dependencies] +spawned-rt = { workspace = true } +spawned-concurrency = { workspace = true } +tracing = { workspace = true } + +[[bin]] +name = "ping_pong_threads" +path = "src/main.rs" \ No newline at end of file diff --git a/examples/ping_pong_threads/src/consumer.rs b/examples/ping_pong_threads/src/consumer.rs new file mode 100644 index 0000000..44777c4 --- /dev/null +++ b/examples/ping_pong_threads/src/consumer.rs @@ -0,0 +1,26 @@ +use spawned_concurrency::threads::{self as concurrency, Process, ProcessInfo}; +use spawned_rt::threads::mpsc::Sender; + +use crate::messages::Message; + +pub struct Consumer {} + +impl Consumer { + pub fn spawn_new() -> ProcessInfo { + Self {}.spawn() + } +} + +impl Process for Consumer { + fn handle(&mut self, message: Message, _tx: &Sender) -> Message { + tracing::info!("Consumer received {message:?}"); + match message.clone() { + Message::Ping { from } => { + tracing::info!("Consumer sent Pong"); + concurrency::send(&from, Message::Pong); + } + Message::Pong => (), + }; + message + } +} diff --git a/examples/ping_pong_threads/src/main.rs b/examples/ping_pong_threads/src/main.rs new file mode 100644 index 0000000..73fc4d6 --- /dev/null +++ b/examples/ping_pong_threads/src/main.rs @@ -0,0 +1,55 @@ +//! Simple example to test concurrency/Process abstraction +//! +//! Based on an Erlang example: +//! -module(ping). +//! +//! -export([ping/1, pong/0, spawn_consumer/0, spawn_producer/1, start/0]). +//! +//! ping(Pid) -> +//! Pid ! {ping, self()}, +//! receive +//! pong -> +//! io:format("Received pong!!!~n"), +//! ping(Pid) +//! end. +//! +//! pong() -> +//! receive +//! {ping, Pid} -> +//! io:format("Received ping!!~n"), +//! Pid ! pong, +//! pong(); +//! die -> +//! ok +//! end. +//! +//! spawn_consumer() -> +//! spawn(ping, pong, []). +//! +//! spawn_producer(Pid) -> +//! spawn(ping, ping, [Pid]). +//! +//! start() -> +//! Pid = spawn_consumer(), +//! spawn_producer(Pid). + +mod consumer; +mod messages; +mod producer; + +use std::{thread, time::Duration}; + +use consumer::Consumer; +use producer::Producer; +use spawned_rt::threads as rt; + +fn main() { + rt::run(|| { + let consumer = Consumer::spawn_new(); + + Producer::spawn_new(consumer.sender()); + + // giving it some time before ending + thread::sleep(Duration::from_millis(1)); + }) +} diff --git a/examples/ping_pong_threads/src/messages.rs b/examples/ping_pong_threads/src/messages.rs new file mode 100644 index 0000000..e8a07ef --- /dev/null +++ b/examples/ping_pong_threads/src/messages.rs @@ -0,0 +1,7 @@ +use spawned_rt::threads::mpsc::Sender; + +#[derive(Debug, Clone)] +pub enum Message { + Ping { from: Sender }, + Pong, +} diff --git a/examples/ping_pong_threads/src/producer.rs b/examples/ping_pong_threads/src/producer.rs new file mode 100644 index 0000000..01dd564 --- /dev/null +++ b/examples/ping_pong_threads/src/producer.rs @@ -0,0 +1,32 @@ +use spawned_concurrency::threads::{self as concurrency, Process, ProcessInfo}; +use spawned_rt::threads::mpsc::Sender; + +use crate::messages::Message; + +pub struct Producer { + consumer: Sender, +} + +impl Producer { + pub fn spawn_new(consumer: Sender) -> ProcessInfo { + Self { consumer }.spawn() + } + + fn send_ping(&self, tx: &Sender, consumer: &Sender) { + let message = Message::Ping { from: tx.clone() }; + tracing::info!("Producer sent Ping"); + concurrency::send(consumer, message); + } +} + +impl Process for Producer { + fn init(&mut self, tx: &Sender) { + self.send_ping(tx, &self.consumer); + } + + fn handle(&mut self, message: Message, tx: &Sender) -> Message { + tracing::info!("Producer received {message:?}"); + self.send_ping(tx, &self.consumer); + message + } +} diff --git a/examples/updater/src/main.rs b/examples/updater/src/main.rs index 4b3aa50..5119b9e 100644 --- a/examples/updater/src/main.rs +++ b/examples/updater/src/main.rs @@ -10,8 +10,8 @@ use std::{thread, time::Duration}; use messages::UpdaterOutMessage; use server::{UpdateServerState, UpdaterServer}; -use spawned_concurrency::GenServer as _; -use spawned_rt as rt; +use spawned_concurrency::tasks::GenServer as _; +use spawned_rt::tasks as rt; fn main() { rt::run(async { diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index 31e817e..c600ed9 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -1,9 +1,9 @@ use std::time::Duration; -use spawned_concurrency::{ - CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, send_after, +use spawned_concurrency::tasks::{ + send_after, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, }; -use spawned_rt::mpsc::Sender; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::{UpdaterInMessage as InMessage, UpdaterOutMessage as OutMessage}; diff --git a/examples/updater_threads/Cargo.toml b/examples/updater_threads/Cargo.toml new file mode 100644 index 0000000..7266750 --- /dev/null +++ b/examples/updater_threads/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "updater_threads" +version = "0.1.0" +edition = "2021" + +[dependencies] +spawned-rt = { workspace = true } +spawned-concurrency = { workspace = true } +tracing = { workspace = true } +reqwest = { version = "0.11", features = ["blocking"] } +futures = "0.3.1" + +[[bin]] +name = "updater_threads" +path = "src/main.rs" \ No newline at end of file diff --git a/examples/updater_threads/src/main.rs b/examples/updater_threads/src/main.rs new file mode 100644 index 0000000..64236be --- /dev/null +++ b/examples/updater_threads/src/main.rs @@ -0,0 +1,30 @@ +//! Example to test a recurrent gen_server. +//! +//! Just activates periodically and performs an http request +//! + +mod messages; +mod server; + +use std::{thread, time::Duration}; + +use messages::UpdaterOutMessage; +use server::{UpdateServerState, UpdaterServer}; +use spawned_concurrency::threads::GenServer as _; +use spawned_rt::threads as rt; + +fn main() { + rt::run(|| { + let mut update_server = UpdaterServer::start(UpdateServerState { + url: "https://httpbin.org/ip".to_string(), + periodicity: Duration::from_millis(1000), + }); + + let result = UpdaterServer::check(&mut update_server); + tracing::info!("Update check done: {result:?}"); + assert_eq!(result, UpdaterOutMessage::Ok); + + // giving it some time before ending + thread::sleep(Duration::from_secs(10)); + }) +} diff --git a/examples/updater_threads/src/messages.rs b/examples/updater_threads/src/messages.rs new file mode 100644 index 0000000..daa0589 --- /dev/null +++ b/examples/updater_threads/src/messages.rs @@ -0,0 +1,11 @@ +#[derive(Debug, Clone)] +pub enum UpdaterInMessage { + Check, +} + +#[allow(dead_code)] +#[derive(Debug, Clone, PartialEq)] +pub enum UpdaterOutMessage { + Ok, + Error, +} diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs new file mode 100644 index 0000000..cbd9643 --- /dev/null +++ b/examples/updater_threads/src/server.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use spawned_concurrency::threads::{ + send_after, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, +}; +use spawned_rt::threads::{block_on, mpsc::Sender}; + +use crate::messages::{UpdaterInMessage as InMessage, UpdaterOutMessage as OutMessage}; + +type UpdateServerHandle = GenServerHandle; +type UpdateServerMessage = GenServerInMsg; + +#[derive(Clone)] +pub struct UpdateServerState { + pub url: String, + pub periodicity: Duration, +} +pub struct UpdaterServer {} + +impl UpdaterServer { + pub fn check(server: &mut UpdateServerHandle) -> OutMessage { + match server.cast(InMessage::Check) { + Ok(_) => OutMessage::Ok, + Err(_) => OutMessage::Error, + } + } +} + +impl GenServer for UpdaterServer { + type InMsg = InMessage; + type OutMsg = OutMessage; + type Error = std::fmt::Error; + type State = UpdateServerState; + + fn new() -> Self { + Self {} + } + + fn handle_call( + &mut self, + _message: InMessage, + _tx: &Sender, + _state: &mut Self::State, + ) -> CallResponse { + CallResponse::Reply(OutMessage::Ok) + } + + fn handle_cast( + &mut self, + message: InMessage, + tx: &Sender, + state: &mut Self::State, + ) -> CastResponse { + match message { + Self::InMsg::Check => { + send_after(state.periodicity, tx.clone(), InMessage::Check); + let url = state.url.clone(); + tracing::info!("Fetching: {url}"); + let resp = block_on(req(url)); + + tracing::info!("Response: {resp:?}"); + + CastResponse::NoReply + } + } + } +} + +async fn req(url: String) -> Result { + reqwest::get(url).await?.text().await +} diff --git a/rt/Cargo.toml b/rt/Cargo.toml index f368c44..e514397 100644 --- a/rt/Cargo.toml +++ b/rt/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["full"] } +crossbeam = { version = "0.7.3" } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/rt/src/lib.rs b/rt/src/lib.rs index ab74777..dc56e81 100644 --- a/rt/src/lib.rs +++ b/rt/src/lib.rs @@ -7,34 +7,6 @@ //! Currently, only a very limited set of tokio functionality is reexported. We may want to //! extend this functionality as needed. -mod tokio; - -use std::future::Future; -use std::str::FromStr; - -use tracing_subscriber::EnvFilter; -use tracing_subscriber::FmtSubscriber; -use tracing_subscriber::filter::Directive; - -pub use crate::tokio::mpsc; -pub use crate::tokio::oneshot; -pub use crate::tokio::sleep; -pub use crate::tokio::{JoinHandle, Runtime, spawn}; - -pub fn run(future: F) -> F::Output { - init_tracing(); - - let rt = Runtime::new().unwrap(); - rt.block_on(future) -} - -fn init_tracing() { - let subscriber = FmtSubscriber::builder() - .with_env_filter( - EnvFilter::builder() - .with_default_directive(Directive::from_str("info").unwrap()) - .from_env_lossy(), - ) - .finish(); - tracing::subscriber::set_global_default(subscriber).unwrap(); -} +pub mod tasks; +pub mod threads; +mod tracing; diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs new file mode 100644 index 0000000..5cb9a41 --- /dev/null +++ b/rt/src/tasks/mod.rs @@ -0,0 +1,25 @@ +//! Runtime wrapper to remove dependencies from code. Using this library will +//! allow to set a tokio runtime or any other runtime, once implemented just by +//! changing the enabled feature. +//! May implement the `deterministic` version based on comonware.xyz's runtime: +//! https://github.com/commonwarexyz/monorepo/blob/main/runtime/src/deterministic.rs +//! +//! Currently, only a very limited set of tokio functionality is reexported. We may want to +//! extend this functionality as needed. + +mod tokio; + +use crate::tracing::init_tracing; + +pub use crate::tasks::tokio::mpsc; +pub use crate::tasks::tokio::oneshot; +pub use crate::tasks::tokio::sleep; +pub use crate::tasks::tokio::{spawn, JoinHandle, Runtime}; +use std::future::Future; + +pub fn run(future: F) -> F::Output { + init_tracing(); + + let rt = Runtime::new().unwrap(); + rt.block_on(future) +} diff --git a/rt/src/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs similarity index 84% rename from rt/src/tokio/mod.rs rename to rt/src/tasks/tokio/mod.rs index 0ad2c20..7d7ba9a 100644 --- a/rt/src/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -4,6 +4,6 @@ pub mod oneshot; pub use tokio::{ runtime::Runtime, - task::{JoinHandle, spawn}, + task::{spawn, JoinHandle}, time::sleep, }; diff --git a/rt/src/tasks/tokio/mpsc.rs b/rt/src/tasks/tokio/mpsc.rs new file mode 100644 index 0000000..ec520a6 --- /dev/null +++ b/rt/src/tasks/tokio/mpsc.rs @@ -0,0 +1,6 @@ +//! Tokio.rs reexports to prevent tokio dependencies within external code + +pub use tokio::sync::mpsc::{ + error::SendError, unbounded_channel as channel, UnboundedReceiver as Receiver, + UnboundedSender as Sender, +}; diff --git a/rt/src/tokio/oneshot.rs b/rt/src/tasks/tokio/oneshot.rs similarity index 55% rename from rt/src/tokio/oneshot.rs rename to rt/src/tasks/tokio/oneshot.rs index f147d7d..682aba2 100644 --- a/rt/src/tokio/oneshot.rs +++ b/rt/src/tasks/tokio/oneshot.rs @@ -1,3 +1,3 @@ //! Tokio.rs reexports to prevent tokio dependencies within external code -pub use tokio::sync::oneshot::{Receiver, Sender, channel}; +pub use tokio::sync::oneshot::{channel, Receiver, Sender}; diff --git a/rt/src/threads/mod.rs b/rt/src/threads/mod.rs new file mode 100644 index 0000000..cd8b543 --- /dev/null +++ b/rt/src/threads/mod.rs @@ -0,0 +1,22 @@ +//! IO-threads based module to support shared behavior with task based version. + +pub mod mpsc; +pub mod oneshot; + +pub use std::{ + future::Future, + thread::{sleep, spawn, JoinHandle}, +}; + +use crate::{tasks::Runtime, tracing::init_tracing}; + +pub fn run(f: fn()) { + init_tracing(); + + f() +} + +pub fn block_on(future: F) -> F::Output { + let rt = Runtime::new().unwrap(); + rt.block_on(future) +} diff --git a/rt/src/threads/mpsc.rs b/rt/src/threads/mpsc.rs new file mode 100644 index 0000000..c5b140a --- /dev/null +++ b/rt/src/threads/mpsc.rs @@ -0,0 +1,3 @@ +//! non-async replacement for mpsc channels + +pub use std::sync::mpsc::{channel, Receiver, SendError, Sender}; diff --git a/rt/src/threads/oneshot.rs b/rt/src/threads/oneshot.rs new file mode 100644 index 0000000..ba0accb --- /dev/null +++ b/rt/src/threads/oneshot.rs @@ -0,0 +1,3 @@ +//! non-async replacement for oneshot channels + +pub use crossbeam::{crossbeam_channel::unbounded as channel, Receiver, Sender}; diff --git a/rt/src/tokio/mpsc.rs b/rt/src/tokio/mpsc.rs deleted file mode 100644 index 62e4977..0000000 --- a/rt/src/tokio/mpsc.rs +++ /dev/null @@ -1,6 +0,0 @@ -//! Tokio.rs reexports to prevent tokio dependencies within external code - -pub use tokio::sync::mpsc::{ - UnboundedReceiver as Receiver, UnboundedSender as Sender, error::SendError, - unbounded_channel as channel, -}; diff --git a/rt/src/tracing/mod.rs b/rt/src/tracing/mod.rs new file mode 100644 index 0000000..276be66 --- /dev/null +++ b/rt/src/tracing/mod.rs @@ -0,0 +1,19 @@ +//! Tracing initializer +//! + +use std::str::FromStr; + +use tracing_subscriber::filter::Directive; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::FmtSubscriber; + +pub(crate) fn init_tracing() { + let subscriber = FmtSubscriber::builder() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(Directive::from_str("info").unwrap()) + .from_env_lossy(), + ) + .finish(); + tracing::subscriber::set_global_default(subscriber).unwrap(); +}