Skip to content

Commit

Permalink
Add more metrics for network unbounded channel (#264)
Browse files Browse the repository at this point in the history
* Add metrics for file finalization in chunk pool

* Add metrics for network unbounded channel
  • Loading branch information
boqiu authored Nov 12, 2024
1 parent 0c49388 commit 1de7afe
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 71 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/chunk_pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ tokio = { version = "1.19.2", features = ["sync"] }
async-lock = "2.5.0"
hashlink = "0.8.0"
tracing = "0.1.35"
lazy_static = "1.4.0"
metrics = { workspace = true }
20 changes: 13 additions & 7 deletions node/chunk_pool/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID;
use anyhow::Result;
use network::NetworkMessage;
use metrics::{Histogram, Sample};
use network::{NetworkMessage, NetworkSender};
use shared_types::{ChunkArray, FileProof};
use std::{sync::Arc, time::SystemTime};
use std::{sync::Arc, time::Instant};
use storage_async::{ShardConfig, Store};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedReceiver;

lazy_static::lazy_static! {
pub static ref FINALIZE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("chunk_pool_finalize_file_latency", 1024);
}

/// Handle the cached file when uploaded completely and verified from blockchain.
/// Generally, the file will be persisted into log store.
pub struct ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
sender: NetworkSender,
}

impl ChunkPoolHandler {
pub(crate) fn new(
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
sender: NetworkSender,
) -> Self {
ChunkPoolHandler {
receiver,
Expand Down Expand Up @@ -68,7 +73,7 @@ impl ChunkPoolHandler {
}
}

let start = SystemTime::now();
let start = Instant::now();
if !self
.log_store
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
Expand All @@ -77,8 +82,9 @@ impl ChunkPoolHandler {
return Ok(false);
}

let elapsed = start.elapsed()?;
let elapsed = start.elapsed();
debug!(?id, ?elapsed, "Transaction finalized");
FINALIZE_FILE_LATENCY.update_since(start);

// always remove file from pool after transaction finalized
self.mem_pool.remove_file(&id.root).await;
Expand Down
2 changes: 1 addition & 1 deletion node/chunk_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Config {
pub fn unbounded(
config: Config,
log_store: Arc<storage_async::Store>,
network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>,
network_send: network::NetworkSender,
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

Expand Down
5 changes: 2 additions & 3 deletions node/miner/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use crate::monitor::Monitor;
use crate::sealer::Sealer;
use crate::submitter::Submitter;
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
use network::NetworkMessage;
use network::NetworkSender;
use std::sync::Arc;
use std::time::Duration;
use storage::config::ShardConfig;
use storage_async::Store;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

#[derive(Clone, Debug)]
pub enum MinerMessage {
Expand All @@ -29,7 +28,7 @@ pub struct MineService;
impl MineService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
_network_send: NetworkSender,
config: MinerConfig,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {
Expand Down
1 change: 1 addition & 0 deletions node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ if-addrs = "0.10.1"
slog = "2.7.0"
igd = "0.12.1"
duration-str = "0.5.1"
channel = { path = "../../common/channel" }

[dependencies.libp2p]
version = "0.45.1"
Expand Down
7 changes: 7 additions & 0 deletions node/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,10 @@ pub enum NetworkMessage {
udp_socket: Option<SocketAddr>,
},
}

pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;

pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
channel::metrics::unbounded_channel("network")
}
8 changes: 2 additions & 6 deletions node/network/src/nat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
//! Currently supported strategies:
//! - UPnP

use crate::{NetworkConfig, NetworkMessage};
use crate::{NetworkConfig, NetworkMessage, NetworkSender};
use if_addrs::get_if_addrs;
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;

/// Configuration required to construct the UPnP port mappings.
pub struct UPnPConfig {
Expand Down Expand Up @@ -36,10 +35,7 @@ impl UPnPConfig {
}

/// Attempts to construct external port mappings with UPnP.
pub fn construct_upnp_mappings(
config: UPnPConfig,
network_send: mpsc::UnboundedSender<NetworkMessage>,
) {
pub fn construct_upnp_mappings(config: UPnPConfig, network_send: NetworkSender) {
info!("UPnP Attempting to initialise routes");
match igd::search_gateway(Default::default()) {
Err(e) => info!(error = %e, "UPnP not available"),
Expand Down
5 changes: 2 additions & 3 deletions node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::discovery::enr;
use crate::multiaddr::Protocol;
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
use crate::types::{error, GossipKind};
use crate::{EnrExt, NetworkMessage};
use crate::{EnrExt, NetworkSender};
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
use futures::prelude::*;
use libp2p::core::{
Expand All @@ -21,7 +21,6 @@ use std::io::prelude::*;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;

use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};

Expand Down Expand Up @@ -60,7 +59,7 @@ pub struct Context<'a> {
impl<AppReqId: ReqId> Service<AppReqId> {
pub async fn new(
executor: task_executor::TaskExecutor,
network_sender: UnboundedSender<NetworkMessage>,
network_sender: NetworkSender,
ctx: Context<'_>,
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
trace!("Libp2p Service starting");
Expand Down
4 changes: 2 additions & 2 deletions node/network/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg(test)]

use libp2p::gossipsub::GossipsubConfigBuilder;
use network::new_network_channel;
use network::Enr;
use network::EnrExt;
use network::Multiaddr;
Expand All @@ -22,7 +23,6 @@ pub mod swarm;
type ReqId = usize;

use tempfile::Builder as TempBuilder;
use tokio::sync::mpsc::unbounded_channel;

#[allow(unused)]
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
Expand Down Expand Up @@ -72,7 +72,7 @@ pub async fn build_libp2p_instance(rt: Weak<Runtime>, boot_nodes: Vec<Enr>) -> L
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
let libp2p_context = network::Context { config: &config };
let (sender, _) = unbounded_channel();
let (sender, _) = new_network_channel();
Libp2pInstance(
LibP2PService::new(executor, sender, libp2p_context)
.await
Expand Down
16 changes: 9 additions & 7 deletions node/router/src/libp2p_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use network::{
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
PublicKey, PubsubMessage, Request, RequestId, Response,
};
use network::{Multiaddr, PeerAction, ReportSource};
use network::{Multiaddr, NetworkSender, PeerAction, ReportSource};
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
use storage::config::ShardConfig;
use storage_async::Store;
Expand Down Expand Up @@ -88,7 +88,7 @@ pub struct Libp2pEventHandler {
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals>,
/// A channel to the router service.
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
/// A channel to the syncing service.
sync_send: SyncSender,
/// A channel to the RPC chunk pool service.
Expand All @@ -112,7 +112,7 @@ impl Libp2pEventHandler {
pub fn new(
config: Config,
network_globals: Arc<NetworkGlobals>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
sync_send: SyncSender,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
local_keypair: Keypair,
Expand Down Expand Up @@ -1010,10 +1010,12 @@ mod tests {
use network::{
discovery::{CombinedKey, ConnectionId},
discv5::enr::EnrBuilder,
new_network_channel,
rpc::{GetChunksRequest, StatusMessage, SubstreamId},
types::FindFile,
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
NetworkMessage, PeerId, PubsubMessage, Request, RequestId, Response, SyncId,
NetworkMessage, NetworkReceiver, PeerId, PubsubMessage, Request, RequestId, Response,
SyncId,
};
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
use storage::{
Expand All @@ -1035,8 +1037,8 @@ mod tests {
runtime: TestRuntime,
network_globals: Arc<NetworkGlobals>,
keypair: Keypair,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_send: NetworkSender,
network_recv: NetworkReceiver,
sync_send: SyncSender,
sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
Expand All @@ -1050,7 +1052,7 @@ mod tests {
fn default() -> Self {
let runtime = TestRuntime::default();
let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel();
let (network_send, network_recv) = new_network_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();

Expand Down
12 changes: 5 additions & 7 deletions node/router/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::types::NewFile;
use network::PubsubMessage;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService, Swarm,
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
};
use pruner::PrunerMessage;
use shared_types::timestamp_now;
Expand All @@ -33,7 +31,7 @@ pub struct RouterService {
network_globals: Arc<NetworkGlobals>,

/// The receiver channel for Zgs to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_recv: NetworkReceiver,

/// The receiver channel for Zgs to communicate with the pruner service.
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
Expand All @@ -57,8 +55,8 @@ impl RouterService {
executor: task_executor::TaskExecutor,
libp2p: LibP2PService<RequestId>,
network_globals: Arc<NetworkGlobals>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_recv: NetworkReceiver,
network_send: NetworkSender,
sync_send: SyncSender,
_miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
Expand Down
6 changes: 2 additions & 4 deletions node/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ use file_location_cache::FileLocationCache;
use futures::channel::mpsc::Sender;
use jsonrpsee::core::RpcResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::NetworkGlobals;
use network::NetworkMessage;
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
use std::error::Error;
use std::sync::Arc;
use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender};
use task_executor::ShutdownReason;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use zgs::RpcServer as ZgsRpcServer;
use zgs_miner::MinerMessage;

Expand All @@ -42,7 +40,7 @@ pub struct Context {
pub config: RPCConfig,
pub file_location_cache: Arc<FileLocationCache>,
pub network_globals: Arc<NetworkGlobals>,
pub network_send: UnboundedSender<NetworkMessage>,
pub network_send: NetworkSender,
pub sync_send: SyncSender,
pub chunk_pool: Arc<MemoryChunkPool>,
pub log_store: Arc<Store>,
Expand Down
13 changes: 5 additions & 8 deletions node/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
use network::{
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService,
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
NetworkSender, RequestId, Service as LibP2PService,
};
use pruner::{Pruner, PrunerConfig, PrunerMessage};
use router::RouterService;
Expand All @@ -27,15 +27,12 @@ macro_rules! require {
}

struct NetworkComponents {
send: mpsc::UnboundedSender<NetworkMessage>,
send: NetworkSender,
globals: Arc<NetworkGlobals>,
keypair: Keypair,

// note: these will be owned by the router service
owned: Option<(
LibP2PService<RequestId>,
mpsc::UnboundedReceiver<NetworkMessage>,
)>,
owned: Option<(LibP2PService<RequestId>, NetworkReceiver)>,
}

struct SyncComponents {
Expand Down Expand Up @@ -144,7 +141,7 @@ impl ClientBuilder {
let service_context = network::Context { config };

// construct communication channel
let (send, recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (send, recv) = new_network_channel();

// launch libp2p service
let (globals, keypair, libp2p) =
Expand Down
7 changes: 3 additions & 4 deletions node/sync/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use network::{NetworkMessage, PeerAction, PeerId, PubsubMessage, ReportSource};
use tokio::sync::mpsc;
use network::{NetworkMessage, NetworkSender, PeerAction, PeerId, PubsubMessage, ReportSource};

pub struct SyncNetworkContext {
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
}

impl SyncNetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self {
pub fn new(network_send: NetworkSender) -> Self {
Self { network_send }
}

Expand Down
Loading

0 comments on commit 1de7afe

Please sign in to comment.