Skip to content

Commit

Permalink
fix: shard download (#25)
Browse files Browse the repository at this point in the history
* fix: download from shard

* test: zgs
  • Loading branch information
MiniFrenchBread authored Oct 14, 2024
1 parent 5a041db commit cbb23b3
Show file tree
Hide file tree
Showing 19 changed files with 519 additions and 558 deletions.
613 changes: 393 additions & 220 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ enr = { path = "version-meld/enr" }
discv5 = { path = "version-meld/discv5" }

[workspace.dependencies]
append_merkle = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
merkle_light = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
merkle_tree = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
shared_types = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
task_executor = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
storage = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
contract-interface = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "59d24b073d93c1146666295e48609825f761abea" }
append_merkle = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995" }
merkle_light = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995" }
merkle_tree = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995" }
shared_types = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995" }
task_executor = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995" }
storage = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995", package = "storage" }
contract-interface = { git = "https://github.com/0glabs/0g-storage-node.git",rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995" }
2 changes: 1 addition & 1 deletion node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tokio = { version = "1.19.2", features = ["macros", "sync"] }
tracing = "0.1.35"
merkle_light = { workspace = true }
merkle_tree = { workspace = true }
zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", branch = "main", package = "rpc" }
zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995", package = "rpc" }
futures-channel = "^0.3"
ethereum-types = "0.14"
storage_with_stream = { path = "../storage_with_stream" }
Expand Down
7 changes: 6 additions & 1 deletion node/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ impl ClientBuilder {

/// Initializes in-memory storage.
pub async fn with_memory_store(mut self) -> Result<Self, String> {
let executor = require!("storage", self, runtime_context).clone().executor;

// TODO(zz): Set config.
let store = Arc::new(RwLock::new(
StoreManager::memorydb(LogConfig::default())
StoreManager::memorydb(LogConfig::default(), executor)
.await
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
));
Expand All @@ -61,11 +63,14 @@ impl ClientBuilder {

/// Initializes RocksDB storage.
pub async fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("storage", self, runtime_context).clone().executor;

let store = Arc::new(RwLock::new(
StoreManager::rocks_db(
LogConfig::default(),
&config.log_config.db_dir,
&config.kv_db_file,
executor,
)
.await
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
Expand Down
1 change: 1 addition & 0 deletions node/storage_with_stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rusqlite = { version = "0.28.0", features = ["bundled"] }
const_format = "0.2.26"
tokio-rusqlite = "0.3.0"
async-trait = "0.1.56"
task_executor = { workspace = true }

[dev-dependencies]
tempdir = "0.3.7"
Expand Down
9 changes: 6 additions & 3 deletions node/storage_with_stream/src/store/store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,14 @@ impl StreamWrite for StoreManager {
}

impl StoreManager {
pub async fn memorydb(config: LogConfig) -> Result<Self> {
pub async fn memorydb(config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let stream_store = StreamStore::new_in_memory().await?;
stream_store.create_tables_if_not_exist().await?;
Ok(Self {
metadata_store: MetadataStore::memorydb(),
log_store: LogManager::memorydb(config)?,
log_store: LogManager::memorydb(config, executor)?,
stream_store,
})
}
Expand All @@ -455,12 +457,13 @@ impl StoreManager {
config: LogConfig,
path: impl AsRef<Path>,
kv_db_file: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let stream_store = StreamStore::new(kv_db_file.as_ref()).await?;
stream_store.create_tables_if_not_exist().await?;
Ok(Self {
metadata_store: MetadataStore::rocksdb(path.as_ref().join("metadata"))?,
log_store: LogManager::rocksdb(config, path.as_ref().join("log"))?,
log_store: LogManager::rocksdb(config, path.as_ref().join("log"), executor)?,
stream_store,
})
}
Expand Down
4 changes: 2 additions & 2 deletions node/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ ethers = { version = "^2", features = ["ws"] }
serde_json = "1.0.127"
storage_with_stream = { path = "../storage_with_stream" }
rpc = {path = "../rpc"}
zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", branch = "main", package = "rpc" }
zgs_storage = { git = "https://github.com/0glabs/0g-storage-node.git", branch = "main", package = "storage" }
zgs_rpc = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995", package = "rpc" }
zgs_storage = { git = "https://github.com/0glabs/0g-storage-node.git", rev = "3fc1543fb4a1b9321cf5ecc9a81e152a14950995", package = "storage" }
contract-interface = { workspace = true }
rusqlite = { version = "0.28.0", features = ["bundled"] }
tracing = "0.1.35"
Expand Down
5 changes: 3 additions & 2 deletions node/stream/src/stream_manager/stream_data_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ async fn download_with_proof(
while fail_cnt < clients.len() {
// find next
let seg_index = start_index / ENTRIES_PER_SEGMENT;
let flow_seg_index = (tx.transaction.start_entry_index as usize + start_index) / ENTRIES_PER_SEGMENT;
let mut try_cnt = 0;
loop {
let configs = shard_configs.read().await;
if let Some(shard_config) = configs[index] {
if seg_index % shard_config.num_shard == shard_config.shard_id {
if flow_seg_index % shard_config.num_shard == shard_config.shard_id {
break;
}
}
Expand All @@ -76,7 +77,7 @@ async fn download_with_proof(
tx.transaction.seq, start_index, end_index, index
);
match clients[index]
.download_segment_with_proof(tx.transaction.data_merkle_root, seg_index)
.download_segment_with_proof_by_tx_seq(tx.transaction.seq, seg_index)
.await
{
Ok(Some(segment)) => {
Expand Down
2 changes: 1 addition & 1 deletion tests/storage-contracts-abis/0g-storage-contracts-rev
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dbeff538b949599c203e43be6ecc05e9e997d09d
bea58429e436e4952ae69235d9079cfc4ac5f3b3
4 changes: 2 additions & 2 deletions tests/storage-contracts-abis/ChunkLinearReward.json

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions tests/storage-contracts-abis/DummyMarket.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,23 @@
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "pricePerSector",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "pure",
"type": "function"
}
],
"bytecode": "0x6080604052348015600f57600080fd5b5060a08061001e6000396000f3fe6080604052348015600f57600080fd5b506004361060285760003560e01c8063da6eb36a14602d575b600080fd5b603d6038366004603f565b505050565b005b600080600060608486031215605357600080fd5b50508135936020830135935060409092013591905056fea264697066735822122054eb84b374e7eb5c57b284f82f977fe19500436ef4128d3e147969cefdd4cbcd64736f6c63430008100033",
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060285760003560e01c8063da6eb36a14602d575b600080fd5b603d6038366004603f565b505050565b005b600080600060608486031215605357600080fd5b50508135936020830135935060409092013591905056fea264697066735822122054eb84b374e7eb5c57b284f82f977fe19500436ef4128d3e147969cefdd4cbcd64736f6c63430008100033",
"bytecode": "0x608060405234801561001057600080fd5b5060be8061001f6000396000f3fe6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
"deployedBytecode": "0x6080604052348015600f57600080fd5b506004361060325760003560e01c806361ec5082146037578063da6eb36a14604b575b600080fd5b600060405190815260200160405180910390f35b605b6056366004605d565b505050565b005b600080600060608486031215607157600080fd5b50508135936020830135935060409092013591905056fea264697066735822122080db0b00f4b93cc320a2df449a74e503451a2675da518eff0fc5b7cf0ae8c90c64736f6c63430008100033",
"linkReferences": {},
"deployedLinkReferences": {}
}
4 changes: 2 additions & 2 deletions tests/storage-contracts-abis/DummyReward.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@
"type": "function"
}
],
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea264697066735822122031a993c3def9ed899c5b5a53bab495d498047e1a8ce262b61e700511cfb9adf164736f6c63430008100033",
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea264697066735822122031a993c3def9ed899c5b5a53bab495d498047e1a8ce262b61e700511cfb9adf164736f6c63430008100033",
"bytecode": "0x608060405234801561001057600080fd5b5060f18061001f6000396000f3fe60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
"deployedBytecode": "0x60806040526004361060265760003560e01c806359e9670014602b578063b7a3c04c14603c575b600080fd5b603a60363660046058565b5050565b005b348015604757600080fd5b50603a60533660046079565b505050565b60008060408385031215606a57600080fd5b50508035926020909101359150565b600080600060608486031215608d57600080fd5b8335925060208401356001600160a01b038116811460aa57600080fd5b92959294505050604091909101359056fea2646970667358221220d2f22ec6a41724281bad8a768c241562927a5fcc8ba600f3b3784f584a68c65864736f6c63430008100033",
"linkReferences": {},
"deployedLinkReferences": {}
}
4 changes: 2 additions & 2 deletions tests/storage-contracts-abis/FixedPrice.json

Large diffs are not rendered by default.

36 changes: 34 additions & 2 deletions tests/storage-contracts-abis/FixedPriceFlow.json

Large diffs are not rendered by default.

36 changes: 34 additions & 2 deletions tests/storage-contracts-abis/Flow.json

Large diffs are not rendered by default.

Loading

0 comments on commit cbb23b3

Please sign in to comment.