Skip to content

Commit

Permalink
Merge pull request #190 from joel-u410/joel/record-block-details
Browse files Browse the repository at this point in the history
enhancement: store blocks while crawling transactions and chain
  • Loading branch information
mateuszjasiuk authored Dec 17, 2024
2 parents 7cf71cf + 593ef20 commit e6ec420
Show file tree
Hide file tree
Showing 19 changed files with 378 additions and 84 deletions.
146 changes: 98 additions & 48 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use shared::crawler_state::ChainCrawlerState;
use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use shared::id::Id;
use shared::token::Token;
use shared::utils::BalanceChange;
use shared::validator::ValidatorSet;
use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse;
use tendermint_rpc::HttpClient;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
Expand Down Expand Up @@ -64,6 +66,7 @@ async fn main() -> Result<(), MainError> {
initial_query(
&client,
&conn,
checksums.clone(),
config.initial_query_retry_time,
config.initial_query_retry_attempts,
)
Expand Down Expand Up @@ -108,46 +111,15 @@ async fn crawling_fn(
return Err(MainError::NoAction);
}

tracing::debug!(block = block_height, "Query block...");
let tm_block_response =
tendermint_service::query_raw_block_at_height(&client, block_height)
.await
.into_rpc_error()?;
tracing::debug!(
block = block_height,
"Raw block contains {} txs...",
tm_block_response.block.data.len()
);

tracing::debug!(block = block_height, "Query block results...");
let tm_block_results_response =
tendermint_service::query_raw_block_results_at_height(
&client,
block_height,
)
.await
.into_rpc_error()?;
let block_results = BlockResult::from(tm_block_results_response);

tracing::debug!(block = block_height, "Query epoch...");
let epoch =
namada_service::get_epoch_at_block_height(&client, block_height)
.await
.into_rpc_error()?;

tracing::debug!(block = block_height, "Query first block in epoch...");
let first_block_in_epoch =
namada_service::get_first_block_in_epoch(&client)
.await
.into_rpc_error()?;

let block = Block::from(
tm_block_response,
&block_results,
checksums,
epoch,
block_height,
);
let (block, tm_block_response, epoch) =
get_block(block_height, &client, checksums).await?;

tracing::debug!(
block = block_height,
txs = block.transactions.len(),
Expand All @@ -167,13 +139,14 @@ async fn crawling_fn(

let addresses = block.addresses_with_balance_change(&native_token);

let block_proposer_address = namada_service::get_block_proposer_address(
&client,
&block,
&native_token,
)
.await
.into_rpc_error()?;
let block_proposer_address = block
.header
.proposer_address_namada
.as_ref()
.map(|address| BalanceChange {
address: Id::Account(address.clone()),
token: Token::Native(native_token.clone()),
});

let all_balance_changed_addresses = addresses
.iter()
Expand All @@ -191,8 +164,7 @@ async fn crawling_fn(

tracing::debug!(
block = block_height,
addresses = addresses.len(),
block_proposer_address = block_proposer_address.len(),
addresses = all_balance_changed_addresses.len(),
"Updating balance for {} addresses...",
all_balance_changed_addresses.len()
);
Expand Down Expand Up @@ -228,7 +200,10 @@ async fn crawling_fn(
};

let validators_state_change = block.update_validators_state();
tracing::debug!("Updating {} validators state", validators_state_change.len());
tracing::debug!(
"Updating {} validators state",
validators_state_change.len()
);

let addresses = block.bond_addresses();
let bonds = query_bonds(&client, addresses).await.into_rpc_error()?;
Expand Down Expand Up @@ -310,6 +285,12 @@ async fn crawling_fn(
ibc_tokens,
)?;

repository::block::upsert_block(
transaction_conn,
block,
tm_block_response,
)?;

repository::balance::insert_balances(
transaction_conn,
balances,
Expand Down Expand Up @@ -385,29 +366,35 @@ async fn crawling_fn(
async fn initial_query(
client: &HttpClient,
conn: &Object,
checksums: Checksums,
retry_time: u64,
retry_attempts: usize,
) -> Result<(), MainError> {
let retry_strategy = ExponentialBackoff::from_millis(retry_time)
.map(jitter)
.take(retry_attempts);
Retry::spawn(retry_strategy, || try_initial_query(client, conn)).await
Retry::spawn(retry_strategy, || {
try_initial_query(client, conn, checksums.clone())
})
.await
}

async fn try_initial_query(
client: &HttpClient,
conn: &Object,
checksums: Checksums,
) -> Result<(), MainError> {
tracing::debug!("Querying initial data...");
let block_height =
query_last_block_height(client).await.into_rpc_error()?;
let epoch = namada_service::get_epoch_at_block_height(client, block_height)
.await
.into_rpc_error()?;

let first_block_in_epoch = namada_service::get_first_block_in_epoch(client)
.await
.into_rpc_error()?;

let (block, tm_block_response, epoch) =
get_block(block_height, client, checksums.clone()).await?;

let tokens = query_tokens(client).await.into_rpc_error()?;

// This can sometimes fail if the last block height in the node has moved
Expand Down Expand Up @@ -467,6 +454,12 @@ async fn try_initial_query(
.run(|transaction_conn| {
repository::balance::insert_tokens(transaction_conn, tokens)?;

repository::block::upsert_block(
transaction_conn,
block,
tm_block_response,
)?;

tracing::debug!(
block = block_height,
"Inserting {} balances...",
Expand Down Expand Up @@ -543,3 +536,60 @@ async fn update_crawler_timestamp(
.and_then(identity)
.into_db_error()
}

async fn get_block(
block_height: u32,
client: &HttpClient,
checksums: Checksums,
) -> Result<(Block, TendermintBlockResponse, u32), MainError> {
tracing::debug!(block = block_height, "Query block...");
let tm_block_response =
tendermint_service::query_raw_block_at_height(client, block_height)
.await
.into_rpc_error()?;
tracing::debug!(
block = block_height,
"Raw block contains {} txs...",
tm_block_response.block.data.len()
);

tracing::debug!(block = block_height, "Query block results...");
let tm_block_results_response =
tendermint_service::query_raw_block_results_at_height(
client,
block_height,
)
.await
.into_rpc_error()?;
let block_results = BlockResult::from(tm_block_results_response);

tracing::debug!(block = block_height, "Query epoch...");
let epoch = namada_service::get_epoch_at_block_height(client, block_height)
.await
.into_rpc_error()?;

let proposer_address_namada = namada_service::get_validator_namada_address(
client,
&Id::from(&tm_block_response.block.header.proposer_address),
)
.await
.into_rpc_error()?;

tracing::info!(
block = block_height,
tm_address = tm_block_response.block.header.proposer_address.to_string(),
namada_address = ?proposer_address_namada,
"Got block proposer address"
);

let block = Block::from(
&tm_block_response,
&block_results,
&proposer_address_namada,
checksums,
epoch,
block_height,
);

Ok((block, tm_block_response, epoch))
}
34 changes: 34 additions & 0 deletions chain/src/repository/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ mod tests {
use namada_sdk::token::Amount as NamadaAmount;
use namada_sdk::uint::MAX_SIGNED_VALUE;
use orm::balances::BalanceDb;
use orm::blocks::BlockInsertDb;
use orm::schema::blocks;
use orm::views::balances;
use shared::balance::{Amount, Balance};
use shared::id::Id;
use shared::token::IbcToken;
use std::collections::HashSet;
use test_helpers::db::TestDb;

use super::*;
Expand Down Expand Up @@ -130,6 +133,8 @@ mod tests {

insert_tokens(conn, vec![token.clone()])?;

seed_blocks_from_balances(conn, &vec![balance.clone()])?;

insert_balances(conn, vec![balance.clone()])?;

let queried_balance = query_balance_by_address(conn, owner, token)?;
Expand Down Expand Up @@ -175,6 +180,7 @@ mod tests {
..(balance.clone())
};

seed_blocks_from_balances(conn, &vec![new_balance.clone()])?;
insert_balances(conn, vec![new_balance])?;

let queried_balance =
Expand Down Expand Up @@ -376,6 +382,8 @@ mod tests {

seed_tokens_from_balance(conn, fake_balances.clone())?;

seed_blocks_from_balances(conn, &fake_balances)?;

insert_balances(conn, fake_balances.clone())?;

assert_eq!(query_all_balances(conn)?.len(), fake_balances.len());
Expand Down Expand Up @@ -410,6 +418,7 @@ mod tests {

insert_tokens(conn, vec![token.clone()])?;

seed_blocks_from_balances(conn, &vec![balance.clone()])?;
insert_balances(conn, vec![balance.clone()])?;

let queried_balance = query_balance_by_address(conn, owner, token)?;
Expand Down Expand Up @@ -442,6 +451,8 @@ mod tests {

insert_tokens(conn, vec![token])?;

seed_blocks_from_balances(conn, &balances)?;

let res = insert_balances(conn, balances);

assert!(res.is_ok());
Expand Down Expand Up @@ -475,6 +486,8 @@ mod tests {

seed_tokens_from_balance(conn, balances.clone())?;

seed_blocks_from_balances(conn, &balances)?;

let res = insert_balances(conn, balances);

assert!(res.is_ok());
Expand All @@ -500,12 +513,33 @@ mod tests {
anyhow::Ok(())
}

fn seed_blocks_from_balances(
conn: &mut PgConnection,
balances: &Vec<Balance>,
) -> anyhow::Result<()> {
for height in balances
.into_iter()
.map(|balance| balance.height as i32)
.collect::<HashSet<_>>()
{
diesel::insert_into(blocks::table)
.values::<&BlockInsertDb>(&BlockInsertDb::fake(height))
.on_conflict_do_nothing()
.execute(conn)
.context("Failed to insert block in db")?;
}

anyhow::Ok(())
}

fn seed_balance(
conn: &mut PgConnection,
balances: Vec<Balance>,
) -> anyhow::Result<()> {
seed_tokens_from_balance(conn, balances.clone())?;

seed_blocks_from_balances(conn, &balances)?;

diesel::insert_into(balance_changes::table)
.values::<&Vec<BalanceChangesInsertDb>>(
&balances
Expand Down
32 changes: 32 additions & 0 deletions chain/src/repository/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::Context;
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgConnection, RunQueryDsl};
use orm::blocks::BlockInsertDb;
use orm::schema::blocks;
use shared::block::Block;
use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse;

pub fn upsert_block(
transaction_conn: &mut PgConnection,
block: Block,
tm_block_response: TendermintBlockResponse,
) -> anyhow::Result<()> {
diesel::insert_into(blocks::table)
.values::<&BlockInsertDb>(&BlockInsertDb::from((
block,
tm_block_response,
)))
.on_conflict(blocks::height)
.do_update()
.set((
blocks::hash.eq(excluded(blocks::hash)),
blocks::app_hash.eq(excluded(blocks::app_hash)),
blocks::timestamp.eq(excluded(blocks::timestamp)),
blocks::proposer.eq(excluded(blocks::proposer)),
blocks::epoch.eq(excluded(blocks::epoch)),
))
.execute(transaction_conn)
.context("Failed to insert block in db")?;

anyhow::Ok(())
}
1 change: 1 addition & 0 deletions chain/src/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod balance;
pub mod block;
pub mod crawler_state;
pub mod gov;
pub mod pos;
Expand Down
Loading

0 comments on commit e6ec420

Please sign in to comment.