mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-03-26 21:33:12 +00:00
* nsupdate: add unit tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix var name to regain sanity * remove unneeded typing from test file * formatting --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
272 lines
10 KiB
Python
272 lines
10 KiB
Python
# Copyright (c) 2026, Ansible Project
|
|
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
dns = pytest.importorskip("dns")
|
|
|
|
import dns.message
|
|
import dns.name
|
|
import dns.rcode
|
|
import dns.rdata
|
|
import dns.rdataclass
|
|
import dns.rdatatype
|
|
import dns.resolver
|
|
import dns.update
|
|
from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import set_module_args
|
|
|
|
import ansible_collections.community.general.plugins.modules.nsupdate as nsupdate_module
|
|
from ansible_collections.community.general.plugins.module_utils import deps
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def register_deps():
|
|
"""Re-register the dnspython dep after deps_cleanup clears _deps before each test."""
|
|
with deps.declare("dnspython"):
|
|
pass
|
|
|
|
|
|
@pytest.fixture
|
|
def run_module(capfd):
|
|
def _run(args):
|
|
with set_module_args(args):
|
|
with pytest.raises(SystemExit):
|
|
nsupdate_module.main()
|
|
out, dummy = capfd.readouterr()
|
|
return json.loads(out)
|
|
|
|
return _run
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
BASE_PARAMS = dict(
|
|
server="10.1.1.1",
|
|
record="ansible",
|
|
zone="example.org",
|
|
key_name="testkey",
|
|
key_secret="dGVzdA==",
|
|
key_algorithm="hmac-md5",
|
|
type="A",
|
|
value=["192.168.1.1"],
|
|
ttl=3600,
|
|
state="present",
|
|
protocol="tcp",
|
|
port=53,
|
|
)
|
|
|
|
# Used for zone auto-detection tests — no zone, absolute record
|
|
PARAMS_NO_ZONE = {**BASE_PARAMS, "record": "ansible.example.org.", "zone": None}
|
|
|
|
|
|
def make_update_response(query, rcode_val=dns.rcode.NOERROR):
|
|
"""Build a DNS response for an UPDATE or prerequisite check."""
|
|
response = dns.message.make_response(query)
|
|
response.set_rcode(rcode_val)
|
|
return response
|
|
|
|
|
|
def make_soa_response(query, zone_name_text):
|
|
"""Build a DNS response with an SOA in the authority section (zone lookup)."""
|
|
response = dns.message.make_response(query)
|
|
rrset = response.find_rrset(
|
|
dns.message.AUTHORITY,
|
|
dns.name.from_text(zone_name_text),
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.SOA,
|
|
create=True,
|
|
)
|
|
soa = dns.rdata.from_text(
|
|
dns.rdataclass.IN,
|
|
dns.rdatatype.SOA,
|
|
"ns1.example.org. admin.example.org. 2024010101 3600 900 604800 300",
|
|
)
|
|
rrset.add(soa, ttl=3600)
|
|
return response
|
|
|
|
|
|
def make_a_response(query, addresses, ttl=3600):
|
|
"""Build a DNS response with A records in the answer section (TTL lookup)."""
|
|
response = dns.message.make_response(query)
|
|
name = query.question[0].name
|
|
rrset = response.find_rrset(dns.message.ANSWER, name, dns.rdataclass.IN, dns.rdatatype.A, create=True)
|
|
for addr in addresses:
|
|
rrset.add(dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.A, addr), ttl=ttl)
|
|
return response
|
|
|
|
|
|
def route_query(query, update_rcodes):
|
|
"""Dispatch a dns.query.tcp call to the right response builder.
|
|
|
|
UPDATE queries consume the next rcode from update_rcodes.
|
|
SOA queries (zone lookup) get a fixed example.org. SOA response.
|
|
Other queries (A record / TTL lookup) get an A response with the base value.
|
|
"""
|
|
if isinstance(query, dns.update.Update):
|
|
return make_update_response(query, update_rcodes.pop(0))
|
|
if query.question and query.question[0].rdtype == dns.rdatatype.SOA:
|
|
return make_soa_response(query, "example.org.")
|
|
return make_a_response(query, ["192.168.1.1"], ttl=3600)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# resolve_server()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_resolve_server_ipv4(mocker, run_module):
|
|
"""IPv4 server is used directly without any DNS resolution."""
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module(BASE_PARAMS)
|
|
assert result["changed"] is True
|
|
|
|
|
|
def test_resolve_server_ipv6(mocker, run_module):
|
|
"""IPv6 server is used directly without any DNS resolution."""
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module({**BASE_PARAMS, "server": "2001:db8::1"})
|
|
assert result["changed"] is True
|
|
|
|
|
|
def test_resolve_server_fqdn(mocker, run_module):
|
|
"""FQDN server is resolved to an IP address before making DNS queries."""
|
|
MockResolver = mocker.patch("dns.resolver.Resolver")
|
|
MockResolver.return_value.resolve.side_effect = lambda name, rdatatype: (
|
|
["192.168.1.1"] if rdatatype == dns.rdatatype.A else (dummy for dummy in ()).throw(dns.resolver.NoAnswer())
|
|
)
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module({**BASE_PARAMS, "server": "ns1.example.org"})
|
|
assert result["changed"] is True
|
|
|
|
|
|
def test_resolve_server_fqdn_unresolvable_fails(mocker, run_module):
|
|
"""Module fails with a clear message when the FQDN server cannot be resolved."""
|
|
MockResolver = mocker.patch("dns.resolver.Resolver")
|
|
MockResolver.return_value.resolve.side_effect = dns.resolver.NXDOMAIN
|
|
result = run_module({**BASE_PARAMS, "server": "nonexistent.example.org"})
|
|
assert result["failed"] is True
|
|
assert "Failed to resolve" in result["msg"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# lookup_zone()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_lookup_zone_auto_detects_zone(mocker, run_module):
|
|
"""Zone is correctly auto-detected from the SOA in the authority section."""
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module(PARAMS_NO_ZONE)
|
|
assert result["record"]["zone"] == "example.org."
|
|
|
|
|
|
def test_lookup_zone_tsig_key_attached_when_keyring_set(mocker, run_module):
|
|
"""Zone lookup attaches the TSIG key so split-view DNS servers pick the right view.
|
|
|
|
Regression test for issue #749.
|
|
"""
|
|
soa_queries = []
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
|
|
def mock_tcp(query, *args, **kwargs):
|
|
if not isinstance(query, dns.update.Update):
|
|
soa_queries.append(query)
|
|
return route_query(query, rcodes)
|
|
|
|
mocker.patch("dns.query.tcp", side_effect=mock_tcp)
|
|
run_module(PARAMS_NO_ZONE)
|
|
|
|
assert soa_queries, "expected at least one SOA query for zone lookup"
|
|
assert soa_queries[0].keyring is not None, "SOA query must carry the TSIG key"
|
|
|
|
|
|
def test_lookup_zone_no_tsig_without_key(mocker, run_module):
|
|
"""Zone lookup sends no TSIG key when no key_name is configured."""
|
|
soa_queries = []
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
|
|
def mock_tcp(query, *args, **kwargs):
|
|
if not isinstance(query, dns.update.Update):
|
|
soa_queries.append(query)
|
|
return route_query(query, rcodes)
|
|
|
|
mocker.patch("dns.query.tcp", side_effect=mock_tcp)
|
|
run_module({**PARAMS_NO_ZONE, "key_name": None, "key_secret": None})
|
|
|
|
assert soa_queries, "expected at least one SOA query for zone lookup"
|
|
assert soa_queries[0].keyring is None, "SOA query must not carry a TSIG key"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# state=present
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_creates_record_when_absent(mocker, run_module):
|
|
"""Record is created and changed=True when it does not exist."""
|
|
rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module(BASE_PARAMS)
|
|
assert result["changed"] is True
|
|
|
|
|
|
def test_no_change_when_record_matches(mocker, run_module):
|
|
"""No change when the record already exists with the correct value and TTL."""
|
|
mocker.patch(
|
|
"dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, [dns.rcode.NOERROR, dns.rcode.NOERROR])
|
|
)
|
|
result = run_module(BASE_PARAMS)
|
|
assert result["changed"] is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# state=absent
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_deletes_record_when_present(mocker, run_module):
|
|
"""Record is deleted and changed=True when it exists."""
|
|
rcodes = [dns.rcode.NOERROR, dns.rcode.NOERROR]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module({**BASE_PARAMS, "state": "absent"})
|
|
assert result["changed"] is True
|
|
|
|
|
|
def test_no_change_when_record_already_absent(mocker, run_module):
|
|
"""No change when the record does not exist and state=absent."""
|
|
rcodes = [dns.rcode.NXDOMAIN]
|
|
mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes))
|
|
result = run_module({**BASE_PARAMS, "state": "absent"})
|
|
assert result["changed"] is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# check_mode
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_check_mode_reports_changed_without_updating(mocker, run_module):
|
|
"""Check mode returns changed=True but does not send the actual DNS update."""
|
|
tcp_mock = mocker.patch(
|
|
"dns.query.tcp",
|
|
return_value=make_update_response(dns.update.Update("example.org."), dns.rcode.NXDOMAIN),
|
|
)
|
|
result = run_module({**BASE_PARAMS, "_ansible_check_mode": True})
|
|
assert result["changed"] is True
|
|
assert tcp_mock.call_count == 1, "check mode must not send the update"
|