From 6588900a5781c1660ae1b3b06e66548764b81336 Mon Sep 17 00:00:00 2001 From: Joel Nordell Date: Mon, 9 Dec 2024 17:02:47 -0600 Subject: [PATCH 1/4] enhancement: store blocks while crawling transactions and chain --- chain/src/main.rs | 8 ++- chain/src/repository/balance.rs | 34 +++++++++++ chain/src/repository/block.rs | 32 +++++++++++ chain/src/repository/mod.rs | 1 + orm/Cargo.toml | 1 + .../2024-12-09-225148_init_blocks/down.sql | 9 +++ .../2024-12-09-225148_init_blocks/up.sql | 40 +++++++++++++ orm/src/blocks.rs | 56 +++++++++++++++++++ orm/src/lib.rs | 1 + orm/src/schema.rs | 14 +++++ shared/src/balance.rs | 2 +- shared/src/block.rs | 4 +- shared/src/id.rs | 4 +- transactions/src/main.rs | 8 ++- transactions/src/repository/block.rs | 32 +++++++++++ transactions/src/repository/mod.rs | 1 + 16 files changed, 240 insertions(+), 7 deletions(-) create mode 100644 chain/src/repository/block.rs create mode 100644 orm/migrations/2024-12-09-225148_init_blocks/down.sql create mode 100644 orm/migrations/2024-12-09-225148_init_blocks/up.sql create mode 100644 orm/src/blocks.rs create mode 100644 transactions/src/repository/block.rs diff --git a/chain/src/main.rs b/chain/src/main.rs index 4fb5f071f..6734277ed 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -142,7 +142,7 @@ async fn crawling_fn( .into_rpc_error()?; let block = Block::from( - tm_block_response, + &tm_block_response, &block_results, checksums, epoch, @@ -310,6 +310,12 @@ async fn crawling_fn( ibc_tokens, )?; + repository::block::upsert_block( + transaction_conn, + block, + tm_block_response, + )?; + repository::balance::insert_balances( transaction_conn, balances, diff --git a/chain/src/repository/balance.rs b/chain/src/repository/balance.rs index 101d08722..c09bbdb5c 100644 --- a/chain/src/repository/balance.rs +++ b/chain/src/repository/balance.rs @@ -80,10 +80,13 @@ mod tests { use namada_sdk::token::Amount as NamadaAmount; use namada_sdk::uint::MAX_SIGNED_VALUE; use orm::balances::BalanceDb; + use orm::blocks::BlockInsertDb; + use orm::schema::blocks; use orm::views::balances; use shared::balance::{Amount, Balance}; use shared::id::Id; use shared::token::IbcToken; + use std::collections::HashSet; use test_helpers::db::TestDb; use super::*; @@ -130,6 +133,8 @@ mod tests { insert_tokens(conn, vec![token.clone()])?; + seed_blocks_from_balances(conn, &vec![balance.clone()])?; + insert_balances(conn, vec![balance.clone()])?; let queried_balance = query_balance_by_address(conn, owner, token)?; @@ -175,6 +180,7 @@ mod tests { ..(balance.clone()) }; + seed_blocks_from_balances(conn, &vec![new_balance.clone()])?; insert_balances(conn, vec![new_balance])?; let queried_balance = @@ -376,6 +382,8 @@ mod tests { seed_tokens_from_balance(conn, fake_balances.clone())?; + seed_blocks_from_balances(conn, &fake_balances)?; + insert_balances(conn, fake_balances.clone())?; assert_eq!(query_all_balances(conn)?.len(), fake_balances.len()); @@ -410,6 +418,7 @@ mod tests { insert_tokens(conn, vec![token.clone()])?; + seed_blocks_from_balances(conn, &vec![balance.clone()])?; insert_balances(conn, vec![balance.clone()])?; let queried_balance = query_balance_by_address(conn, owner, token)?; @@ -442,6 +451,8 @@ mod tests { insert_tokens(conn, vec![token])?; + seed_blocks_from_balances(conn, &balances)?; + let res = insert_balances(conn, balances); assert!(res.is_ok()); @@ -475,6 +486,8 @@ mod tests { seed_tokens_from_balance(conn, balances.clone())?; + seed_blocks_from_balances(conn, &balances)?; + let res = insert_balances(conn, balances); assert!(res.is_ok()); @@ -500,12 +513,33 @@ mod tests { anyhow::Ok(()) } + fn seed_blocks_from_balances( + conn: &mut PgConnection, + balances: &Vec, + ) -> anyhow::Result<()> { + for height in balances + .into_iter() + .map(|balance| balance.height as i32) + .collect::>() + { + diesel::insert_into(blocks::table) + .values::<&BlockInsertDb>(&BlockInsertDb::fake(height)) + .on_conflict_do_nothing() + .execute(conn) + .context("Failed to insert block in db")?; + } + + anyhow::Ok(()) + } + fn seed_balance( conn: &mut PgConnection, balances: Vec, ) -> anyhow::Result<()> { seed_tokens_from_balance(conn, balances.clone())?; + seed_blocks_from_balances(conn, &balances)?; + diesel::insert_into(balance_changes::table) .values::<&Vec>( &balances diff --git a/chain/src/repository/block.rs b/chain/src/repository/block.rs new file mode 100644 index 000000000..5420dd939 --- /dev/null +++ b/chain/src/repository/block.rs @@ -0,0 +1,32 @@ +use anyhow::Context; +use diesel::upsert::excluded; +use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; +use orm::blocks::BlockInsertDb; +use orm::schema::blocks; +use shared::block::Block; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; + +pub fn upsert_block( + transaction_conn: &mut PgConnection, + block: Block, + tm_block_response: TendermintBlockResponse, +) -> anyhow::Result<()> { + diesel::insert_into(blocks::table) + .values::<&BlockInsertDb>(&BlockInsertDb::from(( + block, + tm_block_response, + ))) + .on_conflict(blocks::height) + .do_update() + .set(( + blocks::hash.eq(excluded(blocks::hash)), + blocks::app_hash.eq(excluded(blocks::app_hash)), + blocks::timestamp.eq(excluded(blocks::timestamp)), + blocks::proposer.eq(excluded(blocks::proposer)), + blocks::epoch.eq(excluded(blocks::epoch)), + )) + .execute(transaction_conn) + .context("Failed to insert block in db")?; + + anyhow::Ok(()) +} diff --git a/chain/src/repository/mod.rs b/chain/src/repository/mod.rs index efdb8fcdc..c5b0567b4 100644 --- a/chain/src/repository/mod.rs +++ b/chain/src/repository/mod.rs @@ -1,4 +1,5 @@ pub mod balance; +pub mod block; pub mod crawler_state; pub mod gov; pub mod pos; diff --git a/orm/Cargo.toml b/orm/Cargo.toml index 892fbd590..3d0234cf1 100644 --- a/orm/Cargo.toml +++ b/orm/Cargo.toml @@ -24,3 +24,4 @@ shared.workspace = true bigdecimal.workspace = true chrono.workspace = true serde_json.workspace = true +tendermint-rpc.workspace = true diff --git a/orm/migrations/2024-12-09-225148_init_blocks/down.sql b/orm/migrations/2024-12-09-225148_init_blocks/down.sql new file mode 100644 index 000000000..a57d19684 --- /dev/null +++ b/orm/migrations/2024-12-09-225148_init_blocks/down.sql @@ -0,0 +1,9 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE balance_changes + DROP CONSTRAINT fk_balance_changes_height; + +ALTER TABLE wrapper_transactions + DROP CONSTRAINT fk_wrapper_transactions_height; + +DROP TABLE IF EXISTS blocks; + diff --git a/orm/migrations/2024-12-09-225148_init_blocks/up.sql b/orm/migrations/2024-12-09-225148_init_blocks/up.sql new file mode 100644 index 000000000..2c519cce3 --- /dev/null +++ b/orm/migrations/2024-12-09-225148_init_blocks/up.sql @@ -0,0 +1,40 @@ +-- Your SQL goes here +CREATE TABLE blocks ( + height integer PRIMARY KEY, + hash VARCHAR(64), + app_hash varchar(64), + timestamp timestamp, + proposer varchar, + epoch int +); + +ALTER TABLE blocks + ADD UNIQUE (hash); + +CREATE INDEX index_blocks_epoch ON blocks (epoch); + +-- Populate null blocks for all existing wrapper_transactions and balance_changes to satisfy foreign key constraints +INSERT INTO blocks ( SELECT DISTINCT + height, + NULL::varchar AS hash, + NULL::varchar AS app_hash, + NULL::timestamp AS timestamp, + NULL::varchar AS proposer, + NULL::int AS epoch + FROM ( SELECT DISTINCT + block_height AS height + FROM + wrapper_transactions + UNION + SELECT DISTINCT + height + FROM + balance_changes)); + +-- Create foreign key constraints for wrapper_transactions and balance_changes +ALTER TABLE wrapper_transactions + ADD CONSTRAINT fk_wrapper_transactions_height FOREIGN KEY (block_height) REFERENCES blocks (height) ON DELETE RESTRICT; + +ALTER TABLE balance_changes + ADD CONSTRAINT fk_balance_changes_height FOREIGN KEY (height) REFERENCES blocks (height) ON DELETE RESTRICT; + diff --git a/orm/src/blocks.rs b/orm/src/blocks.rs new file mode 100644 index 000000000..e141ad175 --- /dev/null +++ b/orm/src/blocks.rs @@ -0,0 +1,56 @@ +use diesel::{Insertable, Queryable, Selectable}; +use shared::block::Block; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; + +use crate::schema::blocks; + +#[derive(Insertable, Clone, Queryable, Selectable, Debug)] +#[diesel(table_name = blocks)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct BlockInsertDb { + pub height: i32, + pub hash: String, + pub app_hash: String, + pub timestamp: chrono::NaiveDateTime, + pub proposer: String, + pub epoch: i32, +} + +pub type BlockDb = BlockInsertDb; + +impl From<(Block, TendermintBlockResponse)> for BlockInsertDb { + fn from( + (block, tm_block_response): (Block, TendermintBlockResponse), + ) -> Self { + let timestamp = chrono::DateTime::from_timestamp( + tm_block_response.block.header.time.unix_timestamp(), + 0, + ) + .expect("Invalid timestamp") + .naive_utc(); + + Self { + height: block.header.height as i32, + hash: block.hash.to_string(), + app_hash: block.header.app_hash.to_string(), + timestamp, + proposer: block.header.proposer_address, + epoch: block.epoch as i32, + } + } +} + +impl BlockInsertDb { + pub fn fake(height: i32) -> Self { + Self { + height, + hash: height.to_string(), // fake hash but ensures uniqueness with height + app_hash: "fake_app_hash".to_string(), // doesn't require uniqueness + timestamp: chrono::DateTime::from_timestamp(0, 0) + .unwrap() + .naive_utc(), + proposer: "fake_proposer".to_string(), + epoch: 0, + } + } +} diff --git a/orm/src/lib.rs b/orm/src/lib.rs index 24d01b10b..340f5ae82 100644 --- a/orm/src/lib.rs +++ b/orm/src/lib.rs @@ -1,4 +1,5 @@ pub mod balances; +pub mod blocks; pub mod bond; pub mod crawler_state; pub mod gas; diff --git a/orm/src/schema.rs b/orm/src/schema.rs index 0fb2d3085..58c232d9b 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -101,6 +101,19 @@ diesel::table! { } } +diesel::table! { + blocks (height) { + height -> Int4, + #[max_length = 64] + hash -> Varchar, + #[max_length = 64] + app_hash -> Varchar, + timestamp -> Timestamp, + proposer -> Varchar, + epoch -> Int4, + } +} + diesel::table! { bonds (id) { id -> Int4, @@ -305,6 +318,7 @@ diesel::joinable!(unbonds -> validators (validator_id)); diesel::allow_tables_to_appear_in_same_query!( balance_changes, + blocks, bonds, chain_parameters, crawler_state, diff --git a/shared/src/balance.rs b/shared/src/balance.rs index 9912f5f95..9c96d9ba9 100644 --- a/shared/src/balance.rs +++ b/shared/src/balance.rs @@ -110,7 +110,7 @@ impl Balance { owner: Id::Account(address.to_string()), token, amount: Amount::fake(), - height: (0..10000).fake::(), + height: 0, } } } diff --git a/shared/src/block.rs b/shared/src/block.rs index 8d22d430e..2af89ec43 100644 --- a/shared/src/block.rs +++ b/shared/src/block.rs @@ -109,7 +109,7 @@ pub struct Block { impl Block { pub fn from( - block_response: TendermintBlockResponse, + block_response: &TendermintBlockResponse, block_results: &BlockResult, checksums: Checksums, epoch: Epoch, @@ -147,7 +147,7 @@ impl Block { .to_string() .to_lowercase(), timestamp: block_response.block.header.time.to_string(), - app_hash: Id::from(block_response.block.header.app_hash), + app_hash: Id::from(&block_response.block.header.app_hash), }, transactions, epoch, diff --git a/shared/src/id.rs b/shared/src/id.rs index 556cee83f..f598ca70c 100644 --- a/shared/src/id.rs +++ b/shared/src/id.rs @@ -46,8 +46,8 @@ impl From for Id { } } -impl From for Id { - fn from(value: TendermintAppHash) -> Self { +impl From<&TendermintAppHash> for Id { + fn from(value: &TendermintAppHash) -> Self { Self::Hash(value.to_string()) } } diff --git a/transactions/src/main.rs b/transactions/src/main.rs index f6bbfc65e..a3c1e3bda 100644 --- a/transactions/src/main.rs +++ b/transactions/src/main.rs @@ -15,6 +15,7 @@ use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use tendermint_rpc::HttpClient; use transactions::app_state::AppState; use transactions::config::AppConfig; +use transactions::repository::block as block_repo; use transactions::repository::transactions as transaction_repo; use transactions::services::{ db as db_service, namada as namada_service, @@ -115,7 +116,7 @@ async fn crawling_fn( let block_results = BlockResult::from(tm_block_results_response); let block = Block::from( - tm_block_response.clone(), + &tm_block_response, &block_results, checksums, 1_u32, @@ -151,6 +152,11 @@ async fn crawling_fn( conn.build_transaction() .read_write() .run(|transaction_conn| { + block_repo::upsert_block( + transaction_conn, + block, + tm_block_response, + )?; transaction_repo::insert_wrapper_transactions( transaction_conn, wrapper_txs, diff --git a/transactions/src/repository/block.rs b/transactions/src/repository/block.rs new file mode 100644 index 000000000..5420dd939 --- /dev/null +++ b/transactions/src/repository/block.rs @@ -0,0 +1,32 @@ +use anyhow::Context; +use diesel::upsert::excluded; +use diesel::{ExpressionMethods, PgConnection, RunQueryDsl}; +use orm::blocks::BlockInsertDb; +use orm::schema::blocks; +use shared::block::Block; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; + +pub fn upsert_block( + transaction_conn: &mut PgConnection, + block: Block, + tm_block_response: TendermintBlockResponse, +) -> anyhow::Result<()> { + diesel::insert_into(blocks::table) + .values::<&BlockInsertDb>(&BlockInsertDb::from(( + block, + tm_block_response, + ))) + .on_conflict(blocks::height) + .do_update() + .set(( + blocks::hash.eq(excluded(blocks::hash)), + blocks::app_hash.eq(excluded(blocks::app_hash)), + blocks::timestamp.eq(excluded(blocks::timestamp)), + blocks::proposer.eq(excluded(blocks::proposer)), + blocks::epoch.eq(excluded(blocks::epoch)), + )) + .execute(transaction_conn) + .context("Failed to insert block in db")?; + + anyhow::Ok(()) +} diff --git a/transactions/src/repository/mod.rs b/transactions/src/repository/mod.rs index 0824d7a9c..5ae69f54d 100644 --- a/transactions/src/repository/mod.rs +++ b/transactions/src/repository/mod.rs @@ -1 +1,2 @@ +pub mod block; pub mod transactions; From daac8191e76fe8c060b97f9021ba961205dafa5e Mon Sep 17 00:00:00 2001 From: Joel Nordell Date: Mon, 16 Dec 2024 14:12:51 -0600 Subject: [PATCH 2/4] update diesel schema for blocks --- orm/src/blocks.rs | 35 ++++++++++++++++++----------------- orm/src/schema.rs | 12 +++++++----- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/orm/src/blocks.rs b/orm/src/blocks.rs index e141ad175..7d40fae8c 100644 --- a/orm/src/blocks.rs +++ b/orm/src/blocks.rs @@ -9,11 +9,11 @@ use crate::schema::blocks; #[diesel(check_for_backend(diesel::pg::Pg))] pub struct BlockInsertDb { pub height: i32, - pub hash: String, - pub app_hash: String, - pub timestamp: chrono::NaiveDateTime, - pub proposer: String, - pub epoch: i32, + pub hash: Option, + pub app_hash: Option, + pub timestamp: Option, + pub proposer: Option, + pub epoch: Option, } pub type BlockDb = BlockInsertDb; @@ -31,11 +31,11 @@ impl From<(Block, TendermintBlockResponse)> for BlockInsertDb { Self { height: block.header.height as i32, - hash: block.hash.to_string(), - app_hash: block.header.app_hash.to_string(), - timestamp, - proposer: block.header.proposer_address, - epoch: block.epoch as i32, + hash: Some(block.hash.to_string()), + app_hash: Some(block.header.app_hash.to_string()), + timestamp: Some(timestamp), + proposer: Some(block.header.proposer_address), + epoch: Some(block.epoch as i32), } } } @@ -44,13 +44,14 @@ impl BlockInsertDb { pub fn fake(height: i32) -> Self { Self { height, - hash: height.to_string(), // fake hash but ensures uniqueness with height - app_hash: "fake_app_hash".to_string(), // doesn't require uniqueness - timestamp: chrono::DateTime::from_timestamp(0, 0) - .unwrap() - .naive_utc(), - proposer: "fake_proposer".to_string(), - epoch: 0, + hash: Some(height.to_string()), /* fake hash but ensures uniqueness + * with height */ + app_hash: Some("fake_app_hash".to_string()), // doesn't require uniqueness + timestamp: Some( + chrono::DateTime::from_timestamp(0, 0).unwrap().naive_utc(), + ), + proposer: Some("fake_proposer".to_string()), + epoch: Some(0), } } } diff --git a/orm/src/schema.rs b/orm/src/schema.rs index 58c232d9b..796a8d6ae 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -105,12 +105,12 @@ diesel::table! { blocks (height) { height -> Int4, #[max_length = 64] - hash -> Varchar, + hash -> Nullable, #[max_length = 64] - app_hash -> Varchar, - timestamp -> Timestamp, - proposer -> Varchar, - epoch -> Int4, + app_hash -> Nullable, + timestamp -> Nullable, + proposer -> Nullable, + epoch -> Nullable, } } @@ -308,6 +308,7 @@ diesel::table! { } } +diesel::joinable!(balance_changes -> blocks (height)); diesel::joinable!(balance_changes -> token (token)); diesel::joinable!(bonds -> validators (validator_id)); diesel::joinable!(governance_votes -> governance_proposals (proposal_id)); @@ -315,6 +316,7 @@ diesel::joinable!(ibc_token -> token (address)); diesel::joinable!(inner_transactions -> wrapper_transactions (wrapper_id)); diesel::joinable!(pos_rewards -> validators (validator_id)); diesel::joinable!(unbonds -> validators (validator_id)); +diesel::joinable!(wrapper_transactions -> blocks (block_height)); diesel::allow_tables_to_appear_in_same_query!( balance_changes, From 64d042a2c263d5f5002dc5d5404febb0581d1b64 Mon Sep 17 00:00:00 2001 From: Joel Nordell Date: Mon, 16 Dec 2024 14:35:42 -0600 Subject: [PATCH 3/4] upsert block before balances during initial_query --- chain/src/main.rs | 106 +++++++++++++++++++++++++-------------- transactions/src/main.rs | 5 +- 2 files changed, 70 insertions(+), 41 deletions(-) diff --git a/chain/src/main.rs b/chain/src/main.rs index 6734277ed..51a9b96c7 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -28,6 +28,7 @@ use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use shared::id::Id; use shared::token::Token; use shared::validator::ValidatorSet; +use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; use tendermint_rpc::HttpClient; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; @@ -64,6 +65,7 @@ async fn main() -> Result<(), MainError> { initial_query( &client, &conn, + checksums.clone(), config.initial_query_retry_time, config.initial_query_retry_attempts, ) @@ -108,46 +110,15 @@ async fn crawling_fn( return Err(MainError::NoAction); } - tracing::debug!(block = block_height, "Query block..."); - let tm_block_response = - tendermint_service::query_raw_block_at_height(&client, block_height) - .await - .into_rpc_error()?; - tracing::debug!( - block = block_height, - "Raw block contains {} txs...", - tm_block_response.block.data.len() - ); - - tracing::debug!(block = block_height, "Query block results..."); - let tm_block_results_response = - tendermint_service::query_raw_block_results_at_height( - &client, - block_height, - ) - .await - .into_rpc_error()?; - let block_results = BlockResult::from(tm_block_results_response); - - tracing::debug!(block = block_height, "Query epoch..."); - let epoch = - namada_service::get_epoch_at_block_height(&client, block_height) - .await - .into_rpc_error()?; - tracing::debug!(block = block_height, "Query first block in epoch..."); let first_block_in_epoch = namada_service::get_first_block_in_epoch(&client) .await .into_rpc_error()?; - let block = Block::from( - &tm_block_response, - &block_results, - checksums, - epoch, - block_height, - ); + let (block, tm_block_response, epoch) = + get_block(block_height, &client, checksums).await?; + tracing::debug!( block = block_height, txs = block.transactions.len(), @@ -228,7 +199,10 @@ async fn crawling_fn( }; let validators_state_change = block.update_validators_state(); - tracing::debug!("Updating {} validators state", validators_state_change.len()); + tracing::debug!( + "Updating {} validators state", + validators_state_change.len() + ); let addresses = block.bond_addresses(); let bonds = query_bonds(&client, addresses).await.into_rpc_error()?; @@ -391,29 +365,35 @@ async fn crawling_fn( async fn initial_query( client: &HttpClient, conn: &Object, + checksums: Checksums, retry_time: u64, retry_attempts: usize, ) -> Result<(), MainError> { let retry_strategy = ExponentialBackoff::from_millis(retry_time) .map(jitter) .take(retry_attempts); - Retry::spawn(retry_strategy, || try_initial_query(client, conn)).await + Retry::spawn(retry_strategy, || { + try_initial_query(client, conn, checksums.clone()) + }) + .await } async fn try_initial_query( client: &HttpClient, conn: &Object, + checksums: Checksums, ) -> Result<(), MainError> { tracing::debug!("Querying initial data..."); let block_height = query_last_block_height(client).await.into_rpc_error()?; - let epoch = namada_service::get_epoch_at_block_height(client, block_height) - .await - .into_rpc_error()?; + let first_block_in_epoch = namada_service::get_first_block_in_epoch(client) .await .into_rpc_error()?; + let (block, tm_block_response, epoch) = + get_block(block_height, client, checksums.clone()).await?; + let tokens = query_tokens(client).await.into_rpc_error()?; // This can sometimes fail if the last block height in the node has moved @@ -473,6 +453,12 @@ async fn try_initial_query( .run(|transaction_conn| { repository::balance::insert_tokens(transaction_conn, tokens)?; + repository::block::upsert_block( + transaction_conn, + block, + tm_block_response, + )?; + tracing::debug!( block = block_height, "Inserting {} balances...", @@ -549,3 +535,45 @@ async fn update_crawler_timestamp( .and_then(identity) .into_db_error() } + +async fn get_block( + block_height: u32, + client: &HttpClient, + checksums: Checksums, +) -> Result<(Block, TendermintBlockResponse, u32), MainError> { + tracing::debug!(block = block_height, "Query block..."); + let tm_block_response = + tendermint_service::query_raw_block_at_height(client, block_height) + .await + .into_rpc_error()?; + tracing::debug!( + block = block_height, + "Raw block contains {} txs...", + tm_block_response.block.data.len() + ); + + tracing::debug!(block = block_height, "Query block results..."); + let tm_block_results_response = + tendermint_service::query_raw_block_results_at_height( + client, + block_height, + ) + .await + .into_rpc_error()?; + let block_results = BlockResult::from(tm_block_results_response); + + tracing::debug!(block = block_height, "Query epoch..."); + let epoch = namada_service::get_epoch_at_block_height(client, block_height) + .await + .into_rpc_error()?; + + let block = Block::from( + &tm_block_response, + &block_results, + checksums, + epoch, + block_height, + ); + + Ok((block, tm_block_response, epoch)) +} diff --git a/transactions/src/main.rs b/transactions/src/main.rs index a3c1e3bda..8d3e491fc 100644 --- a/transactions/src/main.rs +++ b/transactions/src/main.rs @@ -15,8 +15,9 @@ use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use tendermint_rpc::HttpClient; use transactions::app_state::AppState; use transactions::config::AppConfig; -use transactions::repository::block as block_repo; -use transactions::repository::transactions as transaction_repo; +use transactions::repository::{ + block as block_repo, transactions as transaction_repo, +}; use transactions::services::{ db as db_service, namada as namada_service, tendermint as tendermint_service, From 593ef204ab7f3545f44aea818091d36dc4ff52b3 Mon Sep 17 00:00:00 2001 From: Joel Nordell Date: Mon, 16 Dec 2024 17:22:43 -0600 Subject: [PATCH 4/4] Store the validator namada address on blocks instead of the tendermint address --- chain/src/main.rs | 34 +++++++++++++++++++++-------- chain/src/services/namada.rs | 33 +++++----------------------- orm/src/blocks.rs | 2 +- shared/src/block.rs | 6 ++++- shared/src/header.rs | 3 ++- transactions/src/main.rs | 16 ++++++++++++++ transactions/src/services/namada.rs | 13 +++++++++++ 7 files changed, 68 insertions(+), 39 deletions(-) diff --git a/chain/src/main.rs b/chain/src/main.rs index 51a9b96c7..c56fcf0cc 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -27,6 +27,7 @@ use shared::crawler_state::ChainCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; use shared::id::Id; use shared::token::Token; +use shared::utils::BalanceChange; use shared::validator::ValidatorSet; use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse; use tendermint_rpc::HttpClient; @@ -138,13 +139,14 @@ async fn crawling_fn( let addresses = block.addresses_with_balance_change(&native_token); - let block_proposer_address = namada_service::get_block_proposer_address( - &client, - &block, - &native_token, - ) - .await - .into_rpc_error()?; + let block_proposer_address = block + .header + .proposer_address_namada + .as_ref() + .map(|address| BalanceChange { + address: Id::Account(address.clone()), + token: Token::Native(native_token.clone()), + }); let all_balance_changed_addresses = addresses .iter() @@ -162,8 +164,7 @@ async fn crawling_fn( tracing::debug!( block = block_height, - addresses = addresses.len(), - block_proposer_address = block_proposer_address.len(), + addresses = all_balance_changed_addresses.len(), "Updating balance for {} addresses...", all_balance_changed_addresses.len() ); @@ -567,9 +568,24 @@ async fn get_block( .await .into_rpc_error()?; + let proposer_address_namada = namada_service::get_validator_namada_address( + client, + &Id::from(&tm_block_response.block.header.proposer_address), + ) + .await + .into_rpc_error()?; + + tracing::info!( + block = block_height, + tm_address = tm_block_response.block.header.proposer_address.to_string(), + namada_address = ?proposer_address_namada, + "Got block proposer address" + ); + let block = Block::from( &tm_block_response, &block_results, + &proposer_address_namada, checksums, epoch, block_height, diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index 95479b5bc..6912c1fbc 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -19,7 +19,7 @@ use namada_sdk::state::Key; use namada_sdk::token::Amount as NamadaSdkAmount; use namada_sdk::{borsh, rpc, token}; use shared::balance::{Amount, Balance, Balances}; -use shared::block::{Block, BlockHeight, Epoch}; +use shared::block::{BlockHeight, Epoch}; use shared::bond::{Bond, BondAddresses, Bonds}; use shared::id::Id; use shared::proposal::{GovernanceProposal, TallyType}; @@ -693,38 +693,17 @@ pub async fn get_validator_set_at_epoch( Ok(ValidatorSet { validators, epoch }) } -pub async fn get_block_proposer_address( +pub async fn get_validator_namada_address( client: &HttpClient, - block: &Block, - native_token: &Id, -) -> anyhow::Result> { + tm_addr: &Id, +) -> anyhow::Result> { let validator = RPC .vp() .pos() - .validator_by_tm_addr( - client, - &block.header.proposer_address.to_uppercase(), - ) + .validator_by_tm_addr(client, &tm_addr.to_string().to_uppercase()) .await?; - tracing::debug!( - block = block.header.height, - native_token = native_token.to_string(), - proposer_address = block.header.proposer_address, - namada_address = ?validator, - "Got block proposer address" - ); - - match validator { - Some(validator) => { - let balance_change = BalanceChange { - address: Id::from(validator), - token: Token::Native(native_token.clone()), - }; - Ok(std::iter::once(balance_change).collect()) - } - None => Ok(HashSet::new()), - } + Ok(validator.map(Id::from)) } pub async fn query_pipeline_length(client: &HttpClient) -> anyhow::Result { diff --git a/orm/src/blocks.rs b/orm/src/blocks.rs index 7d40fae8c..379c398bb 100644 --- a/orm/src/blocks.rs +++ b/orm/src/blocks.rs @@ -34,7 +34,7 @@ impl From<(Block, TendermintBlockResponse)> for BlockInsertDb { hash: Some(block.hash.to_string()), app_hash: Some(block.header.app_hash.to_string()), timestamp: Some(timestamp), - proposer: Some(block.header.proposer_address), + proposer: block.header.proposer_address_namada, epoch: Some(block.epoch as i32), } } diff --git a/shared/src/block.rs b/shared/src/block.rs index 2af89ec43..465ad40fe 100644 --- a/shared/src/block.rs +++ b/shared/src/block.rs @@ -111,6 +111,7 @@ impl Block { pub fn from( block_response: &TendermintBlockResponse, block_results: &BlockResult, + proposer_address_namada: &Option, // Provide the namada address of the proposer, if available checksums: Checksums, epoch: Epoch, block_height: BlockHeight, @@ -140,12 +141,15 @@ impl Block { header: BlockHeader { height: block_response.block.header.height.value() as BlockHeight, - proposer_address: block_response + proposer_address_tm: block_response .block .header .proposer_address .to_string() .to_lowercase(), + proposer_address_namada: proposer_address_namada + .as_ref() + .map(Id::to_string), timestamp: block_response.block.header.time.to_string(), app_hash: Id::from(&block_response.block.header.app_hash), }, diff --git a/shared/src/header.rs b/shared/src/header.rs index 792618895..decf3a9f9 100644 --- a/shared/src/header.rs +++ b/shared/src/header.rs @@ -4,7 +4,8 @@ use crate::block::BlockHeight; #[derive(Debug, Clone, Default)] pub struct BlockHeader { pub height: BlockHeight, - pub proposer_address: String, + pub proposer_address_tm: String, + pub proposer_address_namada: Option, pub timestamp: String, pub app_hash: Id, } diff --git a/transactions/src/main.rs b/transactions/src/main.rs index 8d3e491fc..36c9bc56d 100644 --- a/transactions/src/main.rs +++ b/transactions/src/main.rs @@ -12,6 +12,7 @@ use shared::checksums::Checksums; use shared::crawler::crawl; use shared::crawler_state::BlockCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::id::Id; use tendermint_rpc::HttpClient; use transactions::app_state::AppState; use transactions::config::AppConfig; @@ -116,9 +117,24 @@ async fn crawling_fn( .into_rpc_error()?; let block_results = BlockResult::from(tm_block_results_response); + let proposer_address_namada = namada_service::get_validator_namada_address( + &client, + &Id::from(&tm_block_response.block.header.proposer_address), + ) + .await + .into_rpc_error()?; + + tracing::debug!( + block = block_height, + tm_address = tm_block_response.block.header.proposer_address.to_string(), + namada_address = ?proposer_address_namada, + "Got block proposer address" + ); + let block = Block::from( &tm_block_response, &block_results, + &proposer_address_namada, checksums, 1_u32, block_height, diff --git a/transactions/src/services/namada.rs b/transactions/src/services/namada.rs index 51d412202..cde1d642e 100644 --- a/transactions/src/services/namada.rs +++ b/transactions/src/services/namada.rs @@ -55,3 +55,16 @@ pub async fn query_tx_code_hash( None } } + +pub async fn get_validator_namada_address( + client: &HttpClient, + tm_addr: &Id, +) -> anyhow::Result> { + let validator = RPC + .vp() + .pos() + .validator_by_tm_addr(client, &tm_addr.to_string().to_uppercase()) + .await?; + + Ok(validator.map(Id::from)) +}