diff --git a/devs.py b/devs.py index ea42d76..5c87e9a 100644 --- a/devs.py +++ b/devs.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from serial.tools.list_ports import comports from uhubctl import Uhubctl import yaml @@ -51,7 +51,6 @@ def reset_all(cls) -> None: @classmethod def create_from_uid(cls, uid: str) -> "DevSwitch": - hub, port = cls.hw_controller.get_hub_port_by_desc(uid) if hub and port: @@ -61,7 +60,6 @@ def create_from_uid(cls, uid: str) -> "DevSwitch": @classmethod def scan(cls) -> list["DevSwitch"]: - hub_port = cls.hw_controller.scan_hubs_ports() switch_list = [] @@ -73,33 +71,50 @@ def scan(cls) -> list["DevSwitch"]: @dataclass class DevAccessSerial: - address: str + address: str = None + serial_number: str = None # TODO: Add additional requirements for accessibility: # # - the device is not busy (fuser or such as in makersHIL) def get_address(self) -> str: return self.address + + def get_serial_number(self) -> str: + return self.serial_number @classmethod def create_from_uid(cls, uid: str) -> "DevAccessSerial": for port in comports(): if port.serial_number == uid: - return cls(address=port.device) + return cls(address=port.device, serial_number=port.serial_number) + + return None + + @classmethod + def scan(cls, attr_name: str = None , attr_value: str = None) -> list["DevAccessSerial"]: + access_list = [] + for port in comports(): + if attr_name is None: + access_list.append(cls(address=port.device, serial_number=port.serial_number)) + else: + if hasattr(port, attr_name) and attr_value == getattr(port, attr_name): + access_list.append(cls(address=port.device, serial_number=port.serial_number)) - return None + return access_list @dataclass class Device: - name: str - uid: str - features: list[str] + name: str = "" + uid: str = "" + features: list[str] = field(default_factory=list) access: DevAccessSerial = None switch: DevSwitch = None def __post_init__(self): - self.access = DevAccessSerial.create_from_uid(self.uid) - self.switch = DevSwitch.create_from_uid(self.uid) + if self.uid != "": + self.access = DevAccessSerial.create_from_uid(self.uid) + self.switch = DevSwitch.create_from_uid(self.uid) @classmethod def load_device_list_from_yml(cls, devs_yml_file: str) -> list["Device"]: diff --git a/devs_query.py b/devs_query.py new file mode 100644 index 0000000..4291928 --- /dev/null +++ b/devs_query.py @@ -0,0 +1,238 @@ +import argparse +from dataclasses import dataclass +from devs import Device, DevAccessSerial, DevSwitch + +class ObjectAttrQuerier: + """ + A utility class to query attributes from an object, including nested objects. + It supports exact and partial matching, as well as filtering based on multiple criteria. + """ + + @dataclass + class AttrQueryFilter: + """ + A data class to represent a filter for attribute querying. + """ + attr_key: str + attr_value: str = None + exact: bool = True + + def __init__(self, obj): + """ + Initialize the ObjectAttrQuerier with the target object. + """ + self.obj = obj + + def query(self, attr_key: str, attr_value: str = None, exact: bool = True) -> object: + """ + Query the attribute of the object. + If attr_value is provided, it checks for exact or partial match based on the 'exact' flag.""" + if attr_value: + if exact: + return (attr_value if self.__query_exact(attr_key, attr_value) else None) + else: + return (attr_value if self.__query_contains(attr_key, attr_value) else None) + else: + return self.__query_attr(attr_key) + + def query_deep(self, attr_key: str, attr_value: str = None, exact: bool = True) -> object: + """ + Deep query the attribute of the object, including nested objects. + If attr_value is provided, it checks for exact or partial match based on the 'exact' flag. + """ + found_attr_value = self.query(attr_key, attr_value, exact) + + if not found_attr_value: + for attr in vars(self.obj): + attr_obj = getattr(self.obj, attr) + if isinstance(attr_obj, object): + sub_querier = ObjectAttrQuerier(attr_obj) + found_attr_value = sub_querier.query(attr_key, attr_value, exact) + if found_attr_value is not None: + return found_attr_value + + return found_attr_value + + def query_filtered(self, attr_key, query_filter_list: list[AttrQueryFilter], deep: bool = False) -> object: + """ + Query the attribute of the object with multiple filters. + If deep is True, it performs a deep query. + """ + query_func = self.query_deep if deep else self.query + + attr_value = query_func(attr_key) + if not attr_value: + return None + + for query_filter in query_filter_list: + filter_attr_value = query_func(query_filter.attr_key, query_filter.attr_value, query_filter.exact) + if not filter_attr_value: + return None + + return attr_value + + @staticmethod + def query_list(obj_list: list[object], attr_key, query_filter_list: list[AttrQueryFilter] = [], deep: bool = False) -> list[object]: + """ + Query a list of objects for a specific attribute with optional filters. + If deep is True, it performs a deep query.""" + query_list = [] + for obj in obj_list: + obj = ObjectAttrQuerier(obj) + attr_value = obj.query_filtered(attr_key, query_filter_list, deep) + if attr_value: + query_list.append(attr_value) + + return query_list + + """ + Private methods + """ + def __query_attr(self, query_attr: str) -> object: + """ + Query the attribute of the object. + """ + if hasattr(self.obj, query_attr): + return getattr(self.obj, query_attr) + + return None + + def __query_exact(self, query_attr: str, query_value: str) -> bool: + """ + Check if the attribute of the object matches exactly the query value. + """ + if hasattr(self.obj, query_attr): + return getattr(self.obj, query_attr) == query_value + + return False + + def __query_contains(self, query_attr: str, query_value: str) -> bool: + """ + Check if the attribute of the object contains the query value. + """ + if hasattr(self.obj, query_attr): + attr_value = getattr(self.obj, query_attr) + if isinstance(attr_value, str): + return query_value in attr_value + + return False + +class DevsQueryCLI: + """ + A command-line interface (CLI) for querying device attributes using ObjectAttrQuerier. + The allowed device objects are Device, DevAccessSerial, and DevSwitch. + """ + def __init__(self): + + self.allowed_dev_objs = [Device, DevAccessSerial, DevSwitch] + self.parser = argparse.ArgumentParser(description="Device query utility") + + self.parser.add_argument("query_attribute", choices=self.__get_query_choices(), help="The attribute to query from the devices.") + self.parser.add_argument("-f", "--filter", nargs="*", action="append", help="Filter in format 'attribute_key=attribute_value'. It accepts multiple filters separated by space, or multiple -f options.") + self.parser.add_argument("-y", "--devs-yml", default=None, help="Device YAML file. If not provided, it query from the connected serial devices.") + self.parser.add_argument("--not-connected", default=False, action="store_true", help="Include also NOT connected devices from the device list. Only relevant when using --devs-yml") + + def parse(self) -> argparse.Namespace: + """ + Parse and validate the command-line arguments. + """ + args = self.parser.parse_args() + self.__set_validate_args(args) + return args + + def __get_query_choices(self) -> list[str]: + """ + Get the list of possible attribute names from the allowed device objects. + The valid attributes are those with string or integer types. + """ + def get_obj_attributes(obj): + attr_list = [] + for attr in dir(obj): + if not callable(getattr(obj, attr)) and not attr.startswith("__"): + if isinstance(getattr(obj, attr), str) or isinstance(getattr(obj, attr), int): + attr_list.append(attr) + + if hasattr(obj, '__annotations__'): + for attr, attr_type in obj.__annotations__.items(): + # Check if it's a string type + if attr_type == str or attr_type == 'str' or attr_type == int or attr_type == 'int': + attr_list.append(attr) + + return attr_list + + choices = [] + for obj in self.allowed_dev_objs: + choices.extend(get_obj_attributes(obj)) + + choices = sorted(list(set(choices))) + + return choices + + def __set_validate_args(self, args: argparse.Namespace) -> argparse.Namespace: + """ + Validate and set the command-line arguments. + The filters must be in the format 'attribute=value'. + As the CLI allows multiple -f options and the append action, it flatten + the list of lists into a single list. + """ + if args.filter: + simple_filter_list = [] + for filter in args.filter: + if not isinstance(filter, list): + simple_filter_list.append(filter) + else: + simple_filter_list.extend(filter) + + args.filter = simple_filter_list + for f in simple_filter_list: + if '=' not in f: + self.parser.error("filter must be in format 'attribute=value'") + + def parse_filters(self, filter_str_list: list[str]) -> list[ObjectAttrQuerier.AttrQueryFilter]: + """ + Parse the filter strings into a list of AttrQueryFilter objects. + Each filter string must be in the format 'attribute=value'. + """ + filter_list = [] + for filter_str in filter_str_list: + + attr_name, attr_value = filter_str.split('=', 1) + filter_list.append(ObjectAttrQuerier.AttrQueryFilter(attr_key=attr_name, attr_value=attr_value)) + + return filter_list + + +def main_devs_query_cli() -> list[str]: + """ + Main function for the DevsQueryCLI. + It parses the command-line arguments, loads the device list, + applies the queries and filters, and prints the results. + + If a device YAML file is provided, it loads the device list from the file. + Otherwise, it scans for connected serial devices. + """ + cli = DevsQueryCLI() + args = cli.parse() + + query_filter_list = [] + if args.filter: + query_filter_list = cli.parse_filters(args.filter) + + if args.devs_yml: + dev_list = Device.load_device_list_from_yml(args.devs_yml) + if not args.not_connected: + dev_list =[ dev for dev in dev_list if dev.access is not None ] + else: + dev_list = DevAccessSerial.scan() + + query_list = ObjectAttrQuerier.query_list( + dev_list, + args.query_attribute, + query_filter_list, + deep=True + ) + + print(*query_list) + +if __name__ == "__main__": + main_devs_query_cli() \ No newline at end of file diff --git a/get_devs.py b/get_devs.py deleted file mode 100644 index 26c64b9..0000000 --- a/get_devs.py +++ /dev/null @@ -1,184 +0,0 @@ -# This script support the discovery of attached KitProg3 devices - -import argparse, glob, os, re, subprocess, yaml - - -def get_devs_from_yml(dev_yml): - if not os.path.exists(dev_yml): - raise Exception("devices .yml file does not exit") - - board_sn_map = [] - with open(dev_yml, "r") as devs_f: - board_sn_map = yaml.safe_load(devs_f) - - return board_sn_map - - -def udevadm_get_kitprog3_attached_devs(): - def get_ttyACM_dev_list(): - return glob.glob(os.path.join("/dev", "ttyACM*"), recursive=False) - - def get_udevadm_port_attrs_output(dev): - dev_param = "--name=" + str(dev) - cmd_line_args = ["udevadm", "info", dev_param, "--attribute-walk"] - udevadm_output_lines = [] - try: - udevadm_proc = subprocess.Popen(cmd_line_args, stdout=subprocess.PIPE) - while True: - line = udevadm_proc.stdout.readline() - udevadm_output_lines.append(line) - if not line: - break - except: - raise Exception("udevadm error") - - return udevadm_output_lines - - def is_kitprog_device(udevadm_output_lines): - def is_device_attr_found(line, pattern): - attr = re.search(pattern, str(line)) - if attr is not None: - return True - - return False - - # It is a kitprog probe is these matches - # are found in the device attributes - required_attr_match = [ - r'ATTRS{interface}=="KitProg\d.*"', - r'ATTRS{product}==".*KitProg\d.*"', - ] - - attr_found_count = 0 - - for line in udevadm_output_lines: - for req_match in required_attr_match: - if is_device_attr_found(line, req_match): - attr_found_count += 1 - - if attr_found_count == len(required_attr_match): - return True - - return False - - def get_kitprog_serial_number(udevadm_output_lines): - for line in udevadm_output_lines: - attr = re.search('ATTRS{serial}=="[a-zA-Z0-9]*"', str(line)) - if attr is not None: - sn = attr.group()[len("ATTRS{serial}==") :] - sn = sn.strip('"') - return sn - - return "" - - kitprog_devs = [] - dev_list = get_ttyACM_dev_list() - - if dev_list != []: - for dev in dev_list: - udevadm_output_lines = get_udevadm_port_attrs_output(dev) - if is_kitprog_device(udevadm_output_lines): - sn = get_kitprog_serial_number(udevadm_output_lines) - # new kp device - kp_dev = {} - kp_dev["port"] = dev - kp_dev["sn"] = sn - - kitprog_devs.append(kp_dev) - - return kitprog_devs - - -def get_devices(search_param, board=None, devs_yml=None, hw_ext=None): - dev_list = [] - port_sn_map = udevadm_get_kitprog3_attached_devs() - - if board is not None and devs_yml is not None: - board_sn_map_list = get_devs_from_yml(devs_yml) - - for board_sn_map_item in board_sn_map_list: - if board == board_sn_map_item["board_type"]: - for dev in port_sn_map: - for mapped_board_item in board_sn_map_item["board_list"]: - if dev["sn"] == mapped_board_item["sn"]: - if hw_ext is None: - dev_list.append(dev[search_param]) - break - if hw_ext is not None: - if "hw_ext" in mapped_board_item.keys(): - if hw_ext == mapped_board_item["hw_ext"]: - dev_list.append(dev[search_param]) - break - else: - for dev in port_sn_map: - dev_list.append(dev[search_param]) - - return dev_list - - -def get_devices_serial_num(board=None, devs_yml=None, hw_ext=None): - return get_devices("sn", board, devs_yml, hw_ext) - - -def get_devices_port(board=None, devs_yml=None, hw_ext=None): - return get_devices("port", board, devs_yml, hw_ext) - - -def parser(): - def main_parser_func(args): - parser.print_help() - - def parse_validate_opt_arg_mutual_required(args): - if args.devs_yml and args.board is None: - parser.error("--devs-yml requires --board.") - if args.board and args.devs_yml is None: - parser.error("--board requires --dev-yml.") - if args.hw_ext is not None and (args.board is None or args.devs_yml is None): - parser.error("--hw_ext requires --board and --dev-yml.") - - def parser_get_devices_serial_num(args): - parse_validate_opt_arg_mutual_required(args) - devs_serial = get_devices_serial_num(args.board, args.devs_yml, args.hw_ext) - print(*devs_serial) - - def parser_get_devices_port(args): - parse_validate_opt_arg_mutual_required(args) - devs_port = get_devices_port(args.board, args.devs_yml, args.hw_ext) - print(*devs_port) - - parser = argparse.ArgumentParser(description="Get kitprog3 device utility") - - subparser = parser.add_subparsers() - parser.set_defaults(func=main_parser_func) - - # Get devices serial numbers - parser_sn = subparser.add_parser( - "serial-number", description="Get kitprog3 devices serial number list" - ) - parser_sn.add_argument("-b", "--board", type=str, help="Board name") - parser_sn.add_argument( - "-y", "--devs-yml", type=str, help="Device list yml with board - serial number map" - ) - parser_sn.add_argument( - "--hw-ext", type=str, default=None, help="Required external hardware configuration" - ) - parser_sn.set_defaults(func=parser_get_devices_serial_num) - - # Get devices port - parser_port = subparser.add_parser("port", description="Get kitprog3 devices port list") - parser_port.add_argument("-b", "--board", type=str, help="Board name") - parser_port.add_argument( - "-y", "--devs-yml", type=str, help="Device list yml with board - serial number map" - ) - parser_port.add_argument( - "--hw-ext", type=str, default=None, help="Required external hardware configuration" - ) - parser_port.set_defaults(func=parser_get_devices_port) - - # Parser call - args = parser.parse_args() - args.func(args) - - -if __name__ == "__main__": - parser() diff --git a/run_test_plan.py b/run_test_plan.py index 88ec5a7..4f872bc 100644 --- a/run_test_plan.py +++ b/run_test_plan.py @@ -8,8 +8,7 @@ import yaml import time -from get_devs import get_devices_port - +from devs import Device, DevAccessSerial class TestRunner: """ @@ -286,7 +285,6 @@ def get_test_list_exclude_args(): def __run_single_post_delay_test(self, dut_port: str) -> int: """ Run single tests with a delay between each test. - """ def get_test_list_args(): @@ -673,14 +671,15 @@ def run(self, test_name_list: list[str] = [], max_retries: int = 0) -> int: while pending_retries: for test in test_list: - dut_port, stub_port = self.get_test_device_ports(test) + dut_dev, stub_dev = self.get_test_devs(test) + dut_port, stub_port = TestPlanRunner.__get_test_ports(dut_dev, stub_dev) if not test.are_supported_devs_available(dut_port, stub_port): test_results.register_skip(test.name) self.logger.test_skip_info(test.name) continue - # TODO: Add device.switch management. + TestPlanRunner.__reset_switchable_devs(dut_dev, stub_dev) self.logger.test_info(test.name, dut_port, stub_port) ret_code = test.run(dut_port, stub_port) @@ -733,15 +732,47 @@ def __get_test_list(self, test_name_list: list[str] = []): return test_list + @staticmethod + def __reset_switchable_devs(dut_dev: Device, stub_dev: Device) -> None: + """ + Reset the given devices. + If a device has a switch, it uses the switch to reset the device. + """ + devs_to_reset = [dut_dev, stub_dev] + for dev in devs_to_reset: + if dev.switch: + dev.switch.reset() + timeout = 0 + while not dev.switch.status() == "on connected" and timeout < 5: + time.sleep(1) + timeout += 1 + + @staticmethod + def __get_test_ports(dut_dev: Device, stub_dev: Device) -> tuple[str, str]: + """ + Get the test device ports from the given devices. + If a device has an access method, it returns the address of the access. + Otherwise, it returns None. + """ + dut_port = None + stub_port = None + + if dut_dev.access: + dut_port = dut_dev.access.get_address() + + if stub_dev.access: + stub_port = stub_dev.access.get_address() + + return dut_port, stub_port + @abstractmethod - def get_test_device_ports(self, test: TestRunner) -> tuple[str, str]: + def get_test_devs(self, test: TestRunner) -> tuple[Device, Device]: """ - Abstract method to get the test device ports for the given test. + Abstract method to get the test devices for the given test. This method must be implemented by the derived classes. - It should return a tuple of (dut_port, stub_port). + It should return a tuple of (dut_dev, stub_dev). """ - return None, None - + return Device(), Device() class TestPlanRunnerHIL(TestPlanRunner): """ @@ -774,55 +805,63 @@ def run(self, test_name_list: list[str] = [], max_retries: int = 0) -> int: Private methods """ - def get_test_device_ports(self, test: TestRunner) -> tuple[str, str]: + def get_test_devs(self, test: TestRunner) -> tuple[Device, Device]: """ - Get the test device ports for the given test. - It uses the HIL devices file and the board name to find the appropriate ports. - Returns a tuple of (dut_port, stub_port). + Get the test devices for the given test. + It uses the HIL devices file and the board name to find the appropriate devices. + Returns a tuple of (dut_dev, stub_dev). If multiple test devices are available for the given role, it takes the first one for DUT and any other for the STUB. """ - dut_port = None - stub_port = None + dut_dev = Device() + stub_dev = Device() - dut_port_list = self.__get_ports_for_role(test, self.board, TestRunner.DeviceRole.DUT) + dut_dev_list = self.__get_devs_for_role(test, self.board, TestRunner.DeviceRole.DUT) - if not dut_port_list: - return dut_port, stub_port + if not dut_dev_list: + return dut_dev, stub_dev - dut_port = dut_port_list[0] # Take the first + for dev in dut_dev_list: + if dev.access: + dut_dev = dev # Take the first accessible + break + + if not dut_dev.access: + return dut_dev, stub_dev if test.requires_multiple_devs(): - stub_port_list = self.__get_ports_for_role( + stub_dev_list = self.__get_devs_for_role( test, self.board, TestRunner.DeviceRole.STUB ) - for port in stub_port_list: - # Take any element from stub_port_list that is not dut_port - if port != dut_port: - stub_port = port - break + for dev in stub_dev_list: + if dev.access: + # Take any element from stub_port_list that is not dut_port + if dev.access.get_address() != dut_dev.access.get_address(): + stub_dev = dev + break - return dut_port, stub_port + return dut_dev, stub_dev - def __get_ports_for_role( + def __get_devs_for_role( self, test: TestRunner, board: str, device_role: TestRunner.DeviceRole - ) -> list[str]: + ) -> list[Device]: """ - Get the list of ports for the given device role (dut or stub) and board. - It uses the HIL devices file to find the available ports. + Get the list of devices for the given device role (dut or stub) and board. + It uses the HIL devices file to find the available matching devices. """ supported_dev_list = test.get_supported_dev_list(device_role, board) - port_list = [] - for device in supported_dev_list: - available_ports = get_devices_port( - device.get("board"), self.hil_devs_file, device.get("version", None) - ) - port_list.extend(available_ports) + dev_list = [] + for supported_dev in supported_dev_list: + available_devs = Device.load_device_list_from_yml(self.hil_devs_file) + for dev in available_devs: + if dev.name == supported_dev.get("board"): + if supported_dev.get("version") is None or supported_dev.get("version") in dev.features: + dev_list.append(dev) - return port_list + return dev_list class TestPlanRunnerPorts(TestPlanRunner): @@ -858,12 +897,14 @@ def run(self, test_name_list: list[str] = [], max_retries: int = 0): Private methods """ - def get_test_device_ports(self, test: TestRunner) -> tuple[str, str]: + def get_test_devs(self, test: TestRunner) -> tuple[Device, Device]: """ Get the test device ports for the given test. It returns the ports set in the instance. """ - return self.dut_port, self.stub_port + dev_dut = Device(access=DevAccessSerial(address=self.dut_port)) + dev_stub = Device(access=DevAccessSerial(address=self.stub_port)) + return dev_dut, dev_stub class TestPlanRunnerCLI: diff --git a/test/test_devs_query.py b/test/test_devs_query.py new file mode 100644 index 0000000..f3ed7b7 --- /dev/null +++ b/test/test_devs_query.py @@ -0,0 +1,82 @@ +""" +This is a basic during development test for the ObjectAttrQuerier class. +It is not meant as a formal and thorough unit and coverage testing. +It just exercises the main functionalities of the class in +a sunny-day testing approach. + +In it we create a couple of Device and DevAccessSerial objects +and use them to test the querying functionalities of the ObjectAttrQuerier class. +""" +import sys + +sys.path.append('..') + +from devs_query import ObjectAttrQuerier +from devs import DevAccessSerial, Device + +serial_dev_list = [ + DevAccessSerial(address="/dev/ttyACM1", serial_number="1106035A012D2400"), + DevAccessSerial(address="/dev/ttyACM0", serial_number="0D170C5A012D2400") +] + +dev_list = [ + Device(name="CY8CKIT-062S2-AI", uid="1106035A012D2400", features=["psoc6", "ble"], access=serial_dev_list[0]), + Device(name="CY8CKIT-062S2-AI", uid="0D170C5A012D2400", features=["0.1.0.c"], access=serial_dev_list[1]) +] +dev_querier = ObjectAttrQuerier(dev_list[0]) + +print("---- query ---") +assert(dev_querier.query("name") == "CY8CKIT-062S2-AI") +assert(dev_querier.query("uid") == "1106035A012D2400") +assert(dev_querier.query("features") == ["psoc6", "ble"]) +assert(dev_querier.query("access") == serial_dev_list[0]) +assert(dev_querier.query("switch") == None) +assert(dev_querier.query("serial_number") == None) +assert(dev_querier.query("address") == None) + +print("---- deep query ---") +assert(dev_querier.query_deep("name") == "CY8CKIT-062S2-AI") +assert(dev_querier.query_deep("uid") == "1106035A012D2400") +assert(dev_querier.query_deep("features") == ["psoc6", "ble"]) +assert(dev_querier.query_deep("access") == serial_dev_list[0]) +assert(dev_querier.query_deep("switch") == None) +assert(dev_querier.query_deep("serial_number") == "1106035A012D2400") +assert(dev_querier.query_deep("address") == "/dev/ttyACM1") + +print("---- filtered query ---") +filter_list = [ + ObjectAttrQuerier.AttrQueryFilter(attr_key="serial_number", attr_value="1106035A012D2400", exact=True), + ObjectAttrQuerier.AttrQueryFilter(attr_key="address", attr_value="/dev/ttyACM1", exact=True), + ObjectAttrQuerier.AttrQueryFilter(attr_key="name", attr_value="CY8CKIT-062S2-AI", exact=True) +] +assert(dev_querier.query_filtered("address", filter_list, deep=True) == "/dev/ttyACM1") +assert(dev_querier.query_filtered("address", filter_list) == None) +assert(dev_querier.query_filtered("serial_number", filter_list, deep=True) == "1106035A012D2400") +assert(dev_querier.query_filtered("serial_number", filter_list) == None) +assert(dev_querier.query_filtered("name", filter_list, deep=True) == "CY8CKIT-062S2-AI") + +filter_list = [ + ObjectAttrQuerier.AttrQueryFilter(attr_key="address", attr_value="/dev/tty", exact=False) +] + +assert(dev_querier.query_filtered("address", filter_list, deep=True) == "/dev/ttyACM1") + +print("---- filtered query list ---") + +filter_list = [ + ObjectAttrQuerier.AttrQueryFilter(attr_key="serial_number", attr_value="0D170C5A012D2400") +] + +found_query_list = ObjectAttrQuerier.query_list(dev_list, "address", filter_list, deep=True) +assert(len(found_query_list) == 1) +assert("/dev/ttyACM0" in found_query_list) + +found_query_list = ObjectAttrQuerier.query_list(dev_list, "address") +assert(len(found_query_list) == 0) + +found_query_list = ObjectAttrQuerier.query_list(dev_list, "address", [], deep=True) +assert(len(found_query_list) == 2) + + + + diff --git a/uhubctl.py b/uhubctl.py index 2786d7f..e7b4bc5 100644 --- a/uhubctl.py +++ b/uhubctl.py @@ -99,6 +99,11 @@ def __run_cmd(self, cmd_args: list[str]) -> None: this is not considered an error. A USB hub may not be connected or available. + If uhubctl is not installed, it silently returns an empty output. + This enables compatibility with systems where uhubctl is not + available. + Otherwise, it prints the stderror message. + Args: cmd_args: List of command-line arguments for uhubctl @@ -106,9 +111,17 @@ def __run_cmd(self, cmd_args: list[str]) -> None: The standard output from the uhubctl command as a string. An empty string if no devices are detected or an error occurs. """ - uhub_cmd = ["uhubctl"] + cmd_args - uhub_proc = subprocess.run(uhub_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - uhub_err = uhub_proc.stderr.decode('utf-8') + try: + uhub_cmd = ["uhubctl"] + cmd_args + uhub_proc = subprocess.run(uhub_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + uhub_err = uhub_proc.stderr.decode('utf-8') + except Exception as e: + uhubctl_not_installed_error = "[Errno 2] No such file or directory: 'uhubctl'" + if not uhubctl_not_installed_error in str(e): + print(f"error: unable to run uhubctl command '{cmd_args}': {e}") + + self.last_cmd_output = "" + return if uhub_proc.returncode != 0: if not Uhubctl.__are_devices_detected(uhub_err):