diff --git a/Cargo.toml b/Cargo.toml index 2d6377f34..57f3bd9d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ authors = ["Heliax "] edition = "2021" license = "GPL-3.0" readme = "README.md" -version = "1.0.0" +version = "1.1.4" [workspace.dependencies] clokwerk = "0.4.0" diff --git a/chain/src/main.rs b/chain/src/main.rs index f64aa4405..ee866438e 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -28,7 +28,9 @@ use shared::crawler_state::ChainCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use shared::id::Id; use shared::token::Token; +use shared::utils::BalanceChange; use shared::validator::ValidatorSet; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; use tendermint_rpc::HttpClient; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; @@ -140,6 +142,7 @@ async fn main() -> Result<(), MainError> { initial_query( &client, &conn, + checksums.clone(), config.initial_query_retry_time, config.initial_query_retry_attempts, ) @@ -186,46 +189,15 @@ async fn crawling_fn( return Err(MainError::NoAction); } - tracing::debug!(block = block_height, "Query block..."); - let tm_block_response = - tendermint_service::query_raw_block_at_height(&client, block_height) - .await - .into_rpc_error()?; - tracing::debug!( - block = block_height, - "Raw block contains {} txs...", - tm_block_response.block.data.len() - ); - - tracing::debug!(block = block_height, "Query block results..."); - let tm_block_results_response = - tendermint_service::query_raw_block_results_at_height( - &client, - block_height, - ) - .await - .into_rpc_error()?; - let block_results = BlockResult::from(tm_block_results_response); - - tracing::debug!(block = block_height, "Query epoch..."); - let epoch = - namada_service::get_epoch_at_block_height(&client, block_height) - .await - .into_rpc_error()?; - tracing::debug!(block = block_height, "Query first block in epoch..."); let first_block_in_epoch = namada_service::get_first_block_in_epoch(&client) .await .into_rpc_error()?; - let block = Block::from( - tm_block_response, - &block_results, - checksums, - epoch, - block_height, - ); + let (block, tm_block_response, epoch) = + get_block(block_height, &client, checksums).await?; + tracing::debug!( block = block_height, txs = block.transactions.len(), @@ -245,6 +217,15 @@ async fn crawling_fn( let addresses = block.addresses_with_balance_change(&native_token); + let block_proposer_address = block + .header + .proposer_address_namada + .as_ref() + .map(|address| BalanceChange { + address: Id::Account(address.clone()), + token: Token::Native(native_token.clone()), + }); + let pgf_receipient_addresses = if first_block_in_epoch.eq(&block_height) { conn.interact(move |conn| { namada_pgf_repository::get_pgf_receipients_balance_changes( @@ -260,8 +241,10 @@ async fn crawling_fn( HashSet::default() }; - let all_balance_changed_addresses = pgf_receipient_addresses - .union(&addresses) + let all_balance_changed_addresses = addresses + .iter() + .chain(block_proposer_address.iter()) + .chain(pgf_receipient_addresses.iter()) .cloned() .collect::>(); @@ -272,7 +255,13 @@ async fn crawling_fn( ) .await .into_rpc_error()?; - tracing::info!("Updating balance for {} addresses...", addresses.len()); + + tracing::debug!( + block = block_height, + addresses = all_balance_changed_addresses.len(), + "Updating balance for {} addresses...", + all_balance_changed_addresses.len() + ); let next_governance_proposal_id = namada_service::query_next_governance_id(&client, block_height) @@ -298,12 +287,18 @@ async fn crawling_fn( proposals_votes.len() ); - let validators = block.validators(); + let validators = block.new_validators(); let validator_set = ValidatorSet { validators: validators.clone(), epoch, }; + let validators_state_change = block.update_validators_state(); + tracing::debug!( + "Updating {} validators state", + validators_state_change.len() + ); + let addresses = block.bond_addresses(); let bonds = query_bonds(&client, addresses).await.into_rpc_error()?; tracing::debug!( @@ -368,6 +363,7 @@ async fn crawling_fn( withdraws = withdraw_addreses.len(), claimed_rewards = reward_claimers.len(), revealed_pks = revealed_pks.len(), + validator_state = validators_state_change.len(), epoch = epoch, first_block_in_epoch = first_block_in_epoch, block = block_height, @@ -383,6 +379,12 @@ async fn crawling_fn( ibc_tokens, )?; + repository::block::upsert_block( + transaction_conn, + block, + tm_block_response, + )?; + repository::balance::insert_balances( transaction_conn, balances, @@ -402,6 +404,11 @@ async fn crawling_fn( validator_set, )?; + repository::pos::upsert_validator_state( + transaction_conn, + validators_state_change, + )?; + // We first remove all the bonds and then insert the new ones repository::pos::clear_bonds( transaction_conn, @@ -453,29 +460,35 @@ async fn crawling_fn( async fn initial_query( client: &HttpClient, conn: &Object, + checksums: Checksums, retry_time: u64, retry_attempts: usize, ) -> Result<(), MainError> { let retry_strategy = ExponentialBackoff::from_millis(retry_time) .map(jitter) .take(retry_attempts); - Retry::spawn(retry_strategy, || try_initial_query(client, conn)).await + Retry::spawn(retry_strategy, || { + try_initial_query(client, conn, checksums.clone()) + }) + .await } async fn try_initial_query( client: &HttpClient, conn: &Object, + checksums: Checksums, ) -> Result<(), MainError> { tracing::debug!("Querying initial data..."); let block_height = query_last_block_height(client).await.into_rpc_error()?; - let epoch = namada_service::get_epoch_at_block_height(client, block_height) - .await - .into_rpc_error()?; + let first_block_in_epoch = namada_service::get_first_block_in_epoch(client) .await .into_rpc_error()?; + let (block, tm_block_response, epoch) = + get_block(block_height, client, checksums.clone()).await?; + let tokens = query_tokens(client).await.into_rpc_error()?; // This can sometimes fail if the last block height in the node has moved @@ -535,6 +548,12 @@ async fn try_initial_query( .run(|transaction_conn| { repository::balance::insert_tokens(transaction_conn, tokens)?; + repository::block::upsert_block( + transaction_conn, + block, + tm_block_response, + )?; + tracing::debug!( block = block_height, "Inserting {} balances...", @@ -611,3 +630,60 @@ async fn update_crawler_timestamp( .and_then(identity) .into_db_error() } + +async fn get_block( + block_height: u32, + client: &HttpClient, + checksums: Checksums, +) -> Result<(Block, TendermintBlockResponse, u32), MainError> { + tracing::debug!(block = block_height, "Query block..."); + let tm_block_response = + tendermint_service::query_raw_block_at_height(client, block_height) + .await + .into_rpc_error()?; + tracing::debug!( + block = block_height, + "Raw block contains {} txs...", + tm_block_response.block.data.len() + ); + + tracing::debug!(block = block_height, "Query block results..."); + let tm_block_results_response = + tendermint_service::query_raw_block_results_at_height( + client, + block_height, + ) + .await + .into_rpc_error()?; + let block_results = BlockResult::from(tm_block_results_response); + + tracing::debug!(block = block_height, "Query epoch..."); + let epoch = namada_service::get_epoch_at_block_height(client, block_height) + .await + .into_rpc_error()?; + + let proposer_address_namada = namada_service::get_validator_namada_address( + client, + &Id::from(&tm_block_response.block.header.proposer_address), + ) + .await + .into_rpc_error()?; + + tracing::info!( + block = block_height, + tm_address = tm_block_response.block.header.proposer_address.to_string(), + namada_address = ?proposer_address_namada, + "Got block proposer address" + ); + + let block = Block::from( + &tm_block_response, + &block_results, + &proposer_address_namada, + checksums, + epoch, + block_height, + ); + + Ok((block, tm_block_response, epoch)) +} diff --git a/chain/src/repository/balance.rs b/chain/src/repository/balance.rs index 101d08722..253e8c3ef 100644 --- a/chain/src/repository/balance.rs +++ b/chain/src/repository/balance.rs @@ -73,6 +73,8 @@ pub fn insert_tokens( #[cfg(test)] mod tests { + use std::collections::HashSet; + use anyhow::Context; use diesel::{ BoolExpressionMethods, ExpressionMethods, QueryDsl, SelectableHelper, @@ -80,6 +82,8 @@ mod tests { use namada_sdk::token::Amount as NamadaAmount; use namada_sdk::uint::MAX_SIGNED_VALUE; use orm::balances::BalanceDb; + use orm::blocks::BlockInsertDb; + use orm::schema::blocks; use orm::views::balances; use shared::balance::{Amount, Balance}; use shared::id::Id; @@ -130,6 +134,8 @@ mod tests { insert_tokens(conn, vec![token.clone()])?; + seed_blocks_from_balances(conn, &[balance.clone()])?; + insert_balances(conn, vec![balance.clone()])?; let queried_balance = query_balance_by_address(conn, owner, token)?; @@ -175,6 +181,7 @@ mod tests { ..(balance.clone()) }; + seed_blocks_from_balances(conn, &[new_balance.clone()])?; insert_balances(conn, vec![new_balance])?; let queried_balance = @@ -376,6 +383,8 @@ mod tests { seed_tokens_from_balance(conn, fake_balances.clone())?; + seed_blocks_from_balances(conn, &fake_balances)?; + insert_balances(conn, fake_balances.clone())?; assert_eq!(query_all_balances(conn)?.len(), fake_balances.len()); @@ -410,6 +419,7 @@ mod tests { insert_tokens(conn, vec![token.clone()])?; + seed_blocks_from_balances(conn, &[balance.clone()])?; insert_balances(conn, vec![balance.clone()])?; let queried_balance = query_balance_by_address(conn, owner, token)?; @@ -442,6 +452,8 @@ mod tests { insert_tokens(conn, vec![token])?; + seed_blocks_from_balances(conn, &balances)?; + let res = insert_balances(conn, balances); assert!(res.is_ok()); @@ -475,6 +487,8 @@ mod tests { seed_tokens_from_balance(conn, balances.clone())?; + seed_blocks_from_balances(conn, &balances)?; + let res = insert_balances(conn, balances); assert!(res.is_ok()); @@ -500,12 +514,33 @@ mod tests { anyhow::Ok(()) } + fn seed_blocks_from_balances( + conn: &mut PgConnection, + balances: &[Balance], + ) -> anyhow::Result<()> { + for height in balances + .iter() + .map(|balance| balance.height as i32) + .collect::>() + { + diesel::insert_into(blocks::table) + .values::<&BlockInsertDb>(&BlockInsertDb::fake(height)) + .on_conflict_do_nothing() + .execute(conn) + .context("Failed to insert block in db")?; + } + + anyhow::Ok(()) + } + fn seed_balance( conn: &mut PgConnection, balances: Vec, ) -> anyhow::Result<()> { seed_tokens_from_balance(conn, balances.clone())?; + seed_blocks_from_balances(conn, &balances)?; + diesel::insert_into(balance_changes::table) .values::<&Vec>( &balances diff --git a/chain/src/repository/block.rs b/chain/src/repository/block.rs new file mode 100644 index 000000000..5420dd939 --- /dev/null +++ b/chain/src/repository/block.rs @@ -0,0 +1,32 @@ +use anyhow::Context; +use diesel::upsert::excluded; +use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; +use orm::blocks::BlockInsertDb; +use orm::schema::blocks; +use shared::block::Block; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; + +pub fn upsert_block( + transaction_conn: &mut PgConnection, + block: Block, + tm_block_response: TendermintBlockResponse, +) -> anyhow::Result<()> { + diesel::insert_into(blocks::table) + .values::<&BlockInsertDb>(&BlockInsertDb::from(( + block, + tm_block_response, + ))) + .on_conflict(blocks::height) + .do_update() + .set(( + blocks::hash.eq(excluded(blocks::hash)), + blocks::app_hash.eq(excluded(blocks::app_hash)), + blocks::timestamp.eq(excluded(blocks::timestamp)), + blocks::proposer.eq(excluded(blocks::proposer)), + blocks::epoch.eq(excluded(blocks::epoch)), + )) + .execute(transaction_conn) + .context("Failed to insert block in db")?; + + anyhow::Ok(()) +} diff --git a/chain/src/repository/mod.rs b/chain/src/repository/mod.rs index 296150acf..c33a98c69 100644 --- a/chain/src/repository/mod.rs +++ b/chain/src/repository/mod.rs @@ -1,4 +1,5 @@ pub mod balance; +pub mod block; pub mod crawler_state; pub mod gov; pub mod pgf; diff --git a/chain/src/repository/pos.rs b/chain/src/repository/pos.rs index 0960c229e..f85824e05 100644 --- a/chain/src/repository/pos.rs +++ b/chain/src/repository/pos.rs @@ -10,14 +10,17 @@ use orm::bond::BondInsertDb; use orm::schema::{bonds, pos_rewards, unbonds, validators}; use orm::unbond::UnbondInsertDb; use orm::validators::{ - ValidatorDb, ValidatorUpdateMetadataDb, ValidatorWithMetaInsertDb, + ValidatorDb, ValidatorStateDb, ValidatorUpdateMetadataDb, + ValidatorWithMetaInsertDb, }; use shared::block::Epoch; use shared::bond::Bonds; use shared::id::Id; use shared::tuple_len::TupleLen; use shared::unbond::{UnbondAddresses, Unbonds}; -use shared::validator::{ValidatorMetadataChange, ValidatorSet}; +use shared::validator::{ + ValidatorMetadataChange, ValidatorSet, ValidatorStateChange, +}; use super::utils::MAX_PARAM_SIZE; @@ -250,6 +253,30 @@ pub fn update_validator_metadata( anyhow::Ok(()) } +pub fn upsert_validator_state( + transaction_conn: &mut PgConnection, + validators_states: HashSet, +) -> anyhow::Result<()> { + for change in validators_states { + let state = ValidatorStateDb::from(change.state); + let validator_address = change.address.to_string(); + + diesel::update( + validators::table.filter( + validators::columns::namada_address.eq(validator_address), + ), + ) + .set(validators::columns::state.eq(state)) + .execute(transaction_conn) + .context(format!( + "Failed to update validator state for {}", + change.address + ))?; + } + + Ok(()) +} + pub fn upsert_validators( transaction_conn: &mut PgConnection, validators_set: ValidatorSet, diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index abc6a36d1..b87f96076 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -110,7 +110,7 @@ pub async fn query_balance( }) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } @@ -440,7 +440,7 @@ pub async fn query_bonds( Some(bonds) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -513,7 +513,7 @@ pub async fn query_unbonds( } }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -573,7 +573,7 @@ pub async fn query_tallies( Some((proposal, tally_type)) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -603,7 +603,7 @@ pub async fn query_all_votes( Some(votes) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -686,13 +686,26 @@ pub async fn get_validator_set_at_epoch( state: validator_state }) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; Ok(ValidatorSet { validators, epoch }) } +pub async fn get_validator_namada_address( + client: &HttpClient, + tm_addr: &Id, +) -> anyhow::Result> { + let validator = RPC + .vp() + .pos() + .validator_by_tm_addr(client, &tm_addr.to_string().to_uppercase()) + .await?; + + Ok(validator.map(Id::from)) +} + pub async fn query_pipeline_length(client: &HttpClient) -> anyhow::Result { let pos_parameters = rpc::get_pos_params(client) .await diff --git a/governance/src/services/namada.rs b/governance/src/services/namada.rs index 8d1ccd281..ca09b292d 100644 --- a/governance/src/services/namada.rs +++ b/governance/src/services/namada.rs @@ -83,7 +83,7 @@ pub async fn get_governance_proposals_updates( } }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } diff --git a/orm/Cargo.toml b/orm/Cargo.toml index 892fbd590..3d0234cf1 100644 --- a/orm/Cargo.toml +++ b/orm/Cargo.toml @@ -24,3 +24,4 @@ shared.workspace = true bigdecimal.workspace = true chrono.workspace = true serde_json.workspace = true +tendermint-rpc.workspace = true diff --git a/orm/migrations/2024-07-04-103941_crawler_state/down.sql b/orm/migrations/2024-07-04-103941_crawler_state/down.sql index 9122f91e4..23de38c6d 100644 --- a/orm/migrations/2024-07-04-103941_crawler_state/down.sql +++ b/orm/migrations/2024-07-04-103941_crawler_state/down.sql @@ -1,4 +1,5 @@ -- This file should undo anything in `up.sql` DROP TABLE crawler_state; + DROP TYPE CRAWLER_NAME; diff --git a/orm/migrations/2024-12-01-170248_ibc_ack/down.sql b/orm/migrations/2024-12-01-170248_ibc_ack/down.sql new file mode 100644 index 000000000..0e3dc5ef1 --- /dev/null +++ b/orm/migrations/2024-12-01-170248_ibc_ack/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS ibc_ack; \ No newline at end of file diff --git a/orm/migrations/2024-12-01-170248_ibc_ack/up.sql b/orm/migrations/2024-12-01-170248_ibc_ack/up.sql new file mode 100644 index 000000000..652a3e23e --- /dev/null +++ b/orm/migrations/2024-12-01-170248_ibc_ack/up.sql @@ -0,0 +1,9 @@ +-- Your SQL goes here +CREATE TYPE IBC_STATUS AS ENUM ('fail', 'success', 'timeout', 'unknown'); + +CREATE TABLE ibc_ack ( + id VARCHAR PRIMARY KEY, + tx_hash VARCHAR NOT NULL, + timeout BIGINT NOT NULL, + status IBC_STATUS NOT NULL +); diff --git a/orm/migrations/2024-12-09-225148_init_blocks/down.sql b/orm/migrations/2024-12-09-225148_init_blocks/down.sql new file mode 100644 index 000000000..a57d19684 --- /dev/null +++ b/orm/migrations/2024-12-09-225148_init_blocks/down.sql @@ -0,0 +1,9 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE balance_changes + DROP CONSTRAINT fk_balance_changes_height; + +ALTER TABLE wrapper_transactions + DROP CONSTRAINT fk_wrapper_transactions_height; + +DROP TABLE IF EXISTS blocks; + diff --git a/orm/migrations/2024-12-09-225148_init_blocks/up.sql b/orm/migrations/2024-12-09-225148_init_blocks/up.sql new file mode 100644 index 000000000..2c519cce3 --- /dev/null +++ b/orm/migrations/2024-12-09-225148_init_blocks/up.sql @@ -0,0 +1,40 @@ +-- Your SQL goes here +CREATE TABLE blocks ( + height integer PRIMARY KEY, + hash VARCHAR(64), + app_hash varchar(64), + timestamp timestamp, + proposer varchar, + epoch int +); + +ALTER TABLE blocks + ADD UNIQUE (hash); + +CREATE INDEX index_blocks_epoch ON blocks (epoch); + +-- Populate null blocks for all existing wrapper_transactions and balance_changes to satisfy foreign key constraints +INSERT INTO blocks ( SELECT DISTINCT + height, + NULL::varchar AS hash, + NULL::varchar AS app_hash, + NULL::timestamp AS timestamp, + NULL::varchar AS proposer, + NULL::int AS epoch + FROM ( SELECT DISTINCT + block_height AS height + FROM + wrapper_transactions + UNION + SELECT DISTINCT + height + FROM + balance_changes)); + +-- Create foreign key constraints for wrapper_transactions and balance_changes +ALTER TABLE wrapper_transactions + ADD CONSTRAINT fk_wrapper_transactions_height FOREIGN KEY (block_height) REFERENCES blocks (height) ON DELETE RESTRICT; + +ALTER TABLE balance_changes + ADD CONSTRAINT fk_balance_changes_height FOREIGN KEY (height) REFERENCES blocks (height) ON DELETE RESTRICT; + diff --git a/orm/migrations/2024-12-10-104502_transaction_types/down.sql b/orm/migrations/2024-12-10-104502_transaction_types/down.sql new file mode 100644 index 000000000..0a1dceb69 --- /dev/null +++ b/orm/migrations/2024-12-10-104502_transaction_types/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TYPE IBC_STATUS; \ No newline at end of file diff --git a/orm/migrations/2024-12-10-104502_transaction_types/up.sql b/orm/migrations/2024-12-10-104502_transaction_types/up.sql new file mode 100644 index 000000000..6656ce277 --- /dev/null +++ b/orm/migrations/2024-12-10-104502_transaction_types/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +ALTER TYPE TRANSACTION_KIND ADD VALUE 'reactivate_validator'; +ALTER TYPE TRANSACTION_KIND ADD VALUE 'deactivate_validator'; +ALTER TYPE TRANSACTION_KIND ADD VALUE 'unjail_validator'; \ No newline at end of file diff --git a/orm/migrations/2024-12-10-110059_validator_states/down.sql b/orm/migrations/2024-12-10-110059_validator_states/down.sql new file mode 100644 index 000000000..d9a93fe9a --- /dev/null +++ b/orm/migrations/2024-12-10-110059_validator_states/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` diff --git a/orm/migrations/2024-12-10-110059_validator_states/up.sql b/orm/migrations/2024-12-10-110059_validator_states/up.sql new file mode 100644 index 000000000..ef893141f --- /dev/null +++ b/orm/migrations/2024-12-10-110059_validator_states/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +ALTER TYPE VALIDATOR_STATE ADD VALUE 'deactivating'; +ALTER TYPE VALIDATOR_STATE ADD VALUE 'reactivating'; +ALTER TYPE VALIDATOR_STATE ADD VALUE 'unjailing'; \ No newline at end of file diff --git a/orm/migrations/2024-12-17-095036_transaction_gas_used/down.sql b/orm/migrations/2024-12-17-095036_transaction_gas_used/down.sql new file mode 100644 index 000000000..0e2cb6fad --- /dev/null +++ b/orm/migrations/2024-12-17-095036_transaction_gas_used/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE wrapper_transactions DROP COLUMN gas_used; \ No newline at end of file diff --git a/orm/migrations/2024-12-17-095036_transaction_gas_used/up.sql b/orm/migrations/2024-12-17-095036_transaction_gas_used/up.sql new file mode 100644 index 000000000..2ace24652 --- /dev/null +++ b/orm/migrations/2024-12-17-095036_transaction_gas_used/up.sql @@ -0,0 +1,2 @@ +-- Your SQL goes here +ALTER TABLE wrapper_transactions ADD COLUMN gas_used VARCHAR; \ No newline at end of file diff --git a/orm/src/blocks.rs b/orm/src/blocks.rs new file mode 100644 index 000000000..09a03a616 --- /dev/null +++ b/orm/src/blocks.rs @@ -0,0 +1,59 @@ +use diesel::{Insertable, Queryable, Selectable}; +use shared::block::Block; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; + +use crate::schema::blocks; + +#[derive(Insertable, Clone, Queryable, Selectable, Debug)] +#[diesel(table_name = blocks)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct BlockInsertDb { + pub height: i32, + pub hash: Option, + pub app_hash: Option, + pub timestamp: Option, + pub proposer: Option, + pub epoch: Option, +} + +pub type BlockDb = BlockInsertDb; + +impl From<(Block, TendermintBlockResponse)> for BlockInsertDb { + fn from( + (block, tm_block_response): (Block, TendermintBlockResponse), + ) -> Self { + let timestamp = chrono::DateTime::from_timestamp( + tm_block_response.block.header.time.unix_timestamp(), + 0, + ) + .expect("Invalid timestamp") + .naive_utc(); + + Self { + height: block.header.height as i32, + hash: Some(block.hash.to_string()), + app_hash: Some(block.header.app_hash.to_string()), + timestamp: Some(timestamp), + proposer: block.header.proposer_address_namada, + epoch: Some(block.epoch as i32), + } + } +} + +impl BlockInsertDb { + pub fn fake(height: i32) -> Self { + Self { + height, + hash: Some(height.to_string()), /* fake hash but ensures + * uniqueness + * with height */ + app_hash: Some("fake_app_hash".to_string()), /* doesn't require + * uniqueness */ + timestamp: Some( + chrono::DateTime::from_timestamp(0, 0).unwrap().naive_utc(), + ), + proposer: Some("fake_proposer".to_string()), + epoch: Some(0), + } + } +} diff --git a/orm/src/ibc.rs b/orm/src/ibc.rs new file mode 100644 index 000000000..0583f47e5 --- /dev/null +++ b/orm/src/ibc.rs @@ -0,0 +1,56 @@ +use diesel::prelude::Queryable; +use diesel::{AsChangeset, Insertable, Selectable}; +use serde::{Deserialize, Serialize}; +use shared::transaction::{IbcAckStatus, IbcSequence}; + +use crate::schema::ibc_ack; + +#[derive(Debug, Clone, Serialize, Deserialize, diesel_derive_enum::DbEnum)] +#[ExistingTypePath = "crate::schema::sql_types::IbcStatus"] +pub enum IbcAckStatusDb { + Unknown, + Timeout, + Fail, + Success, +} + +impl From for IbcAckStatusDb { + fn from(value: IbcAckStatus) -> Self { + match value { + IbcAckStatus::Success => Self::Success, + IbcAckStatus::Fail => Self::Fail, + IbcAckStatus::Timeout => Self::Timeout, + IbcAckStatus::Unknown => Self::Unknown, + } + } +} + +#[derive(Serialize, Queryable, Insertable, Selectable, Clone, Debug)] +#[diesel(table_name = ibc_ack)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct IbcAckDb { + pub id: String, + pub tx_hash: String, + pub timeout: i64, + pub status: IbcAckStatusDb, +} + +pub type IbcAckInsertDb = IbcAckDb; + +impl From for IbcAckInsertDb { + fn from(value: IbcSequence) -> Self { + Self { + id: value.id(), + tx_hash: value.tx_id.to_string(), + timeout: value.timeout as i64, + status: IbcAckStatusDb::Unknown, + } + } +} + +#[derive(Serialize, AsChangeset, Clone)] +#[diesel(table_name = ibc_ack)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct IbcSequencekStatusUpdateDb { + pub status: IbcAckStatusDb, +} diff --git a/orm/src/lib.rs b/orm/src/lib.rs index 82aa2cdd3..2427200c7 100644 --- a/orm/src/lib.rs +++ b/orm/src/lib.rs @@ -1,4 +1,5 @@ pub mod balances; +pub mod blocks; pub mod bond; pub mod crawler_state; pub mod gas; @@ -6,6 +7,7 @@ pub mod governance_proposal; pub mod governance_votes; pub mod group_by_macros; pub mod helpers; +pub mod ibc; pub mod migrations; pub mod parameters; pub mod pgf; diff --git a/orm/src/schema.rs b/orm/src/schema.rs index 0314a7053..67262f842 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -49,6 +49,14 @@ pub mod sql_types { #[diesel(postgres_type(name = "payment_recurrence"))] pub struct PaymentRecurrence; + #[derive( + diesel::query_builder::QueryId, + std::fmt::Debug, + diesel::sql_types::SqlType, + )] + #[diesel(postgres_type(name = "ibc_status"))] + pub struct IbcStatus; + #[derive( diesel::query_builder::QueryId, std::fmt::Debug, @@ -101,6 +109,19 @@ diesel::table! { } } +diesel::table! { + blocks (height) { + height -> Int4, + #[max_length = 64] + hash -> Nullable, + #[max_length = 64] + app_hash -> Nullable, + timestamp -> Nullable, + proposer -> Nullable, + epoch -> Nullable, + } +} + diesel::table! { bonds (id) { id -> Int4, @@ -196,6 +217,18 @@ diesel::table! { } } +diesel::table! { + use diesel::sql_types::*; + use super::sql_types::IbcStatus; + + ibc_ack (id) { + id -> Varchar, + tx_hash -> Varchar, + timeout -> Int8, + status -> IbcStatus, + } +} + diesel::table! { ibc_token (address) { #[max_length = 45] @@ -307,9 +340,11 @@ diesel::table! { block_height -> Int4, exit_code -> TransactionResult, atomic -> Bool, + gas_used -> Nullable, } } +diesel::joinable!(balance_changes -> blocks (height)); diesel::joinable!(balance_changes -> token (token)); diesel::joinable!(bonds -> validators (validator_id)); diesel::joinable!(governance_votes -> governance_proposals (proposal_id)); @@ -318,9 +353,11 @@ diesel::joinable!(inner_transactions -> wrapper_transactions (wrapper_id)); diesel::joinable!(pos_rewards -> validators (validator_id)); diesel::joinable!(public_good_funding -> governance_proposals (proposal_id)); diesel::joinable!(unbonds -> validators (validator_id)); +diesel::joinable!(wrapper_transactions -> blocks (block_height)); diesel::allow_tables_to_appear_in_same_query!( balance_changes, + blocks, bonds, chain_parameters, crawler_state, @@ -328,6 +365,7 @@ diesel::allow_tables_to_appear_in_same_query!( gas_price, governance_proposals, governance_votes, + ibc_ack, ibc_token, inner_transactions, pos_rewards, diff --git a/orm/src/transactions.rs b/orm/src/transactions.rs index b4f820fb3..7a9438f92 100644 --- a/orm/src/transactions.rs +++ b/orm/src/transactions.rs @@ -26,6 +26,9 @@ pub enum TransactionKindDb { ChangeCommission, RevealPk, BecomeValidator, + ReactivateValidator, + DeactivateValidator, + UnjailValidator, Unknown, } @@ -33,31 +36,28 @@ impl From for TransactionKindDb { fn from(value: TransactionKind) -> Self { match value { TransactionKind::TransparentTransfer(_) => { - TransactionKindDb::TransparentTransfer + Self::TransparentTransfer } - TransactionKind::ShieldedTransfer(_) => { - TransactionKindDb::ShieldedTransfer + TransactionKind::ShieldedTransfer(_) => Self::ShieldedTransfer, + TransactionKind::IbcMsgTransfer(_) => Self::IbcMsgTransfer, + TransactionKind::Bond(_) => Self::Bond, + TransactionKind::Redelegation(_) => Self::Redelegation, + TransactionKind::Unbond(_) => Self::Unbond, + TransactionKind::Withdraw(_) => Self::Withdraw, + TransactionKind::ClaimRewards(_) => Self::ClaimRewards, + TransactionKind::ProposalVote(_) => Self::VoteProposal, + TransactionKind::InitProposal(_) => Self::InitProposal, + TransactionKind::MetadataChange(_) => Self::ChangeMetadata, + TransactionKind::CommissionChange(_) => Self::ChangeCommission, + TransactionKind::DeactivateValidator(_) => { + Self::DeactivateValidator } - TransactionKind::IbcMsgTransfer(_) => { - TransactionKindDb::IbcMsgTransfer - } - TransactionKind::Bond(_) => TransactionKindDb::Bond, - TransactionKind::Redelegation(_) => TransactionKindDb::Redelegation, - TransactionKind::Unbond(_) => TransactionKindDb::Unbond, - TransactionKind::Withdraw(_) => TransactionKindDb::Withdraw, - TransactionKind::ClaimRewards(_) => TransactionKindDb::ClaimRewards, - TransactionKind::ProposalVote(_) => TransactionKindDb::VoteProposal, - TransactionKind::InitProposal(_) => TransactionKindDb::InitProposal, - TransactionKind::MetadataChange(_) => { - TransactionKindDb::ChangeMetadata - } - TransactionKind::CommissionChange(_) => { - TransactionKindDb::ChangeCommission - } - TransactionKind::RevealPk(_) => TransactionKindDb::RevealPk, - TransactionKind::BecomeValidator(_) => { - TransactionKindDb::BecomeValidator + TransactionKind::ReactivateValidator(_) => { + Self::ReactivateValidator } + TransactionKind::RevealPk(_) => Self::RevealPk, + TransactionKind::BecomeValidator(_) => Self::BecomeValidator, + TransactionKind::UnjailValidator(_) => Self::UnjailValidator, TransactionKind::Unknown(_) => TransactionKindDb::Unknown, } } @@ -114,6 +114,7 @@ pub struct WrapperTransactionInsertDb { pub fee_payer: String, pub fee_token: String, pub gas_limit: String, + pub gas_used: Option, pub block_height: i32, pub exit_code: TransactionResultDb, pub atomic: bool, @@ -128,6 +129,7 @@ impl WrapperTransactionInsertDb { fee_payer: tx.fee.gas_payer.to_string(), fee_token: tx.fee.gas_token.to_string(), gas_limit: tx.fee.gas, + gas_used: tx.fee.gas_used, block_height: tx.block_height as i32, exit_code: TransactionResultDb::from(tx.exit_code), atomic: tx.atomic, diff --git a/orm/src/validators.rs b/orm/src/validators.rs index 79027c0dc..395325a95 100644 --- a/orm/src/validators.rs +++ b/orm/src/validators.rs @@ -28,6 +28,9 @@ pub enum ValidatorStateDb { BelowThreshold, Inactive, Jailed, + Deactivating, + Reactivating, + Unjailing, Unknown, } @@ -39,6 +42,9 @@ impl From for ValidatorStateDb { ValidatorState::BelowThreshold => Self::BelowThreshold, ValidatorState::Inactive => Self::Inactive, ValidatorState::Jailed => Self::Jailed, + ValidatorState::Deactivating => Self::Deactivating, + ValidatorState::Reactivating => Self::Reactivating, + ValidatorState::Unjailing => Self::Unjailing, ValidatorState::Unknown => Self::Unknown, } } @@ -90,6 +96,14 @@ pub struct ValidatorWithMetaInsertDb { pub state: ValidatorStateDb, } +#[derive(Serialize, Insertable, Clone)] +#[diesel(table_name = validators)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct ValidatorStateChangeDb { + pub namada_address: String, + pub state: ValidatorStateDb, +} + #[derive(Serialize, AsChangeset, Clone)] #[diesel(table_name = validators)] #[diesel(check_for_backend(diesel::pg::Pg))] diff --git a/pos/src/repository/pos.rs b/pos/src/repository/pos.rs index 3efaddd37..2a644ac2c 100644 --- a/pos/src/repository/pos.rs +++ b/pos/src/repository/pos.rs @@ -19,6 +19,7 @@ pub fn upsert_validators( .eq(excluded(validators::columns::max_commission)), validators::columns::commission .eq(excluded(validators::columns::commission)), + validators::columns::state.eq(excluded(validators::columns::state)), )) .execute(transaction_conn) .context("Failed to update validators in db")?; diff --git a/pos/src/services/namada.rs b/pos/src/services/namada.rs index 9261648a8..fa18e9e4a 100644 --- a/pos/src/services/namada.rs +++ b/pos/src/services/namada.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use anyhow::Context; use futures::{StreamExt, TryStreamExt}; use namada_core::chain::Epoch as NamadaSdkEpoch; +use namada_sdk::address::Address; use namada_sdk::rpc; use shared::block::Epoch; use shared::id::Id; @@ -84,7 +85,65 @@ pub async fn get_validator_set_at_epoch( state: validator_state }) }) - .buffer_unordered(100) + .buffer_unordered(32) + .try_collect::>() + .await?; + + Ok(ValidatorSet { validators, epoch }) +} + +pub async fn get_validators_state( + client: &HttpClient, + validators: Vec, + epoch: Epoch, +) -> anyhow::Result { + let namada_epoch = to_epoch(epoch); + + let validators = futures::stream::iter(validators) + .map(|mut validator| async move { + let validator_address = Address::from(validator.address.clone()); + let validator_state = rpc::get_validator_state( + client, + &validator_address, + Some(namada_epoch), + ) + .await + .with_context(|| { + format!("Failed to query validator {validator_address} state") + })?; + + let validator_state = validator_state + .0 + .map(ValidatorState::from) + .unwrap_or(ValidatorState::Unknown); + + let from_unjailing_state = + validator.state.eq(&ValidatorState::Unjailing) + && !validator_state.eq(&ValidatorState::Jailed); + let from_deactivating_state = + validator.state.eq(&ValidatorState::Deactivating) + && validator_state.eq(&ValidatorState::Inactive); + let from_reactivating_state = + validator.state.eq(&ValidatorState::Reactivating) + && !validator_state.eq(&ValidatorState::Inactive); + let from_concrete_state = ![ + ValidatorState::Deactivating, + ValidatorState::Reactivating, + ValidatorState::Unjailing, + ] + .contains(&validator.state); + + if from_unjailing_state + || from_deactivating_state + || from_reactivating_state + || from_concrete_state + { + validator.state = validator_state; + } + + anyhow::Ok(validator) + }) + .buffer_unordered(32) .try_collect::>() .await?; diff --git a/rewards/src/main.rs b/rewards/src/main.rs index a28e09f54..74a108c0c 100644 --- a/rewards/src/main.rs +++ b/rewards/src/main.rs @@ -81,7 +81,7 @@ async fn crawling_fn( return Err(MainError::NoAction); } - tracing::info!("Starting to update proposals..."); + tracing::info!("Starting to update pos rewards..."); // TODO: change this by querying all the pairs in the database let delegations_pairs = namada_service::query_delegation_pairs(&client) diff --git a/rewards/src/repository/mod.rs b/rewards/src/repository/mod.rs index 9c4ae7b13..399a24fdb 100644 --- a/rewards/src/repository/mod.rs +++ b/rewards/src/repository/mod.rs @@ -1,2 +1,3 @@ pub mod crawler_state; pub mod pos_rewards; +mod utils; diff --git a/rewards/src/repository/pos_rewards.rs b/rewards/src/repository/pos_rewards.rs index 1987926f5..255715c44 100644 --- a/rewards/src/repository/pos_rewards.rs +++ b/rewards/src/repository/pos_rewards.rs @@ -1,12 +1,33 @@ +use anyhow::Context; use diesel::upsert::excluded; use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl}; use orm::pos_rewards::PosRewardInsertDb; use orm::schema::{pos_rewards, validators}; use shared::rewards::Reward; +use shared::tuple_len::TupleLen; + +use super::utils::MAX_PARAM_SIZE; pub fn upsert_rewards( transaction_conn: &mut PgConnection, rewards: Vec, +) -> anyhow::Result<()> { + let rewards_col_count = pos_rewards::all_columns.len() as i64; + + for chunk in rewards + .into_iter() + .collect::>() + .chunks((MAX_PARAM_SIZE as i64 / rewards_col_count) as usize) + { + upsert_rewards_chunk(transaction_conn, chunk.to_vec())?; + } + + anyhow::Ok(()) +} + +fn upsert_rewards_chunk( + transaction_conn: &mut PgConnection, + rewards: Vec, ) -> anyhow::Result<()> { diesel::insert_into(pos_rewards::table) .values::>( @@ -37,7 +58,8 @@ pub fn upsert_rewards( pos_rewards::columns::raw_amount .eq(excluded(pos_rewards::columns::raw_amount)), ) - .execute(transaction_conn)?; + .execute(transaction_conn) + .context("Failed to upsert rewards in db")?; Ok(()) } diff --git a/rewards/src/repository/utils.rs b/rewards/src/repository/utils.rs new file mode 100644 index 000000000..bd4b8ce6d --- /dev/null +++ b/rewards/src/repository/utils.rs @@ -0,0 +1,4 @@ +// Represents maximum number of parameters that we can insert into postgres in +// one go. To get the number of rows that we can insert in one chunk, we have to +// divide MAX_PARAM_SIZE by the number of columns in the given table. +pub const MAX_PARAM_SIZE: u16 = u16::MAX; diff --git a/rewards/src/services/namada.rs b/rewards/src/services/namada.rs index af020d3fd..06a5c225d 100644 --- a/rewards/src/services/namada.rs +++ b/rewards/src/services/namada.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use anyhow::Context; use futures::StreamExt; @@ -37,13 +38,97 @@ pub async fn query_rewards( client: &HttpClient, delegation_pairs: HashSet, ) -> anyhow::Result> { - Ok(futures::stream::iter(delegation_pairs) + let mut all_rewards: Vec = Vec::new(); + + let batches: Vec<(usize, Vec)> = delegation_pairs + .clone() + .into_iter() + .collect::>() + .chunks(32) + .enumerate() + .map(|(i, chunk)| (i, chunk.to_vec())) + .collect(); + + tracing::info!( + "Got {} batches with a total of {} rewards to query...", + batches.len(), + delegation_pairs.len() + ); + + let results = futures::stream::iter(batches) + .map(|batch| process_batch_with_retries(client, batch)) + .buffer_unordered(3) + .collect::>() + .await; + + tracing::info!("Done fetching rewards!"); + + for result in results { + match result { + Ok(mut rewards) => all_rewards.append(&mut rewards), + Err(err) => return Err(err), + } + } + + Ok(all_rewards) +} + +pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { + let epoch = rpc::query_epoch(client) + .await + .context("Failed to query Namada's current epoch")?; + + Ok(epoch.0 as Epoch) +} + +async fn process_batch_with_retries( + client: &HttpClient, + batch: (usize, Vec), +) -> anyhow::Result> { + let mut retries = 0; + + tracing::info!("Processing batch {}", batch.0); + loop { + let result = process_batch(client, batch.1.clone()).await; + + match result { + Ok(rewards) => { + tracing::info!("Batch {} done!", batch.0); + return Ok(rewards); + } + Err(err) => { + retries += 1; + tracing::warn!( + "Batch reward failed (attempt {}/{}) - Error: {:?}", + retries, + 3, + err + ); + + if retries >= 3 { + tracing::error!( + "Batch reward failed after maximum retries." + ); + return Err(err); + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } +} + +async fn process_batch( + client: &HttpClient, + batch: Vec, +) -> anyhow::Result> { + Ok(futures::stream::iter(batch) .filter_map(|delegation| async move { - tracing::info!( + tracing::debug!( "Fetching rewards {} -> {} ...", delegation.validator_address, delegation.delegator_address ); + let reward = RPC .vp() .pos() @@ -55,7 +140,7 @@ pub async fn query_rewards( .await .ok()?; - tracing::info!( + tracing::debug!( "Done fetching reward for {} -> {}!", delegation.validator_address, delegation.delegator_address @@ -67,15 +152,7 @@ pub async fn query_rewards( }) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } - -pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { - let epoch = rpc::query_epoch(client) - .await - .context("Failed to query Namada's current epoch")?; - - Ok(epoch.0 as Epoch) -} diff --git a/shared/src/balance.rs b/shared/src/balance.rs index 9912f5f95..9c96d9ba9 100644 --- a/shared/src/balance.rs +++ b/shared/src/balance.rs @@ -110,7 +110,7 @@ impl Balance { owner: Id::Account(address.to_string()), token, amount: Amount::fake(), - height: (0..10000).fake::(), + height: 0, } } } diff --git a/shared/src/block.rs b/shared/src/block.rs index 7827455e2..44b812327 100644 --- a/shared/src/block.rs +++ b/shared/src/block.rs @@ -24,7 +24,9 @@ use crate::transaction::{ }; use crate::unbond::UnbondAddresses; use crate::utils::BalanceChange; -use crate::validator::{Validator, ValidatorMetadataChange, ValidatorState}; +use crate::validator::{ + Validator, ValidatorMetadataChange, ValidatorState, ValidatorStateChange, +}; use crate::vote::GovernanceVote; pub type Epoch = u32; @@ -107,8 +109,11 @@ pub struct Block { impl Block { pub fn from( - block_response: TendermintBlockResponse, + block_response: &TendermintBlockResponse, block_results: &BlockResult, + proposer_address_namada: &Option, /* Provide the namada address + * of the proposer, if + * available */ checksums: Checksums, epoch: Epoch, block_height: BlockHeight, @@ -138,14 +143,17 @@ impl Block { header: BlockHeader { height: block_response.block.header.height.value() as BlockHeight, - proposer_address: block_response + proposer_address_tm: block_response .block .header .proposer_address .to_string() .to_lowercase(), + proposer_address_namada: proposer_address_namada + .as_ref() + .map(Id::to_string), timestamp: block_response.block.header.time.to_string(), - app_hash: Id::from(block_response.block.header.app_hash), + app_hash: Id::from(&block_response.block.header.app_hash), }, transactions, epoch, @@ -506,7 +514,7 @@ impl Block { Some(recv_msg) } - pub fn validators(&self) -> HashSet { + pub fn new_validators(&self) -> HashSet { self.transactions .iter() .flat_map(|(_, txs)| txs) @@ -538,6 +546,34 @@ impl Block { .collect() } + pub fn update_validators_state(&self) -> HashSet { + self.transactions + .iter() + .flat_map(|(_, txs)| txs) + .filter(|tx| { + tx.data.is_some() + && tx.exit_code == TransactionExitStatus::Applied + }) + .filter_map(|tx| match &tx.kind { + TransactionKind::DeactivateValidator(data) => { + let data = data.clone().unwrap(); + Some(ValidatorStateChange { + address: Id::from(data), + state: ValidatorState::Deactivating, + }) + } + TransactionKind::ReactivateValidator(data) => { + let data = data.clone().unwrap(); + Some(ValidatorStateChange { + address: Id::from(data), + state: ValidatorState::Reactivating, + }) + } + _ => None, + }) + .collect() + } + pub fn bond_addresses(&self) -> HashSet { self.transactions .iter() diff --git a/shared/src/block_result.rs b/shared/src/block_result.rs index d0e7f08d8..a140abe9d 100644 --- a/shared/src/block_result.rs +++ b/shared/src/block_result.rs @@ -10,6 +10,7 @@ use crate::transaction::TransactionExitStatus; #[derive(Debug, Clone)] pub enum EventKind { Applied, + SendPacket, Unknown, } @@ -17,6 +18,7 @@ impl From<&String> for EventKind { fn from(value: &String) -> Self { match value.as_str() { "tx/applied" => Self::Applied, + "send_packet" => Self::SendPacket, _ => Self::Unknown, } } @@ -32,7 +34,7 @@ pub struct BlockResult { #[derive(Debug, Clone)] pub struct Event { pub kind: EventKind, - pub attributes: Option, + pub attributes: Option, } #[derive(Debug, Clone, Default, Copy)] @@ -107,7 +109,7 @@ impl BatchResults { } #[derive(Debug, Clone, Default)] -pub struct TxAttributes { +pub struct TxApplied { pub code: TxEventStatusCode, pub gas: u64, pub hash: Id, @@ -116,14 +118,59 @@ pub struct TxAttributes { pub info: String, } -impl TxAttributes { +#[derive(Debug, Clone, Default)] +pub struct SendPacket { + pub source_port: String, + pub dest_port: String, + pub source_channel: String, + pub dest_channel: String, + pub timeout_timestamp: u64, + pub sequence: String, +} + +#[derive(Debug, Clone)] +pub enum TxAttributesType { + TxApplied(TxApplied), + SendPacket(SendPacket), +} + +impl TxAttributesType { pub fn deserialize( event_kind: &EventKind, attributes: &BTreeMap, ) -> Option { match event_kind { EventKind::Unknown => None, - EventKind::Applied => Some(Self { + EventKind::SendPacket => { + let source_port = + attributes.get("packet_src_port").unwrap().to_owned(); + let dest_port = + attributes.get("packet_dst_port").unwrap().to_owned(); + let source_channel = + attributes.get("packet_src_channel").unwrap().to_owned(); + let dest_channel = + attributes.get("packet_dst_channel").unwrap().to_owned(); + let sequence = + attributes.get("packet_sequence").unwrap().to_owned(); + let timeout_timestamp = attributes + .get("packet_timeout_timestamp") + .unwrap_or(&"0".to_string()) + .parse::() + .unwrap_or_default() + .to_owned(); + + tracing::error!("{}", timeout_timestamp); + + Some(Self::SendPacket(SendPacket { + source_port, + dest_port, + source_channel, + dest_channel, + timeout_timestamp, + sequence, + })) + } + EventKind::Applied => Some(Self::TxApplied(TxApplied { code: attributes .get("code") .map(|code| TxEventStatusCode::from(code.as_str())) @@ -153,7 +200,7 @@ impl TxAttributes { }) .unwrap(), info: attributes.get("info").unwrap().to_owned(), - }), + })), } } } @@ -177,7 +224,7 @@ impl From for BlockResult { }, ); let attributes = - TxAttributes::deserialize(&kind, &raw_attributes); + TxAttributesType::deserialize(&kind, &raw_attributes); Event { kind, attributes } }) .collect::>(); @@ -198,7 +245,7 @@ impl From for BlockResult { }, ); let attributes = - TxAttributes::deserialize(&kind, &raw_attributes); + TxAttributesType::deserialize(&kind, &raw_attributes); Event { kind, attributes } }) .collect::>(); @@ -221,7 +268,15 @@ impl BlockResult { let exit_status = self .end_events .iter() - .filter_map(|event| event.attributes.clone()) + .filter_map(|event| { + if let Some(TxAttributesType::TxApplied(data)) = + &event.attributes + { + Some(data.clone()) + } else { + None + } + }) .find(|attributes| attributes.hash.eq(tx_hash)) .map(|attributes| attributes.clone().code) .map(TransactionExitStatus::from); @@ -229,6 +284,22 @@ impl BlockResult { exit_status.unwrap_or(TransactionExitStatus::Rejected) } + pub fn gas_used(&self, tx_hash: &Id) -> Option { + self.end_events + .iter() + .filter_map(|event| { + if let Some(TxAttributesType::TxApplied(data)) = + &event.attributes + { + Some(data.clone()) + } else { + None + } + }) + .find(|attributes| attributes.hash.eq(tx_hash)) + .map(|attributes| attributes.gas.to_string()) + } + pub fn is_inner_tx_accepted( &self, wrapper_hash: &Id, @@ -237,7 +308,15 @@ impl BlockResult { let exit_status = self .end_events .iter() - .filter_map(|event| event.attributes.clone()) + .filter_map(|event| { + if let Some(TxAttributesType::TxApplied(data)) = + &event.attributes + { + Some(data.clone()) + } else { + None + } + }) .find(|attributes| attributes.hash.eq(wrapper_hash)) .map(|attributes| attributes.batch.is_successful(inner_hash)) .map(|successful| match successful { diff --git a/shared/src/header.rs b/shared/src/header.rs index 792618895..decf3a9f9 100644 --- a/shared/src/header.rs +++ b/shared/src/header.rs @@ -4,7 +4,8 @@ use crate::block::BlockHeight; #[derive(Debug, Clone, Default)] pub struct BlockHeader { pub height: BlockHeight, - pub proposer_address: String, + pub proposer_address_tm: String, + pub proposer_address_namada: Option, pub timestamp: String, pub app_hash: Id, } diff --git a/shared/src/id.rs b/shared/src/id.rs index 556cee83f..f598ca70c 100644 --- a/shared/src/id.rs +++ b/shared/src/id.rs @@ -46,8 +46,8 @@ impl From for Id { } } -impl From for Id { - fn from(value: TendermintAppHash) -> Self { +impl From<&TendermintAppHash> for Id { + fn from(value: &TendermintAppHash) -> Self { Self::Hash(value.to_string()) } } diff --git a/shared/src/ser.rs b/shared/src/ser.rs index a776b783f..a9e9ee609 100644 --- a/shared/src/ser.rs +++ b/shared/src/ser.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use namada_core::address::Address; use namada_core::masp::MaspTxId; +use namada_sdk::borsh::BorshSerializeExt; use namada_sdk::ibc::IbcMessage as NamadaIbcMessage; use namada_sdk::token::{ Account as NamadaAccount, DenominatedAmount as NamadaDenominatedAmount, @@ -10,6 +11,7 @@ use namada_sdk::token::{ }; use serde::ser::SerializeStruct; use serde::{Deserialize, Serialize}; +use subtle_encoding::hex; #[derive(Debug, Clone)] pub struct AccountsMap(pub BTreeMap); @@ -120,10 +122,19 @@ impl Serialize for IbcMessage { state.end() } - NamadaIbcMessage::Envelope(_) => { - let state = serializer.serialize_struct("IbcEnvelope", 0)?; + NamadaIbcMessage::Envelope(data) => { + let mut state = + serializer.serialize_struct("IbcEnvelope", 1)?; + + // todo: implement this bs :( - // TODO: serialize envelope message correctly + state.serialize_field( + "data", + &String::from_utf8_lossy(&hex::encode( + data.serialize_to_vec(), + )) + .into_owned(), + )?; state.end() } diff --git a/shared/src/transaction.rs b/shared/src/transaction.rs index 79cabbcb0..4ffb2e531 100644 --- a/shared/src/transaction.rs +++ b/shared/src/transaction.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::fmt::Display; use namada_governance::{InitProposalData, VoteProposalData}; +use namada_sdk::address::Address; use namada_sdk::borsh::BorshDeserialize; use namada_sdk::key::common::PublicKey; use namada_sdk::masp::ShieldedTransfer; @@ -81,6 +82,9 @@ pub enum TransactionKind { CommissionChange(Option), RevealPk(Option), BecomeValidator(Option>), + ReactivateValidator(Option
), + DeactivateValidator(Option
), + UnjailValidator(Option
), Unknown(Option), } @@ -185,6 +189,22 @@ impl TransactionKind { }; TransactionKind::RevealPk(data) } + "tx_deactivate_validator" => { + let data = if let Ok(data) = Address::try_from_slice(data) { + Some(data) + } else { + None + }; + TransactionKind::DeactivateValidator(data) + } + "tx_reactivate_validator" => { + let data = if let Ok(data) = Address::try_from_slice(data) { + Some(data) + } else { + None + }; + TransactionKind::ReactivateValidator(data) + } "tx_ibc" => { let data = if let Ok(data) = namada_ibc::decode_message::(data) @@ -196,6 +216,14 @@ impl TransactionKind { }; TransactionKind::IbcMsgTransfer(data.map(IbcMessage)) } + "tx_unjail_validator" => { + let data = if let Ok(data) = Address::try_from_slice(data) { + Some(data) + } else { + None + }; + TransactionKind::UnjailValidator(data) + } "tx_become_validator" => { let data = if let Ok(data) = BecomeValidator::try_from_slice(data) { @@ -283,6 +311,7 @@ impl InnerTransaction { #[derive(Debug, Clone)] pub struct Fee { pub gas: String, + pub gas_used: Option, pub amount_per_gas_unit: String, pub gas_payer: Id, pub gas_token: Id, @@ -304,9 +333,11 @@ impl Transaction { let wrapper_tx_id = Id::from(transaction.header_hash()); let wrapper_tx_status = block_results.is_wrapper_tx_applied(&wrapper_tx_id); + let gas_used = block_results.gas_used(&wrapper_tx_id); let fee = Fee { gas: Uint::from(wrapper.gas_limit).to_string(), + gas_used, amount_per_gas_unit: wrapper .fee .amount_per_gas_unit @@ -434,3 +465,72 @@ impl Transaction { self.extra_sections.get(§ion_id).cloned() } } + +#[derive(Debug, Clone)] +pub struct IbcSequence { + pub sequence_number: String, + pub source_port: String, + pub dest_port: String, + pub source_channel: String, + pub dest_channel: String, + pub timeout: u64, + pub tx_id: Id, +} + +impl IbcSequence { + pub fn id(&self) -> String { + format!( + "{}/{}/{}/{}/{}", + self.dest_port, + self.dest_channel, + self.source_port, + self.source_channel, + self.sequence_number + ) + } +} + +#[derive(Debug, Clone)] +pub enum IbcAckStatus { + Success, + Fail, + Timeout, + Unknown, +} + +#[derive(Debug, Clone)] +pub struct IbcAck { + pub sequence_number: String, + pub source_port: String, + pub dest_port: String, + pub source_channel: String, + pub dest_channel: String, + pub status: IbcAckStatus, +} + +impl IbcAck { + pub fn id_source(&self) -> String { + format!( + "{}/{}/{}", + self.source_port, self.source_channel, self.sequence_number + ) + } + + pub fn id_dest(&self) -> String { + format!( + "{}/{}/{}", + self.dest_port, self.dest_channel, self.sequence_number + ) + } + + pub fn id(&self) -> String { + format!( + "{}/{}/{}/{}/{}", + self.dest_port, + self.dest_channel, + self.source_port, + self.source_channel, + self.sequence_number + ) + } +} diff --git a/shared/src/validator.rs b/shared/src/validator.rs index eead4e117..da5e6a7d9 100644 --- a/shared/src/validator.rs +++ b/shared/src/validator.rs @@ -18,6 +18,9 @@ pub enum ValidatorState { BelowThreshold, Inactive, Jailed, + Deactivating, + Reactivating, + Unjailing, Unknown, } @@ -43,6 +46,19 @@ pub struct ValidatorSet { pub epoch: Epoch, } +impl ValidatorSet { + pub fn union(&self, validator_set: &ValidatorSet) -> Self { + ValidatorSet { + validators: self + .validators + .union(&validator_set.validators) + .cloned() + .collect::>(), + epoch: self.epoch, + } + } +} + #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct Validator { pub address: Id, @@ -70,6 +86,12 @@ pub struct ValidatorMetadataChange { pub avatar: Option, } +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct ValidatorStateChange { + pub address: Id, + pub state: ValidatorState, +} + impl Validator { pub fn fake() -> Self { let address = diff --git a/swagger.yml b/swagger.yml index f56f1c739..e2d960bab 100644 --- a/swagger.yml +++ b/swagger.yml @@ -8,6 +8,8 @@ info: url: https://github.com/anoma/namada-indexer servers: - url: http://localhost:5001 + - url: https://namada-rpc.mandragora.io + - url: https://indexer.namada.tududes.com:443 paths: /health: get: @@ -113,6 +115,12 @@ paths: type: integer minimum: 1 description: Pagination parameter + - in: query + name: activeAt + schema: + type: integer + minimum: 0 + description: Get all bonds that are active at this epoch responses: '200': description: A list of bonds. @@ -175,6 +183,12 @@ paths: type: integer minimum: 1 description: Pagination parameter + - in: query + name: activeAt + schema: + type: integer + minimum: 0 + description: Get all unbonds that are active at this epoch( < ) responses: '200': description: A list of unbonds. @@ -621,6 +635,64 @@ paths: application/json: schema: $ref: '#/components/schemas/InnerTransaction' + /api/v1/ibc/{tx_id}/status: + get: + summary: Get the status of an IBC transfer by tx id + parameters: + - in: path + name: tx_id + schema: + type: string + required: true + description: Tx id hash + responses: + '200': + description: Status of the IBC transfer + content: + application/json: + schema: + type: array + items: + type: object + required: [status] + properties: + name: + type: string + enum: [unknown, timeout, success, fail] + /api/v1/block/height/{value}: + get: + summary: Get the block by height + parameters: + - in: path + name: value + schema: + type: number + required: true + description: Block height + responses: + '200': + description: Block info + content: + application/json: + schema: + $ref: '#/components/schemas/Block' + /api/v1/block/timestamp/{value}: + get: + summary: Get the block by timestamp + parameters: + - in: path + name: value + schema: + type: number + required: true + description: Block timestamp + responses: + '200': + description: Block info + content: + application/json: + schema: + $ref: '#/components/schemas/Block' /api/v1/crawlers/timestamps: get: summary: Get timestamps of the last activity of the crawlers @@ -686,7 +758,7 @@ components: $ref: '#/components/schemas/ValidatorStatus' ValidatorStatus: type: string - enum: [consensus, belowCapacity, belowThreshold, inactive, jailed, unknown] + enum: [consensus, belowCapacity, belowThreshold, inactive, jailed, unknown, unjailing, deactivating, reactivating] Proposal: type: object required: [id, content, type, author, startEpoch, endEpoch, activationEpoch, startTime, endTime, currentTime, activationTime, status, yayVotes, nayVotes, abstainVotes, tallyType] @@ -914,6 +986,8 @@ components: type: string gasLimit: type: string + gasUsed: + type: string blockHeight: type: string innerTransactions: @@ -926,7 +1000,7 @@ components: type: string kind: type: string - enum: ["transparentTransfer", "shieldedTransfer", "shieldingTransfer", "unshieldingTransfer", "bond", "redelegation", "unbond", "withdraw", "claimRewards", "voteProposal", "initProposal", "changeMetadata", "changeCommission", "revealPk", "unknown"] + enum: ["transparentTransfer", "shieldedTransfer", "shieldingTransfer", "unshieldingTransfer", "bond", "redelegation", "unbond", "withdraw", "claimRewards", "voteProposal", "initProposal", "changeMetadata", "changeCommission", "revealPk", "deactivateValidator", "reactivateValidator", "unjailValidator", "unknown"] exitCode: type: string enum: [applied, rejected] @@ -957,3 +1031,19 @@ components: type: string data: type: string + Block: + type: object + required: [height] + properties: + height: + type: string + hash: + type: string + appHash: + type: string + timestamp: + type: string + proposer: + type: string + epoch: + type: string diff --git a/transactions/Cargo.toml b/transactions/Cargo.toml index 57ff1f28d..7967dc86b 100644 --- a/transactions/Cargo.toml +++ b/transactions/Cargo.toml @@ -27,6 +27,8 @@ deadpool-diesel.workspace = true diesel.workspace = true diesel_migrations.workspace = true orm.workspace = true +clap-verbosity-flag.workspace = true +serde_json.workspace = true [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/transactions/src/main.rs b/transactions/src/main.rs index f6bbfc65e..fbaf35a5e 100644 --- a/transactions/src/main.rs +++ b/transactions/src/main.rs @@ -12,13 +12,16 @@ use shared::checksums::Checksums; use shared::crawler::crawl; use shared::crawler_state::BlockCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::id::Id; use tendermint_rpc::HttpClient; use transactions::app_state::AppState; use transactions::config::AppConfig; -use transactions::repository::transactions as transaction_repo; +use transactions::repository::{ + block as block_repo, transactions as transaction_repo, +}; use transactions::services::{ db as db_service, namada as namada_service, - tendermint as tendermint_service, + tendermint as tendermint_service, tx as tx_service, }; #[tokio::main] @@ -114,22 +117,43 @@ async fn crawling_fn( .into_rpc_error()?; let block_results = BlockResult::from(tm_block_results_response); + let proposer_address_namada = namada_service::get_validator_namada_address( + &client, + &Id::from(&tm_block_response.block.header.proposer_address), + ) + .await + .into_rpc_error()?; + + tracing::debug!( + block = block_height, + tm_address = tm_block_response.block.header.proposer_address.to_string(), + namada_address = ?proposer_address_namada, + "Got block proposer address" + ); + let block = Block::from( - tm_block_response.clone(), + &tm_block_response, &block_results, + &proposer_address_namada, checksums, - 1_u32, + 1_u32, // placeholder, we dont need the epoch here block_height, ); let inner_txs = block.inner_txs(); let wrapper_txs = block.wrapper_txs(); - tracing::debug!( - block = block_height, - txs = inner_txs.len(), - "Deserialized {} txs...", - wrapper_txs.len() + inner_txs.len() + let ibc_sequence_packet = + tx_service::get_ibc_packets(&block_results, &inner_txs); + let ibc_ack_packet = tx_service::get_ibc_ack_packet(&inner_txs); + + tracing::info!( + "Deserialized {} wrappers, {} inners, {} ibc sequence numbers and {} \ + ibc acks events...", + wrapper_txs.len(), + inner_txs.len(), + ibc_sequence_packet.len(), + ibc_ack_packet.len() ); // Because transaction crawler starts from block 1 we read timestamp from @@ -151,6 +175,11 @@ async fn crawling_fn( conn.build_transaction() .read_write() .run(|transaction_conn| { + block_repo::upsert_block( + transaction_conn, + block, + tm_block_response, + )?; transaction_repo::insert_wrapper_transactions( transaction_conn, wrapper_txs, @@ -164,6 +193,16 @@ async fn crawling_fn( crawler_state, )?; + transaction_repo::insert_ibc_sequence( + transaction_conn, + ibc_sequence_packet, + )?; + + transaction_repo::update_ibc_sequence( + transaction_conn, + ibc_ack_packet, + )?; + anyhow::Ok(()) }) }) diff --git a/transactions/src/repository/block.rs b/transactions/src/repository/block.rs new file mode 100644 index 000000000..5420dd939 --- /dev/null +++ b/transactions/src/repository/block.rs @@ -0,0 +1,32 @@ +use anyhow::Context; +use diesel::upsert::excluded; +use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; +use orm::blocks::BlockInsertDb; +use orm::schema::blocks; +use shared::block::Block; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; + +pub fn upsert_block( + transaction_conn: &mut PgConnection, + block: Block, + tm_block_response: TendermintBlockResponse, +) -> anyhow::Result<()> { + diesel::insert_into(blocks::table) + .values::<&BlockInsertDb>(&BlockInsertDb::from(( + block, + tm_block_response, + ))) + .on_conflict(blocks::height) + .do_update() + .set(( + blocks::hash.eq(excluded(blocks::hash)), + blocks::app_hash.eq(excluded(blocks::app_hash)), + blocks::timestamp.eq(excluded(blocks::timestamp)), + blocks::proposer.eq(excluded(blocks::proposer)), + blocks::epoch.eq(excluded(blocks::epoch)), + )) + .execute(transaction_conn) + .context("Failed to insert block in db")?; + + anyhow::Ok(()) +} diff --git a/transactions/src/repository/mod.rs b/transactions/src/repository/mod.rs index 0824d7a9c..5ae69f54d 100644 --- a/transactions/src/repository/mod.rs +++ b/transactions/src/repository/mod.rs @@ -1 +1,2 @@ +pub mod block; pub mod transactions; diff --git a/transactions/src/repository/transactions.rs b/transactions/src/repository/transactions.rs index 5a97e60c8..781e8c8f7 100644 --- a/transactions/src/repository/transactions.rs +++ b/transactions/src/repository/transactions.rs @@ -1,12 +1,20 @@ use anyhow::Context; use chrono::NaiveDateTime; use diesel::upsert::excluded; -use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; +use diesel::{ + ExpressionMethods, OptionalEmptyChangesetExtension, PgConnection, + RunQueryDsl, +}; use orm::crawler_state::{BlockStateInsertDb, CrawlerNameDb}; -use orm::schema::{crawler_state, inner_transactions, wrapper_transactions}; +use orm::ibc::{IbcAckInsertDb, IbcAckStatusDb, IbcSequencekStatusUpdateDb}; +use orm::schema::{ + crawler_state, ibc_ack, inner_transactions, wrapper_transactions, +}; use orm::transactions::{InnerTransactionInsertDb, WrapperTransactionInsertDb}; use shared::crawler_state::{BlockCrawlerState, CrawlerName}; -use shared::transaction::{InnerTransaction, WrapperTransaction}; +use shared::transaction::{ + IbcAck, IbcSequence, InnerTransaction, WrapperTransaction, +}; pub fn insert_inner_transactions( transaction_conn: &mut PgConnection, @@ -87,3 +95,38 @@ pub fn update_crawler_timestamp( anyhow::Ok(()) } + +pub fn insert_ibc_sequence( + transaction_conn: &mut PgConnection, + ibc_sequences: Vec, +) -> anyhow::Result<()> { + diesel::insert_into(ibc_ack::table) + .values::>( + ibc_sequences + .into_iter() + .map(IbcAckInsertDb::from) + .collect(), + ) + .execute(transaction_conn) + .context("Failed to update crawler state in db")?; + + anyhow::Ok(()) +} + +pub fn update_ibc_sequence( + transaction_conn: &mut PgConnection, + ibc_acks: Vec, +) -> anyhow::Result<()> { + for ack in ibc_acks { + let ack_update = IbcSequencekStatusUpdateDb { + status: IbcAckStatusDb::from(ack.status.clone()), + }; + diesel::update(ibc_ack::table) + .set(ack_update) + .filter(ibc_ack::dsl::id.eq(ack.id())) + .execute(transaction_conn) + .optional_empty_changeset() + .context("Failed to update validator metadata in db")?; + } + anyhow::Ok(()) +} diff --git a/transactions/src/services/mod.rs b/transactions/src/services/mod.rs index a9dfa39f9..233652f55 100644 --- a/transactions/src/services/mod.rs +++ b/transactions/src/services/mod.rs @@ -1,3 +1,4 @@ pub mod db; pub mod namada; pub mod tendermint; +pub mod tx; diff --git a/transactions/src/services/namada.rs b/transactions/src/services/namada.rs index 51d412202..cde1d642e 100644 --- a/transactions/src/services/namada.rs +++ b/transactions/src/services/namada.rs @@ -55,3 +55,16 @@ pub async fn query_tx_code_hash( None } } + +pub async fn get_validator_namada_address( + client: &HttpClient, + tm_addr: &Id, +) -> anyhow::Result> { + let validator = RPC + .vp() + .pos() + .validator_by_tm_addr(client, &tm_addr.to_string().to_uppercase()) + .await?; + + Ok(validator.map(Id::from)) +} diff --git a/transactions/src/services/tx.rs b/transactions/src/services/tx.rs new file mode 100644 index 000000000..833421442 --- /dev/null +++ b/transactions/src/services/tx.rs @@ -0,0 +1,115 @@ +use namada_sdk::ibc::core::channel::types::acknowledgement::AcknowledgementStatus; +use namada_sdk::ibc::core::channel::types::msgs::PacketMsg; +use namada_sdk::ibc::core::handler::types::msgs::MsgEnvelope; +use shared::block_result::{BlockResult, TxAttributesType}; +use shared::ser::IbcMessage; +use shared::transaction::{ + IbcAck, IbcAckStatus, IbcSequence, InnerTransaction, TransactionExitStatus, + TransactionKind, +}; + +pub fn get_ibc_packets( + block_results: &BlockResult, + inner_txs: &[InnerTransaction], +) -> Vec { + let mut ibc_txs = inner_txs + .iter() + .filter_map(|tx| { + if matches!( + tx.kind, + TransactionKind::IbcMsgTransfer(Some(IbcMessage(_))) + ) && matches!(tx.exit_code, TransactionExitStatus::Applied) + { + Some(tx.tx_id.clone()) + } else { + None + } + }) + .collect::>(); + + ibc_txs.reverse(); + + block_results + .end_events + .iter() + .filter_map(|event| { + if let Some(attributes) = &event.attributes { + match attributes { + TxAttributesType::SendPacket(packet) => Some(IbcSequence { + sequence_number: packet.sequence.clone(), + source_port: packet.source_port.clone(), + dest_port: packet.dest_port.clone(), + source_channel: packet.source_channel.clone(), + dest_channel: packet.dest_channel.clone(), + timeout: packet.timeout_timestamp, + tx_id: ibc_txs.pop().unwrap(), + }), + _ => None, + } + } else { + None + } + }) + .collect::>() +} + +pub fn get_ibc_ack_packet(inner_txs: &[InnerTransaction]) -> Vec { + inner_txs.iter().filter_map(|tx| match tx.kind.clone() { + TransactionKind::IbcMsgTransfer(Some(ibc_message)) => match ibc_message.0 { + namada_sdk::ibc::IbcMessage::Envelope(msg_envelope) => { + match *msg_envelope { + MsgEnvelope::Packet(packet_msg) => match packet_msg { + PacketMsg::Recv(_) => None, + PacketMsg::Ack(msg) => { + let ack = match serde_json::from_slice::< + AcknowledgementStatus, + >( + msg.acknowledgement.as_bytes() + ) { + Ok(status) => IbcAck { + sequence_number: msg.packet.seq_on_a.to_string(), + source_port: msg.packet.port_id_on_a.to_string(), + dest_port: msg.packet.port_id_on_b.to_string(), + source_channel: msg.packet.chan_id_on_a.to_string(), + dest_channel: msg.packet.chan_id_on_b.to_string(), + status: match status { + AcknowledgementStatus::Success(_) => IbcAckStatus::Success, + AcknowledgementStatus::Error(_) => IbcAckStatus::Fail, + }, + }, + Err(_) => IbcAck { + sequence_number: msg.packet.seq_on_a.to_string(), + source_port: msg.packet.port_id_on_a.to_string(), + dest_port: msg.packet.port_id_on_b.to_string(), + source_channel: msg.packet.chan_id_on_a.to_string(), + dest_channel: msg.packet.chan_id_on_b.to_string(), + status: IbcAckStatus::Unknown, + }, + }; + Some(ack) + } + PacketMsg::Timeout(msg) => Some(IbcAck { + sequence_number: msg.packet.seq_on_a.to_string(), + source_port: msg.packet.port_id_on_a.to_string(), + dest_port: msg.packet.port_id_on_b.to_string(), + source_channel: msg.packet.chan_id_on_a.to_string(), + dest_channel: msg.packet.chan_id_on_b.to_string(), + status: IbcAckStatus::Timeout, + }), + PacketMsg::TimeoutOnClose(msg) => Some(IbcAck { + sequence_number: msg.packet.seq_on_a.to_string(), + source_port: msg.packet.port_id_on_a.to_string(), + dest_port: msg.packet.port_id_on_b.to_string(), + source_channel: msg.packet.chan_id_on_a.to_string(), + dest_channel: msg.packet.chan_id_on_b.to_string(), + status: IbcAckStatus::Timeout, + }), + }, + _ => None, + } + }, + _ => None + }, + _ => None, + }).collect() +} diff --git a/webserver/Cargo.toml b/webserver/Cargo.toml index f019ff58a..ecd86ee16 100644 --- a/webserver/Cargo.toml +++ b/webserver/Cargo.toml @@ -22,6 +22,7 @@ production = [] [dependencies] axum.workspace = true +chrono.workspace = true tokio.workspace = true tower.workspace = true tower-http.workspace = true diff --git a/webserver/src/app.rs b/webserver/src/app.rs index fcb26cd3d..b2e619844 100644 --- a/webserver/src/app.rs +++ b/webserver/src/app.rs @@ -19,10 +19,11 @@ use tower_http::trace::TraceLayer; use crate::appstate::AppState; use crate::config::AppConfig; use crate::handler::{ - balance as balance_handlers, chain as chain_handlers, - crawler_state as crawler_state_handlers, gas as gas_handlers, - governance as gov_handlers, pgf as pgf_service, pk as pk_handlers, - pos as pos_handlers, transaction as transaction_handlers, + balance as balance_handlers, block as block_handlers, + chain as chain_handlers, crawler_state as crawler_state_handlers, + gas as gas_handlers, governance as gov_handlers, ibc as ibc_handler, + pgf as pgf_service, pk as pk_handlers, pos as pos_handlers, + transaction as transaction_handlers, }; use crate::state::common::CommonState; @@ -133,6 +134,7 @@ impl ApplicationServer { "/chain/epoch/latest", get(chain_handlers::get_last_processed_epoch), ) + .route("/ibc/:tx_id/status", get(ibc_handler::get_ibc_status)) .route( "/pgf/payments", get(pgf_service::get_pgf_continuous_payments), @@ -147,6 +149,14 @@ impl ApplicationServer { ) // Server sent events endpoints .route("/chain/status", get(chain_handlers::chain_status)) + .route( + "/block/height/:value", + get(block_handlers::get_block_by_height), + ) + .route( + "/block/timestamp/:value", + get(block_handlers::get_block_by_timestamp), + ) .route( "/metrics", get(|| async move { metric_handle.render() }), @@ -163,7 +173,7 @@ impl ApplicationServer { .nest("/api/v1", routes) .merge(Router::new().route( "/health", - get(|| async { env!("VERGEN_GIT_SHA").to_string() }), + get(|| async { json!({"commit": env!("VERGEN_GIT_SHA").to_string(), "version": env!("CARGO_PKG_VERSION") }).to_string() }), )) .layer( ServiceBuilder::new() diff --git a/webserver/src/dto/pos.rs b/webserver/src/dto/pos.rs index 9a72bec3e..4a1a06978 100644 --- a/webserver/src/dto/pos.rs +++ b/webserver/src/dto/pos.rs @@ -64,15 +64,21 @@ pub enum MyValidatorKindDto { } #[derive(Clone, Serialize, Deserialize, Validate)] +#[serde(rename_all = "camelCase")] pub struct BondsDto { #[validate(range(min = 1, max = 10000))] pub page: Option, + #[validate(range(min = 0))] + pub active_at: Option, } #[derive(Clone, Serialize, Deserialize, Validate)] +#[serde(rename_all = "camelCase")] pub struct UnbondsDto { #[validate(range(min = 1, max = 10000))] pub page: Option, + #[validate(range(min = 0))] + pub active_at: Option, } #[derive(Clone, Serialize, Deserialize, Validate)] diff --git a/webserver/src/error/api.rs b/webserver/src/error/api.rs index 86335a524..e18d55e12 100644 --- a/webserver/src/error/api.rs +++ b/webserver/src/error/api.rs @@ -2,10 +2,12 @@ use axum::response::{IntoResponse, Response}; use thiserror::Error; use super::balance::BalanceError; +use super::block::BlockError; use super::chain::ChainError; use super::crawler_state::CrawlerStateError; use super::gas::GasError; use super::governance::GovernanceError; +use super::ibc::IbcError; use super::pgf::PgfError; use super::pos::PoSError; use super::revealed_pk::RevealedPkError; @@ -13,6 +15,8 @@ use super::transaction::TransactionError; #[derive(Error, Debug)] pub enum ApiError { + #[error(transparent)] + BlockError(#[from] BlockError), #[error(transparent)] TransactionError(#[from] TransactionError), #[error(transparent)] @@ -28,6 +32,8 @@ pub enum ApiError { #[error(transparent)] GasError(#[from] GasError), #[error(transparent)] + IbcError(#[from] IbcError), + #[error(transparent)] PgfError(#[from] PgfError), #[error(transparent)] CrawlerStateError(#[from] CrawlerStateError), @@ -36,6 +42,7 @@ pub enum ApiError { impl IntoResponse for ApiError { fn into_response(self) -> Response { match self { + ApiError::BlockError(error) => error.into_response(), ApiError::TransactionError(error) => error.into_response(), ApiError::ChainError(error) => error.into_response(), ApiError::PoSError(error) => error.into_response(), @@ -43,6 +50,7 @@ impl IntoResponse for ApiError { ApiError::GovernanceError(error) => error.into_response(), ApiError::RevealedPkError(error) => error.into_response(), ApiError::GasError(error) => error.into_response(), + ApiError::IbcError(error) => error.into_response(), ApiError::PgfError(error) => error.into_response(), ApiError::CrawlerStateError(error) => error.into_response(), } diff --git a/webserver/src/error/block.rs b/webserver/src/error/block.rs new file mode 100644 index 000000000..43648a432 --- /dev/null +++ b/webserver/src/error/block.rs @@ -0,0 +1,28 @@ +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use thiserror::Error; + +use crate::response::api::ApiErrorResponse; + +#[derive(Error, Debug)] +pub enum BlockError { + #[error("Block not found error at {0}: {1}")] + NotFound(String, String), + #[error("Database error: {0}")] + Database(String), + #[error("Unknown error: {0}")] + Unknown(String), +} + +impl IntoResponse for BlockError { + fn into_response(self) -> Response { + let status_code = match self { + BlockError::Unknown(_) | BlockError::Database(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } + BlockError::NotFound(_, _) => StatusCode::NOT_FOUND, + }; + + ApiErrorResponse::send(status_code.as_u16(), Some(self.to_string())) + } +} diff --git a/webserver/src/error/ibc.rs b/webserver/src/error/ibc.rs new file mode 100644 index 000000000..869d738ce --- /dev/null +++ b/webserver/src/error/ibc.rs @@ -0,0 +1,28 @@ +use axum::http::StatusCode; +use axum::response::IntoResponse; +use thiserror::Error; + +use crate::response::api::ApiErrorResponse; + +#[derive(Error, Debug)] +pub enum IbcError { + #[error("Revealed public key {0} not found")] + NotFound(u64), + #[error("Database error: {0}")] + Database(String), + #[error("Unknown error: {0}")] + Unknown(String), +} + +impl IntoResponse for IbcError { + fn into_response(self) -> axum::response::Response { + let status_code = match self { + IbcError::NotFound(_) => StatusCode::NOT_FOUND, + IbcError::Unknown(_) | IbcError::Database(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } + }; + + ApiErrorResponse::send(status_code.as_u16(), Some(self.to_string())) + } +} diff --git a/webserver/src/error/mod.rs b/webserver/src/error/mod.rs index 092b8e379..9b21bf09a 100644 --- a/webserver/src/error/mod.rs +++ b/webserver/src/error/mod.rs @@ -1,9 +1,11 @@ pub mod api; pub mod balance; +pub mod block; pub mod chain; pub mod crawler_state; pub mod gas; pub mod governance; +pub mod ibc; pub mod pgf; pub mod pos; pub mod revealed_pk; diff --git a/webserver/src/handler/block.rs b/webserver/src/handler/block.rs new file mode 100644 index 000000000..ed4bb01b3 --- /dev/null +++ b/webserver/src/handler/block.rs @@ -0,0 +1,30 @@ +use axum::extract::{Path, State}; +use axum::http::HeaderMap; +use axum::Json; +use axum_macros::debug_handler; + +use crate::error::api::ApiError; +use crate::response::block::Block; +use crate::state::common::CommonState; + +#[debug_handler] +pub async fn get_block_by_height( + _headers: HeaderMap, + Path(value): Path, + State(state): State, +) -> Result, ApiError> { + let block = state.block_service.get_block_by_height(value).await?; + + Ok(Json(block)) +} + +#[debug_handler] +pub async fn get_block_by_timestamp( + _headers: HeaderMap, + Path(value): Path, + State(state): State, +) -> Result, ApiError> { + let block = state.block_service.get_block_by_timestamp(value).await?; + + Ok(Json(block)) +} diff --git a/webserver/src/handler/ibc.rs b/webserver/src/handler/ibc.rs new file mode 100644 index 000000000..ecf8ff83d --- /dev/null +++ b/webserver/src/handler/ibc.rs @@ -0,0 +1,19 @@ +use axum::extract::{Path, State}; +use axum::http::HeaderMap; +use axum::Json; +use axum_macros::debug_handler; + +use crate::error::api::ApiError; +use crate::response::ibc::IbcAck; +use crate::state::common::CommonState; + +#[debug_handler] +pub async fn get_ibc_status( + _headers: HeaderMap, + Path(tx_id): Path, + State(state): State, +) -> Result, ApiError> { + let ibc_ack_status = state.ibc_service.get_ack_by_tx_id(tx_id).await?; + + Ok(Json(ibc_ack_status)) +} diff --git a/webserver/src/handler/mod.rs b/webserver/src/handler/mod.rs index 9b0dd7931..79aef8369 100644 --- a/webserver/src/handler/mod.rs +++ b/webserver/src/handler/mod.rs @@ -1,8 +1,10 @@ pub mod balance; +pub mod block; pub mod chain; pub mod crawler_state; pub mod gas; pub mod governance; +pub mod ibc; pub mod pgf; pub mod pk; pub mod pos; diff --git a/webserver/src/handler/pos.rs b/webserver/src/handler/pos.rs index d589c8ed1..a6b8e4c37 100644 --- a/webserver/src/handler/pos.rs +++ b/webserver/src/handler/pos.rs @@ -57,7 +57,7 @@ pub async fn get_bonds( let (bonds, total_pages, total_bonds) = state .pos_service - .get_bonds_by_address(address, page) + .get_bonds_by_address(address, page, query.active_at) .await?; let response = @@ -97,7 +97,7 @@ pub async fn get_unbonds( let (unbonds, total_pages, total_unbonds) = state .pos_service - .get_unbonds_by_address(address, page) + .get_unbonds_by_address(address, page, query.active_at) .await?; let response = diff --git a/webserver/src/repository/block.rs b/webserver/src/repository/block.rs new file mode 100644 index 000000000..cb13f4ee1 --- /dev/null +++ b/webserver/src/repository/block.rs @@ -0,0 +1,72 @@ +use axum::async_trait; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; +use orm::blocks::BlockDb; +use orm::schema::blocks; + +use crate::appstate::AppState; + +#[derive(Clone)] +pub struct BlockRepository { + pub(crate) app_state: AppState, +} + +#[async_trait] +pub trait BlockRepositoryTrait { + fn new(app_state: AppState) -> Self; + + async fn find_block_by_height( + &self, + height: i32, + ) -> Result, String>; + + async fn find_block_by_timestamp( + &self, + timestamp: i64, + ) -> Result, String>; +} + +#[async_trait] +impl BlockRepositoryTrait for BlockRepository { + fn new(app_state: AppState) -> Self { + Self { app_state } + } + + async fn find_block_by_height( + &self, + height: i32, + ) -> Result, String> { + let conn = self.app_state.get_db_connection().await; + + conn.interact(move |conn| { + blocks::table + .filter(blocks::dsl::height.eq(height)) + .select(BlockDb::as_select()) + .first(conn) + .ok() + }) + .await + .map_err(|e| e.to_string()) + } + + /// Gets the last block preceeding the given timestamp + async fn find_block_by_timestamp( + &self, + timestamp: i64, + ) -> Result, String> { + let conn = self.app_state.get_db_connection().await; + let timestamp = chrono::DateTime::from_timestamp(timestamp, 0) + .expect("Invalid timestamp") + .naive_utc(); + + conn.interact(move |conn| { + blocks::table + .filter(blocks::timestamp.le(timestamp)) + .order(blocks::timestamp.desc()) + .select(BlockDb::as_select()) + .first(conn) + .ok() + }) + .await + .map_err(|e| e.to_string()) + } +} diff --git a/webserver/src/repository/ibc.rs b/webserver/src/repository/ibc.rs new file mode 100644 index 000000000..e55ef4a60 --- /dev/null +++ b/webserver/src/repository/ibc.rs @@ -0,0 +1,45 @@ +use axum::async_trait; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; +use orm::ibc::IbcAckDb; +use orm::schema::ibc_ack; + +use crate::appstate::AppState; + +#[derive(Clone)] +pub struct IbcRepository { + pub(crate) app_state: AppState, +} + +#[async_trait] +pub trait IbcRepositoryTrait { + fn new(app_state: AppState) -> Self; + + async fn find_ibc_ack( + &self, + id: String, + ) -> Result, String>; +} + +#[async_trait] +impl IbcRepositoryTrait for IbcRepository { + fn new(app_state: AppState) -> Self { + Self { app_state } + } + + async fn find_ibc_ack( + &self, + id: String, + ) -> Result, String> { + let conn = self.app_state.get_db_connection().await; + + conn.interact(move |conn| { + ibc_ack::table + .filter(ibc_ack::dsl::tx_hash.eq(id)) + .select(IbcAckDb::as_select()) + .first(conn) + .ok() + }) + .await + .map_err(|e| e.to_string()) + } +} diff --git a/webserver/src/repository/mod.rs b/webserver/src/repository/mod.rs index 8153b5ddd..5d6ace094 100644 --- a/webserver/src/repository/mod.rs +++ b/webserver/src/repository/mod.rs @@ -1,7 +1,9 @@ pub mod balance; +pub mod block; pub mod chain; pub mod gas; pub mod governance; +pub mod ibc; pub mod pgf; pub mod pos; pub mod revealed_pk; diff --git a/webserver/src/repository/pos.rs b/webserver/src/repository/pos.rs index 327592369..e623b045f 100644 --- a/webserver/src/repository/pos.rs +++ b/webserver/src/repository/pos.rs @@ -60,12 +60,14 @@ pub trait PosRepositoryTrait { &self, address: String, page: i64, + active_at: Option, ) -> Result, String>; async fn find_unbonds_by_address( &self, address: String, page: i64, + active_at: Option, ) -> Result, String>; async fn find_merged_unbonds_by_address( @@ -183,12 +185,19 @@ impl PosRepositoryTrait for PosRepository { &self, address: String, page: i64, + active_at: Option, ) -> Result, String> { let conn = self.app_state.get_db_connection().await; conn.interact(move |conn| { - validators::table - .inner_join(bonds::table) + let mut query = + validators::table.inner_join(bonds::table).into_boxed(); + + if let Some(at) = active_at { + query = query.filter(bonds::dsl::start.le(at)); + } + + query .filter(bonds::dsl::address.eq(address)) .select((validators::all_columns, bonds::all_columns)) .paginate(page) @@ -235,12 +244,19 @@ impl PosRepositoryTrait for PosRepository { &self, address: String, page: i64, + active_at: Option, ) -> Result, String> { let conn = self.app_state.get_db_connection().await; conn.interact(move |conn| { - validators::table - .inner_join(unbonds::table) + let mut query = + validators::table.inner_join(unbonds::table).into_boxed(); + + if let Some(at) = active_at { + query = query.filter(unbonds::dsl::withdraw_epoch.lt(at)); + } + + query .filter(unbonds::dsl::address.eq(address)) .select((validators::all_columns, unbonds::all_columns)) .paginate(page) diff --git a/webserver/src/response/block.rs b/webserver/src/response/block.rs new file mode 100644 index 000000000..3ab9b51c0 --- /dev/null +++ b/webserver/src/response/block.rs @@ -0,0 +1,28 @@ +use orm::blocks::BlockDb; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Block { + pub height: i32, + pub hash: Option, + pub app_hash: Option, + pub timestamp: Option, + pub proposer: Option, + pub epoch: Option, +} + +impl From for Block { + fn from(block_db: BlockDb) -> Self { + Self { + height: block_db.height, + hash: block_db.hash, + app_hash: block_db.app_hash, + timestamp: block_db + .timestamp + .map(|t| t.and_utc().timestamp().to_string()), + proposer: block_db.proposer, + epoch: block_db.epoch.map(|e| e.to_string()), + } + } +} diff --git a/webserver/src/response/ibc.rs b/webserver/src/response/ibc.rs new file mode 100644 index 000000000..2277c4c42 --- /dev/null +++ b/webserver/src/response/ibc.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum IbcAckStatus { + Success, + Fail, + Timeout, + Unknown, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct IbcAck { + pub status: IbcAckStatus, +} diff --git a/webserver/src/response/mod.rs b/webserver/src/response/mod.rs index e00cfb59d..8215c6410 100644 --- a/webserver/src/response/mod.rs +++ b/webserver/src/response/mod.rs @@ -1,9 +1,11 @@ pub mod api; pub mod balance; +pub mod block; pub mod chain; pub mod crawler_state; pub mod gas; pub mod governance; +pub mod ibc; pub mod pgf; pub mod pos; pub mod revealed_pk; diff --git a/webserver/src/response/pos.rs b/webserver/src/response/pos.rs index 6073ad177..a07eb8d53 100644 --- a/webserver/src/response/pos.rs +++ b/webserver/src/response/pos.rs @@ -16,6 +16,9 @@ pub enum ValidatorState { BelowThreshold, Inactive, Jailed, + Deactivating, + Reactivating, + Unjailing, Unknown, } @@ -27,6 +30,9 @@ impl From for ValidatorState { ValidatorStateDb::BelowThreshold => Self::BelowThreshold, ValidatorStateDb::Inactive => Self::Inactive, ValidatorStateDb::Jailed => Self::Jailed, + ValidatorStateDb::Deactivating => Self::Deactivating, + ValidatorStateDb::Reactivating => Self::Reactivating, + ValidatorStateDb::Unjailing => Self::Unjailing, ValidatorStateDb::Unknown => Self::Unknown, } } diff --git a/webserver/src/response/transaction.rs b/webserver/src/response/transaction.rs index 56d9b9b01..46dfc6c1d 100644 --- a/webserver/src/response/transaction.rs +++ b/webserver/src/response/transaction.rs @@ -29,6 +29,9 @@ pub enum TransactionKind { RevealPk, IbcMsgTransfer, BecomeValidator, + DeactivateValidator, + ReactivateValidator, + UnjailValidator, Unknown, } @@ -39,6 +42,7 @@ pub struct WrapperTransaction { pub fee_payer: String, pub fee_token: String, pub gas_limit: String, + pub gas_used: Option, pub block_height: u64, pub inner_transactions: Vec, pub exit_code: TransactionResult, @@ -90,39 +94,26 @@ impl From for TransactionResult { impl From for TransactionKind { fn from(value: TransactionKindDb) -> Self { match value { - TransactionKindDb::TransparentTransfer => { - TransactionKind::TransparentTransfer - } - TransactionKindDb::ShieldedTransfer => { - TransactionKind::ShieldedTransfer - } - TransactionKindDb::ShieldingTransfer => { - TransactionKind::ShieldingTransfer - } - TransactionKindDb::UnshieldingTransfer => { - TransactionKind::UnshieldingTransfer - } - TransactionKindDb::Bond => TransactionKind::Bond, - TransactionKindDb::Redelegation => TransactionKind::Redelegation, - TransactionKindDb::Unbond => TransactionKind::Unbond, - TransactionKindDb::Withdraw => TransactionKind::Withdraw, - TransactionKindDb::ClaimRewards => TransactionKind::ClaimRewards, - TransactionKindDb::VoteProposal => TransactionKind::VoteProposal, - TransactionKindDb::InitProposal => TransactionKind::InitProposal, - TransactionKindDb::ChangeMetadata => { - TransactionKind::ChangeMetadata - } - TransactionKindDb::ChangeCommission => { - TransactionKind::ChangeCommission - } - TransactionKindDb::RevealPk => TransactionKind::RevealPk, - TransactionKindDb::Unknown => TransactionKind::Unknown, - TransactionKindDb::IbcMsgTransfer => { - TransactionKind::IbcMsgTransfer - } - TransactionKindDb::BecomeValidator => { - TransactionKind::BecomeValidator - } + TransactionKindDb::TransparentTransfer => Self::TransparentTransfer, + TransactionKindDb::ShieldedTransfer => Self::ShieldedTransfer, + TransactionKindDb::ShieldingTransfer => Self::ShieldingTransfer, + TransactionKindDb::UnshieldingTransfer => Self::UnshieldingTransfer, + TransactionKindDb::Bond => Self::Bond, + TransactionKindDb::Redelegation => Self::Redelegation, + TransactionKindDb::Unbond => Self::Unbond, + TransactionKindDb::Withdraw => Self::Withdraw, + TransactionKindDb::ClaimRewards => Self::ClaimRewards, + TransactionKindDb::VoteProposal => Self::VoteProposal, + TransactionKindDb::InitProposal => Self::InitProposal, + TransactionKindDb::ChangeMetadata => Self::ChangeMetadata, + TransactionKindDb::ChangeCommission => Self::ChangeCommission, + TransactionKindDb::RevealPk => Self::RevealPk, + TransactionKindDb::Unknown => Self::Unknown, + TransactionKindDb::IbcMsgTransfer => Self::IbcMsgTransfer, + TransactionKindDb::BecomeValidator => Self::BecomeValidator, + TransactionKindDb::ReactivateValidator => Self::ReactivateValidator, + TransactionKindDb::DeactivateValidator => Self::DeactivateValidator, + TransactionKindDb::UnjailValidator => Self::UnjailValidator, } } } @@ -134,6 +125,7 @@ impl From for WrapperTransaction { fee_payer: value.fee_payer, fee_token: value.fee_token, gas_limit: value.gas_limit, + gas_used: value.gas_used, block_height: value.block_height as u64, inner_transactions: vec![], exit_code: TransactionResult::from(value.exit_code), diff --git a/webserver/src/service/block.rs b/webserver/src/service/block.rs new file mode 100644 index 000000000..54c7d73e9 --- /dev/null +++ b/webserver/src/service/block.rs @@ -0,0 +1,52 @@ +use crate::appstate::AppState; +use crate::error::block::BlockError; +use crate::repository::block::{BlockRepository, BlockRepositoryTrait}; +use crate::response::block::Block; + +#[derive(Clone)] +pub struct BlockService { + block_repo: BlockRepository, +} + +impl BlockService { + pub fn new(app_state: AppState) -> Self { + Self { + block_repo: BlockRepository::new(app_state), + } + } + + pub async fn get_block_by_height( + &self, + height: i32, + ) -> Result { + let block = self + .block_repo + .find_block_by_height(height) + .await + .map_err(BlockError::Database)?; + let block = block.ok_or(BlockError::NotFound( + "height".to_string(), + height.to_string(), + ))?; + + Ok(Block::from(block)) + } + + pub async fn get_block_by_timestamp( + &self, + timestamp: i64, + ) -> Result { + let block = self + .block_repo + .find_block_by_timestamp(timestamp) + .await + .map_err(BlockError::Database)?; + + let block = block.ok_or(BlockError::NotFound( + "timestamp".to_string(), + timestamp.to_string(), + ))?; + + Ok(Block::from(block)) + } +} diff --git a/webserver/src/service/ibc.rs b/webserver/src/service/ibc.rs new file mode 100644 index 000000000..a322b3601 --- /dev/null +++ b/webserver/src/service/ibc.rs @@ -0,0 +1,42 @@ +use orm::ibc::IbcAckStatusDb; + +use crate::appstate::AppState; +use crate::error::ibc::IbcError; +use crate::repository::ibc::{IbcRepository, IbcRepositoryTrait}; +use crate::response::ibc::{IbcAck, IbcAckStatus}; + +#[derive(Clone)] +pub struct IbcService { + pub ibc_repo: IbcRepository, +} + +impl IbcService { + pub fn new(app_state: AppState) -> Self { + Self { + ibc_repo: IbcRepository::new(app_state), + } + } + + pub async fn get_ack_by_tx_id( + &self, + tx_id: String, + ) -> Result { + self.ibc_repo + .find_ibc_ack(tx_id) + .await + .map_err(IbcError::Database) + .map(|ack| match ack { + Some(ack) => IbcAck { + status: match ack.status { + IbcAckStatusDb::Unknown => IbcAckStatus::Unknown, + IbcAckStatusDb::Timeout => IbcAckStatus::Timeout, + IbcAckStatusDb::Fail => IbcAckStatus::Fail, + IbcAckStatusDb::Success => IbcAckStatus::Success, + }, + }, + None => IbcAck { + status: IbcAckStatus::Unknown, + }, + }) + } +} diff --git a/webserver/src/service/mod.rs b/webserver/src/service/mod.rs index bc433827f..6e61b6107 100644 --- a/webserver/src/service/mod.rs +++ b/webserver/src/service/mod.rs @@ -1,8 +1,10 @@ pub mod balance; +pub mod block; pub mod chain; pub mod crawler_state; pub mod gas; pub mod governance; +pub mod ibc; pub mod pgf; pub mod pos; pub mod revealed_pk; diff --git a/webserver/src/service/pos.rs b/webserver/src/service/pos.rs index 636dad679..a2c30cbad 100644 --- a/webserver/src/service/pos.rs +++ b/webserver/src/service/pos.rs @@ -102,6 +102,7 @@ impl PosService { &self, address: String, page: u64, + active_at: Option, ) -> Result<(Vec, u64, u64), PoSError> { let pos_state = self .pos_repo @@ -111,7 +112,7 @@ impl PosService { let (db_bonds, total_pages, total_items) = self .pos_repo - .find_bonds_by_address(address, page as i64) + .find_bonds_by_address(address, page as i64, active_at) .await .map_err(PoSError::Database)?; @@ -154,10 +155,11 @@ impl PosService { &self, address: String, page: u64, + active_at: Option, ) -> Result<(Vec, u64, u64), PoSError> { let (db_unbonds, total_pages, total_items) = self .pos_repo - .find_unbonds_by_address(address, page as i64) + .find_unbonds_by_address(address, page as i64, active_at) .await .map_err(PoSError::Database)?; diff --git a/webserver/src/state/common.rs b/webserver/src/state/common.rs index 7580d706b..01a24bb48 100644 --- a/webserver/src/state/common.rs +++ b/webserver/src/state/common.rs @@ -3,10 +3,12 @@ use namada_sdk::tendermint_rpc::HttpClient; use crate::appstate::AppState; use crate::config::AppConfig; use crate::service::balance::BalanceService; +use crate::service::block::BlockService; use crate::service::chain::ChainService; use crate::service::crawler_state::CrawlerStateService; use crate::service::gas::GasService; use crate::service::governance::GovernanceService; +use crate::service::ibc::IbcService; use crate::service::pgf::PgfService; use crate::service::pos::PosService; use crate::service::revealed_pk::RevealedPkService; @@ -15,6 +17,7 @@ use crate::service::transaction::TransactionService; #[derive(Clone)] pub struct CommonState { pub pos_service: PosService, + pub block_service: BlockService, pub gov_service: GovernanceService, pub balance_service: BalanceService, pub chain_service: ChainService, @@ -23,6 +26,7 @@ pub struct CommonState { pub transaction_service: TransactionService, pub pgf_service: PgfService, pub crawler_state_service: CrawlerStateService, + pub ibc_service: IbcService, pub client: HttpClient, pub config: AppConfig, } @@ -30,6 +34,7 @@ pub struct CommonState { impl CommonState { pub fn new(client: HttpClient, config: AppConfig, data: AppState) -> Self { Self { + block_service: BlockService::new(data.clone()), pos_service: PosService::new(data.clone()), gov_service: GovernanceService::new(data.clone()), balance_service: BalanceService::new(data.clone()), @@ -39,6 +44,7 @@ impl CommonState { pgf_service: PgfService::new(data.clone()), transaction_service: TransactionService::new(data.clone()), crawler_state_service: CrawlerStateService::new(data.clone()), + ibc_service: IbcService::new(data.clone()), client, config, }