Skip to content

Commit

Permalink
Add basic numpy conversions
Browse files Browse the repository at this point in the history
  • Loading branch information
Kotochleb committed Nov 26, 2024
1 parent a98c7ee commit 1e7d84c
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ endif()
#
# Install
#

# Install Python modules
ament_python_install_package(${PROJECT_NAME}_py)

install(
TARGETS ${PROJECT_NAME}_conversion
EXPORT export_${PROJECT_NAME}
Expand Down
Empty file.
54 changes: 54 additions & 0 deletions linear_feedback_controller_msgs_py/lfc_py_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Annotated, List, Literal
from dataclasses import dataclass
import numpy as np
import numpy.typing as npt

np_array6 = Annotated[npt.NDArray[np.float64], Literal[6]]
np_array7 = Annotated[npt.NDArray[np.float64], Literal[7]]


@dataclass
class JointState:
"""Structure containing JointState information similarly to ROS message
sensor_msgs.msg.JointState.
"""

name: str
position: npt.NDArray[np.float64]
velocity: npt.NDArray[np.float64]
effort: npt.NDArray[np.float64]


@dataclass
class Contact:
"""Structure containing Contact information similarly to ROS message
linear_feedback_controller_msgs.msg.Contact.
"""

active: bool
name: str
wrench: np_array6
pose: np_array7


@dataclass
class Sensor:
"""Structure containing Sensor information similarly to ROS message
linear_feedback_controller_msgs.msg.Sensor.
"""

base_pose: np_array7
base_twist: np_array6
joint_state: JointState
contacts: List[Contact]


@dataclass
class Control:
"""Structure containing Control information similarly to ROS message
linear_feedback_controller_msgs.msg.Control.
"""

feedback_gain: npt.NDArray[np.float64]
feedforward: npt.NDArray[np.float64]
initial_state: Sensor
327 changes: 327 additions & 0 deletions linear_feedback_controller_msgs_py/numpy_conversions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
import numpy as np
import numpy.typing as npt
from typing import Annotated, Literal

from std_msgs.msg import Float64MultiArray, MultiArrayDimension

from geometry_msgs.msg import Pose, Point, Quaternion, Twist, Vector3, Wrench
from sensor_msgs.msg import JointState

import lfc_py_types

from linear_feedback_controller_msgs.msg import Contact, Control, Sensor

np_array3 = Annotated[npt.NDArray[np.float64], Literal[3]]
np_array6 = Annotated[npt.NDArray[np.float64], Literal[6]]
np_array7 = Annotated[npt.NDArray[np.float64], Literal[7]]


def vector3_numpy_to_msg(input: np_array3) -> Vector3:
"""Converts Numpy array of shape (3,) to ROS Vector3 message.
Expected order of axes is (x, y, z).
Args:
input (npt.NDArray[np.float64], Literal[3]): Input vector as Numpy array.
Returns:
geometry_msgs.msg.Vector3: Vector represented as a ROS message.
"""
assert len(input.shape) == 3, "Input vector is not of a length 3."
return Vector3(x=input[0], y=input[1], z=input[2])


def pose_numpy_to_msg(pose: np_array7) -> Pose:
"""Converts Numpy array of shape (7,) to ROS Pose message.
Expected order of axes is (position.x, position.y, position.z, orientation.x,
orientation.y, orientation.z, orientation.w).
Args:
pose (npt.NDArray[np.float64], Literal[7]): Input pose as Numpy array.
Returns:
geometry_msgs.msg.Pose: Pose represented as a ROS message.
"""
assert len(pose.shape) == 7, "Input pose is not of a length 7."
return Wrench(
position=Point(x=pose[0], y=pose[1], z=pose[2]),
orientation=Quaternion(x=pose[3], y=pose[4], z=pose[5], w=pose[6]),
)


def wrench_numpy_to_msg(wrench: np_array6) -> Wrench:
"""Converts Numpy array of shape (6,) to ROS Wrench message.
Expected order of axes is (force.x, force.y, force.z, torque.x,
torque.y, torque.z).
Args:
wrench (npt.NDArray[np.float64], Literal[6]): Input wrench as Numpy array.
Returns:
geometry_msgs.msg.Wrench: Wrench represented as a ROS message.
"""
assert len(wrench.shape) == 6, "Input wrench is not of a length 6."
return Wrench(
force=vector3_numpy_to_msg(wrench[:3]),
torque=vector3_numpy_to_msg(wrench[3:]),
)


def twist_numpy_to_msg(twist: np_array6) -> Twist:
"""Converts Numpy array of shape (6,) to ROS Twist message.
Expected order of axes is (linear.x, linear.y, linear.z, angular.x,
angular.y, angular.z).
Args:
twist (npt.NDArray[np.float64], Literal[6]): Input twist as Numpy array.
Returns:
geometry_msgs.msg.Twist: Twist represented as a ROS message.
"""
assert len(twist.shape) == 6, "Input twist is not of a length 6."
return Twist(
linear=vector3_numpy_to_msg(twist[:3]),
angular=vector3_numpy_to_msg(twist[3:]),
)


def pose_msg_to_numpy(msg: Pose) -> np_array7:
"""Converts ROS Pose message into Numpy array of shape (7,).
Output order is (position.x, position.y, position.z, orientation.x,
orientation.y, orientation.z, orientation.w).
Args:
msg (geometry_msgs.msg.Pose): Input ROS Pose message.
Returns:
npt.NDArray[np.float64], Literal[7]: Numpy array of shape (7,)
with position and orientation vectors concatenated.
"""
return np.array(
[
msg.position.x,
msg.position.y,
msg.position.z,
msg.orientation.x,
msg.orientation.y,
msg.orientation.z,
msg.orientation.w,
]
)


def wrench_msg_to_numpy(msg: Wrench) -> np_array6:
"""Converts ROS Wrench message into Numpy array of shape (6,).
Output order is (force.x, force.y, force.z, torque.x, torque.y, torque.z).
Args:
msg (geometry_msgs.msg.Wrench): Input ROS Wrench message.
Returns:
npt.NDArray[np.float64], Literal[6]: Numpy array of shape (6,) with force and torque vector concatenated.
"""
return np.array(
[
msg.force.x,
msg.force.y,
msg.force.z,
msg.torque.x,
msg.torque.y,
msg.torque.z,
]
)


def twist_msg_to_numpy(msg: Twist) -> np_array6:
"""Converts ROS Twist message into Numpy array of shape (6,).
Output order is (linear.x, linear.y, linear.z, angular.x,
angular.y, angular.z).
Args:
msg (geometry_msgs.msg.Pose): Input ROS Twist message.
Returns:
npt.NDArray[np.float64], Literal[6]: Numpy array of shape (6,)
with linear and angular vectors concatenated.
"""
return np.array(
[
msg.linear.x,
msg.linear.y,
msg.linear.z,
msg.angular.x,
msg.angular.y,
msg.angular.z,
]
)


def matrix_numpy_to_msg(matrix: npt.NDArray[np.float64]) -> Float64MultiArray:
"""Converts Numpy array into ROS array message.
Args:
matrix (npt.NDArray[np.float64]]): Input matrix.
Returns:
Float64MultiArray: ROS message with the matrix.
"""
assert len(matrix.shape) == 2, "Input matrix is not 2D."

m = Float64MultiArray()
m.layout.data_offset = 0
m.layout.dim = [MultiArrayDimension(), MultiArrayDimension()]
m.layout.dim[0].label = "rows"
m.layout.dim[0].size = matrix.shape[0]
m.layout.dim[0].stride = matrix.size
m.layout.dim[1].label = "cols"
m.layout.dim[1].stride = matrix.shape[1]
m.layout.dim[1].size = matrix.shape[1]
# Flatten the matrix to a vector
m.data = matrix.reshape(-1).tolist()
return m


def matrix_msg_to_numpy(msg: Float64MultiArray) -> npt.NDArray[np.float64]:
"""Converts ROS array message into numpy array of a shape [nx, nu] with Riccati gains.
Args:
msg (Float64MultiArray): Input ROS message with array of Riccati gains.
Returns:
npt.NDArray[np.float64]: Output numpy matrix with Riccati gains.
"""
assert msg.layout.dim.size() == 2, "The ROS message must be a 2D matrix."
return np.array(msg.data).reshape(msg.layout.dim[0].size, msg.layout.dim[1].size)


def joint_state_msg_to_numpy(msg: JointState) -> lfc_py_types.JointState:
"""Converts ROS JointState message into internal LFC JointState class.
Args:
msg (sensor_msgs.msg.JointState): Input ROS message.
Returns:
lfc_py_types.JointState: Output LFC representation of JointState.
"""
return lfc_py_types.JointState(
name=msg.name,
position=np.array(msg.position),
velocity=np.array(msg.velocity),
effort=np.array(msg.effort),
)


def contact_msg_to_numpy(msg: Contact) -> lfc_py_types.Contact:
"""Converts ROS Contact message into internal LFC Contact class.
Args:
msg (linear_feedback_controller_msgs.msg.Contact): Input ROS message.
Returns:
lfc_py_types.Contact: Output LFC representation of Contact.
"""
return lfc_py_types.Contact(
active=msg.active,
name=msg.name,
wrench=wrench_msg_to_numpy(msg.wrench),
pose=pose_msg_to_numpy(msg.pose),
)


def sensor_msg_to_numpy(msg: Sensor) -> lfc_py_types.Sensor:
"""Converts ROS Sensor message into internal LFC Sensor class.
Args:
msg (linear_feedback_controller_msgs.msg.Sensor): Input ROS message.
Returns:
lfc_py_types.Sensor: Output LFC representation of Sensor.
"""
return lfc_py_types.Sensor(
base_pose=pose_msg_to_numpy(msg.base_pose),
base_twist=twist_msg_to_numpy(msg.base_twist),
joint_state=joint_state_msg_to_numpy(msg.joint_state),
contacts=[contact_msg_to_numpy(contact) for contact in msg.contacts],
)


def control_msg_to_numpy(msg: Control) -> lfc_py_types.Control:
"""Converts ROS Control message into internal LFC Control class.
Args:
msg (linear_feedback_controller_msgs.msg.Control): Input ROS message.
Returns:
lfc_py_types.Control: Output LFC representation of Control.
"""
return lfc_py_types.Control(
feedback_gain=matrix_msg_to_numpy(msg.feedback_gain),
feedforward=matrix_msg_to_numpy(msg.feedforward),
initial_state=sensor_msg_to_numpy(msg.initial_state),
)


def joint_state_numpy_to_msg(input: lfc_py_types.JointState) -> JointState:
"""Converts internal LFC JointState class into ROS JointState message.
Args:
input (lfc_py_types.JointState): Input LFC representation of Control.
Returns:
sensor_msgs.msg..JointState: Output ROS message.
"""
return JointState(
name=input.name,
position=input.position,
velocity=input.velocity,
effort=input.effort,
)


def contact_numpy_to_msg(input: lfc_py_types.Contact) -> Contact:
"""Converts internal LFC Contact class into ROS Contact message.
Args:
input (lfc_py_types.Contact): Input LFC representation of Control.
Returns:
linear_feedback_controller_msgs.msg..Contact: Output ROS message.
"""
return Contact(
active=input.active,
name=input.name,
wrench=wrench_numpy_to_msg(input.wrench),
pose=pose_numpy_to_msg(input.pose),
)


def sensor_numpy_to_msg(input: lfc_py_types.Sensor) -> Sensor:
"""Converts internal LFC Sensor class into ROS Sensor message.
Args:
input (lfc_py_types.Sensor): Input LFC representation of Control.
Returns:
linear_feedback_controller_msgs.msg..Sensor: Output ROS message.
"""
return Sensor(
base_pose=pose_numpy_to_msg(input.base_pose),
base_twist=twist_numpy_to_msg(input.base_twist),
joint_state=joint_state_numpy_to_msg(input.joint_state),
contacts=[contact_numpy_to_msg(contact) for contact in input.contacts],
)


def control_numpy_to_msg(input: lfc_py_types.Control) -> Control:
"""Converts internal LFC Control class into ROS Control message.
Args:
input (lfc_py_types.Control): Input LFC representation of Control.
Returns:
linear_feedback_controller_msgs.msg..Control: Output ROS message.
"""
return Control(
feedback_gain=matrix_numpy_to_msg(input.feedback_gain),
feedforward=matrix_numpy_to_msg(input.feedforward),
initial_state=sensor_numpy_to_msg(input.initial_state),
)

0 comments on commit 1e7d84c

Please sign in to comment.