Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions launch_testing/launch_testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .parametrize import parametrize
from .proc_info_handler import ActiveProcInfoHandler, ProcInfoHandler
from .ready_aggregator import ReadyAggregator
from .ready_to_test_action_timeout import ready_to_test_action_timeout


__all__ = [
Expand All @@ -27,6 +28,7 @@
# Functions
'parametrize',
'post_shutdown_test',
'ready_to_test_action_timeout',

# Classes
'ActiveIoHandler',
Expand Down
7 changes: 7 additions & 0 deletions launch_testing/launch_testing/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def __init__(self,
self.pre_shutdown_tests = pre_shutdown_tests
self.post_shutdown_tests = post_shutdown_tests

# Duration for which the ReadyToTest action waits for the processes
# to start up, 15 seconds by default
self.timeout = 15

if hasattr(test_description_function, '__ready_to_test_action_timeout__'):
self.timeout = getattr(test_description_function, '__ready_to_test_action_timeout__')

# If we're parametrized, extend the test names so we can tell more easily what
# params they were run with
if self.param_args:
Expand Down
5 changes: 5 additions & 0 deletions launch_testing/launch_testing/parametrize.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ def _wrapped():
partial = functools.partial(func, **partial_args)
functools.update_wrapper(partial, func)
yield partial, partial_args

_wrapped.__parametrized__ = True
if hasattr(func, '__ready_to_test_action_timeout__'):
_wrapped.__ready_to_test_action_timeout__ = \
getattr(func, '__ready_to_test_action_timeout__')

return _wrapped

return _decorator
30 changes: 30 additions & 0 deletions launch_testing/launch_testing/ready_to_test_action_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def ready_to_test_action_timeout(timeout):
"""
Decorate a test launch description in a way that it adds ReadyToTest action timeout.

attribute to the function being decorated.

:param: timeout Duration for which the ReadyToTest action waits for processes to start up

"""

def _decorator(func):
func.__ready_to_test_action_timeout__ = timeout
return func

return _decorator
6 changes: 5 additions & 1 deletion launch_testing/launch_testing/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ def __init__(self,
self._tests_completed = threading.Event() # To signal when all the tests have finished
self._launch_file_arguments = launch_file_arguments

# Duration for which the ReadyToTest action waits for the processes
# to start up, 15 seconds by default
self.timeout = self._test_run.timeout

# Can't run LaunchService.run on another thread :-(
# See https://github.com/ros2/launch/issues/126
#
Expand Down Expand Up @@ -181,7 +185,7 @@ def _run_test(self):
# Waits for the DUT processes to start (signaled by the _processes_launched
# event) and then runs the tests

if not self._processes_launched.wait(timeout=15):
if not self._processes_launched.wait(self.timeout):
# Timed out waiting for the processes to start
print('Timed out waiting for processes to start up')
self._launch_service.shutdown()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import launch
from launch.actions import TimerAction
import launch_testing
from launch_testing.actions import ReadyToTest

import pytest


# The ReadyToTest action waits for 15 second by default for the processes to
# start, if processes take more time an error is thrown. We use decorator here
# to provide timeout duration of 20 second so that processes that take longer than
# 15 seconds can start up.

@pytest.mark.launch_test
@launch_testing.ready_to_test_action_timeout(20)
def generate_test_description():
# takes 15 sec for the TimerAction process to start
return launch.LaunchDescription([
launch_testing.util.KeepAliveProc(),
TimerAction(period=15.0, actions=[ReadyToTest()]),
])


# the process should exit cleanly now since the process takes 15 second
# to start and timeout duration of ReadyToTest action is 20 seconds.
@launch_testing.post_shutdown_test()
class TestHelloWorldShutdown(unittest.TestCase):

def test_exit_codes(self, proc_info):
"""Check if the processes exited normally."""
launch_testing.asserts.assertExitCodes(proc_info)
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import launch_testing


def test_ready_to_test_action_attribute():

@launch_testing.ready_to_test_action_timeout(10)
def fake_test_description(arg):
pass # pragma: no cover

assert hasattr(fake_test_description, '__ready_to_test_action_timeout__')
assert getattr(fake_test_description, '__ready_to_test_action_timeout__') == 10


def test_parametrize_and_ready_to_test_action_attribute():

@launch_testing.parametrize('val', [1, 2, 3])
@launch_testing.ready_to_test_action_timeout(10)
def fake_test_description(arg):
pass # pragma: no cover

assert hasattr(fake_test_description, '__parametrized__')
assert hasattr(fake_test_description, '__ready_to_test_action_timeout__')
assert getattr(fake_test_description, '__ready_to_test_action_timeout__') == 10


def test_ready_to_test_action_and_parametrize_attribute():

@launch_testing.ready_to_test_action_timeout(10)
@launch_testing.parametrize('val', [1, 2, 3])
def fake_test_description(arg):
pass # pragma: no cover

assert hasattr(fake_test_description, '__parametrized__')
assert hasattr(fake_test_description, '__ready_to_test_action_timeout__')
assert getattr(fake_test_description, '__ready_to_test_action_timeout__') == 10