luks_device: add support for tpm2 and keyslot priority (#972)

* luks_device: add support for tpm2 and fido2 devices

* Update documentation per code review comments

Co-authored-by: Felix Fontein <felix@fontein.de>

* Add support for keyslot priority

* Add changelog fragment.

* Remove fido2 support. Add idempotency for remove-tpm2

* Fix testing

* Fix testing again

* Fix formatting

* Fix format

* Apply suggestions from code review

Co-authored-by: Felix Fontein <felix@fontein.de>

* Add required_by and fix formatting

* vscode did something stupid...

* Address code review comments

---------

Co-authored-by: Felix Fontein <felix@fontein.de>
This commit is contained in:
Aram Akhavan
2025-12-22 00:15:52 -08:00
committed by GitHub
parent 663d1a1321
commit 83806cafc7
3 changed files with 477 additions and 52 deletions

View File

@@ -0,0 +1,3 @@
minor_changes:
- luks_device - add support for TPM2 enrollment using ``systemd-cryptsetup`` (https://github.com/ansible-collections/community.crypto/issues/850, https://github.com/ansible-collections/community.crypto/pull/972).
- luks_device - add support for keyslot priority (https://github.com/ansible-collections/community.crypto/issues/850, https://github.com/ansible-collections/community.crypto/pull/972).

View File

@@ -38,9 +38,9 @@ options:
- V(present) will create LUKS container unless already present. Requires O(device) and either O(keyfile) or O(passphrase)
options to be provided.
- V(absent) will remove existing LUKS container if it exists. Requires O(device) or O(name) to be specified.
- V(opened) will unlock the LUKS container. If it does not exist it will be created first. Requires O(device) and either
O(keyfile) or O(passphrase) to be specified. Use the O(name) option to set the name of the opened container. Otherwise
the name will be generated automatically and returned as a part of the result.
- V(opened) will unlock the LUKS container. Requires O(device) and one of O(keyfile), O(passphrase), O(tpm2_device) to be specified.
If the container does not exist it will be created first, however O(tpm2_device) can not be used for creation.
Use the O(name) option to set the name of the opened container. Otherwise the name will be generated automatically and returned as a part of the result.
- V(closed) will lock the LUKS container. However if the container does not exist it will be created. Requires O(device)
and either O(keyfile) or O(passphrase) options to be provided. If container does already exist O(device) or O(name)
will suffice.
@@ -66,6 +66,15 @@ options:
another encoding, use the O(passphrase_encoding) option and provide the passphrase Base64 encoded.
type: str
version_added: '1.0.0'
tpm2_device:
description:
- Used to unlock the container, but can not be used for container creation. A device node path referring to a TPM2 chip (for example V(/dev/tpmrm0)).
Alternatively the special value V(auto) may be specified, in order to automatically determine the device node of a currently
discovered TPM2 device (of which there must be exactly one).
- B(Note) that only LUKS2 containers are supported
- B(Note) that systemd-cryptsetup (v256 or newer) is required.
type: str
version_added: '3.1.0'
passphrase_encoding:
description:
- Determine how passphrases are provided to parameters such as O(passphrase), O(new_passphrase), and O(remove_passphrase).
@@ -82,6 +91,7 @@ options:
description:
- Adds the O(keyfile) or O(passphrase) to a specific keyslot when creating a new container on O(device). Parameter value
is the number of the keyslot.
- Defines the keyslot whose priority will be changed by O(keyslot_priority)
- B(Note) that a device of O(type=luks1) supports the keyslot numbers V(0)-V(7) and a device of O(type=luks2) supports
the keyslot numbers V(0)-V(31). In order to use the keyslots V(8)-V(31) when creating a new container, setting O(type)
to V(luks2) is required.
@@ -118,6 +128,21 @@ options:
the keyslot numbers V(0)-V(31).
type: int
version_added: '2.16.0'
new_tpm2:
description:
- Adds a TPM2 security chip to given container on O(device). Expects a device node path referring to the TPM2 chip (e.g. V(/dev/tpmrm0)).
Alternatively the special value V(auto) may be specified, in order to automatically determine the device node of a currently
discovered TPM2 device (of which there must be exactly one). Requires O(new_tpm2_pcrs).
- B(Note) that O(new_keyslot) does not affect the keyslot for TPM2 enrollment.
- B(Note) that only LUKS2 containers are supported.
- B(Note) that systemd-cryptsetup (v248 or newer) is required.
type: str
version_added: '3.1.0'
new_tpm2_pcrs:
description:
- TPM2 PCRs (Platform Configuration Registers) to bind to. See systemd-cryptenroll documentation for details (C(--tpm2-pcrs) argument).
type: str
version_added: '3.1.0'
remove_keyfile:
description:
- Removes given key from the container on O(device). Does not remove the keyfile from filesystem. Parameter value is
@@ -137,6 +162,15 @@ options:
another encoding, use the O(passphrase_encoding) option and provide the passphrase Base64 encoded.
type: str
version_added: '1.0.0'
remove_tpm2:
description:
- Removes B(all) key slots on O(device) that are unlocked by a TPM2 device.
Needs O(keyfile), O(passphrase), or O(tpm2_device) for authorization.
- B(Note) that systemd-cryptsetup (v248 or newer) is required.
- B(Note) that you should avoid using O(tpm2_device) to authorize removal of all TPM2 slots to ensure that you can still access the container afterwards.
type: bool
default: false
version_added: '3.1.0'
remove_keyslot:
description:
- Removes the key in the given slot on O(device). Needs O(keyfile) or O(passphrase) for authorization.
@@ -145,6 +179,13 @@ options:
- B(Note) that the given O(keyfile) or O(passphrase) must not be in the slot to be removed.
type: int
version_added: '2.16.0'
keyslot_priority:
description:
- Sets the keyslot priority for the keyslot specified by O(keyslot).
- B(Note) that keyslot priority is only supported for LUKS2 containers.
type: str
choices: [prefer, normal, ignore]
version_added: '3.1.0'
force_remove_last_key:
description:
- If set to V(true), allows removing the last key from a container.
@@ -284,6 +325,7 @@ requirements:
- "wipefs (when O(state) is V(absent))"
- "lsblk"
- "blkid (when O(label) or O(uuid) options are used)"
- "systemd-cryptsetup (for TPM2 only)"
author: Jan Pokorny (@japokorn)
"""
@@ -407,6 +449,23 @@ EXAMPLES = r"""
device: "/dev/loop0"
keyfile: "/vault/keyfile"
remove_keyslot: 4
- name: Enroll a TPM2 device using a keyfile to unlock the container
community.crypto.luks_device:
keyfile: "/vault/keyfile"
new_tpm2: "auto"
new_tpm2_pcrs: "1+3+5+7+11+12+14"
- name: Remove all enrolled TPM2 devices
community.crypto.luks_device:
tpm2_device: "auto"
remove_tpm2: true
- name: Set the priority of keyslot 0 to 'prefer'
community.crypto.luks_device:
device: "/dev/loop0"
keyslot: 0
keyslot_priority: prefer
"""
RETURN = r"""
@@ -417,6 +476,7 @@ name:
sample: "luks-c1da9a58-2fde-4256-9d9f-6ab008b4dd1b"
"""
import json
import os
import re
import stat
@@ -859,6 +919,129 @@ class CryptHandler(Handler):
f"Error while testing whether keyslot exists on {device}: {stderr}"
)
def luks_dump_json_metadata(self, device: str) -> dict:
"""Dump LUKS metadata in JSON format.
Raises ValueError when command fails.
"""
rc, stdout, stderr = self._run_command(
[self._cryptsetup_bin, "luksDump", "--dump-json-metadata", device]
)
if rc != 0:
raise ValueError(
f"Error while dumping LUKS JSON metadata from {device}: {stderr}"
)
return json.loads(stdout)
def run_config(self, device: str, keyslot: int, keyslot_priority: str) -> None:
"""Configure LUKS keyslot priority.
Raises ValueError when command fails.
"""
rc, dummy, stderr = self._run_command(
[
self._cryptsetup_bin,
"config",
f"--key-slot={keyslot}",
f"--priority={keyslot_priority}",
device,
]
)
if rc != 0:
raise ValueError(
f"Error while configuring LUKS keyslot {keyslot} on {device}: {stderr}"
)
def run_systemd_cryptsetup_attach(
self,
device: str,
tpm2_device: str | None,
perf_same_cpu_crypt: bool,
perf_submit_from_crypt_cpus: bool,
perf_no_read_workqueue: bool,
perf_no_write_workqueue: bool,
allow_discards: bool,
name: str,
) -> None:
systemd_cryptsetup_bin = self._module.get_bin_path(
"systemd-cryptsetup", required=True
)
args = [systemd_cryptsetup_bin, name, device, "none"]
options = []
if tpm2_device is not None:
options.append(f"tpm2-device={tpm2_device}")
if perf_same_cpu_crypt:
options.append("same-cpu-crypt")
if perf_submit_from_crypt_cpus:
options.append("submit-from-crypt-cpus")
if perf_no_read_workqueue:
options.append("no-read-workqueue")
if perf_no_write_workqueue:
options.append("no-write-workqueue")
if allow_discards:
options.append("discard")
args.append(",".join(options))
rc, dummy, stderr = self._run_command(args)
if rc != 0:
raise ValueError(f"Error while opening {device}: {stderr}")
def run_systemd_cryptenroll(
self,
device: str,
keyfile: str | None,
passphrase: bytes | None,
tpm2_device: str | None,
new_tpm2: str | None,
new_tpm2_pcrs: str | None,
remove_tpm2: bool,
) -> bool:
systemd_cryptenroll_bin = self._module.get_bin_path(
"systemd-cryptenroll", required=True
)
args = [systemd_cryptenroll_bin]
if keyfile is not None:
args.append(f"--unlock-key-file={keyfile}")
if tpm2_device:
args.append(f"--unlock-tpm2-device={tpm2_device}")
if new_tpm2:
args.extend([f"--tpm2-device={new_tpm2}", f"--tpm2-pcrs={new_tpm2_pcrs}"])
if remove_tpm2:
args.append("--wipe-slot=tpm2")
if passphrase:
# --unlock-key-file is needed both so a newline isnt required at the end of stdin
# and also because otherwise, systemd-cryptenroll will just hang if the passphrase is wrong,
# waiting for the user to input another one
rc, dummy, stderr = self._run_command(
[*args, "--unlock-key-file=/dev/stdin", device], data=passphrase
)
else:
rc, dummy, stderr = self._run_command([*args, device])
if rc != 0:
raise ValueError(
f"Error while adding key to {device} with {args}: {stderr}"
)
# systemd-cryptenroll stores a policy hash in the LUKS metadata which is used to
# detect if a TPM2 device and PCR set are already enrolled. Computing it
# is very complicated, however, so rather than duplicating all of that here,
# we rely on the command's output which hopefully won't change.
# See: https://github.com/systemd/systemd/blob/7524671f74c9b0ea858a077ae9b1af3fe574d57e/src/cryptenroll/cryptenroll-tpm2.c#L529-L541
return "This PCR set is already enrolled, executing no operation." not in stderr
class ConditionsHandler(Handler):
def __init__(self, module: AnsibleModule, crypthandler: CryptHandler) -> None:
@@ -882,15 +1065,24 @@ class ConditionsHandler(Handler):
return device
def luks_create(self) -> bool:
return (
if (
self.device is not None
and (
self._module.params["keyfile"] is not None
or self._module.params["passphrase"] is not None
)
and self._module.params["state"] in ("present", "opened", "closed")
and not self._crypthandler.is_luks(self.device)
)
):
if (
self._module.params["keyfile"] is None
and self._module.params["passphrase"] is None
):
self._module.fail_json(
msg="Neither keyfile nor passphrase were given but the LUKS volume must be created"
)
return True
return False
def opened_luks_name(self, device: str) -> str | None:
"""If luks is already opened, return its name.
@@ -923,17 +1115,19 @@ class ConditionsHandler(Handler):
return name
def luks_open(self) -> bool:
if (
(
self._module.params["keyfile"] is None
and self._module.params["passphrase"] is None
)
or self.device is None
or self._module.params["state"] != "opened"
):
if self.device is None or self._module.params["state"] != "opened":
# conditions for open not fulfilled
return False
if (
self._module.params["keyfile"] is None
and self._module.params["passphrase"] is None
and self._module.params["tpm2_device"] is None
):
self._module.fail_json(
msg="state=opened was specified but none of keyfile, passphrase, or tpm2_device were given"
)
name = self.opened_luks_name(self.device)
return name is None
@@ -1049,6 +1243,25 @@ class ConditionsHandler(Handler):
and self._crypthandler.is_luks(self.device)
)
def luks_config(self) -> bool:
if self.device is None or self._module.params["keyslot_priority"] is None:
return False
if self._module.params["keyslot"] is None:
self._module.fail_json(
msg="keyslot_priority was specified but keyslot was not."
)
json_metadata = self._crypthandler.luks_dump_json_metadata(self.device)
slot_priority = (
json_metadata.get("keyslots", {})
.get(str(self._module.params["keyslot"]), {})
.get("priority", None)
)
priority_map = {"prefer": 2, "normal": None, "ignore": 0}
return slot_priority != priority_map[self._module.params["keyslot_priority"]]
def validate_keyslot(
self, param: str, luks_type: t.Literal["luks1", "luks2"] | None
) -> None:
@@ -1072,8 +1285,28 @@ class ConditionsHandler(Handler):
msg=f"{self._module.params[param]} must be between 0 and 31 when using LUKS2."
)
def systemd_cryptenroll(self) -> bool:
if self.device is None:
return False
def run_module() -> t.NoReturn:
if (
self._module.params["new_tpm2"] is not None
and self._module.params["new_tpm2_pcrs"] is None
):
self._module.fail_json(msg="new_tpm2_pcrs must be specified with new_tpm2")
if self._module.params["remove_tpm2"]:
json_metadata = self._crypthandler.luks_dump_json_metadata(self.device)
tokens = json_metadata.get("tokens", {})
tpm2_enrolled = any(
token.get("type") == "systemd-tpm2" for token in tokens.values()
)
return tpm2_enrolled or self._module.params["new_tpm2"] is not None
return self._module.params["new_tpm2"] is not None
def run_module() -> t.NoReturn: # noqa: C901
# available arguments/parameters that a user can pass
module_args = {
"state": {
@@ -1089,6 +1322,10 @@ def run_module() -> t.NoReturn:
"passphrase": {"type": "str", "no_log": True},
"new_passphrase": {"type": "str", "no_log": True},
"remove_passphrase": {"type": "str", "no_log": True},
"tpm2_device": {"type": "str"},
"new_tpm2": {"type": "str"},
"new_tpm2_pcrs": {"type": "str"},
"remove_tpm2": {"type": "bool", "default": False},
"passphrase_encoding": {
"type": "str",
"default": "text",
@@ -1096,6 +1333,11 @@ def run_module() -> t.NoReturn:
"no_log": False,
},
"keyslot": {"type": "int", "no_log": False},
"keyslot_priority": {
"type": "str",
"no_log": False,
"choices": ["prefer", "normal", "ignore"],
},
"new_keyslot": {"type": "int", "no_log": False},
"remove_keyslot": {"type": "int", "no_log": False},
"force_remove_last_key": {"type": "bool", "default": False},
@@ -1129,11 +1371,16 @@ def run_module() -> t.NoReturn:
}
mutually_exclusive = [
("keyfile", "passphrase"),
("keyfile", "passphrase", "tpm2_device"),
("new_keyfile", "new_passphrase"),
("remove_keyfile", "remove_passphrase", "remove_keyslot"),
]
required_by = {
"new_tpm2": ["new_tpm2_pcrs"],
"keyslot_priority": ["keyslot"],
}
# seed the result dict in the object
result: dict[str, t.Any] = {"changed": False, "name": None}
@@ -1141,6 +1388,7 @@ def run_module() -> t.NoReturn:
argument_spec=module_args,
supports_check_mode=True,
mutually_exclusive=mutually_exclusive,
required_by=required_by,
)
module.run_command_environ_update = {
"LANG": "C",
@@ -1235,18 +1483,33 @@ def run_module() -> t.NoReturn:
module.fail_json(msg=f"luks_device error: {e}")
if not module.check_mode:
try:
crypt.run_luks_open(
conditions.device,
module.params["keyfile"],
conditions.get_passphrase_from_module_params("passphrase"),
module.params["perf_same_cpu_crypt"],
module.params["perf_submit_from_crypt_cpus"],
module.params["perf_no_read_workqueue"],
module.params["perf_no_write_workqueue"],
module.params["persistent"],
module.params["allow_discards"],
name,
)
if (
module.params["keyfile"] is not None
or module.params["passphrase"] is not None
):
crypt.run_luks_open(
conditions.device,
module.params["keyfile"],
conditions.get_passphrase_from_module_params("passphrase"),
module.params["perf_same_cpu_crypt"],
module.params["perf_submit_from_crypt_cpus"],
module.params["perf_no_read_workqueue"],
module.params["perf_no_write_workqueue"],
module.params["persistent"],
module.params["allow_discards"],
name,
)
else:
crypt.run_systemd_cryptsetup_attach(
conditions.device,
module.params["tpm2_device"],
module.params["perf_same_cpu_crypt"],
module.params["perf_submit_from_crypt_cpus"],
module.params["perf_no_read_workqueue"],
module.params["perf_no_write_workqueue"],
module.params["allow_discards"],
name,
)
except ValueError as e:
module.fail_json(msg=f"luks_device error: {e}")
result["name"] = name
@@ -1326,6 +1589,40 @@ def run_module() -> t.NoReturn:
if module.check_mode:
module.exit_json(**result)
if conditions.systemd_cryptenroll():
assert conditions.device is not None
if not module.check_mode:
try:
changed = crypt.run_systemd_cryptenroll(
conditions.device,
module.params["keyfile"],
conditions.get_passphrase_from_module_params("passphrase"),
module.params["tpm2_device"],
module.params["new_tpm2"],
module.params["new_tpm2_pcrs"],
module.params["remove_tpm2"],
)
except ValueError as e:
module.fail_json(msg=f"luks_device error: {e}")
result["changed"] |= changed
if module.check_mode:
module.exit_json(**result)
if conditions.luks_config():
assert conditions.device is not None # ensured in conditions.luks_config()
if not module.check_mode:
try:
crypt.run_config(
conditions.device,
module.params["keyslot"],
module.params["keyslot_priority"],
)
except ValueError as e:
module.fail_json(msg=f"luks_device error: {e}")
result["changed"] = True
if module.check_mode:
module.exit_json(**result)
# Success - return result
module.exit_json(**result)

View File

@@ -98,7 +98,6 @@ LUKS_CREATE_DATA: list[
("dummy", "key", None, "present", False, None, "dummy", "dummy", True),
(None, "key", None, "present", False, None, "dummy", "dummy", False),
(None, "key", None, "present", False, "labelName", "dummy", "dummy", True),
("dummy", None, None, "present", False, None, "dummy", "dummy", False),
("dummy", "key", None, "absent", False, None, "dummy", "dummy", False),
("dummy", "key", None, "opened", True, None, "dummy", "dummy", False),
("dummy", "key", None, "closed", True, None, "dummy", "dummy", False),
@@ -106,7 +105,6 @@ LUKS_CREATE_DATA: list[
("dummy", None, "foo", "present", False, None, "dummy", "dummy", True),
(None, None, "bar", "present", False, None, "dummy", "dummy", False),
(None, None, "baz", "present", False, "labelName", "dummy", "dummy", True),
("dummy", None, None, "present", False, None, "dummy", "dummy", False),
("dummy", None, "quz", "absent", False, None, "dummy", "dummy", False),
("dummy", None, "qux", "opened", True, None, "dummy", "dummy", False),
("dummy", None, "quux", "closed", True, None, "dummy", "dummy", False),
@@ -114,6 +112,11 @@ LUKS_CREATE_DATA: list[
("dummy", "key", None, "present", False, None, None, None, True),
("dummy", "key", None, "present", False, None, None, "dummy", True),
("dummy", "key", None, "present", False, None, "dummy", None, True),
("dummy", None, None, "present", False, None, "dummy", None, "exception"),
("dummy", None, None, "opened", False, None, "dummy", None, "exception"),
("dummy", None, None, "closed", False, None, "dummy", None, "exception"),
("dummy", None, None, "absent", False, None, "dummy", None, False),
("dummy", None, None, "opened", True, None, "dummy", None, False),
]
# device, state, is_luks, expected
@@ -131,34 +134,35 @@ LUKS_REMOVE_DATA: list[
("dummy", "absent", False, False),
]
# device, key, passphrase, state, name, name_by_dev, expected
# device, key, passphrase, tpm2_device, state, name, name_by_dev, expected
LUKS_OPEN_DATA: list[
tuple[
str | None,
str | None,
str | None,
str | None,
t.Literal["present", "absent", "opened", "closed"],
str | None,
str | None,
bool | t.Literal["exception"],
]
] = [
("dummy", "key", None, "present", "name", None, False),
("dummy", "key", None, "absent", "name", None, False),
("dummy", "key", None, "closed", "name", None, False),
("dummy", "key", None, "opened", "name", None, True),
(None, "key", None, "opened", "name", None, False),
("dummy", None, None, "opened", "name", None, False),
("dummy", "key", None, "opened", "name", "name", False),
("dummy", "key", None, "opened", "beer", "name", "exception"),
("dummy", None, "foo", "present", "name", None, False),
("dummy", None, "bar", "absent", "name", None, False),
("dummy", None, "baz", "closed", "name", None, False),
("dummy", None, "qux", "opened", "name", None, True),
(None, None, "quux", "opened", "name", None, False),
("dummy", None, None, "opened", "name", None, False),
("dummy", None, "quuz", "opened", "name", "name", False),
("dummy", None, "corge", "opened", "beer", "name", "exception"),
("dummy", "key", None, None, "present", "name", None, False),
("dummy", "key", None, None, "absent", "name", None, False),
("dummy", "key", None, None, "closed", "name", None, False),
("dummy", "key", None, None, "opened", "name", None, True),
(None, "key", None, None, "opened", "name", None, False),
("dummy", "key", None, None, "opened", "name", "name", False),
("dummy", "key", None, None, "opened", "beer", "name", "exception"),
("dummy", None, "foo", None, "present", "name", None, False),
("dummy", None, "bar", None, "absent", "name", None, False),
("dummy", None, "baz", None, "closed", "name", None, False),
("dummy", None, "qux", None, "opened", "name", None, True),
(None, None, "quux", None, "opened", "name", None, False),
("dummy", None, "quuz", None, "opened", "name", "name", False),
("dummy", None, "corge", None, "opened", "beer", "name", "exception"),
("dummy", None, None, None, "opened", "name", None, "exception"),
("dummy", None, None, "auto", "opened", "name", None, True),
]
# device, dev_by_name, name, name_by_dev, state, label, expected
@@ -237,6 +241,42 @@ LUKS_REMOVE_KEY_DATA: list[
("dummy", None, "foo", None, "absent", None, "exception"),
]
# device, new_tpm2, new_tpm2_pcrs, remove_tpm2, existing_tpm2, expected
SYSTEMD_CRYPTENROLL_DATA: list[
tuple[
str | None,
str | None,
str | None,
bool,
bool,
bool | t.Literal["exception"],
]
] = [
("dummy", None, None, False, False, False),
("dummy", None, None, False, True, False),
("dummy", None, None, True, False, False),
("dummy", None, None, True, True, True),
("dummy", "auto", "0+1", False, False, True),
("dummy", "auto", "0+1", True, False, True),
("dummy", "auto", None, False, False, "exception"),
]
# device, keyslot_priority, existing_priority, expected
LUKS_CONFIG_DATA: list[
tuple[str | None, str | None, int | None, bool | t.Literal["exception"]]
] = [
("dummy", "prefer", 0, True),
("dummy", "prefer", None, True),
("dummy", "prefer", 2, False),
("dummy", "normal", 0, True),
("dummy", "normal", None, False),
("dummy", "normal", 2, True),
("dummy", "ignore", 0, False),
("dummy", "ignore", None, True),
("dummy", "ignore", 2, True),
(None, "normal", 0, False),
]
@pytest.mark.parametrize(
"device, keyfile, passphrase, state, is_luks, " + "label, cipher, hash_, expected",
@@ -307,13 +347,14 @@ def test_luks_remove(
@pytest.mark.parametrize(
"device, keyfile, passphrase, state, name, name_by_dev, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6]) for d in LUKS_OPEN_DATA),
"device, keyfile, passphrase, tpm2_device, state, name, name_by_dev, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6], d[7]) for d in LUKS_OPEN_DATA),
)
def test_luks_open(
device: str | None,
keyfile: str | None,
passphrase: str | None,
tpm2_device: str | None,
state: t.Literal["present", "absent", "opened", "closed"],
name: str | None,
name_by_dev: str | None,
@@ -324,6 +365,7 @@ def test_luks_open(
module.params["device"] = device
module.params["keyfile"] = keyfile
module.params["passphrase"] = passphrase
module.params["tpm2_device"] = tpm2_device
module.params["passphrase_encoding"] = "text"
module.params["state"] = state
module.params["name"] = name
@@ -468,3 +510,86 @@ def test_luks_remove_key(
assert conditions.luks_remove_key() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize(
"device, new_tpm2, new_tpm2_pcrs, remove_tpm2, existing_tpm2, expected",
((d[0], d[1], d[2], d[3], d[4], d[5]) for d in SYSTEMD_CRYPTENROLL_DATA),
)
def test_systemd_cryptenroll(
device: str | None,
new_tpm2: str | None,
new_tpm2_pcrs: str | None,
remove_tpm2: bool,
existing_tpm2: bool,
expected: bool | t.Literal["exception"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
module = DummyModule()
module.params["device"] = device
module.params["passphrase_encoding"] = "text"
module.params["new_tpm2"] = new_tpm2
module.params["new_tpm2_pcrs"] = new_tpm2_pcrs
module.params["remove_tpm2"] = remove_tpm2
crypt = luks_device.CryptHandler(module) # type: ignore
def mock_luks_dump_json_metadata(
self: luks_device.CryptHandler, device: str
) -> dict[str, t.Any]:
return (
{"tokens": {"0": {"type": "systemd-tpm2"}}}
if existing_tpm2
else {"tokens": {}}
)
monkeypatch.setattr(
luks_device.CryptHandler,
"luks_dump_json_metadata",
mock_luks_dump_json_metadata,
)
try:
conditions = luks_device.ConditionsHandler(module, crypt) # type: ignore
assert conditions.systemd_cryptenroll() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize(
"device, keyslot_priority, existing_priority, expected",
((d[0], d[1], d[2], d[3]) for d in LUKS_CONFIG_DATA),
)
def test_luks_config(
device: str | None,
keyslot_priority: str | None,
existing_priority: int | None,
expected: bool | t.Literal["exception"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
module = DummyModule()
module.params["device"] = device
module.params["passphrase_encoding"] = "text"
module.params["keyslot"] = 1
module.params["keyslot_priority"] = keyslot_priority
crypt = luks_device.CryptHandler(module) # type: ignore
def mock_luks_dump_json_metadata(
self: luks_device.CryptHandler, device: str
) -> dict[str, t.Any]:
return {"keyslots": {"1": {"priority": existing_priority}}}
monkeypatch.setattr(
luks_device.CryptHandler,
"luks_dump_json_metadata",
mock_luks_dump_json_metadata,
)
try:
conditions = luks_device.ConditionsHandler(module, crypt) # type: ignore
assert conditions.luks_config() == expected
except ValueError:
assert expected == "exception"