Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decompression traits and a test case #2295

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2389](https://github.com/FuelLabs/fuel-core/pull/2389): Fix construction of reverse iterator in RocksDB.

### Changed
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB.
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.

Expand Down
4 changes: 3 additions & 1 deletion benches/benches/block_target_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use fuel_core::{
Config,
FuelService,
},
state::historical_rocksdb::StateRewindPolicy,
};
use fuel_core_benches::{
default_gas_costs::default_gas_costs,
Expand Down Expand Up @@ -265,7 +266,8 @@ fn service_with_many_contracts(
.build()
.unwrap();
let _drop = rt.enter();
let mut database = Database::rocksdb_temp();
let mut database = Database::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create database");

let mut chain_config = ChainConfig::local_testnet();

Expand Down
10 changes: 10 additions & 0 deletions crates/compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use registry::RegistryKeyspace;
use fuel_core_types::{
blockchain::header::PartialBlockHeader,
fuel_tx::CompressedTransaction,
fuel_types::BlockHeight,
};
use registry::RegistrationsPerTable;

Expand All @@ -42,6 +43,15 @@ impl Default for VersionedCompressedBlock {
}
}

impl VersionedCompressedBlock {
/// Returns the height of the compressed block.
pub fn height(&self) -> &BlockHeight {
match self {
VersionedCompressedBlock::V0(block) => block.header.height(),
}
}
}

#[cfg(test)]
mod tests {
use fuel_core_compression as _;
Expand Down
2 changes: 1 addition & 1 deletion crates/compression/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ macro_rules! tables {


impl RegistrationsPerTable {
pub(crate) fn write_to_registry<R>(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()>
pub fn write_to_registry<R>(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()>
where
R: TemporalRegistryAll
{
Expand Down
17 changes: 16 additions & 1 deletion crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ impl CombinedDatabase {
})
}

/// A test-only temporary rocksdb database with given rewind policy.
#[cfg(feature = "rocksdb")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be "test-helpers" if it's "test-only"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 5251318

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted this in beaee56 since it seems like we're using it with when empty path is provided, and I'm not sure if we need that outside tests as well.

pub fn temp_database_with_state_rewind_policy(
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
Ok(Self {
on_chain: Database::rocksdb_temp(state_rewind_policy)?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we should use ShallowTempDir here? we moved it from fuel-core to the benches crate but if the use case is the same we can use it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason behind this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we have a way to clean up the directory after shutdown.. not sure if this method deletes the db after we are done with it.

off_chain: Database::rocksdb_temp(state_rewind_policy)?,
relayer: Default::default(),
gas_price: Default::default(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, why the difference for the gas_price database?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xgreenx might know, I have no idea

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because gas price table and relayer table doesn't need to support state rewind in the case of the temporary database(right now at least). Because we don't have any logic around it.

})
}

pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult<Self> {
let combined_database = match config.database_type {
#[cfg(feature = "rocksdb")]
Expand All @@ -114,7 +127,9 @@ impl CombinedDatabase {
tracing::warn!(
"No RocksDB path configured, initializing database with a tmp directory"
);
CombinedDatabase::default()
CombinedDatabase::temp_database_with_state_rewind_policy(
config.state_rewind_policy,
)?
} else {
tracing::info!(
"Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"",
Expand Down
14 changes: 7 additions & 7 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,11 @@ where
}

#[cfg(feature = "rocksdb")]
pub fn rocksdb_temp() -> Self {
let db = RocksDb::<Historical<Description>>::default_open_temp(None).unwrap();
let historical_db =
HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap();
pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result<Self> {
let db = RocksDb::<Historical<Description>>::default_open_temp(None)?;
let historical_db = HistoricalRocksDB::new(db, rewind_policy)?;
let data = Arc::new(historical_db);
Self::from_storage(DataSource::new(data, Stage::default()))
Ok(Self::from_storage(DataSource::new(data, Stage::default())))
}
}

Expand All @@ -275,7 +274,8 @@ where
}
#[cfg(feature = "rocksdb")]
{
Self::rocksdb_temp()
Self::rocksdb_temp(StateRewindPolicy::NoRewind)
.expect("Failed to create a temporary database")
}
}
}
Expand Down Expand Up @@ -408,7 +408,7 @@ impl Modifiable for GenesisDatabase<Relayer> {
}
}

fn commit_changes_with_height_update<Description>(
pub fn commit_changes_with_height_update<Description>(
database: &mut Database<Description>,
changes: Changes,
heights_lookup: impl Fn(
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};

pub mod api_service;
mod da_compression;
pub mod da_compression;
pub mod database;
pub(crate) mod metrics_extension;
pub mod ports;
Expand Down
181 changes: 172 additions & 9 deletions crates/fuel-core/src/graphql_api/da_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@ use fuel_core_compression::{
config::Config,
ports::{
EvictorDb,
HistoryLookup,
TemporalRegistry,
UtxoIdToPointer,
},
};
use fuel_core_storage::{
not_found,
tables::{
Coins,
FuelBlocks,
Messages,
},
StorageAsMut,
StorageAsRef,
StorageInspect,
};
use fuel_core_types::{
blockchain::block::Block,
Expand Down Expand Up @@ -49,8 +56,8 @@ where
{
let compressed = compress(
config,
CompressTx {
db_tx,
CompressDbTx {
db_tx: DbTx { db_tx },
block_events,
},
block,
Expand All @@ -65,14 +72,23 @@ where
Ok(())
}

struct CompressTx<'a, Tx> {
db_tx: &'a mut Tx,
pub struct DbTx<'a, Tx> {
pub db_tx: &'a mut Tx,
}
netrome marked this conversation as resolved.
Show resolved Hide resolved

struct CompressDbTx<'a, Tx> {
db_tx: DbTx<'a, Tx>,
block_events: &'a [Event],
}

pub struct DecompressDbTx<'a, Tx, Onchain> {
pub db_tx: DbTx<'a, Tx>,
pub onchain_db: Onchain,
}

macro_rules! impl_temporal_registry {
($type:ty) => { paste::paste! {
impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand Down Expand Up @@ -150,15 +166,87 @@ macro_rules! impl_temporal_registry {
}
}

impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx>
impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain>
where
Tx: OffChainDatabaseTransaction,
{
fn read_registry(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<$type> {
self.db_tx.read_registry(key)
}

fn read_timestamp(
&self,
key: &fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<Tai64> {
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
}

fn write_registry(
&mut self,
key: &fuel_core_types::fuel_compression::RegistryKey,
value: &$type,
timestamp: Tai64,
) -> anyhow::Result<()> {
self.db_tx.write_registry(key, value, timestamp)
}

fn registry_index_lookup(
&self,
value: &$type,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
{
self.db_tx.registry_index_lookup(value)
}
}

impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
fn set_latest_assigned_key(
&mut self,
key: fuel_core_types::fuel_compression::RegistryKey,
) -> anyhow::Result<()> {
self.db_tx
self.db_tx.db_tx
.storage_as_mut::<DaCompressionTemporalRegistryEvictorCache>()
.insert(&MetadataKey::$type, &key)?;
Ok(())
Expand All @@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry {
&self,
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>> {
Ok(self
.db_tx
.db_tx.db_tx
.storage_as_ref::<DaCompressionTemporalRegistryEvictorCache>()
.get(&MetadataKey::$type)?
.map(|v| v.into_owned())
Expand All @@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId);
impl_temporal_registry!(ScriptCode);
impl_temporal_registry!(PredicateCode);

impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx>
impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx>
where
Tx: OffChainDatabaseTransaction,
{
Expand All @@ -210,3 +298,78 @@ where
anyhow::bail!("UtxoId not found in the block events");
}
}

impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain>
where
Tx: OffChainDatabaseTransaction,
Onchain: StorageInspect<Coins, Error = fuel_core_storage::Error>
+ StorageInspect<Messages, Error = fuel_core_storage::Error>
+ StorageInspect<FuelBlocks, Error = fuel_core_storage::Error>,
{
fn utxo_id(
&self,
c: fuel_core_types::fuel_tx::CompressedUtxoId,
) -> anyhow::Result<fuel_core_types::fuel_tx::UtxoId> {
if c.tx_pointer.block_height() == 0u32.into() {
// This is a genesis coin, which is handled differently.
// See CoinConfigGenerator::generate which generates the genesis coins.
let mut bytes = [0u8; 32];
let tx_index = c.tx_pointer.tx_index();
bytes[..std::mem::size_of_val(&tx_index)]
.copy_from_slice(&tx_index.to_be_bytes());
return Ok(fuel_core_types::fuel_tx::UtxoId::new(
fuel_core_types::fuel_tx::TxId::from(bytes),
0,
));
}

let block_info = self
.onchain_db
.storage_as_ref::<FuelBlocks>()
.get(&c.tx_pointer.block_height())?
.ok_or(not_found!(FuelBlocks))?;

let tx_id = *block_info
.transactions()
.get(c.tx_pointer.tx_index() as usize)
.ok_or(anyhow::anyhow!(
"Transaction not found in the block: {:?}",
c.tx_pointer
))?;

Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index))
}

fn coin(
&self,
utxo_id: fuel_core_types::fuel_tx::UtxoId,
) -> anyhow::Result<fuel_core_compression::ports::CoinInfo> {
let coin = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Coins>()
.get(&utxo_id)?
.ok_or(not_found!(fuel_core_storage::tables::Coins))?;
Ok(fuel_core_compression::ports::CoinInfo {
owner: *coin.owner(),
asset_id: *coin.asset_id(),
amount: *coin.amount(),
})
}

fn message(
&self,
nonce: fuel_core_types::fuel_types::Nonce,
) -> anyhow::Result<fuel_core_compression::ports::MessageInfo> {
let message = self
.onchain_db
.storage_as_ref::<fuel_core_storage::tables::Messages>()
.get(&nonce)?
.ok_or(not_found!(fuel_core_storage::tables::Messages))?;
Ok(fuel_core_compression::ports::MessageInfo {
sender: *message.sender(),
recipient: *message.recipient(),
amount: message.amount(),
data: message.data().clone(),
})
}
}
2 changes: 1 addition & 1 deletion crates/fuel-core/src/state/generic_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<Storage> GenericDatabase<Storage> {
}

pub fn into_inner(self) -> Storage {
self.storage.into_inner()
self.storage.into_storage()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/storage/src/structured_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<S> StructuredStorage<S> {
}

/// Returns the inner storage.
pub fn into_inner(self) -> S {
pub fn into_storage(self) -> S {
self.inner
}
}
Expand Down
Loading
Loading