Skip to content

Commit

Permalink
add FramedTransport
Browse files Browse the repository at this point in the history
- add a FramedTransport using a Vec<u8> tx buffer
- switch RwTransport to use `T` notation
  • Loading branch information
brayniac committed Feb 16, 2016
1 parent 07f3bdb commit db90a18
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ license = "MIT"
podio = "0.1"
log = "0"
ordered-float = "0"
byteorder = "0.4.2"

1 change: 1 addition & 0 deletions lib/rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![recursion_limit="1024"]
extern crate podio;
extern crate ordered_float;
extern crate byteorder;

#[macro_use]
extern crate log;
Expand Down
73 changes: 69 additions & 4 deletions lib/rs/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,24 @@

use std::io::{self, Read, Write};

use byteorder::{ByteOrder, BigEndian};

pub mod server;

pub trait Transport: Write + Read { }

impl<'t, T> Transport for &'t mut T where T: Transport {}
impl<'t> Transport for &'t mut Transport {}

pub struct RwTransport<Rw>(pub Rw);
pub struct RwTransport<T>(pub T);

impl<R: Read> Read for RwTransport<R> {
impl<T: Read> Read for RwTransport<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}

impl<W: Write> Write for RwTransport<W> {
impl<T: Write> Write for RwTransport<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
Expand All @@ -44,5 +46,68 @@ impl<W: Write> Write for RwTransport<W> {
}
}

impl<Rw: Read + Write> Transport for RwTransport<Rw> { }
impl<T: Read + Write> Transport for RwTransport<T> { }

pub struct FramedTransport<T> {
pub transport: T,
tx_buffer: Vec<u8>,
flushed: bool,
}

impl<T> FramedTransport<T> {
pub fn new(transport: T) -> FramedTransport<T> {
FramedTransport {
transport: transport,
tx_buffer: Vec::<u8>::new(),
flushed: true,
}
}

fn flush(&mut self) -> &Self {
self.tx_buffer.clear();
self.flushed = true;
self
}
}

impl<T: Read> Read for FramedTransport<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {

loop {
if self.flushed {
// read the size of the frame
let mut b = [0; 4];
if let Ok(_) = self.transport.read_exact(&mut b) {
self.flushed = false;
}
} else {
break;
}
}

self.transport.read(buf)
}
}

impl<T: Write> Write for FramedTransport<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.tx_buffer.extend_from_slice(buf);
Ok(buf.len())
}

fn flush(&mut self) -> io::Result<()> {
// write all buffered data to sink
let mut size = [0; 4];
BigEndian::write_i32(&mut size, self.tx_buffer.len() as i32);
try!(self.transport.write(&size));
try!(self.transport.write(&self.tx_buffer));

// flush self
self.flush();

// flush the sink
self.transport.flush()
}
}

impl<T: Read + Write> Transport for FramedTransport<T> { }

0 comments on commit db90a18

Please sign in to comment.