diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index 1ef4774c8..1c125bcda 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -110,7 +110,7 @@ pub async fn query_balance( }) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } @@ -440,7 +440,7 @@ pub async fn query_bonds( Some(bonds) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -513,7 +513,7 @@ pub async fn query_unbonds( } }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -573,7 +573,7 @@ pub async fn query_tallies( Some((proposal, tally_type)) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -603,7 +603,7 @@ pub async fn query_all_votes( Some(votes) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -686,7 +686,7 @@ pub async fn get_validator_set_at_epoch( state: validator_state }) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; diff --git a/governance/src/services/namada.rs b/governance/src/services/namada.rs index d3849b214..3a9d48401 100644 --- a/governance/src/services/namada.rs +++ b/governance/src/services/namada.rs @@ -63,7 +63,7 @@ pub async fn get_governance_proposals_updates( } }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } diff --git a/pos/src/services/namada.rs b/pos/src/services/namada.rs index d3fa7af7c..fa18e9e4a 100644 --- a/pos/src/services/namada.rs +++ b/pos/src/services/namada.rs @@ -85,7 +85,7 @@ pub async fn get_validator_set_at_epoch( state: validator_state }) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; @@ -143,7 +143,7 @@ pub async fn get_validators_state( anyhow::Ok(validator) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; diff --git a/rewards/src/main.rs b/rewards/src/main.rs index a28e09f54..74a108c0c 100644 --- a/rewards/src/main.rs +++ b/rewards/src/main.rs @@ -81,7 +81,7 @@ async fn crawling_fn( return Err(MainError::NoAction); } - tracing::info!("Starting to update proposals..."); + tracing::info!("Starting to update pos rewards..."); // TODO: change this by querying all the pairs in the database let delegations_pairs = namada_service::query_delegation_pairs(&client) diff --git a/rewards/src/services/namada.rs b/rewards/src/services/namada.rs index af020d3fd..06a5c225d 100644 --- a/rewards/src/services/namada.rs +++ b/rewards/src/services/namada.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use anyhow::Context; use futures::StreamExt; @@ -37,13 +38,97 @@ pub async fn query_rewards( client: &HttpClient, delegation_pairs: HashSet, ) -> anyhow::Result> { - Ok(futures::stream::iter(delegation_pairs) + let mut all_rewards: Vec = Vec::new(); + + let batches: Vec<(usize, Vec)> = delegation_pairs + .clone() + .into_iter() + .collect::>() + .chunks(32) + .enumerate() + .map(|(i, chunk)| (i, chunk.to_vec())) + .collect(); + + tracing::info!( + "Got {} batches with a total of {} rewards to query...", + batches.len(), + delegation_pairs.len() + ); + + let results = futures::stream::iter(batches) + .map(|batch| process_batch_with_retries(client, batch)) + .buffer_unordered(3) + .collect::>() + .await; + + tracing::info!("Done fetching rewards!"); + + for result in results { + match result { + Ok(mut rewards) => all_rewards.append(&mut rewards), + Err(err) => return Err(err), + } + } + + Ok(all_rewards) +} + +pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { + let epoch = rpc::query_epoch(client) + .await + .context("Failed to query Namada's current epoch")?; + + Ok(epoch.0 as Epoch) +} + +async fn process_batch_with_retries( + client: &HttpClient, + batch: (usize, Vec), +) -> anyhow::Result> { + let mut retries = 0; + + tracing::info!("Processing batch {}", batch.0); + loop { + let result = process_batch(client, batch.1.clone()).await; + + match result { + Ok(rewards) => { + tracing::info!("Batch {} done!", batch.0); + return Ok(rewards); + } + Err(err) => { + retries += 1; + tracing::warn!( + "Batch reward failed (attempt {}/{}) - Error: {:?}", + retries, + 3, + err + ); + + if retries >= 3 { + tracing::error!( + "Batch reward failed after maximum retries." + ); + return Err(err); + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } +} + +async fn process_batch( + client: &HttpClient, + batch: Vec, +) -> anyhow::Result> { + Ok(futures::stream::iter(batch) .filter_map(|delegation| async move { - tracing::info!( + tracing::debug!( "Fetching rewards {} -> {} ...", delegation.validator_address, delegation.delegator_address ); + let reward = RPC .vp() .pos() @@ -55,7 +140,7 @@ pub async fn query_rewards( .await .ok()?; - tracing::info!( + tracing::debug!( "Done fetching reward for {} -> {}!", delegation.validator_address, delegation.delegator_address @@ -67,15 +152,7 @@ pub async fn query_rewards( }) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } - -pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { - let epoch = rpc::query_epoch(client) - .await - .context("Failed to query Namada's current epoch")?; - - Ok(epoch.0 as Epoch) -}