Initial commit

This commit is contained in:
Ansible Core Team
2020-03-09 13:11:34 +00:00
commit a9f45b4d5b
249 changed files with 37903 additions and 0 deletions

0
tests/unit/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,33 @@
# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
#
# Compat for python2.7
#
# One unittest needs to import builtins via __import__() so we need to have
# the string that represents it
try:
import __builtin__
except ImportError:
BUILTINS = 'builtins'
else:
BUILTINS = '__builtin__'

122
tests/unit/compat/mock.py Normal file
View File

@@ -0,0 +1,122 @@
# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
'''
Compat module for Python3.x's unittest.mock module
'''
import sys
# Python 2.7
# Note: Could use the pypi mock library on python3.x as well as python2.x. It
# is the same as the python3 stdlib mock library
try:
# Allow wildcard import because we really do want to import all of mock's
# symbols into this compat shim
# pylint: disable=wildcard-import,unused-wildcard-import
from unittest.mock import *
except ImportError:
# Python 2
# pylint: disable=wildcard-import,unused-wildcard-import
try:
from mock import *
except ImportError:
print('You need the mock library installed on python2.x to run tests')
# Prior to 3.4.4, mock_open cannot handle binary read_data
if sys.version_info >= (3,) and sys.version_info < (3, 4, 4):
file_spec = None
def _iterate_read_data(read_data):
# Helper for mock_open:
# Retrieve lines from read_data via a generator so that separate calls to
# readline, read, and readlines are properly interleaved
sep = b'\n' if isinstance(read_data, bytes) else '\n'
data_as_list = [l + sep for l in read_data.split(sep)]
if data_as_list[-1] == sep:
# If the last line ended in a newline, the list comprehension will have an
# extra entry that's just a newline. Remove this.
data_as_list = data_as_list[:-1]
else:
# If there wasn't an extra newline by itself, then the file being
# emulated doesn't have a newline to end the last line remove the
# newline that our naive format() added
data_as_list[-1] = data_as_list[-1][:-1]
for line in data_as_list:
yield line
def mock_open(mock=None, read_data=''):
"""
A helper function to create a mock to replace the use of `open`. It works
for `open` called directly or used as a context manager.
The `mock` argument is the mock object to configure. If `None` (the
default) then a `MagicMock` will be created for you, with the API limited
to methods or attributes available on standard file handles.
`read_data` is a string for the `read` methoddline`, and `readlines` of the
file handle to return. This is an empty string by default.
"""
def _readlines_side_effect(*args, **kwargs):
if handle.readlines.return_value is not None:
return handle.readlines.return_value
return list(_data)
def _read_side_effect(*args, **kwargs):
if handle.read.return_value is not None:
return handle.read.return_value
return type(read_data)().join(_data)
def _readline_side_effect():
if handle.readline.return_value is not None:
while True:
yield handle.readline.return_value
for line in _data:
yield line
global file_spec
if file_spec is None:
import _io
file_spec = list(set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO))))
if mock is None:
mock = MagicMock(name='open', spec=open)
handle = MagicMock(spec=file_spec)
handle.__enter__.return_value = handle
_data = _iterate_read_data(read_data)
handle.write.return_value = None
handle.read.return_value = None
handle.readline.return_value = None
handle.readlines.return_value = None
handle.read.side_effect = _read_side_effect
handle.readline.side_effect = _readline_side_effect()
handle.readlines.side_effect = _readlines_side_effect
mock.return_value = handle
return mock

View File

@@ -0,0 +1,38 @@
# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
'''
Compat module for Python2.7's unittest module
'''
import sys
# Allow wildcard import because we really do want to import all of
# unittests's symbols into this compat shim
# pylint: disable=wildcard-import,unused-wildcard-import
if sys.version_info < (2, 7):
try:
# Need unittest2 on python2.6
from unittest2 import *
except ImportError:
print('You need unittest2 installed on python2.6.x to run tests')
else:
from unittest import *

View File

116
tests/unit/mock/loader.py Normal file
View File

@@ -0,0 +1,116 @@
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
from ansible.errors import AnsibleParserError
from ansible.parsing.dataloader import DataLoader
from ansible.module_utils._text import to_bytes, to_text
class DictDataLoader(DataLoader):
def __init__(self, file_mapping=None):
file_mapping = {} if file_mapping is None else file_mapping
assert type(file_mapping) == dict
super(DictDataLoader, self).__init__()
self._file_mapping = file_mapping
self._build_known_directories()
self._vault_secrets = None
def load_from_file(self, path, cache=True, unsafe=False):
path = to_text(path)
if path in self._file_mapping:
return self.load(self._file_mapping[path], path)
return None
# TODO: the real _get_file_contents returns a bytestring, so we actually convert the
# unicode/text it's created with to utf-8
def _get_file_contents(self, path):
path = to_text(path)
if path in self._file_mapping:
return (to_bytes(self._file_mapping[path]), False)
else:
raise AnsibleParserError("file not found: %s" % path)
def path_exists(self, path):
path = to_text(path)
return path in self._file_mapping or path in self._known_directories
def is_file(self, path):
path = to_text(path)
return path in self._file_mapping
def is_directory(self, path):
path = to_text(path)
return path in self._known_directories
def list_directory(self, path):
ret = []
path = to_text(path)
for x in (list(self._file_mapping.keys()) + self._known_directories):
if x.startswith(path):
if os.path.dirname(x) == path:
ret.append(os.path.basename(x))
return ret
def is_executable(self, path):
# FIXME: figure out a way to make paths return true for this
return False
def _add_known_directory(self, directory):
if directory not in self._known_directories:
self._known_directories.append(directory)
def _build_known_directories(self):
self._known_directories = []
for path in self._file_mapping:
dirname = os.path.dirname(path)
while dirname not in ('/', ''):
self._add_known_directory(dirname)
dirname = os.path.dirname(dirname)
def push(self, path, content):
rebuild_dirs = False
if path not in self._file_mapping:
rebuild_dirs = True
self._file_mapping[path] = content
if rebuild_dirs:
self._build_known_directories()
def pop(self, path):
if path in self._file_mapping:
del self._file_mapping[path]
self._build_known_directories()
def clear(self):
self._file_mapping = dict()
self._known_directories = []
def get_basedir(self):
return os.getcwd()
def set_vault_secrets(self, vault_secrets):
self._vault_secrets = vault_secrets

5
tests/unit/mock/path.py Normal file
View File

@@ -0,0 +1,5 @@
from ansible_collections.community.crypto.tests.unit.compat.mock import MagicMock
from ansible.utils.path import unfrackpath
mock_unfrackpath_noop = MagicMock(spec_set=unfrackpath, side_effect=lambda x, *args, **kwargs: x)

View File

@@ -0,0 +1,90 @@
# (c) 2016, Matt Davis <mdavis@ansible.com>
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import sys
import json
from contextlib import contextmanager
from io import BytesIO, StringIO
from ansible_collections.community.crypto.tests.unit.compat import unittest
from ansible.module_utils.six import PY3
from ansible.module_utils._text import to_bytes
@contextmanager
def swap_stdin_and_argv(stdin_data='', argv_data=tuple()):
"""
context manager that temporarily masks the test runner's values for stdin and argv
"""
real_stdin = sys.stdin
real_argv = sys.argv
if PY3:
fake_stream = StringIO(stdin_data)
fake_stream.buffer = BytesIO(to_bytes(stdin_data))
else:
fake_stream = BytesIO(to_bytes(stdin_data))
try:
sys.stdin = fake_stream
sys.argv = argv_data
yield
finally:
sys.stdin = real_stdin
sys.argv = real_argv
@contextmanager
def swap_stdout():
"""
context manager that temporarily replaces stdout for tests that need to verify output
"""
old_stdout = sys.stdout
if PY3:
fake_stream = StringIO()
else:
fake_stream = BytesIO()
try:
sys.stdout = fake_stream
yield fake_stream
finally:
sys.stdout = old_stdout
class ModuleTestCase(unittest.TestCase):
def setUp(self, module_args=None):
if module_args is None:
module_args = {'_ansible_remote_tmp': '/tmp', '_ansible_keep_remote_files': False}
args = json.dumps(dict(ANSIBLE_MODULE_ARGS=module_args))
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
self.stdin_swap.__enter__()
def tearDown(self):
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
self.stdin_swap.__exit__(None, None, None)

View File

@@ -0,0 +1,39 @@
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.module_utils._text import to_bytes
from ansible.parsing.vault import VaultSecret
class TextVaultSecret(VaultSecret):
'''A secret piece of text. ie, a password. Tracks text encoding.
The text encoding of the text may not be the default text encoding so
we keep track of the encoding so we encode it to the same bytes.'''
def __init__(self, text, encoding=None, errors=None, _bytes=None):
super(TextVaultSecret, self).__init__()
self.text = text
self.encoding = encoding or 'utf-8'
self._bytes = _bytes
self.errors = errors or 'strict'
@property
def bytes(self):
'''The text encoded with encoding, unless we specifically set _bytes.'''
return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors)

View File

@@ -0,0 +1,121 @@
import io
import yaml
from ansible.module_utils.six import PY3
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.dumper import AnsibleDumper
class YamlTestUtils(object):
"""Mixin class to combine with a unittest.TestCase subclass."""
def _loader(self, stream):
"""Vault related tests will want to override this.
Vault cases should setup a AnsibleLoader that has the vault password."""
return AnsibleLoader(stream)
def _dump_stream(self, obj, stream, dumper=None):
"""Dump to a py2-unicode or py3-string stream."""
if PY3:
return yaml.dump(obj, stream, Dumper=dumper)
else:
return yaml.dump(obj, stream, Dumper=dumper, encoding=None)
def _dump_string(self, obj, dumper=None):
"""Dump to a py2-unicode or py3-string"""
if PY3:
return yaml.dump(obj, Dumper=dumper)
else:
return yaml.dump(obj, Dumper=dumper, encoding=None)
def _dump_load_cycle(self, obj):
# Each pass though a dump or load revs the 'generation'
# obj to yaml string
string_from_object_dump = self._dump_string(obj, dumper=AnsibleDumper)
# wrap a stream/file like StringIO around that yaml
stream_from_object_dump = io.StringIO(string_from_object_dump)
loader = self._loader(stream_from_object_dump)
# load the yaml stream to create a new instance of the object (gen 2)
obj_2 = loader.get_data()
# dump the gen 2 objects directory to strings
string_from_object_dump_2 = self._dump_string(obj_2,
dumper=AnsibleDumper)
# The gen 1 and gen 2 yaml strings
self.assertEqual(string_from_object_dump, string_from_object_dump_2)
# the gen 1 (orig) and gen 2 py object
self.assertEqual(obj, obj_2)
# again! gen 3... load strings into py objects
stream_3 = io.StringIO(string_from_object_dump_2)
loader_3 = self._loader(stream_3)
obj_3 = loader_3.get_data()
string_from_object_dump_3 = self._dump_string(obj_3, dumper=AnsibleDumper)
self.assertEqual(obj, obj_3)
# should be transitive, but...
self.assertEqual(obj_2, obj_3)
self.assertEqual(string_from_object_dump, string_from_object_dump_3)
def _old_dump_load_cycle(self, obj):
'''Dump the passed in object to yaml, load it back up, dump again, compare.'''
stream = io.StringIO()
yaml_string = self._dump_string(obj, dumper=AnsibleDumper)
self._dump_stream(obj, stream, dumper=AnsibleDumper)
yaml_string_from_stream = stream.getvalue()
# reset stream
stream.seek(0)
loader = self._loader(stream)
# loader = AnsibleLoader(stream, vault_password=self.vault_password)
obj_from_stream = loader.get_data()
stream_from_string = io.StringIO(yaml_string)
loader2 = self._loader(stream_from_string)
# loader2 = AnsibleLoader(stream_from_string, vault_password=self.vault_password)
obj_from_string = loader2.get_data()
stream_obj_from_stream = io.StringIO()
stream_obj_from_string = io.StringIO()
if PY3:
yaml.dump(obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper)
yaml.dump(obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper)
else:
yaml.dump(obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper, encoding=None)
yaml.dump(obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper, encoding=None)
yaml_string_stream_obj_from_stream = stream_obj_from_stream.getvalue()
yaml_string_stream_obj_from_string = stream_obj_from_string.getvalue()
stream_obj_from_stream.seek(0)
stream_obj_from_string.seek(0)
if PY3:
yaml_string_obj_from_stream = yaml.dump(obj_from_stream, Dumper=AnsibleDumper)
yaml_string_obj_from_string = yaml.dump(obj_from_string, Dumper=AnsibleDumper)
else:
yaml_string_obj_from_stream = yaml.dump(obj_from_stream, Dumper=AnsibleDumper, encoding=None)
yaml_string_obj_from_string = yaml.dump(obj_from_string, Dumper=AnsibleDumper, encoding=None)
assert yaml_string == yaml_string_obj_from_stream
assert yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string
assert (yaml_string == yaml_string_obj_from_stream == yaml_string_obj_from_string == yaml_string_stream_obj_from_stream ==
yaml_string_stream_obj_from_string)
assert obj == obj_from_stream
assert obj == obj_from_string
assert obj == yaml_string_obj_from_stream
assert obj == yaml_string_obj_from_string
assert obj == obj_from_stream == obj_from_string == yaml_string_obj_from_stream == yaml_string_obj_from_string
return {'obj': obj,
'yaml_string': yaml_string,
'yaml_string_from_stream': yaml_string_from_stream,
'obj_from_stream': obj_from_stream,
'obj_from_string': obj_from_string,
'yaml_string_obj_from_string': yaml_string_obj_from_string}

View File

View File

View File

@@ -0,0 +1,11 @@
-----BEGIN CERTIFICATE-----
MIIBljCCATugAwIBAgIBATAKBggqhkjOPQQDAjAWMRQwEgYDVQQDEwthbnNpYmxl
LmNvbTAeFw0xODExMjUxNTI4MjNaFw0xODExMjYxNTI4MjRaMBYxFDASBgNVBAMT
C2Fuc2libGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEAJz0yAAXAwEm
OhTRkjXxwgedbWO6gobYM3lWszrS68G8QSzhXR6AmQ3IzZDimnTTXO7XhVylDT8S
LzE44/Epm6N6MHgwIwYDVR0RBBwwGoILZXhhbXBsZS5jb22CC2V4YW1wbGUub3Jn
MAwGA1UdEwEB/wQCMAAwDwYDVR0PAQH/BAUDAweAADATBgNVHSUEDDAKBggrBgEF
BQcDATAdBgNVHQ4EFgQUmNL9PMzNaUX74owwLFRiGDS3B3MwCgYIKoZIzj0EAwID
SQAwRgIhALz7Ur96ky0OfM5D9MwFmCg2jccqm/UglGI9+4KeOEIyAiEAwFX4tdll
QSrd1HY/jMsHwdK5wH3JkK/9+fGwyRP11VI=
-----END CERTIFICATE-----

View File

@@ -0,0 +1,9 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
MIIBJTCBzQIBADAWMRQwEgYDVQQDEwthbnNpYmxlLmNvbTBZMBMGByqGSM49AgEG
CCqGSM49AwEHA0IABACc9MgAFwMBJjoU0ZI18cIHnW1juoKG2DN5VrM60uvBvEEs
4V0egJkNyM2Q4pp001zu14VcpQ0/Ei8xOOPxKZugVTBTBgkqhkiG9w0BCQ4xRjBE
MCMGA1UdEQQcMBqCC2V4YW1wbGUuY29tggtleGFtcGxlLm9yZzAMBgNVHRMBAf8E
AjAAMA8GA1UdDwEB/wQFAwMHgAAwCgYIKoZIzj0EAwIDRwAwRAIgcDyoRmwFVBDl
FvbFZtiSd5wmJU1ltM6JtcfnLWnjY54CICruOByrropFUkOKKb4xXOYsgaDT93Wr
URnCJfTLr2T3
-----END NEW CERTIFICATE REQUEST-----

View File

@@ -0,0 +1,28 @@
Certificate Request:
Data:
Version: 1 (0x0)
Subject: CN = ansible.com
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
pub:
04:00:9c:f4:c8:00:17:03:01:26:3a:14:d1:92:35:
f1:c2:07:9d:6d:63:ba:82:86:d8:33:79:56:b3:3a:
d2:eb:c1:bc:41:2c:e1:5d:1e:80:99:0d:c8:cd:90:
e2:9a:74:d3:5c:ee:d7:85:5c:a5:0d:3f:12:2f:31:
38:e3:f1:29:9b
ASN1 OID: prime256v1
NIST CURVE: P-256
Attributes:
Requested Extensions:
X509v3 Subject Alternative Name:
DNS:example.com, DNS:example.org
X509v3 Basic Constraints: critical
CA:FALSE
X509v3 Key Usage: critical
Digital Signature
Signature Algorithm: ecdsa-with-SHA256
30:44:02:20:70:3c:a8:46:6c:05:54:10:e5:16:f6:c5:66:d8:
92:77:9c:26:25:4d:65:b4:ce:89:b5:c7:e7:2d:69:e3:63:9e:
02:20:2a:ee:38:1c:ab:ae:8a:45:52:43:8a:29:be:31:5c:e6:
2c:81:a0:d3:f7:75:ab:51:19:c2:25:f4:cb:af:64:f7

View File

@@ -0,0 +1,27 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIEqjCCApICAQAwADCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBANv1
V7gDsh76O//d9wclBcW6kNpWeR6eAggzThwbMZjcO7GFHQsBZCZGGVdyS37uhejc
RrIBdtDDWXhoh3Dz+GQxD+6GuwAEFyL1F3MfT0v1HHoO8fE74G5mD6+ZA2HRDeU9
jf8BPyVWHBtNbCmJGSlSNOFejWCmwvsLARQxqFBuTyRjgos4BkLyWMqZRukrzO1P
z7IBhuFrB608t+AG4vGnPXZNM7xefhzO8bPOiepT0YS2ERPkFmOy97SnwTGdKykw
ZYM9oKukYhE4Z+yOaTFpJMBNXwDCI5TMnhtc6eJrf5sOFH92n2E9+YWMoahUOiTw
G6XV5HfSpySpwORUaTITQRsPAM+bmK9f1jB6ctfFVwpa8uW/h8pSgbHgZvkeD6s6
rFLh9TQ24t0vrRmhnY7/AMFgbgJoBTBq0l0lEXS4FCGKDGqQOqSws+eHR/pHA4uY
v8d498SQl9fYsT/c7Uj3/JnMSRVN942yQUFCzwLf0/WzWCi2HTqPM8CPh5ryiJ30
GAN2eb026/noyTOXm479Tg9o86Tw9qczE0j0CdcRnr6J337RGHQg58PZ7j+hnUmK
wgyclyvjE10ZFBgToMGSnzYp5UeRcOFZ3bnK6LOsGC75mIvz2OQgSQeO5VQASEnO
9uhygNyo91sK4BtVroloit8ZCa82LlsHSCj/mMzPAgMBAAGgZTBjBgkqhkiG9w0B
CQ4xVjBUMFIGA1UdEQRLMEmCC2Fuc2libGUuY29thwR/AAABhxAAAAAAAAAAAAAA
AAAAAAABhxAgAQ2IrBD+AQAAAAAAAAAAhxAgARI0VnirzZh2VDIQ/ty6MA0GCSqG
SIb3DQEBCwUAA4ICAQBFRuANzVRcze+iur0YevjtYIXDa03GoWWkgnLuE8u8epTM
2248duG3TmvVvxWPN4iFrvFcZIvNsevBo+Z7kXJ24m3YldtXvwfAYmCZ062apSoh
yzgo3Q0KfDehwLcoJPe5bh+jbbgJVGGvJug/QFyHSVl+iGyFUXE7pwafl9LuNDi3
yfOYZLIQ34mBH4Rsvymj9xSTYliWDEEU/o7RrrZeEqkOxNeLh64LbnifdrYUputz
yBURg2xs9hpAsytZJX90iJW8aYPM1aQ7eetqTViIRoqUAmIQobnKlNnpOliBHl+p
RY+AtTnsfAetKUP7OsAZkHRTGAXx0JHJQ1ITY8w5Dcw/v1bDCbAfkDubBP3X+us9
RQk2h6m74hWFFNu9xOfkNejPf7h4gywfDjo/wGZFSWKyi6avB9V53znZgRUwc009
p5MM9e37MH8pyBqfnbSwOj4hUoyecRCIAFdywjMb9akP2u15XP3MOtJOEvecyCxN
TZBxupTg65zB47GeSAufnc8FaTZkE8xPuCtbvqOVOkWYqzlqNdCfK8f3AZdlpwLh
38wdUm5G7LIu6aQNiY66aQs9qVpoGvqdmxHRkuSwqwZxGgzcY1yJaWGXQ6R4jgC3
VKlMTUVs1WYV6jrYLHcVt6Rn/2FVTOns3Jn6cTPOdKViYoqF+yW8yCEAqAskZw==
-----END CERTIFICATE REQUEST-----

View File

@@ -0,0 +1,78 @@
Certificate Request:
Data:
Version: 1 (0x0)
Subject:
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
RSA Public-Key: (4096 bit)
Modulus:
00:db:f5:57:b8:03:b2:1e:fa:3b:ff:dd:f7:07:25:
05:c5:ba:90:da:56:79:1e:9e:02:08:33:4e:1c:1b:
31:98:dc:3b:b1:85:1d:0b:01:64:26:46:19:57:72:
4b:7e:ee:85:e8:dc:46:b2:01:76:d0:c3:59:78:68:
87:70:f3:f8:64:31:0f:ee:86:bb:00:04:17:22:f5:
17:73:1f:4f:4b:f5:1c:7a:0e:f1:f1:3b:e0:6e:66:
0f:af:99:03:61:d1:0d:e5:3d:8d:ff:01:3f:25:56:
1c:1b:4d:6c:29:89:19:29:52:34:e1:5e:8d:60:a6:
c2:fb:0b:01:14:31:a8:50:6e:4f:24:63:82:8b:38:
06:42:f2:58:ca:99:46:e9:2b:cc:ed:4f:cf:b2:01:
86:e1:6b:07:ad:3c:b7:e0:06:e2:f1:a7:3d:76:4d:
33:bc:5e:7e:1c:ce:f1:b3:ce:89:ea:53:d1:84:b6:
11:13:e4:16:63:b2:f7:b4:a7:c1:31:9d:2b:29:30:
65:83:3d:a0:ab:a4:62:11:38:67:ec:8e:69:31:69:
24:c0:4d:5f:00:c2:23:94:cc:9e:1b:5c:e9:e2:6b:
7f:9b:0e:14:7f:76:9f:61:3d:f9:85:8c:a1:a8:54:
3a:24:f0:1b:a5:d5:e4:77:d2:a7:24:a9:c0:e4:54:
69:32:13:41:1b:0f:00:cf:9b:98:af:5f:d6:30:7a:
72:d7:c5:57:0a:5a:f2:e5:bf:87:ca:52:81:b1:e0:
66:f9:1e:0f:ab:3a:ac:52:e1:f5:34:36:e2:dd:2f:
ad:19:a1:9d:8e:ff:00:c1:60:6e:02:68:05:30:6a:
d2:5d:25:11:74:b8:14:21:8a:0c:6a:90:3a:a4:b0:
b3:e7:87:47:fa:47:03:8b:98:bf:c7:78:f7:c4:90:
97:d7:d8:b1:3f:dc:ed:48:f7:fc:99:cc:49:15:4d:
f7:8d:b2:41:41:42:cf:02:df:d3:f5:b3:58:28:b6:
1d:3a:8f:33:c0:8f:87:9a:f2:88:9d:f4:18:03:76:
79:bd:36:eb:f9:e8:c9:33:97:9b:8e:fd:4e:0f:68:
f3:a4:f0:f6:a7:33:13:48:f4:09:d7:11:9e:be:89:
df:7e:d1:18:74:20:e7:c3:d9:ee:3f:a1:9d:49:8a:
c2:0c:9c:97:2b:e3:13:5d:19:14:18:13:a0:c1:92:
9f:36:29:e5:47:91:70:e1:59:dd:b9:ca:e8:b3:ac:
18:2e:f9:98:8b:f3:d8:e4:20:49:07:8e:e5:54:00:
48:49:ce:f6:e8:72:80:dc:a8:f7:5b:0a:e0:1b:55:
ae:89:68:8a:df:19:09:af:36:2e:5b:07:48:28:ff:
98:cc:cf
Exponent: 65537 (0x10001)
Attributes:
Requested Extensions:
X509v3 Subject Alternative Name:
DNS:ansible.com, IP Address:127.0.0.1, IP Address:0:0:0:0:0:0:0:1, IP Address:2001:D88:AC10:FE01:0:0:0:0, IP Address:2001:1234:5678:ABCD:9876:5432:10FE:DCBA
Signature Algorithm: sha256WithRSAEncryption
45:46:e0:0d:cd:54:5c:cd:ef:a2:ba:bd:18:7a:f8:ed:60:85:
c3:6b:4d:c6:a1:65:a4:82:72:ee:13:cb:bc:7a:94:cc:db:6e:
3c:76:e1:b7:4e:6b:d5:bf:15:8f:37:88:85:ae:f1:5c:64:8b:
cd:b1:eb:c1:a3:e6:7b:91:72:76:e2:6d:d8:95:db:57:bf:07:
c0:62:60:99:d3:ad:9a:a5:2a:21:cb:38:28:dd:0d:0a:7c:37:
a1:c0:b7:28:24:f7:b9:6e:1f:a3:6d:b8:09:54:61:af:26:e8:
3f:40:5c:87:49:59:7e:88:6c:85:51:71:3b:a7:06:9f:97:d2:
ee:34:38:b7:c9:f3:98:64:b2:10:df:89:81:1f:84:6c:bf:29:
a3:f7:14:93:62:58:96:0c:41:14:fe:8e:d1:ae:b6:5e:12:a9:
0e:c4:d7:8b:87:ae:0b:6e:78:9f:76:b6:14:a6:eb:73:c8:15:
11:83:6c:6c:f6:1a:40:b3:2b:59:25:7f:74:88:95:bc:69:83:
cc:d5:a4:3b:79:eb:6a:4d:58:88:46:8a:94:02:62:10:a1:b9:
ca:94:d9:e9:3a:58:81:1e:5f:a9:45:8f:80:b5:39:ec:7c:07:
ad:29:43:fb:3a:c0:19:90:74:53:18:05:f1:d0:91:c9:43:52:
13:63:cc:39:0d:cc:3f:bf:56:c3:09:b0:1f:90:3b:9b:04:fd:
d7:fa:eb:3d:45:09:36:87:a9:bb:e2:15:85:14:db:bd:c4:e7:
e4:35:e8:cf:7f:b8:78:83:2c:1f:0e:3a:3f:c0:66:45:49:62:
b2:8b:a6:af:07:d5:79:df:39:d9:81:15:30:73:4d:3d:a7:93:
0c:f5:ed:fb:30:7f:29:c8:1a:9f:9d:b4:b0:3a:3e:21:52:8c:
9e:71:10:88:00:57:72:c2:33:1b:f5:a9:0f:da:ed:79:5c:fd:
cc:3a:d2:4e:12:f7:9c:c8:2c:4d:4d:90:71:ba:94:e0:eb:9c:
c1:e3:b1:9e:48:0b:9f:9d:cf:05:69:36:64:13:cc:4f:b8:2b:
5b:be:a3:95:3a:45:98:ab:39:6a:35:d0:9f:2b:c7:f7:01:97:
65:a7:02:e1:df:cc:1d:52:6e:46:ec:b2:2e:e9:a4:0d:89:8e:
ba:69:0b:3d:a9:5a:68:1a:fa:9d:9b:11:d1:92:e4:b0:ab:06:
71:1a:0c:dc:63:5c:89:69:61:97:43:a4:78:8e:00:b7:54:a9:
4c:4d:45:6c:d5:66:15:ea:3a:d8:2c:77:15:b7:a4:67:ff:61:
55:4c:e9:ec:dc:99:fa:71:33:ce:74:a5:62:62:8a:85:fb:25:
bc:c8:21:00:a8:0b:24:67

View File

@@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDWajU0PyhYKeulfy/luNtkAve7DkwQ01bXJ97zbxB66oAoGCCqGSM49
AwEHoUQDQgAEAJz0yAAXAwEmOhTRkjXxwgedbWO6gobYM3lWszrS68G8QSzhXR6A
mQ3IzZDimnTTXO7XhVylDT8SLzE44/Epmw==
-----END EC PRIVATE KEY-----

View File

@@ -0,0 +1,14 @@
read EC key
Private-Key: (256 bit)
priv:
35:9a:8d:4d:0f:ca:16:0a:7a:e9:5f:cb:f9:6e:36:
d9:00:bd:ee:c3:93:04:34:d5:b5:c9:f7:bc:db:c4:
1e:ba
pub:
04:00:9c:f4:c8:00:17:03:01:26:3a:14:d1:92:35:
f1:c2:07:9d:6d:63:ba:82:86:d8:33:79:56:b3:3a:
d2:eb:c1:bc:41:2c:e1:5d:1e:80:99:0d:c8:cd:90:
e2:9a:74:d3:5c:ee:d7:85:5c:a5:0d:3f:12:2f:31:
38:e3:f1:29:9b
ASN1 OID: prime256v1
NIST CURVE: P-256

View File

@@ -0,0 +1,216 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import base64
import datetime
import os.path
import pytest
from mock import MagicMock
from ansible_collections.community.crypto.plugins.module_utils.acme import (
HAS_CURRENT_CRYPTOGRAPHY,
nopad_b64,
write_file,
read_file,
pem_to_der,
_parse_key_openssl,
# _sign_request_openssl,
_parse_key_cryptography,
# _sign_request_cryptography,
_normalize_ip,
openssl_get_csr_identifiers,
cryptography_get_csr_identifiers,
cryptography_get_cert_days,
)
def load_fixture(name):
with open(os.path.join(os.path.dirname(__file__), 'fixtures', name)) as f:
return f.read()
################################################
NOPAD_B64 = [
("", ""),
("\n", "Cg"),
("123", "MTIz"),
("Lorem?ipsum", "TG9yZW0_aXBzdW0"),
]
@pytest.mark.parametrize("value, result", NOPAD_B64)
def test_nopad_b64(value, result):
assert nopad_b64(value.encode('utf-8')) == result
################################################
TEST_TEXT = r"""1234
5678"""
def test_read_file(tmpdir):
fn = tmpdir / 'test.txt'
fn.write(TEST_TEXT)
assert read_file(str(fn), 't') == TEST_TEXT
assert read_file(str(fn), 'b') == TEST_TEXT.encode('utf-8')
def test_write_file(tmpdir):
fn = tmpdir / 'test.txt'
module = MagicMock()
write_file(module, str(fn), TEST_TEXT.encode('utf-8'))
assert fn.read() == TEST_TEXT
################################################
TEST_PEM_DERS = [
(
load_fixture('privatekey_1.pem'),
base64.b64decode('MHcCAQEEIDWajU0PyhYKeulfy/luNtkAve7DkwQ01bXJ97zbxB66oAo'
'GCCqGSM49AwEHoUQDQgAEAJz0yAAXAwEmOhTRkjXxwgedbWO6gobYM3'
'lWszrS68G8QSzhXR6AmQ3IzZDimnTTXO7XhVylDT8SLzE44/Epmw==')
)
]
@pytest.mark.parametrize("pem, der", TEST_PEM_DERS)
def test_pem_to_der(pem, der, tmpdir):
fn = tmpdir / 'test.pem'
fn.write(pem)
assert pem_to_der(str(fn)) == der
################################################
TEST_KEYS = [
(
load_fixture('privatekey_1.pem'),
{
'alg': 'ES256',
'hash': 'sha256',
'jwk': {
'crv': 'P-256',
'kty': 'EC',
'x': 'AJz0yAAXAwEmOhTRkjXxwgedbWO6gobYM3lWszrS68E',
'y': 'vEEs4V0egJkNyM2Q4pp001zu14VcpQ0_Ei8xOOPxKZs',
},
'point_size': 32,
'type': 'ec',
},
load_fixture('privatekey_1.txt'),
)
]
@pytest.mark.parametrize("pem, result, openssl_output", TEST_KEYS)
def test_eckeyparse_openssl(pem, result, openssl_output, tmpdir):
fn = tmpdir / 'test.key'
fn.write(pem)
module = MagicMock()
module.run_command = MagicMock(return_value=(0, openssl_output, 0))
error, key = _parse_key_openssl('openssl', module, key_file=str(fn))
assert error is None
key.pop('key_file')
assert key == result
if HAS_CURRENT_CRYPTOGRAPHY:
@pytest.mark.parametrize("pem, result, dummy", TEST_KEYS)
def test_eckeyparse_cryptography(pem, result, dummy):
module = MagicMock()
error, key = _parse_key_cryptography(module, key_content=pem)
assert error is None
key.pop('key_obj')
assert key == result
################################################
TEST_IPS = [
("0:0:0:0:0:0:0:1", "::1"),
("1::0:2", "1::2"),
("0000:0001:0000:0000:0000:0000:0000:0001", "0:1::1"),
("0000:0001:0000:0000:0001:0000:0000:0001", "0:1::1:0:0:1"),
("0000:0001:0000:0001:0000:0001:0000:0001", "0:1:0:1:0:1:0:1"),
("0.0.0.0", "0.0.0.0"),
("000.001.000.000", "0.1.0.0"),
("2001:d88:ac10:fe01:0:0:0:0", "2001:d88:ac10:fe01::"),
("0000:0000:0000:0000:0000:0000:0000:0000", "::"),
]
@pytest.mark.parametrize("ip, result", TEST_IPS)
def test_normalize_ip(ip, result):
assert _normalize_ip(ip) == result
################################################
TEST_CSRS = [
(
load_fixture('csr_1.pem'),
set([
('dns', 'ansible.com'),
('dns', 'example.com'),
('dns', 'example.org')
]),
load_fixture('csr_1.txt'),
),
(
load_fixture('csr_2.pem'),
set([
('dns', 'ansible.com'),
('ip', '127.0.0.1'),
('ip', '::1'),
('ip', '2001:d88:ac10:fe01::'),
('ip', '2001:1234:5678:abcd:9876:5432:10fe:dcba')
]),
load_fixture('csr_2.txt'),
),
]
@pytest.mark.parametrize("csr, result, openssl_output", TEST_CSRS)
def test_csridentifiers_openssl(csr, result, openssl_output, tmpdir):
fn = tmpdir / 'test.csr'
fn.write(csr)
module = MagicMock()
module.run_command = MagicMock(return_value=(0, openssl_output, 0))
identifiers = openssl_get_csr_identifiers('openssl', module, str(fn))
assert identifiers == result
if HAS_CURRENT_CRYPTOGRAPHY:
@pytest.mark.parametrize("csr, result, openssl_output", TEST_CSRS)
def test_csridentifiers_cryptography(csr, result, openssl_output, tmpdir):
fn = tmpdir / 'test.csr'
fn.write(csr)
module = MagicMock()
identifiers = cryptography_get_csr_identifiers(module, str(fn))
assert identifiers == result
################################################
TEST_CERT = load_fixture("cert_1.pem")
TEST_CERT_DAYS = [
(datetime.datetime(2018, 11, 15, 1, 2, 3), 11),
(datetime.datetime(2018, 11, 25, 15, 20, 0), 1),
(datetime.datetime(2018, 11, 25, 15, 30, 0), 0),
]
if HAS_CURRENT_CRYPTOGRAPHY:
@pytest.mark.parametrize("now, expected_days", TEST_CERT_DAYS)
def test_certdays_cryptography(now, expected_days, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(TEST_CERT)
module = MagicMock()
days = cryptography_get_cert_days(module, str(fn), now=now)
assert days == expected_days

View File

@@ -0,0 +1,69 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
import sys
from io import BytesIO
import pytest
import ansible.module_utils.basic
from ansible.module_utils.six import PY3, string_types
from ansible.module_utils._text import to_bytes
from ansible.module_utils.common._collections_compat import MutableMapping
@pytest.fixture
def stdin(mocker, request):
old_args = ansible.module_utils.basic._ANSIBLE_ARGS
ansible.module_utils.basic._ANSIBLE_ARGS = None
old_argv = sys.argv
sys.argv = ['ansible_unittest']
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
if 'ANSIBLE_MODULE_ARGS' not in request.param:
request.param = {'ANSIBLE_MODULE_ARGS': request.param}
if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
args = json.dumps(request.param)
else:
raise Exception('Malformed data to the stdin pytest fixture')
fake_stdin = BytesIO(to_bytes(args, errors='surrogate_or_strict'))
if PY3:
mocker.patch('ansible.module_utils.basic.sys.stdin', mocker.MagicMock())
mocker.patch('ansible.module_utils.basic.sys.stdin.buffer', fake_stdin)
else:
mocker.patch('ansible.module_utils.basic.sys.stdin', fake_stdin)
yield fake_stdin
ansible.module_utils.basic._ANSIBLE_ARGS = old_args
sys.argv = old_argv
@pytest.fixture
def am(stdin, request):
old_args = ansible.module_utils.basic._ANSIBLE_ARGS
ansible.module_utils.basic._ANSIBLE_ARGS = None
old_argv = sys.argv
sys.argv = ['ansible_unittest']
argspec = {}
if hasattr(request, 'param'):
if isinstance(request.param, dict):
argspec = request.param
am = ansible.module_utils.basic.AnsibleModule(
argument_spec=argspec,
)
am._name = 'ansible_unittest'
yield am
ansible.module_utils.basic._ANSIBLE_ARGS = old_args
sys.argv = old_argv

View File

View File

@@ -0,0 +1,28 @@
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
import json
import pytest
from ansible.module_utils.six import string_types
from ansible.module_utils._text import to_bytes
from ansible.module_utils.common._collections_compat import MutableMapping
@pytest.fixture
def patch_ansible_module(request, mocker):
if isinstance(request.param, string_types):
args = request.param
elif isinstance(request.param, MutableMapping):
if 'ANSIBLE_MODULE_ARGS' not in request.param:
request.param = {'ANSIBLE_MODULE_ARGS': request.param}
if '_ansible_remote_tmp' not in request.param['ANSIBLE_MODULE_ARGS']:
request.param['ANSIBLE_MODULE_ARGS']['_ansible_remote_tmp'] = '/tmp'
if '_ansible_keep_remote_files' not in request.param['ANSIBLE_MODULE_ARGS']:
request.param['ANSIBLE_MODULE_ARGS']['_ansible_keep_remote_files'] = False
args = json.dumps(request.param)
else:
raise Exception('Malformed data to the patch_ansible_module pytest fixture')
mocker.patch('ansible.module_utils.basic._ANSIBLE_ARGS', to_bytes(args))

View File

View File

@@ -0,0 +1,305 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_collections.community.crypto.plugins.modules import luks_device
from ansible_collections.community.crypto.tests.unit.compat.mock import patch
from ansible.module_utils import basic
class DummyModule(object):
# module to mock AnsibleModule class
def __init__(self):
self.params = dict()
def fail_json(self, msg=""):
raise ValueError(msg)
def get_bin_path(self, command, dummy):
return command
# ===== Handler & CryptHandler methods tests =====
def test_generate_luks_name(monkeypatch):
module = DummyModule()
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, "UUID", ""])
crypt = luks_device.CryptHandler(module)
assert crypt.generate_luks_name("/dev/dummy") == "luks-UUID"
def test_get_container_name_by_device(monkeypatch):
module = DummyModule()
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, "crypt container_name", ""])
crypt = luks_device.CryptHandler(module)
assert crypt.get_container_name_by_device("/dev/dummy") == "container_name"
def test_get_container_device_by_name(monkeypatch):
module = DummyModule()
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, "device: /dev/luksdevice", ""])
crypt = luks_device.CryptHandler(module)
assert crypt.get_container_device_by_name("dummy") == "/dev/luksdevice"
def test_run_luks_remove(monkeypatch):
def run_command_check(self, command):
# check that wipefs command is actually called
assert command[0] == "wipefs"
return [0, "", ""]
module = DummyModule()
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_name_by_device",
lambda x, y: None)
monkeypatch.setattr(luks_device.Handler,
"_run_command",
run_command_check)
crypt = luks_device.CryptHandler(module)
crypt.run_luks_remove("dummy")
# ===== ConditionsHandler methods data and tests =====
# device, key, passphrase, state, is_luks, label, expected
LUKS_CREATE_DATA = (
("dummy", "key", None, "present", False, None, True),
(None, "key", None, "present", False, None, False),
(None, "key", None, "present", False, "labelName", True),
("dummy", None, None, "present", False, None, False),
("dummy", "key", None, "absent", False, None, False),
("dummy", "key", None, "opened", True, None, False),
("dummy", "key", None, "closed", True, None, False),
("dummy", "key", None, "present", True, None, False),
("dummy", None, "foo", "present", False, None, True),
(None, None, "bar", "present", False, None, False),
(None, None, "baz", "present", False, "labelName", True),
("dummy", None, None, "present", False, None, False),
("dummy", None, "quz", "absent", False, None, False),
("dummy", None, "qux", "opened", True, None, False),
("dummy", None, "quux", "closed", True, None, False),
("dummy", None, "corge", "present", True, None, False))
# device, state, is_luks, expected
LUKS_REMOVE_DATA = (
("dummy", "absent", True, True),
(None, "absent", True, False),
("dummy", "present", True, False),
("dummy", "absent", False, False))
# device, key, passphrase, state, name, name_by_dev, expected
LUKS_OPEN_DATA = (
("dummy", "key", None, "present", "name", None, False),
("dummy", "key", None, "absent", "name", None, False),
("dummy", "key", None, "closed", "name", None, False),
("dummy", "key", None, "opened", "name", None, True),
(None, "key", None, "opened", "name", None, False),
("dummy", None, None, "opened", "name", None, False),
("dummy", "key", None, "opened", "name", "name", False),
("dummy", "key", None, "opened", "beer", "name", "exception"),
("dummy", None, "foo", "present", "name", None, False),
("dummy", None, "bar", "absent", "name", None, False),
("dummy", None, "baz", "closed", "name", None, False),
("dummy", None, "qux", "opened", "name", None, True),
(None, None, "quux", "opened", "name", None, False),
("dummy", None, None, "opened", "name", None, False),
("dummy", None, "quuz", "opened", "name", "name", False),
("dummy", None, "corge", "opened", "beer", "name", "exception"))
# device, dev_by_name, name, name_by_dev, state, label, expected
LUKS_CLOSE_DATA = (
("dummy", "dummy", "name", "name", "present", None, False),
("dummy", "dummy", "name", "name", "absent", None, False),
("dummy", "dummy", "name", "name", "opened", None, False),
("dummy", "dummy", "name", "name", "closed", None, True),
(None, "dummy", "name", "name", "closed", None, True),
("dummy", "dummy", None, "name", "closed", None, True),
(None, "dummy", None, "name", "closed", None, False))
# device, key, passphrase, new_key, new_passphrase, state, label, expected
LUKS_ADD_KEY_DATA = (
("dummy", "key", None, "new_key", None, "present", None, True),
(None, "key", None, "new_key", None, "present", "labelName", True),
(None, "key", None, "new_key", None, "present", None, False),
("dummy", None, None, "new_key", None, "present", None, False),
("dummy", "key", None, None, None, "present", None, False),
("dummy", "key", None, "new_key", None, "absent", None, "exception"),
("dummy", None, "pass", "new_key", None, "present", None, True),
(None, None, "pass", "new_key", None, "present", "labelName", True),
("dummy", "key", None, None, "new_pass", "present", None, True),
(None, "key", None, None, "new_pass", "present", "labelName", True),
(None, "key", None, None, "new_pass", "present", None, False),
("dummy", None, None, None, "new_pass", "present", None, False),
("dummy", "key", None, None, None, "present", None, False),
("dummy", "key", None, None, "new_pass", "absent", None, "exception"),
("dummy", None, "pass", None, "new_pass", "present", None, True),
(None, None, "pass", None, "new_pass", "present", "labelName", True))
# device, remove_key, remove_passphrase, state, label, expected
LUKS_REMOVE_KEY_DATA = (
("dummy", "key", None, "present", None, True),
(None, "key", None, "present", None, False),
(None, "key", None, "present", "labelName", True),
("dummy", None, None, "present", None, False),
("dummy", "key", None, "absent", None, "exception"),
("dummy", None, "foo", "present", None, True),
(None, None, "foo", "present", None, False),
(None, None, "foo", "present", "labelName", True),
("dummy", None, None, "present", None, False),
("dummy", None, "foo", "absent", None, "exception"))
@pytest.mark.parametrize("device, keyfile, passphrase, state, is_luks, " +
"label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6])
for d in LUKS_CREATE_DATA))
def test_luks_create(device, keyfile, passphrase, state, is_luks, label,
expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["keyfile"] = keyfile
module.params["passphrase"] = passphrase
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.CryptHandler, "is_luks",
lambda x, y: is_luks)
crypt = luks_device.CryptHandler(module)
if device is None:
monkeypatch.setattr(luks_device.Handler, "get_device_by_label",
lambda x, y: [0, "/dev/dummy", ""])
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_create() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, state, is_luks, expected",
((d[0], d[1], d[2], d[3])
for d in LUKS_REMOVE_DATA))
def test_luks_remove(device, state, is_luks, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["state"] = state
monkeypatch.setattr(luks_device.CryptHandler, "is_luks",
lambda x, y: is_luks)
crypt = luks_device.CryptHandler(module)
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_remove() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, keyfile, passphrase, state, name, "
"name_by_dev, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6])
for d in LUKS_OPEN_DATA))
def test_luks_open(device, keyfile, passphrase, state, name, name_by_dev,
expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["keyfile"] = keyfile
module.params["passphrase"] = passphrase
module.params["state"] = state
module.params["name"] = name
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_name_by_device",
lambda x, y: name_by_dev)
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_device_by_name",
lambda x, y: device)
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, device, ""])
crypt = luks_device.CryptHandler(module)
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_open() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, dev_by_name, name, name_by_dev, "
"state, label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6])
for d in LUKS_CLOSE_DATA))
def test_luks_close(device, dev_by_name, name, name_by_dev, state,
label, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["name"] = name
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_name_by_device",
lambda x, y: name_by_dev)
monkeypatch.setattr(luks_device.CryptHandler,
"get_container_device_by_name",
lambda x, y: dev_by_name)
crypt = luks_device.CryptHandler(module)
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_close() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, keyfile, passphrase, new_keyfile, " +
"new_passphrase, state, label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5], d[6], d[7])
for d in LUKS_ADD_KEY_DATA))
def test_luks_add_key(device, keyfile, passphrase, new_keyfile, new_passphrase,
state, label, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["keyfile"] = keyfile
module.params["passphrase"] = passphrase
module.params["new_keyfile"] = new_keyfile
module.params["new_passphrase"] = new_passphrase
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.Handler, "get_device_by_label",
lambda x, y: [0, "/dev/dummy", ""])
try:
conditions = luks_device.ConditionsHandler(module, module)
assert conditions.luks_add_key() == expected
except ValueError:
assert expected == "exception"
@pytest.mark.parametrize("device, remove_keyfile, remove_passphrase, state, " +
"label, expected",
((d[0], d[1], d[2], d[3], d[4], d[5])
for d in LUKS_REMOVE_KEY_DATA))
def test_luks_remove_key(device, remove_keyfile, remove_passphrase, state,
label, expected, monkeypatch):
module = DummyModule()
module.params["device"] = device
module.params["remove_keyfile"] = remove_keyfile
module.params["remove_passphrase"] = remove_passphrase
module.params["state"] = state
module.params["label"] = label
monkeypatch.setattr(luks_device.Handler, "get_device_by_label",
lambda x, y: [0, "/dev/dummy", ""])
monkeypatch.setattr(luks_device.Handler, "_run_command",
lambda x, y: [0, device, ""])
crypt = luks_device.CryptHandler(module)
try:
conditions = luks_device.ConditionsHandler(module, crypt)
assert conditions.luks_remove_key() == expected
except ValueError:
assert expected == "exception"

View File

@@ -0,0 +1,47 @@
import json
from ansible_collections.community.crypto.tests.unit.compat import unittest
from ansible_collections.community.crypto.tests.unit.compat.mock import patch
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
def set_module_args(args):
if '_ansible_remote_tmp' not in args:
args['_ansible_remote_tmp'] = '/tmp'
if '_ansible_keep_remote_files' not in args:
args['_ansible_keep_remote_files'] = False
args = json.dumps({'ANSIBLE_MODULE_ARGS': args})
basic._ANSIBLE_ARGS = to_bytes(args)
class AnsibleExitJson(Exception):
pass
class AnsibleFailJson(Exception):
pass
def exit_json(*args, **kwargs):
if 'changed' not in kwargs:
kwargs['changed'] = False
raise AnsibleExitJson(kwargs)
def fail_json(*args, **kwargs):
kwargs['failed'] = True
raise AnsibleFailJson(kwargs)
class ModuleTestCase(unittest.TestCase):
def setUp(self):
self.mock_module = patch.multiple(basic.AnsibleModule, exit_json=exit_json, fail_json=fail_json)
self.mock_module.start()
self.mock_sleep = patch('time.sleep')
self.mock_sleep.start()
set_module_args({})
self.addCleanup(self.mock_module.stop)
self.addCleanup(self.mock_sleep.stop)

View File

@@ -0,0 +1,42 @@
boto3
placebo
pycrypto
passlib
pypsrp
python-memcached
pytz
pyvmomi
redis
requests
setuptools > 0.6 # pytest-xdist installed via requirements does not work with very old setuptools (sanity_ok)
unittest2 ; python_version < '2.7'
importlib ; python_version < '2.7'
netaddr
ipaddress
netapp-lib
solidfire-sdk-python
# requirements for F5 specific modules
f5-sdk ; python_version >= '2.7'
f5-icontrol-rest ; python_version >= '2.7'
deepdiff
# requirement for Fortinet specific modules
pyFMG
# requirement for aci_rest module
xmljson
# requirement for winrm connection plugin tests
pexpect
# requirement for the linode module
linode-python # APIv3
linode_api4 ; python_version > '2.6' # APIv4
# requirement for the gitlab module
python-gitlab
httmock
# requirment for kubevirt modules
openshift ; python_version >= '2.7'