Skip to content

Commit

Permalink
enhancement: store blocks while crawling transactions and chain
Browse files Browse the repository at this point in the history
  • Loading branch information
joel-u410 committed Dec 13, 2024
1 parent fec6b0e commit d6a58c9
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 7 deletions.
8 changes: 7 additions & 1 deletion chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async fn crawling_fn(
.into_rpc_error()?;

let block = Block::from(
tm_block_response,
&tm_block_response,
&block_results,
checksums,
epoch,
Expand Down Expand Up @@ -285,6 +285,12 @@ async fn crawling_fn(
ibc_tokens,
)?;

repository::block::upsert_block(
transaction_conn,
block,
tm_block_response,
)?;

repository::balance::insert_balances(
transaction_conn,
balances,
Expand Down
34 changes: 34 additions & 0 deletions chain/src/repository/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ mod tests {
use namada_sdk::token::Amount as NamadaAmount;
use namada_sdk::uint::MAX_SIGNED_VALUE;
use orm::balances::BalanceDb;
use orm::blocks::BlockInsertDb;
use orm::schema::blocks;
use orm::views::balances;
use shared::balance::{Amount, Balance};
use shared::id::Id;
use shared::token::IbcToken;
use std::collections::HashSet;
use test_helpers::db::TestDb;

use super::*;
Expand Down Expand Up @@ -130,6 +133,8 @@ mod tests {

insert_tokens(conn, vec![token.clone()])?;

seed_blocks_from_balances(conn, &vec![balance.clone()])?;

insert_balances(conn, vec![balance.clone()])?;

let queried_balance = query_balance_by_address(conn, owner, token)?;
Expand Down Expand Up @@ -175,6 +180,7 @@ mod tests {
..(balance.clone())
};

seed_blocks_from_balances(conn, &vec![new_balance.clone()])?;
insert_balances(conn, vec![new_balance])?;

let queried_balance =
Expand Down Expand Up @@ -376,6 +382,8 @@ mod tests {

seed_tokens_from_balance(conn, fake_balances.clone())?;

seed_blocks_from_balances(conn, &fake_balances)?;

insert_balances(conn, fake_balances.clone())?;

assert_eq!(query_all_balances(conn)?.len(), fake_balances.len());
Expand Down Expand Up @@ -410,6 +418,7 @@ mod tests {

insert_tokens(conn, vec![token.clone()])?;

seed_blocks_from_balances(conn, &vec![balance.clone()])?;
insert_balances(conn, vec![balance.clone()])?;

let queried_balance = query_balance_by_address(conn, owner, token)?;
Expand Down Expand Up @@ -442,6 +451,8 @@ mod tests {

insert_tokens(conn, vec![token])?;

seed_blocks_from_balances(conn, &balances)?;

let res = insert_balances(conn, balances);

assert!(res.is_ok());
Expand Down Expand Up @@ -475,6 +486,8 @@ mod tests {

seed_tokens_from_balance(conn, balances.clone())?;

seed_blocks_from_balances(conn, &balances)?;

let res = insert_balances(conn, balances);

assert!(res.is_ok());
Expand All @@ -500,12 +513,33 @@ mod tests {
anyhow::Ok(())
}

fn seed_blocks_from_balances(
conn: &mut PgConnection,
balances: &Vec<Balance>,
) -> anyhow::Result<()> {
for height in balances
.into_iter()
.map(|balance| balance.height as i32)
.collect::<HashSet<_>>()
{
diesel::insert_into(blocks::table)
.values::<&BlockInsertDb>(&BlockInsertDb::fake(height))
.on_conflict_do_nothing()
.execute(conn)
.context("Failed to insert block in db")?;
}

anyhow::Ok(())
}

fn seed_balance(
conn: &mut PgConnection,
balances: Vec<Balance>,
) -> anyhow::Result<()> {
seed_tokens_from_balance(conn, balances.clone())?;

seed_blocks_from_balances(conn, &balances)?;

diesel::insert_into(balance_changes::table)
.values::<&Vec<BalanceChangesInsertDb>>(
&balances
Expand Down
32 changes: 32 additions & 0 deletions chain/src/repository/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::Context;
use diesel::upsert::excluded;
use diesel::{ExpressionMethods, PgConnection, RunQueryDsl};
use orm::blocks::BlockInsertDb;
use orm::schema::blocks;
use shared::block::Block;
use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse;

pub fn upsert_block(
transaction_conn: &mut PgConnection,
block: Block,
tm_block_response: TendermintBlockResponse,
) -> anyhow::Result<()> {
diesel::insert_into(blocks::table)
.values::<&BlockInsertDb>(&BlockInsertDb::from((
block,
tm_block_response,
)))
.on_conflict(blocks::height)
.do_update()
.set((
blocks::hash.eq(excluded(blocks::hash)),
blocks::app_hash.eq(excluded(blocks::app_hash)),
blocks::timestamp.eq(excluded(blocks::timestamp)),
blocks::proposer.eq(excluded(blocks::proposer)),
blocks::epoch.eq(excluded(blocks::epoch)),
))
.execute(transaction_conn)
.context("Failed to insert block in db")?;

anyhow::Ok(())
}
1 change: 1 addition & 0 deletions chain/src/repository/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod balance;
pub mod block;
pub mod crawler_state;
pub mod gov;
pub mod pos;
Expand Down
1 change: 1 addition & 0 deletions orm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ shared.workspace = true
bigdecimal.workspace = true
chrono.workspace = true
serde_json.workspace = true
tendermint-rpc.workspace = true
9 changes: 9 additions & 0 deletions orm/migrations/2024-12-09-225148_init_blocks/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file should undo anything in `up.sql`
ALTER TABLE balance_changes
DROP CONSTRAINT fk_balance_changes_height;

ALTER TABLE wrapper_transactions
DROP CONSTRAINT fk_wrapper_transactions_height;

DROP TABLE IF EXISTS blocks;

40 changes: 40 additions & 0 deletions orm/migrations/2024-12-09-225148_init_blocks/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Your SQL goes here
CREATE TABLE blocks (
height integer PRIMARY KEY,
hash VARCHAR(64),
app_hash varchar(64),
timestamp timestamp,
proposer varchar,
epoch int
);

ALTER TABLE blocks
ADD UNIQUE (hash);

CREATE INDEX index_blocks_epoch ON blocks (epoch);

-- Populate null blocks for all existing wrapper_transactions and balance_changes to satisfy foreign key constraints
INSERT INTO blocks ( SELECT DISTINCT
height,
NULL::varchar AS hash,
NULL::varchar AS app_hash,
NULL::timestamp AS timestamp,
NULL::varchar AS proposer,
NULL::int AS epoch
FROM ( SELECT DISTINCT
block_height AS height
FROM
wrapper_transactions
UNION
SELECT DISTINCT
height
FROM
balance_changes));

-- Create foreign key constraints for wrapper_transactions and balance_changes
ALTER TABLE wrapper_transactions
ADD CONSTRAINT fk_wrapper_transactions_height FOREIGN KEY (block_height) REFERENCES blocks (height) ON DELETE RESTRICT;

ALTER TABLE balance_changes
ADD CONSTRAINT fk_balance_changes_height FOREIGN KEY (height) REFERENCES blocks (height) ON DELETE RESTRICT;

56 changes: 56 additions & 0 deletions orm/src/blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use diesel::{Insertable, Queryable, Selectable};
use shared::block::Block;
use tendermint_rpc::endpoint::block::Response as TendermintBlockResponse;

use crate::schema::blocks;

#[derive(Insertable, Clone, Queryable, Selectable, Debug)]
#[diesel(table_name = blocks)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct BlockInsertDb {
pub height: i32,
pub hash: String,
pub app_hash: String,
pub timestamp: chrono::NaiveDateTime,
pub proposer: String,
pub epoch: i32,
}

pub type BlockDb = BlockInsertDb;

impl From<(Block, TendermintBlockResponse)> for BlockInsertDb {
fn from(
(block, tm_block_response): (Block, TendermintBlockResponse),
) -> Self {
let timestamp = chrono::DateTime::from_timestamp(
tm_block_response.block.header.time.unix_timestamp(),
0,
)
.expect("Invalid timestamp")
.naive_utc();

Self {
height: block.header.height as i32,
hash: block.hash.to_string(),
app_hash: block.header.app_hash.to_string(),
timestamp,
proposer: block.header.proposer_address,
epoch: block.epoch as i32,
}
}
}

impl BlockInsertDb {
pub fn fake(height: i32) -> Self {
Self {
height,
hash: height.to_string(), // fake hash but ensures uniqueness with height
app_hash: "fake_app_hash".to_string(), // doesn't require uniqueness
timestamp: chrono::DateTime::from_timestamp(0, 0)
.unwrap()
.naive_utc(),
proposer: "fake_proposer".to_string(),
epoch: 0,
}
}
}
1 change: 1 addition & 0 deletions orm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod balances;
pub mod blocks;
pub mod bond;
pub mod crawler_state;
pub mod gas;
Expand Down
14 changes: 14 additions & 0 deletions orm/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ diesel::table! {
}
}

diesel::table! {
blocks (height) {
height -> Int4,
#[max_length = 64]
hash -> Varchar,
#[max_length = 64]
app_hash -> Varchar,
timestamp -> Timestamp,
proposer -> Varchar,
epoch -> Int4,
}
}

diesel::table! {
bonds (id) {
id -> Int4,
Expand Down Expand Up @@ -289,6 +302,7 @@ diesel::joinable!(unbonds -> validators (validator_id));

diesel::allow_tables_to_appear_in_same_query!(
balance_changes,
blocks,
bonds,
chain_parameters,
crawler_state,
Expand Down
2 changes: 1 addition & 1 deletion shared/src/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl Balance {
owner: Id::Account(address.to_string()),
token,
amount: Amount::fake(),
height: (0..10000).fake::<u32>(),
height: 0,
}
}
}
4 changes: 2 additions & 2 deletions shared/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub struct Block {

impl Block {
pub fn from(
block_response: TendermintBlockResponse,
block_response: &TendermintBlockResponse,
block_results: &BlockResult,
checksums: Checksums,
epoch: Epoch,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl Block {
.to_string()
.to_lowercase(),
timestamp: block_response.block.header.time.to_string(),
app_hash: Id::from(block_response.block.header.app_hash),
app_hash: Id::from(&block_response.block.header.app_hash),
},
transactions,
epoch,
Expand Down
4 changes: 2 additions & 2 deletions shared/src/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl From<TendermintHash> for Id {
}
}

impl From<TendermintAppHash> for Id {
fn from(value: TendermintAppHash) -> Self {
impl From<&TendermintAppHash> for Id {
fn from(value: &TendermintAppHash) -> Self {
Self::Hash(value.to_string())
}
}
Expand Down
8 changes: 7 additions & 1 deletion transactions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use tendermint_rpc::HttpClient;
use transactions::app_state::AppState;
use transactions::config::AppConfig;
use transactions::repository::block as block_repo;
use transactions::repository::transactions as transaction_repo;
use transactions::services::{
db as db_service, namada as namada_service,
Expand Down Expand Up @@ -115,7 +116,7 @@ async fn crawling_fn(
let block_results = BlockResult::from(tm_block_results_response);

let block = Block::from(
tm_block_response.clone(),
&tm_block_response,
&block_results,
checksums,
1_u32,
Expand Down Expand Up @@ -151,6 +152,11 @@ async fn crawling_fn(
conn.build_transaction()
.read_write()
.run(|transaction_conn| {
block_repo::upsert_block(
transaction_conn,
block,
tm_block_response,
)?;
transaction_repo::insert_wrapper_transactions(
transaction_conn,
wrapper_txs,
Expand Down
Loading

0 comments on commit d6a58c9

Please sign in to comment.