Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an optional timeout_sec input to Client.call() to fix issue https://github.com/ros2/rclpy/issues/1181 #1188

Merged
merged 9 commits into from
Nov 17, 2023
15 changes: 11 additions & 4 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,19 @@ def __init__(

self._lock = threading.Lock()

def call(self, request: SrvTypeRequest) -> SrvTypeResponse:
def call(
self,
request: SrvTypeRequest,
timeout_sec: Optional[float] = None
) -> Optional[SrvTypeResponse]:
"""
Make a service request and wait for the result.

.. warning:: Do not call this method in a callback or a deadlock may occur.
.. warning:: Do not call this method in a callback, or a deadlock or timeout may occur.
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved

:param request: The service request.
:return: The service response.
:param timeout_sec: Seconds to wait. If ``None``, then wait forever.
:return: The service response, or None if timed out.
:raises: TypeError if the type of the passed request isn't an instance
of the Request type of the provided service when the client was
constructed.
Expand All @@ -97,7 +102,9 @@ def unblock(future):
# The callback might have been added after the future is completed,
# resulting in the event never being set.
if not future.done():
event.wait()
if not event.wait(timeout_sec):
# Timed out. remove_pending_request() to free resources
self.remove_pending_request(future)
if future.exception() is not None:
raise future.exception()
return future.result()
Expand Down
59 changes: 59 additions & 0 deletions rclpy/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# limitations under the License.

import platform
import threading
import time
import traceback
import unittest

from rcl_interfaces.srv import GetParameters
import rclpy
import rclpy.executors
import rclpy.node
from rclpy.utilities import get_rmw_implementation_identifier
from test_msgs.srv import Empty

Expand Down Expand Up @@ -56,6 +59,21 @@ def do_test_service_name(cls, test_service_name_list):
client.destroy()
node.destroy_node()

@staticmethod
def _spin_rclpy_node(
rclpy_node: rclpy.node.Node,
rclpy_executor: rclpy.executors.SingleThreadedExecutor
) -> None:
try:
rclpy_executor.spin()
except rclpy.executors.ExternalShutdownException:
pass
except Exception as err:
traceback.print_exc()
print(rclpy_node.get_name() + ': ' + str(err))
print(rclpy_node.get_name() + ': rclpy_node exit')
# rclpy_node.destroy_node()

def test_wait_for_service_5sec(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
try:
Expand Down Expand Up @@ -183,6 +201,47 @@ def test_get_service_name_after_remapping(self):
]
TestClient.do_test_service_name(test_service_name_list)

def test_sync_call(self):
def _service(request, response):
return response
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(GetParameters, 'get/parameters', _service)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
executor = rclpy.executors.SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)
executor_thread = threading.Thread(
target=TestClient._spin_rclpy_node, args=(self.node, executor))
executor_thread.start()
result = cli.call(GetParameters.Request(), 0.5)
self.assertTrue(result is not None)
executor.shutdown()
executor_thread.join()
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_sync_call_timeout(self):
def _service(request, response):
time.sleep(1)
return response
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(GetParameters, 'get/parameters', _service)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
executor = rclpy.executors.SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)
executor_thread = threading.Thread(
target=TestClient._spin_rclpy_node, args=(self.node, executor))
executor_thread.start()
result = cli.call(GetParameters.Request(), 0.5)
self.assertTrue(result is None)
executor.shutdown()
executor_thread.join()
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)


if __name__ == '__main__':
unittest.main()