Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] socks5 proxy support for tentacle tokio_runtime #384

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tentacle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ igd = { version = "0.15", optional = true, package = "igd-next" }

#tls
tokio-rustls = { version = "0.26.0", optional = true }
shadowsocks-service = { version = "1.21.2", features = ["local"]}
shadowsocks = "1.21.0"
url = "2.5.4"

[target.'cfg(not(target_family = "wasm"))'.dependencies]
# rand 0.8 not support wasm32
Expand Down
15 changes: 12 additions & 3 deletions tentacle/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{io, sync::Arc, time::Duration};

use crate::service::config::ProxyConfig;
use crate::service::config::TcpSocketConfig;
use nohash_hasher::IntMap;
use tokio_util::codec::LengthDelimitedCodec;

Expand Down Expand Up @@ -219,7 +221,14 @@ where
where
F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
{
self.config.tcp_config.tcp = Arc::new(f);
self.config.tcp_config.tcp.tcp_socket_config = Arc::new(f);
self
}

/// Proxy config for tcp
#[cfg(not(target_family = "wasm"))]
pub fn tcp_proxy_config(mut self, proxy_conifg: Option<ProxyConfig>) -> Self {
self.config.tcp_config.tcp.proxy_config = proxy_conifg;
self
}

Expand All @@ -230,7 +239,7 @@ where
where
F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
{
self.config.tcp_config.ws = Arc::new(f);
self.config.tcp_config.ws.tcp_socket_config = Arc::new(f);
self
}

Expand All @@ -254,7 +263,7 @@ where
where
F: Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static,
{
self.config.tcp_config.tls = Arc::new(f);
self.config.tcp_config.tls.tcp_socket_config = Arc::new(f);
self
}
}
Expand Down
4 changes: 2 additions & 2 deletions tentacle/src/runtime/async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ mod os {
let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?;

let socket = {
let t = tcp_config(TcpSocket { inner: socket })?;
let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?;
t.inner
};
// `bind` twice will return error
Expand Down Expand Up @@ -188,7 +188,7 @@ mod os {
// user can disable it on tcp_config
#[cfg(not(windows))]
socket.set_reuse_address(true)?;
let t = tcp_config(TcpSocket { inner: socket })?;
let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?;
t.inner
};

Expand Down
2 changes: 2 additions & 0 deletions tentacle/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ mod generic_split {
}
}

mod socks5;

mod budget;
pub use budget::*;

Expand Down
97 changes: 97 additions & 0 deletions tentacle/src/runtime/socks5.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use log::{debug, trace};
use shadowsocks::relay::socks5::{
self, Address, Command, Error as Socks5Error, HandshakeRequest, HandshakeResponse,
PasswdAuthRequest, PasswdAuthResponse, Reply, TcpRequestHeader, TcpResponseHeader,
};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{TcpStream, ToSocketAddrs},
};

use crate::service::ProxyConfig;

pub(crate) struct Socks5Config {
pub(crate) proxy_url: String,
pub(crate) auth: Option<(String, String)>,
}

// parse proxy url like "socks5://username:password@localhost:1080" to Socks5Config
pub(crate) fn parse(proxy_url: &str) -> Result<Socks5Config, std::error::Error> {
let parsed_url = url::Url::parse(proxy_url)?;
let scheme = parsed_url.scheme();
match scheme {
"socks5" => {
let auth = match parsed_url.username() {
"" => None,
username => Some((
username.to_string(),
parsed_url.password().unwrap_or("").to_string(),
)),
};
let proxy_url = parsed_url.host_str().ok_or(Err("missing host"))?;
Ok(Socks5Config {
proxy_url,
auth,
})
}
_ => Err(format!("tentacle doesn't support proxy scheme: {}", scheme).into(),
}
}

pub async fn connect<A, P>(
addr: A,
socks5_config: Socks5Config,
) -> Result<TcpStream, Socks5Error>
where
A: Into<Address>,
P: ToSocketAddrs,
{
debug!("client connecting proxy server");
// destruct socks5_config
let Socks5Config { auth, proxy_url } = socks5_config;

let mut s = TcpStream::connect(proxy_url).await?;

// 1. Handshake
let hs = {
if auth.is_some() {
HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_PASSWORD])
} else {
HandshakeRequest::new(vec![socks5::SOCKS5_AUTH_METHOD_NONE])
}
};
debug!("client connected, going to send handshake: {:?}", hs);

hs.write_to(&mut s).await?;

let hsp = HandshakeResponse::read_from(&mut s).await?;

debug!("got handshake response: {:?}", hsp);
match hsp.chosen_method {
socks5::SOCKS5_AUTH_METHOD_NONE => (),
socks5::SOCKS5_AUTH_METHOD_PASSWORD => {
let pr = PasswdAuthRequest::new(auth.0, auth.1);
pr.write_to(&mut s).await?;
let prp = PasswdAuthResponse::read_from(&mut s).await?;
match Reply::from_u8(prp.status) {
Reply::Succeeded => debug!("password auth succeeded"),
r => return Err(Socks5Error::Reply(r)),
}
}
}

// 2. Send request header
let h = TcpRequestHeader::new(Command::TcpConnect, addr.into());
debug!("going to connect, req: {:?}", h);
h.write_to(&mut s).await?;

let hp = TcpResponseHeader::read_from(&mut s).await?;

debug!("got response: {:?}", hp);
match hp.reply {
Reply::Succeeded => (),
r => return Err(Socks5Error::Reply(r)),
}

Ok(s)
}
47 changes: 28 additions & 19 deletions tentacle/src/runtime/tokio_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use libc::TCP_FASTOPEN_CONNECT;
use shadowsocks_service::local::socks::client::Socks5TcpClient;
pub use tokio::{
net::{TcpListener, TcpStream},
spawn,
Expand All @@ -10,8 +12,8 @@ use socket2::{Domain, Protocol as SocketProtocol, Socket, Type};
use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
use std::{io, net::SocketAddr};
use tokio::net::TcpSocket as TokioTcp;
use std::{io, net::SocketAddr, pin::Pin};
use tokio::net::{TcpSocket as TokioTcp, ToSocketAddrs};

#[cfg(feature = "tokio-timer")]
pub use {
Expand Down Expand Up @@ -88,7 +90,7 @@ pub(crate) fn listen(addr: SocketAddr, tcp_config: TcpSocketConfig) -> io::Resul
// user can disable it on tcp_config
#[cfg(not(windows))]
socket.set_reuse_address(true)?;
let t = tcp_config(TcpSocket { inner: socket })?;
let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?;
t.inner.set_nonblocking(true)?;
// safety: fd convert by socket2
unsafe {
Expand Down Expand Up @@ -117,21 +119,28 @@ pub(crate) async fn connect(
addr: SocketAddr,
tcp_config: TcpSocketConfig,
) -> io::Result<TcpStream> {
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?;

let socket = {
let t = tcp_config(TcpSocket { inner: socket })?;
t.inner.set_nonblocking(true)?;
// safety: fd convert by socket2
unsafe {
#[cfg(unix)]
let socket = TokioTcp::from_raw_fd(t.into_raw_fd());
#[cfg(windows)]
let socket = TokioTcp::from_raw_socket(t.into_raw_socket());
socket
match tcp_config.proxy_config {
Some(proxy_config) => super::socks5::connect(addr, proxy_config.proxy_url)
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err)),
None => {
let domain = Domain::for_address(addr);
let socket = Socket::new(domain, Type::STREAM, Some(SocketProtocol::TCP))?;

let socket = {
let t = (tcp_config.tcp_socket_config)(TcpSocket { inner: socket })?;
t.inner.set_nonblocking(true)?;
// safety: fd convert by socket2
unsafe {
#[cfg(unix)]
let socket = TokioTcp::from_raw_fd(t.into_raw_fd());
#[cfg(windows)]
let socket = TokioTcp::from_raw_socket(t.into_raw_socket());
socket
}
};

socket.connect(addr).await
}
};

socket.connect(addr).await
}
}
1 change: 1 addition & 0 deletions tentacle/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub use crate::service::{
};
use bytes::Bytes;

pub use crate::service::config::ProxyConfig;
#[cfg(feature = "tls")]
pub use crate::service::config::TlsConfig;

Expand Down
38 changes: 23 additions & 15 deletions tentacle/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,31 @@ impl Default for SessionConfig {
}
}

pub(crate) type TcpSocketConfig =
Arc<dyn Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static>;
/// Proxy related config
#[derive(Clone)]
pub struct ProxyConfig {
/// proxy url, like: socks5://127.0.0.1:9050
pub proxy_url: String,
}

/// This config Allow users to set various underlying parameters of TCP
#[derive(Clone)]
pub(crate) struct TcpSocketConfig {
pub(crate) tcp_socket_config:
Arc<dyn Fn(TcpSocket) -> Result<TcpSocket, std::io::Error> + Send + Sync + 'static>,
pub(crate) proxy_config: Option<ProxyConfig>,
}

impl Default for TcpSocketConfig {
fn default() -> Self {
Self {
tcp_socket_config: Arc::new(Ok),
proxy_config: None,
}
}
}

/// This config Allow users to set various underlying parameters of TCP
#[derive(Clone, Default)]
pub(crate) struct TcpConfig {
/// When dial/listen on tcp, tentacle will call it allow user to set all tcp socket config
pub tcp: TcpSocketConfig,
Expand All @@ -102,18 +122,6 @@ pub(crate) struct TcpConfig {
pub tls: TcpSocketConfig,
}

impl Default for TcpConfig {
fn default() -> Self {
TcpConfig {
tcp: Arc::new(Ok),
#[cfg(feature = "ws")]
ws: Arc::new(Ok),
#[cfg(feature = "tls")]
tls: Arc::new(Ok),
}
}
}

/// A TCP socket that has not yet been converted to a `TcpStream` or
/// `TcpListener`.
///
Expand Down
Loading