From 8e7893f604b082e127b62e3a5ae3d7758e45817b Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Mon, 16 Dec 2024 13:33:04 +0100 Subject: [PATCH 1/3] fetch rewards in batches --- chain/src/services/namada.rs | 12 ++-- governance/src/services/namada.rs | 2 +- pos/src/services/namada.rs | 4 +- rewards/src/main.rs | 2 +- rewards/src/services/namada.rs | 96 +++++++++++++++++++++++++++---- 5 files changed, 94 insertions(+), 22 deletions(-) diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index 1ef4774c8..1c125bcda 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -110,7 +110,7 @@ pub async fn query_balance( }) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } @@ -440,7 +440,7 @@ pub async fn query_bonds( Some(bonds) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -513,7 +513,7 @@ pub async fn query_unbonds( } }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -573,7 +573,7 @@ pub async fn query_tallies( Some((proposal, tally_type)) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -603,7 +603,7 @@ pub async fn query_all_votes( Some(votes) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await; @@ -686,7 +686,7 @@ pub async fn get_validator_set_at_epoch( state: validator_state }) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; diff --git a/governance/src/services/namada.rs b/governance/src/services/namada.rs index d3849b214..3a9d48401 100644 --- a/governance/src/services/namada.rs +++ b/governance/src/services/namada.rs @@ -63,7 +63,7 @@ pub async fn get_governance_proposals_updates( } }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } diff --git a/pos/src/services/namada.rs b/pos/src/services/namada.rs index d3fa7af7c..fa18e9e4a 100644 --- a/pos/src/services/namada.rs +++ b/pos/src/services/namada.rs @@ -85,7 +85,7 @@ pub async fn get_validator_set_at_epoch( state: validator_state }) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; @@ -143,7 +143,7 @@ pub async fn get_validators_state( anyhow::Ok(validator) }) - .buffer_unordered(100) + .buffer_unordered(32) .try_collect::>() .await?; diff --git a/rewards/src/main.rs b/rewards/src/main.rs index a28e09f54..74a108c0c 100644 --- a/rewards/src/main.rs +++ b/rewards/src/main.rs @@ -81,7 +81,7 @@ async fn crawling_fn( return Err(MainError::NoAction); } - tracing::info!("Starting to update proposals..."); + tracing::info!("Starting to update pos rewards..."); // TODO: change this by querying all the pairs in the database let delegations_pairs = namada_service::query_delegation_pairs(&client) diff --git a/rewards/src/services/namada.rs b/rewards/src/services/namada.rs index af020d3fd..197e580b4 100644 --- a/rewards/src/services/namada.rs +++ b/rewards/src/services/namada.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::time::Duration; use anyhow::Context; use futures::StreamExt; @@ -37,13 +38,92 @@ pub async fn query_rewards( client: &HttpClient, delegation_pairs: HashSet, ) -> anyhow::Result> { - Ok(futures::stream::iter(delegation_pairs) + let mut all_rewards: Vec = Vec::new(); + + let batches: Vec> = delegation_pairs + .clone() + .into_iter() + .collect::>() + .chunks(32) + .map(|chunk| chunk.to_vec()) + .collect(); + + tracing::info!( + "Got {} batches with a total of {} rewards to query...", + batches.len(), + delegation_pairs.len() + ); + + let results = futures::stream::iter(batches) + .map(|batch| process_batch_with_retries(client, batch)) + .buffer_unordered(3) + .collect::>() + .await; + + for result in results { + match result { + Ok(mut rewards) => all_rewards.append(&mut rewards), + Err(err) => return Err(err) + } + } + + Ok(all_rewards) +} + +pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { + let epoch = rpc::query_epoch(client) + .await + .context("Failed to query Namada's current epoch")?; + + Ok(epoch.0 as Epoch) +} + +async fn process_batch_with_retries( + client: &HttpClient, + batch: Vec, +) -> anyhow::Result> { + let mut retries = 0; + + loop { + let result = process_batch(client, batch.clone()).await; + + tracing::info!("Done batch..."); + + match result { + Ok(rewards) => return Ok(rewards), + Err(err) => { + retries += 1; + tracing::warn!( + "Batch reward failed (attempt {}/{}) - Error: {:?}", + retries, + 3, + err + ); + + if retries >= 3 { + tracing::error!( + "Batch reward failed after maximum retries." + ); + return Err(err); + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } +} + +async fn process_batch( + client: &HttpClient, + batch: Vec, +) -> anyhow::Result> { + Ok(futures::stream::iter(batch) .filter_map(|delegation| async move { - tracing::info!( + tracing::debug!( "Fetching rewards {} -> {} ...", delegation.validator_address, delegation.delegator_address ); + let reward = RPC .vp() .pos() @@ -55,7 +135,7 @@ pub async fn query_rewards( .await .ok()?; - tracing::info!( + tracing::debug!( "Done fetching reward for {} -> {}!", delegation.validator_address, delegation.delegator_address @@ -67,15 +147,7 @@ pub async fn query_rewards( }) }) .map(futures::future::ready) - .buffer_unordered(20) + .buffer_unordered(32) .collect::>() .await) } - -pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { - let epoch = rpc::query_epoch(client) - .await - .context("Failed to query Namada's current epoch")?; - - Ok(epoch.0 as Epoch) -} From a99a85e14b65c2916ffdf6d7bfac247006952f6a Mon Sep 17 00:00:00 2001 From: Gianmarco Fraccaroli Date: Mon, 16 Dec 2024 14:21:23 +0100 Subject: [PATCH 2/3] minors --- rewards/src/services/namada.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rewards/src/services/namada.rs b/rewards/src/services/namada.rs index 197e580b4..ab9a42d04 100644 --- a/rewards/src/services/namada.rs +++ b/rewards/src/services/namada.rs @@ -60,6 +60,8 @@ pub async fn query_rewards( .collect::>() .await; + tracing::info!("Done fetching rewards!"); + for result in results { match result { Ok(mut rewards) => all_rewards.append(&mut rewards), @@ -87,8 +89,6 @@ async fn process_batch_with_retries( loop { let result = process_batch(client, batch.clone()).await; - tracing::info!("Done batch..."); - match result { Ok(rewards) => return Ok(rewards), Err(err) => { From 9128561a1184c9e12b91ac59e4a6a806a3f17b53 Mon Sep 17 00:00:00 2001 From: Mateusz Jasiuk Date: Mon, 16 Dec 2024 14:45:44 +0100 Subject: [PATCH 3/3] feat: print processed rewards batch --- rewards/src/services/namada.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/rewards/src/services/namada.rs b/rewards/src/services/namada.rs index ab9a42d04..06a5c225d 100644 --- a/rewards/src/services/namada.rs +++ b/rewards/src/services/namada.rs @@ -40,12 +40,13 @@ pub async fn query_rewards( ) -> anyhow::Result> { let mut all_rewards: Vec = Vec::new(); - let batches: Vec> = delegation_pairs + let batches: Vec<(usize, Vec)> = delegation_pairs .clone() .into_iter() .collect::>() .chunks(32) - .map(|chunk| chunk.to_vec()) + .enumerate() + .map(|(i, chunk)| (i, chunk.to_vec())) .collect(); tracing::info!( @@ -65,7 +66,7 @@ pub async fn query_rewards( for result in results { match result { Ok(mut rewards) => all_rewards.append(&mut rewards), - Err(err) => return Err(err) + Err(err) => return Err(err), } } @@ -82,15 +83,19 @@ pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result { async fn process_batch_with_retries( client: &HttpClient, - batch: Vec, + batch: (usize, Vec), ) -> anyhow::Result> { let mut retries = 0; + tracing::info!("Processing batch {}", batch.0); loop { - let result = process_batch(client, batch.clone()).await; + let result = process_batch(client, batch.1.clone()).await; match result { - Ok(rewards) => return Ok(rewards), + Ok(rewards) => { + tracing::info!("Batch {} done!", batch.0); + return Ok(rewards); + } Err(err) => { retries += 1; tracing::warn!(