From 99c91d95a1d664ffdc9700ef492a00bd76c9c5d1 Mon Sep 17 00:00:00 2001 From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com> Date: Fri, 15 Nov 2024 15:31:06 +0800 Subject: [PATCH] refactor: storage (#32) * rebase to 0g-storage-node log-entry-sync * modified log-entry-sync * feat: sync log-entry-sync * update: dependency * feat: update log entry sycn * refactor: storage * fix: flow store * chore: update dependency --- Cargo.lock | 63 +++-- Cargo.toml | 14 +- node/kv_types/src/lib.rs | 52 +++- node/log_entry_sync/Cargo.toml | 4 +- .../log_entry_sync/src/sync_manager/config.rs | 7 + .../src/sync_manager/log_entry_fetcher.rs | 87 +++---- .../src/sync_manager/metrics.rs | 10 +- node/log_entry_sync/src/sync_manager/mod.rs | 52 +--- node/rpc/Cargo.toml | 2 +- node/src/client/builder.rs | 27 +-- node/src/config/convert.rs | 3 +- node/src/config/mod.rs | 2 + .../src/store/data_store.rs | 180 ++++++++++++++ .../src/store/flow_store.rs | 181 ++++++++++++++ .../src/store/metadata_store.rs | 45 ---- node/storage_with_stream/src/store/mod.rs | 94 ++------ .../src/store/store_manager.rs | 228 +++--------------- .../storage_with_stream/src/store/tx_store.rs | 210 ++++++++++++++++ node/stream/Cargo.toml | 4 +- node/stream/src/stream_manager/mod.rs | 14 +- .../src/stream_manager/stream_data_fetcher.rs | 56 ++--- .../src/stream_manager/stream_replayer.rs | 108 ++++----- 22 files changed, 861 insertions(+), 582 deletions(-) create mode 100644 node/storage_with_stream/src/store/data_store.rs create mode 100644 node/storage_with_stream/src/store/flow_store.rs delete mode 100644 node/storage_with_stream/src/store/metadata_store.rs create mode 100644 node/storage_with_stream/src/store/tx_store.rs diff --git a/Cargo.lock b/Cargo.lock index 6f6f51c..1aa2607 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ dependencies = [ [[package]] name = "append_merkle" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", "eth2_ssz", @@ -172,6 +172,7 @@ dependencies = [ "itertools 0.13.0", "lazy_static", "lru 0.12.5", + "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603)", "once_cell", "serde", "tiny-keccak", @@ -834,7 +835,7 @@ dependencies = [ [[package]] name = "channel" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603)", "tokio", @@ -857,12 +858,14 @@ dependencies = [ [[package]] name = "chunk_pool" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", "async-lock 2.8.0", "hashlink 0.8.4", - "log_entry_sync 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8)", + "lazy_static", + "log_entry_sync 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54)", + "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603)", "network", "shared_types", "storage-async", @@ -1044,7 +1047,7 @@ checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" [[package]] name = "contract-interface" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "ethers", "serde_json", @@ -1388,7 +1391,7 @@ dependencies = [ [[package]] name = "directory" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" [[package]] name = "dirs" @@ -2232,7 +2235,7 @@ checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] name = "file_location_cache" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "hashlink 0.8.4", "lazy_static", @@ -2763,7 +2766,7 @@ dependencies = [ [[package]] name = "hashset_delay" version = "0.2.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "futures", "tokio-util 0.6.10", @@ -4432,7 +4435,7 @@ dependencies = [ [[package]] name = "lighthouse_metrics" version = "0.2.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "lazy_static", "prometheus", @@ -4534,6 +4537,7 @@ dependencies = [ "kv_types", "lazy_static", "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=992ebc5483d937c8f6b883e266f8ed2a67a7fa9a)", + "reqwest", "serde_json", "shared_types", "storage", @@ -4541,12 +4545,13 @@ dependencies = [ "task_executor", "thiserror", "tokio", + "url", ] [[package]] name = "log_entry_sync" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", "append_merkle", @@ -4560,12 +4565,14 @@ dependencies = [ "jsonrpsee", "lazy_static", "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603)", + "reqwest", "serde_json", "shared_types", "storage", "task_executor", "thiserror", "tokio", + "url", ] [[package]] @@ -4635,7 +4642,7 @@ checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" [[package]] name = "merkle_light" version = "0.4.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "rayon", ] @@ -4643,7 +4650,7 @@ dependencies = [ [[package]] name = "merkle_tree" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "merkle_light", "tiny-keccak", @@ -4697,7 +4704,7 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miner" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "async-trait", "blake2", @@ -4956,8 +4963,9 @@ dependencies = [ [[package]] name = "network" version = "0.2.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ + "channel", "directory", "dirs 4.0.0", "discv5", @@ -6375,7 +6383,7 @@ dependencies = [ "kv_types", "merkle_light", "merkle_tree", - "rpc 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8)", + "rpc 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54)", "serde", "serde_json", "shared_types", @@ -6388,7 +6396,7 @@ dependencies = [ [[package]] name = "rpc" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "append_merkle", "base64 0.13.1", @@ -6892,7 +6900,7 @@ dependencies = [ [[package]] name = "shared_types" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", "append_merkle", @@ -7104,7 +7112,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "storage" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", "append_merkle", @@ -7117,8 +7125,10 @@ dependencies = [ "kvdb", "kvdb-memorydb", "kvdb-rocksdb", + "lazy_static", "merkle_light", "merkle_tree", + "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603)", "once_cell", "parking_lot 0.12.3", "rayon", @@ -7138,9 +7148,10 @@ dependencies = [ [[package]] name = "storage-async" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", + "backtrace", "eth2_ssz", "shared_types", "storage", @@ -7195,7 +7206,7 @@ dependencies = [ "jsonrpsee", "kv_types", "rpc 0.1.0", - "rpc 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8)", + "rpc 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54)", "rusqlite", "serde_json", "shared_types", @@ -7321,7 +7332,7 @@ dependencies = [ [[package]] name = "sync" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "anyhow", "append_merkle", @@ -7331,7 +7342,7 @@ dependencies = [ "file_location_cache", "lazy_static", "libp2p", - "log_entry_sync 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8)", + "log_entry_sync 0.1.0 (git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54)", "metrics 0.1.0 (git+https://github.com/Conflux-Chain/conflux-rust.git?rev=c4734e337c66d38e6396742cd5117b596e8d2603)", "network", "rand 0.8.5", @@ -7409,7 +7420,7 @@ checksum = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe" [[package]] name = "task_executor" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "exit-future", "futures", @@ -8820,7 +8831,7 @@ dependencies = [ [[package]] name = "zgs_seal" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "ethereum-types 0.14.1", "tiny-keccak", @@ -8830,12 +8841,12 @@ dependencies = [ [[package]] name = "zgs_spec" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" [[package]] name = "zgs_version" version = "0.1.0" -source = "git+https://github.com/0glabs/0g-storage-node.git?rev=da2cdec8a1ddfd52333a6c10141df31611d0e1f8#da2cdec8a1ddfd52333a6c10141df31611d0e1f8" +source = "git+https://github.com/0glabs/0g-storage-node.git?rev=96f846073f111adfa31df926be0e4a25e0b85f54#96f846073f111adfa31df926be0e4a25e0b85f54" dependencies = [ "git-version", "target_info", diff --git a/Cargo.toml b/Cargo.toml index 641d3f0..b3fface 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,11 @@ enr = { path = "version-meld/enr" } discv5 = { path = "version-meld/discv5" } [workspace.dependencies] -append_merkle = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8" } -merkle_light = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8" } -merkle_tree = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8" } -shared_types = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8" } -task_executor = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8" } -storage = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8", package = "storage" } -contract-interface = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8" } +append_merkle = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54" } +merkle_light = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54" } +merkle_tree = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54" } +shared_types = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54" } +task_executor = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54" } +storage = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54", package = "storage" } +contract-interface = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "96f846073f111adfa31df926be0e4a25e0b85f54" } metrics = { git = "https://github.com/Conflux-Chain/conflux-rust.git", rev = "992ebc5483d937c8f6b883e266f8ed2a67a7fa9a" } \ No newline at end of file diff --git a/node/kv_types/src/lib.rs b/node/kv_types/src/lib.rs index f40e80a..ebc38bc 100644 --- a/node/kv_types/src/lib.rs +++ b/node/kv_types/src/lib.rs @@ -1,9 +1,11 @@ use ethereum_types::{H160, H256}; use serde::{Deserialize, Serialize}; -use shared_types::Transaction; +use shared_types::{DataRoot, Transaction, TxID}; +use ssz::Encode; use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode}; use std::collections::HashSet; +use tiny_keccak::{Hasher, Keccak}; use std::sync::Arc; @@ -29,16 +31,52 @@ pub fn submission_topic_to_stream_ids(topic: Vec) -> Vec { #[derive(Clone, Debug, Eq, PartialEq, DeriveDecode, DeriveEncode, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] -pub struct KVMetadata { +pub struct KVTransaction { pub stream_ids: Vec, pub sender: H160, + pub data_merkle_root: DataRoot, + /// `(subtree_depth, subtree_root)` + pub merkle_nodes: Vec<(usize, DataRoot)>, + + pub start_entry_index: u64, + pub size: u64, + pub seq: u64, } -#[derive(Clone, Debug, Eq, PartialEq, DeriveDecode, DeriveEncode, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct KVTransaction { - pub metadata: KVMetadata, - pub transaction: Transaction, +impl KVTransaction { + pub fn num_entries_of_node(depth: usize) -> usize { + 1 << (depth - 1) + } + + pub fn num_entries_of_list(merkle_nodes: &[(usize, DataRoot)]) -> usize { + merkle_nodes.iter().fold(0, |size, &(depth, _)| { + size + Transaction::num_entries_of_node(depth) + }) + } + + pub fn num_entries(&self) -> usize { + Self::num_entries_of_list(&self.merkle_nodes) + } + + pub fn hash(&self) -> H256 { + let bytes = self.as_ssz_bytes(); + let mut h = Keccak::v256(); + let mut e = H256::zero(); + h.update(&bytes); + h.finalize(e.as_mut()); + e + } + + pub fn id(&self) -> TxID { + TxID { + seq: self.seq, + hash: self.hash(), + } + } + + pub fn start_entry_index(&self) -> u64 { + self.start_entry_index + } } #[derive(Debug)] diff --git a/node/log_entry_sync/Cargo.toml b/node/log_entry_sync/Cargo.toml index 5e95c6d..dbebc79 100644 --- a/node/log_entry_sync/Cargo.toml +++ b/node/log_entry_sync/Cargo.toml @@ -25,4 +25,6 @@ futures-core = "0.3.28" futures-util = "0.3.28" thiserror = "1.0.44" lazy_static = "1.4.0" -metrics = { workspace = true } \ No newline at end of file +metrics = { workspace = true } +reqwest = {version = "0.11", features = ["json"]} +url = { version = "2.4", default-features = false } diff --git a/node/log_entry_sync/src/sync_manager/config.rs b/node/log_entry_sync/src/sync_manager/config.rs index d9d281f..5b0d190 100644 --- a/node/log_entry_sync/src/sync_manager/config.rs +++ b/node/log_entry_sync/src/sync_manager/config.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::ContractAddress; pub struct LogSyncConfig { @@ -34,6 +36,9 @@ pub struct LogSyncConfig { pub watch_loop_wait_time_ms: u64, // force to sync log from start block number pub force_log_sync_from_start_block_number: bool, + + // the timeout for blockchain rpc connection + pub blockchain_rpc_timeout: Duration, } #[derive(Clone)] @@ -61,6 +66,7 @@ impl LogSyncConfig { remove_finalized_block_interval_minutes: u64, watch_loop_wait_time_ms: u64, force_log_sync_from_start_block_number: bool, + blockchain_rpc_timeout: Duration, ) -> Self { Self { rpc_endpoint_url, @@ -77,6 +83,7 @@ impl LogSyncConfig { remove_finalized_block_interval_minutes, watch_loop_wait_time_ms, force_log_sync_from_start_block_number, + blockchain_rpc_timeout, } } } diff --git a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs index e9abc42..b5dc6bb 100644 --- a/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs +++ b/node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs @@ -1,6 +1,6 @@ use crate::sync_manager::log_query::LogQuery; -use crate::sync_manager::RETRY_WAIT_MS; -use crate::ContractAddress; +use crate::sync_manager::{metrics, RETRY_WAIT_MS}; +use crate::{ContractAddress, LogSyncConfig}; use anyhow::{anyhow, bail, Result}; use append_merkle::{Algorithm, Sha3Algorithm}; use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow}; @@ -10,21 +10,17 @@ use ethers::providers::{HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilde use ethers::types::{Block, Log, H256}; use futures::StreamExt; use jsonrpsee::tracing::{debug, error, info, warn}; -use kv_types::{submission_topic_to_stream_ids, KVMetadata, KVTransaction}; -use shared_types::{DataRoot, Transaction}; +use kv_types::{submission_topic_to_stream_ids, KVTransaction}; +use shared_types::DataRoot; use std::collections::{BTreeMap, HashMap}; -use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use storage::log_store::tx_store::BlockHashAndSubmissionIndex; use storage_with_stream::Store; use task_executor::TaskExecutor; -use tokio::{ - sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - RwLock, - }, - time::Instant, +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock, }; pub struct LogEntryFetcher { @@ -36,28 +32,29 @@ pub struct LogEntryFetcher { } impl LogEntryFetcher { - pub async fn new( - url: &str, - contract_address: ContractAddress, - log_page_size: u64, - confirmation_delay: u64, - rate_limit_retries: u32, - timeout_retries: u32, - initial_backoff: u64, - ) -> Result { + pub async fn new(config: &LogSyncConfig) -> Result { let provider = Arc::new(Provider::new( RetryClientBuilder::default() - .rate_limit_retries(rate_limit_retries) - .timeout_retries(timeout_retries) - .initial_backoff(Duration::from_millis(initial_backoff)) - .build(Http::from_str(url)?, Box::new(HttpRateLimitRetryPolicy)), + .rate_limit_retries(config.rate_limit_retries) + .timeout_retries(config.timeout_retries) + .initial_backoff(Duration::from_millis(config.initial_backoff)) + .build( + Http::new_with_client( + url::Url::parse(&config.rpc_endpoint_url)?, + reqwest::Client::builder() + .timeout(config.blockchain_rpc_timeout) + .connect_timeout(config.blockchain_rpc_timeout) + .build()?, + ), + Box::new(HttpRateLimitRetryPolicy), + ), )); // TODO: `error` types are removed from the ABI json file. Ok(Self { - contract_address, + contract_address: config.contract_address, provider, - log_page_size, - confirmation_delay, + log_page_size: config.log_page_size, + confirmation_delay: config.confirmation_block_count, }) } @@ -246,6 +243,7 @@ impl LogEntryFetcher { ); let (mut block_hash_sent, mut block_number_sent) = (None, None); while let Some(maybe_log) = stream.next().await { + let start_time = Instant::now(); match maybe_log { Ok(log) => { let sync_progress = @@ -305,6 +303,7 @@ impl LogEntryFetcher { tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await; } } + metrics::RECOVER_LOG.update_since(start_time); } info!("log recover end"); @@ -766,25 +765,19 @@ pub enum LogFetchProgress { fn submission_event_to_transaction(e: SubmitFilter, block_number: u64) -> LogFetchProgress { LogFetchProgress::Transaction(( KVTransaction { - metadata: KVMetadata { - stream_ids: submission_topic_to_stream_ids(e.submission.tags.to_vec()), - sender: e.sender, - }, - transaction: Transaction { - data: vec![], - stream_ids: vec![], - data_merkle_root: nodes_to_root(&e.submission.nodes), - merkle_nodes: e - .submission - .nodes - .iter() - // the submission height is the height of the root node starting from height 0. - .map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into())) - .collect(), - start_entry_index: e.start_pos.as_u64(), - size: e.submission.length.as_u64(), - seq: e.submission_index.as_u64(), - }, + stream_ids: submission_topic_to_stream_ids(e.submission.tags.to_vec()), + sender: e.sender, + data_merkle_root: nodes_to_root(&e.submission.nodes), + merkle_nodes: e + .submission + .nodes + .iter() + // the submission height is the height of the root node starting from height 0. + .map(|SubmissionNode { root, height }| (height.as_usize() + 1, root.into())) + .collect(), + start_entry_index: e.start_pos.as_u64(), + size: e.submission.length.as_u64(), + seq: e.submission_index.as_u64(), }, block_number, )) diff --git a/node/log_entry_sync/src/sync_manager/metrics.rs b/node/log_entry_sync/src/sync_manager/metrics.rs index 37bee24..c2ba946 100644 --- a/node/log_entry_sync/src/sync_manager/metrics.rs +++ b/node/log_entry_sync/src/sync_manager/metrics.rs @@ -1,7 +1,13 @@ use std::sync::Arc; -use metrics::{register_timer, Timer}; +use metrics::{register_timer, Gauge, GaugeUsize, Timer}; lazy_static::lazy_static! { - pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_store_put_tx"); + pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc = register_timer("log_manager_handle_data_transaction"); + + pub static ref STORE_PUT_TX: Arc = register_timer("log_entry_sync_manager_put_tx_inner"); + + pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes"); + + pub static ref RECOVER_LOG: Arc = register_timer("log_entry_sync_manager_recover_log"); } diff --git a/node/log_entry_sync/src/sync_manager/mod.rs b/node/log_entry_sync/src/sync_manager/mod.rs index b7753db..a2734ef 100644 --- a/node/log_entry_sync/src/sync_manager/mod.rs +++ b/node/log_entry_sync/src/sync_manager/mod.rs @@ -85,16 +85,7 @@ impl LogSyncManager { .expect("shutdown send error") }, async move { - let log_fetcher = LogEntryFetcher::new( - &config.rpc_endpoint_url, - config.contract_address, - config.log_page_size, - config.confirmation_block_count, - config.rate_limit_retries, - config.timeout_retries, - config.initial_backoff, - ) - .await?; + let log_fetcher = LogEntryFetcher::new(&config).await?; let block_hash_cache = Arc::new(RwLock::new( store @@ -301,7 +292,7 @@ impl LogSyncManager { async fn put_tx(&mut self, tx: KVTransaction) -> Option { // We call this after process chain reorg, so the sequence number should match. - match tx.transaction.seq.cmp(&self.next_tx_seq) { + match tx.seq.cmp(&self.next_tx_seq) { std::cmp::Ordering::Less => Some(true), std::cmp::Ordering::Equal => { debug!("log entry sync get entry: {:?}", tx); @@ -310,7 +301,7 @@ impl LogSyncManager { std::cmp::Ordering::Greater => { error!( "Unexpected transaction seq: next={} get={}", - self.next_tx_seq, tx.transaction.seq + self.next_tx_seq, tx.seq ); None } @@ -436,7 +427,6 @@ impl LogSyncManager { async fn put_tx_inner(&mut self, tx: KVTransaction) -> bool { let start_time = Instant::now(); let result = self.store.write().await.put_tx(tx.clone()); - metrics::STORE_PUT_TX.update_since(start_time); if let Err(e) = result { error!("put_tx error: e={:?}", e); @@ -444,39 +434,9 @@ impl LogSyncManager { } else { self.next_tx_seq += 1; - // Check if the computed data root matches on-chain state. - // If the call fails, we won't check the root here and return `true` directly. - let flow_contract = self.log_fetcher.flow_contract(); - match flow_contract - .get_flow_root_by_tx_seq(tx.transaction.seq.into()) - .call() - .await - { - Ok(contract_root_bytes) => { - let contract_root = H256::from_slice(&contract_root_bytes); - // contract_root is zero for tx submitted before upgrading. - if !contract_root.is_zero() { - match self.store.read().await.get_context() { - Ok((local_root, _)) => { - if contract_root != local_root { - error!( - ?contract_root, - ?local_root, - "local flow root and on-chain flow root mismatch" - ); - return false; - } - } - Err(e) => { - warn!(?e, "fail to read the local flow root"); - } - } - } - } - Err(e) => { - warn!(?e, "fail to read the on-chain flow root"); - } - } + metrics::STORE_PUT_TX_SPEED_IN_BYTES + .update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize); + metrics::STORE_PUT_TX.update_since(start_time); true } diff --git a/node/rpc/Cargo.toml b/node/rpc/Cargo.toml index baa2a38..6361423 100644 --- a/node/rpc/Cargo.toml +++ b/node/rpc/Cargo.toml @@ -17,7 +17,7 @@ tokio = { version = "1.19.2", features = ["macros", "sync"] } tracing = "0.1.35" merkle_light = { workspace = true } merkle_tree = { workspace = true } -zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8", package = "rpc" } +zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "96f846073f111adfa31df926be0e4a25e0b85f54", package = "rpc" } futures-channel = "^0.3" ethereum-types = "0.14" storage_with_stream = { path = "../storage_with_stream" } diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 8da2b6a..9a7742d 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -3,7 +3,7 @@ use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager}; use rpc::HttpClient; use rpc::RPCConfig; use std::sync::Arc; -use storage_with_stream::log_store::log_manager::LogConfig; + use storage_with_stream::Store; use storage_with_stream::{StorageConfig, StoreManager}; use stream::{StreamConfig, StreamManager}; @@ -47,14 +47,10 @@ impl ClientBuilder { /// Initializes in-memory storage. pub async fn with_memory_store(mut self) -> Result { - let executor = require!("storage", self, runtime_context).clone().executor; - - // TODO(zz): Set config. - let store = Arc::new(RwLock::new( - StoreManager::memorydb(LogConfig::default(), executor) - .await - .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?, - )); + let store = + Arc::new(RwLock::new(StoreManager::memorydb().await.map_err( + |e| format!("Unable to start in-memory store: {:?}", e), + )?)); self.store = Some(store); @@ -63,17 +59,10 @@ impl ClientBuilder { /// Initializes RocksDB storage. pub async fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result { - let executor = require!("storage", self, runtime_context).clone().executor; - let store = Arc::new(RwLock::new( - StoreManager::rocks_db( - LogConfig::default(), - &config.log_config.db_dir, - &config.kv_db_file, - executor, - ) - .await - .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, + StoreManager::rocks_db(&config.log_config.db_dir, &config.kv_db_file) + .await + .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, )); self.store = Some(store); diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index 6a0f69f..7fc869e 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -1,6 +1,6 @@ #![allow(clippy::field_reassign_with_default)] -use std::{collections::HashSet, str::FromStr}; +use std::{collections::HashSet, str::FromStr, time::Duration}; use crate::ZgsKVConfig; use ethereum_types::H256; @@ -95,6 +95,7 @@ impl ZgsKVConfig { self.remove_finalized_block_interval_minutes, self.watch_loop_wait_time_ms, self.force_log_sync_from_start_block_number, + Duration::from_secs(self.blockchain_rpc_timeout_secs), )) } } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index 190544e..38012ec 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -27,6 +27,8 @@ build_config! { (remove_finalized_block_interval_minutes, (u64), 30) (watch_loop_wait_time_ms, (u64), 500) + (blockchain_rpc_timeout_secs, (u64), 120) + // rpc (rpc_enabled, (bool), true) diff --git a/node/storage_with_stream/src/store/data_store.rs b/node/storage_with_stream/src/store/data_store.rs new file mode 100644 index 0000000..f15786f --- /dev/null +++ b/node/storage_with_stream/src/store/data_store.rs @@ -0,0 +1,180 @@ +use std::{path::Path, sync::Arc, time::Instant}; + +use anyhow::{anyhow, bail, Result}; +use kv_types::KVTransaction; +use kvdb_rocksdb::{Database, DatabaseConfig}; +use shared_types::{compute_padded_chunk_size, ChunkArray, FlowProof}; + +use storage::{ + log_store::{ + log_manager::{bytes_to_entries, ENTRY_SIZE, PORA_CHUNK_SIZE}, + tx_store::BlockHashAndSubmissionIndex, + }, + H256, +}; +use tracing::{debug, instrument, trace}; + +use super::{ + flow_store::{batch_iter, FlowStore}, + tx_store::TransactionStore, +}; + +pub const COL_TX: u32 = 0; +pub const COL_ENTRY_BATCH: u32 = 1; +pub const COL_TX_COMPLETED: u32 = 2; +pub const COL_MISC: u32 = 3; +pub const COL_BLOCK_PROGRESS: u32 = 4; +pub const COL_NUM: u32 = 5; + +pub struct DataStore { + flow_store: FlowStore, + tx_store: TransactionStore, +} + +impl DataStore { + pub fn rocksdb(path: impl AsRef) -> Result { + let mut db_config = DatabaseConfig::with_columns(COL_NUM); + db_config.enable_statistics = true; + let db = Arc::new(Database::open(&db_config, path)?); + Ok(Self { + flow_store: FlowStore::new(db.clone()), + tx_store: TransactionStore::new(db.clone())?, + }) + } + + pub fn memorydb() -> Self { + let db = Arc::new(kvdb_memorydb::create(COL_NUM)); + Self { + flow_store: FlowStore::new(db.clone()), + tx_store: TransactionStore::new(db.clone()).unwrap(), + } + } + + #[instrument(skip(self))] + pub fn put_tx(&self, tx: KVTransaction) -> Result<()> { + self.tx_store.put_tx(tx) + } + + pub fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()> { + self.tx_store.put_progress(progress) + } + + pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { + self.tx_store.delete_block_hash_by_number(block_number) + } + + pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { + self.tx_store.put_log_latest_block_number(block_number) + } + + pub fn get_tx_by_seq_number(&self, seq: u64) -> Result> { + self.tx_store.get_tx_by_seq_number(seq) + } + + pub fn check_tx_completed(&self, tx_seq: u64) -> Result { + self.tx_store.check_tx_completed(tx_seq) + } + + pub fn get_sync_progress(&self) -> Result> { + self.tx_store.get_progress() + } + + pub fn get_log_latest_block_number(&self) -> Result> { + self.tx_store.get_log_latest_block_number() + } + + pub fn get_block_hashes(&self) -> Result> { + self.tx_store.get_block_hashes() + } + + pub fn next_tx_seq(&self) -> u64 { + self.tx_store.next_tx_seq() + } + + pub fn revert_to(&self, tx_seq: u64) -> Result<()> { + let start = if tx_seq != u64::MAX { tx_seq + 1 } else { 0 }; + let removed_txs = self.tx_store.remove_tx_after(start)?; + if !removed_txs.is_empty() { + let start_index = removed_txs.first().unwrap().start_entry_index; + self.flow_store.truncate(start_index)?; + } + Ok(()) + } + + pub fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result { + let _start_time = Instant::now(); + trace!( + "finalize_tx_with_hash: tx_seq={} tx_hash={:?}", + tx_seq, + tx_hash + ); + let tx = self + .tx_store + .get_tx_by_seq_number(tx_seq)? + .ok_or_else(|| anyhow!("finalize_tx with tx missing: tx_seq={}", tx_seq))?; + debug!("finalize_tx_with_hash: tx={:?}", tx); + if tx.hash() != tx_hash { + return Ok(false); + } + + let tx_end_index = tx.start_entry_index + bytes_to_entries(tx.size); + if self.check_data_completed(tx.start_entry_index, tx_end_index)? { + self.tx_store.finalize_tx(tx_seq)?; + Ok(true) + } else { + bail!("finalize tx hash with data missing: tx_seq={}", tx_seq) + } + } + + pub fn check_data_completed(&self, start: u64, end: u64) -> Result { + for (batch_start, batch_end) in batch_iter(start, end, PORA_CHUNK_SIZE) { + if self + .flow_store + .get_entries(batch_start, batch_end)? + .is_none() + { + return Ok(false); + } + } + Ok(true) + } + + pub fn put_chunks_with_tx_hash( + &self, + tx_seq: u64, + tx_hash: H256, + chunks: ChunkArray, + _maybe_file_proof: Option, + ) -> Result { + let tx = self + .tx_store + .get_tx_by_seq_number(tx_seq)? + .ok_or_else(|| anyhow!("put chunks with missing tx: tx_seq={}", tx_seq))?; + if tx.hash() != tx_hash { + return Ok(false); + } + let (chunks_for_proof, _) = compute_padded_chunk_size(tx.size as usize); + if chunks.start_index.saturating_mul(ENTRY_SIZE as u64) + chunks.data.len() as u64 + > (chunks_for_proof * ENTRY_SIZE) as u64 + { + bail!( + "put chunks with data out of tx range: tx_seq={} start_index={} data_len={}", + tx_seq, + chunks.start_index, + chunks.data.len() + ); + } + // TODO: Use another struct to avoid confusion. + let mut flow_entry_array = chunks; + flow_entry_array.start_index += tx.start_entry_index; + self.flow_store.append_entries(flow_entry_array)?; + Ok(true) + } + + pub fn get_chunk_by_flow_index(&self, index: u64, length: u64) -> Result> { + let start_flow_index = index; + let end_flow_index = index + length; + self.flow_store + .get_entries(start_flow_index, end_flow_index) + } +} diff --git a/node/storage_with_stream/src/store/flow_store.rs b/node/storage_with_stream/src/store/flow_store.rs new file mode 100644 index 0000000..a83ba4a --- /dev/null +++ b/node/storage_with_stream/src/store/flow_store.rs @@ -0,0 +1,181 @@ +use std::{cmp, sync::Arc}; + +use anyhow::{anyhow, bail, Result}; + +use shared_types::ChunkArray; +use ssz::{Decode, Encode}; +use storage::{ + error::Error, + log_store::{ + load_chunk::EntryBatchData, + log_manager::{bytes_to_entries, ENTRY_SIZE}, + }, + ZgsKeyValueDB, +}; +use tracing::{error, trace}; + +use crate::try_option; + +use super::data_store::COL_ENTRY_BATCH; + +pub const ENTRY_BATCH_SIZE: usize = 1024; + +fn try_decode_usize(data: &[u8]) -> Result { + Ok(usize::from_be_bytes( + data.try_into().map_err(|e| anyhow!("{:?}", e))?, + )) +} + +fn decode_batch_index(data: &[u8]) -> Result { + try_decode_usize(data) +} + +pub struct FlowStore { + kvdb: Arc, +} + +impl FlowStore { + pub fn new(kvdb: Arc) -> Self { + Self { kvdb } + } + + pub fn truncate(&self, start_index: u64) -> crate::error::Result<()> { + let mut tx = self.kvdb.transaction(); + let mut start_batch_index = start_index / ENTRY_BATCH_SIZE as u64; + let first_batch_offset = start_index as usize % ENTRY_BATCH_SIZE; + if first_batch_offset != 0 { + if let Some(mut first_batch) = self.get_entry_batch(start_batch_index)? { + first_batch.truncate(first_batch_offset); + if !first_batch.is_empty() { + tx.put( + COL_ENTRY_BATCH, + &start_batch_index.to_be_bytes(), + &first_batch.as_ssz_bytes(), + ); + } else { + tx.delete(COL_ENTRY_BATCH, &start_batch_index.to_be_bytes()); + } + } + + start_batch_index += 1; + } + // TODO: `kvdb` and `kvdb-rocksdb` does not support `seek_to_last` yet. + // We'll need to fork it or use another wrapper for a better performance in this. + let end = match self.kvdb.iter(COL_ENTRY_BATCH).last() { + Some(Ok((k, _))) => decode_batch_index(k.as_ref())?, + Some(Err(e)) => { + error!("truncate db error: e={:?}", e); + return Err(e.into()); + } + None => { + // The db has no data, so we can just return; + return Ok(()); + } + }; + for batch_index in start_batch_index as usize..=end { + tx.delete(COL_ENTRY_BATCH, &batch_index.to_be_bytes()); + } + self.kvdb.write(tx)?; + Ok(()) + } + + fn get_entry_batch(&self, batch_index: u64) -> Result> { + let raw = try_option!(self.kvdb.get(COL_ENTRY_BATCH, &batch_index.to_be_bytes())?); + Ok(Some( + EntryBatchData::from_ssz_bytes(&raw).map_err(Error::from)?, + )) + } + + fn put_entry_batch_list(&self, batch_list: Vec<(u64, EntryBatchData)>) -> Result<()> { + let mut tx = self.kvdb.transaction(); + for (batch_index, batch) in batch_list { + tx.put( + COL_ENTRY_BATCH, + &batch_index.to_be_bytes(), + &batch.as_ssz_bytes(), + ); + } + self.kvdb.write(tx)?; + Ok(()) + } + + pub fn append_entries(&self, data: ChunkArray) -> Result<()> { + trace!("append_entries: {} {}", data.start_index, data.data.len()); + if data.data.len() % ENTRY_SIZE != 0 { + bail!("append_entries: invalid data size, len={}", data.data.len()); + } + let mut batch_list = Vec::new(); + for (start_entry_index, end_entry_index) in batch_iter( + data.start_index, + data.start_index + bytes_to_entries(data.data.len() as u64), + ENTRY_BATCH_SIZE, + ) { + let chunk = data + .sub_array(start_entry_index, end_entry_index) + .expect("in range"); + + let chunk_index = chunk.start_index / ENTRY_BATCH_SIZE as u64; + + let mut batch = self + .get_entry_batch(chunk_index)? + .unwrap_or_else(EntryBatchData::new); + let start_byte = (chunk.start_index % ENTRY_BATCH_SIZE as u64) as usize * ENTRY_SIZE; + // check data existance + if chunk.data.is_empty() || batch.get(start_byte, chunk.data.len()).is_some() { + continue; + } + batch.insert_data( + (chunk.start_index % ENTRY_BATCH_SIZE as u64) as usize * ENTRY_SIZE, + chunk.data, + )?; + + batch_list.push((chunk_index, batch)); + } + + self.put_entry_batch_list(batch_list) + } + + pub fn get_entries(&self, index_start: u64, index_end: u64) -> Result> { + if index_end <= index_start { + bail!( + "invalid entry index: start={} end={}", + index_start, + index_end + ); + } + let mut data = Vec::with_capacity((index_end - index_start) as usize * ENTRY_SIZE); + for (start_entry_index, end_entry_index) in + batch_iter(index_start, index_end, ENTRY_BATCH_SIZE) + { + let chunk_index = start_entry_index / ENTRY_BATCH_SIZE as u64; + let mut offset = start_entry_index - chunk_index * ENTRY_BATCH_SIZE as u64; + let mut length = end_entry_index - start_entry_index; + + // Tempfix: for first chunk, its offset is always 1 + if chunk_index == 0 && offset == 0 { + offset = 1; + length -= 1; + } + + let entry_batch = try_option!(self.get_entry_batch(chunk_index)?); + let entry_batch_data = try_option!( + entry_batch.get(offset as usize * ENTRY_SIZE, length as usize * ENTRY_SIZE) + ); + data.append(&mut entry_batch_data.to_vec()); + } + Ok(Some(ChunkArray { + data, + start_index: index_start, + })) + } +} + +pub fn batch_iter(start: u64, end: u64, batch_size: usize) -> Vec<(u64, u64)> { + let mut list = Vec::new(); + for i in (start / batch_size as u64 * batch_size as u64..end).step_by(batch_size) { + let batch_start = cmp::max(start, i); + let batch_end = cmp::min(end, i + batch_size as u64); + list.push((batch_start, batch_end)); + } + list +} diff --git a/node/storage_with_stream/src/store/metadata_store.rs b/node/storage_with_stream/src/store/metadata_store.rs deleted file mode 100644 index a571110..0000000 --- a/node/storage_with_stream/src/store/metadata_store.rs +++ /dev/null @@ -1,45 +0,0 @@ -use std::{path::Path, sync::Arc}; - -use anyhow::Result; -use kv_types::KVMetadata; -use kvdb_rocksdb::{Database, DatabaseConfig}; -use ssz::{Decode, Encode}; -use storage::{error::Error, log_store::log_manager::COL_TX, ZgsKeyValueDB}; -use tracing::instrument; - -use crate::try_option; - -const COL_METADATA: u32 = 0; -const COL_NUM: u32 = 1; - -pub struct MetadataStore { - kvdb: Arc, -} - -impl MetadataStore { - pub fn rocksdb(path: impl AsRef) -> Result { - let mut db_config = DatabaseConfig::with_columns(COL_NUM); - db_config.enable_statistics = true; - let db = Arc::new(Database::open(&db_config, path)?); - Ok(Self { kvdb: db }) - } - - pub fn memorydb() -> Self { - let db = Arc::new(kvdb_memorydb::create(COL_NUM)); - Self { kvdb: db } - } - - #[instrument(skip(self))] - pub fn put_metadata(&self, seq: u64, metadata: KVMetadata) -> Result<()> { - let mut db_tx = self.kvdb.transaction(); - db_tx.put(COL_METADATA, &seq.to_be_bytes(), &metadata.as_ssz_bytes()); - self.kvdb.write(db_tx)?; - Ok(()) - } - - pub fn get_metadata_by_seq_number(&self, seq: u64) -> Result> { - let value = try_option!(self.kvdb.get(COL_TX, &seq.to_be_bytes())?); - let metadata = KVMetadata::from_ssz_bytes(&value).map_err(Error::from)?; - Ok(Some(metadata)) - } -} diff --git a/node/storage_with_stream/src/store/mod.rs b/node/storage_with_stream/src/store/mod.rs index e9473d3..cc1816f 100644 --- a/node/storage_with_stream/src/store/mod.rs +++ b/node/storage_with_stream/src/store/mod.rs @@ -3,120 +3,68 @@ use std::sync::Arc; use async_trait::async_trait; use ethereum_types::{H160, H256}; use kv_types::{AccessControlSet, KVTransaction, KeyValuePair, StreamWriteSet}; -use shared_types::ChunkArrayWithProof; -use shared_types::ChunkWithProof; -use shared_types::DataRoot; -use shared_types::FlowRangeProof; -use storage::log_store::config::Configurable; +use shared_types::ChunkArray; + +use shared_types::FlowProof; + use storage::log_store::tx_store::BlockHashAndSubmissionIndex; -use storage::log_store::LogStoreChunkRead; -use storage::log_store::LogStoreChunkWrite; use crate::error::Result; -mod metadata_store; +mod data_store; +mod flow_store; mod sqlite_db_statements; pub mod store_manager; mod stream_store; +mod tx_store; pub use stream_store::to_access_control_op_name; pub use stream_store::AccessControlOps; -pub trait LogStoreRead: LogStoreChunkRead { - /// Get a transaction by its global log sequence number. +pub trait DataStoreRead { fn get_tx_by_seq_number(&self, seq: u64) -> Result>; - /// Get a transaction by the data root of its data. - fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result>; - - fn get_tx_by_data_root(&self, data_root: &DataRoot) -> Result> { - match self.get_tx_seq_by_data_root(data_root)? { - Some(seq) => self.get_tx_by_seq_number(seq), - None => Ok(None), - } - } - - fn get_chunk_with_proof_by_tx_and_index( - &self, - tx_seq: u64, - index: usize, - ) -> Result>; - - fn get_chunks_with_proof_by_tx_and_index_range( - &self, - tx_seq: u64, - index_start: usize, - index_end: usize, - ) -> Result>; - fn check_tx_completed(&self, tx_seq: u64) -> Result; fn next_tx_seq(&self) -> u64; fn get_sync_progress(&self) -> Result>; - fn get_block_hash_by_number(&self, block_number: u64) -> Result)>>; - fn get_block_hashes(&self) -> Result>; - fn validate_range_proof(&self, tx_seq: u64, data: &ChunkArrayWithProof) -> Result; - - fn get_proof_at_root(&self, root: &DataRoot, index: u64, length: u64) - -> Result; - fn get_log_latest_block_number(&self) -> Result>; - fn get_context(&self) -> Result<(DataRoot, u64)>; + fn get_chunk_by_flow_index(&self, index: u64, length: u64) -> Result>; } -pub trait LogStoreWrite: LogStoreChunkWrite { - /// Store a data entry metadata. +pub trait DataStoreWrite { fn put_tx(&mut self, tx: KVTransaction) -> Result<()>; - /// Finalize a transaction storage. - /// This will compute and the merkle tree, check the data root, and persist a part of the merkle - /// tree for future queries. - /// - /// This will return error if not all chunks are stored. But since this check can be expensive, - /// the caller is supposed to track chunk statuses and call this after storing all the chunks. - fn finalize_tx(&mut self, tx_seq: u64) -> Result<()>; fn finalize_tx_with_hash(&mut self, tx_seq: u64, tx_hash: H256) -> Result; - /// Store the progress of synced block number and its hash. fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()>; - /// Revert the log state to a given tx seq. - /// This is needed when transactions are reverted because of chain reorg. - /// - /// Reverted transactions are returned in order. fn revert_to(&mut self, tx_seq: u64) -> Result<()>; - /// If the proof is valid, fill the tree nodes with the new data. - fn validate_and_insert_range_proof( - &mut self, - tx_seq: u64, - data: &ChunkArrayWithProof, - ) -> Result; - fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>; fn put_log_latest_block_number(&self, block_number: u64) -> Result<()>; + + fn put_chunks_with_tx_hash( + &self, + tx_seq: u64, + tx_hash: H256, + chunks: ChunkArray, + maybe_file_proof: Option, + ) -> Result; } pub trait Store: - LogStoreRead + LogStoreWrite + Configurable + Send + Sync + StreamRead + StreamWrite + 'static + DataStoreRead + DataStoreWrite + Send + Sync + StreamRead + StreamWrite + 'static { } -impl< - T: LogStoreRead - + LogStoreWrite - + Configurable - + Send - + Sync - + StreamRead - + StreamWrite - + 'static, - > Store for T +impl Store + for T { } diff --git a/node/storage_with_stream/src/store/store_manager.rs b/node/storage_with_stream/src/store/store_manager.rs index f9ffae2..11618be 100644 --- a/node/storage_with_stream/src/store/store_manager.rs +++ b/node/storage_with_stream/src/store/store_manager.rs @@ -1,26 +1,17 @@ -use crate::try_option; use anyhow::{Error, Result}; use async_trait::async_trait; use ethereum_types::{H160, H256}; use kv_types::{AccessControlSet, KVTransaction, KeyValuePair, StreamWriteSet}; -use shared_types::{ - Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, -}; +use shared_types::{ChunkArray, FlowProof}; use std::path::Path; use std::sync::Arc; -use storage::log_store::config::Configurable; -use storage::log_store::log_manager::LogConfig; use storage::log_store::tx_store::BlockHashAndSubmissionIndex; -use storage::log_store::{ - LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead as _, LogStoreWrite as _, -}; -use storage::LogManager; use tracing::instrument; -use super::metadata_store::MetadataStore; +use super::data_store::DataStore; use super::stream_store::StreamStore; -use super::{LogStoreRead, LogStoreWrite, StreamRead, StreamWrite}; +use super::{DataStoreRead, DataStoreWrite, StreamRead, StreamWrite}; /// 256 Bytes pub const ENTRY_SIZE: usize = 256; @@ -28,42 +19,14 @@ pub const ENTRY_SIZE: usize = 256; pub const PORA_CHUNK_SIZE: usize = 1024; pub struct StoreManager { - metadata_store: MetadataStore, - log_store: LogManager, + data_store: DataStore, stream_store: StreamStore, } -impl LogStoreChunkWrite for StoreManager { - fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> { - self.log_store.put_chunks(tx_seq, chunks) - } - - fn put_chunks_with_tx_hash( - &self, - tx_seq: u64, - tx_hash: H256, - chunks: ChunkArray, - maybe_file_proof: Option, - ) -> storage::error::Result { - self.log_store - .put_chunks_with_tx_hash(tx_seq, tx_hash, chunks, maybe_file_proof) - } - - fn remove_chunks_batch(&self, batch_list: &[u64]) -> storage::error::Result<()> { - self.log_store.remove_chunks_batch(batch_list) - } -} - -impl LogStoreWrite for StoreManager { +impl DataStoreWrite for StoreManager { #[instrument(skip(self))] fn put_tx(&mut self, tx: KVTransaction) -> Result<()> { - self.metadata_store - .put_metadata(tx.transaction.seq, tx.metadata)?; - self.log_store.put_tx(tx.transaction) - } - - fn finalize_tx(&mut self, tx_seq: u64) -> Result<()> { - self.log_store.finalize_tx(tx_seq) + self.data_store.put_tx(tx) } fn finalize_tx_with_hash( @@ -71,178 +34,65 @@ impl LogStoreWrite for StoreManager { tx_seq: u64, tx_hash: H256, ) -> storage::error::Result { - self.log_store.finalize_tx_with_hash(tx_seq, tx_hash) + self.data_store.finalize_tx_with_hash(tx_seq, tx_hash) } fn put_sync_progress(&self, progress: (u64, H256, Option>)) -> Result<()> { - self.log_store.put_sync_progress(progress) + self.data_store.put_sync_progress(progress) } fn revert_to(&mut self, tx_seq: u64) -> Result<()> { - self.log_store.revert_to(tx_seq)?; + self.data_store.revert_to(tx_seq)?; Ok(()) } - fn validate_and_insert_range_proof( - &mut self, - tx_seq: u64, - data: &ChunkArrayWithProof, - ) -> storage::error::Result { - self.log_store.validate_and_insert_range_proof(tx_seq, data) - } - fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { - self.log_store.delete_block_hash_by_number(block_number) + self.data_store.delete_block_hash_by_number(block_number) } fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { - self.log_store.put_log_latest_block_number(block_number) - } -} - -impl LogStoreChunkRead for StoreManager { - fn get_chunk_by_tx_and_index( - &self, - tx_seq: u64, - index: usize, - ) -> crate::error::Result> { - self.log_store.get_chunk_by_tx_and_index(tx_seq, index) + self.data_store.put_log_latest_block_number(block_number) } - fn get_chunks_by_tx_and_index_range( + fn put_chunks_with_tx_hash( &self, tx_seq: u64, - index_start: usize, - index_end: usize, - ) -> crate::error::Result> { - self.log_store - .get_chunks_by_tx_and_index_range(tx_seq, index_start, index_end) - } - - fn get_chunk_by_data_root_and_index( - &self, - data_root: &DataRoot, - index: usize, - ) -> crate::error::Result> { - self.log_store - .get_chunk_by_data_root_and_index(data_root, index) - } - - fn get_chunks_by_data_root_and_index_range( - &self, - data_root: &DataRoot, - index_start: usize, - index_end: usize, - ) -> crate::error::Result> { - self.log_store - .get_chunks_by_data_root_and_index_range(data_root, index_start, index_end) - } - - fn get_chunk_index_list(&self, tx_seq: u64) -> crate::error::Result> { - self.log_store.get_chunk_index_list(tx_seq) - } - - fn get_chunk_by_flow_index( - &self, - index: u64, - length: u64, - ) -> crate::error::Result> { - self.log_store.get_chunk_by_flow_index(index, length) + tx_hash: H256, + chunks: ChunkArray, + maybe_file_proof: Option, + ) -> Result { + self.data_store + .put_chunks_with_tx_hash(tx_seq, tx_hash, chunks, maybe_file_proof) } } -impl LogStoreRead for StoreManager { - fn get_tx_by_seq_number(&self, seq: u64) -> crate::error::Result> { - Ok(Some(KVTransaction { - transaction: try_option!(self.log_store.get_tx_by_seq_number(seq)?), - metadata: try_option!(self.metadata_store.get_metadata_by_seq_number(seq)?), - })) - } - - fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> crate::error::Result> { - self.log_store.get_tx_seq_by_data_root(data_root) - } - - fn get_chunk_with_proof_by_tx_and_index( - &self, - tx_seq: u64, - index: usize, - ) -> crate::error::Result> { - self.log_store - .get_chunk_with_proof_by_tx_and_index(tx_seq, index) - } - - fn get_chunks_with_proof_by_tx_and_index_range( - &self, - tx_seq: u64, - index_start: usize, - index_end: usize, - ) -> crate::error::Result> { - self.log_store.get_chunks_with_proof_by_tx_and_index_range( - tx_seq, - index_start, - index_end, - None, - ) +impl DataStoreRead for StoreManager { + fn get_tx_by_seq_number(&self, seq: u64) -> Result> { + self.data_store.get_tx_by_seq_number(seq) } fn check_tx_completed(&self, tx_seq: u64) -> crate::error::Result { - self.log_store.check_tx_completed(tx_seq) - } - - fn validate_range_proof(&self, tx_seq: u64, data: &ChunkArrayWithProof) -> Result { - self.log_store.validate_range_proof(tx_seq, data) + self.data_store.check_tx_completed(tx_seq) } fn get_sync_progress(&self) -> Result> { - self.log_store.get_sync_progress() - } - - fn get_block_hash_by_number(&self, block_number: u64) -> Result)>> { - self.log_store.get_block_hash_by_number(block_number) + self.data_store.get_sync_progress() } fn get_block_hashes(&self) -> Result> { - self.log_store.get_block_hashes() + self.data_store.get_block_hashes() } fn next_tx_seq(&self) -> u64 { - self.log_store.next_tx_seq() - } - - fn get_proof_at_root( - &self, - root: &DataRoot, - index: u64, - length: u64, - ) -> Result { - self.log_store.get_proof_at_root(Some(*root), index, length) - } - - fn get_context(&self) -> Result<(DataRoot, u64)> { - self.log_store.get_context() + self.data_store.next_tx_seq() } fn get_log_latest_block_number(&self) -> storage::error::Result> { - self.log_store.get_log_latest_block_number() - } -} - -impl Configurable for StoreManager { - fn get_config(&self, key: &[u8]) -> Result>> { - self.log_store.get_config(key) - } - - fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.log_store.set_config(key, value) - } - - fn remove_config(&self, key: &[u8]) -> Result<()> { - self.log_store.remove_config(key) + self.data_store.get_log_latest_block_number() } - fn exec_configs(&self, tx: storage::log_store::config::ConfigTx) -> Result<()> { - self.log_store.exec_configs(tx) + fn get_chunk_by_flow_index(&self, index: u64, length: u64) -> Result> { + self.data_store.get_chunk_by_flow_index(index, length) } } @@ -422,7 +272,7 @@ impl StreamWrite for StoreManager { result: String, commit_data: Option<(StreamWriteSet, AccessControlSet)>, ) -> Result<()> { - match self.log_store.get_tx_by_seq_number(tx_seq) { + match self.data_store.get_tx_by_seq_number(tx_seq) { Ok(Some(tx)) => { if tx.data_merkle_root != data_merkle_root { return Err(Error::msg("data merkle root deos not match")); @@ -444,36 +294,26 @@ impl StreamWrite for StoreManager { async fn revert_stream(&mut self, tx_seq: u64) -> Result<()> { self.stream_store.revert_to(tx_seq).await?; - self.log_store.revert_to(tx_seq)?; + self.data_store.revert_to(tx_seq)?; Ok(()) } } impl StoreManager { - pub async fn memorydb( - config: LogConfig, - executor: task_executor::TaskExecutor, - ) -> Result { + pub async fn memorydb() -> Result { let stream_store = StreamStore::new_in_memory().await?; stream_store.create_tables_if_not_exist().await?; Ok(Self { - metadata_store: MetadataStore::memorydb(), - log_store: LogManager::memorydb(config, executor)?, + data_store: DataStore::memorydb(), stream_store, }) } - pub async fn rocks_db( - config: LogConfig, - path: impl AsRef, - kv_db_file: impl AsRef, - executor: task_executor::TaskExecutor, - ) -> Result { + pub async fn rocks_db(path: impl AsRef, kv_db_file: impl AsRef) -> Result { let stream_store = StreamStore::new(kv_db_file.as_ref()).await?; stream_store.create_tables_if_not_exist().await?; Ok(Self { - metadata_store: MetadataStore::rocksdb(path.as_ref().join("metadata"))?, - log_store: LogManager::rocksdb(config, path.as_ref().join("log"), executor)?, + data_store: DataStore::rocksdb(path.as_ref())?, stream_store, }) } diff --git a/node/storage_with_stream/src/store/tx_store.rs b/node/storage_with_stream/src/store/tx_store.rs new file mode 100644 index 0000000..91b3e3f --- /dev/null +++ b/node/storage_with_stream/src/store/tx_store.rs @@ -0,0 +1,210 @@ +use crate::error::Error; +use crate::try_option; +use anyhow::{anyhow, Result}; +use ethereum_types::H256; +use kv_types::KVTransaction; +use ssz::{Decode, Encode}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Instant; +use storage::log_store::tx_store::BlockHashAndSubmissionIndex; +use storage::ZgsKeyValueDB; +use tracing::{error, instrument}; + +use super::data_store::{COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED}; + +const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress"; +const NEXT_TX_KEY: &str = "next_tx_seq"; +const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key"; + +pub enum TxStatus { + Finalized, + Pruned, +} + +impl From for u8 { + fn from(value: TxStatus) -> Self { + match value { + TxStatus::Finalized => 0, + TxStatus::Pruned => 1, + } + } +} + +impl TryFrom for TxStatus { + type Error = anyhow::Error; + + fn try_from(value: u8) -> std::result::Result { + match value { + 0 => Ok(TxStatus::Finalized), + 1 => Ok(TxStatus::Pruned), + _ => Err(anyhow!("invalid value for tx status {}", value)), + } + } +} + +pub struct TransactionStore { + kvdb: Arc, + /// This is always updated before writing the database to ensure no intermediate states. + next_tx_seq: AtomicU64, +} + +impl TransactionStore { + pub fn new(kvdb: Arc) -> Result { + let next_tx_seq = kvdb + .get(COL_TX, NEXT_TX_KEY.as_bytes())? + .map(|a| decode_tx_seq(&a)) + .unwrap_or(Ok(0))?; + Ok(Self { + kvdb, + next_tx_seq: AtomicU64::new(next_tx_seq), + }) + } + + #[instrument(skip(self))] + pub fn put_tx(&self, tx: KVTransaction) -> Result<()> { + let _start_time = Instant::now(); + + let mut db_tx = self.kvdb.transaction(); + db_tx.put(COL_TX, &tx.seq.to_be_bytes(), &tx.as_ssz_bytes()); + db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &(tx.seq + 1).to_be_bytes()); + self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst); + self.kvdb.write(db_tx)?; + Ok(()) + } + + pub fn get_tx_by_seq_number(&self, seq: u64) -> Result> { + if seq >= self.next_tx_seq() { + return Ok(None); + } + let value = try_option!(self.kvdb.get(COL_TX, &seq.to_be_bytes())?); + let tx = KVTransaction::from_ssz_bytes(&value).map_err(Error::from)?; + Ok(Some(tx)) + } + + pub fn remove_tx_after(&self, min_seq: u64) -> Result> { + let mut removed_txs = Vec::new(); + let max_seq = self.next_tx_seq(); + let mut db_tx = self.kvdb.transaction(); + for seq in min_seq..max_seq { + let Some(tx) = self.get_tx_by_seq_number(seq)? else { + error!(?seq, ?max_seq, "Transaction missing before the end"); + break; + }; + db_tx.delete(COL_TX, &seq.to_be_bytes()); + db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes()); + removed_txs.push(tx); + } + db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes()); + self.next_tx_seq.store(min_seq, Ordering::SeqCst); + self.kvdb.write(db_tx)?; + Ok(removed_txs) + } + + #[instrument(skip(self))] + pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> { + Ok(self.kvdb.put( + COL_TX_COMPLETED, + &tx_seq.to_be_bytes(), + &[TxStatus::Finalized.into()], + )?) + } + + pub fn get_tx_status(&self, tx_seq: u64) -> Result> { + let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?); + match value.first() { + Some(v) => Ok(Some(TxStatus::try_from(*v)?)), + None => Ok(None), + } + } + + pub fn check_tx_completed(&self, tx_seq: u64) -> Result { + let _start_time = Instant::now(); + let status = self.get_tx_status(tx_seq)?; + + Ok(matches!(status, Some(TxStatus::Finalized))) + } + + pub fn next_tx_seq(&self) -> u64 { + self.next_tx_seq.load(Ordering::SeqCst) + } + + #[instrument(skip(self))] + pub fn put_progress(&self, progress: (u64, H256, Option>)) -> Result<()> { + let mut items = vec![( + COL_MISC, + LOG_SYNC_PROGRESS_KEY.as_bytes().to_vec(), + (progress.0, progress.1).as_ssz_bytes(), + )]; + + if let Some(p) = progress.2 { + items.push(( + COL_BLOCK_PROGRESS, + progress.0.to_be_bytes().to_vec(), + (progress.1, p).as_ssz_bytes(), + )); + } + Ok(self.kvdb.puts(items)?) + } + + #[instrument(skip(self))] + pub fn get_progress(&self) -> Result> { + Ok(Some( + <(u64, H256)>::from_ssz_bytes(&try_option!(self + .kvdb + .get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?)) + .map_err(Error::from)?, + )) + } + + #[instrument(skip(self))] + pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> { + Ok(self.kvdb.put( + COL_MISC, + LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(), + &block_number.as_ssz_bytes(), + )?) + } + + #[instrument(skip(self))] + pub fn get_log_latest_block_number(&self) -> Result> { + Ok(Some( + ::from_ssz_bytes(&try_option!(self + .kvdb + .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?)) + .map_err(Error::from)?, + )) + } + + pub fn get_block_hashes(&self) -> Result> { + let mut block_numbers = vec![]; + for r in self.kvdb.iter(COL_BLOCK_PROGRESS) { + let (key, val) = r?; + let block_number = + u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?); + let val = <(H256, Option)>::from_ssz_bytes(val.as_ref()).map_err(Error::from)?; + + block_numbers.push(( + block_number, + BlockHashAndSubmissionIndex { + block_hash: val.0, + first_submission_index: val.1, + }, + )); + } + + Ok(block_numbers) + } + + pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> { + Ok(self + .kvdb + .delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?) + } +} + +fn decode_tx_seq(data: &[u8]) -> Result { + Ok(u64::from_be_bytes( + data.try_into().map_err(|e| anyhow!("{:?}", e))?, + )) +} diff --git a/node/stream/Cargo.toml b/node/stream/Cargo.toml index 868006a..f24883b 100644 --- a/node/stream/Cargo.toml +++ b/node/stream/Cargo.toml @@ -20,8 +20,8 @@ ethers = { version = "^2", features = ["ws"] } serde_json = "1.0.127" storage_with_stream = { path = "../storage_with_stream" } rpc = {path = "../rpc"} -zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8", package = "rpc" } -zgs_storage = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "da2cdec8a1ddfd52333a6c10141df31611d0e1f8", package = "storage" } +zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "96f846073f111adfa31df926be0e4a25e0b85f54", package = "rpc" } +zgs_storage = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "96f846073f111adfa31df926be0e4a25e0b85f54", package = "storage" } contract-interface = { workspace = true } rusqlite = { version = "0.28.0", features = ["bundled"] } tracing = "0.1.35" diff --git a/node/stream/src/stream_manager/mod.rs b/node/stream/src/stream_manager/mod.rs index 174803f..24b28bd 100644 --- a/node/stream/src/stream_manager/mod.rs +++ b/node/stream/src/stream_manager/mod.rs @@ -91,23 +91,17 @@ async fn skippable( config: &StreamConfig, store: Arc>, ) -> Result<(bool, bool)> { - if tx.metadata.stream_ids.is_empty() { + if tx.stream_ids.is_empty() { Ok((false, false)) } else { let replay_progress = store.read().await.get_stream_replay_progress().await?; // if replayer is not up-to-date, always make can_write be true - let mut can_write = replay_progress < tx.transaction.seq; - for id in tx.metadata.stream_ids.iter() { + let mut can_write = replay_progress < tx.seq; + for id in tx.stream_ids.iter() { if !config.stream_set.contains(id) { return Ok((false, false)); } - if !can_write - && store - .read() - .await - .can_write(tx.metadata.sender, *id, tx.transaction.seq) - .await? - { + if !can_write && store.read().await.can_write(tx.sender, *id, tx.seq).await? { can_write = true; } } diff --git a/node/stream/src/stream_manager/stream_data_fetcher.rs b/node/stream/src/stream_manager/stream_data_fetcher.rs index a4f142a..bc3c3a6 100644 --- a/node/stream/src/stream_manager/stream_data_fetcher.rs +++ b/node/stream/src/stream_manager/stream_data_fetcher.rs @@ -50,8 +50,7 @@ async fn download_with_proof( while fail_cnt < clients.len() { // find next let seg_index = start_index / ENTRIES_PER_SEGMENT; - let flow_seg_index = - (tx.transaction.start_entry_index as usize + start_index) / ENTRIES_PER_SEGMENT; + let flow_seg_index = (tx.start_entry_index as usize + start_index) / ENTRIES_PER_SEGMENT; let mut try_cnt = 0; loop { let configs = shard_configs.read().await; @@ -65,7 +64,7 @@ async fn download_with_proof( if try_cnt >= clients.len() { error!( "there is no storage nodes hold segment index {:?} of file with root {:?}", - seg_index, tx.transaction.data_merkle_root + seg_index, tx.data_merkle_root ); if let Err(e) = sender.send(Err((start_index, end_index, false))) { error!("send error: {:?}", e); @@ -75,10 +74,10 @@ async fn download_with_proof( } debug!( "download_with_proof for tx_seq: {}, start_index: {}, end_index {} from client #{}", - tx.transaction.seq, start_index, end_index, index + tx.seq, start_index, end_index, index ); match clients[index] - .download_segment_with_proof_by_tx_seq(tx.transaction.seq, seg_index) + .download_segment_with_proof_by_tx_seq(tx.seq, seg_index) .await { Ok(Some(segment)) => { @@ -93,7 +92,7 @@ async fn download_with_proof( return; } - if segment.root != tx.transaction.data_merkle_root { + if segment.root != tx.data_merkle_root { debug!("invalid file root"); if let Err(e) = sender.send(Err((start_index, end_index, true))) { error!("send error: {:?}", e); @@ -112,8 +111,8 @@ async fn download_with_proof( } if let Err(e) = store.write().await.put_chunks_with_tx_hash( - tx.transaction.seq, - tx.transaction.hash(), + tx.seq, + tx.hash(), ChunkArray { data: segment.data, start_index: (segment.index * ENTRIES_PER_SEGMENT) as u64, @@ -138,7 +137,7 @@ async fn download_with_proof( Ok(None) => { debug!( "tx_seq {}, start_index {}, end_index {}, client #{} response is none", - tx.transaction.seq, start_index, end_index, index + tx.seq, start_index, end_index, index ); fail_cnt += 1; tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await; @@ -146,7 +145,7 @@ async fn download_with_proof( Err(e) => { warn!( "tx_seq {}, start_index {}, end_index {}, client #{} response error: {:?}", - tx.transaction.seq, start_index, end_index, index, e + tx.seq, start_index, end_index, index, e ); fail_cnt += 1; tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await; @@ -256,18 +255,13 @@ impl StreamDataFetcher { } async fn sync_data(&self, tx: &KVTransaction) -> Result<()> { - if self - .store - .read() - .await - .check_tx_completed(tx.transaction.seq)? - { + if self.store.read().await.check_tx_completed(tx.seq)? { return Ok(()); } - let tx_size_in_entry = if tx.transaction.size % ENTRY_SIZE as u64 == 0 { - tx.transaction.size / ENTRY_SIZE as u64 + let tx_size_in_entry = if tx.size % ENTRY_SIZE as u64 == 0 { + tx.size / ENTRY_SIZE as u64 } else { - tx.transaction.size / ENTRY_SIZE as u64 + 1 + tx.size / ENTRY_SIZE as u64 + 1 }; let mut pending_entries = VecDeque::new(); @@ -283,7 +277,7 @@ impl StreamDataFetcher { ); debug!( "task_start_index: {:?}, tasks_end_index: {:?}, tx_size_in_entry: {:?}, root: {:?}", - i, tasks_end_index, tx_size_in_entry, tx.transaction.data_merkle_root + i, tasks_end_index, tx_size_in_entry, tx.data_merkle_root ); for j in (i..tasks_end_index).step_by(ENTRIES_PER_SEGMENT) { let task_end_index = cmp::min(tasks_end_index, j + ENTRIES_PER_SEGMENT as u64); @@ -322,7 +316,7 @@ impl StreamDataFetcher { } } Err((start_index, end_index, data_err)) => { - warn!("Download data of tx_seq {:?}, start_index {:?}, end_index {:?}, failed",tx.transaction.seq, start_index, end_index); + warn!("Download data of tx_seq {:?}, start_index {:?}, end_index {:?}, failed",tx.seq, start_index, end_index); match failed_tasks.get_mut(&start_index) { Some(c) => { @@ -339,12 +333,12 @@ impl StreamDataFetcher { } } - match self.request_file(tx.transaction.seq).await { + match self.request_file(tx.seq).await { Ok(_) => {} Err(e) => { warn!( "Failed to request file with tx seq {:?}, error: {}", - tx.transaction.seq, e + tx.seq, e ); } } @@ -364,7 +358,7 @@ impl StreamDataFetcher { self.store .write() .await - .finalize_tx_with_hash(tx.transaction.seq, tx.transaction.hash())?; + .finalize_tx_with_hash(tx.seq, tx.hash())?; Ok(()) } @@ -429,16 +423,10 @@ impl StreamDataFetcher { ); if stream_matched && can_write { // sync data - info!( - "syncing data of tx with sequence number {:?}..", - tx.transaction.seq - ); + info!("syncing data of tx with sequence number {:?}..", tx.seq); match self.sync_data(&tx).await { Ok(()) => { - info!( - "data of tx with sequence number {:?} synced.", - tx.transaction.seq - ); + info!("data of tx with sequence number {:?} synced.", tx.seq); } Err(e) => { error!("stream data sync error: e={:?}", e); @@ -450,11 +438,11 @@ impl StreamDataFetcher { // stream not matched, go to next tx info!( "sender of tx {:?} has no write permission, skipped.", - tx.transaction.seq + tx.seq ); } else { // stream not matched, go to next tx - info!("tx {:?} is not in stream, skipped.", tx.transaction.seq); + info!("tx {:?} is not in stream, skipped.", tx.seq); } // update progress, get next tx_seq to sync match self diff --git a/node/stream/src/stream_manager/stream_replayer.rs b/node/stream/src/stream_manager/stream_replayer.rs index 8208054..67f65cc 100644 --- a/node/stream/src/stream_manager/stream_replayer.rs +++ b/node/stream/src/stream_manager/stream_replayer.rs @@ -95,10 +95,10 @@ impl<'a> StreamReader<'a> { Self { store, tx, - tx_size_in_entry: if tx.transaction.size % ENTRY_SIZE as u64 == 0 { - tx.transaction.size / ENTRY_SIZE as u64 + tx_size_in_entry: if tx.size % ENTRY_SIZE as u64 == 0 { + tx.size / ENTRY_SIZE as u64 } else { - tx.transaction.size / ENTRY_SIZE as u64 + 1 + tx.size / ENTRY_SIZE as u64 + 1 }, current_position: 0, buffer: vec![], @@ -106,15 +106,17 @@ impl<'a> StreamReader<'a> { } pub fn current_position_in_bytes(&self) -> u64 { - (self.current_position + self.tx.transaction.start_entry_index) * (ENTRY_SIZE as u64) + (self.current_position + self.tx.start_entry_index) * (ENTRY_SIZE as u64) - (self.buffer.len() as u64) } async fn load(&mut self, length: u64) -> Result<()> { - match self.store.read().await.get_chunk_by_flow_index( - self.current_position + self.tx.transaction.start_entry_index, - length, - )? { + match self + .store + .read() + .await + .get_chunk_by_flow_index(self.current_position + self.tx.start_entry_index, length)? + { Some(mut x) => { self.buffer.append(&mut x.data); self.current_position += length; @@ -227,11 +229,7 @@ impl StreamReplayer { .store .read() .await - .get_latest_version_before( - stream_read.stream_id, - stream_read.key.clone(), - tx.transaction.seq, - ) + .get_latest_version_before(stream_read.stream_id, stream_read.key.clone(), tx.seq) .await? > version { @@ -290,7 +288,7 @@ impl StreamReplayer { tx: &KVTransaction, version: u64, ) -> Result> { - let stream_set = HashSet::::from_iter(tx.metadata.stream_ids.iter().cloned()); + let stream_set = HashSet::::from_iter(tx.stream_ids.iter().cloned()); let store_read = self.store.read().await; for stream_write in stream_write_set.stream_writes.iter() { if !stream_set.contains(&stream_write.stream_id) { @@ -299,11 +297,7 @@ impl StreamReplayer { } // check version confiction if store_read - .get_latest_version_before( - stream_write.stream_id, - stream_write.key.clone(), - tx.transaction.seq, - ) + .get_latest_version_before(stream_write.stream_id, stream_write.key.clone(), tx.seq) .await? > version { @@ -312,10 +306,10 @@ impl StreamReplayer { // check write permission if !(store_read .has_write_permission( - tx.metadata.sender, + tx.sender, stream_write.stream_id, stream_write.key.clone(), - tx.transaction.seq, + tx.seq, ) .await?) { @@ -344,13 +338,13 @@ impl StreamReplayer { // pad GRANT_ADMIN_ROLE prefix to handle the first write to new stream let mut is_admin = HashSet::new(); let store_read = self.store.read().await; - for id in &tx.metadata.stream_ids { - if store_read.is_new_stream(*id, tx.transaction.seq).await? { + for id in &tx.stream_ids { + if store_read.is_new_stream(*id, tx.seq).await? { let op_meta = ( AccessControlOps::GRANT_ADMIN_ROLE & 0xf0, *id, Arc::new(vec![]), - tx.metadata.sender, + tx.sender, ); access_ops.insert( op_meta, @@ -358,15 +352,12 @@ impl StreamReplayer { op_type: AccessControlOps::GRANT_ADMIN_ROLE, stream_id: *id, key: Arc::new(vec![]), - account: tx.metadata.sender, + account: tx.sender, operator: H160::zero(), }, ); is_admin.insert(*id); - } else if store_read - .is_admin(tx.metadata.sender, *id, tx.transaction.seq) - .await? - { + } else if store_read.is_admin(tx.sender, *id, tx.seq).await? { is_admin.insert(*id); } } @@ -406,11 +397,11 @@ impl StreamReplayer { } // renounce type AccessControlOps::RENOUNCE_ADMIN_ROLE | AccessControlOps::RENOUNCE_WRITER_ROLE => { - account = tx.metadata.sender; + account = tx.sender; } AccessControlOps::RENOUNCE_SPECIAL_WRITER_ROLE => { key = Arc::new(self.parse_key(stream_reader).await?); - account = tx.metadata.sender; + account = tx.sender; } // unexpected type _ => { @@ -419,7 +410,7 @@ impl StreamReplayer { } let op_meta = (op_type & 0xf0, stream_id, key.clone(), account); if op_type != AccessControlOps::GRANT_ADMIN_ROLE - || (!access_ops.contains_key(&op_meta) && account != tx.metadata.sender) + || (!access_ops.contains_key(&op_meta) && account != tx.sender) { access_ops.insert( op_meta, @@ -428,7 +419,7 @@ impl StreamReplayer { stream_id, key: key.clone(), account, - operator: tx.metadata.sender, + operator: tx.sender, }, ); } @@ -446,7 +437,7 @@ impl StreamReplayer { ) -> Result> { // validate let store_read = self.store.read().await; - let stream_set = HashSet::::from_iter(tx.metadata.stream_ids.iter().cloned()); + let stream_set = HashSet::::from_iter(tx.stream_ids.iter().cloned()); for access_control in &access_control_set.access_controls { if !stream_set.contains(&access_control.stream_id) { // the write set in data is conflict with tx tags @@ -475,11 +466,7 @@ impl StreamReplayer { .is_admin .contains(&access_control.stream_id) && !store_read - .is_writer_of_stream( - tx.metadata.sender, - access_control.stream_id, - tx.transaction.seq, - ) + .is_writer_of_stream(tx.sender, access_control.stream_id, tx.seq) .await? { return Ok(Some(ReplayResult::AccessControlPermissionDenied( @@ -496,10 +483,10 @@ impl StreamReplayer { .contains(&access_control.stream_id) && !store_read .is_writer_of_key( - tx.metadata.sender, + tx.sender, access_control.stream_id, access_control.key.clone(), - tx.transaction.seq, + tx.seq, ) .await? { @@ -518,12 +505,7 @@ impl StreamReplayer { } async fn replay(&self, tx: &KVTransaction) -> Result { - if !self - .store - .read() - .await - .check_tx_completed(tx.transaction.seq)? - { + if !self.store.read().await.check_tx_completed(tx.seq)? { return Ok(ReplayResult::DataUnavailable); } let mut stream_reader = StreamReader::new(self.store.clone(), tx); @@ -585,7 +567,7 @@ impl StreamReplayer { return Ok(result); } Ok(ReplayResult::Commit( - tx.transaction.seq, + tx.seq, stream_write_set, access_control_set, )) @@ -635,10 +617,7 @@ impl StreamReplayer { }; // replay data if stream_matched && can_write { - info!( - "replaying data of tx with sequence number {:?}..", - tx.transaction.seq - ); + info!("replaying data of tx with sequence number {:?}..", tx.seq); match self.replay(&tx).await { Ok(result) => { let result_str = result.to_string(); @@ -654,7 +633,7 @@ impl StreamReplayer { .await .put_stream( tx_seq, - tx.transaction.data_merkle_root, + tx.data_merkle_root, result_str.clone(), Some((stream_write_set, access_control_set)), ) @@ -663,7 +642,7 @@ impl StreamReplayer { Ok(_) => { info!( "tx with sequence number {:?} commit.", - tx.transaction.seq + tx.seq ); } Err(e) => { @@ -675,7 +654,7 @@ impl StreamReplayer { } ReplayResult::DataUnavailable => { // data not available - info!("data of tx with sequence number {:?} is not available yet, wait..", tx.transaction.seq); + info!("data of tx with sequence number {:?} is not available yet, wait..", tx.seq); tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)) .await; check_replay_progress = true; @@ -687,8 +666,8 @@ impl StreamReplayer { .write() .await .put_stream( - tx.transaction.seq, - tx.transaction.data_merkle_root, + tx.seq, + tx.data_merkle_root, result_str.clone(), None, ) @@ -697,7 +676,7 @@ impl StreamReplayer { Ok(_) => { info!( "tx with sequence number {:?} reverted with reason {:?}", - tx.transaction.seq, result_str + tx.seq, result_str ); } Err(e) => { @@ -719,25 +698,20 @@ impl StreamReplayer { } else if stream_matched { info!( "tx {:?} is in stream but sender has no write permission.", - tx.transaction.seq + tx.seq ); let result_str = ReplayResult::SenderNoWritePermission.to_string(); match self .store .write() .await - .put_stream( - tx.transaction.seq, - tx.transaction.data_merkle_root, - result_str.clone(), - None, - ) + .put_stream(tx.seq, tx.data_merkle_root, result_str.clone(), None) .await { Ok(_) => { info!( "tx with sequence number {:?} reverted with reason {:?}", - tx.transaction.seq, result_str + tx.seq, result_str ); } Err(e) => { @@ -747,7 +721,7 @@ impl StreamReplayer { } } } else { - info!("tx {:?} is not in stream, skipped.", tx.transaction.seq); + info!("tx {:?} is not in stream, skipped.", tx.seq); } // parse success // update progress, get next tx_seq to sync