Files
ansible-freeipa/roles/ipareplica/module_utils/ansible_ipa_replica.py
Thomas Woerner cd440a2049 ipareplica: Add support for DNS over TLS
This change adds support for DNS over TLS to the ipareplica role.

New variables

ipareplica_dot_forwarders
    List of DNS over TLS forwarders. Required if ipareplica_dns_over_tls
    is enabled. (list of strings)
    required: false
ipareplica_dns_over_tls | ipaclient_dns_over_tls
    Configure DNS over TLS. Requires FreeIPA version 4.12.5 or later.
    (bool, default: false)
    required: false
ipareplica_dns_over_tls_cert
    Certificate to use for DNS over TLS. If empty, a new certificate will
    be requested from IPA CA. (string)
    required: false
ipareplica_dns_over_tls_key
    Key for certificate specified in ipareplica_dns_over_tls_cert. (string)
    required: false
ipareplica_dns_policy
    Encrypted DNS policy. Only usable if `ipareplica_dns_over_tls` is
    enabled. (choice: relaxed, enforced, default: relaxed)
    required: false

New distribution specific variable

ipareplica_packages_dot
    List of IPA packages needed for DNS over TLS.
2025-07-21 13:38:36 +02:00

523 lines
18 KiB
Python

# -*- coding: utf-8 -*-
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# Based on ipa-replica-install code
#
# Copyright (C) 2018-2022 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
__all__ = ["contextlib", "dnsexception", "dnsresolver", "dnsreversename",
"parse_version", "IPAChangeConf",
"certstore", "sysrestore", "ipa_generate_password", "kinit_keytab",
"IPA_CA_TRUST_FLAGS", "EXTERNAL_CA_TRUST_FLAGS", "DN",
"ScriptError", "services", "tasks", "constants", "errors", "rpc",
"x509", "validate_domain_name",
"no_matching_interface_for_ip_address_warning",
"configure_krb5_conf", "purge_host_keytab", "adtrust",
"bindinstance", "ca", "certs", "dns", "httpinstance", "kra",
"otpdinstance", "custodiainstance", "service", "upgradeinstance",
"find_providing_servers", "find_providing_server", "load_pkcs12",
"is_ipa_configured", "ReplicationManager", "replica_conn_check",
"install_replica_ds", "install_krb", "install_ca_cert",
"install_http", "install_dns_records", "create_ipa_conf",
"check_dirsrv", "check_dns_resolution", "configure_certmonger",
"remove_replica_info_dir", "preserve_enrollment_state",
"uninstall_client", "promote_sssd", "promote_openldap_conf",
"rpc_client", "check_remote_fips_mode", "check_remote_version",
"common_check", "current_domain_level",
"check_domain_level_is_supported", "promotion_check_ipa_domain",
"SSSDConfig", "CalledProcessError", "timeconf", "ntpinstance",
"dnsname", "kernel_keyring", "krbinstance", "getargspec",
"adtrustinstance", "paths", "api", "dsinstance", "ipaldap", "Env",
"ipautil", "installutils", "IPA_PYTHON_VERSION", "NUM_VERSION",
"ReplicaConfig", "create_api", "clean_up_hsm_nicknames"]
import sys
import logging
# Import getargspec from inspect or provide own getargspec for
# Python 2 compatibility with Python 3.11+.
try:
from inspect import getargspec
except ImportError:
from collections import namedtuple
from inspect import getfullargspec
# The code is copied from Python 3.10 inspect.py
# Authors: Ka-Ping Yee <ping@lfw.org>
# Yury Selivanov <yselivanov@sprymix.com>
ArgSpec = namedtuple('ArgSpec', 'args varargs keywords defaults')
def getargspec(func):
args, varargs, varkw, defaults, kwonlyargs, _kwonlydefaults, \
ann = getfullargspec(func)
if kwonlyargs or ann:
raise ValueError(
"Function has keyword-only parameters or annotations"
", use inspect.signature() API which can support them")
return ArgSpec(args, varargs, varkw, defaults)
try:
from contextlib import contextmanager as contextlib_contextmanager
from ipapython.version import NUM_VERSION, VERSION
try:
from ipapython.version import parse_version
except ImportError:
# In IPA we either need pkg_resources or packaging Version
# class to compare versions with check_remote_version, so
# we let an exception to be raised if neither is available.
from pkg_resources import parse_version
if NUM_VERSION < 30201:
# See ipapython/version.py
IPA_MAJOR, IPA_MINOR, IPA_RELEASE = [int(x) for x in
VERSION.split(".", 2)]
IPA_PYTHON_VERSION = IPA_MAJOR * 10000 + IPA_MINOR * 100 + IPA_RELEASE
else:
IPA_PYTHON_VERSION = NUM_VERSION
if NUM_VERSION >= 40600:
# IPA version >= 4.6
import contextlib
import dns.exception as dnsexception
import dns.name as dnsname
import dns.resolver as dnsresolver
import dns.reversename as dnsreversename
from ipaclient.install.ipachangeconf import IPAChangeConf
from ipalib.install import certstore, sysrestore
from ipapython.ipautil import ipa_generate_password
try:
from ipalib.kinit import kinit_keytab
except ImportError:
from ipalib.install.kinit import kinit_keytab
from ipapython import ipaldap, ipautil, kernel_keyring
from ipapython.certdb import IPA_CA_TRUST_FLAGS, \
EXTERNAL_CA_TRUST_FLAGS
from ipapython.dn import DN
from ipapython.admintool import ScriptError
from ipapython.ipa_log_manager import standard_logging_setup
from ipaplatform import services
from ipaplatform.tasks import tasks
from ipaplatform.paths import paths
from ipalib import api, constants, create_api, errors, rpc, x509
from ipalib.config import Env
from ipalib.util import (
validate_domain_name,
no_matching_interface_for_ip_address_warning)
from ipaclient.install.client import configure_krb5_conf, \
purge_host_keytab
from ipaserver.install import (
adtrust, bindinstance, ca, certs, dns, dsinstance, httpinstance,
installutils, kra, krbinstance,
otpdinstance, custodiainstance, service, upgradeinstance)
from ipaserver.install import adtrustinstance
try:
from ipaserver.masters import (
find_providing_servers, find_providing_server)
except ImportError:
from ipaserver.install.service import (
find_providing_servers, find_providing_server)
from ipaserver.install.installutils import (
ReplicaConfig, load_pkcs12)
try:
from ipalib.facts import is_ipa_configured
except ImportError:
from ipaserver.install.installutils import is_ipa_configured
from ipaserver.install.replication import (
ReplicationManager, replica_conn_check)
from ipaserver.install.server.replicainstall import (
make_pkcs12_info, install_replica_ds, install_krb,
install_http, install_dns_records, create_ipa_conf, check_dirsrv,
check_dns_resolution, configure_certmonger,
remove_replica_info_dir,
# common_cleanup,
preserve_enrollment_state, uninstall_client,
promote_sssd, promote_openldap_conf, rpc_client,
check_remote_fips_mode, check_remote_version, common_check,
current_domain_level, check_domain_level_is_supported,
# enroll_dl0_replica,
# ensure_enrolled,
promotion_check_ipa_domain
)
try:
from ipaserver.install.server.replicainstall import \
install_ca_cert
except ImportError:
install_ca_cert = None
try:
from ipaserver.install.server.replicainstall import \
clean_up_hsm_nicknames
except ImportError:
clean_up_hsm_nicknames = None
import SSSDConfig
from subprocess import CalledProcessError
try:
from ipaclient.install import timeconf
time_service = "chronyd" # pylint: disable=invalid-name
ntpinstance = None # pylint: disable=invalid-name
except ImportError:
try:
from ipaclient.install import ntpconf as timeconf
except ImportError:
from ipaclient import ntpconf as timeconf
from ipaserver.install import ntpinstance
time_service = "ntpd" # pylint: disable=invalid-name
try:
CLIENT_SUPPORTS_NO_DNSSEC_VALIDATION = False
from ipaclient.install.client import ClientInstallInterface
except ImportError:
pass
else:
if hasattr(ClientInstallInterface, "no_dnssec_validation"):
CLIENT_SUPPORTS_NO_DNSSEC_VALIDATION = True
else:
# IPA version < 4.6
raise RuntimeError("freeipa version '%s' is too old" % VERSION)
except ImportError as _err:
ANSIBLE_IPA_REPLICA_MODULE_IMPORT_ERROR = str(_err)
for attr in __all__:
setattr(sys.modules[__name__], attr, None)
else:
ANSIBLE_IPA_REPLICA_MODULE_IMPORT_ERROR = None
logger = logging.getLogger("ipa-server-install")
def setup_logging():
# logger.setLevel(logging.DEBUG)
standard_logging_setup(
paths.IPAREPLICA_INSTALL_LOG, verbose=False, debug=False,
filemode='a', console_format='%(message)s')
@contextlib_contextmanager
def redirect_stdout(stream):
sys.stdout = stream
try:
yield stream
finally:
sys.stdout = sys.__stdout__
class AnsibleModuleLog():
def __init__(self, module):
self.module = module
_ansible_module_log = self
class AnsibleLoggingHandler(logging.Handler):
def emit(self, record):
_ansible_module_log.write(self.format(record))
self.logging_handler = AnsibleLoggingHandler()
logger.setLevel(logging.DEBUG)
logger.root.addHandler(self.logging_handler)
def close(self):
self.flush()
def flush(self):
pass
def log(self, msg):
# self.write(msg+"\n")
self.write(msg)
def debug(self, msg):
self.module.debug(msg)
def info(self, msg):
self.module.debug(msg)
@staticmethod
def isatty():
return False
def write(self, msg):
self.module.debug(msg)
# self.module.warn(msg)
# pylint: disable=too-many-instance-attributes, useless-object-inheritance
class installer_obj(object): # pylint: disable=invalid-name
def __init__(self):
# CompatServerReplicaInstall
self.ca_cert_files = None
self.all_ip_addresses = False
self.no_wait_for_dns = True
self.nisdomain = None
self.no_nisdomain = False
self.no_sudo = False
self.request_cert = False
self.ca_file = None
self.zonemgr = None
self.replica_file = None
# ServerReplicaInstall
self.subject_base = None
self.ca_subject = None
# others
self._ccache = None
self.password = None
self.reverse_zones = []
# def _is_promote(self):
# return self.replica_file is None
# self.skip_conncheck = False
self._replica_install = False
# self.dnssec_master = False # future unknown
# self.disable_dnssec_master = False # future unknown
# self.domainlevel = MAX_DOMAIN_LEVEL # deprecated
# self.domain_level = self.domainlevel # deprecated
self.interactive = False
self.unattended = not self.interactive
# self.promote = self.replica_file is None
self.promote = True
self.skip_schema_check = None
# def __getattribute__(self, attr):
# value = super(installer_obj, self).__getattribute__(attr)
# if not attr.startswith("--") and not attr.endswith("--"):
# logger.debug(
# " <-- Accessing installer.%s (%s)" %
# (attr, repr(value)))
# return value
def __getattr__(self, attrname):
logger.info(" --> ADDING missing installer.%s", attrname)
setattr(self, attrname, None)
return getattr(self, attrname)
# def __setattr__(self, attr, value):
# logger.debug(" --> Setting installer.%s to %s" %
# (attr, repr(value)))
# return super(installer_obj, self).__setattr__(attr, value)
def knobs(self):
for name in self.__dict__:
yield self, name
# pylint: enable=too-many-instance-attributes, useless-object-inheritance
# pylint: disable=attribute-defined-outside-init
installer = installer_obj()
options = installer
# DNSInstallInterface
options.dnssec_master = False
options.disable_dnssec_master = False
options.kasp_db_file = None
options.force = False
# ServerMasterInstall
options.add_sids = False
options.add_agents = False
# ServerReplicaInstall
options.subject_base = None
options.ca_subject = None
# pylint: enable=attribute-defined-outside-init
def gen_env_boostrap_finalize_core(etc_ipa, default_config):
env = Env()
# env._bootstrap(context='installer', confdir=paths.ETC_IPA, log=None)
# env._finalize_core(**dict(constants.DEFAULT_CONFIG))
env._bootstrap(context='installer', confdir=etc_ipa, log=None)
env._finalize_core(**dict(default_config))
return env
def api_bootstrap_finalize(env):
# pylint: disable=no-member
xmlrpc_uri = \
'https://{0}/ipa/xml'.format(ipautil.format_netloc(env.host))
api.bootstrap(in_server=True,
context='installer',
confdir=paths.ETC_IPA,
ldap_uri=installutils.realm_to_ldapi_uri(env.realm),
xmlrpc_uri=xmlrpc_uri)
# pylint: enable=no-member
api.finalize()
def gen_ReplicaConfig(): # pylint: disable=invalid-name
# pylint: disable=too-many-instance-attributes
class ExtendedReplicaConfig(ReplicaConfig):
# pylint: disable=useless-super-delegation
def __init__(self, top_dir=None):
# pylint: disable=super-with-arguments
super(ExtendedReplicaConfig, self).__init__(top_dir)
# def __getattribute__(self, attr):
# value = super(ExtendedReplicaConfig, self).__getattribute__(
# attr)
# if attr not in ["__dict__", "knobs"]:
# logger.debug(" <== Accessing config.%s (%s)" %
# (attr, repr(value)))
# return value\
# pylint: enable=useless-super-delegation
def __getattr__(self, attrname):
logger.info(" ==> ADDING missing config.%s", attrname)
setattr(self, attrname, None)
return getattr(self, attrname)
# def __setattr__(self, attr, value):
# logger.debug(" ==> Setting config.%s to %s" %
# (attr, repr(value)))
# return super(ExtendedReplicaConfig, self).__setattr__(attr,
# value)
def knobs(self):
for name in self.__dict__:
yield self, name
# pylint: enable=too-many-instance-attributes
# pylint: disable=attribute-defined-outside-init
# config = ReplicaConfig()
config = ExtendedReplicaConfig()
config.realm_name = api.env.realm
config.host_name = api.env.host
config.domain_name = api.env.domain
config.master_host_name = api.env.server
config.ca_host_name = api.env.ca_host
config.kra_host_name = config.ca_host_name
config.ca_ds_port = 389
config.setup_ca = options.setup_ca
config.setup_kra = options.setup_kra
config.dir = options._top_dir
config.basedn = api.env.basedn
# config.subject_base = options.subject_base
# pylint: enable=attribute-defined-outside-init
return config
def replica_ds_init_info(ansible_log,
config, options_, ca_is_configured, remote_api,
ds_ca_subject, ca_file,
promote=False, pkcs12_info=None):
dsinstance.check_ports()
# if we have a pkcs12 file, create the cert db from
# that. Otherwise the ds setup will create the CA
# cert
if pkcs12_info is None:
pkcs12_info = make_pkcs12_info(config.dir, "dscert.p12",
"dirsrv_pin.txt")
# during replica install, this gets invoked before local DS is
# available, so use the remote api.
# if ca_is_configured:
# ca_subject = ca.lookup_ca_subject(_api, config.subject_base)
# else:
# ca_subject = installutils.default_ca_subject_dn(
# config.subject_base)
ca_subject = ds_ca_subject
ds = dsinstance.DsInstance(
config_ldif=options_.dirsrv_config_file)
ds.set_output(ansible_log)
# Source: ipaserver/install/dsinstance.py
# idstart and idmax are configured so that the range is seen as
# depleted by the DNA plugin and the replica will go and get a
# new range from the master.
# This way all servers use the initially defined range by default.
idstart = 1101
idmax = 1100
with redirect_stdout(ansible_log):
ds.init_info(
realm_name=config.realm_name,
fqdn=config.host_name,
domain_name=config.domain_name,
dm_password=config.dirman_password,
subject_base=config.subject_base,
ca_subject=ca_subject,
idstart=idstart,
idmax=idmax,
pkcs12_info=pkcs12_info,
ca_file=ca_file,
setup_pkinit=not options.no_pkinit,
)
ds.master_fqdn = config.master_host_name
if ca_is_configured is not None:
ds.ca_is_configured = ca_is_configured
ds.promote = promote
ds.api = remote_api
# from __setup_replica
# Always connect to ds over ldapi
ldap_uri = ipaldap.get_ldap_uri(protocol='ldapi', realm=ds.realm)
conn = ipaldap.LDAPClient(ldap_uri)
conn.external_bind()
return ds
def ansible_module_get_parsed_ip_addresses(ansible_module,
param='ip_addresses'):
ip_addrs = []
for ip in ansible_module.params.get(param):
try:
ip_parsed = ipautil.CheckedIPAddress(ip)
except Exception as e:
ansible_module.fail_json(
msg="Invalid IP Address %s: %s" % (ip, e))
ip_addrs.append(ip_parsed)
return ip_addrs
def gen_remote_api(master_host_name, etc_ipa):
ldapuri = 'ldaps://%s' % ipautil.format_netloc(master_host_name)
xmlrpc_uri = 'https://{0}/ipa/xml'.format(
ipautil.format_netloc(master_host_name))
remote_api = create_api(mode=None)
remote_api.bootstrap(in_server=True,
context='installer',
confdir=etc_ipa,
ldap_uri=ldapuri,
xmlrpc_uri=xmlrpc_uri)
remote_api.finalize()
return remote_api
def check_imports(module):
if ANSIBLE_IPA_REPLICA_MODULE_IMPORT_ERROR is not None:
module.fail_json(msg=ANSIBLE_IPA_REPLICA_MODULE_IMPORT_ERROR)