Skip to content

Commit

Permalink
basic wrappers for sendmmsg/recvmmsg (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed May 8, 2022
1 parent 238b8f4 commit 204de7a
Show file tree
Hide file tree
Showing 5 changed files with 648 additions and 84 deletions.
171 changes: 165 additions & 6 deletions crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{
io,
mem,
io, mem,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
os::unix::io::{AsRawFd, RawFd},
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{self, Poll},
};

use log::{error, warn};
use log::{debug, error, warn};
use pin_project::pin_project;
use socket2::{Domain, Protocol, Socket, Type};
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
Expand All @@ -18,8 +18,8 @@ use tokio_tfo::TfoStream;

use crate::net::{
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
AddrFamily,
ConnectOpts,
udp::{BatchRecvMessage, BatchSendMessage},
AddrFamily, ConnectOpts,
};

/// A `TcpStream` that supports TFO (TCP Fast Open)
Expand Down Expand Up @@ -241,3 +241,162 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->

Ok(socket)
}

static SUPPORT_BATCH_SEND_RECV_MSG: AtomicBool = AtomicBool::new(true);

fn recvmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchRecvMessage<'_>) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };

let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
let sock_addr = unsafe { SockAddr::new(addr_storage, addr_len) };
hdr.msg_name = sock_addr.as_ptr() as *mut _;
hdr.msg_namelen = sock_addr.len() as _;

hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_iovlen = msg.data.len() as _;

let ret = unsafe { libc::recvmsg(sock.as_raw_fd(), &mut hdr as *mut _, 0) };
if ret < 0 {
return Err(io::Error::last_os_error());
}

msg.addr = sock_addr.as_socket().expect("SockAddr.as_socket");
msg.data_len = ret as usize;

Ok(())
}

pub fn batch_recvmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchRecvMessage<'_>]) -> io::Result<usize> {
if msgs.is_empty() {
return Ok(0);
}

if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
recvmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}

let mut vec_msg_name = Vec::with_capacity(msgs.len());
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());

for msg in msgs.iter_mut() {
let mut hdr: libc::mmsghdr = unsafe { mem::zeroed() };

let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;

vec_msg_name.push(unsafe { SockAddr::new(addr_storage, addr_len) });
let sock_addr = vec_msg_name.last_mut().unwrap();
hdr.msg_hdr.msg_name = sock_addr.as_ptr() as *mut _;
hdr.msg_hdr.msg_namelen = sock_addr.len() as _;

hdr.msg_hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_hdr.msg_iovlen = msg.data.len() as _;

vec_msg_hdr.push(hdr);
}

let ret = unsafe {
libc::recvmmsg(
sock.as_raw_fd(),
vec_msg_hdr.as_mut_ptr(),
vec_msg_hdr.len() as _,
0,
ptr::null(),
)
};
if ret < 0 {
let err = io::Error::last_os_error();
if let Some(libc::ENOSYS) = err.raw_os_error() {
debug!("recvmmsg is not supported, fallback to recvmsg, error: {:?}", err);
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);

recvmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}
return Err(err);
}

for idx in 0..ret as usize {
let msg = &mut msgs[idx];
let hdr = &vec_msg_hdr[idx];
let name = &vec_msg_name[idx];
msg.addr = name.as_socket().expect("SockAddr.as_socket");
msg.data_len = hdr.msg_len as usize;
}

Ok(ret as usize)
}

fn sendmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchSendMessage<'_>) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };

let sock_addr = msg.addr.map(SockAddr::from);
if let Some(ref sa) = sock_addr {
hdr.msg_name = sa.as_ptr() as *mut _;
hdr.msg_namelen = sa.len() as _;
}

hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_iovlen = msg.data.len() as _;

let ret = unsafe { libc::sendmsg(sock.as_raw_fd(), &hdr as *const _, 0) };
if ret < 0 {
return Err(io::Error::last_os_error());
}
msg.data_len = ret as usize;

Ok(())
}

pub fn batch_sendmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchSendMessage<'_>]) -> io::Result<usize> {
if msgs.is_empty() {
return Ok(0);
}

if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
sendmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}

let mut vec_msg_name = Vec::with_capacity(msgs.len());
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());

for msg in msgs.iter_mut() {
let mut hdr: libc::mmsghdr = unsafe { mem::zeroed() };

if let Some(addr) = msg.addr {
vec_msg_name.push(SockAddr::from(addr));
let sock_addr = vec_msg_name.last_mut().unwrap();
hdr.msg_hdr.msg_name = sock_addr.as_ptr() as *mut _;
hdr.msg_hdr.msg_namelen = sock_addr.len() as _;
}

hdr.msg_hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_hdr.msg_iovlen = msg.data.len() as _;

vec_msg_hdr.push(hdr);
}

let ret = unsafe { libc::sendmmsg(sock.as_raw_fd(), vec_msg_hdr.as_mut_ptr(), vec_msg_hdr.len() as _, 0) };
if ret < 0 {
let err = io::Error::last_os_error();
if let Some(libc::ENOSYS) = err.raw_os_error() {
debug!("sendmmsg is not supported, fallback to sendmsg, error: {:?}", err);
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);

sendmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}
return Err(err);
}

for idx in 0..ret as usize {
let msg = &mut msgs[idx];
let hdr = &vec_msg_hdr[idx];
msg.data_len = hdr.msg_len as usize;
}

Ok(ret as usize)
}
175 changes: 173 additions & 2 deletions crates/shadowsocks/src/net/sys/unix/bsd/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::{
os::unix::io::{AsRawFd, RawFd},
pin::Pin,
ptr,
sync::atomic::{AtomicBool, Ordering},
task::{self, Poll},
};

use log::{error, warn};
use log::{debug, error, warn};
use pin_project::pin_project;
use socket2::{Domain, Protocol, Socket, Type};
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::{TcpSocket, TcpStream as TokioTcpStream, UdpSocket},
Expand All @@ -19,6 +20,7 @@ use tokio_tfo::TfoStream;

use crate::net::{
sys::{set_common_sockopt_after_connect, set_common_sockopt_for_connect, socket_bind_dual_stack},
udp::{BatchRecvMessage, BatchSendMessage},
AddrFamily,
ConnectOpts,
};
Expand Down Expand Up @@ -273,3 +275,172 @@ pub async fn create_outbound_udp_socket(af: AddrFamily, config: &ConnectOpts) ->

Ok(socket)
}

/// https://github.com/apple/darwin-xnu/blob/main/bsd/sys/socket.h
#[repr(C)]
struct msghdr_x {
msg_name: *mut libc::c_void, //< optional address
msg_namelen: libc::socklen_t, //< size of address
msg_iov: *mut libc::iovec, //< scatter/gather array
msg_iovlen: libc::c_int, //< # elements in msg_iov
msg_control: *mut libc::c_void, //< ancillary data, see below
msg_controllen: libc::socklen_t, //< ancillary data buffer len
msg_flags: libc::c_int, //< flags on received message
msg_datalen: libc::size_t, //< byte length of buffer in msg_iov
}

extern "C" {
fn recvmsg_x(s: libc::c_int, msgp: *const msghdr_x, cnt: libc::c_uint, flags: libc::c_int) -> libc::ssize_t;
fn sendmsg_x(s: libc::c_int, msgp: *const msghdr_x, cnt: libc::c_uint, flags: libc::c_int) -> libc::ssize_t;
}

static SUPPORT_BATCH_SEND_RECV_MSG: AtomicBool = AtomicBool::new(true);

fn recvmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchRecvMessage<'_>) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };

let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;
let sock_addr = unsafe { SockAddr::new(addr_storage, addr_len) };
hdr.msg_name = sock_addr.as_ptr() as *mut _;
hdr.msg_namelen = sock_addr.len() as _;

hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_iovlen = msg.data.len() as _;

let ret = unsafe { libc::recvmsg(sock.as_raw_fd(), &mut hdr as *mut _, 0) };
if ret < 0 {
return Err(io::Error::last_os_error());
}

msg.addr = sock_addr.as_socket().expect("SockAddr.as_socket");
msg.data_len = ret as usize;

Ok(())
}

pub fn batch_recvmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchRecvMessage<'_>]) -> io::Result<usize> {
if msgs.is_empty() {
return Ok(0);
}

if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
recvmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}

let mut vec_msg_name = Vec::with_capacity(msgs.len());
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());

for msg in msgs.iter_mut() {
let mut hdr: msghdr_x = unsafe { mem::zeroed() };

let addr_storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let addr_len = mem::size_of_val(&addr_storage) as libc::socklen_t;

vec_msg_name.push(unsafe { SockAddr::new(addr_storage, addr_len) });
let sock_addr = vec_msg_name.last_mut().unwrap();
hdr.msg_name = sock_addr.as_ptr() as *mut _;
hdr.msg_namelen = sock_addr.len() as _;

hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_iovlen = msg.data.len() as _;

vec_msg_hdr.push(hdr);
}

let ret = unsafe { recvmsg_x(sock.as_raw_fd(), vec_msg_hdr.as_ptr(), vec_msg_hdr.len() as _, 0) };
if ret < 0 {
let err = io::Error::last_os_error();
if let Some(libc::ENOSYS) = err.raw_os_error() {
debug!("recvmsg_x is not supported, fallback to recvmsg, error: {:?}", err);
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);

recvmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}
return Err(err);
}

for idx in 0..ret as usize {
let msg = &mut msgs[idx];
let hdr = &vec_msg_hdr[idx];
let name = &vec_msg_name[idx];
msg.addr = name.as_socket().expect("SockAddr.as_socket");
msg.data_len = hdr.msg_datalen as usize;
}

Ok(ret as usize)
}

fn sendmsg_fallback<S: AsRawFd>(sock: &S, msg: &mut BatchSendMessage<'_>) -> io::Result<()> {
let mut hdr: libc::msghdr = unsafe { mem::zeroed() };

let sock_addr = msg.addr.map(SockAddr::from);
if let Some(ref sa) = sock_addr {
hdr.msg_name = sa.as_ptr() as *mut _;
hdr.msg_namelen = sa.len() as _;
}

hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_iovlen = msg.data.len() as _;

let ret = unsafe { libc::sendmsg(sock.as_raw_fd(), &hdr as *const _, 0) };
if ret < 0 {
return Err(io::Error::last_os_error());
}
msg.data_len = ret as usize;

Ok(())
}

pub fn batch_sendmsg<S: AsRawFd>(sock: &S, msgs: &mut [BatchSendMessage<'_>]) -> io::Result<usize> {
if msgs.is_empty() {
return Ok(0);
}

if !SUPPORT_BATCH_SEND_RECV_MSG.load(Ordering::Acquire) {
sendmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}

let mut vec_msg_name = Vec::with_capacity(msgs.len());
let mut vec_msg_hdr = Vec::with_capacity(msgs.len());

for msg in msgs.iter_mut() {
let mut hdr: msghdr_x = unsafe { mem::zeroed() };

if let Some(addr) = msg.addr {
vec_msg_name.push(SockAddr::from(addr));
let sock_addr = vec_msg_name.last_mut().unwrap();
hdr.msg_name = sock_addr.as_ptr() as *mut _;
hdr.msg_namelen = sock_addr.len() as _;
}

hdr.msg_iov = msg.data.as_ptr() as *mut _;
hdr.msg_iovlen = msg.data.len() as _;

vec_msg_hdr.push(hdr);
}

let ret = unsafe { sendmsg_x(sock.as_raw_fd(), vec_msg_hdr.as_ptr(), vec_msg_hdr.len() as _, 0) };
if ret < 0 {
let err = io::Error::last_os_error();
if let Some(libc::ENOSYS) = err.raw_os_error() {
debug!("sendmsg_x is not supported, fallback to sendmsg, error: {:?}", err);
SUPPORT_BATCH_SEND_RECV_MSG.store(false, Ordering::Release);

sendmsg_fallback(sock, &mut msgs[0])?;
return Ok(1);
}
return Err(err);
}

for idx in 0..ret as usize {
let msg = &mut msgs[idx];
let hdr = &vec_msg_hdr[idx];
msg.data_len = hdr.msg_datalen as usize;
}

Ok(ret as usize)
}
Loading

0 comments on commit 204de7a

Please sign in to comment.