Skip to content

Commit

Permalink
Merge pull request #216 from anoma/improve-rewards-fetching
Browse files Browse the repository at this point in the history
rewards: fetch rewards in batches
  • Loading branch information
Fraccaman authored Dec 16, 2024
2 parents cda97c1 + 9128561 commit e2ab703
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 22 deletions.
12 changes: 6 additions & 6 deletions chain/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub async fn query_balance(
})
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ pub async fn query_bonds(
Some(bonds)
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await;

Expand Down Expand Up @@ -513,7 +513,7 @@ pub async fn query_unbonds(
}
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await;

Expand Down Expand Up @@ -573,7 +573,7 @@ pub async fn query_tallies(
Some((proposal, tally_type))
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await;

Expand Down Expand Up @@ -603,7 +603,7 @@ pub async fn query_all_votes(
Some(votes)
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await;

Expand Down Expand Up @@ -686,7 +686,7 @@ pub async fn get_validator_set_at_epoch(
state: validator_state
})
})
.buffer_unordered(100)
.buffer_unordered(32)
.try_collect::<HashSet<_>>()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion governance/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn get_governance_proposals_updates(
}
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await)
}
4 changes: 2 additions & 2 deletions pos/src/services/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub async fn get_validator_set_at_epoch(
state: validator_state
})
})
.buffer_unordered(100)
.buffer_unordered(32)
.try_collect::<HashSet<_>>()
.await?;

Expand Down Expand Up @@ -143,7 +143,7 @@ pub async fn get_validators_state(

anyhow::Ok(validator)
})
.buffer_unordered(100)
.buffer_unordered(32)
.try_collect::<HashSet<_>>()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion rewards/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn crawling_fn(
return Err(MainError::NoAction);
}

tracing::info!("Starting to update proposals...");
tracing::info!("Starting to update pos rewards...");

// TODO: change this by querying all the pairs in the database
let delegations_pairs = namada_service::query_delegation_pairs(&client)
Expand Down
101 changes: 89 additions & 12 deletions rewards/src/services/namada.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::time::Duration;

use anyhow::Context;
use futures::StreamExt;
Expand Down Expand Up @@ -37,13 +38,97 @@ pub async fn query_rewards(
client: &HttpClient,
delegation_pairs: HashSet<DelegationPair>,
) -> anyhow::Result<Vec<Reward>> {
Ok(futures::stream::iter(delegation_pairs)
let mut all_rewards: Vec<Reward> = Vec::new();

let batches: Vec<(usize, Vec<DelegationPair>)> = delegation_pairs
.clone()
.into_iter()
.collect::<Vec<_>>()
.chunks(32)
.enumerate()
.map(|(i, chunk)| (i, chunk.to_vec()))
.collect();

tracing::info!(
"Got {} batches with a total of {} rewards to query...",
batches.len(),
delegation_pairs.len()
);

let results = futures::stream::iter(batches)
.map(|batch| process_batch_with_retries(client, batch))
.buffer_unordered(3)
.collect::<Vec<_>>()
.await;

tracing::info!("Done fetching rewards!");

for result in results {
match result {
Ok(mut rewards) => all_rewards.append(&mut rewards),
Err(err) => return Err(err),
}
}

Ok(all_rewards)
}

pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result<Epoch> {
let epoch = rpc::query_epoch(client)
.await
.context("Failed to query Namada's current epoch")?;

Ok(epoch.0 as Epoch)
}

async fn process_batch_with_retries(
client: &HttpClient,
batch: (usize, Vec<DelegationPair>),
) -> anyhow::Result<Vec<Reward>> {
let mut retries = 0;

tracing::info!("Processing batch {}", batch.0);
loop {
let result = process_batch(client, batch.1.clone()).await;

match result {
Ok(rewards) => {
tracing::info!("Batch {} done!", batch.0);
return Ok(rewards);
}
Err(err) => {
retries += 1;
tracing::warn!(
"Batch reward failed (attempt {}/{}) - Error: {:?}",
retries,
3,
err
);

if retries >= 3 {
tracing::error!(
"Batch reward failed after maximum retries."
);
return Err(err);
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}

async fn process_batch(
client: &HttpClient,
batch: Vec<DelegationPair>,
) -> anyhow::Result<Vec<Reward>> {
Ok(futures::stream::iter(batch)
.filter_map(|delegation| async move {
tracing::info!(
tracing::debug!(
"Fetching rewards {} -> {} ...",
delegation.validator_address,
delegation.delegator_address
);

let reward = RPC
.vp()
.pos()
Expand All @@ -55,7 +140,7 @@ pub async fn query_rewards(
.await
.ok()?;

tracing::info!(
tracing::debug!(
"Done fetching reward for {} -> {}!",
delegation.validator_address,
delegation.delegator_address
Expand All @@ -67,15 +152,7 @@ pub async fn query_rewards(
})
})
.map(futures::future::ready)
.buffer_unordered(20)
.buffer_unordered(32)
.collect::<Vec<_>>()
.await)
}

pub async fn get_current_epoch(client: &HttpClient) -> anyhow::Result<Epoch> {
let epoch = rpc::query_epoch(client)
.await
.context("Failed to query Namada's current epoch")?;

Ok(epoch.0 as Epoch)
}

0 comments on commit e2ab703

Please sign in to comment.