From e8aab4c49d805902d11f30acf8131d5049bddbe2 Mon Sep 17 00:00:00 2001 From: driftluo Date: Thu, 21 Nov 2024 21:13:31 +0800 Subject: [PATCH] feat: impl peer store load/dump on wasm --- Cargo.lock | 29 +++- network/Cargo.toml | 2 + network/src/lib.rs | 2 +- network/src/network.rs | 55 +++++++- network/src/peer_store/browser.rs | 172 ++++++++++++++++++++++++ network/src/peer_store/mod.rs | 2 + network/src/peer_store/peer_store_db.rs | 85 ++++++++---- network/src/services/dump_peer_store.rs | 16 ++- util/app-config/src/configs/network.rs | 50 ++----- 9 files changed, 341 insertions(+), 72 deletions(-) create mode 100644 network/src/peer_store/browser.rs diff --git a/Cargo.lock b/Cargo.lock index dc2351bf12..04e8b5d12f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1287,6 +1287,7 @@ dependencies = [ "criterion", "faster-hex", "futures", + "idb", "ipnetwork", "num_cpus", "proptest", @@ -1294,6 +1295,7 @@ dependencies = [ "secp256k1", "sentry", "serde", + "serde-wasm-bindgen", "serde_json", "snap", "socket2", @@ -3300,6 +3302,20 @@ dependencies = [ "cc", ] +[[package]] +name = "idb" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3afe8830d5802f769dc0be20a87f9f116798c896650cb6266eb5c19a3c109eed" +dependencies = [ + "js-sys", + "num-traits", + "thiserror", + "tokio", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -3628,7 +3644,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -5297,6 +5313,17 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-wasm-bindgen" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8302e169f0eddcc139c70f139d19d6467353af16f9fce27e8c30158036a1e16b" +dependencies = [ + "js-sys", + "serde", + "wasm-bindgen", +] + [[package]] name = "serde_derive" version = "1.0.210" diff --git a/network/Cargo.toml b/network/Cargo.toml index 04846b76ad..4f7e0bcf25 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -51,6 +51,8 @@ socket2 = "0.5" p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [ "wasm-timer", ] } +idb = "0.6" +serde-wasm-bindgen = "0.6.5" [features] diff --git a/network/src/lib.rs b/network/src/lib.rs index 3bb66e24bb..c672dab65a 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -40,7 +40,7 @@ pub use p2p::{ async_trait, builder::ServiceBuilder, bytes, multiaddr, runtime, - secio::{PeerId, PublicKey}, + secio::{self, PeerId, PublicKey}, service::{ServiceControl, SessionType, TargetProtocol, TargetSession}, traits::ServiceProtocol, utils::{extract_peer_id, multiaddr_to_socketaddr}, diff --git a/network/src/network.rs b/network/src/network.rs index f84b4721dd..6134e7cb12 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -94,8 +94,8 @@ pub struct NetworkState { impl NetworkState { /// Init from config + #[cfg(not(target_family = "wasm"))] pub fn from_config(config: NetworkConfig) -> Result { - #[cfg(not(target_family = "wasm"))] config.create_dir_if_not_exists()?; let local_private_key = config.fetch_private_key()?; let local_peer_id = local_private_key.peer_id(); @@ -117,12 +117,10 @@ impl NetworkState { }) .collect(); info!("Loading the peer store. This process may take a few seconds to complete."); - #[cfg(not(target_family = "wasm"))] + let peer_store = Mutex::new(PeerStore::load_from_dir_or_default( config.peer_store_path(), )); - #[cfg(target_family = "wasm")] - let peer_store = Mutex::new(PeerStore::load_from_config(&config)); let bootnodes = config.bootnodes(); let peer_registry = PeerRegistry::new( @@ -150,6 +148,55 @@ impl NetworkState { }) } + #[cfg(target_family = "wasm")] + pub async fn from_config(config: NetworkConfig) -> Result { + let local_private_key = config.fetch_private_key()?; + let local_peer_id = local_private_key.peer_id(); + // set max score to public addresses + let public_addrs: HashSet = config + .listen_addresses + .iter() + .chain(config.public_addresses.iter()) + .cloned() + .filter_map(|mut addr| { + multiaddr_to_socketaddr(&addr) + .filter(|addr| is_reachable(addr.ip())) + .and({ + if extract_peer_id(&addr).is_none() { + addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))); + } + Some(addr) + }) + }) + .collect(); + info!("Loading the peer store. This process may take a few seconds to complete."); + let peer_store = Mutex::new(PeerStore::load_from_idb(config.peer_store_path()).await); + let bootnodes = config.bootnodes(); + + let peer_registry = PeerRegistry::new( + config.max_inbound_peers(), + config.max_outbound_peers(), + config.whitelist_only, + config.whitelist_peers(), + ); + Ok(NetworkState { + peer_store, + config, + bootnodes, + peer_registry: RwLock::new(peer_registry), + dialing_addrs: RwLock::new(HashMap::default()), + public_addrs: RwLock::new(public_addrs), + listened_addrs: RwLock::new(Vec::new()), + pending_observed_addrs: RwLock::new(HashSet::default()), + local_private_key, + local_peer_id, + active: AtomicBool::new(true), + protocols: RwLock::new(Vec::new()), + required_flags: Flags::SYNC | Flags::DISCOVERY | Flags::RELAY, + ckb2023: AtomicBool::new(false), + }) + } + /// fork flag pub fn ckb2023(self, init: bool) -> Self { self.ckb2023.store(init, Ordering::SeqCst); diff --git a/network/src/peer_store/browser.rs b/network/src/peer_store/browser.rs new file mode 100644 index 0000000000..20fd775fd5 --- /dev/null +++ b/network/src/peer_store/browser.rs @@ -0,0 +1,172 @@ +use idb::{ + DatabaseEvent, Factory, IndexParams, KeyPath, ObjectStoreParams, TransactionMode, + TransactionResult, +}; +use p2p::runtime; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc::channel, OnceCell}; + +use std::path::Path; + +use crate::errors::PeerStoreError; + +static DB: OnceCell = OnceCell::const_new(); + +#[derive(Deserialize, Serialize, Debug)] +pub struct KV { + pub key: Vec, + pub value: Vec, +} + +struct Request { + cmd: CommandRequest, + resp: tokio::sync::oneshot::Sender, +} + +enum CommandResponse { + Read { value: Option> }, + Put, + Shutdown, +} + +enum CommandRequest { + Read { key: Vec }, + Put { kv: KV }, + Shutdown, +} + +impl std::fmt::Debug for CommandResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CommandResponse::Read { .. } => write!(f, "Read"), + CommandResponse::Put { .. } => write!(f, "Put"), + CommandResponse::Shutdown => write!(f, "Shutdown"), + } + } +} + +impl std::fmt::Debug for CommandRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CommandRequest::Read { .. } => write!(f, "Read"), + CommandRequest::Put { .. } => write!(f, "Put"), + CommandRequest::Shutdown => write!(f, "Shutdown"), + } + } +} + +pub async fn get_db>(path: P) -> &'static Storage { + DB.get_or_init(|| Storage::new(path)).await +} + +#[derive(Clone)] +pub struct Storage { + chan: tokio::sync::mpsc::Sender, +} + +impl Storage { + pub async fn new>(path: P) -> Self { + let factory = Factory::new().unwrap(); + let mut open_request = factory.open("network", Some(1)).unwrap(); + let store_name = path.as_ref().to_str().unwrap().to_owned(); + let store_name_clone = store_name.clone(); + open_request.on_upgrade_needed(move |event| { + let database = event.database().unwrap(); + let store_params = ObjectStoreParams::new(); + + let store = database + .create_object_store(&store_name_clone, store_params) + .unwrap(); + let mut index_params = IndexParams::new(); + index_params.unique(true); + store + .create_index("key", KeyPath::new_single("key"), Some(index_params)) + .unwrap(); + }); + let db = open_request.await.unwrap(); + let (tx, mut rx) = channel(128); + + runtime::spawn(async move { + loop { + let request: Request = rx.recv().await.unwrap(); + match request.cmd { + CommandRequest::Read { key } => { + let tran = db + .transaction(&[&store_name], TransactionMode::ReadOnly) + .unwrap(); + let store = tran.object_store(&store_name).unwrap(); + let key = serde_wasm_bindgen::to_value(&key).unwrap(); + let value = store + .get(key) + .unwrap() + .await + .unwrap() + .map(|v| serde_wasm_bindgen::from_value::(v).unwrap().value); + assert_eq!(TransactionResult::Committed, tran.await.unwrap()); + request.resp.send(CommandResponse::Read { value }).unwrap() + } + CommandRequest::Put { kv } => { + let tran = db + .transaction(&[&store_name], TransactionMode::ReadWrite) + .unwrap(); + let store = tran.object_store(&store_name).unwrap(); + + let key = serde_wasm_bindgen::to_value(&kv.key).unwrap(); + let value = serde_wasm_bindgen::to_value(&kv).unwrap(); + store.put(&value, Some(&key)).unwrap().await.unwrap(); + assert_eq!( + TransactionResult::Committed, + tran.commit().unwrap().await.unwrap() + ); + request.resp.send(CommandResponse::Put).unwrap(); + } + CommandRequest::Shutdown => { + request.resp.send(CommandResponse::Shutdown).unwrap(); + break; + } + } + } + }); + + Self { chan: tx } + } + + pub async fn get>(&self, key: K) -> Result>, PeerStoreError> { + let value = send_command( + &self.chan, + CommandRequest::Read { + key: key.as_ref().to_vec(), + }, + ) + .await; + if let CommandResponse::Read { value } = value { + return Ok(value); + } else { + unreachable!() + } + } + + pub async fn put(&self, key: Vec, value: Vec) -> Result<(), PeerStoreError> { + let kv = KV { key, value }; + + send_command(&self.chan, CommandRequest::Put { kv }).await; + Ok(()) + } + + pub async fn shutdown(&self) { + if let CommandResponse::Shutdown = send_command(&self.chan, CommandRequest::Shutdown).await + { + } else { + unreachable!() + } + } +} + +async fn send_command( + chan: &tokio::sync::mpsc::Sender, + cmd: CommandRequest, +) -> CommandResponse { + let (tx, rx) = tokio::sync::oneshot::channel(); + chan.send(Request { cmd, resp: tx }).await.unwrap(); + rx.await.unwrap() +} diff --git a/network/src/peer_store/mod.rs b/network/src/peer_store/mod.rs index 7461e68541..ab71862676 100644 --- a/network/src/peer_store/mod.rs +++ b/network/src/peer_store/mod.rs @@ -9,6 +9,8 @@ pub mod addr_manager; pub mod ban_list; +#[cfg(target_family = "wasm")] +pub(crate) mod browser; mod peer_store_db; mod peer_store_impl; pub mod types; diff --git a/network/src/peer_store/peer_store_db.rs b/network/src/peer_store/peer_store_db.rs index daeb604414..16572964a6 100644 --- a/network/src/peer_store/peer_store_db.rs +++ b/network/src/peer_store/peer_store_db.rs @@ -39,11 +39,9 @@ impl AddrManager { } #[cfg(target_family = "wasm")] - pub fn dump_with_fn(&self, f: F) { + pub fn dump_data(&self) -> Vec { let addrs: Vec<_> = self.addrs_iter().collect(); - debug!("Dump {} addrs", addrs.len()); - let bytes = serde_json::to_string(&addrs).unwrap(); - f(bytes.as_bytes()) + serde_json::to_string(&addrs).unwrap().into_bytes() } } @@ -72,11 +70,9 @@ impl BanList { } #[cfg(target_family = "wasm")] - pub fn dump_with_fn(&self, f: F) { - let addrs: Vec<_> = self.get_banned_addrs(); - debug!("Dump {} banned addrs", addrs.len()); - let bytes = serde_json::to_string(&addrs).unwrap(); - f(bytes.as_bytes()) + pub fn dump_data(&self) -> Vec { + let banned_addrs = self.get_banned_addrs(); + serde_json::to_string(&banned_addrs).unwrap().into_bytes() } } @@ -124,17 +120,43 @@ impl PeerStore { } #[cfg(target_family = "wasm")] - pub fn load_from_config(config: &ckb_app_config::NetworkConfig) -> Self { - let addr_manager = Ok((config.peer_store_fns.load_fn)()) - .and_then(|file| { - AddrManager::load(std::io::BufReader::new(std::io::Cursor::new(file))) - .map_err(|err| error!("Failed to load AddrManager db, error: {:?}", err)) + pub async fn load_from_idb>(path: P) -> Self { + use crate::peer_store::browser::get_db; + + let addr_manager_path = path + .as_ref() + .join(DEFAULT_ADDR_MANAGER_DB) + .to_str() + .unwrap() + .to_owned() + .into_bytes(); + let ban_list_path = path + .as_ref() + .join(DEFAULT_BAN_LIST_DB) + .to_str() + .unwrap() + .to_owned() + .into_bytes(); + + let db = get_db(path).await; + + let addr_manager = db + .get(&addr_manager_path) + .await + .map_err(|err| debug!("Failed to get indexdb value, error: {:?}", err)) + .and_then(|data| { + AddrManager::load(std::io::Cursor::new(data.unwrap_or_default())) + .map_err(|err| debug!("Failed to load peer store value, error: {:?}", err)) }) .unwrap_or_default(); - let ban_list = Ok((config.peer_store_ban_list_fns.load_fn)()) - .and_then(|file| { - BanList::load(std::io::BufReader::new(std::io::Cursor::new(file))) - .map_err(|err| error!("Failed to load BanList db, error: {:?}", err)) + + let ban_list = db + .get(&ban_list_path) + .await + .map_err(|err| debug!("Failed to get indexdb value, error: {:?}", err)) + .and_then(|data| { + BanList::load(std::io::Cursor::new(data.unwrap_or_default())) + .map_err(|err| error!("Failed to load BanList value, error: {:?}", err)) }) .unwrap_or_default(); PeerStore::new(addr_manager, ban_list) @@ -174,11 +196,28 @@ impl PeerStore { } #[cfg(target_family = "wasm")] - pub fn dump_with_config(&self, config: &ckb_app_config::NetworkConfig) { - self.addr_manager() - .dump_with_fn(&config.peer_store_fns.dump_fn); - self.ban_list() - .dump_with_fn(&config.peer_store_ban_list_fns.dump_fn); + pub fn dump_to_idb>(&self, path: P) -> impl std::future::Future { + use crate::peer_store::browser::get_db; + let ban_list = self.ban_list().dump_data(); + let addr_manager = self.addr_manager().dump_data(); + let addr_manager_path = path + .as_ref() + .join(DEFAULT_ADDR_MANAGER_DB) + .to_str() + .unwrap() + .to_owned(); + let ban_list_path = path + .as_ref() + .join(DEFAULT_BAN_LIST_DB) + .to_str() + .unwrap() + .to_owned(); + async { + let db = get_db(path).await; + + let _ignore = db.put(addr_manager_path.into_bytes(), addr_manager).await; + let _ignore = db.put(ban_list_path.into_bytes(), ban_list).await; + } } } diff --git a/network/src/services/dump_peer_store.rs b/network/src/services/dump_peer_store.rs index f95230a79f..c0916b8c4f 100644 --- a/network/src/services/dump_peer_store.rs +++ b/network/src/services/dump_peer_store.rs @@ -38,9 +38,11 @@ impl DumpPeerStoreService { #[cfg(target_family = "wasm")] fn dump_peer_store(&self) { - let config = &self.network_state.config; - self.network_state - .with_peer_store_mut(|peer_store| peer_store.dump_with_config(config)); + let path = self.network_state.config.peer_store_path(); + self.network_state.with_peer_store_mut(|peer_store| { + let task = peer_store.dump_to_idb(path); + p2p::runtime::spawn(task) + }); } } @@ -48,6 +50,14 @@ impl Drop for DumpPeerStoreService { fn drop(&mut self) { debug!("Dump peer store before exiting"); self.dump_peer_store(); + #[cfg(target_family = "wasm")] + { + use crate::peer_store::browser::get_db; + let path = self.network_state.config.peer_store_path(); + p2p::runtime::spawn(async { + let _ignore = get_db(path).await.shutdown().await; + }); + } } } diff --git a/util/app-config/src/configs/network.rs b/util/app-config/src/configs/network.rs index e047fe9635..70d8cb3550 100644 --- a/util/app-config/src/configs/network.rs +++ b/util/app-config/src/configs/network.rs @@ -91,37 +91,7 @@ pub struct Config { #[cfg(target_family = "wasm")] #[serde(skip)] - pub secret_key: Option<[u8; 32]>, - - #[cfg(target_family = "wasm")] - #[serde(skip)] - pub peer_store_ban_list_fns: std::sync::Arc, - #[cfg(target_family = "wasm")] - #[serde(skip)] - pub peer_store_fns: std::sync::Arc, -} - -#[cfg(target_family = "wasm")] -pub struct PeerStoreWasm { - pub dump_fn: Box, - pub load_fn: Box Vec + Send + Sync>, -} - -#[cfg(target_family = "wasm")] -impl std::fmt::Debug for PeerStoreWasm { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "wasm peer store functions") - } -} - -#[cfg(target_family = "wasm")] -impl Default for PeerStoreWasm { - fn default() -> Self { - PeerStoreWasm { - dump_fn: Box::new(|_| {}), - load_fn: Box::new(|| Vec::new()), - } - } + pub secret_key: [u8; 32], } /// Chain synchronization config options. @@ -318,12 +288,14 @@ impl Config { /// Reads the secret key from secret key file. /// /// If the key file does not exists, it returns `Ok(None)`. + #[cfg(not(target_family = "wasm"))] fn read_secret_key(&self) -> Result, Error> { let path = self.secret_key_path(); read_secret_key(path) } /// Generates a random secret key and saves it into the file. + #[cfg(not(target_family = "wasm"))] fn write_secret_key_to_file(&self) -> Result<(), Error> { let path = self.secret_key_path(); let random_key_pair = generate_random_key(); @@ -344,17 +316,15 @@ impl Config { #[cfg(target_family = "wasm")] pub fn fetch_private_key(&self) -> Result { - self.secret_key - .clone() - .ok_or(Error::new( + if self.secret_key == [0; 32] { + return Err(Error::new( ErrorKind::InvalidData, "invalid secret key data", - )) - .or_else(|_| Ok(generate_random_key())) - .map(|secret| { - secio::SecioKeyPair::secp256k1_raw_key(&secret) - .expect("network secret key is invalid") - }) + )); + } else { + secio::SecioKeyPair::secp256k1_raw_key(&self.secret_key) + .map_err(|_| Error::new(ErrorKind::InvalidData, "invalid secret key data")) + } } /// Gets the list of whitelist peers.