# Copyright [2021] [Red Hat, Inc.] # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import, division, print_function __metaclass__ = type import os from tempfile import TemporaryFile, NamedTemporaryFile from select import select from abc import ABCMeta, abstractmethod import tarfile # from ansible_collections.kubernetes.core.plugins.module_utils.ansiblemodule import AnsibleModule from ansible.module_utils._text import to_native try: from kubernetes.client.api import core_v1_api from kubernetes.stream import stream from kubernetes.stream.ws_client import ( STDOUT_CHANNEL, STDERR_CHANNEL, ERROR_CHANNEL, ABNF, ) except ImportError: pass try: import yaml except ImportError: # ImportError are managed by the common module already. pass class K8SCopy(metaclass=ABCMeta): def __init__(self, module, client): self.client = client self.module = module self.api_instance = core_v1_api.CoreV1Api(client.client) self.local_path = module.params.get("local_path") self.name = module.params.get("pod") self.namespace = module.params.get("namespace") self.remote_path = module.params.get("remote_path") self.content = module.params.get("content") self.no_preserve = module.params.get("no_preserve") self.container_arg = {} if module.params.get("container"): self.container_arg["container"] = module.params.get("container") @abstractmethod def run(self): pass class K8SCopyFromPod(K8SCopy): """ Copy files/directory from Pod into local filesystem """ def __init__(self, module, client): super(K8SCopyFromPod, self).__init__(module, client) self.is_remote_path_dir = None self.files_to_copy = list() def list_remote_files(self): """ This method will check if the remote path is a dir or file if it is a directory the file list will be updated accordingly """ try: find_cmd = ["find", self.remote_path, "-type", "f", "-name", "*"] response = stream( self.api_instance.connect_get_namespaced_pod_exec, self.name, self.namespace, command=find_cmd, stdout=True, stderr=True, stdin=False, tty=False, _preload_content=False, **self.container_arg ) except Exception as e: self.module.fail_json( msg="Failed to execute on pod {0}/{1} due to : {2}".format( self.namespace, self.name, to_native(e) ) ) stderr = [] while response.is_open(): response.update(timeout=1) if response.peek_stdout(): self.files_to_copy.extend( response.read_stdout().rstrip("\n").split("\n") ) if response.peek_stderr(): err = response.read_stderr() if "No such file or directory" in err: self.module.fail_json( msg="{0} does not exist in remote pod filesystem".format( self.remote_path ) ) stderr.append(err) error = response.read_channel(ERROR_CHANNEL) response.close() error = yaml.safe_load(error) if error["status"] != "Success": self.module.fail_json( msg="Failed to execute on Pod due to: {0}".format(error) ) def read(self): self.stdout = None self.stderr = None if self.response.is_open(): if not self.response.sock.connected: self.response._connected = False else: ret, out, err = select((self.response.sock.sock,), (), (), 0) if ret: code, frame = self.response.sock.recv_data_frame(True) if code == ABNF.OPCODE_CLOSE: self.response._connected = False elif ( code in (ABNF.OPCODE_BINARY, ABNF.OPCODE_TEXT) and len(frame.data) > 1 ): channel = frame.data[0] content = frame.data[1:] if content: if channel == STDOUT_CHANNEL: self.stdout = content elif channel == STDERR_CHANNEL: self.stderr = content.decode("utf-8", "replace") def copy(self): is_remote_path_dir = ( len(self.files_to_copy) > 1 or self.files_to_copy[0] != self.remote_path ) relpath_start = self.remote_path if is_remote_path_dir and os.path.isdir(self.local_path): relpath_start = os.path.dirname(self.remote_path) for remote_file in self.files_to_copy: dest_file = self.local_path if is_remote_path_dir: dest_file = os.path.join( self.local_path, os.path.relpath(remote_file, start=relpath_start) ) # create directory to copy file in os.makedirs(os.path.dirname(dest_file), exist_ok=True) pod_command = ["cat", remote_file] self.response = stream( self.api_instance.connect_get_namespaced_pod_exec, self.name, self.namespace, command=pod_command, stderr=True, stdin=True, stdout=True, tty=False, _preload_content=False, **self.container_arg ) errors = [] with open(dest_file, "wb") as fh: while self.response._connected: self.read() if self.stdout: fh.write(self.stdout) if self.stderr: errors.append(self.stderr) if errors: self.module.fail_json( msg="Failed to copy file from Pod: {0}".format("".join(errors)) ) self.module.exit_json( changed=True, result="{0} successfully copied locally into {1}".format( self.remote_path, self.local_path ), ) def run(self): self.list_remote_files() if self.files_to_copy == []: self.module.exit_json( changed=False, warning="No file found from directory '{0}' into remote Pod.".format( self.remote_path ), ) self.copy() class K8SCopyToPod(K8SCopy): """ Copy files/directory from local filesystem into remote Pod """ def __init__(self, module, client): super(K8SCopyToPod, self).__init__(module, client) self.files_to_copy = list() def run_from_pod(self, command): response = stream( self.api_instance.connect_get_namespaced_pod_exec, self.name, self.namespace, command=command, stderr=True, stdin=False, stdout=True, tty=False, _preload_content=False, **self.container_arg ) errors = [] while response.is_open(): response.update(timeout=1) if response.peek_stderr(): errors.append(response.read_stderr()) response.close() err = response.read_channel(ERROR_CHANNEL) err = yaml.safe_load(err) response.close() if err["status"] != "Success": self.module.fail_json( msg="Failed to run {0} on Pod.".format(command), errors=errors ) def is_remote_path_dir(self): pod_command = ["test", "-d", self.remote_path] response = stream( self.api_instance.connect_get_namespaced_pod_exec, self.name, self.namespace, command=pod_command, stdout=True, stderr=True, stdin=False, tty=False, _preload_content=False, **self.container_arg ) while response.is_open(): response.update(timeout=1) err = response.read_channel(ERROR_CHANNEL) err = yaml.safe_load(err) response.close() if err["status"] == "Success": return True return False def close_temp_file(self): if self.named_temp_file: self.named_temp_file.close() def run(self): # remove trailing slash from destination path dest_file = self.remote_path.rstrip("/") src_file = self.local_path self.named_temp_file = None if self.content: self.named_temp_file = NamedTemporaryFile(mode="w") self.named_temp_file.write(self.content) self.named_temp_file.flush() src_file = self.named_temp_file.name else: if not os.path.exists(self.local_path): self.module.fail_json( msg="{0} does not exist in local filesystem".format(self.local_path) ) if not os.access(self.local_path, os.R_OK): self.module.fail_json(msg="{0} not readable".format(self.local_path)) if self.is_remote_path_dir(): if self.content: self.module.fail_json( msg="When content is specified, remote path should not be an existing directory" ) else: dest_file = os.path.join(dest_file, os.path.basename(src_file)) if self.no_preserve: tar_command = [ "tar", "--no-same-permissions", "--no-same-owner", "-xmf", "-", ] else: tar_command = ["tar", "-xmf", "-"] if dest_file.startswith("/"): tar_command.extend(["-C", "/"]) response = stream( self.api_instance.connect_get_namespaced_pod_exec, self.name, self.namespace, command=tar_command, stderr=True, stdin=True, stdout=True, tty=False, _preload_content=False, **self.container_arg ) with TemporaryFile() as tar_buffer: with tarfile.open(fileobj=tar_buffer, mode="w") as tar: tar.add(src_file, dest_file) tar_buffer.seek(0) commands = [] # push command in chunk mode size = 1024 * 1024 while True: data = tar_buffer.read(size) if not data: break commands.append(data) stderr, stdout = [], [] while response.is_open(): if response.peek_stdout(): stdout.append(response.read_stdout().rstrip("\n")) if response.peek_stderr(): stderr.append(response.read_stderr().rstrip("\n")) if commands: cmd = commands.pop(0) response.write_stdin(cmd) else: break response.close() if stderr: self.close_temp_file() self.module.fail_json( command=tar_command, msg="Failed to copy local file/directory into Pod due to: {0}".format( "".join(stderr) ), ) self.close_temp_file() if self.content: self.module.exit_json( changed=True, result="Content successfully copied into {0} on remote Pod".format( self.remote_path ), ) self.module.exit_json( changed=True, result="{0} successfully copied into remote Pod into {1}".format( self.local_path, self.remote_path ), ) def check_pod(k8s_ansible_mixin, module): resource = k8s_ansible_mixin.find_resource("Pod", None, True) namespace = module.params.get("namespace") name = module.params.get("pod") container = module.params.get("container") def _fail(exc): arg = {} if hasattr(exc, "body"): msg = ( "Namespace={0} Kind=Pod Name={1}: Failed requested object: {2}".format( namespace, name, exc.body ) ) else: msg = to_native(exc) for attr in ["status", "reason"]: if hasattr(exc, attr): arg[attr] = getattr(exc, attr) module.fail_json(msg=msg, **arg) try: result = resource.get(name=name, namespace=namespace) containers = [ c["name"] for c in result.to_dict()["status"]["containerStatuses"] ] if container and container not in containers: module.fail_json(msg="Pod has no container {0}".format(container)) return containers except Exception as exc: _fail(exc)