Skip to content

Commit

Permalink
add timeout for rpc connections (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
csdtowards authored Nov 9, 2024
1 parent fae2d5e commit 0c49388
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 28 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/log_entry_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ futures-util = "0.3.28"
thiserror = "1.0.44"
lazy_static = "1.4.0"
metrics = { workspace = true }
reqwest = {version = "0.11", features = ["json"]}
url = { version = "2.4", default-features = false }
7 changes: 7 additions & 0 deletions node/log_entry_sync/src/sync_manager/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::ContractAddress;

pub struct LogSyncConfig {
Expand Down Expand Up @@ -34,6 +36,9 @@ pub struct LogSyncConfig {
pub watch_loop_wait_time_ms: u64,
// force to sync log from start block number
pub force_log_sync_from_start_block_number: bool,

// the timeout for blockchain rpc connection
pub blockchain_rpc_timeout: Duration,
}

#[derive(Clone)]
Expand Down Expand Up @@ -61,6 +66,7 @@ impl LogSyncConfig {
remove_finalized_block_interval_minutes: u64,
watch_loop_wait_time_ms: u64,
force_log_sync_from_start_block_number: bool,
blockchain_rpc_timeout: Duration,
) -> Self {
Self {
rpc_endpoint_url,
Expand All @@ -77,6 +83,7 @@ impl LogSyncConfig {
remove_finalized_block_interval_minutes,
watch_loop_wait_time_ms,
force_log_sync_from_start_block_number,
blockchain_rpc_timeout,
}
}
}
36 changes: 18 additions & 18 deletions node/log_entry_sync/src/sync_manager/log_entry_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::RETRY_WAIT_MS;
use crate::ContractAddress;
use crate::{ContractAddress, LogSyncConfig};
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm};
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
Expand All @@ -12,7 +12,6 @@ use futures::StreamExt;
use jsonrpsee::tracing::{debug, error, info, warn};
use shared_types::{DataRoot, Transaction};
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
Expand All @@ -34,28 +33,29 @@ pub struct LogEntryFetcher {
}

impl LogEntryFetcher {
pub async fn new(
url: &str,
contract_address: ContractAddress,
log_page_size: u64,
confirmation_delay: u64,
rate_limit_retries: u32,
timeout_retries: u32,
initial_backoff: u64,
) -> Result<Self> {
pub async fn new(config: &LogSyncConfig) -> Result<Self> {
let provider = Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(rate_limit_retries)
.timeout_retries(timeout_retries)
.initial_backoff(Duration::from_millis(initial_backoff))
.build(Http::from_str(url)?, Box::new(HttpRateLimitRetryPolicy)),
.rate_limit_retries(config.rate_limit_retries)
.timeout_retries(config.timeout_retries)
.initial_backoff(Duration::from_millis(config.initial_backoff))
.build(
Http::new_with_client(
url::Url::parse(&config.rpc_endpoint_url)?,
reqwest::Client::builder()
.timeout(config.blockchain_rpc_timeout)
.connect_timeout(config.blockchain_rpc_timeout)
.build()?,
),
Box::new(HttpRateLimitRetryPolicy),
),
));
// TODO: `error` types are removed from the ABI json file.
Ok(Self {
contract_address,
contract_address: config.contract_address,
provider,
log_page_size,
confirmation_delay,
log_page_size: config.log_page_size,
confirmation_delay: config.confirmation_block_count,
})
}

Expand Down
11 changes: 1 addition & 10 deletions node/log_entry_sync/src/sync_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,7 @@ impl LogSyncManager {
.expect("shutdown send error")
},
async move {
let log_fetcher = LogEntryFetcher::new(
&config.rpc_endpoint_url,
config.contract_address,
config.log_page_size,
config.confirmation_block_count,
config.rate_limit_retries,
config.timeout_retries,
config.initial_backoff,
)
.await?;
let log_fetcher = LogEntryFetcher::new(&config).await?;
let data_cache = DataCache::new(config.cache_config.clone());

let block_hash_cache = Arc::new(RwLock::new(
Expand Down
1 change: 1 addition & 0 deletions node/src/config/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl ZgsConfig {
self.remove_finalized_block_interval_minutes,
self.watch_loop_wait_time_ms,
self.force_log_sync_from_start_block_number,
Duration::from_secs(self.blockchain_rpc_timeout_secs),
))
}

Expand Down
2 changes: 2 additions & 0 deletions node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ build_config! {
(remove_finalized_block_interval_minutes, (u64), 30)
(watch_loop_wait_time_ms, (u64), 500)

(blockchain_rpc_timeout_secs, (u64), 120)

// chunk pool
(chunk_pool_write_window_size, (usize), 4)
(chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G
Expand Down

0 comments on commit 0c49388

Please sign in to comment.