Support reloading the mempool from disk

This commit is contained in:
Luke Parker
2023-04-14 15:51:43 -04:00
parent c032f66f8a
commit 2e2bc59703
7 changed files with 127 additions and 41 deletions

View File

@@ -29,8 +29,8 @@ pub trait Db: 'static + Send + Sync + Clone + Debug + Get {
} }
/// An atomic operation for the in-memory databae. /// An atomic operation for the in-memory databae.
#[derive(Debug)]
#[must_use] #[must_use]
#[derive(PartialEq, Eq, Debug)]
pub struct MemDbTxn<'a>(&'a MemDb, HashMap<Vec<u8>, Vec<u8>>, HashSet<Vec<u8>>); pub struct MemDbTxn<'a>(&'a MemDb, HashMap<Vec<u8>, Vec<u8>>, HashSet<Vec<u8>>);
impl<'a> Get for MemDbTxn<'a> { impl<'a> Get for MemDbTxn<'a> {
@@ -65,6 +65,13 @@ impl<'a> DbTxn for MemDbTxn<'a> {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MemDb(Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>); pub struct MemDb(Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>);
impl PartialEq for MemDb {
fn eq(&self, other: &MemDb) -> bool {
*self.0.read().unwrap() == *other.0.read().unwrap()
}
}
impl Eq for MemDb {}
impl Default for MemDb { impl Default for MemDb {
fn default() -> MemDb { fn default() -> MemDb {
MemDb(Arc::new(RwLock::new(HashMap::new()))) MemDb(Arc::new(RwLock::new(HashMap::new())))

View File

@@ -19,27 +19,27 @@ pub(crate) struct Blockchain<D: Db, T: Transaction> {
next_nonces: HashMap<<Ristretto as Ciphersuite>::G, u32>, next_nonces: HashMap<<Ristretto as Ciphersuite>::G, u32>,
provided: ProvidedTransactions<D, T>, provided: ProvidedTransactions<D, T>,
mempool: Mempool<T>, mempool: Mempool<D, T>,
} }
impl<D: Db, T: Transaction> Blockchain<D, T> { impl<D: Db, T: Transaction> Blockchain<D, T> {
fn tip_key(&self) -> Vec<u8> { fn tip_key(&self) -> Vec<u8> {
D::key(b"tributary", b"tip", self.genesis) D::key(b"tributary_blockchain", b"tip", self.genesis)
} }
fn block_number_key(&self) -> Vec<u8> { fn block_number_key(&self) -> Vec<u8> {
D::key(b"tributary", b"block_number", self.genesis) D::key(b"tributary_blockchain", b"block_number", self.genesis)
} }
fn block_key(&self, hash: &[u8; 32]) -> Vec<u8> { fn block_key(&self, hash: &[u8; 32]) -> Vec<u8> {
// Since block hashes incorporate their parent, and the first parent is the genesis, this is // Since block hashes incorporate their parent, and the first parent is the genesis, this is
// fine not incorporating the hash unless there's a hash collision // fine not incorporating the hash unless there's a hash collision
D::key(b"tributary", b"block", hash) D::key(b"tributary_blockchain", b"block", hash)
} }
fn commit_key(&self, hash: &[u8; 32]) -> Vec<u8> { fn commit_key(&self, hash: &[u8; 32]) -> Vec<u8> {
D::key(b"tributary", b"commit", hash) D::key(b"tributary_blockchain", b"commit", hash)
} }
fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> { fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> {
D::key( D::key(
b"tributary", b"tributary_blockchain",
b"next_nonce", b"next_nonce",
[self.genesis.as_ref(), signer.to_bytes().as_ref()].concat(), [self.genesis.as_ref(), signer.to_bytes().as_ref()].concat(),
) )
@@ -50,8 +50,6 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
genesis: [u8; 32], genesis: [u8; 32],
participants: &[<Ristretto as Ciphersuite>::G], participants: &[<Ristretto as Ciphersuite>::G],
) -> Self { ) -> Self {
// TODO: Reload mempool
let mut next_nonces = HashMap::new(); let mut next_nonces = HashMap::new();
for participant in participants { for participant in participants {
next_nonces.insert(*participant, 0); next_nonces.insert(*participant, 0);
@@ -65,8 +63,8 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
tip: genesis, tip: genesis,
next_nonces, next_nonces,
provided: ProvidedTransactions::new(db, genesis), provided: ProvidedTransactions::new(db.clone(), genesis),
mempool: Mempool::new(genesis), mempool: Mempool::new(db, genesis),
}; };
if let Some((block_number, tip)) = { if let Some((block_number, tip)) = {

View File

@@ -98,11 +98,11 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
validators: HashMap<<Ristretto as Ciphersuite>::G, u64>, validators: HashMap<<Ristretto as Ciphersuite>::G, u64>,
p2p: P, p2p: P,
) -> Self { ) -> Option<Self> {
let validators_vec = validators.keys().cloned().collect::<Vec<_>>(); let validators_vec = validators.keys().cloned().collect::<Vec<_>>();
let signer = Arc::new(Signer::new(genesis, key)); let signer = Arc::new(Signer::new(genesis, key));
let validators = Arc::new(Validators::new(genesis, validators)); let validators = Arc::new(Validators::new(genesis, validators)?);
let mut blockchain = Blockchain::new(db, genesis, &validators_vec); let mut blockchain = Blockchain::new(db, genesis, &validators_vec);
let block_number = blockchain.block_number(); let block_number = blockchain.block_number();
@@ -123,7 +123,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
tokio::task::spawn(machine.run()); tokio::task::spawn(machine.run());
Self { network, synced_block, messages } Some(Self { network, synced_block, messages })
} }
pub fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> { pub fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> {

View File

@@ -2,18 +2,54 @@ use std::collections::HashMap;
use ciphersuite::{Ciphersuite, Ristretto}; use ciphersuite::{Ciphersuite, Ristretto};
use serai_db::{DbTxn, Db};
use crate::{ACCOUNT_MEMPOOL_LIMIT, Signed, TransactionKind, Transaction, verify_transaction}; use crate::{ACCOUNT_MEMPOOL_LIMIT, Signed, TransactionKind, Transaction, verify_transaction};
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
pub(crate) struct Mempool<T: Transaction> { pub(crate) struct Mempool<D: Db, T: Transaction> {
db: D,
genesis: [u8; 32], genesis: [u8; 32],
txs: HashMap<[u8; 32], T>, txs: HashMap<[u8; 32], T>,
next_nonces: HashMap<<Ristretto as Ciphersuite>::G, u32>, next_nonces: HashMap<<Ristretto as Ciphersuite>::G, u32>,
} }
impl<T: Transaction> Mempool<T> { impl<D: Db, T: Transaction> Mempool<D, T> {
pub(crate) fn new(genesis: [u8; 32]) -> Self { fn transaction_key(&self, hash: &[u8]) -> Vec<u8> {
Mempool { genesis, txs: HashMap::new(), next_nonces: HashMap::new() } D::key(b"tributary_mempool", b"transaction", [self.genesis.as_ref(), hash].concat())
}
fn current_mempool_key(&self) -> Vec<u8> {
D::key(b"tributary_mempool", b"current", self.genesis)
}
pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self {
let mut res = Mempool { db, genesis, txs: HashMap::new(), next_nonces: HashMap::new() };
let current_mempool = res.db.get(res.current_mempool_key()).unwrap_or(vec![]);
let mut hash = [0; 32];
let mut i = 0;
while i < current_mempool.len() {
hash.copy_from_slice(&current_mempool[i .. (i + 32)]);
let tx =
T::read::<&[u8]>(&mut res.db.get(res.transaction_key(&hash)).unwrap().as_ref()).unwrap();
match tx.kind() {
TransactionKind::Signed(Signed { signer, nonce, .. }) => {
if let Some(prev) = res.next_nonces.insert(*signer, nonce + 1) {
// These mempool additions should've been ordered
assert!(prev < *nonce);
}
}
_ => panic!("mempool database had a non-signed transaction"),
}
debug_assert_eq!(tx.hash(), hash);
res.txs.insert(hash, tx);
i += 32;
}
res
} }
/// Returns true if this is a valid, new transaction. /// Returns true if this is a valid, new transaction.
@@ -53,7 +89,20 @@ impl<T: Transaction> Mempool<T> {
} }
assert_eq!(self.next_nonces[signer], nonce + 1); assert_eq!(self.next_nonces[signer], nonce + 1);
self.txs.insert(tx.hash(), tx); let tx_hash = tx.hash();
let transaction_key = self.transaction_key(&tx_hash);
let current_mempool_key = self.current_mempool_key();
let mut current_mempool = self.db.get(&current_mempool_key).unwrap_or(vec![]);
let mut txn = self.db.txn();
txn.put(transaction_key, tx.serialize());
current_mempool.extend(tx_hash);
txn.put(current_mempool_key, current_mempool);
txn.commit();
self.txs.insert(tx_hash, tx);
true true
} }
_ => false, _ => false,
@@ -77,7 +126,7 @@ impl<T: Transaction> Mempool<T> {
match tx.kind() { match tx.kind() {
TransactionKind::Signed(Signed { signer, nonce, .. }) => { TransactionKind::Signed(Signed { signer, nonce, .. }) => {
if blockchain_next_nonces[signer] > *nonce { if blockchain_next_nonces[signer] > *nonce {
self.txs.remove(&hash); self.remove(&hash);
continue; continue;
} }
} }
@@ -103,6 +152,27 @@ impl<T: Transaction> Mempool<T> {
/// Remove a transaction from the mempool. /// Remove a transaction from the mempool.
pub(crate) fn remove(&mut self, tx: &[u8; 32]) { pub(crate) fn remove(&mut self, tx: &[u8; 32]) {
let transaction_key = self.transaction_key(tx);
let current_mempool_key = self.current_mempool_key();
let current_mempool = self.db.get(&current_mempool_key).unwrap_or(vec![]);
let mut i = 0;
while i < current_mempool.len() {
if &current_mempool[i .. (i + 32)] == tx {
break;
}
i += 32;
}
// This doesn't have to be atomic with any greater operation
let mut txn = self.db.txn();
txn.del(transaction_key);
if i != current_mempool.len() {
txn
.put(current_mempool_key, [&current_mempool[.. i], &current_mempool[(i + 32) ..]].concat());
}
txn.commit();
self.txs.remove(tx); self.txs.remove(tx);
} }

View File

@@ -28,22 +28,26 @@ pub struct ProvidedTransactions<D: Db, T: Transaction> {
} }
impl<D: Db, T: Transaction> ProvidedTransactions<D, T> { impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
fn provided_key(&self, hash: &[u8]) -> Vec<u8> { fn transaction_key(&self, hash: &[u8]) -> Vec<u8> {
D::key(b"tributary", b"provided", [self.genesis.as_ref(), hash].concat()) D::key(b"tributary_provided", b"transaction", [self.genesis.as_ref(), hash].concat())
} }
fn currently_provided_key(&self) -> Vec<u8> { fn current_provided_key(&self) -> Vec<u8> {
D::key(b"tributary", b"currently_provided", self.genesis) D::key(b"tributary_provided", b"current", self.genesis)
} }
pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self { pub(crate) fn new(db: D, genesis: [u8; 32]) -> Self {
let mut res = ProvidedTransactions { db, genesis, transactions: VecDeque::new() }; let mut res = ProvidedTransactions { db, genesis, transactions: VecDeque::new() };
let currently_provided = res.db.get(res.currently_provided_key()).unwrap_or(vec![]); let currently_provided = res.db.get(res.current_provided_key()).unwrap_or(vec![]);
let mut i = 0; let mut i = 0;
while i < currently_provided.len() { while i < currently_provided.len() {
res.transactions.push_back( res.transactions.push_back(
T::read::<&[u8]>( T::read::<&[u8]>(
&mut res.db.get(res.provided_key(&currently_provided[i .. (i + 32)])).unwrap().as_ref(), &mut res
.db
.get(res.transaction_key(&currently_provided[i .. (i + 32)]))
.unwrap()
.as_ref(),
) )
.unwrap(), .unwrap(),
); );
@@ -65,18 +69,18 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
} }
let tx_hash = tx.hash(); let tx_hash = tx.hash();
let provided_key = self.provided_key(&tx_hash); let provided_key = self.transaction_key(&tx_hash);
if self.db.get(&provided_key).is_some() { if self.db.get(&provided_key).is_some() {
Err(ProvidedError::AlreadyProvided)?; Err(ProvidedError::AlreadyProvided)?;
} }
let currently_provided_key = self.currently_provided_key(); let current_provided_key = self.current_provided_key();
let mut currently_provided = self.db.get(&currently_provided_key).unwrap_or(vec![]); let mut currently_provided = self.db.get(&current_provided_key).unwrap_or(vec![]);
let mut txn = self.db.txn(); let mut txn = self.db.txn();
txn.put(provided_key, tx.serialize()); txn.put(provided_key, tx.serialize());
currently_provided.extend(tx_hash); currently_provided.extend(tx_hash);
txn.put(currently_provided_key, currently_provided); txn.put(current_provided_key, currently_provided);
txn.commit(); txn.commit();
self.transactions.push_back(tx); self.transactions.push_back(tx);
@@ -87,9 +91,9 @@ impl<D: Db, T: Transaction> ProvidedTransactions<D, T> {
pub(crate) fn complete(&mut self, txn: &mut D::Transaction<'_>, tx: [u8; 32]) { pub(crate) fn complete(&mut self, txn: &mut D::Transaction<'_>, tx: [u8; 32]) {
assert_eq!(self.transactions.pop_front().unwrap().hash(), tx); assert_eq!(self.transactions.pop_front().unwrap().hash(), tx);
let currently_provided_key = self.currently_provided_key(); let current_provided_key = self.current_provided_key();
let mut currently_provided = txn.get(&currently_provided_key).unwrap(); let mut currently_provided = txn.get(&current_provided_key).unwrap();
assert_eq!(&currently_provided.drain(.. 32).collect::<Vec<_>>(), &tx); assert_eq!(&currently_provided.drain(.. 32).collect::<Vec<_>>(), &tx);
txn.put(currently_provided_key, currently_provided); txn.put(current_provided_key, currently_provided);
} }
} }

View File

@@ -126,7 +126,7 @@ impl Validators {
pub(crate) fn new( pub(crate) fn new(
genesis: [u8; 32], genesis: [u8; 32],
validators: HashMap<<Ristretto as Ciphersuite>::G, u64>, validators: HashMap<<Ristretto as Ciphersuite>::G, u64>,
) -> Validators { ) -> Option<Validators> {
let mut total_weight = 0; let mut total_weight = 0;
let mut weights = HashMap::new(); let mut weights = HashMap::new();
@@ -134,8 +134,9 @@ impl Validators {
let mut robin = vec![]; let mut robin = vec![];
for (validator, weight) in validators { for (validator, weight) in validators {
let validator = validator.to_bytes(); let validator = validator.to_bytes();
// TODO: Make an error out of this if weight == 0 {
assert!(weight != 0); return None;
}
total_weight += weight; total_weight += weight;
weights.insert(validator, weight); weights.insert(validator, weight);
@@ -145,7 +146,7 @@ impl Validators {
} }
robin.shuffle(&mut ChaCha12Rng::from_seed(transcript.rng_seed(b"robin"))); robin.shuffle(&mut ChaCha12Rng::from_seed(transcript.rng_seed(b"robin")));
Validators { genesis, total_weight, weights, robin } Some(Validators { genesis, total_weight, weights, robin })
} }
} }

View File

@@ -5,20 +5,23 @@ use rand::{RngCore, rngs::OsRng};
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto}; use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use serai_db::MemDb;
use crate::{ use crate::{
ACCOUNT_MEMPOOL_LIMIT, Transaction, Mempool, ACCOUNT_MEMPOOL_LIMIT, Transaction, Mempool,
tests::{SignedTransaction, signed_transaction}, tests::{SignedTransaction, signed_transaction},
}; };
fn new_mempool<T: Transaction>() -> ([u8; 32], Mempool<T>) { fn new_mempool<T: Transaction>() -> ([u8; 32], MemDb, Mempool<MemDb, T>) {
let mut genesis = [0; 32]; let mut genesis = [0; 32];
OsRng.fill_bytes(&mut genesis); OsRng.fill_bytes(&mut genesis);
(genesis, Mempool::new(genesis)) let db = MemDb::new();
(genesis, db.clone(), Mempool::new(db, genesis))
} }
#[test] #[test]
fn mempool_addition() { fn mempool_addition() {
let (genesis, mut mempool) = new_mempool::<SignedTransaction>(); let (genesis, db, mut mempool) = new_mempool::<SignedTransaction>();
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng)); let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
@@ -31,6 +34,9 @@ fn mempool_addition() {
assert!(mempool.add(&blockchain_next_nonces, true, first_tx.clone())); assert!(mempool.add(&blockchain_next_nonces, true, first_tx.clone()));
assert_eq!(mempool.next_nonce(&signer), Some(1)); assert_eq!(mempool.next_nonce(&signer), Some(1));
// Test reloading works
assert_eq!(mempool, Mempool::new(db, genesis));
// Adding it again should fail // Adding it again should fail
assert!(!mempool.add(&blockchain_next_nonces, true, first_tx.clone())); assert!(!mempool.add(&blockchain_next_nonces, true, first_tx.clone()));
@@ -67,7 +73,7 @@ fn mempool_addition() {
#[test] #[test]
fn too_many_mempool() { fn too_many_mempool() {
let (genesis, mut mempool) = new_mempool::<SignedTransaction>(); let (genesis, _, mut mempool) = new_mempool::<SignedTransaction>();
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng)); let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut OsRng));
let signer = signed_transaction(&mut OsRng, genesis, &key, 0).1.signer; let signer = signed_transaction(&mut OsRng, genesis, &key, 0).1.signer;