diff --git a/crates/water/src/runtime/client.rs b/crates/water/src/runtime/client.rs index 2ab8951..b78a7d5 100644 --- a/crates/water/src/runtime/client.rs +++ b/crates/water/src/runtime/client.rs @@ -97,7 +97,7 @@ impl WATERClient { } /// keep_listen is the function that is called when user wants to accept a newly income connection, - /// it creates a new WASM instance and migrate the previous listener to it. Used by v0 listener and relay for now. + /// it creates a new WASM instance and migrate the previous listener to it. -- v0_plus listener and relay for now. pub fn keep_listen(&mut self) -> Result { info!("[HOST] WATERClient keep listening...",); @@ -128,7 +128,7 @@ impl WATERClient { self.debug = debug; } - /// `connect` is the entry point for `Dialer` to connect to a remote address + /// `connect` is the function for `Dialer` to connect to a remote address pub fn connect(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient connecting ..."); @@ -161,7 +161,7 @@ impl WATERClient { Ok(()) } - /// `associate` is the entry point for `Relay` to associate with a remote addr + /// `associate` is the function for `Relay` to associate a remote connection pub fn associate(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient relaying ..."); @@ -176,8 +176,8 @@ impl WATERClient { Ok(()) } - /// `accept` is the entry point for `Listener` to accept a connection - /// called after `listen` + /// `accept` is the function for `Listener` to accept a connection + /// called after `listen()` pub fn accept(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient accepting ..."); @@ -192,8 +192,8 @@ impl WATERClient { Ok(()) } - /// `run_worker` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in a separate thread - /// it will return a `JoinHandle` for the caller to manage the thread -- used by v0 currently + /// `run_worker` is the function to run the entry_fn(a worker in WATM) in a separate thread and return the thread handle + /// it will return a `JoinHandle` for the caller to manage the thread -- used by v0_plus pub fn run_worker( &mut self, ) -> Result>, anyhow::Error> { @@ -207,7 +207,7 @@ impl WATERClient { } } - /// `execute` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in the current thread + /// `execute` is the function to run the entry_fn(a worker in WATM) in the current thread /// -- replace the thread running Host when running it <- used by v1 currently pub fn execute(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient Executing ..."); @@ -229,7 +229,7 @@ impl WATERClient { Ok(()) } - /// `cancel_with` is the function to set the pipe for canceling later -- v0 + /// `cancel_with` is the function to set the cancel pipe for exiting later -- v0_plus pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient cancel_with ..."); @@ -251,7 +251,7 @@ impl WATERClient { Ok(()) } - /// `cancel` is the function to cancel the thread running the entry_fn -- v0 + /// `cancel` is the function to send thru the cancel_pipe and let the thread running the worker to exit -- v0_plus pub fn cancel(&mut self) -> Result<(), anyhow::Error> { info!("[HOST] WATERClient canceling ..."); diff --git a/crates/water/src/runtime/core.rs b/crates/water/src/runtime/core.rs index 8ed51a6..17e0900 100644 --- a/crates/water/src/runtime/core.rs +++ b/crates/water/src/runtime/core.rs @@ -75,62 +75,10 @@ impl H2O { return Err(anyhow::Error::msg("WATM module version not found")); } - Self::setup_core(conf, linker, store, module, engine, version) + Self::create_core(conf, linker, store, module, engine, version) } - /// This function is for migrating the v0 core for listener and relay - /// to handle every new connection will create a new separate core (as v0 spec) - pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O) -> Result { - info!("[HOST] WATERCore H2O v0_migrating..."); - - // reseting the listener accepted_fd or the relay's accepted_fd & dial_fd - // when migrating from existed listener / relay - let version = match &core.version { - Version::V0(v0conf) => { - match v0conf { - Some(og_v0_conf) => match og_v0_conf.lock() { - Ok(og_v0_conf) => { - let mut new_v0_conf_inner = og_v0_conf.clone(); - // reset the new cloned v0conf - new_v0_conf_inner.reset_listener_or_relay(); - - Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner)))) - } - Err(e) => { - return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; - } - }, - None => { - return Err(anyhow::anyhow!("v0_conf is None"))?; - } - } - } - _ => { - return Err(anyhow::anyhow!("This is not a V0 core"))?; - } - }; - - // NOTE: Some of the followings can reuse the existing core, leave to later explore - let wasm_config = wasmtime::Config::new(); - - #[cfg(feature = "multithread")] - { - wasm_config.wasm_threads(true); - } - - let engine = Engine::new(&wasm_config)?; - let linker: Linker = Linker::new(&engine); - - let module = Module::from_file(&engine, &conf.filepath)?; - - let host = Host::default(); - let store = Store::new(&engine, host); - - Self::setup_core(conf, linker, store, module, engine, Some(version)) - } - - /// called by init_core() or v0_migrate_core() to setup the core (reduce code duplication) - pub fn setup_core( + pub fn create_core( conf: &WATERConfig, mut linker: Linker, mut store: Store, @@ -213,6 +161,57 @@ impl H2O { }) } + // This function is for migrating the v0 core for listener and relay + // to handle every new connection is creating a new separate core (as v0 spec) + pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERCore H2O v0_migrating..."); + + // reseting the listener accepted_fd or the relay's accepted_fd & dial_fd + // when migrating from existed listener / relay + let version = match &core.version { + Version::V0(v0conf) => { + match v0conf { + Some(og_v0_conf) => match og_v0_conf.lock() { + Ok(og_v0_conf) => { + let mut new_v0_conf_inner = og_v0_conf.clone(); + // reset the new cloned v0conf + new_v0_conf_inner.reset_listener_or_relay(); + + Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner)))) + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; + } + }, + None => { + return Err(anyhow::anyhow!("v0_conf is None"))?; + } + } + } + _ => { + return Err(anyhow::anyhow!("This is not a V0 core"))?; + } + }; + + // NOTE: Some of the followings can reuse the existing core, leave to later explore + let wasm_config = wasmtime::Config::new(); + + #[cfg(feature = "multithread")] + { + wasm_config.wasm_threads(true); + } + + let engine = Engine::new(&wasm_config)?; + let linker: Linker = Linker::new(&engine); + + let module = Module::from_file(&engine, &conf.filepath)?; + + let host = Host::default(); + let store = Store::new(&engine, host); + + Self::create_core(conf, linker, store, module, engine, Some(version)) + } + pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { self._init(conf.debug)?; self._process_config(conf)?; // This is for now needed only by v1_preview diff --git a/crates/water/src/runtime/transport.rs b/crates/water/src/runtime/transport.rs index f05d6bf..115e8b6 100644 --- a/crates/water/src/runtime/transport.rs +++ b/crates/water/src/runtime/transport.rs @@ -15,7 +15,8 @@ pub trait WATERTransportTrait: Send { // read from WASM's caller_reader match caller_io { Some(ref mut caller_io) => match caller_io.read(buf) { - Ok(n) => Ok(n as i64), + Ok(n) if n > 0 => Ok(n as i64), + Ok(_) => Err(anyhow::Error::msg("Stream closed or read 0 bytes")), Err(e) => Err(anyhow::Error::msg(format!( "failed to read from caller_reader: {}", e diff --git a/crates/water/src/runtime/v0/config.rs b/crates/water/src/runtime/v0/config.rs index c8adb20..6eb9f53 100644 --- a/crates/water/src/runtime/v0/config.rs +++ b/crates/water/src/runtime/v0/config.rs @@ -93,12 +93,14 @@ impl V0Config { }) } + /// It will connect to the remote addr and set the fd in the V0Config pub fn connect(&mut self) -> Result { let addr = format!("{}:{}", self.remote_addr, self.remote_port); info!("[HOST] WATERCore V0 connecting to {}", addr); match &mut self.conn { + // if the V0CRole is Relay, then it will remain as Relay V0CRole::Relay(_, _, ref mut conn_fd) => { // now relay has been built, need to dial if *conn_fd != -1 { @@ -109,6 +111,7 @@ impl V0Config { *conn_fd = conn.as_raw_fd(); Ok(conn) } + // if the V0CRole has not been set, and connect() was called, then it should be a dialer V0CRole::Unknown => { let conn = std::net::TcpStream::connect(addr)?; self.conn = V0CRole::Dialer(conn.as_raw_fd()); @@ -118,6 +121,7 @@ impl V0Config { } } + /// It will create a listener and set the fd in the V0Config (for either listener or relay) pub fn create_listener(&mut self, is_relay: bool) -> Result<(), anyhow::Error> { let addr = format!("{}:{}", self.loc_addr, self.loc_port); @@ -133,6 +137,7 @@ impl V0Config { Ok(()) } + /// It will accept a connection and set the fd in the V0Config (for either listener or relay) pub fn accept(&mut self) -> Result { info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn); @@ -146,7 +151,7 @@ impl V0Config { let (stream, _) = listener.accept()?; - *listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope + *listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope *accepted_fd = stream.as_raw_fd(); Ok(stream) @@ -158,7 +163,7 @@ impl V0Config { let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) }; let (stream, _) = listener.accept()?; - *listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope + *listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope *accepted_fd = stream.as_raw_fd(); Ok(stream) } @@ -166,6 +171,7 @@ impl V0Config { } } + /// It will close the connection to remote / accepted connection listened and exit gracefully pub fn defer(&mut self) { info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn); @@ -193,6 +199,7 @@ impl V0Config { } } + /// It is used for listener and relay only, to reset the accepted connection in the migrated listener / relay pub fn reset_listener_or_relay(&mut self) { info!( "[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...", @@ -202,21 +209,15 @@ impl V0Config { match self.conn { V0CRole::Listener(_, ref mut accepted_fd) => { if *accepted_fd != -1 { - let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; - drop(accepted_conn); *accepted_fd = -1; // set it back to default } } V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => { if *accepted_fd != -1 { - let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; - drop(accepted_conn); *accepted_fd = -1; // set it back to default } if *conn_fd != -1 { - let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) }; - drop(conn); *conn_fd = -1; // set it back to default } } diff --git a/examples/clients/cli/Cargo.toml b/examples/clients/cli/Cargo.toml index daf817e..9f41f81 100644 --- a/examples/clients/cli/Cargo.toml +++ b/examples/clients/cli/Cargo.toml @@ -17,5 +17,6 @@ clap = { version="4.2.1", features = ["derive"] } anyhow = "1.0.7" tracing = "0.1" tracing-subscriber = "0.3.17" +tokio = { version = "1", features = ["full"] } water = {path="../../../crates/water", version="0.1.0"} diff --git a/examples/clients/cli/README.md b/examples/clients/cli/README.md index 8c6d89c..8f62f6c 100644 --- a/examples/clients/cli/README.md +++ b/examples/clients/cli/README.md @@ -1,7 +1,5 @@ # cli tool for using `water` library -🚧 Currently under reimplementation 🚧 - ## How to run? To run the Host program + WASM: ```shell @@ -15,28 +13,60 @@ nc 127.0.0.1 9005 you should see `> CONNECTED` in the terminal of running WASM, then you can connect a bunch like this and input anything to see how it echos. ## Examples -To run the shadowsocks wasm: - -1. run the server side from the [official implementation](https://github.com/shadowsocks/shadowsocks-rust) with the following config: - ```json - { - "server": "127.0.0.1", - "server_port": 8388, - "password": "Test!23", - "method": "chacha20-ietf-poly1305" - } - ``` - and run the server side with: - ```shell - cargo run --bin ssserver -- -c .json - ``` - -2. then run the cli tool with the `ss_client_wasm` - ```shell - cargo run --bin water_cli -- --wasm-path demo_wasm/ss_client_wasm.wasm --entry-fn v1_listen --config-wasm demo_configs/ss_config.json --type-client 3 - ``` - -3. to test the traffic is going through - ```shell - curl -4 -v --socks5 localhost:8080 https://erikchi.com - ``` \ No newline at end of file +1. To run the shadowsocks WATM: + + - run the server side from the [official implementation](https://github.com/shadowsocks/shadowsocks-rust) with the following config: + ```json + { + "server": "127.0.0.1", + "server_port": 8388, + "password": "Test!23", + "method": "chacha20-ietf-poly1305" + } + ``` + and run the server side with: + ```shell + cargo run --bin ssserver -- -c .json + ``` + + - then run the cli tool with the `ss_client_wasm` + ```shell + cargo run --bin water_cli -- --wasm-path demo_wasm/ss_client_wasm.wasm --entry-fn v1_listen --config-wasm demo_configs/ss_config.json --type-client 3 + ``` + + - to test the traffic is going through + ```shell + curl -4 -v --socks5 localhost:8080 https://erikchi.com + ``` + +2. To run the v0_plus WATM: + + 1. Run a v0_plus Listener: + - use the cli tool + ```shell + cargo run --bin water_cli -- --wasm-path demo_wasm/plain.wasm --entry-fn _water_worker --config-wasm demo_configs/v0_listener_config.json --type-client 1 + ``` + + - then test with: + ```shell + nc 127.0.0.1 8888 + hello + hello + ... + ``` + you can also look at the log printed out by the cli tool to see the listener is receiving the input. + + 2. Run a v0_plus Relay: + - first you need a listener / destination for the Relay, you can use the above Listener for as it, then config the correct `ip:port` for the `remote` in the config file `demo_configs/v0_relay_config.json`, then run the cli tool: + ```shell + cargo run --bin water_cli -- --wasm-path demo_wasm/plain.wasm --entry-fn _water_worker --config-wasm demo_configs/v0_relay_config.json --type-client 2 + ``` + + - then test with: + ```shell + nc 127.0.0.1 8080 + hello + hello + ... + ``` + you can also look at the log printed out by the cli tool / the listener to see the Relay is relaying the input. \ No newline at end of file diff --git a/examples/clients/cli/demo_configs/v0_listener_config.json b/examples/clients/cli/demo_configs/v0_listener_config.json new file mode 100644 index 0000000..09cfd65 --- /dev/null +++ b/examples/clients/cli/demo_configs/v0_listener_config.json @@ -0,0 +1,7 @@ +{ + "remote_address": "127.0.0.1", + "remote_port": 8088, + "local_address": "127.0.0.1", + "local_port": 8888, + "bypass": false +} \ No newline at end of file diff --git a/examples/clients/cli/demo_configs/v0_relay_config.json b/examples/clients/cli/demo_configs/v0_relay_config.json new file mode 100644 index 0000000..be1ba54 --- /dev/null +++ b/examples/clients/cli/demo_configs/v0_relay_config.json @@ -0,0 +1,7 @@ +{ + "remote_address": "127.0.0.1", + "remote_port": 8888, + "local_address": "127.0.0.1", + "local_port": 8080, + "bypass": false +} \ No newline at end of file diff --git a/examples/clients/cli/demo_wasm/plain.wasm b/examples/clients/cli/demo_wasm/plain.wasm index 77e1350..2f51fbc 100644 Binary files a/examples/clients/cli/demo_wasm/plain.wasm and b/examples/clients/cli/demo_wasm/plain.wasm differ diff --git a/examples/clients/cli/src/cli.rs b/examples/clients/cli/src/cli.rs index 4f49088..4079b75 100644 --- a/examples/clients/cli/src/cli.rs +++ b/examples/clients/cli/src/cli.rs @@ -1,6 +1,6 @@ use water::config::{WATERConfig, WaterBinType}; use water::globals::{CONFIG_WASM_PATH, MAIN, WASM_PATH}; -use water::runtime; +use water::runtime::client::WATERClient; use clap::Parser; @@ -42,30 +42,89 @@ impl From for WATERConfig { pub fn parse() -> Result { // Parse command-line arguments and execute the appropriate commands - let conf: WATERConfig = Args::parse().into(); Ok(conf) } -pub fn parse_and_execute() -> Result<(), anyhow::Error> { - execute(parse()?) +pub async fn parse_and_execute() -> Result<(), anyhow::Error> { + execute(parse()?).await } -pub fn execute(_conf: WATERConfig) -> Result<(), anyhow::Error> { - let mut water_client = runtime::client::WATERClient::new(_conf).unwrap(); +pub async fn execute(_conf: WATERConfig) -> Result<(), anyhow::Error> { + let mut water_client = WATERClient::new(_conf).unwrap(); match water_client.config.client_type { WaterBinType::Dial => { water_client.connect().unwrap(); } WaterBinType::Runner => { + // generally for v1_preview shadowsocks client water_client.execute().unwrap(); } - WaterBinType::Listen => {} - WaterBinType::Relay => {} + WaterBinType::Listen => { + water_client.listen().unwrap(); + + loop { + water_client.accept().unwrap(); + let next_water_client = water_client.keep_listen().unwrap(); + handle_incoming(water_client).await.unwrap(); + water_client = next_water_client; + } + } + WaterBinType::Relay => { + water_client.listen().unwrap(); + water_client.associate().unwrap(); + water_client.cancel_with().unwrap(); + + let handle_water = water_client.run_worker().unwrap(); + + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(anyhow::anyhow!("Failed to join _water_worker thread")); + } + }; + } WaterBinType::Wrap => {} WaterBinType::Unknown => {} } Ok(()) } + +// set as async for later dev of async reading from the pipe +pub async fn handle_incoming(mut water_client: WATERClient) -> Result<(), anyhow::Error> { + water_client.cancel_with().unwrap(); + + let handle_water = water_client.run_worker().unwrap(); + + // taking input from terminal + loop { + let mut buf = vec![0; 1024]; + let res = water_client.read(&mut buf); + + if res.is_ok() { + let str_buf = String::from_utf8(buf).unwrap(); + if str_buf.trim() == "exit" { + water_client.cancel().unwrap(); + break; + } + + println!("Received: {}", str_buf); + } else { + println!("Error: {}", res.unwrap_err()); + break; + } + } + + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(anyhow::anyhow!("Failed to join _water_worker thread")); + } + }; + + Ok(()) +} diff --git a/examples/clients/cli/src/main.rs b/examples/clients/cli/src/main.rs index e21d360..dbdd8cb 100644 --- a/examples/clients/cli/src/main.rs +++ b/examples/clients/cli/src/main.rs @@ -9,8 +9,9 @@ use tracing::Level; mod cli; -fn main() -> Result<(), anyhow::Error> { +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); - cli::parse_and_execute() + cli::parse_and_execute().await } diff --git a/examples/water_bins/README.md b/examples/water_bins/README.md index 28c26ac..0e16fe3 100644 --- a/examples/water_bins/README.md +++ b/examples/water_bins/README.md @@ -10,9 +10,9 @@ These WATM examples can be compiled to WASM and optimized with the script I've p For example, if you want to make the `ss_client_wasm`, you can run this command in the root directory of this repo: ```shell -sh ./script/make_and_opt_wasm.sh ss_client_wasm_v1 ss_client_wasm +sh ./scripts/make_and_opt_wasm.sh ss_client_wasm_v1 ss_client_wasm ``` which is: ```shell -sh ./script/make_and_opt_wasm.sh ./examples/water_bins/ +sh ./scripts/make_and_opt_wasm.sh ./examples/water_bins/ ``` \ No newline at end of file diff --git a/examples/water_bins/plain_v0/Cargo.toml b/examples/water_bins/plain_v0/Cargo.toml index c4d36b6..a1af264 100644 --- a/examples/water_bins/plain_v0/Cargo.toml +++ b/examples/water_bins/plain_v0/Cargo.toml @@ -18,5 +18,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.33.0", default-features = false, features = ["fs", "net", "rt", "macros", "io-util", "io-std", "time", "sync"] } +tracing = "0.1" +tracing-subscriber = "0.3.17" + # water wasm lib import water-wasm-v0 = { path = "../../../crates/wasm_v0/", version = "0.1.0" } \ No newline at end of file diff --git a/examples/water_bins/plain_v0/plain.wasm b/examples/water_bins/plain_v0/plain.wasm index 375787f..2f51fbc 100644 Binary files a/examples/water_bins/plain_v0/plain.wasm and b/examples/water_bins/plain_v0/plain.wasm differ diff --git a/examples/water_bins/plain_v0/src/lib.rs b/examples/water_bins/plain_v0/src/lib.rs index 3eb0b3b..5e288d4 100644 --- a/examples/water_bins/plain_v0/src/lib.rs +++ b/examples/water_bins/plain_v0/src/lib.rs @@ -11,6 +11,8 @@ use tokio::{ }; use v0plus::ConnPair; +use tracing::{info, Level}; + const READ_BUFFER_SIZE: usize = 1024; // 1KB is shorter than common MTU but longer than common TCP MSS lazy_static! { @@ -28,7 +30,11 @@ pub static VERSION: i32 = v0plus::VERSION; // version-independent API #[export_name = "_water_init"] -pub fn _init() -> i32 { +pub fn _init(debug: bool) -> i32 { + if debug { + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + } + // do all the initializing work here AND pull config from host sleep(Duration::from_millis(10)); // sleep for 10ms error::Error::None.i32() @@ -143,6 +149,8 @@ pub fn _cancel_with(fd: i32) -> i32 { /// WASM Entry point here #[export_name = "_water_worker"] pub fn _worker() -> i32 { + info!("[WATM plain] worker: start"); + // borrow CANCEL as &mut AsyncFdConn let mut cancel = CANCEL.lock().unwrap(); let cancel = cancel.deref_mut(); @@ -214,6 +222,8 @@ async fn bidi_worker( src: &mut common::AsyncFdConn, cancel: &mut common::AsyncFdConn, ) -> std::io::Result<()> { + info!("[WATM plain] bidi_worker: start"); + // upgrade to AsyncFdConn dst.tokio_upgrade().expect("dst upgrade failed"); src.tokio_upgrade().expect("src upgrade failed"); @@ -241,6 +251,8 @@ async fn bidi_worker( println!("Error writing to src: {:?}", e); return Err(e); } + + info!("[WATM plain] dst read {:?}", &dst_buf[0..n]); } Err(e) => { println!("Error reading from dst: {:?}", e); @@ -258,6 +270,8 @@ async fn bidi_worker( println!("Error writing to dst: {:?}", e); return Err(e); } + + info!("[WATM plain] src read {:?}", &src_buf[0..n]); } Err(e) => { println!("Error reading from src: {:?}", e); diff --git a/tests/test_wasm/plain.wasm b/tests/test_wasm/plain.wasm index 77e1350..2f51fbc 100644 Binary files a/tests/test_wasm/plain.wasm and b/tests/test_wasm/plain.wasm differ diff --git a/tests/tests/cross_lang_tests.rs b/tests/tests/cross_lang_tests.rs index 7c32eb4..dd3acdd 100644 --- a/tests/tests/cross_lang_tests.rs +++ b/tests/tests/cross_lang_tests.rs @@ -4,7 +4,7 @@ #![allow(dead_code)] -use water::*; +use water::{runtime::client::WATERClient, *}; use tracing::Level; @@ -12,6 +12,7 @@ use std::{ fs::File, io::{Error, ErrorKind, Read, Write}, net::{TcpListener, TcpStream}, + thread::JoinHandle, vec, }; @@ -184,57 +185,96 @@ fn test_cross_lang_wasm_listener() -> Result<(), Box> { Ok(()) } -// #[test] -// fn test_cross_lang_wasm_multi_listener() -> Result<(), Box> { -// // tracing_subscriber::fmt().with_max_level(Level::INFO).init(); - -// let cfg_str = r#" -// { -// "remote_address": "127.0.0.1", -// "remote_port": 8088, -// "local_address": "127.0.0.1", -// "local_port": 8084 -// } -// "#; -// // Create a directory inside of `std::env::temp_dir()`. -// let dir = tempdir()?; -// let file_path = dir.path().join("temp-config.txt"); -// let mut file = File::create(&file_path)?; -// writeln!(file, "{}", cfg_str)?; - -// let conf = config::WATERConfig::init( -// // plain.wasm is in v0 and fully compatible with the Go engine -// // More details for the Go-side of running plain.wasm check here: -// // https://github.com/gaukas/water/tree/master/examples/v0/plain -// // -// // More details for the implementation of plain.wasm check this PR: -// // https://github.com/erikziyunchi/water-rs/pull/10 -// // -// String::from("./test_wasm/plain.wasm"), -// String::from("_water_worker"), -// String::from(file_path.to_string_lossy()), -// config::WaterBinType::Listen, -// true, -// ) -// .unwrap(); - -// let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); -// water_client.listen().unwrap(); - -// water_client.accept().unwrap(); -// water_client.cancel_with().unwrap(); -// let mut handler = water_client.run_worker().unwrap(); - -// for i in 0..5 { -// handler.join().unwrap(); -// let mut new_water = water_client.keep_listen().unwrap(); -// new_water.accept().unwrap(); -// new_water.cancel_with().unwrap(); -// handler = new_water.run_worker().unwrap(); -// } - -// drop(file); -// dir.close()?; - -// Ok(()) -// } +#[test] +fn test_cross_lang_wasm_multi_listener() -> Result<(), Box> { + let cfg_str = r#" + { + "remote_address": "127.0.0.1", + "remote_port": 8088, + "local_address": "127.0.0.1", + "local_port": 10088, + "bypass": false + } + "#; + // Create a directory inside of `std::env::temp_dir()`. + let dir = tempdir()?; + let file_path = dir.path().join("temp-config.txt"); + let mut file = File::create(&file_path)?; + writeln!(file, "{}", cfg_str)?; + + let conf = config::WATERConfig::init( + // plain.wasm is in v0 and fully compatible with the Go engine + // More details for the Go-side of running plain.wasm check here: + // https://github.com/gaukas/water/tree/master/examples/v0/plain + // + // More details for the implementation of plain.wasm check this PR: + // https://github.com/erikziyunchi/water-rs/pull/10 + // + String::from("./test_wasm/plain.wasm"), + String::from("_water_worker"), + String::from(file_path.to_string_lossy()), + config::WaterBinType::Listen, + true, + ) + .unwrap(); + + let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); + water_client.listen().unwrap(); + + let test_message: &'static [u8] = b"hello"; + + let mut water_handles: Vec> = Vec::new(); + + // creating two connections to the listener + for _i in 0..2 { + // make a connect to the listener in a separate thread + std::thread::spawn(|| { + let mut stream = TcpStream::connect(("127.0.0.1", 10088)).unwrap(); + let res = stream.write(test_message); + + assert!(res.is_ok()); + let write_bytes = res.unwrap(); + + assert_eq!(write_bytes, test_message.len()); + }); + + water_client.accept().unwrap(); + + let new_water = water_client.keep_listen().unwrap(); + + water_handles.push(std::thread::spawn(|| { + handle_connection(water_client, test_message).unwrap(); + })); + + water_client = new_water; + } + + for handle in water_handles { + handle.join().unwrap(); + } + + drop(file); + dir.close()?; + + Ok(()) +} + +fn handle_connection( + mut water_client: WATERClient, + test_message: &[u8], +) -> Result<(), Box> { + water_client.cancel_with().unwrap(); + + let handle = water_client.run_worker().unwrap(); + + let mut buf = vec![0; 32]; + let res = water_client.read(&mut buf); + assert!(res.is_ok()); + assert_eq!(res.unwrap() as usize, test_message.len()); + + water_client.cancel().unwrap(); + + let _ = handle.join().unwrap(); + + Ok(()) +}