mirror of
https://github.com/ansible-collections/community.crypto.git
synced 2026-05-06 13:22:58 +00:00
ACME: add dns-persist-01 support (#997)
* Add dns-persist-01 DNS TXT record filters. * Refactor parsing and joining CAA issue-values out. * Add basic tests. * Fix bug and add integration tests for filters. * Add dns-persist-01 support to ACME modules. * Add changelog fragment.
This commit is contained in:
@@ -26,6 +26,9 @@ from ansible_collections.community.crypto.plugins.module_utils._acme.errors impo
|
||||
from ansible_collections.community.crypto.plugins.module_utils._acme.utils import (
|
||||
nopad_b64,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils._caa import (
|
||||
_check_domain_name,
|
||||
)
|
||||
|
||||
if t.TYPE_CHECKING:
|
||||
from ansible.module_utils.basic import AnsibleModule # pragma: no cover
|
||||
@@ -104,6 +107,52 @@ class Challenge:
|
||||
def get_validation_data(
|
||||
self, *, client: ACMEClient, identifier_type: str, identifier: str
|
||||
) -> dict[str, t.Any] | None:
|
||||
if self.type == "dns-persist-01":
|
||||
# https://www.ietf.org/archive/id/draft-ietf-acme-dns-persist-01.html#section-3.1
|
||||
account_uri = self.data.get("accounturi")
|
||||
issuer_domain_names = self.data.get("issuer-domain-names")
|
||||
if account_uri is None:
|
||||
# In version 00 of the draft, accounturi isn't present.
|
||||
# Since that's what Pebble currently implements,
|
||||
# let's fake the value if we have it.
|
||||
# (https://www.ietf.org/archive/id/draft-ietf-acme-dns-persist-00.html#section-6)
|
||||
account_uri = client.account_uri
|
||||
if (
|
||||
not isinstance(account_uri, str)
|
||||
or not isinstance(issuer_domain_names, list)
|
||||
or not all(isinstance(idn, str) for idn in issuer_domain_names)
|
||||
):
|
||||
return None
|
||||
if client.account_uri is not None and account_uri != client.account_uri:
|
||||
# While the RFC doesn't demand this, I think it's a bad sign if the account URIs disagree.
|
||||
# Better err on the side of caution...
|
||||
client.module.warn(
|
||||
f"The dns-persist-01 challenge for DNS:{identifier} has account URI {account_uri!r},"
|
||||
f" while the client is has account URI {client.account_uri}. Ignoring malformed challenge."
|
||||
)
|
||||
return None
|
||||
if not (1 <= len(issuer_domain_names) <= 10):
|
||||
client.module.warn(
|
||||
f"The dns-persist-01 challenge for DNS:{identifier} has {len(issuer_domain_names)}"
|
||||
" issuer domain names, which is not in [1, 10]. Ignoring malformed challenge."
|
||||
)
|
||||
return None
|
||||
for idn in issuer_domain_names:
|
||||
try:
|
||||
_check_domain_name(idn)
|
||||
if idn != idn.lower() or len(idn) > 253:
|
||||
raise ValueError()
|
||||
except ValueError:
|
||||
client.module.warn(
|
||||
f"The dns-persist-01 challenge for DNS:{identifier} has an invalid"
|
||||
f" issuer domain name {idn!r}. Ignoring malformed challenge."
|
||||
)
|
||||
return None
|
||||
return {
|
||||
"account_uri": account_uri,
|
||||
"issuer_domain_names": issuer_domain_names,
|
||||
}
|
||||
|
||||
if self.token is None:
|
||||
return None
|
||||
|
||||
|
||||
98
plugins/module_utils/_caa.py
Normal file
98
plugins/module_utils/_caa.py
Normal file
@@ -0,0 +1,98 @@
|
||||
# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
# Note that this module util is **PRIVATE** to the collection. It can have breaking changes at any time.
|
||||
# Do not use this from other collections or standalone plugins/modules!
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
_VALUE_RE = re.compile("^[\x21-\x3a\x3c-\x7e]*$")
|
||||
_LABEL_RE = re.compile("^[0-9a-zA-Z][0-9a-zA-Z-]*$")
|
||||
|
||||
|
||||
def _check_value(value: str) -> None:
|
||||
if not _VALUE_RE.match(value):
|
||||
raise ValueError(f"Invalid value {value!r}")
|
||||
|
||||
|
||||
def _check_label(label: str, what: str) -> None:
|
||||
if not _LABEL_RE.match(label):
|
||||
raise ValueError(f"Invalid {what} {label!r}")
|
||||
|
||||
|
||||
def _check_domain_name(value: str) -> None:
|
||||
for p in value.split("."):
|
||||
_check_label(p, "label")
|
||||
|
||||
|
||||
def parse_issue_value(
|
||||
value: str, *, check_for_duplicates: bool = True, strict: bool = True
|
||||
) -> tuple[str | None, list[tuple[str, str]]]:
|
||||
"""
|
||||
Given a CAA issue property, parses it according to the syntax defined in RFC 8659.
|
||||
|
||||
More precisely, see https://www.rfc-editor.org/rfc/rfc8659.html#section-4.2.
|
||||
|
||||
If ``check_for_duplicates == True``, duplicate tags are reported as an error.
|
||||
If ``strict == True``, invalid characters are reported as an error.
|
||||
"""
|
||||
parts = [v.strip(" \t") for v in value.split(";")]
|
||||
if len(parts) > 1 and not parts[-1]:
|
||||
del parts[-1]
|
||||
domain_name = parts[0] or None
|
||||
if domain_name is not None and strict:
|
||||
_check_domain_name(domain_name)
|
||||
pairs = []
|
||||
previous_tags: set[str] = set()
|
||||
for part in parts[1:]:
|
||||
pieces = part.split("=", 1)
|
||||
if len(pieces) != 2:
|
||||
raise ValueError(f"{part!r} is not of the form tag=value")
|
||||
tag, value = pieces[0].rstrip(" \t"), pieces[1].lstrip(" \t")
|
||||
if strict:
|
||||
_check_label(tag, "tag")
|
||||
_check_value(value)
|
||||
pairs.append((tag, value))
|
||||
if check_for_duplicates:
|
||||
if tag in previous_tags:
|
||||
raise ValueError(f"Tag {tag!r} appears multiple times")
|
||||
previous_tags.add(tag)
|
||||
return domain_name, pairs
|
||||
|
||||
|
||||
def join_issue_value(
|
||||
domain_name: str | None,
|
||||
pairs: list[tuple[str, str]],
|
||||
*,
|
||||
check_for_duplicates: bool = True,
|
||||
strict: bool = True,
|
||||
) -> str:
|
||||
"""
|
||||
Given a domain name and a list of tag-value pairs, joins them according
|
||||
to the syntax defined in RFC 8659.
|
||||
|
||||
More precisely, see https://www.rfc-editor.org/rfc/rfc8659.html#section-4.2.
|
||||
|
||||
If ``check_for_duplicates == True``, duplicate tags are reported as an error.
|
||||
If ``strict == True``, invalid characters are reported as an error.
|
||||
"""
|
||||
if domain_name is not None and strict:
|
||||
_check_domain_name(domain_name)
|
||||
parts = [domain_name or ""]
|
||||
previous_tags: set[str] = set()
|
||||
for tag, value in pairs:
|
||||
if strict:
|
||||
_check_label(tag, "tag")
|
||||
_check_value(value)
|
||||
parts.append(f"{tag}={value}")
|
||||
if check_for_duplicates:
|
||||
if tag in previous_tags:
|
||||
raise ValueError(f"Tag {tag!r} appears multiple times")
|
||||
previous_tags.add(tag)
|
||||
return "; ".join(parts)
|
||||
|
||||
|
||||
__all__ = ("parse_issue_value",)
|
||||
Reference in New Issue
Block a user