Skip to content

Commit

Permalink
[RSDK-9216] - allocate buffer for response message per RPC call (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
gvaradarajan authored Nov 13, 2024
1 parent ca78046 commit c2a2baf
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 194 deletions.
46 changes: 14 additions & 32 deletions micro-rdk/src/common/camera/fake_camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ use crate::{
status::{Status, StatusError},
},
google,
proto::component::camera::v1::GetImageResponse,
};
use bytes::BytesMut;
use prost::Message;
use bytes::Bytes;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
Expand Down Expand Up @@ -48,24 +46,8 @@ impl Default for FakeCamera {
}

impl Camera for FakeCamera {
fn get_image(&mut self, mut buffer: BytesMut) -> Result<BytesMut, CameraError> {
let msg = GetImageResponse {
mime_type: "image/jpeg".to_string(),
image: FAKE_JPEG.into(),
};
msg.encode(&mut buffer)
.map_err(|_| CameraError::CameraGenericError("failed to encode GetImageResponse"))?;
Ok(buffer)
}
fn render_frame(&mut self, mut buffer: BytesMut) -> Result<BytesMut, CameraError> {
let msg = google::api::HttpBody {
content_type: "image/jpeg".to_string(),
data: FAKE_JPEG.to_vec(),
..Default::default()
};
msg.encode(&mut buffer)
.map_err(|_| CameraError::CameraGenericError("failed to encode RenderFrameResponse"))?;
Ok(buffer)
fn get_image(&mut self) -> Result<Bytes, CameraError> {
Ok(FAKE_JPEG.into())
}
}

Expand Down Expand Up @@ -93,14 +75,13 @@ mod tests {
common::{
app_client::encode_request,
config::DynamicComponentConfig,
conn::server::{AsyncableTcpListener, Http2Connector},
exec::Executor,
grpc::GrpcError,
grpc::{GrpcBody, GrpcServer},
grpc::{GrpcBody, GrpcError, GrpcServer},
registry::ComponentRegistry,
robot::{LocalRobot, RobotError},
},
google::api::HttpBody,
native::tcp::{NativeListener, NativeStream},
native::tcp::NativeStream,
proto::component::camera::v1::{GetImageRequest, GetImageResponse, RenderFrameRequest},
};

Expand Down Expand Up @@ -130,25 +111,26 @@ mod tests {
attributes: None,
..Default::default()
}));
let mut registry: Box<ComponentRegistry> = Box::default();

robot.process_components(conf, Box::default())?;
robot.process_components(conf, &mut registry)?;

Ok(robot)
}

async fn setup_grpc_server(exec: Executor, addr: SocketAddr) {
let mut listener = NativeListener::new((addr).into(), None)
.unwrap()
.as_async_listener()
.await
.unwrap();
let tcp_server = TcpListener::bind(addr);
assert!(tcp_server.is_ok());
let tcp_server = tcp_server.unwrap();
let listener: async_io::Async<TcpListener> = tcp_server.try_into().unwrap();

let robot = Arc::new(Mutex::new(setup_robot().unwrap()));

loop {
let incoming = listener.accept().await;
assert!(incoming.is_ok());
let stream = incoming.unwrap();
let incoming = incoming.unwrap();
let stream: NativeStream = NativeStream::LocalPlain(incoming.0);
let srv = GrpcServer::new(robot.clone(), GrpcBody::new());
Box::new(http2::Builder::new(exec.clone()).serve_connection(stream, srv))
.await
Expand Down
52 changes: 21 additions & 31 deletions micro-rdk/src/common/camera/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{generic::DoCommand, registry::ComponentRegistry, status::Status};
use bytes::BytesMut;
use bytes::Bytes;
use prost::EncodeError;
use std::sync::{Arc, Mutex};
use thiserror::Error;
Expand Down Expand Up @@ -42,63 +42,53 @@ pub enum CameraError {
pub trait Camera: Status + DoCommand {
/// Returns a structured image response from a camera of the underlying robot.
/// A specific MIME type can be requested but may not necessarily be the same one returned
fn get_image(&mut self, _buffer: BytesMut) -> Result<BytesMut, CameraError> {
fn get_image(&mut self) -> Result<Bytes, CameraError> {
Err(CameraError::CameraMethodUnimplemented("get_image"))
}
fn get_images(&mut self, _buffer: BytesMut) -> Result<BytesMut, CameraError> {
fn get_images(&mut self) -> Result<Bytes, CameraError> {
Err(CameraError::CameraMethodUnimplemented("get_images"))
}
fn get_point_cloud(&mut self, _buffer: BytesMut) -> Result<BytesMut, CameraError> {
fn get_point_cloud(&mut self) -> Result<Bytes, CameraError> {
Err(CameraError::CameraMethodUnimplemented("get_point_cloud"))
}
/// Returns the camera intrinsic parameters and camera distortion parameters
fn get_properties(&mut self, _buffer: BytesMut) -> Result<BytesMut, CameraError> {
fn get_properties(&mut self) -> Result<Bytes, CameraError> {
Err(CameraError::CameraMethodUnimplemented("get_properties"))
}
/// Deprecated, use `get_image` instead.
fn render_frame(&mut self, _buffer: BytesMut) -> Result<BytesMut, CameraError> {
Err(CameraError::CameraMethodUnimplemented("render_frame"))
}
}

impl<L> Camera for Mutex<L>
where
L: ?Sized + Camera,
{
fn get_image(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.get_mut().unwrap().get_image(buffer)
}
fn get_images(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.get_mut().unwrap().get_images(buffer)
fn get_image(&mut self) -> Result<Bytes, CameraError> {
self.get_mut().unwrap().get_image()
}
fn get_point_cloud(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.get_mut().unwrap().get_point_cloud(buffer)
fn get_images(&mut self) -> Result<Bytes, CameraError> {
self.get_mut().unwrap().get_images()
}
fn get_properties(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.get_mut().unwrap().get_properties(buffer)
fn get_point_cloud(&mut self) -> Result<Bytes, CameraError> {
self.get_mut().unwrap().get_point_cloud()
}
fn render_frame(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.get_mut().unwrap().render_frame(buffer)
fn get_properties(&mut self) -> Result<Bytes, CameraError> {
self.get_mut().unwrap().get_properties()
}
}

impl<L> Camera for Arc<Mutex<L>>
where
L: ?Sized + Camera,
{
fn get_image(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_image(buffer)
}
fn get_images(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_images(buffer)
fn get_image(&mut self) -> Result<Bytes, CameraError> {
self.lock().unwrap().get_image()
}
fn get_point_cloud(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_point_cloud(buffer)
fn get_images(&mut self) -> Result<Bytes, CameraError> {
self.lock().unwrap().get_images()
}
fn get_properties(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_properties(buffer)
fn get_point_cloud(&mut self) -> Result<Bytes, CameraError> {
self.lock().unwrap().get_point_cloud()
}
fn render_frame(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().render_frame(buffer)
fn get_properties(&mut self) -> Result<Bytes, CameraError> {
self.lock().unwrap().get_properties()
}
}
Loading

0 comments on commit c2a2baf

Please sign in to comment.