diff --git a/nodescraper/cli/helper.py b/nodescraper/cli/helper.py index b07c1696..bf5d3989 100644 --- a/nodescraper/cli/helper.py +++ b/nodescraper/cli/helper.py @@ -331,7 +331,7 @@ def generate_reference_config( for obj in results: if obj.result_data.collection_result.status != ExecutionStatus.OK: logger.warning( - "Plugin: %s result status is %, skipping", + "Plugin: %s result status is %s, skipping", obj.source, obj.result_data.collection_result.status, ) diff --git a/nodescraper/plugins/inband/kernel_module/__init__.py b/nodescraper/plugins/inband/kernel_module/__init__.py new file mode 100644 index 00000000..fdb6d9c8 --- /dev/null +++ b/nodescraper/plugins/inband/kernel_module/__init__.py @@ -0,0 +1,25 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### diff --git a/nodescraper/plugins/inband/kernel_module/analyzer_args.py b/nodescraper/plugins/inband/kernel_module/analyzer_args.py new file mode 100644 index 00000000..fcf4fb4e --- /dev/null +++ b/nodescraper/plugins/inband/kernel_module/analyzer_args.py @@ -0,0 +1,59 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### +import re + +from nodescraper.models import AnalyzerArgs +from nodescraper.plugins.inband.kernel_module.kernel_module_data import ( + KernelModuleDataModel, +) + + +class KernelModuleAnalyzerArgs(AnalyzerArgs): + kernel_modules: dict[str, dict] = {} + regex_filter: list[str] = ["amd"] + + @classmethod + def build_from_model(cls, datamodel: KernelModuleDataModel) -> "KernelModuleAnalyzerArgs": + """build analyzer args from data model and filter by regex_filter + + Args: + datamodel (KernelModuleDataModel): data model for plugin + + Returns: + KernelModuleAnalyzerArgs: instance of analyzer args class + """ + + pattern_regex = re.compile("amd", re.IGNORECASE) + filtered_mods = { + name: data + for name, data in datamodel.kernel_modules.items() + if pattern_regex.search(name) + } + + return cls( + kernel_modules=filtered_mods, + regex_filter=[], + ) diff --git a/nodescraper/plugins/inband/kernel_module/kernel_module_analyzer.py b/nodescraper/plugins/inband/kernel_module/kernel_module_analyzer.py new file mode 100644 index 00000000..6a6f8c0d --- /dev/null +++ b/nodescraper/plugins/inband/kernel_module/kernel_module_analyzer.py @@ -0,0 +1,209 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### +import re +from typing import Optional + +from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus +from nodescraper.interfaces import DataAnalyzer +from nodescraper.models import TaskResult + +from .analyzer_args import KernelModuleAnalyzerArgs +from .kernel_module_data import KernelModuleDataModel + + +class KernelModuleAnalyzer(DataAnalyzer[KernelModuleDataModel, KernelModuleAnalyzerArgs]): + """Check kernel matches expected versions""" + + DATA_MODEL = KernelModuleDataModel + + def filter_modules_by_pattern( + self, modules: dict[str, dict], patterns: list[str] = None + ) -> tuple[dict[str, dict], list[str]]: + """Filter modules by pattern + + Args: + modules (dict[str, dict]): modules to be filtered + patterns (list[str], optional): pattern to us. Defaults to None. + + Returns: + tuple[dict[str, dict], list[str]]: tuple - dict of modules filtered, + list of unmatched pattern + """ + if patterns is None: + return modules, [] + + matched_modules = {} + unmatched_patterns = [] + + pattern_match_flags = {p: False for p in patterns} + + for mod_name in modules: + for p in patterns: + if re.search(p, mod_name, re.IGNORECASE): + matched_modules[mod_name] = modules[mod_name] + pattern_match_flags[p] = True + break + + unmatched_patterns = [p for p, matched in pattern_match_flags.items() if not matched] + + return matched_modules, unmatched_patterns + + def filter_modules_by_name_and_param( + self, modules: dict[str, dict], to_match: dict[str, dict] + ) -> tuple[dict[str, dict], dict[str, dict]]: + """Filter modules by name, param and value + + Args: + modules (dict[str, dict]): modules to be filtered + to_match (dict[str, dict]): modules to match + + Returns: + tuple[dict[str, dict], dict[str, dict]]: tuple - dict of modules filtered, + dict of modules unmatched + """ + if not to_match: + return modules, {} + + filtered = {} + unmatched = {} + + for mod_name, expected_data in to_match.items(): + expected_params = expected_data.get("parameters", {}) + actual_data = modules.get(mod_name) + + if not actual_data: + unmatched[mod_name] = expected_data + continue + + actual_params = actual_data.get("parameters", {}) + param_mismatches = {} + + for param, expected_val in expected_params.items(): + actual_val = actual_params.get(param) + if actual_val != expected_val: + param_mismatches[param] = { + "expected": expected_val, + "actual": actual_val if actual_val is not None else "", + } + + if param_mismatches: + unmatched[mod_name] = {"parameters": param_mismatches} + else: + filtered[mod_name] = actual_data + + return filtered, unmatched + + def analyze_data( + self, data: KernelModuleDataModel, args: Optional[KernelModuleAnalyzerArgs] = None + ) -> TaskResult: + """Analyze the kernel modules and associated parameters. + + Args: + data (KernelModuleDataModel): KernelModule data to analyze. + args (Optional[KernelModuleAnalyzerArgs], optional): KernelModule analyzer args. + + Returns: + TaskResult: Result of the analysis containing status and message. + """ + if not args: + args = KernelModuleAnalyzerArgs() + else: + if args.regex_filter and args.kernel_modules: + self.logger.warning( + "Both regex_filter and kernel_modules provided in analyzer args. kernel_modules will be ignored" + ) + + self.result.status = ExecutionStatus.OK + + if args.regex_filter: + try: + filtered_modules, unmatched_pattern = self.filter_modules_by_pattern( + data.kernel_modules, args.regex_filter + ) + except re.error: + self._log_event( + category=EventCategory.RUNTIME, + description="KernelModule regex is invalid", + data={"regex_filters": {args.regex_filter}}, + priority=EventPriority.ERROR, + ) + self.result.message = "Kernel modules failed to match regex" + self.result.status = ExecutionStatus.ERROR + return self.result + + if unmatched_pattern: + self._log_event( + category=EventCategory.RUNTIME, + description="KernelModules did not match all patterns", + data={"unmatched_pattern: ": unmatched_pattern}, + priority=EventPriority.INFO, + ) + self.result.message = "Kernel modules failed to match every pattern" + self.result.status = ExecutionStatus.ERROR + return self.result + + self._log_event( + category=EventCategory.OS, + description="KernelModules analyzed", + data={"filtered_modules": filtered_modules}, + priority=EventPriority.INFO, + ) + return self.result + + elif args.kernel_modules: + filtered_modules, not_matched = self.filter_modules_by_name_and_param( + data.kernel_modules, args.kernel_modules + ) + + # no modules matched + if not filtered_modules and not_matched: + self._log_event( + category=EventCategory.RUNTIME, + description="KernelModules: no modules matched", + data=args.kernel_modules, + priority=EventPriority.ERROR, + ) + self.result.message = "Kernel modules not matched" + self.result.status = ExecutionStatus.ERROR + return self.result + # some modules matched + elif filtered_modules and not_matched: + + self._log_event( + category=EventCategory.RUNTIME, + description="KernelModules: not all modules matched", + data=not_matched, + priority=EventPriority.ERROR, + ) + self.result.message = "Kernel modules not matched" + self.result.status = ExecutionStatus.ERROR + return self.result + else: + self.result.message = ( + "No values provided in analysis args for: kernel_modules and regex_match" + ) + self.result.status = ExecutionStatus.NOT_RAN + return self.result diff --git a/nodescraper/plugins/inband/kernel_module/kernel_module_collector.py b/nodescraper/plugins/inband/kernel_module/kernel_module_collector.py new file mode 100644 index 00000000..4d190b91 --- /dev/null +++ b/nodescraper/plugins/inband/kernel_module/kernel_module_collector.py @@ -0,0 +1,156 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### +from nodescraper.base import InBandDataCollector +from nodescraper.connection.inband.inband import CommandArtifact +from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily +from nodescraper.models import TaskResult + +from .kernel_module_data import KernelModuleDataModel + + +class KernelModuleCollector(InBandDataCollector[KernelModuleDataModel, None]): + """Read kernel modules and associated parameters""" + + DATA_MODEL = KernelModuleDataModel + + def parse_proc_modules(self, output: dict) -> dict: + """Parse command output and return dict of modules + + Args: + output (dict): sut cmd output + + Returns: + dict: parsed modules + """ + modules = {} + for line in output.strip().splitlines(): + parts = line.split() + if not parts: + continue + name = parts[0] + modules[name] = { + "parameters": {}, + } + return modules + + def get_module_parameters(self, module_name: str) -> dict: + """Fetches parameter names and values for a given kernel module using _run_sut_cmd + + Args: + module_name (str): name of module to fetch params for + + Returns: + dict: param dict of module + """ + param_dict = {} + param_dir = f"/sys/module/{module_name}/parameters" + + list_params_cmd = f"ls {param_dir}" + res = self._run_sut_cmd(list_params_cmd) + if res.exit_code != 0: + return param_dict + + for param in res.stdout.strip().splitlines(): + param_path = f"{param_dir}/{param}" + value_res = self._run_sut_cmd(f"cat {param_path}") + value = value_res.stdout.strip() if value_res.exit_code == 0 else "" + param_dict[param] = value + + return param_dict + + def collect_all_module_info(self) -> tuple[dict, CommandArtifact]: + """Get all modules and its associated params and values + + Raises: + RuntimeError: error for failing to get modules + + Returns: + tuple[dict, CommandArtifact]: modules found and exit code + """ + modules = {} + res = self._run_sut_cmd("cat /proc/modules") + if res.exit_code != 0: + self._log_event( + category=EventCategory.OS, + description="Failed to read /proc/modules", + data={"command": res.command, "exit_code": res.exit_code}, + priority=EventPriority.ERROR, + console_log=True, + ) + return modules + + modules = self.parse_proc_modules(res.stdout) + + for mod in modules: + modules[mod]["parameters"] = self.get_module_parameters(mod) + + if not modules: + self._log_event( + category=EventCategory.OS, + description="Error checking kernel modules", + data={"command": res.command, "exit_code": res.exit_code}, + priority=EventPriority.ERROR, + console_log=True, + ) + + return modules + + def collect_data(self, args=None) -> tuple[TaskResult, KernelModuleDataModel | None]: + """ + Collect kernel modules data. + + Returns: + tuple[TaskResult, KernelModuleDataModel | None]: tuple containing the task result and kernel data model or None if not found. + """ + kernel_modules = {} + km_data: KernelModuleDataModel | None = None + if self.system_info.os_family == OSFamily.WINDOWS: + res = self._run_sut_cmd("wmic os get Version /Value") + if res.exit_code == 0: + for line in res.stdout.splitlines(): + if line.startswith("Version="): + version = line.split("=", 1)[1] + kernel_modules = {version: {"parameters": {}}} + break + + else: + kernel_modules = self.collect_all_module_info() + + if kernel_modules: + km_data = KernelModuleDataModel(kernel_modules=kernel_modules) + self._log_event( + category="KERNEL_READ", + description="Kernel modules read", + data=km_data.model_dump(), + priority=EventPriority.INFO, + ) + self.result.message = f"{len(km_data.kernel_modules)} kernel modules collected" + self.result.status = ExecutionStatus.OK + else: + self.result.message = "Kernel modules not found" + self.result.status = ExecutionStatus.ERROR + + return self.result, km_data diff --git a/nodescraper/plugins/inband/kernel_module/kernel_module_data.py b/nodescraper/plugins/inband/kernel_module/kernel_module_data.py new file mode 100644 index 00000000..f9f91b61 --- /dev/null +++ b/nodescraper/plugins/inband/kernel_module/kernel_module_data.py @@ -0,0 +1,31 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### + +from nodescraper.models import DataModel + + +class KernelModuleDataModel(DataModel): + kernel_modules: dict diff --git a/nodescraper/plugins/inband/kernel_module/kernel_module_plugin.py b/nodescraper/plugins/inband/kernel_module/kernel_module_plugin.py new file mode 100644 index 00000000..77a8c5a9 --- /dev/null +++ b/nodescraper/plugins/inband/kernel_module/kernel_module_plugin.py @@ -0,0 +1,43 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +############################################################################### +from nodescraper.base import InBandDataPlugin + +from .analyzer_args import KernelModuleAnalyzerArgs +from .kernel_module_analyzer import KernelModuleAnalyzer +from .kernel_module_collector import KernelModuleCollector +from .kernel_module_data import KernelModuleDataModel + + +class KernelModulePlugin(InBandDataPlugin[KernelModuleDataModel, None, KernelModuleAnalyzerArgs]): + """Plugin for collection and analysis of kernel data""" + + DATA_MODEL = KernelModuleDataModel + + COLLECTOR = KernelModuleCollector + + ANALYZER = KernelModuleAnalyzer + + ANALYZER_ARGS = KernelModuleAnalyzerArgs diff --git a/test/unit/plugin/test_kernel_module_analyzer.py b/test/unit/plugin/test_kernel_module_analyzer.py new file mode 100644 index 00000000..0d824379 --- /dev/null +++ b/test/unit/plugin/test_kernel_module_analyzer.py @@ -0,0 +1,120 @@ +import pytest + +from nodescraper.enums.eventcategory import EventCategory +from nodescraper.enums.executionstatus import ExecutionStatus +from nodescraper.models.systeminfo import OSFamily +from nodescraper.plugins.inband.kernel_module.analyzer_args import ( + KernelModuleAnalyzerArgs, +) +from nodescraper.plugins.inband.kernel_module.kernel_module_analyzer import ( + KernelModuleAnalyzer, +) +from nodescraper.plugins.inband.kernel_module.kernel_module_data import ( + KernelModuleDataModel, +) + + +@pytest.fixture +def sample_modules(): + return { + "modA": {"parameters": {"p": 1}}, + "otherMod": {"parameters": {"p": 2}}, + "TESTmod": {"parameters": {"p": 3}}, + "amdABC": {"parameters": {"p": 3}}, + } + + +@pytest.fixture +def data_model(sample_modules): + return KernelModuleDataModel(kernel_modules=sample_modules) + + +@pytest.fixture +def analyzer(system_info): + system_info.os_family = OSFamily.LINUX + return KernelModuleAnalyzer(system_info=system_info) + + +def test_filter_modules_by_pattern_none(sample_modules, analyzer): + matched, unmatched = analyzer.filter_modules_by_pattern(sample_modules, None) + assert matched == sample_modules + assert unmatched == [] + + +def test_filter_modules_by_pattern_strict(sample_modules, analyzer): + matched, unmatched = analyzer.filter_modules_by_pattern(sample_modules, [r"mod$"]) + assert set(matched) == {"otherMod", "TESTmod"} + assert unmatched == [] + + +def test_filter_modules_by_pattern_unmatched(sample_modules, analyzer): + matched, unmatched = analyzer.filter_modules_by_pattern(sample_modules, ["foo"]) + assert matched == {} + assert unmatched == ["foo"] + + +def test_filter_name_and_param_all_match(sample_modules, analyzer): + to_match = {"modA": {"parameters": {"p": 1}}} + matched, unmatched = analyzer.filter_modules_by_name_and_param(sample_modules, to_match) + assert matched == {"modA": sample_modules["modA"]} + assert unmatched == {} + + +def test_filter_name_and_param_param_mismatch(sample_modules, analyzer): + to_match = {"modA": {"parameters": {"p": 999}}} + matched, unmatched = analyzer.filter_modules_by_name_and_param(sample_modules, to_match) + assert matched == {} + assert "modA" in unmatched + assert "p" in unmatched["modA"]["parameters"] + + +def test_filter_name_and_param_missing_module(sample_modules, analyzer): + to_match = {"bogus": {"parameters": {"x": 1}}} + matched, unmatched = analyzer.filter_modules_by_name_and_param(sample_modules, to_match) + assert matched == {} + assert "bogus" in unmatched + + +def test_analyze_data_default(data_model, analyzer): + result = analyzer.analyze_data(data_model, None) + assert result.status == ExecutionStatus.OK + + +def test_analyze_data_regex_success(data_model, analyzer): + args = KernelModuleAnalyzerArgs(regex_filter=["^TESTmod$"]) + result = analyzer.analyze_data(data_model, args) + assert result.status == ExecutionStatus.OK + ev = result.events[0] + assert ev.description == "KernelModules analyzed" + fm = ev.data["filtered_modules"] + assert set(fm) == {"TESTmod"} + + +def test_analyze_data_regex_invalid_pattern(data_model, analyzer): + args = KernelModuleAnalyzerArgs(regex_filter=["*invalid"]) + result = analyzer.analyze_data(data_model, args) + assert result.status in (ExecutionStatus.ERROR, ExecutionStatus.EXECUTION_FAILURE) + assert any(EventCategory.RUNTIME.value in ev.category for ev in result.events) + + +def test_analyze_data_regex_unmatched_patterns(data_model, analyzer): + args = KernelModuleAnalyzerArgs(regex_filter=["modA", "nope"]) + result = analyzer.analyze_data(data_model, args) + assert result.status == ExecutionStatus.ERROR + assert any(ev.description == "KernelModules did not match all patterns" for ev in result.events) + + +def test_analyze_data_name_only_success(data_model, analyzer): + args = KernelModuleAnalyzerArgs(kernel_modules={"modA": {"parameters": {"p": 1}}}) + result = analyzer.analyze_data(data_model, args) + assert result.status == ExecutionStatus.OK + assert result.message == "task completed successfully" + + +def test_no_analyzer_args(data_model, analyzer): + args = KernelModuleAnalyzerArgs(kernel_modules={}, regex_filter=[]) + result = analyzer.analyze_data(data_model, args) + assert result.status == ExecutionStatus.NOT_RAN + assert ( + result.message == "No values provided in analysis args for: kernel_modules and regex_match" + ) diff --git a/test/unit/plugin/test_kernel_module_collector.py b/test/unit/plugin/test_kernel_module_collector.py new file mode 100644 index 00000000..65ffae6e --- /dev/null +++ b/test/unit/plugin/test_kernel_module_collector.py @@ -0,0 +1,116 @@ +from types import SimpleNamespace + +import pytest + +from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily +from nodescraper.plugins.inband.kernel_module.kernel_module_collector import ( + KernelModuleCollector, +) +from nodescraper.plugins.inband.kernel_module.kernel_module_data import ( + KernelModuleDataModel, +) + + +@pytest.fixture +def linux_collector(system_info, conn_mock): + system_info.os_family = OSFamily.LINUX + return KernelModuleCollector(system_info, conn_mock) + + +@pytest.fixture +def win_collector(system_info, conn_mock): + system_info.os_family = OSFamily.WINDOWS + return KernelModuleCollector(system_info, conn_mock) + + +def make_artifact(cmd, exit_code, stdout): + return SimpleNamespace(command=cmd, exit_code=exit_code, stdout=stdout, stderr="") + + +def test_parse_proc_modules_empty(linux_collector): + assert linux_collector.parse_proc_modules("") == {} + + +def test_parse_proc_modules_basic(linux_collector): + out = "modA 16384 0 - Live 0x00000000\nmodB 32768 1 - Live 0x00001000" + parsed = linux_collector.parse_proc_modules(out) + assert set(parsed) == {"modA", "modB"} + for v in parsed.values(): + assert v == {"parameters": {}} + + +def test_get_module_parameters_no_params(linux_collector): + linux_collector._run_sut_cmd = lambda cmd: make_artifact(cmd, 1, "") + params = linux_collector.get_module_parameters("modA") + assert params == {} + + +def test_get_module_parameters_with_params(linux_collector): + seq = [ + make_artifact("ls /sys/module/modA/parameters", 0, "p1\np2"), + make_artifact("cat /sys/module/modA/parameters/p1", 0, "val1\n"), + make_artifact("cat /sys/module/modA/parameters/p2", 1, ""), + ] + linux_collector._run_sut_cmd = lambda cmd, seq=seq: seq.pop(0) + params = linux_collector.get_module_parameters("modA") + assert params == {"p1": "val1", "p2": ""} + + +def test_collect_all_module_info_success(linux_collector): + seq = [ + make_artifact("cat /proc/modules", 0, "modX 0 0 - Live\n"), + make_artifact("ls /sys/module/modX/parameters", 0, ""), + ] + linux_collector._run_sut_cmd = lambda cmd, seq=seq: seq.pop(0) + modules = linux_collector.collect_all_module_info() + assert modules == {"modX": {"parameters": {}}} + + +def test_collect_data_linux_success(linux_collector): + seq = [ + make_artifact("cat /proc/modules", 0, "m1 0 0 - Live\n"), + make_artifact("ls /sys/module/m1/parameters", 1, ""), + ] + linux_collector._run_sut_cmd = lambda cmd, seq=seq: seq.pop(0) + + result, data = linux_collector.collect_data() + + assert result.status == ExecutionStatus.OK + assert isinstance(data, KernelModuleDataModel) + evt = result.events[-1] + assert evt.category == "KERNEL_READ" + assert evt.priority == EventPriority.INFO.value + assert result.message == "1 kernel modules collected" + assert data.kernel_modules == {"m1": {"parameters": {}}} + + +def test_collect_data_linux_error(linux_collector): + def bad(cmd): + return make_artifact(cmd, 1, "") + + linux_collector._run_sut_cmd = bad + + result, data = linux_collector.collect_data() + assert result.status == ExecutionStatus.ERROR + assert data is None + evt = result.events[0] + assert evt.category == EventCategory.RUNTIME.value or evt.category == EventCategory.OS.value + assert "Failed to read /proc/modules" in evt.description + + +def test_collect_data_windows_success(win_collector): + win_collector._run_sut_cmd = lambda cmd: make_artifact( + "wmic os get Version /Value", 0, "Version=10.0.19041\r\n" + ) + result, data = win_collector.collect_data() + assert result.status == ExecutionStatus.OK + assert isinstance(data, KernelModuleDataModel) + assert data.kernel_modules == {"10.0.19041": {"parameters": {}}} + assert result.message == "1 kernel modules collected" + + +def test_collect_data_windows_not_found(win_collector): + win_collector._run_sut_cmd = lambda cmd: make_artifact("wmic os get", 0, "") + result, data = win_collector.collect_data() + assert result.status == ExecutionStatus.ERROR + assert data is None