mirror of
https://github.com/freeipa/ansible-freeipa.git
synced 2026-05-07 13:53:23 +00:00
ansible_ipa_server: Fix ansible-test fake execution test findings
All imports that are only available after installing IPA need to be in a try exception clause to be able to pass the fake execution test. The old workaround "if 'ansible.executor' in sys.modules:" is not working with this test anymore. If the imports can not be done, all used and needed attributes are defines with the value None. The new function check_imports has been added to fail with module.fail_json if an import exception occured and ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR is not None. This function needs to be called in all modules. The `copyright` date is extended with `-2022`.
This commit is contained in:
@@ -3,9 +3,9 @@
|
||||
# Authors:
|
||||
# Thomas Woerner <twoerner@redhat.com>
|
||||
#
|
||||
# Based on ipa-client-install code
|
||||
# Based on ipa-server-install code
|
||||
#
|
||||
# Copyright (C) 2017 Red Hat
|
||||
# Copyright (C) 2017-2022 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
@@ -23,12 +23,12 @@
|
||||
|
||||
from __future__ import (absolute_import, division, print_function)
|
||||
|
||||
__metaclass__ = type
|
||||
__metaclass__ = type # pylint: disable=invalid-name
|
||||
|
||||
__all__ = ["IPAChangeConf", "certmonger", "sysrestore", "root_logger",
|
||||
"ipa_generate_password", "run", "ScriptError", "services",
|
||||
"tasks", "errors", "x509", "DOMAIN_LEVEL_0", "MIN_DOMAIN_LEVEL",
|
||||
"validate_domain_name",
|
||||
"MAX_DOMAIN_LEVEL", "validate_domain_name",
|
||||
"no_matching_interface_for_ip_address_warning",
|
||||
"check_zone_overlap", "timeconf", "ntpinstance", "adtrust",
|
||||
"bindinstance", "ca", "dns", "httpinstance", "installutils",
|
||||
@@ -41,45 +41,42 @@ __all__ = ["IPAChangeConf", "certmonger", "sysrestore", "root_logger",
|
||||
"adtrustinstance", "IPAAPI_USER", "sync_time", "PKIIniLoader",
|
||||
"default_subject_base", "default_ca_subject_dn",
|
||||
"check_ldap_conf", "encode_certificate", "decode_certificate",
|
||||
"check_available_memory", "getargspec", "get_min_idstart"]
|
||||
"check_available_memory", "getargspec", "get_min_idstart",
|
||||
"paths", "api", "ipautil", "adtrust_imported", "NUM_VERSION",
|
||||
"time_service", "kra_imported", "dsinstance", "IPA_PYTHON_VERSION",
|
||||
"NUM_VERSION"]
|
||||
|
||||
import sys
|
||||
import logging
|
||||
|
||||
# HACK: workaround for Ansible 2.9
|
||||
# https://github.com/ansible/ansible/issues/68361
|
||||
if 'ansible.executor' in sys.modules:
|
||||
for attr in __all__:
|
||||
setattr(sys.modules[__name__], attr, None)
|
||||
# Import getargspec from inspect or provide own getargspec for
|
||||
# Python 2 compatibility with Python 3.11+.
|
||||
try:
|
||||
from inspect import getargspec
|
||||
except ImportError:
|
||||
from collections import namedtuple
|
||||
from inspect import getfullargspec
|
||||
|
||||
else:
|
||||
# The code is copied from Python 3.10 inspect.py
|
||||
# Authors: Ka-Ping Yee <ping@lfw.org>
|
||||
# Yury Selivanov <yselivanov@sprymix.com>
|
||||
ArgSpec = namedtuple('ArgSpec', 'args varargs keywords defaults')
|
||||
|
||||
import logging
|
||||
def getargspec(func):
|
||||
args, varargs, varkw, defaults, kwonlyargs, _kwonlydefaults, \
|
||||
ann = getfullargspec(func)
|
||||
if kwonlyargs or ann:
|
||||
raise ValueError(
|
||||
"Function has keyword-only parameters or annotations"
|
||||
", use inspect.signature() API which can support them")
|
||||
return ArgSpec(args, varargs, varkw, defaults)
|
||||
|
||||
|
||||
try:
|
||||
from contextlib import contextmanager as contextlib_contextmanager
|
||||
from ansible.module_utils import six
|
||||
import base64
|
||||
|
||||
# Import getargspec from inspect or provide own getargspec for
|
||||
# Python 2 compatibility with Python 3.11+.
|
||||
try:
|
||||
from inspect import getargspec
|
||||
except ImportError:
|
||||
from collections import namedtuple
|
||||
from inspect import getfullargspec
|
||||
|
||||
# The code is copied from Python 3.10 inspect.py
|
||||
# Authors: Ka-Ping Yee <ping@lfw.org>
|
||||
# Yury Selivanov <yselivanov@sprymix.com>
|
||||
ArgSpec = namedtuple('ArgSpec', 'args varargs keywords defaults')
|
||||
|
||||
def getargspec(func):
|
||||
args, varargs, varkw, defaults, kwonlyargs, _kwonlydefaults, \
|
||||
ann = getfullargspec(func)
|
||||
if kwonlyargs or ann:
|
||||
raise ValueError(
|
||||
"Function has keyword-only parameters or annotations"
|
||||
", use inspect.signature() API which can support them")
|
||||
return ArgSpec(args, varargs, varkw, defaults)
|
||||
|
||||
from ipapython.version import NUM_VERSION, VERSION
|
||||
|
||||
if NUM_VERSION < 30201:
|
||||
@@ -211,223 +208,249 @@ else:
|
||||
|
||||
raise Exception("freeipa version '%s' is too old" % VERSION)
|
||||
|
||||
logger = logging.getLogger("ipa-server-install")
|
||||
except ImportError as _err:
|
||||
ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR = str(_err)
|
||||
|
||||
def setup_logging():
|
||||
# logger.setLevel(logging.DEBUG)
|
||||
standard_logging_setup(
|
||||
paths.IPASERVER_INSTALL_LOG, verbose=False, debug=False,
|
||||
filemode='a', console_format='%(message)s')
|
||||
for attr in __all__:
|
||||
setattr(sys.modules[__name__], attr, None)
|
||||
else:
|
||||
ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR = None
|
||||
|
||||
@contextlib_contextmanager
|
||||
def redirect_stdout(stream):
|
||||
sys.stdout = stream
|
||||
|
||||
logger = logging.getLogger("ipa-server-install")
|
||||
|
||||
|
||||
def setup_logging():
|
||||
# logger.setLevel(logging.DEBUG)
|
||||
standard_logging_setup(
|
||||
paths.IPASERVER_INSTALL_LOG, verbose=False, debug=False,
|
||||
filemode='a', console_format='%(message)s')
|
||||
|
||||
|
||||
@contextlib_contextmanager
|
||||
def redirect_stdout(stream):
|
||||
sys.stdout = stream
|
||||
try:
|
||||
yield stream
|
||||
finally:
|
||||
sys.stdout = sys.__stdout__
|
||||
|
||||
|
||||
class AnsibleModuleLog():
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
_ansible_module_log = self
|
||||
|
||||
class AnsibleLoggingHandler(logging.Handler):
|
||||
def emit(self, record):
|
||||
_ansible_module_log.write(self.format(record))
|
||||
|
||||
self.logging_handler = AnsibleLoggingHandler()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.root.addHandler(self.logging_handler)
|
||||
|
||||
def close(self):
|
||||
self.flush()
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def log(self, msg):
|
||||
# self.write(msg+"\n")
|
||||
self.write(msg)
|
||||
|
||||
def debug(self, msg):
|
||||
self.module.debug(msg)
|
||||
|
||||
def info(self, msg):
|
||||
self.module.debug(msg)
|
||||
|
||||
@staticmethod
|
||||
def isatty():
|
||||
return False
|
||||
|
||||
def write(self, msg):
|
||||
self.module.debug(msg)
|
||||
# self.module.warn(msg)
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods, useless-object-inheritance
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
class options_obj(object): # pylint: disable=invalid-name
|
||||
def __init__(self):
|
||||
self._replica_install = False
|
||||
self.dnssec_master = False # future unknown
|
||||
self.disable_dnssec_master = False # future unknown
|
||||
self.domainlevel = MAX_DOMAIN_LEVEL # deprecated
|
||||
self.domain_level = self.domainlevel # deprecated
|
||||
self.interactive = False
|
||||
self.unattended = not self.interactive
|
||||
|
||||
# def __getattribute__(self, attr):
|
||||
# logger.info(" <-- Accessing options.%s" % attr)
|
||||
# return super(options_obj, self).__getattribute__(attr)
|
||||
|
||||
# def __getattr__(self, attr):
|
||||
# logger.info(" --> Adding missing options.%s" % attr)
|
||||
# setattr(self, attr, None)
|
||||
# return getattr(self, attr)
|
||||
|
||||
def knobs(self):
|
||||
for name in self.__dict__:
|
||||
yield self, name
|
||||
|
||||
|
||||
# pylint: enable=too-few-public-methods, useless-object-inheritance
|
||||
|
||||
|
||||
# pylint: enable=too-many-instance-attributes
|
||||
options = options_obj()
|
||||
installer = options
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
|
||||
# ServerMasterInstall
|
||||
options.add_sids = True
|
||||
options.add_agents = False
|
||||
|
||||
# Installable
|
||||
options.uninstalling = False
|
||||
|
||||
# ServerInstallInterface
|
||||
options.description = "Server"
|
||||
|
||||
options.kinit_attempts = 1
|
||||
options.fixed_primary = True
|
||||
options.permit = False
|
||||
options.enable_dns_updates = False
|
||||
options.no_krb5_offline_passwords = False
|
||||
options.preserve_sssd = False
|
||||
options.no_sssd = False
|
||||
|
||||
# ServerMasterInstall
|
||||
options.force_join = False
|
||||
options.servers = None
|
||||
options.no_wait_for_dns = True
|
||||
options.host_password = None
|
||||
options.keytab = None
|
||||
options.setup_ca = True
|
||||
# always run sidgen task and do not allow adding agents on first master
|
||||
options.add_sids = True
|
||||
options.add_agents = False
|
||||
|
||||
# ADTrustInstallInterface
|
||||
# no_msdcs is deprecated
|
||||
options.no_msdcs = False
|
||||
|
||||
# For pylint
|
||||
options.external_cert_files = None
|
||||
options.dirsrv_cert_files = None
|
||||
|
||||
# Uninstall
|
||||
options.ignore_topology_disconnect = False
|
||||
options.ignore_last_of_role = False
|
||||
|
||||
# pylint: enable=attribute-defined-outside-init
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def api_Backend_ldap2(host_name, setup_ca, connect=False):
|
||||
# we are sure we have the configuration file ready.
|
||||
cfg = dict(context='installer', confdir=paths.ETC_IPA, in_server=True,
|
||||
host=host_name)
|
||||
if setup_ca:
|
||||
# we have an IPA-integrated CA
|
||||
cfg['ca_host'] = host_name
|
||||
|
||||
api.bootstrap(**cfg)
|
||||
api.finalize()
|
||||
if connect:
|
||||
api.Backend.ldap2.connect()
|
||||
|
||||
|
||||
# pylint: enable=invalid-name
|
||||
|
||||
|
||||
def ds_init_info(ansible_log, fstore, domainlevel, dirsrv_config_file,
|
||||
realm_name, host_name, domain_name, dm_password,
|
||||
idstart, idmax, subject_base, ca_subject,
|
||||
_no_hbac_allow, dirsrv_pkcs12_info, no_pkinit):
|
||||
|
||||
if not options.external_cert_files:
|
||||
_ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel,
|
||||
config_ldif=dirsrv_config_file)
|
||||
_ds.set_output(ansible_log)
|
||||
|
||||
if options.dirsrv_cert_files:
|
||||
_dirsrv_pkcs12_info = dirsrv_pkcs12_info
|
||||
else:
|
||||
_dirsrv_pkcs12_info = None
|
||||
|
||||
with redirect_stdout(ansible_log):
|
||||
_ds.init_info(realm_name, host_name, domain_name, dm_password,
|
||||
subject_base, ca_subject, idstart, idmax,
|
||||
# hbac_allow=not no_hbac_allow,
|
||||
_dirsrv_pkcs12_info, setup_pkinit=not no_pkinit)
|
||||
else:
|
||||
_ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel)
|
||||
_ds.set_output(ansible_log)
|
||||
|
||||
with redirect_stdout(ansible_log):
|
||||
_ds.init_info(realm_name, host_name, domain_name, dm_password,
|
||||
subject_base, ca_subject, 1101, 1100, None,
|
||||
setup_pkinit=not no_pkinit)
|
||||
|
||||
return _ds
|
||||
|
||||
|
||||
def ansible_module_get_parsed_ip_addresses(ansible_module,
|
||||
param='ip_addresses'):
|
||||
ip_addrs = []
|
||||
for _ip in ansible_module.params.get(param):
|
||||
try:
|
||||
yield stream
|
||||
finally:
|
||||
sys.stdout = sys.__stdout__
|
||||
ip_parsed = ipautil.CheckedIPAddress(_ip)
|
||||
except Exception as err:
|
||||
ansible_module.fail_json(
|
||||
msg="Invalid IP Address %s: %s" % (_ip, err))
|
||||
ip_addrs.append(ip_parsed)
|
||||
return ip_addrs
|
||||
|
||||
class AnsibleModuleLog():
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
_ansible_module_log = self
|
||||
|
||||
class AnsibleLoggingHandler(logging.Handler):
|
||||
def emit(self, record):
|
||||
_ansible_module_log.write(self.format(record))
|
||||
def encode_certificate(cert):
|
||||
"""
|
||||
Encode a certificate using base64.
|
||||
|
||||
self.logging_handler = AnsibleLoggingHandler()
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.root.addHandler(self.logging_handler)
|
||||
It also takes FreeIPA and Python versions into account.
|
||||
"""
|
||||
if isinstance(cert, (str, bytes)):
|
||||
encoded = base64.b64encode(cert)
|
||||
else:
|
||||
encoded = base64.b64encode(cert.public_bytes(Encoding.DER))
|
||||
if not six.PY2:
|
||||
encoded = encoded.decode('ascii')
|
||||
return encoded
|
||||
|
||||
def close(self):
|
||||
self.flush()
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
def decode_certificate(cert):
|
||||
"""
|
||||
Decode a certificate using base64.
|
||||
|
||||
def log(self, msg):
|
||||
# self.write(msg+"\n")
|
||||
self.write(msg)
|
||||
It also takes FreeIPA versions into account and returns a
|
||||
IPACertificate for newer IPA versions.
|
||||
"""
|
||||
if hasattr(x509, "IPACertificate"):
|
||||
cert = cert.strip()
|
||||
if not cert.startswith("-----BEGIN CERTIFICATE-----"):
|
||||
cert = "-----BEGIN CERTIFICATE-----\n" + cert
|
||||
if not cert.endswith("-----END CERTIFICATE-----"):
|
||||
cert += "\n-----END CERTIFICATE-----"
|
||||
|
||||
def debug(self, msg):
|
||||
self.module.debug(msg)
|
||||
cert = certificate_loader(cert.encode('utf-8'))
|
||||
else:
|
||||
cert = base64.b64decode(cert)
|
||||
return cert
|
||||
|
||||
def info(self, msg):
|
||||
self.module.debug(msg)
|
||||
|
||||
@staticmethod
|
||||
def isatty():
|
||||
return False
|
||||
|
||||
def write(self, msg):
|
||||
self.module.debug(msg)
|
||||
# self.module.warn(msg)
|
||||
|
||||
# pylint: disable=too-few-public-methods, useless-object-inheritance
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
class options_obj(object): # pylint: disable=invalid-name
|
||||
def __init__(self):
|
||||
self._replica_install = False
|
||||
self.dnssec_master = False # future unknown
|
||||
self.disable_dnssec_master = False # future unknown
|
||||
self.domainlevel = MAX_DOMAIN_LEVEL # deprecated
|
||||
self.domain_level = self.domainlevel # deprecated
|
||||
self.interactive = False
|
||||
self.unattended = not self.interactive
|
||||
|
||||
# def __getattribute__(self, attr):
|
||||
# logger.info(" <-- Accessing options.%s" % attr)
|
||||
# return super(options_obj, self).__getattribute__(attr)
|
||||
|
||||
# def __getattr__(self, attr):
|
||||
# logger.info(" --> Adding missing options.%s" % attr)
|
||||
# setattr(self, attr, None)
|
||||
# return getattr(self, attr)
|
||||
|
||||
def knobs(self):
|
||||
for name in self.__dict__:
|
||||
yield self, name
|
||||
|
||||
# pylint: enable=too-few-public-methods, useless-object-inheritance
|
||||
|
||||
# pylint: enable=too-many-instance-attributes
|
||||
options = options_obj()
|
||||
installer = options
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
|
||||
# ServerMasterInstall
|
||||
options.add_sids = True
|
||||
options.add_agents = False
|
||||
|
||||
# Installable
|
||||
options.uninstalling = False
|
||||
|
||||
# ServerInstallInterface
|
||||
options.description = "Server"
|
||||
|
||||
options.kinit_attempts = 1
|
||||
options.fixed_primary = True
|
||||
options.permit = False
|
||||
options.enable_dns_updates = False
|
||||
options.no_krb5_offline_passwords = False
|
||||
options.preserve_sssd = False
|
||||
options.no_sssd = False
|
||||
|
||||
# ServerMasterInstall
|
||||
options.force_join = False
|
||||
options.servers = None
|
||||
options.no_wait_for_dns = True
|
||||
options.host_password = None
|
||||
options.keytab = None
|
||||
options.setup_ca = True
|
||||
# always run sidgen task and do not allow adding agents on first master
|
||||
options.add_sids = True
|
||||
options.add_agents = False
|
||||
|
||||
# ADTrustInstallInterface
|
||||
# no_msdcs is deprecated
|
||||
options.no_msdcs = False
|
||||
|
||||
# For pylint
|
||||
options.external_cert_files = None
|
||||
options.dirsrv_cert_files = None
|
||||
|
||||
# Uninstall
|
||||
options.ignore_topology_disconnect = False
|
||||
options.ignore_last_of_role = False
|
||||
|
||||
# pylint: enable=attribute-defined-outside-init
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def api_Backend_ldap2(host_name, setup_ca, connect=False):
|
||||
# we are sure we have the configuration file ready.
|
||||
cfg = dict(context='installer', confdir=paths.ETC_IPA, in_server=True,
|
||||
host=host_name)
|
||||
if setup_ca:
|
||||
# we have an IPA-integrated CA
|
||||
cfg['ca_host'] = host_name
|
||||
|
||||
api.bootstrap(**cfg)
|
||||
api.finalize()
|
||||
if connect:
|
||||
api.Backend.ldap2.connect()
|
||||
|
||||
# pylint: enable=invalid-name
|
||||
|
||||
def ds_init_info(ansible_log, fstore, domainlevel, dirsrv_config_file,
|
||||
realm_name, host_name, domain_name, dm_password,
|
||||
idstart, idmax, subject_base, ca_subject,
|
||||
_no_hbac_allow, dirsrv_pkcs12_info, no_pkinit):
|
||||
|
||||
if not options.external_cert_files:
|
||||
_ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel,
|
||||
config_ldif=dirsrv_config_file)
|
||||
_ds.set_output(ansible_log)
|
||||
|
||||
if options.dirsrv_cert_files:
|
||||
_dirsrv_pkcs12_info = dirsrv_pkcs12_info
|
||||
else:
|
||||
_dirsrv_pkcs12_info = None
|
||||
|
||||
with redirect_stdout(ansible_log):
|
||||
_ds.init_info(realm_name, host_name, domain_name, dm_password,
|
||||
subject_base, ca_subject, idstart, idmax,
|
||||
# hbac_allow=not no_hbac_allow,
|
||||
_dirsrv_pkcs12_info, setup_pkinit=not no_pkinit)
|
||||
else:
|
||||
_ds = dsinstance.DsInstance(fstore=fstore, domainlevel=domainlevel)
|
||||
_ds.set_output(ansible_log)
|
||||
|
||||
with redirect_stdout(ansible_log):
|
||||
_ds.init_info(realm_name, host_name, domain_name, dm_password,
|
||||
subject_base, ca_subject, 1101, 1100, None,
|
||||
setup_pkinit=not no_pkinit)
|
||||
|
||||
return _ds
|
||||
|
||||
def ansible_module_get_parsed_ip_addresses(ansible_module,
|
||||
param='ip_addresses'):
|
||||
ip_addrs = []
|
||||
for _ip in ansible_module.params.get(param):
|
||||
try:
|
||||
ip_parsed = ipautil.CheckedIPAddress(_ip)
|
||||
except Exception as err:
|
||||
ansible_module.fail_json(
|
||||
msg="Invalid IP Address %s: %s" % (_ip, err))
|
||||
ip_addrs.append(ip_parsed)
|
||||
return ip_addrs
|
||||
|
||||
def encode_certificate(cert):
|
||||
"""
|
||||
Encode a certificate using base64.
|
||||
|
||||
It also takes FreeIPA and Python versions into account.
|
||||
"""
|
||||
if isinstance(cert, (str, bytes)):
|
||||
encoded = base64.b64encode(cert)
|
||||
else:
|
||||
encoded = base64.b64encode(cert.public_bytes(Encoding.DER))
|
||||
if not six.PY2:
|
||||
encoded = encoded.decode('ascii')
|
||||
return encoded
|
||||
|
||||
def decode_certificate(cert):
|
||||
"""
|
||||
Decode a certificate using base64.
|
||||
|
||||
It also takes FreeIPA versions into account and returns a
|
||||
IPACertificate for newer IPA versions.
|
||||
"""
|
||||
if hasattr(x509, "IPACertificate"):
|
||||
cert = cert.strip()
|
||||
if not cert.startswith("-----BEGIN CERTIFICATE-----"):
|
||||
cert = "-----BEGIN CERTIFICATE-----\n" + cert
|
||||
if not cert.endswith("-----END CERTIFICATE-----"):
|
||||
cert += "\n-----END CERTIFICATE-----"
|
||||
|
||||
cert = certificate_loader(cert.encode('utf-8'))
|
||||
else:
|
||||
cert = base64.b64decode(cert)
|
||||
return cert
|
||||
def check_imports(module):
|
||||
if ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR is not None:
|
||||
module.fail_json(msg=ANSIBLE_IPA_SERVER_MODULE_IMPORT_ERROR)
|
||||
|
||||
Reference in New Issue
Block a user