Skip to content

Commit

Permalink
RPC return file pruned info (#266)
Browse files Browse the repository at this point in the history
* RPC return file pruned info

* return tx status in atomic manner

* fix clippy
  • Loading branch information
boqiu authored Nov 13, 2024
1 parent 16e70bd commit d93f453
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 14 deletions.
2 changes: 2 additions & 0 deletions node/rpc/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct FileInfo {
pub finalized: bool,
pub is_cached: bool,
pub uploaded_seg_num: usize,
/// Whether file is pruned, in which case `finalized` will be `false`.
pub pruned: bool,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
16 changes: 14 additions & 2 deletions node/rpc/src/zgs/impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig;
use storage::log_store::tx_store::TxStatus;
use storage::{try_option, H256};

pub struct RpcServerImpl {
Expand Down Expand Up @@ -245,7 +246,17 @@ impl RpcServerImpl {
}

async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
let finalized = self.ctx.log_store.check_tx_completed(tx.seq).await?;
let (finalized, pruned) = match self
.ctx
.log_store
.get_store()
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
{
Some(TxStatus::Finalized) => (true, false),
Some(TxStatus::Pruned) => (false, true),
None => (false, false),
};

let (uploaded_seg_num, is_cached) = match self
.ctx
.chunk_pool
Expand All @@ -254,7 +265,7 @@ impl RpcServerImpl {
{
Some(v) => v,
_ => (
if finalized {
if finalized || pruned {
let chunks_per_segment = self.ctx.config.chunks_per_segment;
let (num_segments, _) = SegmentWithProof::split_file_into_segments(
tx.size as usize,
Expand All @@ -273,6 +284,7 @@ impl RpcServerImpl {
finalized,
is_cached,
uploaded_seg_num,
pruned,
})
}

Expand Down
14 changes: 13 additions & 1 deletion node/storage/src/log_store/log_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::config::ShardConfig;
use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
};
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus};
use crate::log_store::{
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
Expand All @@ -21,6 +21,7 @@ use rayon::prelude::ParallelSlice;
use shared_types::{
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
TxSeqOrRoot,
};
use std::cmp::Ordering;

Expand Down Expand Up @@ -572,6 +573,17 @@ impl LogStoreRead for LogManager {
}))
}

fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> {
let tx_seq = match tx_seq_or_data_root {
TxSeqOrRoot::TxSeq(v) => v,
TxSeqOrRoot::Root(root) => {
try_option!(self.tx_store.get_first_tx_seq_by_data_root(&root)?)
}
};

self.tx_store.get_tx_status(tx_seq)
}

fn check_tx_completed(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_completed(tx_seq)
}
Expand Down
6 changes: 4 additions & 2 deletions node/storage/src/log_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use ethereum_types::H256;
use flow_store::PadPair;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
Transaction,
Transaction, TxSeqOrRoot,
};
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};

use crate::error::Result;

use self::tx_store::BlockHashAndSubmissionIndex;
use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};

pub mod config;
mod flow_store;
Expand Down Expand Up @@ -57,6 +57,8 @@ pub trait LogStoreRead: LogStoreChunkRead {

fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;

fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>;

fn next_tx_seq(&self) -> u64;

fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
Expand Down
52 changes: 43 additions & 9 deletions node/storage/src/log_store/tx_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,31 @@ const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq";
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";

const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1;
pub enum TxStatus {
Finalized,
Pruned,
}

impl From<TxStatus> for u8 {
fn from(value: TxStatus) -> Self {
match value {
TxStatus::Finalized => 0,
TxStatus::Pruned => 1,
}
}
}

impl TryFrom<u8> for TxStatus {
type Error = anyhow::Error;

fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
match value {
0 => Ok(TxStatus::Finalized),
1 => Ok(TxStatus::Pruned),
_ => Err(anyhow!("invalid value for tx status {}", value)),
}
}
}

#[derive(Clone, Debug)]
pub struct BlockHashAndSubmissionIndex {
Expand Down Expand Up @@ -163,24 +186,35 @@ impl TransactionStore {
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TX_STATUS_FINALIZED],
&[TxStatus::Finalized.into()],
)?)
}

#[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self
.kvdb
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TxStatus::Pruned.into()],
)?)
}

pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
match value.first() {
Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
None => Ok(None),
}
}

pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]))
let status = self.get_tx_status(tx_seq)?;
Ok(matches!(status, Some(TxStatus::Finalized)))
}

pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
let status = self.get_tx_status(tx_seq)?;
Ok(matches!(status, Some(TxStatus::Pruned)))
}

pub fn next_tx_seq(&self) -> u64 {
Expand Down

0 comments on commit d93f453

Please sign in to comment.