Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fetch validators out of consensus set #187

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions chain/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ async fn main() -> Result<(), MainError> {
{
Some(crawler_state) => {
tracing::info!(
"Found chain crawler state, attempting initial crawl at block {}...",
crawler_state.last_processed_block
);
"Found chain crawler state, attempting initial crawl at block \
{}...",
crawler_state.last_processed_block
);

// Try to run crawler_fn with the last processed block
let crawl_result = crawling_fn(
Expand All @@ -84,28 +85,34 @@ async fn main() -> Result<(), MainError> {

match crawl_result {
Err(MainError::RpcError) => {
// If there was an RpcError, it likely means the block was pruned from the node.
// We need to do an initial_query in that case.
// If there was an RpcError, it likely means the block was
// pruned from the node. We need to do
// an initial_query in that case.
tracing::error!(
"Failed to query block {}, starting from initial_query ...",
"Failed to query block {}, starting from \
initial_query ...",
crawler_state.last_processed_block,
);
None
}
Err(_) => {
// If any other type of error occurred, we should not increment
// last_processed_block but crawl from there without initial_query
// If any other type of error occurred, we should not
// increment last_processed_block but
// crawl from there without initial_query
tracing::info!(
"Initial crawl had an error (not RpcError), continuing from block {}...",
"Initial crawl had an error (not RpcError), \
continuing from block {}...",
crawler_state.last_processed_block
);
Some(crawler_state)
}
Ok(_) => {
// If the crawl was successful, increment last_processed block and continue from there.
// If the crawl was successful, increment last_processed
// block and continue from there.
let next_block = crawler_state.last_processed_block + 1;
tracing::info!(
"Initial crawl was successful, continuing from block {}...",
"Initial crawl was successful, continuing from block \
{}...",
next_block
);
Some(ChainCrawlerState {
Expand Down Expand Up @@ -288,12 +295,15 @@ async fn crawling_fn(
proposals_votes.len()
);

let validators = block.validators();
let validators = block.new_validators();
let validator_set = ValidatorSet {
validators: validators.clone(),
epoch,
};

let validators_state_change = block.update_validators_state();
tracing::info!("{:?}", validators_state_change);

let addresses = block.bond_addresses();
let bonds = query_bonds(&client, addresses).await.into_rpc_error()?;
tracing::debug!(
Expand Down Expand Up @@ -392,6 +402,11 @@ async fn crawling_fn(
validator_set,
)?;

repository::pos::upsert_validator_state(
transaction_conn,
validators_state_change,
)?;

// We first remove all the bonds and then insert the new ones
repository::pos::clear_bonds(
transaction_conn,
Expand Down
31 changes: 29 additions & 2 deletions chain/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ use orm::bond::BondInsertDb;
use orm::schema::{bonds, pos_rewards, unbonds, validators};
use orm::unbond::UnbondInsertDb;
use orm::validators::{
ValidatorDb, ValidatorUpdateMetadataDb, ValidatorWithMetaInsertDb,
ValidatorDb, ValidatorStateDb,
ValidatorUpdateMetadataDb, ValidatorWithMetaInsertDb,
};
use shared::block::Epoch;
use shared::bond::Bonds;
use shared::id::Id;
use shared::tuple_len::TupleLen;
use shared::unbond::{UnbondAddresses, Unbonds};
use shared::validator::{ValidatorMetadataChange, ValidatorSet};
use shared::validator::{
ValidatorMetadataChange, ValidatorSet, ValidatorStateChange,
};

use super::utils::MAX_PARAM_SIZE;

Expand Down Expand Up @@ -250,6 +253,30 @@ pub fn update_validator_metadata(
anyhow::Ok(())
}

pub fn upsert_validator_state(
transaction_conn: &mut PgConnection,
validators_states: HashSet<ValidatorStateChange>,
) -> anyhow::Result<()> {
for change in validators_states {
let state = ValidatorStateDb::from(change.state);
let validator_address = change.address.to_string();

diesel::update(
validators::table.filter(
validators::columns::namada_address.eq(validator_address),
),
)
.set(validators::columns::state.eq(state))
.execute(transaction_conn)
.context(format!(
"Failed to update validator state for {}",
change.address
))?;
}

Ok(())
}

pub fn upsert_validators(
transaction_conn: &mut PgConnection,
validators_set: ValidatorSet,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
4 changes: 4 additions & 0 deletions orm/migrations/2024-12-10-104502_transaction_types/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
ALTER TYPE TRANSACTION_KIND ADD VALUE 'reactivate_validator';
ALTER TYPE TRANSACTION_KIND ADD VALUE 'deactivate_validator';
ALTER TYPE TRANSACTION_KIND ADD VALUE 'unjail_validator';
1 change: 1 addition & 0 deletions orm/migrations/2024-12-10-110059_validator_states/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
4 changes: 4 additions & 0 deletions orm/migrations/2024-12-10-110059_validator_states/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here
ALTER TYPE VALIDATOR_STATE ADD VALUE 'deactivating';
ALTER TYPE VALIDATOR_STATE ADD VALUE 'reactivating';
ALTER TYPE VALIDATOR_STATE ADD VALUE 'unjailing';
46 changes: 23 additions & 23 deletions orm/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,39 @@ pub enum TransactionKindDb {
ChangeCommission,
RevealPk,
BecomeValidator,
ReactivateValidator,
DeactivateValidator,
UnjailValidator,
Unknown,
}

impl From<TransactionKind> for TransactionKindDb {
fn from(value: TransactionKind) -> Self {
match value {
TransactionKind::TransparentTransfer(_) => {
TransactionKindDb::TransparentTransfer
Self::TransparentTransfer
}
TransactionKind::ShieldedTransfer(_) => {
TransactionKindDb::ShieldedTransfer
TransactionKind::ShieldedTransfer(_) => Self::ShieldedTransfer,
TransactionKind::IbcMsgTransfer(_) => Self::IbcMsgTransfer,
TransactionKind::Bond(_) => Self::Bond,
TransactionKind::Redelegation(_) => Self::Redelegation,
TransactionKind::Unbond(_) => Self::Unbond,
TransactionKind::Withdraw(_) => Self::Withdraw,
TransactionKind::ClaimRewards(_) => Self::ClaimRewards,
TransactionKind::ProposalVote(_) => Self::VoteProposal,
TransactionKind::InitProposal(_) => Self::InitProposal,
TransactionKind::MetadataChange(_) => Self::ChangeMetadata,
TransactionKind::CommissionChange(_) => Self::ChangeCommission,
TransactionKind::DeactivateValidator(_) => {
Self::DeactivateValidator
}
TransactionKind::IbcMsgTransfer(_) => {
TransactionKindDb::IbcMsgTransfer
TransactionKind::ReactivateValidator(_) => {
Self::ReactivateValidator
}
TransactionKind::Bond(_) => TransactionKindDb::Bond,
TransactionKind::Redelegation(_) => TransactionKindDb::Redelegation,
TransactionKind::Unbond(_) => TransactionKindDb::Unbond,
TransactionKind::Withdraw(_) => TransactionKindDb::Withdraw,
TransactionKind::ClaimRewards(_) => TransactionKindDb::ClaimRewards,
TransactionKind::ProposalVote(_) => TransactionKindDb::VoteProposal,
TransactionKind::InitProposal(_) => TransactionKindDb::InitProposal,
TransactionKind::MetadataChange(_) => {
TransactionKindDb::ChangeMetadata
}
TransactionKind::CommissionChange(_) => {
TransactionKindDb::ChangeCommission
}
TransactionKind::RevealPk(_) => TransactionKindDb::RevealPk,
TransactionKind::BecomeValidator(_) => {
TransactionKindDb::BecomeValidator
}
TransactionKind::Unknown(_) => TransactionKindDb::Unknown,
TransactionKind::RevealPk(_) => Self::RevealPk,
TransactionKind::BecomeValidator(_) => Self::BecomeValidator,
TransactionKind::UnjailValidator(_) => Self::UnjailValidator,
TransactionKind::Unknown(_) => Self::Unknown,
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions orm/src/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub enum ValidatorStateDb {
BelowThreshold,
Inactive,
Jailed,
Deactivating,
Reactivating,
Unjailing,
Unknown,
}

Expand All @@ -39,6 +42,9 @@ impl From<ValidatorState> for ValidatorStateDb {
ValidatorState::BelowThreshold => Self::BelowThreshold,
ValidatorState::Inactive => Self::Inactive,
ValidatorState::Jailed => Self::Jailed,
ValidatorState::Deactivating => Self::Deactivating,
ValidatorState::Reactivating => Self::Reactivating,
ValidatorState::Unjailing => Self::Unjailing,
ValidatorState::Unknown => Self::Unknown,
}
}
Expand Down Expand Up @@ -90,6 +96,14 @@ pub struct ValidatorWithMetaInsertDb {
pub state: ValidatorStateDb,
}

#[derive(Serialize, Insertable, Clone)]
#[diesel(table_name = validators)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct ValidatorStateChangeDb {
pub namada_address: String,
pub state: ValidatorStateDb,
}

#[derive(Serialize, AsChangeset, Clone)]
#[diesel(table_name = validators)]
#[diesel(check_for_backend(diesel::pg::Pg))]
Expand Down
1 change: 1 addition & 0 deletions pos/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn upsert_validators(
.eq(excluded(validators::columns::max_commission)),
validators::columns::commission
.eq(excluded(validators::columns::commission)),
validators::columns::state.eq(excluded(validators::columns::state)),
))
.execute(transaction_conn)
.context("Failed to update validators in db")?;
Expand Down
59 changes: 59 additions & 0 deletions pos/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashSet;
use anyhow::Context;
use futures::{StreamExt, TryStreamExt};
use namada_core::chain::Epoch as NamadaSdkEpoch;
use namada_sdk::address::Address;
use namada_sdk::rpc;
use shared::block::Epoch;
use shared::id::Id;
Expand Down Expand Up @@ -91,6 +92,64 @@ pub async fn get_validator_set_at_epoch(
Ok(ValidatorSet { validators, epoch })
}

pub async fn get_validators_state(
client: &HttpClient,
validators: Vec<Validator>,
epoch: Epoch,
) -> anyhow::Result<ValidatorSet> {
let namada_epoch = to_epoch(epoch);

let validators = futures::stream::iter(validators)
.map(|mut validator| async move {
let validator_address = Address::from(validator.address.clone());
let validator_state = rpc::get_validator_state(
client,
&validator_address,
Some(namada_epoch),
)
.await
.with_context(|| {
format!("Failed to query validator {validator_address} state")
})?;

let validator_state = validator_state
.0
.map(ValidatorState::from)
.unwrap_or(ValidatorState::Unknown);

let from_unjailing_state =
validator.state.eq(&ValidatorState::Unjailing)
&& !validator_state.eq(&ValidatorState::Jailed);
let from_deactivating_state =
validator.state.eq(&ValidatorState::Deactivating)
&& validator_state.eq(&ValidatorState::Inactive);
let from_reactivating_state =
validator.state.eq(&ValidatorState::Reactivating)
&& !validator_state.eq(&ValidatorState::Inactive);
let from_concrete_state = ![
ValidatorState::Deactivating,
ValidatorState::Reactivating,
ValidatorState::Unjailing,
]
.contains(&validator.state);

if from_unjailing_state
|| from_deactivating_state
|| from_reactivating_state
|| from_concrete_state
{
validator.state = validator_state;
}

anyhow::Ok(validator)
})
.buffer_unordered(100)
.try_collect::<HashSet<_>>()
.await?;

Ok(ValidatorSet { validators, epoch })
}

pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result<Epoch> {
let epoch = rpc::query_epoch(client)
.await
Expand Down
34 changes: 32 additions & 2 deletions shared/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::transaction::{
};
use crate::unbond::UnbondAddresses;
use crate::utils::BalanceChange;
use crate::validator::{Validator, ValidatorMetadataChange, ValidatorState};
use crate::validator::{
Validator, ValidatorMetadataChange, ValidatorState, ValidatorStateChange,
};
use crate::vote::GovernanceVote;

pub type Epoch = u32;
Expand Down Expand Up @@ -506,7 +508,7 @@ impl Block {
Some(recv_msg)
}

pub fn validators(&self) -> HashSet<Validator> {
pub fn new_validators(&self) -> HashSet<Validator> {
self.transactions
.iter()
.flat_map(|(_, txs)| txs)
Expand Down Expand Up @@ -538,6 +540,34 @@ impl Block {
.collect()
}

pub fn update_validators_state(&self) -> HashSet<ValidatorStateChange> {
self.transactions
.iter()
.flat_map(|(_, txs)| txs)
.filter(|tx| {
tx.data.is_some()
&& tx.exit_code == TransactionExitStatus::Applied
})
.filter_map(|tx| match &tx.kind {
TransactionKind::DeactivateValidator(data) => {
let data = data.clone().unwrap();
Some(ValidatorStateChange {
address: Id::from(data),
state: ValidatorState::Deactivating,
})
}
TransactionKind::ReactivateValidator(data) => {
let data = data.clone().unwrap();
Some(ValidatorStateChange {
address: Id::from(data),
state: ValidatorState::Reactivating,
})
}
_ => None,
})
.collect()
}

pub fn bond_addresses(&self) -> HashSet<BondAddresses> {
self.transactions
.iter()
Expand Down
Loading
Loading