ACME exception fixes (#217)

* Fix wrong usages of ACMEProtocolException.

* Add changelog fragment.

* Fix error handling when content could not be decoded.

* Make sure that content_json is a dict or None.

* Improve acme_inspect's ACMEProtocolException handling.

* Improve error handling.

* Add tests.

* Fix challenge error.

* Add challenges tests.

* Provide content if available.

* Add some order tests.

* Linting.
This commit is contained in:
Felix Fontein
2021-04-11 14:44:44 +02:00
committed by GitHub
parent 7b1d4770e9
commit 0e1f0fd730
12 changed files with 778 additions and 47 deletions

View File

@@ -6,6 +6,14 @@ import base64
import datetime
import os
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CryptoBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)
def load_fixture(name):
with open(os.path.join(os.path.dirname(__file__), 'fixtures', name)) as f:
@@ -74,3 +82,23 @@ TEST_CERT_DAYS = [
(datetime.datetime(2018, 11, 25, 15, 20, 0), 1),
(datetime.datetime(2018, 11, 25, 15, 30, 0), 0),
]
class FakeBackend(CryptoBackend):
def parse_key(self, key_file=None, key_content=None, passphrase=None):
raise BackendException('Not implemented in fake backend')
def sign(self, payload64, protected64, key_data):
raise BackendException('Not implemented in fake backend')
def create_mac_key(self, alg, key):
raise BackendException('Not implemented in fake backend')
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
raise BackendException('Not implemented in fake backend')
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
raise BackendException('Not implemented in fake backend')
def create_chain_matcher(self, criterium):
raise BackendException('Not implemented in fake backend')

View File

@@ -0,0 +1,248 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from mock import MagicMock
from ansible_collections.community.crypto.plugins.module_utils.acme.challenges import (
combine_identifier,
split_identifier,
Challenge,
Authorization,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ACMEProtocolException,
ModuleFailException,
)
def test_combine_identifier():
assert combine_identifier('', '') == ':'
assert combine_identifier('a', 'b') == 'a:b'
def test_split_identifier():
assert split_identifier(':') == ['', '']
assert split_identifier('a:b') == ['a', 'b']
assert split_identifier('a:b:c') == ['a', 'b:c']
with pytest.raises(ModuleFailException) as exc:
split_identifier('a')
assert exc.value.msg == 'Identifier "a" is not of the form <type>:<identifier>'
def test_challenge_from_to_json():
client = MagicMock()
data = {
'url': 'xxx',
'type': 'type',
'status': 'valid',
}
client.version = 2
challenge = Challenge.from_json(client, data)
assert challenge.data == data
assert challenge.type == 'type'
assert challenge.url == 'xxx'
assert challenge.status == 'valid'
assert challenge.token is None
assert challenge.to_json() == data
data = {
'type': 'type',
'status': 'valid',
'token': 'foo',
}
challenge = Challenge.from_json(None, data, url='xxx')
assert challenge.data == data
assert challenge.type == 'type'
assert challenge.url == 'xxx'
assert challenge.status == 'valid'
assert challenge.token == 'foo'
assert challenge.to_json() == data
data = {
'uri': 'xxx',
'type': 'type',
'status': 'valid',
}
client.version = 1
challenge = Challenge.from_json(client, data)
assert challenge.data == data
assert challenge.type == 'type'
assert challenge.url == 'xxx'
assert challenge.status == 'valid'
assert challenge.token is None
assert challenge.to_json() == data
def test_authorization_from_to_json():
client = MagicMock()
client.version = 2
data = {
'challenges': [],
'status': 'valid',
'identifier': {
'type': 'dns',
'value': 'example.com',
},
}
authz = Authorization.from_json(client, data, 'xxx')
assert authz.url == 'xxx'
assert authz.status == 'valid'
assert authz.identifier == 'example.com'
assert authz.identifier_type == 'dns'
assert authz.challenges == []
assert authz.to_json() == {
'uri': 'xxx',
'challenges': [],
'status': 'valid',
'identifier': {
'type': 'dns',
'value': 'example.com',
},
}
data = {
'challenges': [
{
'url': 'xxxyyy',
'type': 'type',
'status': 'valid',
}
],
'status': 'valid',
'identifier': {
'type': 'dns',
'value': 'example.com',
},
'wildcard': True,
}
authz = Authorization.from_json(client, data, 'xxx')
assert authz.url == 'xxx'
assert authz.status == 'valid'
assert authz.identifier == '*.example.com'
assert authz.identifier_type == 'dns'
assert len(authz.challenges) == 1
assert authz.challenges[0].data == {
'url': 'xxxyyy',
'type': 'type',
'status': 'valid',
}
assert authz.to_json() == {
'uri': 'xxx',
'challenges': [
{
'url': 'xxxyyy',
'type': 'type',
'status': 'valid',
}
],
'status': 'valid',
'identifier': {
'type': 'dns',
'value': 'example.com',
},
'wildcard': True,
}
client.version = 1
data = {
'challenges': [],
'identifier': {
'type': 'dns',
'value': 'example.com',
},
}
authz = Authorization.from_json(client, data, 'xxx')
assert authz.url == 'xxx'
assert authz.status == 'pending'
assert authz.identifier == 'example.com'
assert authz.identifier_type == 'dns'
assert authz.challenges == []
assert authz.to_json() == {
'uri': 'xxx',
'challenges': [],
'identifier': {
'type': 'dns',
'value': 'example.com',
},
}
def test_authorization_create_error():
client = MagicMock()
client.version = 2
client.directory.directory = {}
with pytest.raises(ACMEProtocolException) as exc:
Authorization.create(client, 'dns', 'example.com')
assert exc.value.msg == 'ACME endpoint does not support pre-authorization.'
def test_wait_for_validation_error():
client = MagicMock()
client.version = 2
data = {
'challenges': [
{
'url': 'xxxyyy1',
'type': 'dns-01',
'status': 'invalid',
'error': {
'type': 'dns-failed',
'subproblems': [
{
'type': 'subproblem',
'detail': 'example.com DNS-01 validation failed',
},
]
},
},
{
'url': 'xxxyyy2',
'type': 'http-01',
'status': 'invalid',
'error': {
'type': 'http-failed',
'subproblems': [
{
'type': 'subproblem',
'detail': 'example.com HTTP-01 validation failed',
},
]
},
},
{
'url': 'xxxyyy3',
'type': 'something-else',
'status': 'valid',
},
],
'status': 'invalid',
'identifier': {
'type': 'dns',
'value': 'example.com',
},
}
client.get_request = MagicMock(return_value=(data, {}))
authz = Authorization.from_json(client, data, 'xxx')
with pytest.raises(ACMEProtocolException) as exc:
authz.wait_for_validation(client, 'dns')
assert exc.value.msg == (
'Failed to validate challenge for dns:example.com: Status is "invalid". Challenge dns-01: Error dns-failed Subproblems:\n'
'(dns-01.0) Error subproblem: "example.com DNS-01 validation failed"; Challenge http-01: Error http-failed Subproblems:\n'
'(http-01.0) Error subproblem: "example.com HTTP-01 validation failed".'
)
data = data.copy()
data['uri'] = 'xxx'
assert exc.value.module_fail_args == {
'identifier': 'dns:example.com',
'authorization': data,
}

View File

@@ -0,0 +1,374 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from mock import MagicMock
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
format_error_problem,
ACMEProtocolException,
)
TEST_FORMAT_ERROR_PROBLEM = [
(
{
'type': 'foo',
},
'',
'Error foo'
),
(
{
'type': 'foo',
'title': 'bar'
},
'',
'Error "bar" (foo)'
),
(
{
'type': 'foo',
'detail': 'bar baz'
},
'',
'Error foo: "bar baz"'
),
(
{
'type': 'foo',
'subproblems': []
},
'',
'Error foo Subproblems:'
),
(
{
'type': 'foo',
'subproblems': [
{
'type': 'bar',
},
]
},
'',
'Error foo Subproblems:\n(0) Error bar'
),
(
{
'type': 'foo',
'subproblems': [
{
'type': 'bar',
'subproblems': [
{
'type': 'baz',
},
]
},
]
},
'',
'Error foo Subproblems:\n(0) Error bar Subproblems:\n(0.0) Error baz'
),
(
{
'type': 'foo',
'title': 'Foo Error',
'detail': 'Foo went wrong',
'subproblems': [
{
'type': 'bar',
'detail': 'Bar went wrong',
'subproblems': [
{
'type': 'baz',
'title': 'Baz Error',
},
]
},
{
'type': 'bar2',
'title': 'Bar 2 Error',
'detail': 'Bar really went wrong'
},
]
},
'X.',
'Error "Foo Error" (foo): "Foo went wrong" Subproblems:\n'
'(X.0) Error bar: "Bar went wrong" Subproblems:\n'
'(X.0.0) Error "Baz Error" (baz)\n'
'(X.1) Error "Bar 2 Error" (bar2): "Bar really went wrong"'
),
]
@pytest.mark.parametrize("problem, subproblem_prefix, result", TEST_FORMAT_ERROR_PROBLEM)
def test_format_error_problem(problem, subproblem_prefix, result):
res = format_error_problem(problem, subproblem_prefix)
assert res == result
def create_regular_response(response_text):
response = MagicMock()
response.read = MagicMock(return_value=response_text.encode('utf-8'))
return response
def create_error_response():
response = MagicMock()
response.read = MagicMock(side_effect=AttributeError('read'))
return response
def create_decode_error(msg):
def f(content):
raise Exception(msg)
return f
TEST_ACME_PROTOCOL_EXCEPTION = [
(
{},
None,
'ACME request failed.',
{
},
),
(
{
'msg': 'Foo',
'extras': {
'foo': 'bar',
},
},
None,
'Foo.',
{
'foo': 'bar',
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
},
None,
'ACME request failed for https://ca.example.com/foo with HTTP status 201.',
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
'response': create_regular_response('xxx'),
},
None,
'ACME request failed for https://ca.example.com/foo with HTTP status 201. The raw error result: xxx',
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
'response': create_regular_response('xxx'),
},
create_decode_error('yyy'),
'ACME request failed for https://ca.example.com/foo with HTTP status 201. The raw error result: xxx',
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
'response': create_regular_response('xxx'),
},
lambda content: dict(foo='bar'),
"ACME request failed for https://ca.example.com/foo with HTTP status 201. The JSON error result: {'foo': 'bar'}",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
'response': create_error_response(),
},
None,
'ACME request failed for https://ca.example.com/foo with HTTP status 201.',
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
'body': 'xxx',
},
'response': create_error_response(),
},
lambda content: dict(foo='bar'),
"ACME request failed for https://ca.example.com/foo with HTTP status 201. The JSON error result: {'foo': 'bar'}",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
'content': 'xxx',
},
None,
"ACME request failed for https://ca.example.com/foo with HTTP status 201. The raw error result: xxx",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 400,
},
'content_json': {
'foo': 'bar',
},
'extras': {
'bar': 'baz',
}
},
None,
"ACME request failed for https://ca.example.com/foo with HTTP status 400. The JSON error result: {'foo': 'bar'}",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 400,
'bar': 'baz',
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 201,
},
'content_json': {
'type': 'foo',
},
},
None,
"ACME request failed for https://ca.example.com/foo with HTTP status 201. The JSON error result: {'type': 'foo'}",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 201,
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 400,
},
'content_json': {
'type': 'foo',
},
},
None,
"ACME request failed for https://ca.example.com/foo with status 400. Error foo.",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 400,
'problem': {
'type': 'foo',
},
'subproblems': [],
},
),
(
{
'info': {
'url': 'https://ca.example.com/foo',
'status': 400,
},
'content_json': {
'type': 'foo',
'title': 'Foo Error',
'subproblems': [
{
'type': 'bar',
'detail': 'This is a bar error',
'details': 'Details.',
},
],
},
},
None,
"ACME request failed for https://ca.example.com/foo with status 400. Error \"Foo Error\" (foo). Subproblems:\n"
"(0) Error bar: \"This is a bar error\".",
{
'http_url': 'https://ca.example.com/foo',
'http_status': 400,
'problem': {
'type': 'foo',
'title': 'Foo Error',
},
'subproblems': [
{
'type': 'bar',
'detail': 'This is a bar error',
'details': 'Details.',
},
],
},
),
]
@pytest.mark.parametrize("input, from_json, msg, args", TEST_ACME_PROTOCOL_EXCEPTION)
def test_acme_protocol_exception(input, from_json, msg, args):
if from_json is None:
module = None
else:
module = MagicMock()
module.from_json = from_json
with pytest.raises(ACMEProtocolException) as exc:
raise ACMEProtocolException(module, **input)
print(exc.value.msg)
print(exc.value.module_fail_args)
print(msg)
print(args)
assert exc.value.msg == msg
assert exc.value.module_fail_args == args

View File

@@ -0,0 +1,56 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from mock import MagicMock
from ansible_collections.community.crypto.plugins.module_utils.acme.orders import (
Order,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ACMEProtocolException,
ModuleFailException,
)
def test_order_from_json():
client = MagicMock()
data = {
'status': 'valid',
'identifiers': [],
'authorizations': [],
}
client.version = 2
order = Order.from_json(client, data, 'xxx')
assert order.data == data
assert order.url == 'xxx'
assert order.status == 'valid'
assert order.identifiers == []
assert order.finalize_uri is None
assert order.certificate_uri is None
assert order.authorization_uris == []
assert order.authorizations == {}
def test_wait_for_finalization_error():
client = MagicMock()
client.version = 2
data = {
'status': 'invalid',
'identifiers': [],
'authorizations': [],
}
order = Order.from_json(client, data, 'xxx')
client.get_request = MagicMock(return_value=(data, {}))
with pytest.raises(ACMEProtocolException) as exc:
order.wait_for_finalization(client)
assert exc.value.msg.startswith('Failed to wait for order to complete; got status "invalid". The JSON result: ')
assert exc.value.module_fail_args == {}