Update the merkle tree with proof data. (#9)
* Add proof data for chunk proof.

* Support file proof.

* Update with segment proof and fix issues.

* Fix more issues.

* Fix the process of file proof.

* Merge branch 'main' into fix_proof

* Enable sync_test.

* Fix wrongly updated submodule.

* Fix bsc node version.
peilun-conflux authored Jan 26, 2024
1 parent 2c2dba8 commit 0c12350
Showing 16 changed files with 236 additions and 34 deletions.
86 changes: 81 additions & 5 deletions common/append_merkle/src/lib.rs
@@ -7,7 +7,7 @@ use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use tracing::warn;
use tracing::{debug, warn};

pub use crate::merkle_tree::{Algorithm, HashElement, MerkleTreeInitialData, MerkleTreeRead};
pub use proof::{Proof, RangeProof};
@@ -188,6 +188,81 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
}

/// Fill nodes with valid proof data.
/// This requires that the proof was built against this tree.
/// It should only be called after validating the proof (including checking that the root exists).
/// Returns an `Error` if the data conflicts with existing nodes.
pub fn fill_with_range_proof(&mut self, proof: RangeProof<E>) -> Result<()> {
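// `proof_nodes_in_tree` lists nodes from the proof's leaf layer upward, so the first
// `leaf_height` entries lie below this tree's leaf layer and are dropped before filling.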
self.fill_with_proof(
proof
.left_proof
.proof_nodes_in_tree()
.split_off(self.leaf_height),
)?;
self.fill_with_proof(
proof
.right_proof
.proof_nodes_in_tree()
.split_off(self.leaf_height),
)
}

pub fn fill_with_file_proof(
&mut self,
proof: Proof<E>,
mut tx_merkle_nodes: Vec<(usize, E)>,
start_index: u64,
) -> Result<()> {
if self.leaf_height != 0 {
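// Keep only the tx merkle nodes above this tree's leaf layer, re-indexing their heights accordingly.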
tx_merkle_nodes = tx_merkle_nodes
.into_iter()
.filter_map(|(height, data)| {
if height >= self.leaf_height + 1 {
Some((height - self.leaf_height - 1, data))
} else {
None
}
})
.collect();
}
if tx_merkle_nodes.is_empty() {
return Ok(());
}
let mut position_and_data = proof.file_proof_nodes_in_tree(tx_merkle_nodes);
let start_index = (start_index >> self.leaf_height) as usize;
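// Offset every proof node by the file's start position within this tree; the offset halves at each higher layer.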
for (i, (position, _)) in position_and_data.iter_mut().enumerate() {
*position += start_index >> i;
}
self.fill_with_proof(position_and_data)
}

/// This assumes that the proof leaf is no lower than the tree leaf. It holds for both SegmentProof and ChunkProof.
fn fill_with_proof(&mut self, position_and_data: Vec<(usize, E)>) -> Result<()> {
// A valid proof should not fail the following checks.
for (i, (position, data)) in position_and_data.into_iter().enumerate() {
let layer = &mut self.layers[i];
if position > layer.len() {
bail!(
"proof position out of range, position={} layer.len()={}",
position,
layer.len()
);
}
if position == layer.len() {
// skip padding node.
continue;
}
if layer[position] != E::null() && layer[position] != data {
// For a valid file proof this can only happen when the proof node is an intermediate node,
// so the correct node in the flow merkle tree must already have been computed when the
// trailing data was padded. Thus, it's okay to skip this case directly.
continue;
}
layer[position] = data;
}
Ok(())
}

pub fn gen_range_proof(&self, start_index: usize, end_index: usize) -> Result<RangeProof<E>> {
if end_index <= start_index {
bail!(
@@ -609,23 +684,23 @@ mod tests {
AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data.clone());
merkle.commit(Some(0));
verify(&data, &merkle);
verify(&data, &mut merkle);

data.push(H256::random());
merkle.append(*data.last().unwrap());
merkle.commit(Some(1));
verify(&data, &merkle);
verify(&data, &mut merkle);

for _ in 0..6 {
data.push(H256::random());
}
merkle.append_list(data[data.len() - 6..].to_vec());
merkle.commit(Some(2));
verify(&data, &merkle);
verify(&data, &mut merkle);
}
}

fn verify(data: &Vec<H256>, merkle: &AppendMerkleTree<H256, Sha3Algorithm>) {
fn verify(data: &Vec<H256>, merkle: &mut AppendMerkleTree<H256, Sha3Algorithm>) {
for i in 0..data.len() {
let proof = merkle.gen_proof(i + 1).unwrap();
let r = merkle.validate(&proof, &data[i], i + 1);
@@ -636,6 +711,7 @@ mod tests {
let range_proof = merkle.gen_range_proof(i + 1, end + 1).unwrap();
let r = range_proof.validate::<Sha3Algorithm>(&data[i..end], i + 1);
assert!(r.is_ok(), "{:?}", r);
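// Filling the tree with a range proof generated from itself must not conflict with existing nodes.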
merkle.fill_with_range_proof(range_proof).unwrap();
}
}
}
7 changes: 7 additions & 0 deletions common/append_merkle/src/merkle_tree.rs
@@ -106,6 +106,13 @@ pub trait MerkleTreeRead {
index_in_layer >>= 1;
}
lemma.push(self.root().clone());
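// A `null` entry means some node on the proof path has not been computed or filled yet,
// so a complete proof cannot be generated.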
if lemma.contains(&Self::E::null()) {
bail!(
"Not enough data to generate proof, lemma={:?} path={:?}",
lemma,
path
);
}
Ok(Proof::new(lemma, path))
}
}
50 changes: 50 additions & 0 deletions common/append_merkle/src/proof.rs
@@ -85,6 +85,56 @@ impl<T: HashElement> Proof<T> {
}
pos
}

/// Return `Vec<(index_in_layer, data)>`.
pub fn proof_nodes_in_tree(&self) -> Vec<(usize, T)> {
let mut r = Vec::with_capacity(self.lemma.len());
let mut pos = 0;
r.push((0, self.root()));
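// Walk the path from the root toward the leaf: `pos` is the index of the on-path node in
// each layer, and the corresponding lemma entry is its sibling at `pos ± 1`.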
for (i, is_left) in self.path.iter().rev().enumerate() {
pos <<= 1;
if !*is_left {
pos += 1;
}
let lemma_pos = if *is_left { pos + 1 } else { pos - 1 };
r.push((lemma_pos, self.lemma[self.lemma.len() - 2 - i].clone()));
}
r.reverse();
r
}

pub fn file_proof_nodes_in_tree(&self, tx_merkle_nodes: Vec<(usize, T)>) -> Vec<(usize, T)> {
let mut r = Vec::with_capacity(self.lemma.len());
let mut subtree_pos = 0;
let mut root_pos = 0;
let mut in_subtree = tx_merkle_nodes.len() == 1;
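// `root_pos` accumulates the widths (`1 << height`) of the tx merkle subtrees passed over
// before entering the subtree that contains the proof path; `subtree_pos` then follows the
// path inside that subtree, mirroring `proof_nodes_in_tree`.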
for (i, is_left) in self.path.iter().rev().enumerate() {
if !in_subtree {
if *is_left {
in_subtree = true;
} else {
if i < tx_merkle_nodes.len() {
root_pos += 1 << tx_merkle_nodes[i].0;
} else {
break;
}
}
} else {
subtree_pos <<= 1;
if !*is_left {
subtree_pos += 1;
}
let lemma_pos = if *is_left {
root_pos + subtree_pos + 1
} else {
root_pos + subtree_pos - 1
};
r.push((lemma_pos, self.lemma[self.lemma.len() - 2 - i].clone()));
}
}
r.reverse();
r
}
}

#[derive(Clone, Debug, Eq, PartialEq, DeriveEncode, DeriveDecode, Deserialize, Serialize)]
13 changes: 9 additions & 4 deletions node/chunk_pool/src/handler.rs
@@ -2,7 +2,7 @@ use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID;
use anyhow::Result;
use network::NetworkMessage;
use shared_types::ChunkArray;
use shared_types::{ChunkArray, FileProof};
use std::sync::Arc;
use storage_async::Store;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -45,11 +45,16 @@ impl ChunkPoolHandler {
// when the store supports writing chunks by reference.
if let Some(file) = self.mem_pool.remove_cached_file(&id.root).await {
// If there are still cached chunks, write them into the store
let mut segments: Vec<ChunkArray> = file.segments.into_values().collect();
while let Some(seg) = segments.pop() {
let mut segments: Vec<(ChunkArray, FileProof)> = file.segments.into_values().collect();
while let Some((seg, proof)) = segments.pop() {
if !self
.log_store
.put_chunks_with_tx_hash(id.tx_id.seq, id.tx_id.hash, seg)
.put_chunks_with_tx_hash(
id.tx_id.seq,
id.tx_id.hash,
seg,
Some(proof.try_into()?),
)
.await?
{
return Ok(false);
4 changes: 2 additions & 2 deletions node/chunk_pool/src/mem_pool/chunk_cache.rs
@@ -2,7 +2,7 @@ use super::FileID;
use crate::{Config, SegmentInfo};
use anyhow::{bail, Result};
use hashlink::LinkedHashMap;
use shared_types::{bytes_to_chunks, ChunkArray, DataRoot, Transaction, CHUNK_SIZE};
use shared_types::{bytes_to_chunks, ChunkArray, DataRoot, FileProof, Transaction, CHUNK_SIZE};
use std::collections::HashMap;
use std::ops::Add;
use std::time::{Duration, Instant};
@@ -13,7 +13,7 @@ pub struct MemoryCachedFile {
pub id: FileID,
pub chunks_per_segment: usize,
/// Window to control the cache of each file
pub segments: HashMap<usize, ChunkArray>,
pub segments: HashMap<usize, (ChunkArray, FileProof)>,
/// Total number of chunks for the cached file, updated from the log entry.
pub total_chunks: usize,
/// Used for garbage collection. It is updated when a new segment is uploaded.
39 changes: 27 additions & 12 deletions node/chunk_pool/src/mem_pool/chunk_pool_inner.rs
@@ -6,7 +6,7 @@ use anyhow::{bail, Result};
use async_lock::Mutex;
use log_entry_sync::LogSyncEvent;
use shared_types::{
bytes_to_chunks, compute_segment_size, ChunkArray, DataRoot, Transaction, CHUNK_SIZE,
bytes_to_chunks, compute_segment_size, ChunkArray, DataRoot, FileProof, Transaction, CHUNK_SIZE,
};
use std::sync::Arc;
use storage_async::Store;
@@ -37,7 +37,7 @@ impl Inner {
fn get_all_cached_segments_to_write(
&mut self,
root: &DataRoot,
) -> Result<(FileID, Vec<ChunkArray>)> {
) -> Result<(FileID, Vec<(ChunkArray, FileProof)>)> {
// Limits the number of writing threads.
if self.write_control.total_writings >= self.config.max_writings {
bail!("too many data writing: {}", self.config.max_writings);
@@ -59,17 +59,21 @@
pub struct SegmentInfo {
pub root: DataRoot,
pub seg_data: Vec<u8>,
pub seg_proof: FileProof,
pub seg_index: usize,
pub chunks_per_segment: usize,
}

impl From<SegmentInfo> for ChunkArray {
impl From<SegmentInfo> for (ChunkArray, FileProof) {
fn from(seg_info: SegmentInfo) -> Self {
let start_index = seg_info.seg_index * seg_info.chunks_per_segment;
ChunkArray {
data: seg_info.seg_data,
start_index: start_index as u64,
}
(
ChunkArray {
data: seg_info.seg_data,
start_index: start_index as u64,
},
seg_info.seg_proof,
)
}
}

@@ -155,7 +159,12 @@ impl MemoryChunkPool {

match self
.log_store
.put_chunks_with_tx_hash(file_id.tx_id.seq, file_id.tx_id.hash, seg)
.put_chunks_with_tx_hash(
file_id.tx_id.seq,
file_id.tx_id.hash,
seg,
Some(seg_info.seg_proof.try_into()?),
)
.await
{
Ok(true) => {}
@@ -212,11 +221,12 @@
.remove_file(&tx.data_merkle_root);
if let Some(mut file) = maybe_file {
file.update_with_tx(tx);
for (seg_index, seg) in file.segments.into_iter() {
for (seg_index, (seg, proof)) in file.segments.into_iter() {
self.write_chunks(
SegmentInfo {
root: tx.data_merkle_root,
seg_data: seg.data.clone(),
seg_proof: proof,
seg_index,
chunks_per_segment: file.chunks_per_segment,
},
@@ -276,19 +286,24 @@
}

async fn write_all_cached_chunks_and_finalize(&self, root: DataRoot) -> Result<()> {
let (file, mut segments) = self
let (file, mut segments_with_proof) = self
.inner
.lock()
.await
.get_all_cached_segments_to_write(&root)?;

while let Some(seg) = segments.pop() {
while let Some((seg, proof)) = segments_with_proof.pop() {
// TODO(qhz): error handling
// 1. Push the failed segment back to the front. (enhance store to return Err(ChunkArray))
// 2. Put the incomplete segments back into the memory pool.
match self
.log_store
.put_chunks_with_tx_hash(file.tx_id.seq, file.tx_id.hash, seg)
.put_chunks_with_tx_hash(
file.tx_id.seq,
file.tx_id.hash,
seg,
Some(proof.try_into()?),
)
.await
{
Ok(true) => {}
1 change: 1 addition & 0 deletions node/log_entry_sync/src/sync_manager/mod.rs
@@ -269,6 +269,7 @@ impl LogSyncManager {
data,
start_index: 0,
},
None,
)
.and_then(|_| store.finalize_tx_with_hash(tx.seq, tx.hash()))
{
1 change: 1 addition & 0 deletions node/rpc/src/zgs/impl.rs
@@ -63,6 +63,7 @@ impl RpcServer for RpcServerImpl {
let seg_info = SegmentInfo {
root: segment.root,
seg_data: segment.data,
seg_proof: segment.proof,
seg_index: segment.index,
chunks_per_segment: self.ctx.config.chunks_per_segment,
};
18 changes: 17 additions & 1 deletion node/shared_types/src/lib.rs
@@ -1,6 +1,6 @@
mod proof;

use anyhow::bail;
use anyhow::{anyhow, bail, Error};
use append_merkle::{
AppendMerkleTree, Proof as RawProof, RangeProof as RawRangeProof, Sha3Algorithm,
};
@@ -346,3 +346,19 @@ pub fn compute_segment_merkle_root(data: &[u8], segment_chunks: usize) -> [u8; 3

MerkleTree::<_, RawLeafSha3Algorithm>::new(hashes).root()
}

impl TryFrom<FileProof> for FlowProof {
type Error = Error;

fn try_from(value: FileProof) -> Result<Self, Self::Error> {
let mut lemma = value.lemma;
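// An empty path means the proved node is the root itself; duplicate the single lemma entry
// so the lemma reaches the `path.len() + 2` length expected below.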
if value.path.is_empty() {
lemma.push(*lemma.first().ok_or(anyhow!("empty file proof"))?);
}
if lemma.len() != value.path.len() + 2 {
Err(anyhow!("invalid file proof"))
} else {
Ok(Self::new(lemma, value.path))
}
}
}