diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f5e8f13 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +# Byte-compiled / optimized / DLL files +**/__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Linter cache +*.ruff_cache + +# Colcon custom files +COLCON_IGNORE +AMENT_IGNORE + +# VS Code +.vscode diff --git a/CMakeLists.txt b/CMakeLists.txt index 4551647..bf0aecb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,11 +43,28 @@ if(BUILD_TESTING) ament_add_gtest(test_eigen_conversions tests/test_eigen_conversions.cpp tests/gtest_main.cpp) target_link_libraries(test_eigen_conversions ${PROJECT_NAME}_conversion) + + find_package(ament_cmake_pytest REQUIRED) + set(_pytest_tests + tests/test_numpy_conversions.py + ) + foreach(_test_path ${_pytest_tests}) + get_filename_component(_test_name ${_test_path} NAME_WE) + ament_add_pytest_test(${_test_name} ${_test_path} + APPEND_ENV PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR} + TIMEOUT 60 + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + ) + endforeach() endif() # # Install # + +# Install Python modules +ament_python_install_package(${PROJECT_NAME}_py) + install( TARGETS ${PROJECT_NAME}_conversion EXPORT export_${PROJECT_NAME} diff --git a/linear_feedback_controller_msgs_py/__init__.py b/linear_feedback_controller_msgs_py/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/linear_feedback_controller_msgs_py/lfc_py_types.py b/linear_feedback_controller_msgs_py/lfc_py_types.py new file mode 100644 index 0000000..12d9ccc --- /dev/null +++ b/linear_feedback_controller_msgs_py/lfc_py_types.py @@ -0,0 +1,54 @@ +from typing import Annotated, List, Literal +from dataclasses import dataclass +import numpy as np +import numpy.typing as npt + +np_array6 = Annotated[npt.NDArray[np.float64], Literal[6]] +np_array7 = Annotated[npt.NDArray[np.float64], Literal[7]] + + +@dataclass +class JointState: + """Structure containing JointState information similarly to ROS message + sensor_msgs.msg.JointState. + """ + + name: List[str] + position: npt.NDArray[np.float64] + velocity: npt.NDArray[np.float64] + effort: npt.NDArray[np.float64] + + +@dataclass +class Contact: + """Structure containing Contact information similarly to ROS message + linear_feedback_controller_msgs.msg.Contact. + """ + + active: bool + name: str + wrench: np_array6 + pose: np_array7 + + +@dataclass +class Sensor: + """Structure containing Sensor information similarly to ROS message + linear_feedback_controller_msgs.msg.Sensor. + """ + + base_pose: np_array7 + base_twist: np_array6 + joint_state: JointState + contacts: List[Contact] + + +@dataclass +class Control: + """Structure containing Control information similarly to ROS message + linear_feedback_controller_msgs.msg.Control. + """ + + feedback_gain: npt.NDArray[np.float64] + feedforward: npt.NDArray[np.float64] + initial_state: Sensor diff --git a/linear_feedback_controller_msgs_py/numpy_conversions.py b/linear_feedback_controller_msgs_py/numpy_conversions.py new file mode 100644 index 0000000..69e1885 --- /dev/null +++ b/linear_feedback_controller_msgs_py/numpy_conversions.py @@ -0,0 +1,350 @@ +import numpy as np +import numpy.typing as npt +from typing import Annotated, Literal + +from std_msgs.msg import Float64MultiArray, MultiArrayDimension + +from geometry_msgs.msg import Pose, Point, Quaternion, Twist, Vector3, Wrench +from sensor_msgs.msg import JointState + +import linear_feedback_controller_msgs_py.lfc_py_types as lfc_py_types + +from linear_feedback_controller_msgs.msg import Contact, Control, Sensor + +np_array3 = Annotated[npt.NDArray[np.float64], Literal[3]] +np_array6 = Annotated[npt.NDArray[np.float64], Literal[6]] +np_array7 = Annotated[npt.NDArray[np.float64], Literal[7]] + + +def vector3_numpy_to_msg(input: np_array3) -> Vector3: + """Converts Numpy array of shape (3,) to ROS Vector3 message. + Expected order of axes is (x, y, z). + + Args: + input (npt.NDArray[np.float64], Literal[3]): Input vector as Numpy array. + + Returns: + geometry_msgs.msg.Vector3: Vector represented as a ROS message. + """ + assert ( + input.ndim == 1 + ), f"Input vector has '{input.ndim}' dimensions, expected dimension size of 1!" + assert ( + input.size == 3 + ), f"Input vector has length of '{input.size}', expected length '3'!" + return Vector3(x=input[0], y=input[1], z=input[2]) + + +def pose_numpy_to_msg(input: np_array7) -> Pose: + """Converts Numpy array of shape (7,) to ROS Pose message. + Expected order of axes is (position.x, position.y, position.z, orientation.x, + orientation.y, orientation.z, orientation.w). + + Args: + input (npt.NDArray[np.float64], Literal[7]): Input pose as Numpy array. + + Returns: + geometry_msgs.msg.Pose: Pose represented as a ROS message. + """ + assert ( + input.ndim == 1 + ), f"Input vector has '{input.ndim}' dimensions, expected dimension size of 1" + assert ( + input.size == 7 + ), f"Input vector has length of '{input.size}', expected length '7'!" + return Pose( + position=Point(x=input[0], y=input[1], z=input[2]), + orientation=Quaternion(x=input[3], y=input[4], z=input[5], w=input[6]), + ) + + +def wrench_numpy_to_msg(input: np_array6) -> Wrench: + """Converts Numpy array of shape (6,) to ROS Wrench message. + Expected order of axes is (force.x, force.y, force.z, torque.x, + torque.y, torque.z). + + Args: + input (npt.NDArray[np.float64], Literal[6]): Input wrench as Numpy array. + + Returns: + geometry_msgs.msg.Wrench: Wrench represented as a ROS message. + """ + assert ( + input.ndim == 1 + ), f"Input vector has '{input.ndim}' dimensions, expected dimension size of 1" + assert ( + input.size == 6 + ), f"Input vector has length of '{input.size}', expected length '6'!" + return Wrench( + force=vector3_numpy_to_msg(input[:3]), + torque=vector3_numpy_to_msg(input[3:]), + ) + + +def twist_numpy_to_msg(input: np_array6) -> Twist: + """Converts Numpy array of shape (6,) to ROS Twist message. + Expected order of axes is (linear.x, linear.y, linear.z, angular.x, + angular.y, angular.z). + + Args: + input (npt.NDArray[np.float64], Literal[6]): Input twist as Numpy array. + + Returns: + geometry_msgs.msg.Twist: Twist represented as a ROS message. + """ + assert ( + input.ndim == 1 + ), f"Input vector has '{input.ndim}' dimensions, expected dimension size of 1" + assert ( + input.size == 6 + ), f"Input vector has length of '{input.size}', expected length ''6!" + return Twist( + linear=vector3_numpy_to_msg(input[:3]), + angular=vector3_numpy_to_msg(input[3:]), + ) + + +def pose_msg_to_numpy(msg: Pose) -> np_array7: + """Converts ROS Pose message into Numpy array of shape (7,). + Output order is (position.x, position.y, position.z, orientation.x, + orientation.y, orientation.z, orientation.w). + + Args: + msg (geometry_msgs.msg.Pose): Input ROS Pose message. + + Returns: + npt.NDArray[np.float64], Literal[7]: Numpy array of shape (7,) + with position and orientation vectors concatenated. + """ + return np.array( + [ + msg.position.x, + msg.position.y, + msg.position.z, + msg.orientation.x, + msg.orientation.y, + msg.orientation.z, + msg.orientation.w, + ] + ) + + +def wrench_msg_to_numpy(msg: Wrench) -> np_array6: + """Converts ROS Wrench message into Numpy array of shape (6,). + Output order is (force.x, force.y, force.z, torque.x, torque.y, torque.z). + + Args: + msg (geometry_msgs.msg.Wrench): Input ROS Wrench message. + + Returns: + npt.NDArray[np.float64], Literal[6]: Numpy array of shape (6,) + with force and torque vector concatenated. + """ + return np.array( + [ + msg.force.x, + msg.force.y, + msg.force.z, + msg.torque.x, + msg.torque.y, + msg.torque.z, + ] + ) + + +def twist_msg_to_numpy(msg: Twist) -> np_array6: + """Converts ROS Twist message into Numpy array of shape (6,). + Output order is (linear.x, linear.y, linear.z, angular.x, + angular.y, angular.z). + + Args: + msg (geometry_msgs.msg.Pose): Input ROS Twist message. + + Returns: + npt.NDArray[np.float64], Literal[6]: Numpy array of shape (6,) + with linear and angular vectors concatenated. + """ + return np.array( + [ + msg.linear.x, + msg.linear.y, + msg.linear.z, + msg.angular.x, + msg.angular.y, + msg.angular.z, + ] + ) + + +def matrix_numpy_to_msg(input: npt.NDArray[np.float64]) -> Float64MultiArray: + """Converts Numpy array into ROS array message. + + Args: + input (npt.NDArray[np.float64]]): Input matrix. + + Returns: + std_msgs.msg.Float64MultiArray: ROS message with the matrix. + """ + assert ( + input.ndim == 2 + ), f"Input matrix is dimension '{input.ndim}'. EXpected 2D matrix!" + + m = Float64MultiArray() + m.layout.data_offset = 0 + m.layout.dim = [MultiArrayDimension(), MultiArrayDimension()] + m.layout.dim[0].label = "rows" + m.layout.dim[0].size = input.shape[0] + m.layout.dim[0].stride = input.size + m.layout.dim[1].label = "cols" + m.layout.dim[1].stride = input.shape[1] + m.layout.dim[1].size = input.shape[1] + # Flatten the matrix to a vector + m.data = input.reshape(-1).tolist() + return m + + +def matrix_msg_to_numpy(msg: Float64MultiArray) -> npt.NDArray[np.float64]: + """Converts ROS array message into numpy array. + + Args: + msg (std_msgs.msg.Float64MultiArray): Input ROS message with array. + + Returns: + npt.NDArray[np.float64]: Output numpy matrix. + """ + assert len(msg.layout.dim) == 2, "The ROS message must be a 2D matrix." + return np.array(msg.data).reshape(msg.layout.dim[0].size, msg.layout.dim[1].size) + + +def joint_state_msg_to_numpy(msg: JointState) -> lfc_py_types.JointState: + """Converts ROS JointState message into internal LFC JointState class. + + Args: + msg (sensor_msgs.msg.JointState): Input ROS message. + + Returns: + lfc_py_types.JointState: Output LFC representation of JointState. + """ + return lfc_py_types.JointState( + name=msg.name, + position=np.array(msg.position), + velocity=np.array(msg.velocity), + effort=np.array(msg.effort), + ) + + +def contact_msg_to_numpy(msg: Contact) -> lfc_py_types.Contact: + """Converts ROS Contact message into internal LFC Contact class. + + Args: + msg (linear_feedback_controller_msgs.msg.Contact): Input ROS message. + + Returns: + lfc_py_types.Contact: Output LFC representation of Contact. + """ + return lfc_py_types.Contact( + active=msg.active, + name=msg.name, + wrench=wrench_msg_to_numpy(msg.wrench), + pose=pose_msg_to_numpy(msg.pose), + ) + + +def sensor_msg_to_numpy(msg: Sensor) -> lfc_py_types.Sensor: + """Converts ROS Sensor message into internal LFC Sensor class. + + Args: + msg (linear_feedback_controller_msgs.msg.Sensor): Input ROS message. + + Returns: + lfc_py_types.Sensor: Output LFC representation of Sensor. + """ + return lfc_py_types.Sensor( + base_pose=pose_msg_to_numpy(msg.base_pose), + base_twist=twist_msg_to_numpy(msg.base_twist), + joint_state=joint_state_msg_to_numpy(msg.joint_state), + contacts=[contact_msg_to_numpy(contact) for contact in msg.contacts], + ) + + +def control_msg_to_numpy(msg: Control) -> lfc_py_types.Control: + """Converts ROS Control message into internal LFC Control class. + + Args: + msg (linear_feedback_controller_msgs.msg.Control): Input ROS message. + + Returns: + lfc_py_types.Control: Output LFC representation of Control. + """ + return lfc_py_types.Control( + feedback_gain=matrix_msg_to_numpy(msg.feedback_gain), + feedforward=matrix_msg_to_numpy(msg.feedforward), + initial_state=sensor_msg_to_numpy(msg.initial_state), + ) + + +def joint_state_numpy_to_msg(input: lfc_py_types.JointState) -> JointState: + """Converts internal LFC JointState class into ROS JointState message. + + Args: + input (lfc_py_types.JointState): Input LFC representation of Control. + + Returns: + sensor_msgs.msg.JointState: Output ROS message. + """ + return JointState( + name=input.name, + position=input.position, + velocity=input.velocity, + effort=input.effort, + ) + + +def contact_numpy_to_msg(input: lfc_py_types.Contact) -> Contact: + """Converts internal LFC Contact class into ROS Contact message. + + Args: + input (lfc_py_types.Contact): Input LFC representation of Control. + + Returns: + linear_feedback_controller_msgs.msg.Contact: Output ROS message. + """ + return Contact( + active=input.active, + name=input.name, + wrench=wrench_numpy_to_msg(input.wrench), + pose=pose_numpy_to_msg(input.pose), + ) + + +def sensor_numpy_to_msg(input: lfc_py_types.Sensor) -> Sensor: + """Converts internal LFC Sensor class into ROS Sensor message. + + Args: + input (lfc_py_types.Sensor): Input LFC representation of Control. + + Returns: + linear_feedback_controller_msgs.msg.Sensor: Output ROS message. + """ + return Sensor( + base_pose=pose_numpy_to_msg(input.base_pose), + base_twist=twist_numpy_to_msg(input.base_twist), + joint_state=joint_state_numpy_to_msg(input.joint_state), + contacts=[contact_numpy_to_msg(contact) for contact in input.contacts], + ) + + +def control_numpy_to_msg(input: lfc_py_types.Control) -> Control: + """Converts internal LFC Control class into ROS Control message. + + Args: + input (lfc_py_types.Control): Input LFC representation of Control. + + Returns: + linear_feedback_controller_msgs.msg.Control: Output ROS message. + """ + return Control( + feedback_gain=matrix_numpy_to_msg(input.feedback_gain), + feedforward=matrix_numpy_to_msg(input.feedforward), + initial_state=sensor_numpy_to_msg(input.initial_state), + ) diff --git a/tests/test_numpy_conversions.py b/tests/test_numpy_conversions.py new file mode 100644 index 0000000..55c98de --- /dev/null +++ b/tests/test_numpy_conversions.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python + +import numpy as np +import numpy.typing as npt +from typing import List +from copy import deepcopy + +from geometry_msgs.msg import Transform, Vector3, Quaternion + +from linear_feedback_controller_msgs_py.numpy_conversions import * +import linear_feedback_controller_msgs_py.lfc_py_types as lfc_py_types + + +def test_check_numpy_constructors() -> None: + joint_state = lfc_py_types.JointState( + name=["joint_0", "joint_1"], + position=np.array([1.0, 1.0]), + velocity=np.array([1.0, 1.0]), + effort=np.array([1.0, 1.0]), + ) + contact = lfc_py_types.Contact( + active=False, + name="contact_0", + wrench=np.ones(6), + pose=np.ones(7), + ) + sensor = lfc_py_types.Sensor( + base_pose=np.ones(7), + base_twist=np.ones(6), + joint_state=joint_state, + contacts=[contact], + ) + lfc_py_types.Control( + feedback_gain=np.ones((4, 4)), + feedforward=np.ones(4), + initial_state=sensor, + ) + + +def test_check_ros_numpy_matrix_conversion() -> None: + numpy_random_matrix = np.random.rand(5, 6) + + ros_matrix = matrix_numpy_to_msg(deepcopy(numpy_random_matrix)) + numpy_back_converted_matrix = matrix_msg_to_numpy(ros_matrix) + + np.testing.assert_array_equal( + numpy_back_converted_matrix, + numpy_random_matrix, + err_msg="Matrix after conversion back to " + + "Numpy is not equal the initial matrix!", + ) + + +def test_check_ros_numpy_joint_state_conversion() -> None: + numpy_joint_state = lfc_py_types.JointState( + name=["1", "2", "3", "4", "5", "6"], + position=np.random.rand(6), + velocity=np.random.rand(6), + effort=np.random.rand(6), + ) + + # Pass deep copy to ensure memory is separate + ros_joint_state = joint_state_numpy_to_msg(deepcopy(numpy_joint_state)) + numpy_back_converted_join_state = joint_state_msg_to_numpy(ros_joint_state) + + assert ( + numpy_joint_state.name == numpy_back_converted_join_state.name + ), "Names after conversion back to Numpy is not equal initial values!" + + np.testing.assert_array_equal( + numpy_joint_state.position, + numpy_back_converted_join_state.position, + err_msg="Positions after conversion back to " + + "Numpy is not equal initial values!", + ) + + np.testing.assert_array_equal( + numpy_joint_state.velocity, + numpy_back_converted_join_state.velocity, + err_msg="Velocities after conversion back to " + + "Numpy is not equal initial values!", + ) + + np.testing.assert_array_equal( + numpy_joint_state.effort, + numpy_back_converted_join_state.effort, + err_msg="Efforts after conversion back to " + + "Numpy is not equal initial values!", + ) + + +def test_check_ros_eigen_sensor_conversions() -> None: + quat = np.random.rand(4) + quat = quat / np.linalg.norm(quat) + + numpy_sensor = lfc_py_types.Sensor( + base_pose=np.concatenate((np.random.rand(3), quat)), + base_twist=np.random.rand(6), + joint_state=lfc_py_types.JointState( + name=["1", "2", "3", "4", "5", "6"], + position=np.random.rand(6), + velocity=np.random.rand(6), + effort=np.random.rand(6), + ), + contacts=[], + ) + + numpy_sensor.contacts.append( + lfc_py_types.Contact( + active=True, + name="left_foot", + wrench=np.random.rand(6), + pose=np.random.rand(7), + ) + ) + + numpy_sensor.contacts.append( + lfc_py_types.Contact( + active=False, + name="right_foot", + wrench=np.random.rand(6), + pose=np.random.rand(7), + ) + ) + + ros_sensor = sensor_numpy_to_msg(deepcopy(numpy_sensor)) + numpy_back_converted_sensor = sensor_msg_to_numpy(ros_sensor) + + np.testing.assert_array_equal( + numpy_sensor.base_pose, + numpy_back_converted_sensor.base_pose, + err_msg="Base Pose after conversion back to " + + "Numpy is not equal initial values!", + ) + + np.testing.assert_array_equal( + numpy_sensor.base_twist, + numpy_back_converted_sensor.base_twist, + err_msg="Base Twist after conversion back to " + + "Numpy is not equal initial values!", + ) + + assert ( + numpy_sensor.joint_state.name == numpy_back_converted_sensor.joint_state.name + ), "Joint names after conversion back to Numpy is not equal initial values!" + + np.testing.assert_array_equal( + numpy_sensor.joint_state.position, + numpy_back_converted_sensor.joint_state.position, + err_msg="Joint positions after conversion back to " + + "Numpy is not equal initial values!", + ) + + np.testing.assert_array_equal( + numpy_sensor.joint_state.velocity, + numpy_back_converted_sensor.joint_state.velocity, + err_msg="Joint velocities after conversion back to " + + "Numpy is not equal initial values!", + ) + + np.testing.assert_array_equal( + numpy_sensor.joint_state.effort, + numpy_back_converted_sensor.joint_state.effort, + err_msg="Joint efforts after conversion back to " + + "Numpy is not equal initial values!", + ) + + assert len(numpy_sensor.contacts) == len( + numpy_back_converted_sensor.contacts + ), "Number of contacts after conversion doesn't match number of contacts before conversion!" + + for i in range(len(numpy_sensor.contacts)): + c1 = numpy_sensor.contacts[i] + c2 = numpy_back_converted_sensor.contacts[i] + assert ( + c1.active == c2.active + ), f"Active parameter before and after conversion differs at index '{i}'" + assert ( + c1.name == c2.name + ), f"Name parameter before and after conversion differs at index '{i}'" + + np.testing.assert_array_equal( + c1.wrench, + c2.wrench, + err_msg=f"Wrench parameter before and after conversion differs at index '{i}'", + ) + + np.testing.assert_array_equal( + c1.pose, + c2.pose, + err_msg=f"Pose parameter before and after conversion differs at index '{i}'", + )