Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: store blocks while crawling transactions and chain #190

Merged
merged 4 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 98 additions & 48 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use shared::crawler_state::ChainCrawlerState;
use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use shared::id::Id;
use shared::token::Token;
use shared::utils::BalanceChange;
use shared::validator::ValidatorSet;
use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse;
use tendermint_rpc::HttpClient;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
Expand Down Expand Up @@ -64,6 +66,7 @@ async fn main() -> Result<(), MainError> {
initial_query(
&client,
&conn,
checksums.clone(),
config.initial_query_retry_time,
config.initial_query_retry_attempts,
)
Expand Down Expand Up @@ -108,46 +111,15 @@ async fn crawling_fn(
return Err(MainError::NoAction);
}

tracing::debug!(block = block_height, "Query block...");
let tm_block_response =
tendermint_service::query_raw_block_at_height(&client, block_height)
.await
.into_rpc_error()?;
tracing::debug!(
block = block_height,
"Raw block contains {} txs...",
tm_block_response.block.data.len()
);

tracing::debug!(block = block_height, "Query block results...");
let tm_block_results_response =
tendermint_service::query_raw_block_results_at_height(
&client,
block_height,
)
.await
.into_rpc_error()?;
let block_results = BlockResult::from(tm_block_results_response);

tracing::debug!(block = block_height, "Query epoch...");
let epoch =
namada_service::get_epoch_at_block_height(&client, block_height)
.await
.into_rpc_error()?;

tracing::debug!(block = block_height, "Query first block in epoch...");
let first_block_in_epoch =
namada_service::get_first_block_in_epoch(&client)
.await
.into_rpc_error()?;

let block = Block::from(
tm_block_response,
&block_results,
checksums,
epoch,
block_height,
);
let (block, tm_block_response, epoch) =
get_block(block_height, &client, checksums).await?;

tracing::debug!(
block = block_height,
txs = block.transactions.len(),
Expand All @@ -167,13 +139,14 @@ async fn crawling_fn(

let addresses = block.addresses_with_balance_change(&native_token);

let block_proposer_address = namada_service::get_block_proposer_address(
&client,
&block,
&native_token,
)
.await
.into_rpc_error()?;
let block_proposer_address = block
.header
.proposer_address_namada
.as_ref()
.map(|address| BalanceChange {
address: Id::Account(address.clone()),
token: Token::Native(native_token.clone()),
});

let all_balance_changed_addresses = addresses
.iter()
Expand All @@ -191,8 +164,7 @@ async fn crawling_fn(

tracing::debug!(
block = block_height,
addresses = addresses.len(),
block_proposer_address = block_proposer_address.len(),
addresses = all_balance_changed_addresses.len(),
"Updating balance for {} addresses...",
all_balance_changed_addresses.len()
);
Expand Down Expand Up @@ -228,7 +200,10 @@ async fn crawling_fn(
};

let validators_state_change = block.update_validators_state();
tracing::debug!("Updating {} validators state", validators_state_change.len());
tracing::debug!(
"Updating {} validators state",
validators_state_change.len()
);

let addresses = block.bond_addresses();
let bonds = query_bonds(&client, addresses).await.into_rpc_error()?;
Expand Down Expand Up @@ -310,6 +285,12 @@ async fn crawling_fn(
ibc_tokens,
)?;

repository::block::upsert_block(
transaction_conn,
block,
tm_block_response,
)?;

repository::balance::insert_balances(
transaction_conn,
balances,
Expand Down Expand Up @@ -385,29 +366,35 @@ async fn crawling_fn(
async fn initial_query(
client: &HttpClient,
conn: &Object,
checksums: Checksums,
retry_time: u64,
retry_attempts: usize,
) -> Result<(), MainError> {
let retry_strategy = ExponentialBackoff::from_millis(retry_time)
.map(jitter)
.take(retry_attempts);
Retry::spawn(retry_strategy, || try_initial_query(client, conn)).await
Retry::spawn(retry_strategy, || {
try_initial_query(client, conn, checksums.clone())
})
.await
}

async fn try_initial_query(
client: &HttpClient,
conn: &Object,
checksums: Checksums,
) -> Result<(), MainError> {
tracing::debug!("Querying initial data...");
let block_height =
query_last_block_height(client).await.into_rpc_error()?;
let epoch = namada_service::get_epoch_at_block_height(client, block_height)
.await
.into_rpc_error()?;

let first_block_in_epoch = namada_service::get_first_block_in_epoch(client)
.await
.into_rpc_error()?;

let (block, tm_block_response, epoch) =
get_block(block_height, client, checksums.clone()).await?;

let tokens = query_tokens(client).await.into_rpc_error()?;

// This can sometimes fail if the last block height in the node has moved
Expand Down Expand Up @@ -467,6 +454,12 @@ async fn try_initial_query(
.run(|transaction_conn| {
repository::balance::insert_tokens(transaction_conn, tokens)?;

repository::block::upsert_block(
transaction_conn,
block,
tm_block_response,
)?;

tracing::debug!(
block = block_height,
"Inserting {} balances...",
Expand Down Expand Up @@ -543,3 +536,60 @@ async fn update_crawler_timestamp(
.and_then(identity)
.into_db_error()
}

async fn get_block(
block_height: u32,
client: &HttpClient,
checksums: Checksums,
) -> Result<(Block, TendermintBlockResponse, u32), MainError> {
tracing::debug!(block = block_height, "Query block...");
let tm_block_response =
tendermint_service::query_raw_block_at_height(client, block_height)
.await
.into_rpc_error()?;
tracing::debug!(
block = block_height,
"Raw block contains {} txs...",
tm_block_response.block.data.len()
);

tracing::debug!(block = block_height, "Query block results...");
let tm_block_results_response =
tendermint_service::query_raw_block_results_at_height(
client,
block_height,
)
.await
.into_rpc_error()?;
let block_results = BlockResult::from(tm_block_results_response);

tracing::debug!(block = block_height, "Query epoch...");
let epoch = namada_service::get_epoch_at_block_height(client, block_height)
.await
.into_rpc_error()?;

let proposer_address_namada = namada_service::get_validator_namada_address(
client,
&Id::from(&tm_block_response.block.header.proposer_address),
)
.await
.into_rpc_error()?;

tracing::info!(
block = block_height,
tm_address = tm_block_response.block.header.proposer_address.to_string(),
namada_address = ?proposer_address_namada,
"Got block proposer address"
);

let block = Block::from(
&tm_block_response,
&block_results,
&proposer_address_namada,
checksums,
epoch,
block_height,
);

Ok((block, tm_block_response, epoch))
}
34 changes: 34 additions & 0 deletions chain/src/repository/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ mod tests {
use namada_sdk::token::Amount as NamadaAmount;
use namada_sdk::uint::MAX_SIGNED_VALUE;
use orm::balances::BalanceDb;
use orm::blocks::BlockInsertDb;
use orm::schema::blocks;
use orm::views::balances;
use shared::balance::{Amount, Balance};
use shared::id::Id;
use shared::token::IbcToken;
use std::collections::HashSet;
use test_helpers::db::TestDb;

use super::*;
Expand Down Expand Up @@ -130,6 +133,8 @@ mod tests {

insert_tokens(conn, vec![token.clone()])?;

seed_blocks_from_balances(conn, &vec![balance.clone()])?;

insert_balances(conn, vec![balance.clone()])?;

let queried_balance = query_balance_by_address(conn, owner, token)?;
Expand Down Expand Up @@ -175,6 +180,7 @@ mod tests {
..(balance.clone())
};

seed_blocks_from_balances(conn, &vec![new_balance.clone()])?;
insert_balances(conn, vec![new_balance])?;

let queried_balance =
Expand Down Expand Up @@ -376,6 +382,8 @@ mod tests {

seed_tokens_from_balance(conn, fake_balances.clone())?;

seed_blocks_from_balances(conn, &fake_balances)?;

insert_balances(conn, fake_balances.clone())?;

assert_eq!(query_all_balances(conn)?.len(), fake_balances.len());
Expand Down Expand Up @@ -410,6 +418,7 @@ mod tests {

insert_tokens(conn, vec![token.clone()])?;

seed_blocks_from_balances(conn, &vec![balance.clone()])?;
insert_balances(conn, vec![balance.clone()])?;

let queried_balance = query_balance_by_address(conn, owner, token)?;
Expand Down Expand Up @@ -442,6 +451,8 @@ mod tests {

insert_tokens(conn, vec![token])?;

seed_blocks_from_balances(conn, &balances)?;

let res = insert_balances(conn, balances);

assert!(res.is_ok());
Expand Down Expand Up @@ -475,6 +486,8 @@ mod tests {

seed_tokens_from_balance(conn, balances.clone())?;

seed_blocks_from_balances(conn, &balances)?;

let res = insert_balances(conn, balances);

assert!(res.is_ok());
Expand All @@ -500,12 +513,33 @@ mod tests {
anyhow::Ok(())
}

fn seed_blocks_from_balances(
conn: &mut PgConnection,
balances: &Vec<Balance>,
) -> anyhow::Result<()> {
for height in balances
.into_iter()
.map(|balance| balance.height as i32)
.collect::<HashSet<_>>()
{
diesel::insert_into(blocks::table)
.values::<&BlockInsertDb>(&BlockInsertDb::fake(height))
.on_conflict_do_nothing()
.execute(conn)
.context("Failed to insert block in db")?;
}

anyhow::Ok(())
}

fn seed_balance(
conn: &mut PgConnection,
balances: Vec<Balance>,
) -> anyhow::Result<()> {
seed_tokens_from_balance(conn, balances.clone())?;

seed_blocks_from_balances(conn, &balances)?;

diesel::insert_into(balance_changes::table)
.values::<&Vec<BalanceChangesInsertDb>>(
&balances
Expand Down
32 changes: 32 additions & 0 deletions chain/src/repository/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::Context;
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgConnection, RunQueryDsl};
use orm::blocks::BlockInsertDb;
use orm::schema::blocks;
use shared::block::Block;
use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse;

pub fn upsert_block(
transaction_conn: &mut PgConnection,
block: Block,
tm_block_response: TendermintBlockResponse,
) -> anyhow::Result<()> {
diesel::insert_into(blocks::table)
.values::<&BlockInsertDb>(&BlockInsertDb::from((
block,
tm_block_response,
)))
.on_conflict(blocks::height)
.do_update()
.set((
blocks::hash.eq(excluded(blocks::hash)),
blocks::app_hash.eq(excluded(blocks::app_hash)),
blocks::timestamp.eq(excluded(blocks::timestamp)),
blocks::proposer.eq(excluded(blocks::proposer)),
blocks::epoch.eq(excluded(blocks::epoch)),
))
.execute(transaction_conn)
.context("Failed to insert block in db")?;

anyhow::Ok(())
}
1 change: 1 addition & 0 deletions chain/src/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod balance;
pub mod block;
pub mod crawler_state;
pub mod gov;
pub mod pos;
Expand Down
Loading
Loading