diff --git a/changelogs/fragments/964-luks-device-tpm2-priority.yml b/changelogs/fragments/964-luks-device-tpm2-priority.yml new file mode 100644 index 00000000..1f9221f3 --- /dev/null +++ b/changelogs/fragments/964-luks-device-tpm2-priority.yml @@ -0,0 +1,3 @@ +minor_changes: + - luks_device - add support for TPM2 enrollment using ``systemd-cryptsetup`` (https://github.com/ansible-collections/community.crypto/issues/850, https://github.com/ansible-collections/community.crypto/pull/972). + - luks_device - add support for keyslot priority (https://github.com/ansible-collections/community.crypto/issues/850, https://github.com/ansible-collections/community.crypto/pull/972). \ No newline at end of file diff --git a/plugins/modules/luks_device.py b/plugins/modules/luks_device.py index 6d5e45be..778c1f5e 100644 --- a/plugins/modules/luks_device.py +++ b/plugins/modules/luks_device.py @@ -38,9 +38,9 @@ options: - V(present) will create LUKS container unless already present. Requires O(device) and either O(keyfile) or O(passphrase) options to be provided. - V(absent) will remove existing LUKS container if it exists. Requires O(device) or O(name) to be specified. - - V(opened) will unlock the LUKS container. If it does not exist it will be created first. Requires O(device) and either - O(keyfile) or O(passphrase) to be specified. Use the O(name) option to set the name of the opened container. Otherwise - the name will be generated automatically and returned as a part of the result. + - V(opened) will unlock the LUKS container. Requires O(device) and one of O(keyfile), O(passphrase), O(tpm2_device) to be specified. + If the container does not exist it will be created first, however O(tpm2_device) can not be used for creation. + Use the O(name) option to set the name of the opened container. Otherwise the name will be generated automatically and returned as a part of the result. - V(closed) will lock the LUKS container. However if the container does not exist it will be created. Requires O(device) and either O(keyfile) or O(passphrase) options to be provided. If container does already exist O(device) or O(name) will suffice. @@ -66,6 +66,15 @@ options: another encoding, use the O(passphrase_encoding) option and provide the passphrase Base64 encoded. type: str version_added: '1.0.0' + tpm2_device: + description: + - Used to unlock the container, but can not be used for container creation. A device node path referring to a TPM2 chip (for example V(/dev/tpmrm0)). + Alternatively the special value V(auto) may be specified, in order to automatically determine the device node of a currently + discovered TPM2 device (of which there must be exactly one). + - B(Note) that only LUKS2 containers are supported + - B(Note) that systemd-cryptsetup (v256 or newer) is required. + type: str + version_added: '3.1.0' passphrase_encoding: description: - Determine how passphrases are provided to parameters such as O(passphrase), O(new_passphrase), and O(remove_passphrase). @@ -82,6 +91,7 @@ options: description: - Adds the O(keyfile) or O(passphrase) to a specific keyslot when creating a new container on O(device). Parameter value is the number of the keyslot. + - Defines the keyslot whose priority will be changed by O(keyslot_priority) - B(Note) that a device of O(type=luks1) supports the keyslot numbers V(0)-V(7) and a device of O(type=luks2) supports the keyslot numbers V(0)-V(31). In order to use the keyslots V(8)-V(31) when creating a new container, setting O(type) to V(luks2) is required. @@ -118,6 +128,21 @@ options: the keyslot numbers V(0)-V(31). type: int version_added: '2.16.0' + new_tpm2: + description: + - Adds a TPM2 security chip to given container on O(device). Expects a device node path referring to the TPM2 chip (e.g. V(/dev/tpmrm0)). + Alternatively the special value V(auto) may be specified, in order to automatically determine the device node of a currently + discovered TPM2 device (of which there must be exactly one). Requires O(new_tpm2_pcrs). + - B(Note) that O(new_keyslot) does not affect the keyslot for TPM2 enrollment. + - B(Note) that only LUKS2 containers are supported. + - B(Note) that systemd-cryptsetup (v248 or newer) is required. + type: str + version_added: '3.1.0' + new_tpm2_pcrs: + description: + - TPM2 PCRs (Platform Configuration Registers) to bind to. See systemd-cryptenroll documentation for details (C(--tpm2-pcrs) argument). + type: str + version_added: '3.1.0' remove_keyfile: description: - Removes given key from the container on O(device). Does not remove the keyfile from filesystem. Parameter value is @@ -137,6 +162,15 @@ options: another encoding, use the O(passphrase_encoding) option and provide the passphrase Base64 encoded. type: str version_added: '1.0.0' + remove_tpm2: + description: + - Removes B(all) key slots on O(device) that are unlocked by a TPM2 device. + Needs O(keyfile), O(passphrase), or O(tpm2_device) for authorization. + - B(Note) that systemd-cryptsetup (v248 or newer) is required. + - B(Note) that you should avoid using O(tpm2_device) to authorize removal of all TPM2 slots to ensure that you can still access the container afterwards. + type: bool + default: false + version_added: '3.1.0' remove_keyslot: description: - Removes the key in the given slot on O(device). Needs O(keyfile) or O(passphrase) for authorization. @@ -145,6 +179,13 @@ options: - B(Note) that the given O(keyfile) or O(passphrase) must not be in the slot to be removed. type: int version_added: '2.16.0' + keyslot_priority: + description: + - Sets the keyslot priority for the keyslot specified by O(keyslot). + - B(Note) that keyslot priority is only supported for LUKS2 containers. + type: str + choices: [prefer, normal, ignore] + version_added: '3.1.0' force_remove_last_key: description: - If set to V(true), allows removing the last key from a container. @@ -284,6 +325,7 @@ requirements: - "wipefs (when O(state) is V(absent))" - "lsblk" - "blkid (when O(label) or O(uuid) options are used)" + - "systemd-cryptsetup (for TPM2 only)" author: Jan Pokorny (@japokorn) """ @@ -407,6 +449,23 @@ EXAMPLES = r""" device: "/dev/loop0" keyfile: "/vault/keyfile" remove_keyslot: 4 + +- name: Enroll a TPM2 device using a keyfile to unlock the container + community.crypto.luks_device: + keyfile: "/vault/keyfile" + new_tpm2: "auto" + new_tpm2_pcrs: "1+3+5+7+11+12+14" + +- name: Remove all enrolled TPM2 devices + community.crypto.luks_device: + tpm2_device: "auto" + remove_tpm2: true + +- name: Set the priority of keyslot 0 to 'prefer' + community.crypto.luks_device: + device: "/dev/loop0" + keyslot: 0 + keyslot_priority: prefer """ RETURN = r""" @@ -417,6 +476,7 @@ name: sample: "luks-c1da9a58-2fde-4256-9d9f-6ab008b4dd1b" """ +import json import os import re import stat @@ -859,6 +919,129 @@ class CryptHandler(Handler): f"Error while testing whether keyslot exists on {device}: {stderr}" ) + def luks_dump_json_metadata(self, device: str) -> dict: + """Dump LUKS metadata in JSON format. + Raises ValueError when command fails. + """ + rc, stdout, stderr = self._run_command( + [self._cryptsetup_bin, "luksDump", "--dump-json-metadata", device] + ) + if rc != 0: + raise ValueError( + f"Error while dumping LUKS JSON metadata from {device}: {stderr}" + ) + + return json.loads(stdout) + + def run_config(self, device: str, keyslot: int, keyslot_priority: str) -> None: + """Configure LUKS keyslot priority. + Raises ValueError when command fails. + """ + rc, dummy, stderr = self._run_command( + [ + self._cryptsetup_bin, + "config", + f"--key-slot={keyslot}", + f"--priority={keyslot_priority}", + device, + ] + ) + if rc != 0: + raise ValueError( + f"Error while configuring LUKS keyslot {keyslot} on {device}: {stderr}" + ) + + def run_systemd_cryptsetup_attach( + self, + device: str, + tpm2_device: str | None, + perf_same_cpu_crypt: bool, + perf_submit_from_crypt_cpus: bool, + perf_no_read_workqueue: bool, + perf_no_write_workqueue: bool, + allow_discards: bool, + name: str, + ) -> None: + systemd_cryptsetup_bin = self._module.get_bin_path( + "systemd-cryptsetup", required=True + ) + args = [systemd_cryptsetup_bin, name, device, "none"] + options = [] + + if tpm2_device is not None: + options.append(f"tpm2-device={tpm2_device}") + + if perf_same_cpu_crypt: + options.append("same-cpu-crypt") + + if perf_submit_from_crypt_cpus: + options.append("submit-from-crypt-cpus") + + if perf_no_read_workqueue: + options.append("no-read-workqueue") + + if perf_no_write_workqueue: + options.append("no-write-workqueue") + + if allow_discards: + options.append("discard") + + args.append(",".join(options)) + + rc, dummy, stderr = self._run_command(args) + + if rc != 0: + raise ValueError(f"Error while opening {device}: {stderr}") + + def run_systemd_cryptenroll( + self, + device: str, + keyfile: str | None, + passphrase: bytes | None, + tpm2_device: str | None, + new_tpm2: str | None, + new_tpm2_pcrs: str | None, + remove_tpm2: bool, + ) -> bool: + systemd_cryptenroll_bin = self._module.get_bin_path( + "systemd-cryptenroll", required=True + ) + args = [systemd_cryptenroll_bin] + + if keyfile is not None: + args.append(f"--unlock-key-file={keyfile}") + + if tpm2_device: + args.append(f"--unlock-tpm2-device={tpm2_device}") + + if new_tpm2: + args.extend([f"--tpm2-device={new_tpm2}", f"--tpm2-pcrs={new_tpm2_pcrs}"]) + + if remove_tpm2: + args.append("--wipe-slot=tpm2") + + if passphrase: + # --unlock-key-file is needed both so a newline isnt required at the end of stdin + # and also because otherwise, systemd-cryptenroll will just hang if the passphrase is wrong, + # waiting for the user to input another one + rc, dummy, stderr = self._run_command( + [*args, "--unlock-key-file=/dev/stdin", device], data=passphrase + ) + else: + rc, dummy, stderr = self._run_command([*args, device]) + + if rc != 0: + raise ValueError( + f"Error while adding key to {device} with {args}: {stderr}" + ) + + # systemd-cryptenroll stores a policy hash in the LUKS metadata which is used to + # detect if a TPM2 device and PCR set are already enrolled. Computing it + # is very complicated, however, so rather than duplicating all of that here, + # we rely on the command's output which hopefully won't change. + # See: https://github.com/systemd/systemd/blob/7524671f74c9b0ea858a077ae9b1af3fe574d57e/src/cryptenroll/cryptenroll-tpm2.c#L529-L541 + return "This PCR set is already enrolled, executing no operation." not in stderr + class ConditionsHandler(Handler): def __init__(self, module: AnsibleModule, crypthandler: CryptHandler) -> None: @@ -882,15 +1065,24 @@ class ConditionsHandler(Handler): return device def luks_create(self) -> bool: - return ( + if ( self.device is not None - and ( - self._module.params["keyfile"] is not None - or self._module.params["passphrase"] is not None - ) and self._module.params["state"] in ("present", "opened", "closed") and not self._crypthandler.is_luks(self.device) - ) + ): + + if ( + self._module.params["keyfile"] is None + and self._module.params["passphrase"] is None + ): + + self._module.fail_json( + msg="Neither keyfile nor passphrase were given but the LUKS volume must be created" + ) + + return True + + return False def opened_luks_name(self, device: str) -> str | None: """If luks is already opened, return its name. @@ -923,17 +1115,19 @@ class ConditionsHandler(Handler): return name def luks_open(self) -> bool: - if ( - ( - self._module.params["keyfile"] is None - and self._module.params["passphrase"] is None - ) - or self.device is None - or self._module.params["state"] != "opened" - ): + if self.device is None or self._module.params["state"] != "opened": # conditions for open not fulfilled return False + if ( + self._module.params["keyfile"] is None + and self._module.params["passphrase"] is None + and self._module.params["tpm2_device"] is None + ): + self._module.fail_json( + msg="state=opened was specified but none of keyfile, passphrase, or tpm2_device were given" + ) + name = self.opened_luks_name(self.device) return name is None @@ -1049,6 +1243,25 @@ class ConditionsHandler(Handler): and self._crypthandler.is_luks(self.device) ) + def luks_config(self) -> bool: + if self.device is None or self._module.params["keyslot_priority"] is None: + return False + + if self._module.params["keyslot"] is None: + self._module.fail_json( + msg="keyslot_priority was specified but keyslot was not." + ) + + json_metadata = self._crypthandler.luks_dump_json_metadata(self.device) + slot_priority = ( + json_metadata.get("keyslots", {}) + .get(str(self._module.params["keyslot"]), {}) + .get("priority", None) + ) + + priority_map = {"prefer": 2, "normal": None, "ignore": 0} + return slot_priority != priority_map[self._module.params["keyslot_priority"]] + def validate_keyslot( self, param: str, luks_type: t.Literal["luks1", "luks2"] | None ) -> None: @@ -1072,8 +1285,28 @@ class ConditionsHandler(Handler): msg=f"{self._module.params[param]} must be between 0 and 31 when using LUKS2." ) + def systemd_cryptenroll(self) -> bool: + if self.device is None: + return False -def run_module() -> t.NoReturn: + if ( + self._module.params["new_tpm2"] is not None + and self._module.params["new_tpm2_pcrs"] is None + ): + self._module.fail_json(msg="new_tpm2_pcrs must be specified with new_tpm2") + + if self._module.params["remove_tpm2"]: + json_metadata = self._crypthandler.luks_dump_json_metadata(self.device) + tokens = json_metadata.get("tokens", {}) + tpm2_enrolled = any( + token.get("type") == "systemd-tpm2" for token in tokens.values() + ) + return tpm2_enrolled or self._module.params["new_tpm2"] is not None + + return self._module.params["new_tpm2"] is not None + + +def run_module() -> t.NoReturn: # noqa: C901 # available arguments/parameters that a user can pass module_args = { "state": { @@ -1089,6 +1322,10 @@ def run_module() -> t.NoReturn: "passphrase": {"type": "str", "no_log": True}, "new_passphrase": {"type": "str", "no_log": True}, "remove_passphrase": {"type": "str", "no_log": True}, + "tpm2_device": {"type": "str"}, + "new_tpm2": {"type": "str"}, + "new_tpm2_pcrs": {"type": "str"}, + "remove_tpm2": {"type": "bool", "default": False}, "passphrase_encoding": { "type": "str", "default": "text", @@ -1096,6 +1333,11 @@ def run_module() -> t.NoReturn: "no_log": False, }, "keyslot": {"type": "int", "no_log": False}, + "keyslot_priority": { + "type": "str", + "no_log": False, + "choices": ["prefer", "normal", "ignore"], + }, "new_keyslot": {"type": "int", "no_log": False}, "remove_keyslot": {"type": "int", "no_log": False}, "force_remove_last_key": {"type": "bool", "default": False}, @@ -1129,11 +1371,16 @@ def run_module() -> t.NoReturn: } mutually_exclusive = [ - ("keyfile", "passphrase"), + ("keyfile", "passphrase", "tpm2_device"), ("new_keyfile", "new_passphrase"), ("remove_keyfile", "remove_passphrase", "remove_keyslot"), ] + required_by = { + "new_tpm2": ["new_tpm2_pcrs"], + "keyslot_priority": ["keyslot"], + } + # seed the result dict in the object result: dict[str, t.Any] = {"changed": False, "name": None} @@ -1141,6 +1388,7 @@ def run_module() -> t.NoReturn: argument_spec=module_args, supports_check_mode=True, mutually_exclusive=mutually_exclusive, + required_by=required_by, ) module.run_command_environ_update = { "LANG": "C", @@ -1235,18 +1483,33 @@ def run_module() -> t.NoReturn: module.fail_json(msg=f"luks_device error: {e}") if not module.check_mode: try: - crypt.run_luks_open( - conditions.device, - module.params["keyfile"], - conditions.get_passphrase_from_module_params("passphrase"), - module.params["perf_same_cpu_crypt"], - module.params["perf_submit_from_crypt_cpus"], - module.params["perf_no_read_workqueue"], - module.params["perf_no_write_workqueue"], - module.params["persistent"], - module.params["allow_discards"], - name, - ) + if ( + module.params["keyfile"] is not None + or module.params["passphrase"] is not None + ): + crypt.run_luks_open( + conditions.device, + module.params["keyfile"], + conditions.get_passphrase_from_module_params("passphrase"), + module.params["perf_same_cpu_crypt"], + module.params["perf_submit_from_crypt_cpus"], + module.params["perf_no_read_workqueue"], + module.params["perf_no_write_workqueue"], + module.params["persistent"], + module.params["allow_discards"], + name, + ) + else: + crypt.run_systemd_cryptsetup_attach( + conditions.device, + module.params["tpm2_device"], + module.params["perf_same_cpu_crypt"], + module.params["perf_submit_from_crypt_cpus"], + module.params["perf_no_read_workqueue"], + module.params["perf_no_write_workqueue"], + module.params["allow_discards"], + name, + ) except ValueError as e: module.fail_json(msg=f"luks_device error: {e}") result["name"] = name @@ -1326,6 +1589,40 @@ def run_module() -> t.NoReturn: if module.check_mode: module.exit_json(**result) + if conditions.systemd_cryptenroll(): + assert conditions.device is not None + if not module.check_mode: + try: + changed = crypt.run_systemd_cryptenroll( + conditions.device, + module.params["keyfile"], + conditions.get_passphrase_from_module_params("passphrase"), + module.params["tpm2_device"], + module.params["new_tpm2"], + module.params["new_tpm2_pcrs"], + module.params["remove_tpm2"], + ) + except ValueError as e: + module.fail_json(msg=f"luks_device error: {e}") + result["changed"] |= changed + if module.check_mode: + module.exit_json(**result) + + if conditions.luks_config(): + assert conditions.device is not None # ensured in conditions.luks_config() + if not module.check_mode: + try: + crypt.run_config( + conditions.device, + module.params["keyslot"], + module.params["keyslot_priority"], + ) + except ValueError as e: + module.fail_json(msg=f"luks_device error: {e}") + result["changed"] = True + if module.check_mode: + module.exit_json(**result) + # Success - return result module.exit_json(**result) diff --git a/tests/unit/plugins/modules/test_luks_device.py b/tests/unit/plugins/modules/test_luks_device.py index 0f964c57..983f6fcd 100644 --- a/tests/unit/plugins/modules/test_luks_device.py +++ b/tests/unit/plugins/modules/test_luks_device.py @@ -98,7 +98,6 @@ LUKS_CREATE_DATA: list[ ("dummy", "key", None, "present", False, None, "dummy", "dummy", True), (None, "key", None, "present", False, None, "dummy", "dummy", False), (None, "key", None, "present", False, "labelName", "dummy", "dummy", True), - ("dummy", None, None, "present", False, None, "dummy", "dummy", False), ("dummy", "key", None, "absent", False, None, "dummy", "dummy", False), ("dummy", "key", None, "opened", True, None, "dummy", "dummy", False), ("dummy", "key", None, "closed", True, None, "dummy", "dummy", False), @@ -106,7 +105,6 @@ LUKS_CREATE_DATA: list[ ("dummy", None, "foo", "present", False, None, "dummy", "dummy", True), (None, None, "bar", "present", False, None, "dummy", "dummy", False), (None, None, "baz", "present", False, "labelName", "dummy", "dummy", True), - ("dummy", None, None, "present", False, None, "dummy", "dummy", False), ("dummy", None, "quz", "absent", False, None, "dummy", "dummy", False), ("dummy", None, "qux", "opened", True, None, "dummy", "dummy", False), ("dummy", None, "quux", "closed", True, None, "dummy", "dummy", False), @@ -114,6 +112,11 @@ LUKS_CREATE_DATA: list[ ("dummy", "key", None, "present", False, None, None, None, True), ("dummy", "key", None, "present", False, None, None, "dummy", True), ("dummy", "key", None, "present", False, None, "dummy", None, True), + ("dummy", None, None, "present", False, None, "dummy", None, "exception"), + ("dummy", None, None, "opened", False, None, "dummy", None, "exception"), + ("dummy", None, None, "closed", False, None, "dummy", None, "exception"), + ("dummy", None, None, "absent", False, None, "dummy", None, False), + ("dummy", None, None, "opened", True, None, "dummy", None, False), ] # device, state, is_luks, expected @@ -131,34 +134,35 @@ LUKS_REMOVE_DATA: list[ ("dummy", "absent", False, False), ] -# device, key, passphrase, state, name, name_by_dev, expected +# device, key, passphrase, tpm2_device, state, name, name_by_dev, expected LUKS_OPEN_DATA: list[ tuple[ str | None, str | None, str | None, + str | None, t.Literal["present", "absent", "opened", "closed"], str | None, str | None, bool | t.Literal["exception"], ] ] = [ - ("dummy", "key", None, "present", "name", None, False), - ("dummy", "key", None, "absent", "name", None, False), - ("dummy", "key", None, "closed", "name", None, False), - ("dummy", "key", None, "opened", "name", None, True), - (None, "key", None, "opened", "name", None, False), - ("dummy", None, None, "opened", "name", None, False), - ("dummy", "key", None, "opened", "name", "name", False), - ("dummy", "key", None, "opened", "beer", "name", "exception"), - ("dummy", None, "foo", "present", "name", None, False), - ("dummy", None, "bar", "absent", "name", None, False), - ("dummy", None, "baz", "closed", "name", None, False), - ("dummy", None, "qux", "opened", "name", None, True), - (None, None, "quux", "opened", "name", None, False), - ("dummy", None, None, "opened", "name", None, False), - ("dummy", None, "quuz", "opened", "name", "name", False), - ("dummy", None, "corge", "opened", "beer", "name", "exception"), + ("dummy", "key", None, None, "present", "name", None, False), + ("dummy", "key", None, None, "absent", "name", None, False), + ("dummy", "key", None, None, "closed", "name", None, False), + ("dummy", "key", None, None, "opened", "name", None, True), + (None, "key", None, None, "opened", "name", None, False), + ("dummy", "key", None, None, "opened", "name", "name", False), + ("dummy", "key", None, None, "opened", "beer", "name", "exception"), + ("dummy", None, "foo", None, "present", "name", None, False), + ("dummy", None, "bar", None, "absent", "name", None, False), + ("dummy", None, "baz", None, "closed", "name", None, False), + ("dummy", None, "qux", None, "opened", "name", None, True), + (None, None, "quux", None, "opened", "name", None, False), + ("dummy", None, "quuz", None, "opened", "name", "name", False), + ("dummy", None, "corge", None, "opened", "beer", "name", "exception"), + ("dummy", None, None, None, "opened", "name", None, "exception"), + ("dummy", None, None, "auto", "opened", "name", None, True), ] # device, dev_by_name, name, name_by_dev, state, label, expected @@ -237,6 +241,42 @@ LUKS_REMOVE_KEY_DATA: list[ ("dummy", None, "foo", None, "absent", None, "exception"), ] +# device, new_tpm2, new_tpm2_pcrs, remove_tpm2, existing_tpm2, expected +SYSTEMD_CRYPTENROLL_DATA: list[ + tuple[ + str | None, + str | None, + str | None, + bool, + bool, + bool | t.Literal["exception"], + ] +] = [ + ("dummy", None, None, False, False, False), + ("dummy", None, None, False, True, False), + ("dummy", None, None, True, False, False), + ("dummy", None, None, True, True, True), + ("dummy", "auto", "0+1", False, False, True), + ("dummy", "auto", "0+1", True, False, True), + ("dummy", "auto", None, False, False, "exception"), +] + +# device, keyslot_priority, existing_priority, expected +LUKS_CONFIG_DATA: list[ + tuple[str | None, str | None, int | None, bool | t.Literal["exception"]] +] = [ + ("dummy", "prefer", 0, True), + ("dummy", "prefer", None, True), + ("dummy", "prefer", 2, False), + ("dummy", "normal", 0, True), + ("dummy", "normal", None, False), + ("dummy", "normal", 2, True), + ("dummy", "ignore", 0, False), + ("dummy", "ignore", None, True), + ("dummy", "ignore", 2, True), + (None, "normal", 0, False), +] + @pytest.mark.parametrize( "device, keyfile, passphrase, state, is_luks, " + "label, cipher, hash_, expected", @@ -307,13 +347,14 @@ def test_luks_remove( @pytest.mark.parametrize( - "device, keyfile, passphrase, state, name, name_by_dev, expected", - ((d[0], d[1], d[2], d[3], d[4], d[5], d[6]) for d in LUKS_OPEN_DATA), + "device, keyfile, passphrase, tpm2_device, state, name, name_by_dev, expected", + ((d[0], d[1], d[2], d[3], d[4], d[5], d[6], d[7]) for d in LUKS_OPEN_DATA), ) def test_luks_open( device: str | None, keyfile: str | None, passphrase: str | None, + tpm2_device: str | None, state: t.Literal["present", "absent", "opened", "closed"], name: str | None, name_by_dev: str | None, @@ -324,6 +365,7 @@ def test_luks_open( module.params["device"] = device module.params["keyfile"] = keyfile module.params["passphrase"] = passphrase + module.params["tpm2_device"] = tpm2_device module.params["passphrase_encoding"] = "text" module.params["state"] = state module.params["name"] = name @@ -468,3 +510,86 @@ def test_luks_remove_key( assert conditions.luks_remove_key() == expected except ValueError: assert expected == "exception" + + +@pytest.mark.parametrize( + "device, new_tpm2, new_tpm2_pcrs, remove_tpm2, existing_tpm2, expected", + ((d[0], d[1], d[2], d[3], d[4], d[5]) for d in SYSTEMD_CRYPTENROLL_DATA), +) +def test_systemd_cryptenroll( + device: str | None, + new_tpm2: str | None, + new_tpm2_pcrs: str | None, + remove_tpm2: bool, + existing_tpm2: bool, + expected: bool | t.Literal["exception"], + monkeypatch: pytest.MonkeyPatch, +) -> None: + module = DummyModule() + + module.params["device"] = device + module.params["passphrase_encoding"] = "text" + module.params["new_tpm2"] = new_tpm2 + module.params["new_tpm2_pcrs"] = new_tpm2_pcrs + module.params["remove_tpm2"] = remove_tpm2 + + crypt = luks_device.CryptHandler(module) # type: ignore + + def mock_luks_dump_json_metadata( + self: luks_device.CryptHandler, device: str + ) -> dict[str, t.Any]: + return ( + {"tokens": {"0": {"type": "systemd-tpm2"}}} + if existing_tpm2 + else {"tokens": {}} + ) + + monkeypatch.setattr( + luks_device.CryptHandler, + "luks_dump_json_metadata", + mock_luks_dump_json_metadata, + ) + + try: + conditions = luks_device.ConditionsHandler(module, crypt) # type: ignore + assert conditions.systemd_cryptenroll() == expected + except ValueError: + assert expected == "exception" + + +@pytest.mark.parametrize( + "device, keyslot_priority, existing_priority, expected", + ((d[0], d[1], d[2], d[3]) for d in LUKS_CONFIG_DATA), +) +def test_luks_config( + device: str | None, + keyslot_priority: str | None, + existing_priority: int | None, + expected: bool | t.Literal["exception"], + monkeypatch: pytest.MonkeyPatch, +) -> None: + module = DummyModule() + + module.params["device"] = device + module.params["passphrase_encoding"] = "text" + module.params["keyslot"] = 1 + module.params["keyslot_priority"] = keyslot_priority + + crypt = luks_device.CryptHandler(module) # type: ignore + + def mock_luks_dump_json_metadata( + self: luks_device.CryptHandler, device: str + ) -> dict[str, t.Any]: + return {"keyslots": {"1": {"priority": existing_priority}}} + + monkeypatch.setattr( + luks_device.CryptHandler, + "luks_dump_json_metadata", + mock_luks_dump_json_metadata, + ) + + try: + conditions = luks_device.ConditionsHandler(module, crypt) # type: ignore + assert conditions.luks_config() == expected + except ValueError: + assert expected == "exception"