Skip to content

Commit

Permalink
Fixup data science logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Feb 17, 2022
1 parent 668b706 commit 28976f2
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 32 deletions.
5 changes: 3 additions & 2 deletions src/indexer_selection/indexers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::prelude::*;
use std::sync::Arc;

pub struct IndexerDataReader {
pub url: Eventual<String>,
pub url: Eventual<Arc<String>>,
pub stake: Eventual<GRT>,
}

pub struct IndexerDataWriter {
pub url: EventualWriter<String>,
pub url: EventualWriter<Arc<String>>,
pub stake: EventualWriter<GRT>,
}

Expand Down
4 changes: 2 additions & 2 deletions src/indexer_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub use ordered_float::NotNan;
use prometheus;
use rand::{thread_rng, Rng as _};
pub use secp256k1::SecretKey;
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use utility::*;

pub type Context<'c> = cost_model::Context<'c, &'c str>;
Expand Down Expand Up @@ -149,7 +149,7 @@ pub struct UtilityConfig {

#[derive(Clone, Debug)]
pub struct IndexerScore {
pub url: String,
pub url: Arc<String>,
pub fee: GRT,
pub slashable: USD,
pub utility: NotNan<f64>,
Expand Down
4 changes: 2 additions & 2 deletions src/indexer_selection/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use plotters::{
};
use rand::{thread_rng, Rng as _};
use secp256k1::SecretKey;
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

#[derive(Clone)]
struct IndexerCharacteristics {
Expand Down Expand Up @@ -267,7 +267,7 @@ async fn run_simulation(
latest: latest.number,
});
let indexer_writer = input_writers.indexers.write(&indexing.indexer).await;
indexer_writer.url.write("".to_string());
indexer_writer.url.write(Arc::default());
indexer_writer.stake.write(data.stake);
if let Some(special_weight) = data.special_weight {
special_indexers.insert(indexing.indexer, special_weight.try_into().unwrap());
Expand Down
7 changes: 5 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ async fn handle_subgraph_query(
"handle_subgraph_query",
ray_id = %query.ray_id,
query_id = %query.id,
deployment = %query.subgraph.as_ref().unwrap().deployment,
%deployment,
network = %query.subgraph.as_ref().unwrap().network,
);
let api_key = request.match_info().get("api_key").unwrap_or("");
Expand All @@ -420,7 +420,7 @@ async fn handle_subgraph_query(
tracing::info!(
ray_id = %query.ray_id,
query_id = %query.id,
deployment = %query.subgraph.as_ref().unwrap().deployment,
%deployment,
network = %query.subgraph.as_ref().unwrap().network,
%api_key,
query = %query.query,
Expand All @@ -437,8 +437,11 @@ async fn handle_subgraph_query(
tracing::info!(
ray_id = %query.ray_id,
query_id = %query.id,
api_key = %api_key,
%deployment,
attempt_index,
indexer = %attempt.indexer,
url = %attempt.score.url,
allocation = %attempt.allocation,
fee = %attempt.score.fee,
utility = *attempt.score.utility,
Expand Down
39 changes: 17 additions & 22 deletions src/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,6 @@ where
.await;
selection_timer.map(|t| t.observe_duration());

match &selection_result {
Ok(None) => tracing::info!(err = ?NoIndexerSelected),
Err(err) => tracing::info!(?err),
_ => (),
};
let (indexer_query, scoring_sample) = match selection_result {
Ok(Some(indexer_query)) => indexer_query,
Ok(None) => return Err(NoIndexerSelected),
Expand Down Expand Up @@ -350,9 +345,12 @@ where
)
.await;
assert_eq!(retry_count + 1, query.indexer_attempts.len());
if let Ok(()) = result {
return Ok(());
}
match result {
Ok(()) => return Ok(()),
Err(rejection) => {
query.indexer_attempts.last_mut().unwrap().rejection = Some(rejection);
}
};
}
tracing::info!("retry limit reached");
Err(NoIndexerSelected)
Expand All @@ -369,6 +367,7 @@ where
query_id = %query.id,
deployment = %query.subgraph.as_ref().unwrap().deployment,
%indexer,
url = %score.url,
fee = %score.fee,
slashable = %score.slashable,
utility = *score.utility,
Expand Down Expand Up @@ -406,9 +405,8 @@ where
deployment_id: &str,
context: &mut Context<'_>,
block_resolver: &BlockResolver,
) -> Result<(), ()> {
) -> Result<(), String> {
let indexer_id = indexer_query.indexing.indexer.to_string();
tracing::info!(indexer = %indexer_id);
self.observe_indexer_selection_metrics(deployment_id, &indexer_query);
let t0 = Instant::now();
let result = self.indexer_client.query_indexer(&indexer_query).await;
Expand Down Expand Up @@ -440,7 +438,7 @@ where
&[&deployment_id, &indexer_id],
|counter| counter.inc(),
);
return Err(());
return Err(err.to_string());
}
};
with_metric(
Expand All @@ -451,18 +449,17 @@ where

let subgraph = query.subgraph.as_ref().unwrap();
if !subgraph.features.is_empty() && response.attestation.is_none() {
tracing::info!(indexer_response_err = "Attestable response has no attestation");
self.indexers
.observe_failed_query(
&indexer_query.indexing,
&result.receipt,
IndexerError::NoAttestation,
)
.await;
return Err(());
return Err("Attestable response has no attestation".into());
}

if let Err(remove_indexer) = self
if let Err(rejection) = self
.check_unattestable_responses(
context,
&block_resolver,
Expand All @@ -477,7 +474,7 @@ where
&[&deployment_id, &indexer_id],
|counter| counter.inc(),
);
return Err(remove_indexer);
return Err(rejection);
}

if let Some(attestation) = &response.attestation {
Expand Down Expand Up @@ -530,30 +527,28 @@ where
indexing: &Indexing,
receipt: &Receipt,
response: &IndexerResponse,
) -> Result<(), ()> {
) -> Result<(), String> {
// Special-casing for a few known indexer errors; the block scope here
// is just to separate the code neatly from the rest

let parsed_response =
serde_json::from_str::<Response<Box<RawValue>>>(&response.payload).map_err(|_| ())?;
let parsed_response = serde_json::from_str::<Response<Box<RawValue>>>(&response.payload)
.map_err(|_| "invalid indexer response")?;

if indexer_response_has_error(
&parsed_response,
"Failed to decode `block.hash` value: `no block with that hash found`",
) {
tracing::info!(indexer_response_err = "indexing behind");
self.indexers
.observe_indexing_behind(context, indexing, block_resolver)
.await;
return Err(());
return Err("indexer failed to resolve block".into());
}

if indexer_response_has_error(&parsed_response, "panic processing query") {
tracing::info!(indexer_response_err = "panic processing query");
self.indexers
.observe_failed_query(indexing, receipt, IndexerError::NondeterministicResponse)
.await;
return Err(());
return Err("indexer panicked processing query".into());
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/query_engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl Topology {
let stake_table = [0.0, 50e3, 100e3, 150e3];
for indexer in self.indexers.values() {
let indexer_writer = indexer_inputs.indexers.write(&indexer.id).await;
indexer_writer.url.write("".into());
indexer_writer.url.write(Arc::default());
indexer_writer
.stake
.write(indexer.staked_grt.as_udecimal(&stake_table));
Expand Down
2 changes: 1 addition & 1 deletion src/sync_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ fn handle_indexers(
let mut indexers = indexers.lock().await;
for (indexer, status) in statuses {
let indexer = indexers.write(&indexer).await;
indexer.url.write(status.url);
indexer.url.write(Arc::new(status.url));
indexer.stake.write(status.staked);
}
}
Expand Down

0 comments on commit 28976f2

Please sign in to comment.