enhancement: Add --backfill-from option to force crawling from given height for chain & transactions
joel-u410 committed Dec 10, 2024
1 parent b763c29 commit aefaf71
Showing 4 changed files with 59 additions and 20 deletions.
6 changes: 6 additions & 0 deletions chain/src/config.rs
@@ -34,6 +34,12 @@ pub struct AppConfig {
     #[clap(long, env, default_value = "5")]
     pub initial_query_retry_attempts: usize,
 
+    #[clap(
+        long,
+        help = "Crawl from given height and do not update crawler_state"
+    )]
+    pub backfill_from: Option<u32>,
+
     #[clap(flatten)]
     pub log: LogConfig,
 }
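
For context on how the new option behaves at the CLI boundary: clap's derive maps the `backfill_from` field to a `--backfill-from` flag that parses into `Option<u32>` and stays `None` when the flag is omitted, which is what the startup logic below keys on. A minimal sketch under that assumption (a hypothetical reduction; the real AppConfig above carries more fields and env-variable support):

    // Hypothetical reduction of the AppConfig above, only to show the
    // --backfill-from parsing; requires the `clap` crate with the
    // `derive` feature, as the real config already does.
    use clap::Parser;

    #[derive(Parser, Debug)]
    struct AppConfig {
        #[clap(
            long,
            help = "Crawl from given height and do not update crawler_state"
        )]
        backfill_from: Option<u32>,
    }

    fn main() {
        // e.g. `chain --backfill-from 100000` => Some(100000); omitted => None
        let config = AppConfig::parse();
        println!("{:?}", config.backfill_from);
    }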
36 changes: 26 additions & 10 deletions chain/src/main.rs
@@ -63,11 +63,22 @@ async fn main() -> Result<(), MainError> {
         .into_db_error()?;
 
     // See if we can start from existing crawler_state
-    let crawler_state = match db_service::try_get_chain_crawler_state(&conn)
-        .await
-        .into_db_error()?
-    {
-        Some(crawler_state) => {
+    let crawler_state = match (
+        config.backfill_from,
+        db_service::try_get_chain_crawler_state(&conn)
+            .await
+            .into_db_error()?,
+    ) {
+        (Some(height), _) => {
+            tracing::warn!("Backfilling from block height {}", height);
+            Some(ChainCrawlerState {
+                last_processed_block: height,
+                last_processed_epoch: 0,
+                first_block_in_epoch: 0,
+                timestamp: 0,
+            })
+        }
+        (None, Some(crawler_state)) => {
             tracing::info!(
                 "Found chain crawler state, attempting initial crawl at block {}...",
                 crawler_state.last_processed_block
@@ -79,6 +90,7 @@ async fn main() -> Result<(), MainError> {
                 client.clone(),
                 conn.clone(),
                 checksums.clone(),
+                true,
             )
             .await;
 
@@ -115,7 +127,7 @@ async fn main() -> Result<(), MainError> {
                 }
             }
         }
-        None => {
+        (None, None) => {
             tracing::info!(
                 "No chain crawler state found, starting from initial_query..."
             );
@@ -148,6 +160,7 @@ async fn main() -> Result<(), MainError> {
                 client.clone(),
                 conn.clone(),
                 checksums.clone(),
+                config.backfill_from.is_none(),
             )
         },
         crawler_state.last_processed_block,
@@ -161,6 +174,7 @@ async fn crawling_fn(
     client: Arc<HttpClient>,
     conn: Arc<Object>,
     checksums: Checksums,
+    should_update_crawler_state: bool,
 ) -> Result<(), MainError> {
     let should_process = can_process(block_height, client.clone()).await?;
 
@@ -421,10 +435,12 @@ async fn crawling_fn(
                 revealed_pks,
             )?;
 
-            repository::crawler_state::upsert_crawler_state(
-                transaction_conn,
-                crawler_state,
-            )?;
+            if should_update_crawler_state {
+                repository::crawler_state::upsert_crawler_state(
+                    transaction_conn,
+                    crawler_state,
+                )?;
+            }
 
             anyhow::Ok(())
         })
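Reduced to its essentials, the chain crawler's new startup logic is a three-way tuple match: an explicit backfill height always wins and synthesizes a crawler state, a stored state is used otherwise, and a fresh start falls through to initial_query. A sketch with simplified stand-ins for the db_service and ChainCrawlerState items in the diff (not the real signatures):

    // Simplified stand-in for the indexer's real ChainCrawlerState.
    struct ChainCrawlerState {
        last_processed_block: u32,
        last_processed_epoch: u32,
        first_block_in_epoch: u32,
        timestamp: i64,
    }

    fn select_start(
        backfill_from: Option<u32>,
        stored: Option<ChainCrawlerState>,
    ) -> Option<ChainCrawlerState> {
        match (backfill_from, stored) {
            // Backfill wins even over a stored state: synthesize a state
            // at the requested height, with epoch bookkeeping zeroed.
            (Some(height), _) => Some(ChainCrawlerState {
                last_processed_block: height,
                last_processed_epoch: 0,
                first_block_in_epoch: 0,
                timestamp: 0,
            }),
            // Normal resume from whatever the database recorded.
            (None, Some(state)) => Some(state),
            // Fresh start: the caller falls back to initial_query.
            (None, None) => None,
        }
    }

    fn main() {
        // With --backfill-from 500 set, any stored state is ignored.
        let start = select_start(Some(500), None);
        assert_eq!(start.map(|s| s.last_processed_block), Some(500));
    }

Because crawling_fn now receives `config.backfill_from.is_none()` as should_update_crawler_state, a backfill run re-processes blocks without ever writing the synthetic state back, leaving the regular crawler's resume point untouched.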
6 changes: 6 additions & 0 deletions transactions/src/config.rs
@@ -23,6 +23,12 @@ pub struct AppConfig {
     #[clap(long, env, default_value_t = 1)]
     pub from_block_height: u32,
 
+    #[clap(
+        long,
+        help = "Crawl from given height and do not update crawler_state"
+    )]
+    pub backfill_from: Option<u32>,
+
     #[clap(long, env)]
     pub database_url: String,
 
31 changes: 21 additions & 10 deletions transactions/src/main.rs
@@ -51,12 +51,18 @@ async fn main() -> Result<(), MainError> {
 
     let crawler_state = db_service::get_crawler_state(&conn).await;
 
-    let next_block = std::cmp::max(
-        crawler_state
-            .map(|cs| cs.last_processed_block + 1)
-            .unwrap_or(1),
-        config.from_block_height,
-    );
+    let next_block = match config.backfill_from {
+        Some(height) => {
+            tracing::warn!("Backfilling from block height {}", height);
+            height
+        }
+        None => std::cmp::max(
+            crawler_state
+                .map(|cs| cs.last_processed_block + 1)
+                .unwrap_or(1),
+            config.from_block_height,
+        ),
+    };
 
     crawl(
         move |block_height| {
@@ -65,6 +71,7 @@
                 client.clone(),
                 conn.clone(),
                 checksums.clone(),
+                config.backfill_from.is_none(),
             )
         },
         next_block,
@@ -78,6 +85,7 @@ async fn crawling_fn(
     client: Arc<HttpClient>,
     conn: Arc<Object>,
     checksums: Checksums,
+    should_update_crawler_state: bool,
 ) -> Result<(), MainError> {
     let should_process = can_process(block_height, client.clone()).await?;
 
@@ -159,10 +167,13 @@
                 transaction_conn,
                 inner_txs,
             )?;
-            transaction_repo::insert_crawler_state(
-                transaction_conn,
-                crawler_state,
-            )?;
+
+            if should_update_crawler_state {
+                transaction_repo::insert_crawler_state(
+                    transaction_conn,
+                    crawler_state,
+                )?;
+            }
 
             anyhow::Ok(())
         })
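The transactions crawler applies the same precedence to a plain block number instead of a full state struct. A worked reduction of the next_block selection above (config fields flattened into function parameters for illustration):

    // Mirrors the match in transactions/src/main.rs: an explicit
    // --backfill-from height overrides both the stored crawler state
    // and the --from-block-height floor.
    fn next_block(
        backfill_from: Option<u32>,
        last_processed_block: Option<u32>, // from crawler_state, if any
        from_block_height: u32,
    ) -> u32 {
        match backfill_from {
            Some(height) => height,
            None => std::cmp::max(
                last_processed_block.map(|b| b + 1).unwrap_or(1),
                from_block_height,
            ),
        }
    }

    fn main() {
        assert_eq!(next_block(None, Some(41), 1), 42);      // resume after block 41
        assert_eq!(next_block(None, None, 100), 100);       // fresh start at the floor
        assert_eq!(next_block(Some(7), Some(41), 100), 7);  // backfill wins
    }

As in the chain crawler, insert_crawler_state is skipped whenever --backfill-from is set, so a backfill pass never moves the stored resume point.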
