diff --git a/lib/rs/Cargo.toml b/lib/rs/Cargo.toml index 3cdabf68..caed8752 100644 --- a/lib/rs/Cargo.toml +++ b/lib/rs/Cargo.toml @@ -13,4 +13,5 @@ license = "MIT" podio = "0.1" log = "0" ordered-float = "0" +byteorder = "0.4.2" diff --git a/lib/rs/src/lib.rs b/lib/rs/src/lib.rs index 8b000f30..af34785f 100644 --- a/lib/rs/src/lib.rs +++ b/lib/rs/src/lib.rs @@ -1,6 +1,7 @@ #![recursion_limit="1024"] extern crate podio; extern crate ordered_float; +extern crate byteorder; #[macro_use] extern crate log; diff --git a/lib/rs/src/transport/mod.rs b/lib/rs/src/transport/mod.rs index 91a3a264..70ad58c9 100644 --- a/lib/rs/src/transport/mod.rs +++ b/lib/rs/src/transport/mod.rs @@ -19,6 +19,8 @@ use std::io::{self, Read, Write}; +use byteorder::{ByteOrder, BigEndian}; + pub mod server; pub trait Transport: Write + Read { } @@ -26,15 +28,15 @@ pub trait Transport: Write + Read { } impl<'t, T> Transport for &'t mut T where T: Transport {} impl<'t> Transport for &'t mut Transport {} -pub struct RwTransport(pub Rw); +pub struct RwTransport(pub T); -impl Read for RwTransport { +impl Read for RwTransport { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } } -impl Write for RwTransport { +impl Write for RwTransport { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -44,5 +46,68 @@ impl Write for RwTransport { } } -impl Transport for RwTransport { } +impl Transport for RwTransport { } + +pub struct FramedTransport { + pub transport: T, + tx_buffer: Vec, + flushed: bool, +} + +impl FramedTransport { + pub fn new(transport: T) -> FramedTransport { + FramedTransport { + transport: transport, + tx_buffer: Vec::::new(), + flushed: true, + } + } + + fn flush(&mut self) -> &Self { + self.tx_buffer.clear(); + self.flushed = true; + self + } +} + +impl Read for FramedTransport { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + + loop { + if self.flushed { + // read the size of the frame + let mut b = [0; 4]; + if let Ok(_) = self.transport.read_exact(&mut b) { + self.flushed = false; + } + } else { + break; + } + } + + self.transport.read(buf) + } +} + +impl Write for FramedTransport { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.tx_buffer.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + // write all buffered data to sink + let mut size = [0; 4]; + BigEndian::write_u32(&mut size, (self.tx_buffer.len() as u32 + 0)); + try!(self.transport.write(&size)); + try!(self.transport.write(&self.tx_buffer)); + + // flush self + self.flush(); + + // flush the sink + self.transport.flush() + } +} +impl Transport for FramedTransport { }