[PR #11764/e9110811 backport][stable-12] logrotate: fix parameter and config file validation and more (#11856)

logrotate: fix parameter and config file validation and more (#11764)

* fix(logrotate): add missing defaults and parameter validation declarations

- Add default="present" to state parameter
- Add default="/etc/logrotate.d" to config_dir parameter
- Add required_by declarations for shred and compression parameters

* fix(logrotate): fix runtime validation bugs, remove duplicate checks

- Fix shred_cycles TypeError when value is None
- Fix enabled=None handling in get_config_path
- Remove duplicate runtime mutually_exclusive checks
- Add runtime boolean truthiness checks
- Add 'create' parameter format validation
- Remove stale test method

* fix(logrotate): restructure file operations, validate before write

- Write content to tmpdir temp file, validate, then atomic move to destination.
- Wrap all os.remove() calls in try/except with fail_json on error
- Wrap all module.atomic_move() calls in try/except with fail_json on error
- Also add self.mock_module.tmpdir = self.test_dir to test setUp for new code path

* docs(logrotate): update DOCUMENTATION block

- Add 'default: present' to state option
- Add 'default: /etc/logrotate.d' to config_dir option

* feat(logrotate): add optional backup parameter

* chore: add logrotate fixes changelog fragment

* chore(changelog/logrotate): use present tense singular

* fix(logrotate): handle trailing spaces in create param



* refactor(logrotate): remove redundant checks

These are already handled by `required_if` statements in the module spec

* refactor(logrotate): use tempfile to create temporary file

* refactor(logrotate): remove redundant `bool()` casts on `target_enabled`

`target_enabled` is guaranteed to be bool by this point. It's either the module param (typed bool) or falls back to `current_enabled` (also bool). The `bool()` wraps are no-ops.

* refactor(logrotate): remove unused `self.config_file` attribute

* refactor(logrotate): remove dead `any_state` parameter from `read_existing_config`

* fix(logrotate): raise error instead of falling through on enabled-state rename failures

* refactor(logrotate): tighten `get_config_path` sig to bool

`None` callers are removed now so this is safe

* test(logrotate): remove stale open mock assertion after tempfile refactor

* style(logrotate): format file

* chore(logrotate): add missing `version_added` attribute



* fix(logrotate): clean up temp file



* fix(logrotate): remove redundant temp file cleanup



* refactor(logrotate): Use dict subscript to access required backup param



* fix(logrotate): fix: only remove old config file when path differs from target



* fix(logrotate): update logrotate_bin type hint to str

* feat(logrotate): add backup file handling when removing old config

* style(logrotate): format file

* test(logrotate): add missing backup default to `_setup_module_params`

* test(logrotate): fix incorrect `os.remove` assertion in update test

* refactor(logrotate): remove unnecessary `to_native()` call



* refactor(logrotate): replace str quotes with !r



* fix(logrotate): change backup default back to true

* fix(logrotate): raise error when `shred_cycle`s is set with `shred=false`

* docs(logrotate): clarify `shred_cycles` behaviour

* fix(logrotate): remove to_native calls for exception messages

* docs(logrotate): improve `config_dir` param description

* refactor(logrotate): simplify backup file assignment logic

* style(logrotate): format file

* docs(logrotate): improve config_map description

---------



(cherry picked from commit e911081102)

Co-authored-by: tigattack <10629864+tigattack@users.noreply.github.com>
Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
patchback[bot]
2026-04-17 18:32:36 +02:00
committed by GitHub
parent 956fc075ef
commit 2873d439c3
3 changed files with 317 additions and 379 deletions

View File

@@ -33,13 +33,14 @@ options:
- Whether the configuration should be present or absent.
type: str
choices: [present, absent]
default: present
config_dir:
description:
- Directory where logrotate configurations are stored.
- Default is V(/etc/logrotate.d) for system-wide configurations.
- Use V(~/.logrotate.d) for user-specific configurations.
- Using V(~/.logrotate.d) is recommended for user-specific configurations.
- This directory must exist before using the module.
type: path
default: /etc/logrotate.d
paths:
description:
- List of log file paths or patterns to rotate.
@@ -95,6 +96,7 @@ options:
shred_cycles:
description:
- Number of times to overwrite files when using O(shred=true).
- When omitted, logrotate uses C(shred)'s default.
type: int
missing_ok:
description:
@@ -252,6 +254,13 @@ options:
- Base number for rotated files. Allowed values are from V(0) to V(999).
- For example, V(1) gives files C(.1), C(.2), and so on instead of C(.0), C(.1).
type: int
backup:
description:
- Create a backup of the existing configuration file before overwriting.
- The backup file is created in the same directory with a timestamp suffix.
type: bool
version_added: 12.6.0
default: true
syslog:
description:
- Send logrotate messages to syslog.
@@ -450,15 +459,15 @@ enabled_state:
backup_file:
description: Path to the backup of the original configuration file, if it was backed up.
type: str
returned: success when backup was made
returned: success when O(backup=true) and an existing configuration was overwritten
sample: /etc/logrotate.d/nginx.20250101_120000
"""
import os
import re
import tempfile
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native
class LogrotateConfig:
@@ -479,8 +488,6 @@ class LogrotateConfig:
self.config_name = self.params["name"]
self.disabled_suffix = ".disabled"
self.config_file = self.get_config_path(self.params["enabled"])
def get_config_path(self, enabled: bool) -> str:
"""Get config file path based on enabled state."""
base_path = os.path.join(self.config_dir, self.config_name)
@@ -494,9 +501,6 @@ class LogrotateConfig:
if self.params["start"] < 0 or self.params["start"] > 999:
self.module.fail_json(msg="'start' must be between 0 and 999")
if self.params.get("size") and self.params.get("max_size"):
self.module.fail_json(msg="'size' and 'max_size' parameters are mutually exclusive")
su_val = self.params.get("su")
if su_val is not None:
if su_val == "":
@@ -506,8 +510,18 @@ class LogrotateConfig:
if len(su_parts) != 2:
self.module.fail_json(msg="'su' parameter must be in format 'user group' or empty string to remove")
if self.params.get("shred_cycles", 1) < 1:
self.module.fail_json(msg="'shred_cycles' must be a positive integer")
if self.params.get("create") is not None:
create_val = self.params["create"]
if not re.match(r"^\d{3,4}(\s+\S+\s+\S+)?\s*$", create_val):
self.module.fail_json(
msg="'create' must be in format 'mode' or 'mode user group' (for example '0640' or '0640 root adm')"
)
if self.params.get("shred_cycles") is not None:
if self.params["shred_cycles"] < 1:
self.module.fail_json(msg="'shred_cycles' must be a positive integer")
if self.params.get("shred") is False:
self.module.fail_json(msg="'shred_cycles' requires 'shred=true'")
for size_param in ["size", "min_size", "max_size"]:
if self.params.get(size_param):
@@ -516,53 +530,29 @@ class LogrotateConfig:
msg=f"'{size_param}' must be in format 'number[k|M|G]' (for example '100M', '1G')"
)
if self.params.get("old_dir") and self.params.get("no_old_dir"):
self.module.fail_json(msg="'old_dir' and 'no_old_dir' parameters are mutually exclusive")
copy_params = [self.params.get("copy"), self.params.get("copy_truncate"), self.params.get("rename_copy")]
if sum(1 for v in copy_params if v) > 1:
self.module.fail_json(msg="'copy', 'copy_truncate', and 'rename_copy' parameters are mutually exclusive")
if self.params.get("delay_compress") and self.params.get("no_delay_compress"):
self.module.fail_json(msg="'delay_compress' and 'no_delay_compress' parameters are mutually exclusive")
if self.params["state"] == "present":
existing_content = self.read_existing_config(any_state=True)
existing_content = self.read_existing_config()
if not existing_content and not self.params.get("paths"):
self.module.fail_json(msg="'paths' parameter is required when creating a new configuration")
def read_existing_config(self, any_state: bool = False) -> str | None:
"""Read existing configuration file.
Args:
any_state: If True, check both enabled and disabled versions.
If False, only check based on current enabled param.
"""
if any_state:
for suffix in ["", self.disabled_suffix]:
config_path = os.path.join(self.config_dir, self.config_name + suffix)
if os.path.exists(config_path):
self.result["enabled_state"] = suffix == ""
try:
with open(config_path, "r") as f:
return f.read()
except Exception as e:
self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}")
else:
config_path = self.get_config_path(self.params["enabled"])
def read_existing_config(self) -> str | None:
"""Read existing configuration file, checking both enabled and disabled versions."""
for suffix in ["", self.disabled_suffix]:
config_path = os.path.join(self.config_dir, self.config_name + suffix)
if os.path.exists(config_path):
self.result["enabled_state"] = suffix == ""
try:
with open(config_path, "r") as f:
return f.read()
except Exception as e:
self.module.fail_json(msg=f"Failed to read config file {config_path}: {to_native(e)}")
self.module.fail_json(msg=f"Failed to read config file {config_path}: {e}")
return None
def generate_config_content(self) -> str:
"""Generate logrotate configuration content."""
if not self.params.get("paths"):
existing_content = self.read_existing_config(any_state=True)
existing_content = self.read_existing_config()
if existing_content:
lines = existing_content.strip().split("\n")
paths = []
@@ -785,13 +775,18 @@ class LogrotateConfig:
config_path = os.path.join(self.config_dir, self.config_name + suffix)
if os.path.exists(config_path):
if not self.module.check_mode:
os.remove(config_path)
if self.params["backup"]:
self.result["backup_file"] = self.module.backup_local(config_path)
try:
os.remove(config_path)
except Exception as e:
self.module.fail_json(msg=f"Failed to remove config file '{config_path}': {e}")
self.result["changed"] = True
self.result["config_file"] = config_path
break
return self.result
existing_content = self.read_existing_config(any_state=True)
existing_content = self.read_existing_config()
current_enabled = self.result.get("enabled_state", True)
target_enabled = self.params.get("enabled")
@@ -806,21 +801,28 @@ class LogrotateConfig:
old_path = self.get_config_path(not target_enabled)
new_path = self.get_config_path(target_enabled)
if os.path.exists(old_path) and not os.path.exists(new_path):
self.result["changed"] = True
if not self.module.check_mode:
self.module.atomic_move(old_path, new_path, unsafe_writes=False)
self.result["config_file"] = new_path
self.result["enabled_state"] = target_enabled
if not os.path.exists(old_path):
self.module.fail_json(msg=f"Cannot change enabled state: config file '{old_path}' not found")
elif os.path.exists(new_path):
self.module.fail_json(msg=f"Cannot change enabled state: target path '{new_path}' already exists")
self.result["changed"] = True
if not self.module.check_mode:
try:
with open(new_path, "r") as f:
self.result["config_content"] = f.read()
except Exception:
self.result["config_content"] = existing_content
self.module.atomic_move(old_path, new_path, unsafe_writes=False)
except Exception as e:
self.module.fail_json(msg=f"Failed to rename config file from '{old_path}' to '{new_path}': {e}")
return self.result
self.result["config_file"] = new_path
self.result["enabled_state"] = target_enabled
try:
with open(new_path, "r") as f:
self.result["config_content"] = f.read()
except Exception:
self.result["config_content"] = existing_content
return self.result
new_content = self.generate_config_content()
self.result["config_content"] = new_content
@@ -830,32 +832,44 @@ class LogrotateConfig:
if needs_update:
if not self.module.check_mode:
for suffix in ["", self.disabled_suffix]:
old_path = os.path.join(self.config_dir, self.config_name + suffix)
if os.path.exists(old_path):
backup_path = self.module.backup_local(old_path)
if backup_path:
self.result["backup_file"] = backup_path
os.remove(old_path)
config_file_path = str(self.result["config_file"])
try:
with open(config_file_path, "w") as f:
f.write(new_content)
os.chmod(config_file_path, 0o644)
with tempfile.NamedTemporaryFile(
mode="w", suffix=".tmp", prefix=self.config_name + "_", dir=self.module.tmpdir, delete=False
) as tmp_file:
tmp_file.write(new_content)
tmp_path = tmp_file.name
self.module.add_cleanup_file(tmp_path)
except Exception as e:
self.module.fail_json(msg=f"Failed to write config file {config_file_path}: {to_native(e)}")
self.module.fail_json(msg=f"Failed to write temp config file: {e}")
test_cmd = [self.logrotate_bin, "-d", config_file_path]
test_cmd = [self.logrotate_bin, "-d", tmp_path]
rc, stdout, stderr = self.module.run_command(test_cmd)
if rc != 0:
self.module.fail_json(
msg="logrotate configuration test failed",
stderr=stderr,
stdout=stdout,
config_file=config_file_path,
config_file=str(self.result["config_file"]),
)
config_file_path = str(self.result["config_file"])
for suffix in ["", self.disabled_suffix]:
old_path = os.path.join(self.config_dir, self.config_name + suffix)
if os.path.exists(old_path):
if self.params["backup"]:
self.result["backup_file"] = self.module.backup_local(old_path)
if old_path != config_file_path:
try:
os.remove(old_path)
except Exception as e:
self.module.fail_json(msg=f"Failed to remove old config file '{old_path}': {e}")
try:
self.module.atomic_move(tmp_path, config_file_path, unsafe_writes=False)
os.chmod(config_file_path, 0o644)
except Exception as e:
self.module.fail_json(msg=f"Failed to move config file to {config_file_path!r}: {e}")
self.result["changed"] = needs_update
self.result["enabled_state"] = target_enabled
return self.result
@@ -866,8 +880,8 @@ def main() -> None:
module = AnsibleModule(
argument_spec=dict(
name=dict(type="str", required=True, aliases=["config_name"]),
state=dict(type="str", choices=["present", "absent"]),
config_dir=dict(type="path"),
state=dict(type="str", choices=["present", "absent"], default="present"),
config_dir=dict(type="path", default="/etc/logrotate.d"),
paths=dict(type="list", elements="path"),
rotation_period=dict(
type="str",
@@ -916,6 +930,7 @@ def main() -> None:
enabled=dict(type="bool"),
start=dict(type="int"),
syslog=dict(type="bool"),
backup=dict(type="bool", default=True),
),
mutually_exclusive=[
["delay_compress", "no_delay_compress"],
@@ -923,6 +938,11 @@ def main() -> None:
["size", "max_size"],
["copy", "copy_truncate", "rename_copy"],
],
required_by={
"shred_cycles": "shred",
"compression_method": "compress",
"compress_options": "compress",
},
supports_check_mode=True,
)