1 Commits

Author SHA1 Message Date
Luke Parker
8404844c4e Create a Snapshot within RocksDB when starting a transaction
This ensures if a TXN is reading a value, and another TXN mutates it, the TXN's
actions are atomic to the first value (and not affected by the inconsistency).

Because we can't replicate this with parity-db, I'm not sure this is worth
committing.
2024-03-31 10:20:59 -04:00
24 changed files with 514 additions and 1011 deletions

267
Cargo.lock generated
View File

@@ -173,9 +173,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.82" version = "1.0.81"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519" checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247"
[[package]] [[package]]
name = "approx" name = "approx"
@@ -287,7 +287,7 @@ checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b"
dependencies = [ dependencies = [
"event-listener 4.0.3", "event-listener 4.0.3",
"event-listener-strategy", "event-listener-strategy",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -298,7 +298,7 @@ checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -322,7 +322,7 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-util", "futures-util",
"memchr", "memchr",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -344,7 +344,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -449,14 +449,14 @@ dependencies = [
"regex", "regex",
"rustc-hash", "rustc-hash",
"shlex", "shlex",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
name = "bitcoin" name = "bitcoin"
version = "0.31.2" version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c85783c2fe40083ea54a33aa2f0ba58831d90fcd190f5bdc47e74e84d2a96ae" checksum = "fd00f3c09b5f21fb357abe32d29946eb8bb7a0862bae62c0b5e4a692acbbe73c"
dependencies = [ dependencies = [
"bech32", "bech32",
"bitcoin-internals", "bitcoin-internals",
@@ -633,7 +633,7 @@ dependencies = [
"hyper 0.14.28", "hyper 0.14.28",
"hyperlocal", "hyperlocal",
"log", "log",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@@ -677,7 +677,7 @@ dependencies = [
"proc-macro-crate 3.1.0", "proc-macro-crate 3.1.0",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
"syn_derive", "syn_derive",
] ]
@@ -723,9 +723,9 @@ dependencies = [
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.16.0" version = "3.15.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa"
[[package]] [[package]]
name = "byte-slice-cast" name = "byte-slice-cast"
@@ -964,7 +964,7 @@ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -1301,14 +1301,14 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
name = "cxx" name = "cxx"
version = "1.0.121" version = "1.0.120"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21db378d04296a84d8b7d047c36bb3954f0b46529db725d7e62fb02f9ba53ccc" checksum = "ff4dc7287237dd438b926a81a1a5605dad33d286870e5eee2db17bf2bcd9e92a"
dependencies = [ dependencies = [
"cc", "cc",
"cxxbridge-flags", "cxxbridge-flags",
@@ -1318,9 +1318,9 @@ dependencies = [
[[package]] [[package]]
name = "cxx-build" name = "cxx-build"
version = "1.0.121" version = "1.0.120"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e5262a7fa3f0bae2a55b767c223ba98032d7c328f5c13fa5cdc980b77fc0658" checksum = "f47c6c8ad7c1a10d3ef0fe3ff6733f4db0d78f08ef0b13121543163ef327058b"
dependencies = [ dependencies = [
"cc", "cc",
"codespan-reporting", "codespan-reporting",
@@ -1328,24 +1328,24 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"scratch", "scratch",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
name = "cxxbridge-flags" name = "cxxbridge-flags"
version = "1.0.121" version = "1.0.120"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be8dcadd2e2fb4a501e1d9e93d6e88e6ea494306d8272069c92d5a9edf8855c0" checksum = "701a1ac7a697e249cdd8dc026d7a7dafbfd0dbcd8bd24ec55889f2bc13dd6287"
[[package]] [[package]]
name = "cxxbridge-macro" name = "cxxbridge-macro"
version = "1.0.121" version = "1.0.120"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad08a837629ad949b73d032c637653d069e909cffe4ee7870b02301939ce39cc" checksum = "b404f596046b0bb2d903a9c786b875a126261b52b7c3a64bbb66382c41c771df"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -1401,9 +1401,9 @@ dependencies = [
[[package]] [[package]]
name = "der" name = "der"
version = "0.7.9" version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c"
dependencies = [ dependencies = [
"const-oid", "const-oid",
"zeroize", "zeroize",
@@ -1531,7 +1531,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -1718,9 +1718,9 @@ dependencies = [
[[package]] [[package]]
name = "encoding_rs" name = "encoding_rs"
version = "0.8.34" version = "0.8.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
] ]
@@ -1764,7 +1764,7 @@ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -1907,7 +1907,7 @@ dependencies = [
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",
"syn 2.0.58", "syn 2.0.55",
"toml 0.7.8", "toml 0.7.8",
"walkdir", "walkdir",
] ]
@@ -1925,7 +1925,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"serde_json", "serde_json",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -1951,7 +1951,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"strum 0.25.0", "strum 0.25.0",
"syn 2.0.58", "syn 2.0.55",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tiny-keccak", "tiny-keccak",
@@ -2008,7 +2008,7 @@ checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e"
dependencies = [ dependencies = [
"concurrent-queue", "concurrent-queue",
"parking", "parking",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -2018,7 +2018,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
dependencies = [ dependencies = [
"event-listener 4.0.3", "event-listener 4.0.3",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -2040,7 +2040,7 @@ dependencies = [
"fs-err", "fs-err",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -2314,7 +2314,7 @@ dependencies = [
"proc-macro-warning", "proc-macro-warning",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -2326,7 +2326,7 @@ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -2336,7 +2336,7 @@ source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -2484,7 +2484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -2495,7 +2495,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -2554,7 +2554,7 @@ dependencies = [
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr", "memchr",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"pin-utils", "pin-utils",
"slab", "slab",
] ]
@@ -2616,9 +2616,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.14" version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
@@ -2696,9 +2696,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.26" version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@@ -2875,7 +2875,7 @@ checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [ dependencies = [
"bytes", "bytes",
"http 0.2.12", "http 0.2.12",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -2898,7 +2898,7 @@ dependencies = [
"futures-core", "futures-core",
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
] ]
[[package]] [[package]]
@@ -2941,7 +2941,7 @@ dependencies = [
"httparse", "httparse",
"httpdate", "httpdate",
"itoa", "itoa",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"socket2 0.4.10", "socket2 0.4.10",
"tokio", "tokio",
"tower-service", "tower-service",
@@ -2962,7 +2962,7 @@ dependencies = [
"http-body 1.0.0", "http-body 1.0.0",
"httparse", "httparse",
"itoa", "itoa",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"smallvec", "smallvec",
"tokio", "tokio",
"want", "want",
@@ -2998,7 +2998,7 @@ dependencies = [
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"hyper 1.2.0", "hyper 1.2.0",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"socket2 0.5.6", "socket2 0.5.6",
"tokio", "tokio",
"tower", "tower",
@@ -3822,7 +3822,7 @@ dependencies = [
"proc-macro-warning", "proc-macro-warning",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -4075,7 +4075,7 @@ dependencies = [
"macro_magic_core", "macro_magic_core",
"macro_magic_macros", "macro_magic_macros",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -4089,7 +4089,7 @@ dependencies = [
"macro_magic_core_macros", "macro_magic_core_macros",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -4100,7 +4100,7 @@ checksum = "d710e1214dffbab3b5dacb21475dde7d6ed84c69ff722b3a47a782668d44fbac"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -4111,7 +4111,7 @@ checksum = "b8fb85ec1620619edf2984a7693497d4ec88a9665d8b87e942856884c92dbf2a"
dependencies = [ dependencies = [
"macro_magic_core", "macro_magic_core",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -4164,9 +4164,9 @@ dependencies = [
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.7.2" version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149"
[[package]] [[package]]
name = "memfd" name = "memfd"
@@ -4518,9 +4518,9 @@ dependencies = [
[[package]] [[package]]
name = "nalgebra" name = "nalgebra"
version = "0.32.5" version = "0.32.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ea4908d4f23254adda3daa60ffef0f1ac7b8c3e9a864cf3cc154b251908a2ef" checksum = "4541eb06dce09c0241ebbaab7102f0a01a0c8994afed2e5d0d66775016e25ac2"
dependencies = [ dependencies = [
"approx", "approx",
"matrixmultiply", "matrixmultiply",
@@ -4607,9 +4607,9 @@ dependencies = [
[[package]] [[package]]
name = "netlink-sys" name = "netlink-sys"
version = "0.8.6" version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "416060d346fbaf1f23f9512963e3e878f1a78e707cb699ba9215761754244307" checksum = "6471bf08e7ac0135876a9581bf3217ef0333c191c128d34878079f42ee150411"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures", "futures",
@@ -4750,7 +4750,7 @@ dependencies = [
"proc-macro-crate 3.1.0", "proc-macro-crate 3.1.0",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -5219,7 +5219,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -5230,9 +5230,9 @@ checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.14" version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]] [[package]]
name = "pin-utils" name = "pin-utils"
@@ -5271,7 +5271,7 @@ dependencies = [
"cfg-if", "cfg-if",
"concurrent-queue", "concurrent-queue",
"hermit-abi", "hermit-abi",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"rustix", "rustix",
"tracing", "tracing",
"windows-sys 0.52.0", "windows-sys 0.52.0",
@@ -5359,7 +5359,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -5443,7 +5443,7 @@ checksum = "3d1eaa7fa0aa1929ffdf7eeb6eac234dde6268914a14ad44d23521ab6a9b258e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -5489,7 +5489,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -5602,7 +5602,7 @@ checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-io", "futures-io",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"quinn-proto", "quinn-proto",
"quinn-udp", "quinn-udp",
"rustc-hash", "rustc-hash",
@@ -5644,9 +5644,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.36" version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@@ -5790,7 +5790,7 @@ checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -5871,7 +5871,7 @@ dependencies = [
"mime", "mime",
"once_cell", "once_cell",
"percent-encoding", "percent-encoding",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
@@ -6119,9 +6119,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls-pki-types" name = "rustls-pki-types"
version = "1.4.1" version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" checksum = "868e20fada228fefaf6b652e00cc73623d54f8171e7352c18bb281571f2d92da"
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
@@ -6146,9 +6146,9 @@ dependencies = [
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.15" version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80af6f9131f277a45a3fba6ce8e2258037bb0477a67e610d3c1fe046ab31de47" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]] [[package]]
name = "rw-stream-sink" name = "rw-stream-sink"
@@ -6289,7 +6289,7 @@ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -7050,7 +7050,7 @@ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -7112,9 +7112,9 @@ dependencies = [
[[package]] [[package]]
name = "scale-info" name = "scale-info"
version = "2.11.2" version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c453e59a955f81fb62ee5d596b450383d699f152d350e9d23a0db2adb78e4c0" checksum = "788745a868b0e751750388f4e6546eb921ef714a4317fa6954f7cde114eb2eb7"
dependencies = [ dependencies = [
"bitvec", "bitvec",
"cfg-if", "cfg-if",
@@ -7126,9 +7126,9 @@ dependencies = [
[[package]] [[package]]
name = "scale-info-derive" name = "scale-info-derive"
version = "2.11.2" version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18cf6c6447f813ef19eb450e985bcce6705f9ce7660db221b59093d15c79c4b7" checksum = "7dc2f4e8bc344b9fc3d5f74f72c2e55bfc38d28dc2ebc69c194a3df424e4d9ac"
dependencies = [ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
@@ -7263,9 +7263,9 @@ dependencies = [
[[package]] [[package]]
name = "security-framework" name = "security-framework"
version = "2.10.0" version = "2.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.3.2",
"core-foundation", "core-foundation",
@@ -7276,9 +7276,9 @@ dependencies = [
[[package]] [[package]]
name = "security-framework-sys" name = "security-framework-sys"
version = "2.10.0" version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41f3cc463c0ef97e11c3461a9d3787412d30e8e7eb907c79180c4a57bf7c04ef" checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a"
dependencies = [ dependencies = [
"core-foundation-sys", "core-foundation-sys",
"libc", "libc",
@@ -7613,7 +7613,6 @@ dependencies = [
"futures-util", "futures-util",
"hex", "hex",
"jsonrpsee", "jsonrpsee",
"libp2p",
"pallet-transaction-payment-rpc", "pallet-transaction-payment-rpc",
"rand_core", "rand_core",
"sc-authority-discovery", "sc-authority-discovery",
@@ -7901,7 +7900,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -7917,13 +7916,13 @@ dependencies = [
[[package]] [[package]]
name = "serde_repr" name = "serde_repr"
version = "0.1.19" version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -8200,7 +8199,7 @@ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -8396,7 +8395,7 @@ source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46
dependencies = [ dependencies = [
"quote", "quote",
"sp-core-hashing", "sp-core-hashing",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -8415,7 +8414,7 @@ source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -8587,7 +8586,7 @@ dependencies = [
"proc-macro-crate 1.3.1", "proc-macro-crate 1.3.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -8740,7 +8739,7 @@ dependencies = [
"parity-scale-codec", "parity-scale-codec",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -8877,9 +8876,9 @@ dependencies = [
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.11.1" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01"
[[package]] [[package]]
name = "strum" name = "strum"
@@ -8919,7 +8918,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"rustversion", "rustversion",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -9007,9 +9006,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.58" version = "2.0.55"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44cfb93f38070beee36b3fef7d4f5a16f27751d94b187b666a5cc5e9b0d30687" checksum = "002a1b3dbf967edfafc32655d0f377ab0bb7b994aa1d32c8cc7e9b8bf3ebb8f0"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -9025,7 +9024,7 @@ dependencies = [
"proc-macro-error", "proc-macro-error",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -9132,7 +9131,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -9228,9 +9227,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.37.0" version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"bytes", "bytes",
@@ -9238,7 +9237,7 @@ dependencies = [
"mio", "mio",
"num_cpus", "num_cpus",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"signal-hook-registry", "signal-hook-registry",
"socket2 0.5.6", "socket2 0.5.6",
"tokio-macros", "tokio-macros",
@@ -9253,7 +9252,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -9274,7 +9273,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"tokio", "tokio",
"tokio-util", "tokio-util",
] ]
@@ -9289,7 +9288,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"tokio", "tokio",
"tracing", "tracing",
] ]
@@ -9357,7 +9356,7 @@ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"pin-project", "pin-project",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"tokio", "tokio",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
@@ -9377,7 +9376,7 @@ dependencies = [
"http 0.2.12", "http 0.2.12",
"http-body 0.4.6", "http-body 0.4.6",
"http-range-header", "http-range-header",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
] ]
@@ -9401,7 +9400,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [ dependencies = [
"log", "log",
"pin-project-lite 0.2.14", "pin-project-lite 0.2.13",
"tracing-attributes", "tracing-attributes",
"tracing-core", "tracing-core",
] ]
@@ -9414,7 +9413,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -9819,7 +9818,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@@ -9853,7 +9852,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@@ -10162,7 +10161,7 @@ checksum = "ca7af9bb3ee875c4907835e607a275d10b04d15623d3aebe01afe8fbd3f85050"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -10205,9 +10204,9 @@ dependencies = [
[[package]] [[package]]
name = "widestring" name = "widestring"
version = "1.1.0" version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" checksum = "653f141f39ec16bba3c5abe400a0c60da7468261cc2cbf36805022876bc721a8"
[[package]] [[package]]
name = "winapi" name = "winapi"
@@ -10478,9 +10477,9 @@ dependencies = [
[[package]] [[package]]
name = "xml-rs" name = "xml-rs"
version = "0.8.20" version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "791978798f0597cfc70478424c2b4fdc2b7a8024aaff78497ef00f24ef674193" checksum = "0fcb9cbac069e033553e8bb871be2fbdffcab578eb25bd0f7c508cedc6dcd75a"
[[package]] [[package]]
name = "xmltree" name = "xmltree"
@@ -10539,7 +10538,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
@@ -10559,14 +10558,14 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.58", "syn 2.0.55",
] ]
[[package]] [[package]]
name = "zstd" name = "zstd"
version = "0.11.2+zstd.1.5.2" version = "0.11.2+zstd.1.5.2"
dependencies = [ dependencies = [
"zstd 0.13.1", "zstd 0.13.0",
] ]
[[package]] [[package]]
@@ -10580,11 +10579,11 @@ dependencies = [
[[package]] [[package]]
name = "zstd" name = "zstd"
version = "0.13.1" version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
dependencies = [ dependencies = [
"zstd-safe 7.1.0", "zstd-safe 7.0.0",
] ]
[[package]] [[package]]
@@ -10599,18 +10598,18 @@ dependencies = [
[[package]] [[package]]
name = "zstd-safe" name = "zstd-safe"
version = "7.1.0" version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
dependencies = [ dependencies = [
"zstd-sys", "zstd-sys",
] ]
[[package]] [[package]]
name = "zstd-sys" name = "zstd-sys"
version = "2.0.10+zstd.1.5.6" version = "2.0.9+zstd.1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656"
dependencies = [ dependencies = [
"cc", "cc",
"pkg-config", "pkg-config",

View File

@@ -9,7 +9,7 @@ use std_shims::{
use rand_core::{RngCore, CryptoRng}; use rand_core::{RngCore, CryptoRng};
use zeroize::{Zeroize, ZeroizeOnDrop, Zeroizing}; use zeroize::{Zeroize, ZeroizeOnDrop, Zeroizing};
use subtle::{ConstantTimeEq, ConditionallySelectable}; use subtle::{ConstantTimeEq, Choice, CtOption};
use curve25519_dalek::{ use curve25519_dalek::{
constants::ED25519_BASEPOINT_TABLE, constants::ED25519_BASEPOINT_TABLE,
@@ -169,8 +169,13 @@ fn core(
} }
// Perform the core loop // Perform the core loop
let mut c1 = c; let mut c1 = CtOption::new(Scalar::ZERO, Choice::from(0));
for i in (start .. end).map(|i| i % n) { for i in (start .. end).map(|i| i % n) {
// This will only execute once and shouldn't need to be constant time. Making it constant time
// removes the risk of branch prediction creating timing differences depending on ring index
// however
c1 = c1.or_else(|| CtOption::new(c, i.ct_eq(&0)));
let c_p = mu_P * c; let c_p = mu_P * c;
let c_c = mu_C * c; let c_c = mu_C * c;
@@ -183,15 +188,10 @@ fn core(
to_hash.extend(L.compress().to_bytes()); to_hash.extend(L.compress().to_bytes());
to_hash.extend(R.compress().to_bytes()); to_hash.extend(R.compress().to_bytes());
c = hash_to_scalar(&to_hash); c = hash_to_scalar(&to_hash);
// This will only execute once and shouldn't need to be constant time. Making it constant time
// removes the risk of branch prediction creating timing differences depending on ring index
// however
c1.conditional_assign(&c, i.ct_eq(&(n - 1)));
} }
// This first tuple is needed to continue signing, the latter is the c to be tested/worked with // This first tuple is needed to continue signing, the latter is the c to be tested/worked with
((D, c * mu_P, c * mu_C), c1) ((D, c * mu_P, c * mu_C), c1.unwrap_or(c))
} }
/// CLSAG signature, as used in Monero. /// CLSAG signature, as used in Monero.

View File

@@ -57,7 +57,7 @@ fn clsag() {
} }
let image = generate_key_image(&secrets.0); let image = generate_key_image(&secrets.0);
let (mut clsag, pseudo_out) = Clsag::sign( let (clsag, pseudo_out) = Clsag::sign(
&mut OsRng, &mut OsRng,
vec![( vec![(
secrets.0, secrets.0,
@@ -76,12 +76,7 @@ fn clsag() {
msg, msg,
) )
.swap_remove(0); .swap_remove(0);
clsag.verify(&ring, &image, &pseudo_out, &msg).unwrap(); clsag.verify(&ring, &image, &pseudo_out, &msg).unwrap();
// make sure verification fails if we throw a random `c1` at it.
clsag.c1 = random_scalar(&mut OsRng);
assert!(clsag.verify(&ring, &image, &pseudo_out, &msg).is_err());
} }
} }

View File

@@ -1,33 +1,40 @@
use std::sync::Arc; use std::{sync::Arc, collections::HashSet};
use rocksdb::{ use rocksdb::{
DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions, DBCompressionType, ThreadMode, SingleThreaded, LogLevel, WriteOptions,
Transaction as RocksTransaction, Options, OptimisticTransactionDB, Transaction as RocksTransaction, Options, OptimisticTransactionDB, SnapshotWithThreadMode,
}; };
use crate::*; use crate::*;
pub struct Transaction<'a, T: ThreadMode>( pub struct Transaction<'a, T: ThreadMode> {
RocksTransaction<'a, OptimisticTransactionDB<T>>, dirtied_keys: HashSet<Vec<u8>>,
&'a OptimisticTransactionDB<T>, txn: RocksTransaction<'a, OptimisticTransactionDB<T>>,
); snapshot: SnapshotWithThreadMode<'a, OptimisticTransactionDB<T>>,
db: &'a OptimisticTransactionDB<T>,
}
impl<T: ThreadMode> Get for Transaction<'_, T> { impl<T: ThreadMode> Get for Transaction<'_, T> {
fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> { fn get(&self, key: impl AsRef<[u8]>) -> Option<Vec<u8>> {
self.0.get(key).expect("couldn't read from RocksDB via transaction") if self.dirtied_keys.contains(key.as_ref()) {
return self.txn.get(key).expect("couldn't read from RocksDB via transaction");
}
self.snapshot.get(key).expect("couldn't read from RocksDB via snapshot")
} }
} }
impl<T: ThreadMode> DbTxn for Transaction<'_, T> { impl<T: ThreadMode> DbTxn for Transaction<'_, T> {
fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) { fn put(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
self.0.put(key, value).expect("couldn't write to RocksDB via transaction") self.dirtied_keys.insert(key.as_ref().to_vec());
self.txn.put(key, value).expect("couldn't write to RocksDB via transaction")
} }
fn del(&mut self, key: impl AsRef<[u8]>) { fn del(&mut self, key: impl AsRef<[u8]>) {
self.0.delete(key).expect("couldn't delete from RocksDB via transaction") self.dirtied_keys.insert(key.as_ref().to_vec());
self.txn.delete(key).expect("couldn't delete from RocksDB via transaction")
} }
fn commit(self) { fn commit(self) {
self.0.commit().expect("couldn't commit to RocksDB via transaction"); self.txn.commit().expect("couldn't commit to RocksDB via transaction");
self.1.flush_wal(true).expect("couldn't flush RocksDB WAL"); self.db.flush_wal(true).expect("couldn't flush RocksDB WAL");
self.1.flush().expect("couldn't flush RocksDB"); self.db.flush().expect("couldn't flush RocksDB");
} }
} }
@@ -41,7 +48,12 @@ impl<T: Send + ThreadMode + 'static> Db for Arc<OptimisticTransactionDB<T>> {
fn txn(&mut self) -> Self::Transaction<'_> { fn txn(&mut self) -> Self::Transaction<'_> {
let mut opts = WriteOptions::default(); let mut opts = WriteOptions::default();
opts.set_sync(true); opts.set_sync(true);
Transaction(self.transaction_opt(&opts, &Default::default()), &**self) Transaction {
dirtied_keys: HashSet::new(),
txn: self.transaction_opt(&opts, &Default::default()),
snapshot: self.snapshot(),
db: &**self,
}
} }
} }

View File

@@ -51,7 +51,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] } libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "gossipsub", "macros"] }
[dev-dependencies] [dev-dependencies]
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] } tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }

View File

@@ -22,7 +22,7 @@ use serai_db::{Get, DbTxn, Db, create_db};
use processor_messages::coordinator::cosign_block_msg; use processor_messages::coordinator::cosign_block_msg;
use crate::{ use crate::{
p2p::{CosignedBlock, GossipMessageKind, P2p}, p2p::{CosignedBlock, P2pMessageKind, P2p},
substrate::LatestCosignedBlock, substrate::LatestCosignedBlock,
}; };
@@ -323,7 +323,7 @@ impl<D: Db> CosignEvaluator<D> {
for cosign in cosigns { for cosign in cosigns {
let mut buf = vec![]; let mut buf = vec![];
cosign.serialize(&mut buf).unwrap(); cosign.serialize(&mut buf).unwrap();
P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await; P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
} }
sleep(Duration::from_secs(60)).await; sleep(Duration::from_secs(60)).await;
} }

View File

@@ -260,7 +260,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
cosign_channel.send(cosigned_block).unwrap(); cosign_channel.send(cosigned_block).unwrap();
let mut buf = vec![]; let mut buf = vec![];
cosigned_block.serialize(&mut buf).unwrap(); cosigned_block.serialize(&mut buf).unwrap();
P2p::broadcast(p2p, GossipMessageKind::CosignedBlock, buf).await; P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
None None
} }
// This causes an action on Substrate yet not on any Tributary // This causes an action on Substrate yet not on any Tributary

View File

@@ -1,8 +1,8 @@
use core::{time::Duration, fmt}; use core::{time::Duration, fmt};
use std::{ use std::{
sync::Arc, sync::Arc,
io::{self, Read}, io::Read,
collections::{HashSet, HashMap}, collections::HashMap,
time::{SystemTime, Instant}, time::{SystemTime, Instant},
}; };
@@ -15,7 +15,7 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorS
use serai_db::Db; use serai_db::Db;
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt}; use futures_util::StreamExt;
use tokio::{ use tokio::{
sync::{Mutex, RwLock, mpsc, broadcast}, sync::{Mutex, RwLock, mpsc, broadcast},
time::sleep, time::sleep,
@@ -27,16 +27,12 @@ use libp2p::{
PeerId, PeerId,
tcp::Config as TcpConfig, tcp::Config as TcpConfig,
noise, yamux, noise, yamux,
request_response::{
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
Behaviour as RrBehavior,
},
gossipsub::{ gossipsub::{
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError, IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError,
Behaviour as GsBehavior, Behaviour as GsBehavior,
}, },
swarm::{NetworkBehaviour, SwarmEvent}, swarm::{NetworkBehaviour, SwarmEvent, Swarm},
SwarmBuilder, SwarmBuilder,
}; };
@@ -44,8 +40,6 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
// Block size limit + 1 KB of space for signatures/metadata
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
const LIBP2P_TOPIC: &str = "serai-coordinator"; const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
@@ -57,112 +51,71 @@ pub struct CosignedBlock {
} }
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum ReqResMessageKind { pub enum P2pMessageKind {
KeepAlive, KeepAlive,
Tributary([u8; 32]),
Heartbeat([u8; 32]), Heartbeat([u8; 32]),
Block([u8; 32]), Block([u8; 32]),
}
impl ReqResMessageKind {
pub fn read<R: Read>(reader: &mut R) -> Option<ReqResMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some(ReqResMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
ReqResMessageKind::Heartbeat(genesis)
}),
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
ReqResMessageKind::Block(genesis)
}),
_ => None,
}
}
pub fn serialize(&self) -> Vec<u8> {
match self {
ReqResMessageKind::KeepAlive => vec![0],
ReqResMessageKind::Heartbeat(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
ReqResMessageKind::Block(genesis) => {
let mut res = vec![2];
res.extend(genesis);
res
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum GossipMessageKind {
Tributary([u8; 32]),
CosignedBlock, CosignedBlock,
} }
impl GossipMessageKind {
pub fn read<R: Read>(reader: &mut R) -> Option<GossipMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
GossipMessageKind::Tributary(genesis)
}),
1 => Some(GossipMessageKind::CosignedBlock),
_ => None,
}
}
pub fn serialize(&self) -> Vec<u8> {
match self {
GossipMessageKind::Tributary(genesis) => {
let mut res = vec![0];
res.extend(genesis);
res
}
GossipMessageKind::CosignedBlock => {
vec![1]
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
ReqRes(ReqResMessageKind),
Gossip(GossipMessageKind),
}
impl P2pMessageKind { impl P2pMessageKind {
fn genesis(&self) -> Option<[u8; 32]> { fn genesis(&self) -> Option<[u8; 32]> {
match self { match self {
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) | P2pMessageKind::KeepAlive | P2pMessageKind::CosignedBlock => None,
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => None, P2pMessageKind::Tributary(genesis) |
P2pMessageKind::ReqRes( P2pMessageKind::Heartbeat(genesis) |
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis), P2pMessageKind::Block(genesis) => Some(*genesis),
) |
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => Some(*genesis),
} }
} }
}
impl From<ReqResMessageKind> for P2pMessageKind { fn serialize(&self) -> Vec<u8> {
fn from(kind: ReqResMessageKind) -> P2pMessageKind { match self {
P2pMessageKind::ReqRes(kind) P2pMessageKind::KeepAlive => vec![0],
P2pMessageKind::Tributary(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![2];
res.extend(genesis);
res
}
P2pMessageKind::Block(genesis) => {
let mut res = vec![3];
res.extend(genesis);
res
}
P2pMessageKind::CosignedBlock => {
vec![4]
}
}
} }
}
impl From<GossipMessageKind> for P2pMessageKind { fn read<R: Read>(reader: &mut R) -> Option<P2pMessageKind> {
fn from(kind: GossipMessageKind) -> P2pMessageKind { let mut kind = [0; 1];
P2pMessageKind::Gossip(kind) reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some(P2pMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis)
}),
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Heartbeat(genesis)
}),
3 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Block(genesis)
}),
4 => Some(P2pMessageKind::CosignedBlock),
_ => None,
}
} }
} }
@@ -180,21 +133,17 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]); async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]); async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>); async fn send_raw(&self, to: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>);
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>); async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>);
async fn receive(&self) -> Message<Self>; async fn receive_raw(&self) -> (Self::Id, Vec<u8>);
async fn send(&self, to: Self::Id, kind: ReqResMessageKind, msg: Vec<u8>) { async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize(); let mut actual_msg = kind.serialize();
actual_msg.extend(msg); actual_msg.extend(msg);
self.send_raw(to, actual_msg).await; self.send_raw(to, kind.genesis(), actual_msg).await;
} }
async fn broadcast(&self, kind: impl Send + Into<P2pMessageKind>, msg: Vec<u8>) { async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
let kind = kind.into(); let mut actual_msg = kind.serialize();
let mut actual_msg = match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
};
actual_msg.extend(msg); actual_msg.extend(msg);
/* /*
log::trace!( log::trace!(
@@ -208,70 +157,41 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
} }
); );
*/ */
self.broadcast_raw(kind, actual_msg).await; self.broadcast_raw(kind.genesis(), actual_msg).await;
}
async fn receive(&self) -> Message<Self> {
let (sender, kind, msg) = loop {
let (sender, msg) = self.receive_raw().await;
if msg.is_empty() {
log::error!("empty p2p message from {sender:?}");
continue;
} }
}
#[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] let mut msg_ref = msg.as_ref();
struct RrCodec; let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else {
#[async_trait] log::error!("invalid p2p message kind from {sender:?}");
impl RrCodecTrait for RrCodec { continue;
type Protocol = &'static str; };
type Request = Vec<u8>; break (sender, kind, msg_ref.to_vec());
type Response = Vec<u8>; };
/*
async fn read_request<R: Send + Unpin + AsyncRead>( log::trace!(
&mut self, "received p2p message (kind {})",
_: &Self::Protocol, match kind {
io: &mut R, P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
) -> io::Result<Vec<u8>> { P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
let mut len = [0; 4]; P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
io.read_exact(&mut len).await?; P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?"); P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(),
if len > MAX_LIBP2P_MESSAGE_SIZE {
Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?;
} }
// This may be a non-trivial allocation easily causable );
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used, */
// the max message size should be sufficiently sane Message { sender, kind, msg }
let mut buf = vec![0; len];
io.read_exact(&mut buf).await?;
Ok(buf)
}
async fn read_response<R: Send + Unpin + AsyncRead>(
&mut self,
proto: &Self::Protocol,
io: &mut R,
) -> io::Result<Vec<u8>> {
self.read_request(proto, io).await
}
async fn write_request<W: Send + Unpin + AsyncWrite>(
&mut self,
_: &Self::Protocol,
io: &mut W,
req: Vec<u8>,
) -> io::Result<()> {
io.write_all(
&u32::try_from(req.len())
.map_err(|_| io::Error::other("request length exceeded 2**32"))?
.to_le_bytes(),
)
.await?;
io.write_all(&req).await
}
async fn write_response<W: Send + Unpin + AsyncWrite>(
&mut self,
proto: &Self::Protocol,
io: &mut W,
res: Vec<u8>,
) -> io::Result<()> {
self.write_request(proto, io, res).await
} }
} }
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
struct Behavior { struct Behavior {
reqres: RrBehavior<RrCodec>,
gossipsub: GsBehavior, gossipsub: GsBehavior,
} }
@@ -279,9 +199,8 @@ struct Behavior {
#[derive(Clone)] #[derive(Clone)]
pub struct LibP2p { pub struct LibP2p {
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>, subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
send: Arc<Mutex<mpsc::UnboundedSender<(PeerId, Vec<u8>)>>>, broadcast: Arc<Mutex<mpsc::UnboundedSender<(Option<[u8; 32]>, Vec<u8>)>>>,
broadcast: Arc<Mutex<mpsc::UnboundedSender<(P2pMessageKind, Vec<u8>)>>>, receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
receive: Arc<Mutex<mpsc::UnboundedReceiver<Message<Self>>>>,
} }
impl fmt::Debug for LibP2p { impl fmt::Debug for LibP2p {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -292,12 +211,14 @@ impl fmt::Debug for LibP2p {
impl LibP2p { impl LibP2p {
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new(serai: Arc<Serai>) -> Self { pub fn new(serai: Arc<Serai>) -> Self {
// Block size limit + 1 KB of space for signatures/metadata
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
log::info!("creating a libp2p instance"); log::info!("creating a libp2p instance");
let throwaway_key_pair = Keypair::generate_ed25519(); let throwaway_key_pair = Keypair::generate_ed25519();
let behavior = Behavior { let behavior = Behavior {
reqres: { RrBehavior::new([], RrConfig::default()) },
gossipsub: { gossipsub: {
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2; let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
let heartbeats_per_block = let heartbeats_per_block =
@@ -358,10 +279,9 @@ impl LibP2p {
.with_behaviour(|_| behavior) .with_behaviour(|_| behavior)
.unwrap() .unwrap()
.build(); .build();
const PORT: u16 = 30564; // 5132 ^ (('c' << 8) | 'o') const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap(); swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
let (send_send, mut send_recv) = mpsc::unbounded_channel();
let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel(); let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
let (receive_send, receive_recv) = mpsc::unbounded_channel(); let (receive_send, receive_recv) = mpsc::unbounded_channel();
let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel(); let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel();
@@ -370,31 +290,17 @@ impl LibP2p {
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode()))) IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode())))
} }
// TODO: If a network has less than TARGET_PEERS, this will cause retries ad infinitum
const TARGET_PEERS: usize = 5;
// The addrs we're currently dialing, and the networks associated with them
let dialing_peers = Arc::new(RwLock::new(HashMap::new()));
// The peers we're currently connected to, and the networks associated with them
let connected_peers = Arc::new(RwLock::new(HashMap::<Multiaddr, HashSet<NetworkId>>::new()));
// Find and connect to peers // Find and connect to peers
let (connect_to_network_send, mut connect_to_network_recv) = let (pending_p2p_connections_send, mut pending_p2p_connections_recv) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn({ tokio::spawn({
let dialing_peers = dialing_peers.clone(); let pending_p2p_connections_send = pending_p2p_connections_send.clone();
let connected_peers = connected_peers.clone();
let connect_to_network_send = connect_to_network_send.clone();
async move { async move {
loop { loop {
let connect = |network: NetworkId, addr: Multiaddr| { // TODO: Add better peer management logic?
let dialing_peers = dialing_peers.clone(); {
let connected_peers = connected_peers.clone(); let connect = |addr: Multiaddr| {
let to_dial_send = to_dial_send.clone();
let connect_to_network_send = connect_to_network_send.clone();
async move {
log::info!("found peer from substrate: {addr}"); log::info!("found peer from substrate: {addr}");
let protocols = addr.iter().filter_map(|piece| match piece { let protocols = addr.iter().filter_map(|piece| match piece {
@@ -412,99 +318,46 @@ impl LibP2p {
let addr = new_addr; let addr = new_addr;
log::debug!("transformed found peer: {addr}"); log::debug!("transformed found peer: {addr}");
let (is_fresh_dial, nets) = { // TODO: Check this isn't a duplicate
let mut dialing_peers = dialing_peers.write().await; to_dial_send.send(addr).unwrap();
let is_fresh_dial = !dialing_peers.contains_key(&addr);
if is_fresh_dial {
dialing_peers.insert(addr.clone(), HashSet::new());
}
// Associate this network with this peer
dialing_peers.get_mut(&addr).unwrap().insert(network);
let nets = dialing_peers.get(&addr).unwrap().clone();
(is_fresh_dial, nets)
};
// Spawn a task to remove this peer from 'dialing' in sixty seconds, in case dialing
// fails
// This performs cleanup and bounds the size of the map to whatever growth occurs
// within a temporal window
tokio::spawn({
let dialing_peers = dialing_peers.clone();
let connected_peers = connected_peers.clone();
let connect_to_network_send = connect_to_network_send.clone();
let addr = addr.clone();
async move {
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
let mut dialing_peers = dialing_peers.write().await;
if let Some(expected_nets) = dialing_peers.remove(&addr) {
log::debug!("removed addr from dialing upon timeout: {addr}");
// TODO: De-duplicate this below instance
// If we failed to dial and haven't gotten enough actual connections, retry
let connected_peers = connected_peers.read().await;
for net in expected_nets {
let mut remaining_peers = 0;
for nets in connected_peers.values() {
if nets.contains(&net) {
remaining_peers += 1;
}
}
// If we do not, start connecting to this network again
if remaining_peers < TARGET_PEERS {
connect_to_network_send.send(net).expect(
"couldn't send net to connect to due to disconnects (receiver dropped?)",
);
}
}
}
}
});
if is_fresh_dial {
to_dial_send.send((addr, nets)).unwrap();
}
}
}; };
// TODO: We should also connect to random peers from random nets as needed for // TODO: We should also connect to random peers from random nets as needed for
// cosigning // cosigning
let mut to_retry = vec![];
// Drain the chainnel, de-duplicating any networks in it while let Some(network) = pending_p2p_connections_recv.recv().await {
let mut connect_to_network_networks = HashSet::new();
while let Ok(network) = connect_to_network_recv.try_recv() {
connect_to_network_networks.insert(network);
}
for network in connect_to_network_networks {
if let Ok(mut nodes) = serai.p2p_validators(network).await { if let Ok(mut nodes) = serai.p2p_validators(network).await {
// If there's an insufficient amount of nodes known, connect to all yet add it // If there's an insufficient amount of nodes known, connect to all yet add it
// back and break // back and break
if nodes.len() < TARGET_PEERS { if nodes.len() < 3 {
log::warn!( log::warn!(
"insufficient amount of P2P nodes known for {:?}: {}", "insufficient amount of P2P nodes known for {:?}: {}",
network, network,
nodes.len() nodes.len()
); );
// Retry this later to_retry.push(network);
connect_to_network_send.send(network).unwrap();
for node in nodes { for node in nodes {
connect(network, node).await; connect(node);
} }
continue; continue;
} }
// Randomly select up to 150% of the TARGET_PEERS // Randomly select up to 5
for _ in 0 .. ((3 * TARGET_PEERS) / 2) { for _ in 0 .. 5 {
if !nodes.is_empty() { if !nodes.is_empty() {
let to_connect = nodes.swap_remove( let to_connect = nodes.swap_remove(
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
.unwrap(), .unwrap(),
); );
connect(network, to_connect).await; connect(to_connect);
} }
} }
} }
} }
for to_retry in to_retry {
pending_p2p_connections_send.send(to_retry).unwrap();
}
}
// Sleep 60 seconds before moving to the next iteration // Sleep 60 seconds before moving to the next iteration
tokio::time::sleep(core::time::Duration::from_secs(60)).await; tokio::time::sleep(core::time::Duration::from_secs(60)).await;
} }
@@ -515,10 +368,35 @@ impl LibP2p {
tokio::spawn({ tokio::spawn({
let mut time_of_last_p2p_message = Instant::now(); let mut time_of_last_p2p_message = Instant::now();
async move { #[allow(clippy::needless_pass_by_ref_mut)] // False positive
let connected_peers = connected_peers.clone(); fn broadcast_raw(
p2p: &mut Swarm<Behavior>,
time_of_last_p2p_message: &mut Instant,
set: Option<ValidatorSet>,
msg: Vec<u8>,
) {
// Update the time of last message
*time_of_last_p2p_message = Instant::now();
let topic =
if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) };
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
}
async move {
let mut set_for_genesis = HashMap::new(); let mut set_for_genesis = HashMap::new();
let mut connected_peers = 0;
loop { loop {
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message); let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
tokio::select! { tokio::select! {
@@ -531,7 +409,7 @@ impl LibP2p {
let topic = topic_for_set(set); let topic = topic_for_set(set);
if subscribe { if subscribe {
log::info!("subscribing to p2p messages for {set:?}"); log::info!("subscribing to p2p messages for {set:?}");
connect_to_network_send.send(set.network).unwrap(); pending_p2p_connections_send.send(set.network).unwrap();
set_for_genesis.insert(genesis, set); set_for_genesis.insert(genesis, set);
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
} else { } else {
@@ -541,50 +419,17 @@ impl LibP2p {
} }
} }
msg = send_recv.recv() => {
let (peer, msg): (PeerId, Vec<u8>) =
msg.expect("send_recv closed. are we shutting down?");
swarm.behaviour_mut().reqres.send_request(&peer, msg);
},
// Handle any queued outbound messages // Handle any queued outbound messages
msg = broadcast_recv.recv() => { msg = broadcast_recv.recv() => {
// Update the time of last message let (genesis, msg): (Option<[u8; 32]>, Vec<u8>) =
time_of_last_p2p_message = Instant::now();
let (kind, msg): (P2pMessageKind, Vec<u8>) =
msg.expect("broadcast_recv closed. are we shutting down?"); msg.expect("broadcast_recv closed. are we shutting down?");
let set = genesis.and_then(|genesis| set_for_genesis.get(&genesis).copied());
if matches!(kind, P2pMessageKind::ReqRes(_)) { broadcast_raw(
// Use request/response, yet send to all connected peers &mut swarm,
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() { &mut time_of_last_p2p_message,
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone()); set,
} msg,
} else { );
// Use gossipsub
let set =
kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
let topic = if let Some(set) = set {
topic_for_set(set)
} else {
IdentTopic::new(LIBP2P_TOPIC)
};
match swarm.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => {
panic!("signing error when broadcasting: {e}")
},
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
}
} }
// Handle new incoming messages // Handle new incoming messages
@@ -593,119 +438,42 @@ impl LibP2p {
Some(SwarmEvent::Dialing { connection_id, .. }) => { Some(SwarmEvent::Dialing { connection_id, .. }) => {
log::debug!("dialing to peer in connection ID {}", &connection_id); log::debug!("dialing to peer in connection ID {}", &connection_id);
} }
Some(SwarmEvent::ConnectionEstablished { Some(SwarmEvent::ConnectionEstablished { peer_id, connection_id, .. }) => {
peer_id,
connection_id,
endpoint,
..
}) => {
if &peer_id == swarm.local_peer_id() { if &peer_id == swarm.local_peer_id() {
log::warn!("established a libp2p connection to ourselves"); log::warn!("established a libp2p connection to ourselves");
swarm.close_connection(connection_id); swarm.close_connection(connection_id);
continue; continue;
} }
let addr = endpoint.get_remote_address(); connected_peers += 1;
let nets = {
let mut dialing_peers = dialing_peers.write().await;
if let Some(nets) = dialing_peers.remove(addr) {
nets
} else {
log::debug!("connected to a peer who we didn't have within dialing");
HashSet::new()
}
};
{
let mut connected_peers = connected_peers.write().await;
connected_peers.insert(addr.clone(), nets);
log::debug!( log::debug!(
"connection established to peer {} in connection ID {}, connected peers: {}", "connection established to peer {} in connection ID {}, connected peers: {}",
&peer_id, &peer_id,
&connection_id, &connection_id,
connected_peers.len(), connected_peers,
); );
} }
} Some(SwarmEvent::ConnectionClosed { peer_id, .. }) => {
Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => { connected_peers -= 1;
let mut connected_peers = connected_peers.write().await;
let Some(nets) = connected_peers.remove(endpoint.get_remote_address()) else {
log::debug!("closed connection to peer which wasn't in connected_peers");
continue;
};
// Downgrade to a read lock
let connected_peers = connected_peers.downgrade();
// For each net we lost a peer for, check if we still have sufficient peers
// overall
for net in nets {
let mut remaining_peers = 0;
for nets in connected_peers.values() {
if nets.contains(&net) {
remaining_peers += 1;
}
}
// If we do not, start connecting to this network again
if remaining_peers < TARGET_PEERS {
connect_to_network_send
.send(net)
.expect(
"couldn't send net to connect to due to disconnects (receiver dropped?)"
);
}
}
log::debug!( log::debug!(
"connection with peer {peer_id} closed, connected peers: {}", "connection with peer {peer_id} closed, connected peers: {}",
connected_peers.len(), connected_peers,
); );
} }
Some(SwarmEvent::Behaviour(BehaviorEvent::Reqres(
RrEvent::Message { peer, message },
))) => {
let message = match message {
RrMessage::Request { request, .. } => request,
RrMessage::Response { response, .. } => response,
};
let mut msg_ref = message.as_slice();
let Some(kind) = ReqResMessageKind::read(&mut msg_ref) else { continue };
let message = Message {
sender: peer,
kind: P2pMessageKind::ReqRes(kind),
msg: msg_ref.to_vec(),
};
receive_send.send(message).expect("receive_send closed. are we shutting down?");
}
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. }, GsEvent::Message { propagation_source, message, .. },
))) => { ))) => {
let mut msg_ref = message.data.as_slice(); receive_send
let Some(kind) = GossipMessageKind::read(&mut msg_ref) else { continue }; .send((propagation_source, message.data))
let message = Message { .expect("receive_send closed. are we shutting down?");
sender: propagation_source,
kind: P2pMessageKind::Gossip(kind),
msg: msg_ref.to_vec(),
};
receive_send.send(message).expect("receive_send closed. are we shutting down?");
} }
_ => {} _ => {}
} }
} }
// Handle peers to dial // Handle peers to dial
addr_and_nets = to_dial_recv.recv() => { addr = to_dial_recv.recv() => {
let (addr, nets) = let addr = addr.expect("received address was None (sender dropped?)");
addr_and_nets.expect("received address was None (sender dropped?)");
// If we've already dialed and connected to this address, don't further dial them
// Just associate these networks with them
if let Some(existing_nets) = connected_peers.write().await.get_mut(&addr) {
for net in nets {
existing_nets.insert(net);
}
continue;
}
if let Err(e) = swarm.dial(addr) { if let Err(e) = swarm.dial(addr) {
log::warn!("dialing peer failed: {e:?}"); log::warn!("dialing peer failed: {e:?}");
} }
@@ -719,13 +487,12 @@ impl LibP2p {
// (where a finalized block only occurs due to network activity), meaning this won't be // (where a finalized block only occurs due to network activity), meaning this won't be
// run // run
() = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => { () = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
time_of_last_p2p_message = Instant::now(); broadcast_raw(
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() { &mut swarm,
swarm &mut time_of_last_p2p_message,
.behaviour_mut() None,
.reqres P2pMessageKind::KeepAlive.serialize()
.send_request(&peer_id, ReqResMessageKind::KeepAlive.serialize()); );
}
} }
} }
} }
@@ -734,7 +501,6 @@ impl LibP2p {
LibP2p { LibP2p {
subscribe: Arc::new(Mutex::new(subscribe_send)), subscribe: Arc::new(Mutex::new(subscribe_send)),
send: Arc::new(Mutex::new(send_send)),
broadcast: Arc::new(Mutex::new(broadcast_send)), broadcast: Arc::new(Mutex::new(broadcast_send)),
receive: Arc::new(Mutex::new(receive_recv)), receive: Arc::new(Mutex::new(receive_recv)),
} }
@@ -763,22 +529,22 @@ impl P2p for LibP2p {
.expect("subscribe_send closed. are we shutting down?"); .expect("subscribe_send closed. are we shutting down?");
} }
async fn send_raw(&self, peer: Self::Id, msg: Vec<u8>) { async fn send_raw(&self, _: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>) {
self.send.lock().await.send((peer, msg)).expect("send_send closed. are we shutting down?"); self.broadcast_raw(genesis, msg).await;
} }
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) { async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>) {
self self
.broadcast .broadcast
.lock() .lock()
.await .await
.send((kind, msg)) .send((genesis, msg))
.expect("broadcast_send closed. are we shutting down?"); .expect("broadcast_send closed. are we shutting down?");
} }
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant // TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
// lock acquisition? // lock acquisition?
async fn receive(&self) -> Message<Self> { async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?") self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
} }
} }
@@ -786,7 +552,7 @@ impl P2p for LibP2p {
#[async_trait] #[async_trait]
impl TributaryP2p for LibP2p { impl TributaryP2p for LibP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) { async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, GossipMessageKind::Tributary(genesis), msg).await <Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
} }
} }
@@ -824,12 +590,16 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
if SystemTime::now() > (block_time + Duration::from_secs(60)) { if SystemTime::now() > (block_time + Duration::from_secs(60)) {
log::warn!("last known tributary block was over a minute ago"); log::warn!("last known tributary block was over a minute ago");
let mut msg = tip.to_vec(); let mut msg = tip.to_vec();
let time: u64 = SystemTime::now() // Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is wrong") .expect("system clock is wrong")
.as_secs(); .as_secs();
msg.extend(time.to_le_bytes()); // Divide by the block time so if multiple parties send a Heartbeat, they're more likely to
P2p::broadcast(&p2p, ReqResMessageKind::Heartbeat(tributary.genesis()), msg).await; // overlap
let time_unit = timestamp / u64::from(Tributary::<D, Transaction, P>::block_time());
msg.extend(time_unit.to_le_bytes());
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
} }
} }
@@ -861,8 +631,6 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
// Subscribe to the topic for this tributary // Subscribe to the topic for this tributary
p2p.subscribe(tributary.spec.set(), genesis).await; p2p.subscribe(tributary.spec.set(), genesis).await;
let spec_set = tributary.spec.set();
// Per-Tributary P2P message handler // Per-Tributary P2P message handler
tokio::spawn({ tokio::spawn({
let p2p = p2p.clone(); let p2p = p2p.clone();
@@ -873,58 +641,91 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
break; break;
}; };
match msg.kind { match msg.kind {
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {} P2pMessageKind::KeepAlive => {}
// TODO: Slash on Heartbeat which justifies a response, since the node P2pMessageKind::Tributary(msg_genesis) => {
assert_eq!(msg_genesis, genesis);
log::trace!("handling message for tributary {:?}", tributary.spec.set());
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
// TODO2: Rate limit this per timestamp
// And/or slash on Heartbeat which justifies a response, since the node
// obviously was offline and we must now use our bandwidth to compensate for // obviously was offline and we must now use our bandwidth to compensate for
// them? // them?
P2pMessageKind::ReqRes(ReqResMessageKind::Heartbeat(msg_genesis)) => { P2pMessageKind::Heartbeat(msg_genesis) => {
assert_eq!(msg_genesis, genesis); assert_eq!(msg_genesis, genesis);
if msg.msg.len() != 40 { if msg.msg.len() != 40 {
log::error!("validator sent invalid heartbeat"); log::error!("validator sent invalid heartbeat");
continue; continue;
} }
// Only respond to recent heartbeats
let msg_time = u64::from_le_bytes(msg.msg[32 .. 40].try_into().expect(
"length-checked heartbeat message didn't have 8 bytes for the u64",
));
if SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is wrong")
.as_secs()
.saturating_sub(msg_time) >
10
{
continue;
}
log::debug!("received heartbeat with a recent timestamp");
let reader = tributary.tributary.reader();
let p2p = p2p.clone(); let p2p = p2p.clone();
let spec = tributary.spec.clone();
let reader = tributary.tributary.reader();
// Spawn a dedicated task as this may require loading large amounts of data // Spawn a dedicated task as this may require loading large amounts of data
// from disk and take a notable amount of time // from disk and take a notable amount of time
tokio::spawn(async move { tokio::spawn(async move {
let mut latest = msg.msg[.. 32].try_into().unwrap(); /*
let mut to_send = vec![]; // Have sqrt(n) nodes reply with the blocks
while let Some(next) = reader.block_after(&latest) { let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
to_send.push(next); // Try to have at least 3 responders
latest = next; if responders < 3 {
responders = tributary.spec.n().min(3).into();
} }
if to_send.len() > 3 { */
for next in to_send {
/*
// Have up to three nodes respond
let responders = u64::from(spec.n().min(3));
// Decide which nodes will respond by using the latest block's hash as a
// mutually agreed upon entropy source
// This isn't a secure source of entropy, yet it's fine for this
let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap());
// If n = 10, responders = 3, we want `start` to be 0 ..= 7
// (so the highest is 7, 8, 9)
// entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
let start =
usize::try_from(entropy % (u64::from(spec.n() + 1) - responders))
.unwrap();
let mut selected = false;
for validator in &spec.validators()
[start .. (start + usize::try_from(responders).unwrap())]
{
if our_key == validator.0 {
selected = true;
break;
}
}
if !selected {
log::debug!("received heartbeat and not selected to respond");
return;
}
log::debug!("received heartbeat and selected to respond");
*/
// Have every node respond
// While we could only have a subset respond, LibP2P will sync all messages
// it isn't aware of
// It's cheaper to be aware from our disk than from over the network
// TODO: Spawn a dedicated topic for this heartbeat response?
let mut latest = msg.msg[.. 32].try_into().unwrap();
while let Some(next) = reader.block_after(&latest) {
let mut res = reader.block(&next).unwrap().serialize(); let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap()); res.extend(reader.commit(&next).unwrap());
// Also include the timestamp used within the Heartbeat // Also include the timestamp used within the Heartbeat
res.extend(&msg.msg[32 .. 40]); res.extend(&msg.msg[32 .. 40]);
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await; p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await;
} latest = next;
} }
}); });
} }
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => { P2pMessageKind::Block(msg_genesis) => {
assert_eq!(msg_genesis, genesis); assert_eq!(msg_genesis, genesis);
let mut msg_ref: &[u8] = msg.msg.as_ref(); let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else { let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
@@ -943,15 +744,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
); );
} }
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => { P2pMessageKind::CosignedBlock => unreachable!(),
assert_eq!(msg_genesis, genesis);
log::trace!("handling message for tributary {:?}", spec_set);
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => unreachable!(),
} }
} }
} }
@@ -971,16 +764,15 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
loop { loop {
let msg = p2p.receive().await; let msg = p2p.receive().await;
match msg.kind { match msg.kind {
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {} P2pMessageKind::KeepAlive => {}
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) | P2pMessageKind::Tributary(genesis) |
P2pMessageKind::ReqRes( P2pMessageKind::Heartbeat(genesis) |
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis), P2pMessageKind::Block(genesis) => {
) => {
if let Some(channel) = channels.read().await.get(&genesis) { if let Some(channel) = channels.read().await.get(&genesis) {
channel.send(msg).unwrap(); channel.send(msg).unwrap();
} }
} }
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => { P2pMessageKind::CosignedBlock => {
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else { let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
log::error!("received CosignedBlock message with invalidly serialized contents"); log::error!("received CosignedBlock message with invalidly serialized contents");
continue; continue;

View File

@@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use crate::{ use crate::{
processors::{Message, Processors}, processors::{Message, Processors},
TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p, TributaryP2p, P2pMessageKind, P2p,
}; };
pub mod tributary; pub mod tributary;
@@ -45,10 +45,7 @@ impl Processors for MemProcessors {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LocalP2p( pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>);
usize,
pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, P2pMessageKind, Vec<u8>)>>)>>,
);
impl LocalP2p { impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> { pub fn new(validators: usize) -> Vec<LocalP2p> {
@@ -68,13 +65,11 @@ impl P2p for LocalP2p {
async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {} async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {} async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) { async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
let mut msg_ref = msg.as_slice(); self.1.write().await.1[to].push_back((self.0, msg));
let kind = ReqResMessageKind::read(&mut msg_ref).unwrap();
self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec()));
} }
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) { async fn broadcast_raw(&self, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
// Content-based deduplication // Content-based deduplication
let mut lock = self.1.write().await; let mut lock = self.1.write().await;
{ {
@@ -86,26 +81,19 @@ impl P2p for LocalP2p {
} }
let queues = &mut lock.1; let queues = &mut lock.1;
let kind_len = (match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
})
.len();
let msg = msg[kind_len ..].to_vec();
for (i, msg_queue) in queues.iter_mut().enumerate() { for (i, msg_queue) in queues.iter_mut().enumerate() {
if i == self.0 { if i == self.0 {
continue; continue;
} }
msg_queue.push_back((self.0, kind, msg.clone())); msg_queue.push_back((self.0, msg.clone()));
} }
} }
async fn receive(&self) -> P2pMessage<Self> { async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
// This is a cursed way to implement an async read from a Vec // This is a cursed way to implement an async read from a Vec
loop { loop {
if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() { if let Some(res) = self.1.write().await.1[self.0].pop_front() {
return P2pMessage { sender, kind, msg }; return res;
} }
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} }
@@ -115,11 +103,6 @@ impl P2p for LocalP2p {
#[async_trait] #[async_trait]
impl TributaryP2p for LocalP2p { impl TributaryP2p for LocalP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) { async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast( <Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
self,
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)),
msg,
)
.await
} }
} }

View File

@@ -26,7 +26,7 @@ use serai_db::MemDb;
use tributary::Tributary; use tributary::Tributary;
use crate::{ use crate::{
GossipMessageKind, P2pMessageKind, P2p, P2pMessageKind, P2p,
tributary::{Transaction, TributarySpec}, tributary::{Transaction, TributarySpec},
tests::LocalP2p, tests::LocalP2p,
}; };
@@ -98,7 +98,7 @@ pub async fn run_tributaries(
for (p2p, tributary) in &mut tributaries { for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) { while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind { match msg.kind {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => { P2pMessageKind::Tributary(genesis) => {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
if tributary.handle_message(&msg.msg).await { if tributary.handle_message(&msg.msg).await {
p2p.broadcast(msg.kind, msg.msg).await; p2p.broadcast(msg.kind, msg.msg).await;
@@ -173,7 +173,7 @@ async fn tributary_test() {
for (p2p, tributary) in &mut tributaries { for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) { while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind { match msg.kind {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => { P2pMessageKind::Tributary(genesis) => {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await; tributary.handle_message(&msg.msg).await;
} }
@@ -199,7 +199,7 @@ async fn tributary_test() {
for (p2p, tributary) in &mut tributaries { for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) { while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind { match msg.kind {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => { P2pMessageKind::Tributary(genesis) => {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await; tributary.handle_message(&msg.msg).await;
} }

View File

@@ -116,8 +116,8 @@ async fn sync_test() {
.map_err(|_| "failed to send ActiveTributary to heartbeat") .map_err(|_| "failed to send ActiveTributary to heartbeat")
.unwrap(); .unwrap();
// The heartbeat is once every 10 blocks, with some limitations // The heartbeat is once every 10 blocks
sleep(Duration::from_secs(20 * block_time)).await; sleep(Duration::from_secs(10 * block_time)).await;
assert!(syncer_tributary.tip().await != spec.genesis()); assert!(syncer_tributary.tip().await != spec.genesis());
// Verify it synced to the tip // Verify it synced to the tip

View File

@@ -74,7 +74,7 @@ impl TributarySpec {
pub fn genesis(&self) -> [u8; 32] { pub fn genesis(&self) -> [u8; 32] {
// Calculate the genesis for this Tributary // Calculate the genesis for this Tributary
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis Testnet 2.1"); let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis");
// This locks it to a specific Serai chain // This locks it to a specific Serai chain
genesis.append_message(b"serai_block", self.serai_block); genesis.append_message(b"serai_block", self.serai_block);
genesis.append_message(b"session", self.set.session.0.to_le_bytes()); genesis.append_message(b"session", self.set.session.0.to_le_bytes());

View File

@@ -59,7 +59,8 @@ pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50;
pub const BLOCK_SIZE_LIMIT: usize = 3_001_000; pub const BLOCK_SIZE_LIMIT: usize = 3_001_000;
pub(crate) const TENDERMINT_MESSAGE: u8 = 0; pub(crate) const TENDERMINT_MESSAGE: u8 = 0;
pub(crate) const TRANSACTION_MESSAGE: u8 = 2; // TODO: Normalize to 1 pub(crate) const BLOCK_MESSAGE: u8 = 1;
pub(crate) const TRANSACTION_MESSAGE: u8 = 2;
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
@@ -335,6 +336,9 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
// Return true if the message should be rebroadcasted. // Return true if the message should be rebroadcasted.
pub async fn handle_message(&self, msg: &[u8]) -> bool { pub async fn handle_message(&self, msg: &[u8]) -> bool {
// Acquire the lock now to prevent sync_block from being run at the same time
let mut sync_block = self.synced_block_result.write().await;
match msg.first() { match msg.first() {
Some(&TRANSACTION_MESSAGE) => { Some(&TRANSACTION_MESSAGE) => {
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else { let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
@@ -366,6 +370,19 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
false false
} }
Some(&BLOCK_MESSAGE) => {
let mut msg_ref = &msg[1 ..];
let Ok(block) = Block::<T>::read(&mut msg_ref) else {
log::error!("received invalid block message");
return false;
};
let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
if self.sync_block_internal(block, commit, &mut sync_block).await {
log::debug!("synced block over p2p net instead of building the commit ourselves");
}
false
}
_ => false, _ => false,
} }
} }

View File

@@ -41,8 +41,9 @@ use tendermint::{
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::{ use crate::{
TENDERMINT_MESSAGE, TRANSACTION_MESSAGE, ReadWrite, transaction::Transaction as TransactionTrait, TENDERMINT_MESSAGE, TRANSACTION_MESSAGE, BLOCK_MESSAGE, ReadWrite,
Transaction, BlockHeader, Block, BlockError, Blockchain, P2p, transaction::Transaction as TransactionTrait, Transaction, BlockHeader, Block, BlockError,
Blockchain, P2p,
}; };
pub mod tx; pub mod tx;
@@ -413,7 +414,12 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
); );
match block_res { match block_res {
Ok(()) => { Ok(()) => {
// If we successfully added this block, break // If we successfully added this block, broadcast it
// TODO: Move this under the coordinator once we set up on new block notifications?
let mut msg = serialized_block.0;
msg.insert(0, BLOCK_MESSAGE);
msg.extend(encoded_commit);
self.p2p.broadcast(self.genesis, msg).await;
break; break;
} }
Err(BlockError::NonLocalProvided(hash)) => { Err(BlockError::NonLocalProvided(hash)) => {
@@ -422,7 +428,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
hex::encode(hash), hex::encode(hash),
hex::encode(self.genesis) hex::encode(self.genesis)
); );
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
} }
_ => return invalid_block(), _ => return invalid_block(),
} }

View File

@@ -313,16 +313,11 @@ impl<N: Network + 'static> TendermintMachine<N> {
let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now()); let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now());
if time_until_round_end == Duration::ZERO { if time_until_round_end == Duration::ZERO {
log::trace!( log::trace!(
target: "tendermint",
"resetting when prior round ended {}ms ago", "resetting when prior round ended {}ms ago",
Instant::now().saturating_duration_since(round_end.instant()).as_millis(), Instant::now().saturating_duration_since(round_end.instant()).as_millis(),
); );
} }
log::trace!( log::trace!("sleeping until round ends in {}ms", time_until_round_end.as_millis());
target: "tendermint",
"sleeping until round ends in {}ms",
time_until_round_end.as_millis(),
);
sleep(time_until_round_end).await; sleep(time_until_round_end).await;
// Clear our outbound message queue // Clear our outbound message queue
@@ -603,11 +598,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
); );
let id = block.id(); let id = block.id();
let proposal = self.network.add_block(block, commit).await; let proposal = self.network.add_block(block, commit).await;
log::trace!( log::trace!("added block {} (produced by machine)", hex::encode(id.as_ref()));
target: "tendermint",
"added block {} (produced by machine)",
hex::encode(id.as_ref()),
);
self.reset(msg.round, proposal).await; self.reset(msg.round, proposal).await;
} }
Err(TendermintError::Malicious(sender, evidence)) => { Err(TendermintError::Malicious(sender, evidence)) => {
@@ -701,12 +692,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
(msg.round == self.block.round().number) && (msg.round == self.block.round().number) &&
(msg.data.step() == Step::Propose) (msg.data.step() == Step::Propose)
{ {
log::trace!( log::trace!("received Propose for block {}, round {}", msg.block.0, msg.round.0);
target: "tendermint",
"received Propose for block {}, round {}",
msg.block.0,
msg.round.0,
);
} }
// If this is a precommit, verify its signature // If this is a precommit, verify its signature
@@ -724,13 +710,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
if !self.block.log.log(signed.clone())? { if !self.block.log.log(signed.clone())? {
return Err(TendermintError::AlreadyHandled); return Err(TendermintError::AlreadyHandled);
} }
log::debug!( log::debug!(target: "tendermint", "received new tendermint message");
target: "tendermint",
"received new tendermint message (block: {}, round: {}, step: {:?})",
msg.block.0,
msg.round.0,
msg.data.step(),
);
// All functions, except for the finalizer and the jump, are locked to the current round // All functions, except for the finalizer and the jump, are locked to the current round
@@ -777,13 +757,6 @@ impl<N: Network + 'static> TendermintMachine<N> {
// 55-56 // 55-56
// Jump, enabling processing by the below code // Jump, enabling processing by the below code
if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() {
log::debug!(
target: "tendermint",
"jumping from round {} to round {}",
self.block.round().number.0,
msg.round.0,
);
// Jump to the new round. // Jump to the new round.
let proposer = self.round(msg.round, None); let proposer = self.round(msg.round, None);
@@ -841,26 +814,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
let (participation, weight) = let (participation, weight) =
self.block.log.message_instances(self.block.round().number, &Data::Prevote(None)); self.block.log.message_instances(self.block.round().number, &Data::Prevote(None));
let threshold_weight = self.weights.threshold();
if participation < threshold_weight {
log::trace!(
target: "tendermint",
"progess towards setting prevote timeout, participation: {}, needed: {}",
participation,
threshold_weight,
);
}
// 34-35 // 34-35
if participation >= threshold_weight { if participation >= self.weights.threshold() {
log::trace!(
target: "tendermint",
"setting timeout for prevote due to sufficient participation",
);
self.block.round_mut().set_timeout(Step::Prevote); self.block.round_mut().set_timeout(Step::Prevote);
} }
// 44-46 // 44-46
if weight >= threshold_weight { if weight >= self.weights.threshold() {
self.broadcast(Data::Precommit(None)); self.broadcast(Data::Precommit(None));
return Ok(None); return Ok(None);
} }
@@ -870,10 +830,6 @@ impl<N: Network + 'static> TendermintMachine<N> {
if matches!(msg.data, Data::Precommit(_)) && if matches!(msg.data, Data::Precommit(_)) &&
self.block.log.has_participation(self.block.round().number, Step::Precommit) self.block.log.has_participation(self.block.round().number, Step::Precommit)
{ {
log::trace!(
target: "tendermint",
"setting timeout for precommit due to sufficient participation",
);
self.block.round_mut().set_timeout(Step::Precommit); self.block.round_mut().set_timeout(Step::Precommit);
} }

View File

@@ -1,5 +1,6 @@
use std::{sync::Arc, collections::HashMap}; use std::{sync::Arc, collections::HashMap};
use log::debug;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence}; use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
@@ -26,7 +27,7 @@ impl<N: Network> MessageLog<N> {
let step = msg.data.step(); let step = msg.data.step();
if let Some(existing) = msgs.get(&step) { if let Some(existing) = msgs.get(&step) {
if existing.msg.data != msg.data { if existing.msg.data != msg.data {
log::debug!( debug!(
target: "tendermint", target: "tendermint",
"Validator sent multiple messages for the same block + round + step" "Validator sent multiple messages for the same block + round + step"
); );

View File

@@ -57,7 +57,6 @@ impl<N: Network> RoundData<N> {
// Poll all set timeouts, returning the Step whose timeout has just expired // Poll all set timeouts, returning the Step whose timeout has just expired
pub(crate) async fn timeout_future(&self) -> Step { pub(crate) async fn timeout_future(&self) -> Step {
/*
let now = Instant::now(); let now = Instant::now();
log::trace!( log::trace!(
target: "tendermint", target: "tendermint",
@@ -65,7 +64,6 @@ impl<N: Network> RoundData<N> {
self.step, self.step,
self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>() self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>()
); );
*/
let timeout_future = |step| { let timeout_future = |step| {
let timeout = self.timeouts.get(&step).copied(); let timeout = self.timeouts.get(&step).copied();

View File

@@ -511,7 +511,7 @@ fn start(network: Network, services: HashSet<String>) {
command command
} else { } else {
// Publish the port // Publish the port
command.arg("-p").arg("30564:30564") command.arg("-p").arg("30563:30563")
} }
} }
"serai" => { "serai" => {

View File

@@ -159,11 +159,9 @@ pub mod pallet {
/// ///
/// Errors if any amount overflows. /// Errors if any amount overflows.
pub fn mint(to: Public, balance: Balance) -> Result<(), Error<T, I>> { pub fn mint(to: Public, balance: Balance) -> Result<(), Error<T, I>> {
/*
if !T::AllowMint::is_allowed(&balance) { if !T::AllowMint::is_allowed(&balance) {
Err(Error::<T, I>::MintNotAllowed)?; Err(Error::<T, I>::MintNotAllowed)?;
} }
*/
// update the balance // update the balance
Self::increase_balance_internal(to, balance)?; Self::increase_balance_internal(to, balance)?;

View File

@@ -26,8 +26,6 @@ hex = "0.4"
rand_core = "0.6" rand_core = "0.6"
schnorrkel = "0.11" schnorrkel = "0.11"
libp2p = "0.52"
sp-core = { git = "https://github.com/serai-dex/substrate" } sp-core = { git = "https://github.com/serai-dex/substrate" }
sp-keystore = { git = "https://github.com/serai-dex/substrate" } sp-keystore = { git = "https://github.com/serai-dex/substrate" }
sp-timestamp = { git = "https://github.com/serai-dex/substrate" } sp-timestamp = { git = "https://github.com/serai-dex/substrate" }

View File

@@ -1,7 +1,6 @@
use core::marker::PhantomData; use core::marker::PhantomData;
use std::collections::HashSet;
use sp_core::{Decode, Pair as PairTrait, sr25519::Public}; use sp_core::Pair as PairTrait;
use sc_service::ChainType; use sc_service::ChainType;
@@ -24,7 +23,7 @@ fn wasm_binary() -> Vec<u8> {
WASM_BINARY.ok_or("compiled in wasm not available").unwrap().to_vec() WASM_BINARY.ok_or("compiled in wasm not available").unwrap().to_vec()
} }
fn devnet_genesis( fn testnet_genesis(
wasm_binary: &[u8], wasm_binary: &[u8],
validators: &[&'static str], validators: &[&'static str],
endowed_accounts: Vec<PublicKey>, endowed_accounts: Vec<PublicKey>,
@@ -73,57 +72,6 @@ fn devnet_genesis(
} }
} }
fn testnet_genesis(wasm_binary: &[u8], validators: Vec<&'static str>) -> RuntimeGenesisConfig {
let validators = validators
.into_iter()
.map(|validator| Public::decode(&mut hex::decode(validator).unwrap().as_slice()).unwrap())
.collect::<Vec<_>>();
assert_eq!(validators.iter().collect::<HashSet<_>>().len(), validators.len());
RuntimeGenesisConfig {
system: SystemConfig { code: wasm_binary.to_vec(), _config: PhantomData },
transaction_payment: Default::default(),
coins: CoinsConfig {
accounts: validators
.iter()
.map(|a| (*a, Balance { coin: Coin::Serai, amount: Amount(5_000_000 * 10_u64.pow(8)) }))
.collect(),
_ignore: Default::default(),
},
dex: DexConfig {
pools: vec![Coin::Bitcoin, Coin::Ether, Coin::Dai, Coin::Monero],
_ignore: Default::default(),
},
validator_sets: ValidatorSetsConfig {
networks: serai_runtime::primitives::NETWORKS
.iter()
.map(|network| match network {
NetworkId::Serai => (NetworkId::Serai, Amount(50_000 * 10_u64.pow(8))),
NetworkId::Bitcoin => (NetworkId::Bitcoin, Amount(1_000_000 * 10_u64.pow(8))),
NetworkId::Ethereum => (NetworkId::Ethereum, Amount(1_000_000 * 10_u64.pow(8))),
NetworkId::Monero => (NetworkId::Monero, Amount(100_000 * 10_u64.pow(8))),
})
.collect(),
participants: validators.clone(),
},
signals: SignalsConfig::default(),
babe: BabeConfig {
authorities: validators.iter().map(|validator| ((*validator).into(), 1)).collect(),
epoch_config: Some(BABE_GENESIS_EPOCH_CONFIG),
_config: PhantomData,
},
grandpa: GrandpaConfig {
authorities: validators.into_iter().map(|validator| (validator.into(), 1)).collect(),
_config: PhantomData,
},
}
}
pub fn development_config() -> ChainSpec { pub fn development_config() -> ChainSpec {
let wasm_binary = wasm_binary(); let wasm_binary = wasm_binary();
@@ -134,7 +82,7 @@ pub fn development_config() -> ChainSpec {
"devnet", "devnet",
ChainType::Development, ChainType::Development,
move || { move || {
devnet_genesis( testnet_genesis(
&wasm_binary, &wasm_binary,
&["Alice"], &["Alice"],
vec![ vec![
@@ -152,7 +100,7 @@ pub fn development_config() -> ChainSpec {
// Telemetry // Telemetry
None, None,
// Protocol ID // Protocol ID
Some("serai-devnet"), Some("serai"),
// Fork ID // Fork ID
None, None,
// Properties // Properties
@@ -162,7 +110,7 @@ pub fn development_config() -> ChainSpec {
) )
} }
pub fn local_config() -> ChainSpec { pub fn testnet_config() -> ChainSpec {
let wasm_binary = wasm_binary(); let wasm_binary = wasm_binary();
ChainSpec::from_genesis( ChainSpec::from_genesis(
@@ -172,7 +120,7 @@ pub fn local_config() -> ChainSpec {
"local", "local",
ChainType::Local, ChainType::Local,
move || { move || {
devnet_genesis( testnet_genesis(
&wasm_binary, &wasm_binary,
&["Alice", "Bob", "Charlie", "Dave"], &["Alice", "Bob", "Charlie", "Dave"],
vec![ vec![
@@ -190,7 +138,7 @@ pub fn local_config() -> ChainSpec {
// Telemetry // Telemetry
None, None,
// Protocol ID // Protocol ID
Some("serai-local"), Some("serai"),
// Fork ID // Fork ID
None, None,
// Properties // Properties
@@ -199,137 +147,3 @@ pub fn local_config() -> ChainSpec {
None, None,
) )
} }
pub fn testnet_config() -> ChainSpec {
{
use std::time::{Duration, SystemTime};
let secs_since_epoch = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("current time is before the epoch")
.as_secs();
let secs_till_start = 1713283200_u64.saturating_sub(secs_since_epoch);
std::thread::sleep(Duration::from_secs(secs_till_start));
}
let wasm_binary = wasm_binary();
ChainSpec::from_genesis(
// Name
"Test Network 2",
// ID
"testnet-2",
ChainType::Live,
move || {
testnet_genesis(
&wasm_binary,
vec![
// Kayaba
"4cef4080d00c6ff5ad93d61d1ca631cc10f8c9bd733e8c0c873a85b5fbe5c625",
// CommunityStaking
"587723d333049d9f4e6f027bbd701d603544a422329ea4e1027d60f7947e1074",
// SHossain
"6e30ec71b331d73992307fa7c53719ff238666d7d895487a1b691cc1e4481344",
// StormyCloud
"b0ebef6d712b3eb0f01e69a80519e55feff4be8b226fa64d84691e4b3ca2fb38",
// Yangu
"c692a906f9c63b7e4d12ad3cde204c6715b9a96b5b8ce565794917b7eaaa5f08",
// t-900
"6a9d5a3ca9422baec670e47238decf4a8515f2de0060b0a566a56dfd72686e52",
// tappokone
"36acb4be05513bed670ef3b43dc3a0fdfde8dc45339f81c80b53b2289dc3730c",
// Sleipnir
"0e87d766c9acec45b39445579cd3f40c8a4b42e9a34049bdbef0da83d000410e",
"c2f96300a956e949883a5e8952270fb8193154a68533d0dd6b10076224e30167",
"7a66312c53dfb153e842456f4e9a38dcda7e1788a3366df3e54125e29821f870",
// jberman
"b6e23eec7dbdb2bf72a087e335b44464cedfcc11c669033d6e520b3bc8de1650",
// krytie
"82815723c498d2aaaead050e63b979bb49a94a00c97b971c22340dffeaa36829",
// toplel
"4243da92918333bfc46f4d17ddeda0c3420d920231627dca1b6049f2f13cac6d",
// clamking
"941a6efa9e4dee6c3015cc42339fe56f43c2230133787746828befcee957cb1f",
// Helios
"56a0e89cffe57337e9e232e41dca0ad7306a17fa0ca63fbac048190fdd45d511",
// akil
"1caffa33b0ea1c7ed95c8450c0baf57baf9e1c1f43af3e28a722ef6d3d4db27e",
// Eumaios
"9ec7b5edf854f6285205468ed7402e40e5bed8238dc226dd4fd718a40efdce44",
// pigeons
"66c71ebf040542ab467def0ad935ec30ea693953d4322b3b168f6f4e9fcacb63",
// joe_land1
"94e25d8247b2f0e718bee169213052c693b78743dd91f403398a8837c34e0e6a",
// rlking1255
"82592430fe65e353510d3c1018cebc9806290e2d9098a94a1190f120f471c52b",
// Seth For Privacy
"f8ebbdb8ff2a77527528577bad6fd3297017f7b35a0613ba31d8af8e7e78cd7b",
// lemon_respector
"ce4a4cd996e4601a0226f3c8d9c9cae84519a1a7277b4822e1694b4a8c3ef10b",
// tuxsudo
"c6804a561d07d77c2806844a59c24bb9472df16043767721aae0caa20e82391e",
// Awakeninghumanity.eth
"5046c9f55a65e08df86c132c142f055db0376563fabc190f47a6851e0ff2af2b",
// ART3MIS.CLOUD
"5c1793880b0c06a5ce232288c7789cf4451ab20a8da49b84c88789965bc67356",
// michnovka
"98db8174ec40046b1bae39cad69ea0000d67e120524d46bc298d167407410618",
// kgminer
"8eca72a4bf684d7c4a20a34048003b504a046bce1289d3ae79a3b4422afaf808",
// Benny
"74b4f2d2347a4426c536e6ba48efa14b989b05f03c0ea9b1c67b23696c1a831d",
// Argo
"4025bbbe9c9be72769a27e5e6a3749782f4c9b2a47624bdcb0bfbd29f5e2056a",
// vdo
"1c87bbcd666099abc1ee2ec3f065abd073c237f95c4d0658b945e9d66d67622d",
// PotR
"b29ffbb4a4c0f14eb8c22fabaaacb43f92a62214ff45f0b4f50b7031c3a61a5a",
// Ghalleb
"48f903ed592638cee1c7f239a6ac14cbb0224a3153cff0f85eb0873113cf163f",
// monerobull
"56a2e3b410cb87bdb8125ae19d76a7be042de49693dc27f03e7a0dcc72b42f6c",
// Adorid
"3430222157262d6187c4537b026bcbaeb133695bbb512a7be8f25cc5a082d933",
// KeepKey
"a0ce13fb50c3d56548334af703b6ffb9a1b2f66e9dccf4a3688140b77fa58a06",
// Username
"b0e62f04f625447673a840d9c5f0e5867b355a67b0dee322334dc00925547b71",
// R0BC0D3R
"7e32cebc21b7979c36e477f0a849df1830cc052c879baf13107888654c0be654",
// worksmarter
"c4f2f6ffead84fcaa2e3c894d57c342a24c461eab5d1d17cae3d1a9e61d73e46",
],
)
},
// Bootnodes
vec![],
// Telemetry
None,
// Protocol ID
Some("serai-testnet-2"),
// Fork ID
None,
// Properties
None,
// Extensions
None,
)
}
pub fn bootnode_multiaddrs(id: &str) -> Vec<libp2p::Multiaddr> {
match id {
"local" | "devnet" => vec![],
"testnet-2" => vec![
// Kayaba
"/ip4/107.161.20.133/tcp/30333".parse().unwrap(),
// lemon_respector
"/ip4/188.66.62.11/tcp/30333".parse().unwrap(),
// Ghalleb
"/ip4/65.21.156.202/tcp/30333".parse().unwrap(),
// ART3MIS.CLOUD
"/ip4/51.195.60.217/tcp/30333".parse().unwrap(),
// worksmarter
"/ip4/37.60.255.101/tcp/30333".parse().unwrap(),
],
_ => panic!("requesting bootnodes for an unrecognized network"),
}
}

View File

@@ -40,8 +40,7 @@ impl SubstrateCli for Cli {
fn load_spec(&self, id: &str) -> Result<Box<dyn sc_service::ChainSpec>, String> { fn load_spec(&self, id: &str) -> Result<Box<dyn sc_service::ChainSpec>, String> {
match id { match id {
"dev" | "devnet" => Ok(Box::new(chain_spec::development_config())), "dev" | "devnet" => Ok(Box::new(chain_spec::development_config())),
"local" => Ok(Box::new(chain_spec::local_config())), "local" => Ok(Box::new(chain_spec::testnet_config())),
"testnet" => Ok(Box::new(chain_spec::testnet_config())),
_ => panic!("Unknown network ID"), _ => panic!("Unknown network ID"),
} }
} }

View File

@@ -19,7 +19,6 @@ pub use sc_rpc_api::DenyUnsafe;
use sc_transaction_pool_api::TransactionPool; use sc_transaction_pool_api::TransactionPool;
pub struct FullDeps<C, P> { pub struct FullDeps<C, P> {
pub id: String,
pub client: Arc<C>, pub client: Arc<C>,
pub pool: Arc<P>, pub pool: Arc<P>,
pub deny_unsafe: DenyUnsafe, pub deny_unsafe: DenyUnsafe,
@@ -47,19 +46,18 @@ where
use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer}; use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer};
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
let FullDeps { id, client, pool, deny_unsafe, authority_discovery } = deps; let FullDeps { client, pool, deny_unsafe, authority_discovery } = deps;
module.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?; module.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?;
module.merge(TransactionPayment::new(client.clone()).into_rpc())?; module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
if let Some(authority_discovery) = authority_discovery { if let Some(authority_discovery) = authority_discovery {
let mut authority_discovery_module = let mut authority_discovery_module = RpcModule::new((client, RwLock::new(authority_discovery)));
RpcModule::new((id, client, RwLock::new(authority_discovery)));
authority_discovery_module.register_async_method( authority_discovery_module.register_async_method(
"p2p_validators", "p2p_validators",
|params, context| async move { |params, context| async move {
let network: NetworkId = params.parse()?; let network: NetworkId = params.parse()?;
let (id, client, authority_discovery) = &*context; let (client, authority_discovery) = &*context;
let latest_block = client.info().best_hash; let latest_block = client.info().best_hash;
let validators = client.runtime_api().validators(latest_block, network).map_err(|_| { let validators = client.runtime_api().validators(latest_block, network).map_err(|_| {
@@ -68,9 +66,7 @@ where
"please report this at https://github.com/serai-dex/serai", "please report this at https://github.com/serai-dex/serai",
))) )))
})?; })?;
// Always return the protocol's bootnodes let mut all_p2p_addresses = vec![];
let mut all_p2p_addresses = crate::chain_spec::bootnode_multiaddrs(id);
// Additionally returns validators found over the DHT
for validator in validators { for validator in validators {
let mut returned_addresses = authority_discovery let mut returned_addresses = authority_discovery
.write() .write()

View File

@@ -161,7 +161,7 @@ pub fn new_partial(
)) ))
} }
pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError> { pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
let ( let (
sc_service::PartialComponents { sc_service::PartialComponents {
client, client,
@@ -176,11 +176,6 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
keystore_container, keystore_container,
) = new_partial(&config)?; ) = new_partial(&config)?;
config.network.node_name = "serai".to_string();
config.network.client_version = "0.1.0".to_string();
config.network.listen_addresses =
vec!["/ip4/0.0.0.0/tcp/30333".parse().unwrap(), "/ip6/::/tcp/30333".parse().unwrap()];
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network); let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let grandpa_protocol_name = let grandpa_protocol_name =
grandpa::protocol_standard_name(&client.block_hash(0).unwrap().unwrap(), &config.chain_spec); grandpa::protocol_standard_name(&client.block_hash(0).unwrap().unwrap(), &config.chain_spec);
@@ -208,59 +203,6 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
})?; })?;
task_manager.spawn_handle().spawn("bootnodes", "bootnodes", {
let network = network.clone();
let id = config.chain_spec.id().to_string();
async move {
// Transforms the above Multiaddrs into MultiaddrWithPeerIds
// While the PeerIds *should* be known in advance and hardcoded, that data wasn't collected in
// time and this fine for a testnet
let bootnodes = || async {
use libp2p::{Transport as TransportTrait, tcp::tokio::Transport, noise::Config};
let bootnode_multiaddrs = crate::chain_spec::bootnode_multiaddrs(&id);
let mut tasks = vec![];
for multiaddr in bootnode_multiaddrs {
tasks.push(tokio::time::timeout(
core::time::Duration::from_secs(10),
tokio::task::spawn(async move {
let Ok(noise) = Config::new(&sc_network::Keypair::generate_ed25519()) else { None? };
let mut transport = Transport::default()
.upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(noise)
.multiplex(libp2p::yamux::Config::default());
let Ok(transport) = transport.dial(multiaddr.clone()) else { None? };
let Ok((peer_id, _)) = transport.await else { None? };
Some(sc_network::config::MultiaddrWithPeerId { multiaddr, peer_id })
}),
));
}
let mut res = vec![];
for task in tasks {
if let Ok(Ok(Some(bootnode))) = task.await {
res.push(bootnode);
}
}
res
};
use sc_network::{NetworkStatusProvider, NetworkPeers};
loop {
if let Ok(status) = network.status().await {
if status.num_connected_peers < 3 {
for bootnode in bootnodes().await {
let _ = network.add_reserved_peer(bootnode);
}
}
}
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
});
if config.offchain_worker.enabled { if config.offchain_worker.enabled {
task_manager.spawn_handle().spawn( task_manager.spawn_handle().spawn(
"offchain-workers-runner", "offchain-workers-runner",
@@ -316,13 +258,11 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
}; };
let rpc_builder = { let rpc_builder = {
let id = config.chain_spec.id().to_string();
let client = client.clone(); let client = client.clone();
let pool = transaction_pool.clone(); let pool = transaction_pool.clone();
Box::new(move |deny_unsafe, _| { Box::new(move |deny_unsafe, _| {
crate::rpc::create_full(crate::rpc::FullDeps { crate::rpc::create_full(crate::rpc::FullDeps {
id: id.clone(),
client: client.clone(), client: client.clone(),
pool: pool.clone(), pool: pool.clone(),
deny_unsafe, deny_unsafe,