diff --git a/changelogs/fragments/11764-logrotate-fixes.yml b/changelogs/fragments/11764-logrotate-fixes.yml new file mode 100644 index 0000000000..7c93e5cdd7 --- /dev/null +++ b/changelogs/fragments/11764-logrotate-fixes.yml @@ -0,0 +1,6 @@ +bugfixes: + - logrotate - fixes ``TypeError`` when ``shred_cycles`` is ``None`` and corrects ``enabled=None`` handling in ``get_config_path()`` (https://github.com/ansible-collections/community.general/pull/11764). + - logrotate - adds missing default values for ``state`` and ``config_dir`` parameters, and adds ``required_by`` declarations for shred and compression parameters (https://github.com/ansible-collections/community.general/pull/11764). + - logrotate - writes configuration files to a temporary file first and validates before atomically moving to the destination, and properly wraps all ``os.remove()`` and ``atomic_move()`` calls in error handling (https://github.com/ansible-collections/community.general/pull/11764). +minor_changes: + - logrotate - adds optional ``backup`` parameter to create a backup of the existing configuration file before writing changes (https://github.com/ansible-collections/community.general/pull/11764). diff --git a/plugins/modules/logrotate.py b/plugins/modules/logrotate.py index ee9d71e8d5..b51779fcef 100644 --- a/plugins/modules/logrotate.py +++ b/plugins/modules/logrotate.py @@ -33,13 +33,14 @@ options: - Whether the configuration should be present or absent. type: str choices: [present, absent] + default: present config_dir: description: - Directory where logrotate configurations are stored. - - Default is V(/etc/logrotate.d) for system-wide configurations. - - Use V(~/.logrotate.d) for user-specific configurations. + - Using V(~/.logrotate.d) is recommended for user-specific configurations. - This directory must exist before using the module. type: path + default: /etc/logrotate.d paths: description: - List of log file paths or patterns to rotate. @@ -95,6 +96,7 @@ options: shred_cycles: description: - Number of times to overwrite files when using O(shred=true). + - When omitted, logrotate uses C(shred)'s default. type: int missing_ok: description: @@ -252,6 +254,13 @@ options: - Base number for rotated files. Allowed values are from V(0) to V(999). - For example, V(1) gives files C(.1), C(.2), and so on instead of C(.0), C(.1). type: int + backup: + description: + - Create a backup of the existing configuration file before overwriting. + - The backup file is created in the same directory with a timestamp suffix. + type: bool + version_added: 12.6.0 + default: true syslog: description: - Send logrotate messages to syslog. @@ -450,15 +459,15 @@ enabled_state: backup_file: description: Path to the backup of the original configuration file, if it was backed up. type: str - returned: success when backup was made + returned: success when O(backup=true) and an existing configuration was overwritten sample: /etc/logrotate.d/nginx.20250101_120000 """ import os import re +import tempfile from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.common.text.converters import to_native class LogrotateConfig: @@ -479,8 +488,6 @@ class LogrotateConfig: self.config_name = self.params["name"] self.disabled_suffix = ".disabled" - self.config_file = self.get_config_path(self.params["enabled"]) - def get_config_path(self, enabled: bool) -> str: """Get config file path based on enabled state.""" base_path = os.path.join(self.config_dir, self.config_name) @@ -494,9 +501,6 @@ class LogrotateConfig: if self.params["start"] < 0 or self.params["start"] > 999: self.module.fail_json(msg="'start' must be between 0 and 999") - if self.params.get("size") and self.params.get("max_size"): - self.module.fail_json(msg="'size' and 'max_size' parameters are mutually exclusive") - su_val = self.params.get("su") if su_val is not None: if su_val == "": @@ -506,8 +510,18 @@ class LogrotateConfig: if len(su_parts) != 2: self.module.fail_json(msg="'su' parameter must be in format 'user group' or empty string to remove") - if self.params.get("shred_cycles", 1) < 1: - self.module.fail_json(msg="'shred_cycles' must be a positive integer") + if self.params.get("create") is not None: + create_val = self.params["create"] + if not re.match(r"^\d{3,4}(\s+\S+\s+\S+)?\s*$", create_val): + self.module.fail_json( + msg="'create' must be in format 'mode' or 'mode user group' (for example '0640' or '0640 root adm')" + ) + + if self.params.get("shred_cycles") is not None: + if self.params["shred_cycles"] < 1: + self.module.fail_json(msg="'shred_cycles' must be a positive integer") + if self.params.get("shred") is False: + self.module.fail_json(msg="'shred_cycles' requires 'shred=true'") for size_param in ["size", "min_size", "max_size"]: if self.params.get(size_param): @@ -516,53 +530,29 @@ class LogrotateConfig: msg=f"'{size_param}' must be in format 'number[k|M|G]' (for example '100M', '1G')" ) - if self.params.get("old_dir") and self.params.get("no_old_dir"): - self.module.fail_json(msg="'old_dir' and 'no_old_dir' parameters are mutually exclusive") - - copy_params = [self.params.get("copy"), self.params.get("copy_truncate"), self.params.get("rename_copy")] - if sum(1 for v in copy_params if v) > 1: - self.module.fail_json(msg="'copy', 'copy_truncate', and 'rename_copy' parameters are mutually exclusive") - - if self.params.get("delay_compress") and self.params.get("no_delay_compress"): - self.module.fail_json(msg="'delay_compress' and 'no_delay_compress' parameters are mutually exclusive") - if self.params["state"] == "present": - existing_content = self.read_existing_config(any_state=True) + existing_content = self.read_existing_config() if not existing_content and not self.params.get("paths"): self.module.fail_json(msg="'paths' parameter is required when creating a new configuration") - def read_existing_config(self, any_state: bool = False) -> str | None: - """Read existing configuration file. - - Args: - any_state: If True, check both enabled and disabled versions. - If False, only check based on current enabled param. - """ - if any_state: - for suffix in ["", self.disabled_suffix]: - config_path = os.path.join(self.config_dir, self.config_name + suffix) - if os.path.exists(config_path): - self.result["enabled_state"] = suffix == "" - try: - with open(config_path, "r") as f: - return f.read() - except Exception as e: - self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}") - else: - config_path = self.get_config_path(self.params["enabled"]) + def read_existing_config(self) -> str | None: + """Read existing configuration file, checking both enabled and disabled versions.""" + for suffix in ["", self.disabled_suffix]: + config_path = os.path.join(self.config_dir, self.config_name + suffix) if os.path.exists(config_path): + self.result["enabled_state"] = suffix == "" try: with open(config_path, "r") as f: return f.read() except Exception as e: - self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}") + self.module.fail_json(msg=f"Failed to read config file {config_path}: {e}") return None def generate_config_content(self) -> str: """Generate logrotate configuration content.""" if not self.params.get("paths"): - existing_content = self.read_existing_config(any_state=True) + existing_content = self.read_existing_config() if existing_content: lines = existing_content.strip().split("\n") paths = [] @@ -785,13 +775,18 @@ class LogrotateConfig: config_path = os.path.join(self.config_dir, self.config_name + suffix) if os.path.exists(config_path): if not self.module.check_mode: - os.remove(config_path) + if self.params["backup"]: + self.result["backup_file"] = self.module.backup_local(config_path) + try: + os.remove(config_path) + except Exception as e: + self.module.fail_json(msg=f"Failed to remove config file '{config_path}': {e}") self.result["changed"] = True self.result["config_file"] = config_path break return self.result - existing_content = self.read_existing_config(any_state=True) + existing_content = self.read_existing_config() current_enabled = self.result.get("enabled_state", True) target_enabled = self.params.get("enabled") @@ -806,21 +801,28 @@ class LogrotateConfig: old_path = self.get_config_path(not target_enabled) new_path = self.get_config_path(target_enabled) - if os.path.exists(old_path) and not os.path.exists(new_path): - self.result["changed"] = True - if not self.module.check_mode: - self.module.atomic_move(old_path, new_path, unsafe_writes=False) - - self.result["config_file"] = new_path - self.result["enabled_state"] = target_enabled + if not os.path.exists(old_path): + self.module.fail_json(msg=f"Cannot change enabled state: config file '{old_path}' not found") + elif os.path.exists(new_path): + self.module.fail_json(msg=f"Cannot change enabled state: target path '{new_path}' already exists") + self.result["changed"] = True + if not self.module.check_mode: try: - with open(new_path, "r") as f: - self.result["config_content"] = f.read() - except Exception: - self.result["config_content"] = existing_content + self.module.atomic_move(old_path, new_path, unsafe_writes=False) + except Exception as e: + self.module.fail_json(msg=f"Failed to rename config file from '{old_path}' to '{new_path}': {e}") - return self.result + self.result["config_file"] = new_path + self.result["enabled_state"] = target_enabled + + try: + with open(new_path, "r") as f: + self.result["config_content"] = f.read() + except Exception: + self.result["config_content"] = existing_content + + return self.result new_content = self.generate_config_content() self.result["config_content"] = new_content @@ -830,32 +832,44 @@ class LogrotateConfig: if needs_update: if not self.module.check_mode: - for suffix in ["", self.disabled_suffix]: - old_path = os.path.join(self.config_dir, self.config_name + suffix) - if os.path.exists(old_path): - backup_path = self.module.backup_local(old_path) - if backup_path: - self.result["backup_file"] = backup_path - os.remove(old_path) - - config_file_path = str(self.result["config_file"]) try: - with open(config_file_path, "w") as f: - f.write(new_content) - os.chmod(config_file_path, 0o644) + with tempfile.NamedTemporaryFile( + mode="w", suffix=".tmp", prefix=self.config_name + "_", dir=self.module.tmpdir, delete=False + ) as tmp_file: + tmp_file.write(new_content) + tmp_path = tmp_file.name + self.module.add_cleanup_file(tmp_path) except Exception as e: - self.module.fail_json(msg=f"Failed to write config file {config_file_path}: {to_native(e)}") + self.module.fail_json(msg=f"Failed to write temp config file: {e}") - test_cmd = [self.logrotate_bin, "-d", config_file_path] + test_cmd = [self.logrotate_bin, "-d", tmp_path] rc, stdout, stderr = self.module.run_command(test_cmd) if rc != 0: self.module.fail_json( msg="logrotate configuration test failed", stderr=stderr, stdout=stdout, - config_file=config_file_path, + config_file=str(self.result["config_file"]), ) + config_file_path = str(self.result["config_file"]) + for suffix in ["", self.disabled_suffix]: + old_path = os.path.join(self.config_dir, self.config_name + suffix) + if os.path.exists(old_path): + if self.params["backup"]: + self.result["backup_file"] = self.module.backup_local(old_path) + if old_path != config_file_path: + try: + os.remove(old_path) + except Exception as e: + self.module.fail_json(msg=f"Failed to remove old config file '{old_path}': {e}") + + try: + self.module.atomic_move(tmp_path, config_file_path, unsafe_writes=False) + os.chmod(config_file_path, 0o644) + except Exception as e: + self.module.fail_json(msg=f"Failed to move config file to {config_file_path!r}: {e}") + self.result["changed"] = needs_update self.result["enabled_state"] = target_enabled return self.result @@ -866,8 +880,8 @@ def main() -> None: module = AnsibleModule( argument_spec=dict( name=dict(type="str", required=True, aliases=["config_name"]), - state=dict(type="str", choices=["present", "absent"]), - config_dir=dict(type="path"), + state=dict(type="str", choices=["present", "absent"], default="present"), + config_dir=dict(type="path", default="/etc/logrotate.d"), paths=dict(type="list", elements="path"), rotation_period=dict( type="str", @@ -916,6 +930,7 @@ def main() -> None: enabled=dict(type="bool"), start=dict(type="int"), syslog=dict(type="bool"), + backup=dict(type="bool", default=True), ), mutually_exclusive=[ ["delay_compress", "no_delay_compress"], @@ -923,6 +938,11 @@ def main() -> None: ["size", "max_size"], ["copy", "copy_truncate", "rename_copy"], ], + required_by={ + "shred_cycles": "shred", + "compression_method": "compress", + "compress_options": "compress", + }, supports_check_mode=True, ) diff --git a/tests/unit/plugins/modules/test_logrotate.py b/tests/unit/plugins/modules/test_logrotate.py index 150923d96c..f428e6eac7 100644 --- a/tests/unit/plugins/modules/test_logrotate.py +++ b/tests/unit/plugins/modules/test_logrotate.py @@ -51,6 +51,8 @@ class TestLogrotateConfig(unittest.TestCase): self.mock_module.atomic_move = Mock() self.mock_module.warn = Mock() self.mock_module.run_command = Mock(return_value=(0, "", "")) + self.mock_module.backup_local = Mock(return_value=None) + self.mock_module.tmpdir = self.test_dir self.mock_ansible_basic.AnsibleModule.return_value = self.mock_module self.config_dir = os.path.join(self.test_dir, "logrotate.d") os.makedirs(self.config_dir, exist_ok=True) @@ -80,6 +82,7 @@ class TestLogrotateConfig(unittest.TestCase): "date_format": "-%Y%m%d", "shared_scripts": False, "enabled": True, + "backup": True, } default_params.update(params) self.mock_module.params = default_params @@ -99,18 +102,15 @@ class TestLogrotateConfig(unittest.TestCase): return False with patch("os.path.exists", side_effect=exists_side_effect): - with patch("builtins.open", mock_open()) as mock_file: - with patch("os.chmod") as mock_chmod: - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertTrue(result["changed"]) - self.assertIn("config_file", result) - self.assertIn("config_content", result) - self.assertEqual(result["enabled_state"], True) - mock_file.assert_called_once() - mock_chmod.assert_called_once() + with patch("os.chmod") as mock_chmod: + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("config_file", result) + self.assertIn("config_content", result) + self.assertEqual(result["enabled_state"], True) + mock_chmod.assert_called_once() def test_update_existing_configuration(self): """Test updating an existing logrotate configuration.""" @@ -138,14 +138,13 @@ class TestLogrotateConfig(unittest.TestCase): with patch("builtins.open", mock_open(read_data=existing_content)): with patch("os.remove") as mock_remove: with patch("os.chmod") as mock_chmod: - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertTrue(result["changed"]) - self.assertIn("14", result["config_content"]) - mock_remove.assert_called() - mock_chmod.assert_called_once() + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("14", result["config_content"]) + mock_remove.assert_not_called() + mock_chmod.assert_called_once() def test_remove_configuration(self): """Test removing a logrotate configuration.""" @@ -191,10 +190,9 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.chmod"): mock_file_write = mock_open() with patch("builtins.open", mock_file_write): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() self.assertTrue(result["changed"]) self.assertEqual(result["enabled_state"], False) self.assertTrue(result["config_file"].endswith(".disabled")) @@ -222,13 +220,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.remove"): with patch("os.chmod"): self.mock_module.atomic_move = Mock() - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertTrue(result["changed"]) - self.assertEqual(result["enabled_state"], True) - self.assertFalse(result["config_file"].endswith(".disabled")) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertEqual(result["enabled_state"], True) + self.assertFalse(result["config_file"].endswith(".disabled")) def test_validation_missing_paths(self): """Test validation when paths are missing for new configuration.""" @@ -251,27 +248,6 @@ class TestLogrotateConfig(unittest.TestCase): config.apply() self.assertIn("fail_json called", str(context.exception)) - def test_validation_size_and_max_size_exclusive(self): - """Test validation when both size and max_size are specified.""" - from ansible_collections.community.general.plugins.modules import logrotate - - self._setup_module_params(size="100M", max_size="200M") - config_path = os.path.join(self.config_dir, "test") - - def exists_side_effect(path): - if path == self.config_dir: - return True - elif path == config_path: - return False - return False - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - with self.assertRaises(Exception) as context: - config.apply() - self.assertIn("fail_json called", str(context.exception)) - def test_check_mode(self): """Test that no changes are made in check mode.""" from ansible_collections.community.general.plugins.modules import logrotate @@ -289,11 +265,10 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) def test_generate_config_with_scripts(self): """Test generating configuration with pre/post scripts.""" @@ -317,17 +292,16 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("prerotate", content) - self.assertIn("postrotate", content) - self.assertIn("firstaction", content) - self.assertIn("lastaction", content) - self.assertIn("systemctl reload test", content) - self.assertIn("echo 'Pre-rotation'", content) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("prerotate", content) + self.assertIn("postrotate", content) + self.assertIn("firstaction", content) + self.assertIn("lastaction", content) + self.assertIn("systemctl reload test", content) + self.assertIn("echo 'Pre-rotation'", content) def test_compression_methods(self): """Test different compression methods.""" @@ -349,18 +323,17 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - if method != "gzip": - self.assertIn(f"compresscmd /usr/bin/{method}", content) - if method == "zstd" or method == "lz4": - self.assertIn(f"uncompresscmd /usr/bin/{method} -d", content) - else: - uncompress_cmd = f"{method}un{method}" - self.assertIn(f"uncompresscmd /usr/bin/{uncompress_cmd}", content) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + if method != "gzip": + self.assertIn(f"compresscmd /usr/bin/{method}", content) + if method == "zstd" or method == "lz4": + self.assertIn(f"uncompresscmd /usr/bin/{method} -d", content) + else: + uncompress_cmd = f"{method}un{method}" + self.assertIn(f"uncompresscmd /usr/bin/{uncompress_cmd}", content) def test_size_based_rotation(self): """Test size-based rotation configuration.""" @@ -379,13 +352,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("size 100M", content) - self.assertNotIn("daily", content) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("size 100M", content) + self.assertNotIn("daily", content) def test_logrotate_not_installed(self): """Test error when logrotate is not installed.""" @@ -460,12 +432,11 @@ class TestLogrotateConfig(unittest.TestCase): with patch("builtins.open", mock_file_read): with patch("os.remove"): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertTrue(result["changed"]) - self.assertIn("/var/log/app1/*.log", result["config_content"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.assertIn("/var/log/app1/*.log", result["config_content"]) def test_no_delay_compress_parameter(self): """Test no_delay_compress parameter.""" @@ -484,13 +455,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("nodelaycompress", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("nodelaycompress", content) + self.assertTrue(result["changed"]) def test_shred_and_shred_cycles_parameters(self): """Test shred and shred_cycles parameters.""" @@ -509,14 +479,13 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("shred", content) - self.assertIn("shredcycles 3", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("shred", content) + self.assertIn("shredcycles 3", content) + self.assertTrue(result["changed"]) def test_copy_parameter(self): """Test copy parameter.""" @@ -535,14 +504,13 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("copy", content) - self.assertNotIn("copytruncate", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("copy", content) + self.assertNotIn("copytruncate", content) + self.assertTrue(result["changed"]) def test_rename_copy_parameter(self): """Test rename_copy parameter.""" @@ -561,13 +529,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("renamecopy", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("renamecopy", content) + self.assertTrue(result["changed"]) def test_min_size_parameter(self): """Test min_size parameter.""" @@ -586,13 +553,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("minsize 100k", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("minsize 100k", content) + self.assertTrue(result["changed"]) def test_date_yesterday_parameter(self): """Test date_yesterday parameter.""" @@ -611,14 +577,13 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("dateext", content) - self.assertIn("dateyesterday", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("dateext", content) + self.assertIn("dateyesterday", content) + self.assertTrue(result["changed"]) def test_create_old_dir_parameter(self): """Test create_old_dir parameter.""" @@ -637,14 +602,13 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("olddir /var/log/archives", content) - self.assertIn("createolddir", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("olddir /var/log/archives", content) + self.assertIn("createolddir", content) + self.assertTrue(result["changed"]) def test_start_parameter(self): """Test start parameter.""" @@ -663,13 +627,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("start 1", content) - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("start 1", content) + self.assertTrue(result["changed"]) def test_syslog_parameter(self): """Test syslog parameter.""" @@ -688,55 +651,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertIn("syslog", content) - self.assertTrue(result["changed"]) - - def test_validation_copy_and_copy_truncate_exclusive(self): - """Test validation when both copy and copy_truncate are specified.""" - from ansible_collections.community.general.plugins.modules import logrotate - - self._setup_module_params(copy=True, copy_truncate=True) - config_path = os.path.join(self.config_dir, "test") - - def exists_side_effect(path): - if path == self.config_dir: - return True - elif path == config_path: - return False - return False - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - with self.assertRaises(Exception) as context: - config.apply() - self.assertIn("fail_json called", str(context.exception)) - - def test_validation_copy_and_rename_copy_exclusive(self): - """Test validation when both copy and rename_copy are specified.""" - from ansible_collections.community.general.plugins.modules import logrotate - - self._setup_module_params(copy=True, rename_copy=True) - config_path = os.path.join(self.config_dir, "test") - - def exists_side_effect(path): - if path == self.config_dir: - return True - elif path == config_path: - return False - return False - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - with self.assertRaises(Exception) as context: - config.apply() - self.assertIn("fail_json called", str(context.exception)) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertIn("syslog", content) + self.assertTrue(result["changed"]) def test_validation_shred_cycles_positive(self): """Test validation when shred_cycles is not positive.""" @@ -780,27 +700,6 @@ class TestLogrotateConfig(unittest.TestCase): config.apply() self.assertIn("fail_json called", str(context.exception)) - def test_validation_old_dir_and_no_old_dir_exclusive(self): - """Test validation when both old_dir and no_old_dir are specified.""" - from ansible_collections.community.general.plugins.modules import logrotate - - self._setup_module_params(old_dir="/var/log/archives", no_old_dir=True) - config_path = os.path.join(self.config_dir, "test") - - def exists_side_effect(path): - if path == self.config_dir: - return True - elif path == config_path: - return False - return False - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - with self.assertRaises(Exception) as context: - config.apply() - self.assertIn("fail_json called", str(context.exception)) - def test_all_new_parameters_together(self): """Test all new parameters together in one configuration.""" from ansible_collections.community.general.plugins.modules import logrotate @@ -834,74 +733,28 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - content = result["config_content"] - self.assertTrue(result["changed"]) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + content = result["config_content"] + self.assertTrue(result["changed"]) - self.assertIn("nodelaycompress", content) - self.assertIn("shred", content) - self.assertIn("shredcycles 3", content) - self.assertIn("copy", content) - self.assertIn("minsize 100k", content) - self.assertIn("dateext", content) - self.assertIn("dateyesterday", content) - self.assertIn("olddir /var/log/archives", content) - self.assertIn("createolddir", content) - self.assertIn("start 1", content) - self.assertIn("syslog", content) + self.assertIn("nodelaycompress", content) + self.assertIn("shred", content) + self.assertIn("shredcycles 3", content) + self.assertIn("copy", content) + self.assertIn("minsize 100k", content) + self.assertIn("dateext", content) + self.assertIn("dateyesterday", content) + self.assertIn("olddir /var/log/archives", content) + self.assertIn("createolddir", content) + self.assertIn("start 1", content) + self.assertIn("syslog", content) - lines = [line.strip() for line in content.split("\n")] - self.assertNotIn("copytruncate", lines) - self.assertNotIn("renamecopy", lines) - self.assertNotIn("delaycompress", lines) - - def test_parameter_interactions(self): - """Test interactions between related parameters.""" - from ansible_collections.community.general.plugins.modules import logrotate - - self._setup_module_params(old_dir="/var/log/archives", no_old_dir=True) - config_path = os.path.join(self.config_dir, "test") - - def exists_side_effect(path): - if path == self.config_dir: - return True - elif path == config_path: - return False - return False - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - - with self.assertRaises(Exception) as context: - config.apply() - - self.assertIn("fail_json called", str(context.exception)) - - self._setup_module_params(copy=True, rename_copy=True) - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - - with self.assertRaises(Exception) as context: - config.apply() - - self.assertIn("fail_json called", str(context.exception)) - - self._setup_module_params(copy=True, copy_truncate=True) - - with patch("os.path.exists", side_effect=exists_side_effect): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - - with self.assertRaises(Exception) as context: - config.apply() - - self.assertIn("fail_json called", str(context.exception)) + lines = [line.strip() for line in content.split("\n")] + self.assertNotIn("copytruncate", lines) + self.assertNotIn("renamecopy", lines) + self.assertNotIn("delaycompress", lines) def test_size_format_validation(self): """Test validation of size format parameters.""" @@ -924,12 +777,11 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertIn(f"size {size}", result["config_content"]) + result = config.apply() + self.assertIn(f"size {size}", result["config_content"]) invalid_sizes = ["100kb", "M100", "1.5G", "abc", "100 MB"] @@ -975,12 +827,11 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - logrotate_bin = self.mock_module.get_bin_path.return_value - config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) - result = config.apply() - self.assertIn(f"maxsize {size}", result["config_content"]) + result = config.apply() + self.assertIn(f"maxsize {size}", result["config_content"]) invalid_sizes = ["100kb", "M100", "1.5G", "abc", "100 MB"] @@ -1005,6 +856,68 @@ class TestLogrotateConfig(unittest.TestCase): self.assertIn("fail_json called", str(context.exception)) + def test_backup_disabled_skips_backup(self): + """Test that backup is not created when backup parameter is False.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(rotate_count=14, backup=False) + config_path = os.path.join(self.config_dir, "test") + existing_content = """/var/log/test/*.log { + daily + rotate 7 + compress +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove"): + with patch("os.chmod"): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.mock_module.backup_local.assert_not_called() + self.assertNotIn("backup_file", result) + + def test_backup_enabled_by_default(self): + """Test that backup is created by default.""" + from ansible_collections.community.general.plugins.modules import logrotate + + self._setup_module_params(rotate_count=14) + config_path = os.path.join(self.config_dir, "test") + expected_backup_path = config_path + ".20260101_120000" + self.mock_module.backup_local = Mock(return_value=expected_backup_path) + existing_content = """/var/log/test/*.log { + daily + rotate 7 + compress +}""" + + def exists_side_effect(path): + if path == self.config_dir: + return True + elif path == config_path: + return True + return False + + with patch("os.path.exists", side_effect=exists_side_effect): + with patch("builtins.open", mock_open(read_data=existing_content)): + with patch("os.remove"): + with patch("os.chmod"): + logrotate_bin = self.mock_module.get_bin_path.return_value + config = logrotate.LogrotateConfig(self.mock_module, logrotate_bin) + result = config.apply() + self.assertTrue(result["changed"]) + self.mock_module.backup_local.assert_called_once_with(config_path) + self.assertEqual(result["backup_file"], expected_backup_path) + def test_logrotate_bin_used_in_apply(self): """Test that logrotate binary path is used in apply method.""" from ansible_collections.community.general.plugins.modules import logrotate @@ -1026,13 +939,12 @@ class TestLogrotateConfig(unittest.TestCase): with patch("os.path.exists", side_effect=exists_side_effect): with patch("builtins.open", mock_open()): with patch("os.chmod"): - with patch.object(logrotate.LogrotateConfig, "_backup_config", create=True): - config = logrotate.LogrotateConfig(self.mock_module, test_logrotate_path) - config.apply() + config = logrotate.LogrotateConfig(self.mock_module, test_logrotate_path) + config.apply() - self.mock_module.run_command.assert_called_once() - call_args = self.mock_module.run_command.call_args[0][0] - self.assertEqual(call_args[0], test_logrotate_path) + self.mock_module.run_command.assert_called_once() + call_args = self.mock_module.run_command.call_args[0][0] + self.assertEqual(call_args[0], test_logrotate_path) if __name__ == "__main__":