backing: improve session buffering for runtime information #6284

Merged · 10 commits · Nov 13, 2024
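Summary: the backing subsystem used to request the full validator set from the runtime for every new relay parent, even though the set only changes at session boundaries. This PR adds a small per-session LRU cache for the validator set (next to the existing validator-to-group cache), so the `Validators` runtime API is queried once per session, and adapts the subsystem tests to the new request pattern.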
19 changes: 16 additions & 3 deletions polkadot/node/core/backing/src/lib.rs
@@ -322,6 +322,8 @@ struct State {
/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
/// or explicit view for which a `Seconded` statement has been successfully imported.
per_candidate: HashMap<CandidateHash, PerCandidateState>,
+	/// Cache the per-session validators.
+	validators_cache: LruMap<SessionIndex, Vec<ValidatorId>>,
/// Cache the per-session Validator->Group mapping.
validator_to_group_cache:
LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
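Keying the cache by `SessionIndex` is sound because the validator set is fixed for the duration of a session. A minimal, self-contained sketch of the get-or-insert pattern this enables (assumptions: `LruMap`/`ByLength` come from the `schnellru` crate, as elsewhere in polkadot-sdk; `u32` and `String` stand in for `SessionIndex` and `ValidatorId`):

```rust
use schnellru::{ByLength, LruMap};

fn main() {
    // Capacity 2 mirrors `ByLength::new(2)` below: room for the current and
    // the previous session across a session boundary.
    let mut validators_cache: LruMap<u32, Vec<String>> = LruMap::new(ByLength::new(2));

    let session_index = 7u32;
    if validators_cache.get(&session_index).is_none() {
        // In the subsystem this value comes from the `Validators` runtime API.
        let fetched = vec!["alice".to_string(), "bob".to_string()];
        validators_cache.insert(session_index, fetched);
    }

    // The second lookup cannot fail: the miss path above just inserted it.
    let validators = validators_cache
        .get(&session_index)
        .expect("just inserted")
        .clone();
    assert_eq!(validators.len(), 2);
}
```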
@@ -342,6 +344,7 @@ impl State {
per_leaf: HashMap::default(),
per_relay_parent: HashMap::default(),
per_candidate: HashMap::new(),
+	validators_cache: LruMap::new(ByLength::new(2)),
validator_to_group_cache: LruMap::new(ByLength::new(2)),
background_validation_tx,
keystore,
@@ -984,6 +987,7 @@ async fn handle_active_leaves_update<Context>(
ctx,
maybe_new,
&state.keystore,
+	&mut state.validators_cache,
&mut state.validator_to_group_cache,
mode,
)
@@ -1084,6 +1088,7 @@ async fn construct_per_relay_parent_state<Context>(
ctx: &mut Context,
relay_parent: Hash,
keystore: &KeystorePtr,
+	validators_cache: &mut LruMap<SessionIndex, Vec<ValidatorId>>,
validator_to_group_cache: &mut LruMap<
SessionIndex,
Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
@@ -1092,9 +1097,8 @@
) -> Result<Option<PerRelayParentState>, Error> {
let parent = relay_parent;

-let (session_index, validators, groups, cores) = futures::try_join!(
+let (session_index, groups, cores) = futures::try_join!(
request_session_index_for_child(parent, ctx.sender()).await,
-request_validators(parent, ctx.sender()).await,
request_validator_groups(parent, ctx.sender()).await,
request_from_runtime(parent, ctx.sender(), |tx| {
RuntimeApiRequest::AvailabilityCores(tx)
@@ -1105,6 +1109,16 @@

let session_index = try_runtime_api!(session_index);

+if validators_cache.get(&session_index).is_none() {
+	let validators = request_validators(parent, ctx.sender())
+		.await
+		.await
+		.map_err(Error::RuntimeApiUnavailable)?;
+	let validators: Vec<_> = try_runtime_api!(validators);
+	validators_cache.insert(session_index, validators);
+}
+let validators = validators_cache.get(&session_index).expect("Just inserted").clone();

let inject_core_index = request_node_features(parent, session_index, ctx.sender())
.await?
.unwrap_or(NodeFeatures::EMPTY)
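The doubled `.await` in the miss path above is easy to misread: the outer future resolves once the runtime API request has been sent and yields a oneshot receiver; the inner one resolves when the answer arrives. That is why the patch maps the first error (oneshot cancellation) into `Error::RuntimeApiUnavailable` and runs the inner, API-level `Result` through `try_runtime_api!`. A hedged, self-contained sketch of the shape (illustrative names, not the real subsystem API):

```rust
use futures::channel::oneshot;
use futures::executor::block_on;

// Stand-in for the runtime API's error type.
#[derive(Debug)]
struct ApiError;

// Sketch of the request helper shape: sending the request completes the
// outer future, which hands back the receiver for the eventual reply.
async fn request_validators_sketch() -> oneshot::Receiver<Result<Vec<String>, ApiError>> {
    let (tx, rx) = oneshot::channel();
    // The real code would pass `tx` to the runtime API subsystem and return
    // `rx` immediately; here we answer inline so the example runs.
    let _ = tx.send(Ok(vec!["validator-key".to_string()]));
    rx
}

fn main() {
    let reply = block_on(async {
        // First `.await`: the send completes, yielding the receiver.
        // Second `.await`: the oneshot reply arrives; it is `Err(Canceled)`
        // if the responder was dropped, which is what the patch maps to
        // `Error::RuntimeApiUnavailable`.
        request_validators_sketch().await.await
    });
    println!("{reply:?}");
}
```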
@@ -1114,7 +1128,6 @@

gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state");

-let validators: Vec<_> = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let cores = try_runtime_api!(cores);
let minimum_backing_votes =
77 changes: 41 additions & 36 deletions polkadot/node/core/backing/src/tests/mod.rs
@@ -73,6 +73,7 @@ pub(crate) struct TestState {
chain_ids: Vec<ParaId>,
keystore: KeystorePtr,
validators: Vec<Sr25519Keyring>,
+	has_cached_validators: bool,
validator_public: Vec<ValidatorId>,
validation_data: PersistedValidationData,
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
@@ -157,6 +158,7 @@ impl Default for TestState {
chain_ids,
keystore,
validators,
+	has_cached_validators: false,
validator_public,
validator_groups: (validator_groups, group_rotation_info),
validator_to_group,
@@ -251,7 +253,7 @@ impl TestCandidateBuilder {
}

// Tests that the subsystem performs actions that are required on startup.
-async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestState) {
+async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &mut TestState) {
// Start work on some new parent.
virtual_overseer
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
@@ -278,16 +280,6 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
}
);

-// Check that subsystem job issues a request for a validator set.
-assert_matches!(
-	virtual_overseer.recv().await,
-	AllMessages::RuntimeApi(
-		RuntimeApiMessage::Request(parent, RuntimeApiRequest::Validators(tx))
-	) if parent == test_state.relay_parent => {
-		tx.send(Ok(test_state.validator_public.clone())).unwrap();
-	}
-);

// Check that subsystem job issues a request for the validator groups.
assert_matches!(
virtual_overseer.recv().await,
@@ -308,6 +300,19 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
}
);

+if !test_state.has_cached_validators {
+	// Check that subsystem job issues a request for a validator set.
+	assert_matches!(
+		virtual_overseer.recv().await,
+		AllMessages::RuntimeApi(
+			RuntimeApiMessage::Request(parent, RuntimeApiRequest::Validators(tx))
+		) if parent == test_state.relay_parent => {
+			tx.send(Ok(test_state.validator_public.clone())).unwrap();
+		}
+	);
+	test_state.has_cached_validators = true;
+}

// Node features request from runtime: all features are disabled.
assert_matches!(
virtual_overseer.recv().await,
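Note on the test flow: `test_startup` now takes `&mut TestState` because `has_cached_validators` tracks whether the subsystem has already fetched and cached the session's validator set. The `Validators` request is therefore expected only on the first leaf activation in a session; on later activations the subsystem answers from the cache and proceeds directly to the node-features request. The expectation also moves below the validator-groups and availability-cores requests, mirroring the new code path where validators are fetched conditionally after the `try_join!`.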
@@ -458,9 +463,9 @@ async fn assert_validate_from_exhaustive(
// and in case validation is successful issues a `StatementDistributionMessage`.
#[test]
fn backing_second_works() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd = dummy_pvd();
@@ -554,7 +559,7 @@ fn backing_works(#[case] elastic_scaling_mvp: bool) {
}

test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_ab = PoV { block_data: BlockData(vec![1, 2, 3]) };
let pvd_ab = dummy_pvd();
@@ -772,7 +777,7 @@ fn get_backed_candidate_preserves_order() {
.insert(CoreIndex(2), [test_state.chain_ids[1]].into_iter().collect());

test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_a = PoV { block_data: BlockData(vec![1, 2, 3]) };
let pov_b = PoV { block_data: BlockData(vec![3, 4, 5]) };
@@ -1171,9 +1176,9 @@ fn extract_core_index_from_statement_works() {

#[test]
fn backing_works_while_validation_ongoing() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_abc = PoV { block_data: BlockData(vec![1, 2, 3]) };
let pvd_abc = dummy_pvd();
@@ -1366,9 +1371,9 @@ fn backing_works_while_validation_ongoing() {
// be a misbehavior.
#[test]
fn backing_misbehavior_works() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_a = PoV { block_data: BlockData(vec![1, 2, 3]) };

@@ -1552,9 +1557,9 @@ fn backing_misbehavior_works() {
// can still second a valid one afterwards.
#[test]
fn backing_dont_second_invalid() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_block_a = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd_a = dummy_pvd();
@@ -1712,9 +1717,9 @@ fn backing_dont_second_invalid() {
// candidate we will not be issuing a `Seconded` statement on it.
#[test]
fn backing_second_after_first_fails_works() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_a = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd_a = dummy_pvd();
@@ -1858,9 +1863,9 @@ fn backing_second_after_first_fails_works() {
// the work of this subsystem and so it is not fatal to the node.
#[test]
fn backing_works_after_failed_validation() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_a = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd_a = dummy_pvd();
@@ -2037,9 +2042,9 @@ fn candidate_backing_reorders_votes() {
#[test]
fn retry_works() {
// sp_tracing::try_init_simple();
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov_a = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd_a = dummy_pvd();
@@ -2221,10 +2226,10 @@ fn retry_works() {

#[test]
fn observes_backing_even_if_not_validator() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
let empty_keystore = Arc::new(sc_keystore::LocalKeystore::in_memory());
test_harness(empty_keystore, |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov = PoV { block_data: BlockData(vec![1, 2, 3]) };
let pvd = dummy_pvd();
@@ -2340,9 +2345,9 @@ fn observes_backing_even_if_not_validator() {
// without prospective parachains.
#[test]
fn cannot_second_multiple_candidates_per_parent() {
-let test_state = TestState::default();
+let mut test_state = TestState::default();
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd = dummy_pvd();
@@ -2478,13 +2483,13 @@ fn new_leaf_view_doesnt_clobber_old() {
let relay_parent_2 = Hash::repeat_byte(1);
assert_ne!(test_state.relay_parent, relay_parent_2);
test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

// New leaf that doesn't clobber old.
{
let old_relay_parent = test_state.relay_parent;
test_state.relay_parent = relay_parent_2;
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;
test_state.relay_parent = old_relay_parent;
}

Expand Down Expand Up @@ -2537,7 +2542,7 @@ fn disabled_validator_doesnt_distribute_statement_on_receiving_second() {
test_state.disabled_validators.push(ValidatorIndex(0));

test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd = dummy_pvd();
@@ -2585,7 +2590,7 @@ fn disabled_validator_doesnt_distribute_statement_on_receiving_statement() {
test_state.disabled_validators.push(ValidatorIndex(0));

test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd = dummy_pvd();
@@ -2647,7 +2652,7 @@ fn validator_ignores_statements_from_disabled_validators() {
test_state.disabled_validators.push(ValidatorIndex(2));

test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
-test_startup(&mut virtual_overseer, &test_state).await;
+test_startup(&mut virtual_overseer, &mut test_state).await;

let pov = PoV { block_data: BlockData(vec![42, 43, 44]) };
let pvd = dummy_pvd();