From ef700b116a1dfe0bd1f139ac0a874d734ca6bea9 Mon Sep 17 00:00:00 2001 From: Alexei Znamensky <103110+russoz@users.noreply.github.com> Date: Fri, 27 Mar 2026 09:37:16 +1300 Subject: [PATCH] nsupdate: add unit tests (#11677) * nsupdate: add unit tests Co-Authored-By: Claude Sonnet 4.6 * fix var name to regain sanity * remove unneeded typing from test file * formatting --------- Co-authored-by: Claude Sonnet 4.6 --- tests/unit/plugins/modules/test_nsupdate.py | 271 ++++++++++++++++++++ tests/unit/requirements.txt | 3 + 2 files changed, 274 insertions(+) create mode 100644 tests/unit/plugins/modules/test_nsupdate.py diff --git a/tests/unit/plugins/modules/test_nsupdate.py b/tests/unit/plugins/modules/test_nsupdate.py new file mode 100644 index 0000000000..b43e9c0eeb --- /dev/null +++ b/tests/unit/plugins/modules/test_nsupdate.py @@ -0,0 +1,271 @@ +# Copyright (c) 2026, Ansible Project +# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import json + +import pytest + +dns = pytest.importorskip("dns") + +import dns.message +import dns.name +import dns.rcode +import dns.rdata +import dns.rdataclass +import dns.rdatatype +import dns.resolver +import dns.update +from ansible_collections.community.internal_test_tools.tests.unit.plugins.modules.utils import set_module_args + +import ansible_collections.community.general.plugins.modules.nsupdate as nsupdate_module +from ansible_collections.community.general.plugins.module_utils import deps + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def register_deps(): + """Re-register the dnspython dep after deps_cleanup clears _deps before each test.""" + with deps.declare("dnspython"): + pass + + +@pytest.fixture +def run_module(capfd): + def _run(args): + with set_module_args(args): + with pytest.raises(SystemExit): + nsupdate_module.main() + out, dummy = capfd.readouterr() + return json.loads(out) + + return _run + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +BASE_PARAMS = dict( + server="10.1.1.1", + record="ansible", + zone="example.org", + key_name="testkey", + key_secret="dGVzdA==", + key_algorithm="hmac-md5", + type="A", + value=["192.168.1.1"], + ttl=3600, + state="present", + protocol="tcp", + port=53, +) + +# Used for zone auto-detection tests — no zone, absolute record +PARAMS_NO_ZONE = {**BASE_PARAMS, "record": "ansible.example.org.", "zone": None} + + +def make_update_response(query, rcode_val=dns.rcode.NOERROR): + """Build a DNS response for an UPDATE or prerequisite check.""" + response = dns.message.make_response(query) + response.set_rcode(rcode_val) + return response + + +def make_soa_response(query, zone_name_text): + """Build a DNS response with an SOA in the authority section (zone lookup).""" + response = dns.message.make_response(query) + rrset = response.find_rrset( + dns.message.AUTHORITY, + dns.name.from_text(zone_name_text), + dns.rdataclass.IN, + dns.rdatatype.SOA, + create=True, + ) + soa = dns.rdata.from_text( + dns.rdataclass.IN, + dns.rdatatype.SOA, + "ns1.example.org. admin.example.org. 2024010101 3600 900 604800 300", + ) + rrset.add(soa, ttl=3600) + return response + + +def make_a_response(query, addresses, ttl=3600): + """Build a DNS response with A records in the answer section (TTL lookup).""" + response = dns.message.make_response(query) + name = query.question[0].name + rrset = response.find_rrset(dns.message.ANSWER, name, dns.rdataclass.IN, dns.rdatatype.A, create=True) + for addr in addresses: + rrset.add(dns.rdata.from_text(dns.rdataclass.IN, dns.rdatatype.A, addr), ttl=ttl) + return response + + +def route_query(query, update_rcodes): + """Dispatch a dns.query.tcp call to the right response builder. + + UPDATE queries consume the next rcode from update_rcodes. + SOA queries (zone lookup) get a fixed example.org. SOA response. + Other queries (A record / TTL lookup) get an A response with the base value. + """ + if isinstance(query, dns.update.Update): + return make_update_response(query, update_rcodes.pop(0)) + if query.question and query.question[0].rdtype == dns.rdatatype.SOA: + return make_soa_response(query, "example.org.") + return make_a_response(query, ["192.168.1.1"], ttl=3600) + + +# --------------------------------------------------------------------------- +# resolve_server() +# --------------------------------------------------------------------------- + + +def test_resolve_server_ipv4(mocker, run_module): + """IPv4 server is used directly without any DNS resolution.""" + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module(BASE_PARAMS) + assert result["changed"] is True + + +def test_resolve_server_ipv6(mocker, run_module): + """IPv6 server is used directly without any DNS resolution.""" + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module({**BASE_PARAMS, "server": "2001:db8::1"}) + assert result["changed"] is True + + +def test_resolve_server_fqdn(mocker, run_module): + """FQDN server is resolved to an IP address before making DNS queries.""" + MockResolver = mocker.patch("dns.resolver.Resolver") + MockResolver.return_value.resolve.side_effect = lambda name, rdatatype: ( + ["192.168.1.1"] if rdatatype == dns.rdatatype.A else (dummy for dummy in ()).throw(dns.resolver.NoAnswer()) + ) + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module({**BASE_PARAMS, "server": "ns1.example.org"}) + assert result["changed"] is True + + +def test_resolve_server_fqdn_unresolvable_fails(mocker, run_module): + """Module fails with a clear message when the FQDN server cannot be resolved.""" + MockResolver = mocker.patch("dns.resolver.Resolver") + MockResolver.return_value.resolve.side_effect = dns.resolver.NXDOMAIN + result = run_module({**BASE_PARAMS, "server": "nonexistent.example.org"}) + assert result["failed"] is True + assert "Failed to resolve" in result["msg"] + + +# --------------------------------------------------------------------------- +# lookup_zone() +# --------------------------------------------------------------------------- + + +def test_lookup_zone_auto_detects_zone(mocker, run_module): + """Zone is correctly auto-detected from the SOA in the authority section.""" + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module(PARAMS_NO_ZONE) + assert result["record"]["zone"] == "example.org." + + +def test_lookup_zone_tsig_key_attached_when_keyring_set(mocker, run_module): + """Zone lookup attaches the TSIG key so split-view DNS servers pick the right view. + + Regression test for issue #749. + """ + soa_queries = [] + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + + def mock_tcp(query, *args, **kwargs): + if not isinstance(query, dns.update.Update): + soa_queries.append(query) + return route_query(query, rcodes) + + mocker.patch("dns.query.tcp", side_effect=mock_tcp) + run_module(PARAMS_NO_ZONE) + + assert soa_queries, "expected at least one SOA query for zone lookup" + assert soa_queries[0].keyring is not None, "SOA query must carry the TSIG key" + + +def test_lookup_zone_no_tsig_without_key(mocker, run_module): + """Zone lookup sends no TSIG key when no key_name is configured.""" + soa_queries = [] + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + + def mock_tcp(query, *args, **kwargs): + if not isinstance(query, dns.update.Update): + soa_queries.append(query) + return route_query(query, rcodes) + + mocker.patch("dns.query.tcp", side_effect=mock_tcp) + run_module({**PARAMS_NO_ZONE, "key_name": None, "key_secret": None}) + + assert soa_queries, "expected at least one SOA query for zone lookup" + assert soa_queries[0].keyring is None, "SOA query must not carry a TSIG key" + + +# --------------------------------------------------------------------------- +# state=present +# --------------------------------------------------------------------------- + + +def test_creates_record_when_absent(mocker, run_module): + """Record is created and changed=True when it does not exist.""" + rcodes = [dns.rcode.NXDOMAIN, dns.rcode.NOERROR] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module(BASE_PARAMS) + assert result["changed"] is True + + +def test_no_change_when_record_matches(mocker, run_module): + """No change when the record already exists with the correct value and TTL.""" + mocker.patch( + "dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, [dns.rcode.NOERROR, dns.rcode.NOERROR]) + ) + result = run_module(BASE_PARAMS) + assert result["changed"] is False + + +# --------------------------------------------------------------------------- +# state=absent +# --------------------------------------------------------------------------- + + +def test_deletes_record_when_present(mocker, run_module): + """Record is deleted and changed=True when it exists.""" + rcodes = [dns.rcode.NOERROR, dns.rcode.NOERROR] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module({**BASE_PARAMS, "state": "absent"}) + assert result["changed"] is True + + +def test_no_change_when_record_already_absent(mocker, run_module): + """No change when the record does not exist and state=absent.""" + rcodes = [dns.rcode.NXDOMAIN] + mocker.patch("dns.query.tcp", side_effect=lambda q, *a, **kw: route_query(q, rcodes)) + result = run_module({**BASE_PARAMS, "state": "absent"}) + assert result["changed"] is False + + +# --------------------------------------------------------------------------- +# check_mode +# --------------------------------------------------------------------------- + + +def test_check_mode_reports_changed_without_updating(mocker, run_module): + """Check mode returns changed=True but does not send the actual DNS update.""" + tcp_mock = mocker.patch( + "dns.query.tcp", + return_value=make_update_response(dns.update.Update("example.org."), dns.rcode.NXDOMAIN), + ) + result = run_module({**BASE_PARAMS, "_ansible_check_mode": True}) + assert result["changed"] is True + assert tcp_mock.call_count == 1, "check mode must not send the update" diff --git a/tests/unit/requirements.txt b/tests/unit/requirements.txt index 3ba8c5fac3..8734c4c0c6 100644 --- a/tests/unit/requirements.txt +++ b/tests/unit/requirements.txt @@ -30,6 +30,9 @@ datadog-api-client >= 1.0.0b3 ; python_version >= '3.6' # requirement for dnsimple module dnsimple >= 2 ; python_version >= '3.6' +# requirement for the nsupdate module +dnspython + # requirement for the opentelemetry callback plugin # WARNING: these libraries rely on Protobuf for Python, which regularly stops installing. # That's why they are disabled for now.