Skip to content

Commit

Permalink
Reorder COLUMNs key by block_number
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <[email protected]>
  • Loading branch information
eval-exec committed Jun 27, 2024
1 parent 189e665 commit fb68c30
Show file tree
Hide file tree
Showing 43 changed files with 975 additions and 470 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 5 additions & 8 deletions block-filter/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl BlockFilter {
header.hash()
);
let db = self.shared.store();
if db.get_block_filter_hash(&header.hash()).is_some() {
let num_hash = header.num_hash();
if db.get_block_filter_hash(num_hash.clone()).is_some() {
debug!(
"Filter data for block {:#x} already exists. Skip building.",
header.hash()
Expand All @@ -134,11 +135,11 @@ impl BlockFilter {
let parent_block_filter_hash = if header.is_genesis() {
Byte32::zero()
} else {
db.get_block_filter_hash(&header.parent_hash())
db.get_block_filter_hash(header.parent_num_hash())
.expect("parent block filter data stored")
};

let transactions = db.get_block_body(&header.hash());
let transactions = db.get_block_body_by_num_hash(num_hash.clone());
let transactions_size: usize = transactions.iter().map(|tx| tx.data().total_size()).sum();
let provider = WrappedChainDB::new(db);
let (filter_data, missing_out_points) = build_filter_data(provider, &transactions);
Expand All @@ -151,11 +152,7 @@ impl BlockFilter {
}
let db_transaction = db.begin_transaction();
db_transaction
.insert_block_filter(
&header.hash(),
&filter_data.pack(),
&parent_block_filter_hash,
)
.insert_block_filter(&num_hash, &filter_data.pack(), &parent_block_filter_hash)
.expect("insert_block_filter should be ok");
db_transaction.commit().expect("commit should be ok");
debug!("Inserted filter data for block: {}, hash: {:#x}, filter data size: {}, transactions size: {}", header.number(), header.hash(), filter_data.len(), transactions_size);
Expand Down
26 changes: 15 additions & 11 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ckb_types::{
},
packed::{Byte32, ProposalShortId},
utilities::merkle_mountain_range::ChainRootMMR,
U256,
BlockNumberAndHash, U256,
};
use ckb_verification::cache::Completed;
use ckb_verification::{BlockVerifier, InvalidParentError, NonContextualBlockTxsVerifier};
Expand Down Expand Up @@ -480,7 +480,7 @@ impl ChainService {
}
total_difficulty = cannon_total_difficulty.clone();
} else {
db_txn.insert_block_ext(&block.header().hash(), &ext)?;
db_txn.insert_block_ext(block.header().num_hash(), &ext)?;
}
db_txn.commit()?;

Expand Down Expand Up @@ -804,7 +804,7 @@ impl ChainService {

self.insert_ok_ext(
&txn,
&b.header().hash(),
b.header().num_hash(),
ext.clone(),
Some(&cache_entries),
Some(txs_sizes),
Expand All @@ -822,24 +822,28 @@ impl ChainService {
Err(err) => {
self.print_error(b, &err);
found_error = Some(err);
self.insert_failure_ext(&txn, &b.header().hash(), ext.clone())?;
self.insert_failure_ext(
&txn,
b.header().num_hash(),
ext.clone(),
)?;
}
}
}
Err(err) => {
found_error = Some(err);
self.insert_failure_ext(&txn, &b.header().hash(), ext.clone())?;
self.insert_failure_ext(&txn, b.header().num_hash(), ext.clone())?;
}
}
} else {
self.insert_failure_ext(&txn, &b.header().hash(), ext.clone())?;
self.insert_failure_ext(&txn, b.header().num_hash(), ext.clone())?;
}
} else {
txn.attach_block(b)?;
attach_block_cell(&txn, b)?;
mmr.push(b.digest())
.map_err(|e| InternalErrorKind::MMR.other(e))?;
self.insert_ok_ext(&txn, &b.header().hash(), ext.clone(), None, None)?;
self.insert_ok_ext(&txn, b.header().num_hash(), ext.clone(), None, None)?;
}
}

Expand Down Expand Up @@ -877,7 +881,7 @@ impl ChainService {
fn insert_ok_ext(
&self,
txn: &StoreTransaction,
hash: &Byte32,
num_hash: BlockNumberAndHash,
mut ext: BlockExt,
cache_entries: Option<&[Completed]>,
txs_sizes: Option<Vec<u64>>,
Expand All @@ -892,17 +896,17 @@ impl ChainService {
ext.cycles = Some(cycles);
}
ext.txs_sizes = txs_sizes;
txn.insert_block_ext(hash, &ext)
txn.insert_block_ext(num_hash, &ext)
}

fn insert_failure_ext(
&self,
txn: &StoreTransaction,
hash: &Byte32,
num_hash: BlockNumberAndHash,
mut ext: BlockExt,
) -> Result<(), Error> {
ext.verified = Some(false);
txn.insert_block_ext(hash, &ext)
txn.insert_block_ext(num_hash, &ext)
}

fn monitor_block_txs_verified(
Expand Down
7 changes: 5 additions & 2 deletions chain/src/tests/uncle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ fn test_get_block_body_after_inserting() {
chain_service
.process_block(Arc::new(blk.clone()), Switch::DISABLE_ALL)
.unwrap();
let len = shared.snapshot().get_block_body(&blk.hash()).len();
let len = shared
.snapshot()
.get_block_body(blk.number(), &blk.hash())
.len();
assert_eq!(len, 1, "[fork1] snapshot.get_block_body({})", blk.hash(),);
}
for blk in fork2.blocks() {
Expand All @@ -40,7 +43,7 @@ fn test_get_block_body_after_inserting() {
assert!(snapshot.get_block_header(&blk.hash()).is_some());
assert!(snapshot.get_block_uncles(&blk.hash()).is_some());
assert!(snapshot.get_block_proposal_txs_ids(&blk.hash()).is_some());
let len = snapshot.get_block_body(&blk.hash()).len();
let len = snapshot.get_block_body(blk.number(), &blk.hash()).len();
assert_eq!(len, 1, "[fork2] snapshot.get_block_body({})", blk.hash(),);
}
}
20 changes: 10 additions & 10 deletions db-migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use ckb_channel::select;
use ckb_channel::unbounded;
use ckb_channel::Receiver;
use ckb_db::{ReadOnlyDB, RocksDB};
use ckb_db_schema::{COLUMN_META, META_TIP_HEADER_KEY, MIGRATION_VERSION_KEY};
use ckb_db_schema::COLUMN_META;
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{debug, error, info};
use ckb_stop_handler::register_thread;
Expand Down Expand Up @@ -79,7 +79,7 @@ impl MigrationWorker {
pb
};
if let Ok(db) = task.migrate(self.db.clone(), Arc::new(pb)) {
db.put_default(MIGRATION_VERSION_KEY, task.version())
db.put_default(COLUMN_META::MIGRATION_VERSION_KEY, task.version())
.map_err(|err| {
internal_error(format!("failed to migrate the database: {err}"))
})
Expand Down Expand Up @@ -117,7 +117,7 @@ impl Migrations {
/// Requires upgrade the executable binary.
pub fn check(&self, db: &ReadOnlyDB, include_background: bool) -> Ordering {
let db_version = match db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
Expand Down Expand Up @@ -152,7 +152,7 @@ impl Migrations {
/// Check if the migrations will consume a lot of time.
pub fn expensive(&self, db: &ReadOnlyDB, include_background: bool) -> bool {
let db_version = match db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
Expand All @@ -178,7 +178,7 @@ impl Migrations {
/// Check if all the pending migrations will be executed in background.
pub fn can_run_in_background(&self, db: &ReadOnlyDB) -> bool {
let db_version = match db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
Expand All @@ -198,7 +198,7 @@ impl Migrations {
}

fn is_non_empty_rdb(&self, db: &ReadOnlyDB) -> bool {
if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
if let Ok(v) = db.get_pinned(COLUMN_META::NAME, COLUMN_META::META_TIP_HEADER_KEY) {
if v.is_some() {
return true;
}
Expand All @@ -207,7 +207,7 @@ impl Migrations {
}

fn is_non_empty_db(&self, db: &RocksDB) -> bool {
if let Ok(v) = db.get_pinned(COLUMN_META, META_TIP_HEADER_KEY) {
if let Ok(v) = db.get_pinned(COLUMN_META::NAME, COLUMN_META::META_TIP_HEADER_KEY) {
if v.is_some() {
return true;
}
Expand All @@ -232,7 +232,7 @@ impl Migrations {
pb
};
db = m.migrate(db, Arc::new(pb))?;
db.put_default(MIGRATION_VERSION_KEY, m.version())
db.put_default(COLUMN_META::MIGRATION_VERSION_KEY, m.version())
.map_err(|err| internal_error(format!("failed to migrate the database: {err}")))?;
}
mpb.join_and_clear().expect("MultiProgress join");
Expand Down Expand Up @@ -273,7 +273,7 @@ impl Migrations {

fn get_migration_version(&self, db: &RocksDB) -> Result<Option<String>, Error> {
let raw = db
.get_pinned_default(MIGRATION_VERSION_KEY)
.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.map_err(|err| {
internal_error(format!("failed to get the version of database: {err}"))
})?;
Expand All @@ -289,7 +289,7 @@ impl Migrations {
if db_version.is_none() {
if let Some(m) = self.migrations.values().last() {
info!("Init database version {}", m.version());
db.put_default(MIGRATION_VERSION_KEY, m.version())
db.put_default(COLUMN_META::MIGRATION_VERSION_KEY, m.version())
.map_err(|err| {
internal_error(format!("failed to migrate the database: {err}"))
})?;
Expand Down
12 changes: 6 additions & 6 deletions db-migration/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ckb_app_config::DBConfig;
use ckb_db::ReadOnlyDB;
use ckb_db::RocksDB;
use ckb_db_schema::MIGRATION_VERSION_KEY;
use ckb_db_schema::COLUMN_META;
use ckb_error::Error;
use indicatif::ProgressBar;
use std::sync::Arc;
Expand All @@ -26,7 +26,7 @@ fn test_default_migration() {
let r = migrations.migrate(db, false).unwrap();
assert_eq!(
b"20191116225943".to_vec(),
r.get_pinned_default(MIGRATION_VERSION_KEY)
r.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand All @@ -41,7 +41,7 @@ fn test_default_migration() {
.unwrap();
assert_eq!(
b"20191127101121".to_vec(),
r.get_pinned_default(MIGRATION_VERSION_KEY)
r.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down Expand Up @@ -117,7 +117,7 @@ fn test_customized_migration() {
);
assert_eq!(
VERSION.as_bytes(),
db.get_pinned_default(MIGRATION_VERSION_KEY)
db.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down Expand Up @@ -209,7 +209,7 @@ fn test_background_migration() {
let r = migrations.migrate(db, false).unwrap();
assert_eq!(
b"20191116225943".to_vec(),
r.get_pinned_default(MIGRATION_VERSION_KEY)
r.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down Expand Up @@ -248,7 +248,7 @@ fn test_background_migration() {
std::thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(
b"20241127101122".to_vec(),
db.get_pinned_default(MIGRATION_VERSION_KEY)
db.get_pinned_default(COLUMN_META::MIGRATION_VERSION_KEY)
.unwrap()
.unwrap()
.to_vec()
Expand Down
1 change: 1 addition & 0 deletions db-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ repository = "https://github.com/nervosnetwork/ckb"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ckb-types = { version = "0.117.0-pre", path = "../util/types" }
Loading

0 comments on commit fb68c30

Please sign in to comment.