Skip to content

Runner

runner

ANTA runner function.

adjust_rlimit_nofile

adjust_rlimit_nofile() -> tuple[int, int]

Adjust the maximum number of open file descriptors for the ANTA process.

The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable.

If the ANTA_NOFILE environment variable is not set or is invalid, DEFAULT_NOFILE is used.

Returns:

Type Description
tuple[int, int]

The new soft and hard limits for open file descriptors.

Source code in anta/runner.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def adjust_rlimit_nofile() -> tuple[int, int]:
    """Adjust the maximum number of open file descriptors for the ANTA process.

    The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable.

    If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used.

    Returns
    -------
    tuple[int, int]
        The new soft and hard limits for open file descriptors.
    """
    try:
        nofile = int(os.environ.get("ANTA_NOFILE", DEFAULT_NOFILE))
    except ValueError as exception:
        logger.warning("The ANTA_NOFILE environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_NOFILE)
        nofile = DEFAULT_NOFILE

    limits = resource.getrlimit(resource.RLIMIT_NOFILE)
    logger.debug("Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: %s | Hard Limit: %s", limits[0], limits[1])
    nofile = min(limits[1], nofile)
    logger.debug("Setting soft limit for open file descriptors for the current ANTA process to %s", nofile)
    resource.setrlimit(resource.RLIMIT_NOFILE, (nofile, limits[1]))
    return resource.getrlimit(resource.RLIMIT_NOFILE)

get_coroutines

get_coroutines(
    selected_tests: defaultdict[
        AntaDevice, set[AntaTestDefinition]
    ],
    manager: ResultManager | None = None,
) -> list[Coroutine[Any, Any, TestResult]]

Get the coroutines for the ANTA run.

Parameters:

Name Type Description Default
selected_tests defaultdict[AntaDevice, set[AntaTestDefinition]]

A mapping of devices to the tests to run. The selected tests are generated by the prepare_tests function.

required
manager ResultManager | None

An optional ResultManager object to pre-populate with the test results. Used in dry-run mode.

None

Returns:

Type Description
list[Coroutine[Any, Any, TestResult]]

The list of coroutines to run.

Source code in anta/runner.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]], manager: ResultManager | None = None) -> list[Coroutine[Any, Any, TestResult]]:
    """Get the coroutines for the ANTA run.

    Parameters
    ----------
    selected_tests
        A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function.
    manager
        An optional ResultManager object to pre-populate with the test results. Used in dry-run mode.

    Returns
    -------
    list[Coroutine[Any, Any, TestResult]]
        The list of coroutines to run.
    """
    coros = []
    for device, test_definitions in selected_tests.items():
        for test in test_definitions:
            try:
                test_instance = test.test(device=device, inputs=test.inputs)
                if manager is not None:
                    manager.add(test_instance.result)
                coros.append(test_instance.test())
            except Exception as e:  # noqa: PERF203, BLE001
                # An AntaTest instance is potentially user-defined code.
                # We need to catch everything and exit gracefully with an error message.
                message = "\n".join(
                    [
                        f"There is an error when creating test {test.test.__module__}.{test.test.__name__}.",
                        f"If this is not a custom test implementation: {GITHUB_SUGGESTION}",
                    ],
                )
                anta_log_exception(e, message, logger)
    return coros

log_cache_statistics

log_cache_statistics(devices: list[AntaDevice]) -> None

Log cache statistics for each device in the inventory.

Parameters:

Name Type Description Default
devices list[AntaDevice]

List of devices in the inventory.

required
Source code in anta/runner.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def log_cache_statistics(devices: list[AntaDevice]) -> None:
    """Log cache statistics for each device in the inventory.

    Parameters
    ----------
    devices
        List of devices in the inventory.
    """
    for device in devices:
        if device.cache_statistics is not None:
            msg = (
                f"Cache statistics for '{device.name}': "
                f"{device.cache_statistics['cache_hits']} hits / {device.cache_statistics['total_commands_sent']} "
                f"command(s) ({device.cache_statistics['cache_hit_ratio']})"
            )
            logger.info(msg)
        else:
            logger.info("Caching is not enabled on %s", device.name)

main async

main(
    manager: ResultManager,
    inventory: AntaInventory,
    catalog: AntaCatalog,
    devices: set[str] | None = None,
    tests: set[str] | None = None,
    tags: set[str] | None = None,
    *,
    established_only: bool = True,
    dry_run: bool = False
) -> None

Run ANTA.

Use this as an entrypoint to the test framework in your script. ResultManager object gets updated with the test results.

Parameters:

Name Type Description Default
manager ResultManager

ResultManager object to populate with the test results.

required
inventory AntaInventory

AntaInventory object that includes the device(s).

required
catalog AntaCatalog

AntaCatalog object that includes the list of tests.

required
devices set[str] | None

Devices on which to run tests. None means all devices. These may come from the --device / -d CLI option in NRFU.

None
tests set[str] | None

Tests to run against devices. None means all tests. These may come from the --test / -t CLI option in NRFU.

None
tags set[str] | None

Tags to filter devices from the inventory. These may come from the --tags CLI option in NRFU.

None
established_only bool

Include only established device(s).

True
dry_run bool

Build the list of coroutine to run and stop before test execution.

False
Source code in anta/runner.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
@cprofile()
async def main(
    manager: ResultManager,
    inventory: AntaInventory,
    catalog: AntaCatalog,
    devices: set[str] | None = None,
    tests: set[str] | None = None,
    tags: set[str] | None = None,
    *,
    established_only: bool = True,
    dry_run: bool = False,
) -> None:
    """Run ANTA.

    Use this as an entrypoint to the test framework in your script.
    ResultManager object gets updated with the test results.

    Parameters
    ----------
    manager
        ResultManager object to populate with the test results.
    inventory
        AntaInventory object that includes the device(s).
    catalog
        AntaCatalog object that includes the list of tests.
    devices
        Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU.
    tests
        Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU.
    tags
        Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU.
    established_only
        Include only established device(s).
    dry_run
        Build the list of coroutine to run and stop before test execution.
    """
    if not catalog.tests:
        logger.info("The list of tests is empty, exiting")
        return

    with Catchtime(logger=logger, message="Preparing ANTA NRFU Run"):
        # Setup the inventory
        selected_inventory = inventory if dry_run else await setup_inventory(inventory, tags, devices, established_only=established_only)
        if selected_inventory is None:
            return

        with Catchtime(logger=logger, message="Preparing the tests"):
            selected_tests = prepare_tests(selected_inventory, catalog, tests, tags)
            if selected_tests is None:
                return
            final_tests_count = sum(len(tests) for tests in selected_tests.values())

        run_info = (
            "--- ANTA NRFU Run Information ---\n"
            f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n"
            f"Total number of selected tests: {final_tests_count}\n"
        )

        if os.name == "posix":
            # Adjust the maximum number of open file descriptors for the ANTA process
            limits = adjust_rlimit_nofile()
            run_info += f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n"
        else:
            # Running on non-Posix system, cannot manage the resource.
            limits = (sys.maxsize, sys.maxsize)
            run_info += "Running on a non-POSIX system, cannot adjust the maximum number of file descriptors.\n"

        run_info += "---------------------------------"

        logger.info(run_info)

        if final_tests_count > limits[0]:
            logger.warning(
                "The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n"
                "Errors may occur while running the tests.\n"
                "Please consult the ANTA FAQ."
            )

        coroutines = get_coroutines(selected_tests, manager if dry_run else None)

    if dry_run:
        logger.info("Dry-run mode, exiting before running the tests.")
        for coro in coroutines:
            coro.close()
        return

    if AntaTest.progress is not None:
        AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines))

    with Catchtime(logger=logger, message="Running ANTA tests"):
        results = await asyncio.gather(*coroutines)
        for result in results:
            manager.add(result)

    log_cache_statistics(selected_inventory.devices)

prepare_tests

prepare_tests(
    inventory: AntaInventory,
    catalog: AntaCatalog,
    tests: set[str] | None,
    tags: set[str] | None,
) -> (
    defaultdict[AntaDevice, set[AntaTestDefinition]] | None
)

Prepare the tests to run.

Parameters:

Name Type Description Default
inventory AntaInventory

AntaInventory object that includes the device(s).

required
catalog AntaCatalog

AntaCatalog object that includes the list of tests.

required
tests set[str] | None

Tests to run against devices. None means all tests.

required
tags set[str] | None

Tags to filter devices from the inventory.

required

Returns:

Type Description
defaultdict[AntaDevice, set[AntaTestDefinition]] | None

A mapping of devices to the tests to run or None if there are no tests to run.

Source code in anta/runner.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def prepare_tests(
    inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None
) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None:
    """Prepare the tests to run.

    Parameters
    ----------
    inventory
        AntaInventory object that includes the device(s).
    catalog
        AntaCatalog object that includes the list of tests.
    tests
        Tests to run against devices. None means all tests.
    tags
        Tags to filter devices from the inventory.

    Returns
    -------
    defaultdict[AntaDevice, set[AntaTestDefinition]] | None
        A mapping of devices to the tests to run or None if there are no tests to run.
    """
    # Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests
    catalog.build_indexes(filtered_tests=tests)

    # Using a set to avoid inserting duplicate tests
    device_to_tests: defaultdict[AntaDevice, set[AntaTestDefinition]] = defaultdict(set)

    total_test_count = 0

    # Create the device to tests mapping from the tags
    for device in inventory.devices:
        if tags:
            # If there are CLI tags, execute tests with matching tags for this device
            if not (matching_tags := tags.intersection(device.tags)):
                # The device does not have any selected tag, skipping
                continue
            device_to_tests[device].update(catalog.get_tests_by_tags(matching_tags))
        else:
            # If there is no CLI tags, execute all tests that do not have any tags
            device_to_tests[device].update(catalog.tag_to_tests[None])

            # Then add the tests with matching tags from device tags
            device_to_tests[device].update(catalog.get_tests_by_tags(device.tags))

        total_test_count += len(device_to_tests[device])

    if total_test_count == 0:
        msg = (
            f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current "
            "test catalog and device inventory, please verify your inputs."
        )
        logger.warning(msg)
        return None

    return device_to_tests

setup_inventory async

setup_inventory(
    inventory: AntaInventory,
    tags: set[str] | None,
    devices: set[str] | None,
    *,
    established_only: bool
) -> AntaInventory | None

Set up the inventory for the ANTA run.

Parameters:

Name Type Description Default
inventory AntaInventory

AntaInventory object that includes the device(s).

required
tags set[str] | None

Tags to filter devices from the inventory.

required
devices set[str] | None

Devices on which to run tests. None means all devices.

required
established_only bool

If True use return only devices where a connection is established.

required

Returns:

Type Description
AntaInventory | None

The filtered inventory or None if there are no devices to run tests on.

Source code in anta/runner.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devices: set[str] | None, *, established_only: bool) -> AntaInventory | None:
    """Set up the inventory for the ANTA run.

    Parameters
    ----------
    inventory
        AntaInventory object that includes the device(s).
    tags
        Tags to filter devices from the inventory.
    devices
        Devices on which to run tests. None means all devices.
    established_only
        If True use return only devices where a connection is established.

    Returns
    -------
    AntaInventory | None
        The filtered inventory or None if there are no devices to run tests on.
    """
    if len(inventory) == 0:
        logger.info("The inventory is empty, exiting")
        return None

    # Filter the inventory based on the CLI provided tags and devices if any
    selected_inventory = inventory.get_inventory(tags=tags, devices=devices) if tags or devices else inventory

    with Catchtime(logger=logger, message="Connecting to devices"):
        # Connect to the devices
        await selected_inventory.connect_inventory()

    # Remove devices that are unreachable
    selected_inventory = selected_inventory.get_inventory(established_only=established_only)

    # If there are no devices in the inventory after filtering, exit
    if not selected_inventory.devices:
        msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}'
        logger.warning(msg)
        return None

    return selected_inventory