Files
kubernetes.core/plugins/modules/k8s_log.py
Yuriy Novostavskiy 6360763098 minor: doc: use the same style of version_added across repo (#703)
minor(doc): use the same style of version_added across repo

SUMMARY
Currently there is no single style for version_added: in some places it is unquoted, in others it is single-quoted, and in others it is double-quoted. Moreover, some files used different styles within a single file.
The aim of this PR is to update the whole repo to a single style for version_added.
ISSUE TYPE

Docs Pull Request

COMPONENT NAME
kustomize
helm
helm_info
helm_plugin
helm_plugin_info
helm_pull
helm_repository
helm_template
k8s_cluster_info
k8s_cp
k8s_drain
k8s_exec
k8s_log
k8s_rollback
k8s_taint
ADDITIONAL INFORMATION
The same style is proposed as used in amazon.aws collections

Reviewed-by: Kelv Gooding
Reviewed-by: Alina Buzachis
Reviewed-by: Mike Graves <mgraves@redhat.com>
2024-05-14 15:50:12 +00:00

364 lines
11 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# (c) 2019, Fabian von Feilitzsch <@fabianvf>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
# Module documentation, stored as raw YAML-style text. The entries under
# "options" describe the same parameters that argspec() declares further down
# in this file (kind, name, label_selectors, container, since_seconds,
# previous, tail_lines, all_containers).
# NOTE(review): the nested YAML keys below carry no indentation in this copy of
# the file -- presumably lost in extraction; confirm against the upstream file.
DOCUMENTATION = r"""
module: k8s_log
short_description: Fetch logs from Kubernetes resources
version_added: 0.10.0
author:
- "Fabian von Feilitzsch (@fabianvf)"
description:
- Use the Kubernetes Python client to perform read operations on K8s log endpoints.
- Authenticate using either a config file, certificates, password or token.
- Supports check mode.
- Analogous to `kubectl logs` or `oc logs`
extends_documentation_fragment:
- kubernetes.core.k8s_auth_options
- kubernetes.core.k8s_name_options
options:
kind:
description:
- Use to specify an object model.
- Use in conjunction with I(api_version), I(name), and I(namespace) to identify a specific object.
- If using I(label_selectors), cannot be overridden.
type: str
default: Pod
name:
description:
- Use to specify an object name.
- Use in conjunction with I(api_version), I(kind) and I(namespace) to identify a specific object.
- Only one of I(name) or I(label_selectors) may be provided.
type: str
label_selectors:
description:
- List of label selectors to use to filter results
- Only one of I(name) or I(label_selectors) may be provided.
type: list
elements: str
default: []
container:
description:
- Use to specify the container within a pod to grab the log from.
- If there is only one container, this will default to that container.
- If there is more than one container, this option is required or set I(all_containers) to C(true).
- mutually exclusive with C(all_containers).
required: no
type: str
since_seconds:
description:
- A relative time in seconds before the current time from which to show logs.
required: no
type: str
version_added: 2.2.0
previous:
description:
- If C(true), print the logs for the previous instance of the container in a pod if it exists.
required: no
type: bool
default: False
version_added: 2.4.0
tail_lines:
description:
- A number of lines from the end of the logs to retrieve.
required: no
type: int
version_added: 2.4.0
all_containers:
description:
- If set to C(true), retrieve all containers' logs in the pod(s).
- mutually exclusive with C(container).
type: bool
version_added: 2.4.0
requirements:
- "python >= 3.9"
- "kubernetes >= 24.2.0"
- "PyYAML >= 3.11"
"""
# Usage examples, stored as raw text in playbook-task form. Each task calls
# kubernetes.core.k8s_log with a different way of selecting the target:
# by Pod name, by label selector, through a Deployment / DeploymentConfig,
# and with all_containers enabled.
# The lines starting with "#" inside the string are part of the example text
# itself, not Python comments.
EXAMPLES = r"""
- name: Get a log from a Pod
kubernetes.core.k8s_log:
name: example-1
namespace: testing
register: log
# This will get the log from the first Pod found matching the selector
- name: Log a Pod matching a label selector
kubernetes.core.k8s_log:
namespace: testing
label_selectors:
- app=example
register: log
# This will get the log from a single Pod managed by this Deployment
- name: Get a log from a Deployment
kubernetes.core.k8s_log:
api_version: apps/v1
kind: Deployment
namespace: testing
name: example
since_seconds: "4000"
register: log
# This will get the log from a single Pod managed by this DeploymentConfig
- name: Get a log from a DeploymentConfig
kubernetes.core.k8s_log:
api_version: apps.openshift.io/v1
kind: DeploymentConfig
namespace: testing
name: example
tail_lines: 100
register: log
# This will get the logs from all containers in Pod
- name: Get the logs from all containers in pod
kubernetes.core.k8s_log:
namespace: testing
name: some-pod
all_containers: true
"""
# Description of the module's return values, stored as raw text. The two keys
# documented here, "log" and "log_lines", are the ones execute_module() places
# in its result dict (the full text and the same text split on newlines).
RETURN = r"""
log:
type: str
description:
- The text log of the object
returned: success
log_lines:
type: list
description:
- The log of the object, split on newlines
returned: success
"""
import copy
import json
from ansible_collections.kubernetes.core.plugins.module_utils.ansiblemodule import (
AnsibleModule,
)
from ansible_collections.kubernetes.core.plugins.module_utils.args_common import (
AUTH_ARG_SPEC,
NAME_ARG_SPEC,
)
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.client import (
get_api_client,
)
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.core import (
AnsibleK8SModule,
)
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.exceptions import (
CoreException,
)
from ansible_collections.kubernetes.core.plugins.module_utils.k8s.service import (
K8sService,
)
try:
from kubernetes.client.exceptions import ApiException
except ImportError:
# ImportError are managed by the common module already.
pass
def argspec():
    """Build the argument specification for the k8s_log module.

    Starts from a deep copy of AUTH_ARG_SPEC (so the shared constant is never
    mutated), layers NAME_ARG_SPEC on top, and finally adds the options that
    are specific to log retrieval.

    Returns:
        dict: mapping of option name to its specification.
    """
    log_options = {
        "kind": dict(type="str", default="Pod"),
        "container": dict(),
        "since_seconds": dict(),
        "label_selectors": dict(type="list", elements="str", default=[]),
        "previous": dict(type="bool", default=False),
        "tail_lines": dict(type="int"),
        "all_containers": dict(type="bool"),
    }

    spec = copy.deepcopy(AUTH_ARG_SPEC)
    for extra in (NAME_ARG_SPEC, log_options):
        spec.update(extra)
    return spec
def get_exception_message(exc):
    """Extract a human-readable message from an API exception.

    The exception's ``body`` attribute is expected to hold a JSON document
    with a ``message`` key. The body may be either ``bytes`` or ``str``;
    bytes are decoded as UTF-8 before parsing.

    Args:
        exc: the exception to inspect.

    Returns:
        The value of the ``message`` key when the body can be parsed,
        otherwise ``exc`` itself so the caller can still format something.
    """
    try:
        body = exc.body
        # Only bytes need decoding; a str body is parsed as-is instead of
        # failing on the missing .decode() and losing the API message.
        if isinstance(body, bytes):
            body = body.decode("utf8")
        return json.loads(body)["message"]
    except Exception:
        # Deliberate best-effort: any failure (no body, invalid JSON, no
        # "message" key) falls back to the original exception object.
        return exc
def list_containers_in_pod(svc, resource, namespace, name):
    """Return the container names reported in a pod's status.

    Args:
        svc: service object whose ``client.get`` fetches the pod.
        resource: resource handle passed through to ``svc.client.get``.
        namespace: namespace of the pod.
        name: name of the pod.

    Returns:
        list: the ``name`` of every entry in ``status.containerStatuses``.

    Raises:
        CoreException: if the pod cannot be fetched or the expected status
            fields are missing.
    """
    try:
        pod = svc.client.get(resource, name=name, namespace=namespace)
        statuses = pod.to_dict()["status"]["containerStatuses"]
        names = []
        for status in statuses:
            names.append(status["name"])
        return names
    except Exception as exc:
        reason = get_exception_message(exc)
        raise CoreException(
            "Unable to retrieve log from Pod due to: {0}".format(reason)
        )
def execute_module(svc, params):
    """Fetch the log for the object described by ``params``.

    The target pod is resolved in this order:

    * if the requested kind has no ``log`` subresource, the object is fetched
      by name and its pod selector (see ``extract_selectors``) is used;
    * if a label selector is in effect, the first matching pod is used;
    * otherwise the named object is queried directly.

    Args:
        svc: service object providing ``find_resource`` (and ``client`` when
            ``all_containers`` is set).
        params: module parameters (``name``, ``namespace``, ``kind``,
            ``api_version``, ``label_selectors``, ``container``,
            ``since_seconds``, ``previous``, ``tail_lines``,
            ``all_containers``).

    Returns:
        dict: ``changed`` (always False), ``log`` (the full text) and
        ``log_lines`` (the text split on newlines).

    Raises:
        CoreException: on conflicting or missing identifiers, when no pod
            matches the selector, or when the log request fails.
    """
    name = params.get("name")
    namespace = params.get("namespace")
    # "or []" also covers an explicit None, which str.join cannot iterate.
    label_selector = ",".join(params.get("label_selectors") or [])

    if name and label_selector:
        raise CoreException("Only one of name or label_selectors can be provided")

    resource = svc.find_resource(params["kind"], params["api_version"], fail=True)
    v1_pods = svc.find_resource("Pod", "v1", fail=True)

    if "log" not in resource.subresources:
        if not name:
            raise CoreException(
                "name must be provided for resources that do not support the log subresource"
            )
        # Translate the object (e.g. a Deployment) into a selector for its pods.
        instance = resource.get(name=name, namespace=namespace)
        label_selector = ",".join(extract_selectors(instance))
        resource = v1_pods

    if label_selector:
        instances = v1_pods.get(namespace=namespace, label_selector=label_selector)
        if not instances.items:
            raise CoreException(
                "No pods in namespace {0} matched selector {1}".format(
                    namespace, label_selector
                )
            )
        # This matches the behavior of kubectl when logging pods via a selector
        name = instances.items[0].metadata.name
        resource = v1_pods

    if "base" not in resource.log.urls and not name:
        raise CoreException(
            "name must be provided for resources that do not support namespaced base url"
        )

    query_params = {}
    if params.get("container"):
        query_params["container"] = params["container"]
    if params.get("since_seconds"):
        query_params["sinceSeconds"] = params["since_seconds"]
    if params.get("previous"):
        query_params["previous"] = params["previous"]
    # Compare against None rather than truthiness: tail_lines=0 is an explicit
    # request and must be forwarded instead of being silently dropped.
    if params.get("tail_lines") is not None:
        query_params["tailLines"] = params["tail_lines"]

    # A single None entry means "let the server pick the container".
    pod_containers = [None]
    if params.get("all_containers"):
        pod_containers = list_containers_in_pod(svc, resource, namespace, name)

    chunks = []
    try:
        for container in pod_containers:
            if container is not None:
                query_params["container"] = container
            # Only send query_params when there is something to send, and hand
            # each request its own copy so later iterations cannot alter it.
            kwargs = {"query_params": dict(query_params)} if query_params else {}
            response = resource.log.get(
                name=name, namespace=namespace, serialize=False, **kwargs
            )
            chunks.append(response.data.decode("utf8"))
    except ApiException as exc:
        if exc.reason == "Not Found":
            raise CoreException("Pod {0}/{1} not found.".format(namespace, name))
        raise CoreException(
            "Unable to retrieve log from Pod due to: {0}".format(
                get_exception_message(exc)
            )
        )

    log = "".join(chunks)
    return {"changed": False, "log": log, "log_lines": log.split("\n")}
def extract_selectors(instance):
    """Convert an object's ``spec.selector`` into label-selector strings.

    Parses selectors on an object based on the specifications documented here:
    https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors

    Args:
        instance: object exposing ``spec.selector`` (and ``apiVersion`` /
            ``kind`` for error reporting) via attribute access.

    Returns:
        list: one selector string per label or expression, e.g. ``"app=web"``,
        ``"tier"``, ``"!tier"`` or ``"env in (a, b)"``.

    Raises:
        CoreException: if the object has no selector, or a matchExpression
            uses an operator other than Exists, DoesNotExist, In or NotIn.
    """
    selector = instance.spec.selector
    if not selector:
        # str.join takes ONE iterable; passing the parts as separate arguments
        # raised TypeError and masked this error. Parts that are unset are
        # skipped so the identifier never contains a literal "None".
        api_parts = (getattr(instance, "group", None), instance.apiVersion)
        api_id = "/".join(str(part) for part in api_parts if part)
        raise CoreException(
            "{0} {1} does not support the log subresource directly, and no Pod selector was found on the object".format(
                api_id, instance.kind
            )
        )

    match_labels = selector.matchLabels
    match_expressions = selector.matchExpressions

    if not (match_labels or match_expressions):
        # A few resources (like DeploymentConfigs) just use a simple key:value style instead of supporting expressions
        return ["{0}={1}".format(k, v) for k, v in dict(selector).items()]

    selectors = []
    if match_labels:
        selectors.extend(
            "{0}={1}".format(k, v) for k, v in dict(match_labels).items()
        )

    for expression in match_expressions or []:
        operator = expression.operator
        if operator == "Exists":
            selectors.append(expression.key)
        elif operator == "DoesNotExist":
            selectors.append("!{0}".format(expression.key))
        elif operator in ["In", "NotIn"]:
            selectors.append(
                "{key} {operator} {values}".format(
                    key=expression.key,
                    operator=operator.lower(),
                    values="({0})".format(", ".join(expression.values)),
                )
            )
        else:
            raise CoreException(
                "The k8s_log module does not support the {0} matchExpression operator".format(
                    operator.lower()
                )
            )
    return selectors
def main():
    """Module entry point: build the module, fetch the log, report the result.

    ``container`` and ``all_containers`` are declared mutually exclusive.
    A CoreException raised while creating the client or retrieving the log is
    reported through ``fail_from_exception``.
    """
    k8s_module = AnsibleK8SModule(
        module_class=AnsibleModule,
        argument_spec=argspec(),
        supports_check_mode=True,
        mutually_exclusive=[("container", "all_containers")],
    )

    try:
        api_client = get_api_client(module=k8s_module)
        service = K8sService(api_client, k8s_module)
        outcome = execute_module(service, k8s_module.params)
        k8s_module.exit_json(**outcome)
    except CoreException as err:
        k8s_module.fail_from_exception(err)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()