# -*- coding: utf-8 -*- # Copyright (c) 2023, Ansible Project # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = """ name: onepassword_ssh_key author: - Mohammed Babelly (@mohammedbabelly20) requirements: - C(op) 1Password command line utility version 2 or later. short_description: Fetch SSH Keys stored in 1Password version_added: "10.2.1" description: - P(community.general.onepassword_ssh_key#lookup) wraps C(op) command line utility to fetch ssh keys from 1Password. notes: - By default, it returns the private key value in PKCS#8 format, unless 'ssh_format=true' is passed. - The pluging works only for 'SSHKEY' type items. - This plugin requires C(op) version 2 or later. options: _terms: description: Identifier(s) (case-insensitive UUID or name) of item(s) to retrieve. required: true type: list elements: string ssh_format: description: Output key in SSH format if true. Otherwise, outputs in the default format. required: false default: false type: bool extends_documentation_fragment: - community.general.onepassword - community.general.onepassword.lookup """ EXAMPLES = """ - name: Retrieve the private key of ssh key from 1Password ansible.builtin.debug: var: lookup('community.general.onepassword_ssh_key', 'SSH Key', ssh_format=true) """ RETURN = """ _raw: description: Private key of SSH key type: list elements: string """ import json from ansible_collections.community.general.plugins.lookup.onepassword import ( OnePass, OnePassCLIv2, ) from ansible.errors import AnsibleLookupError from ansible.module_utils.common.text.converters import to_bytes from ansible.plugins.lookup import LookupBase class OnePassCLIv2SSHKey(OnePassCLIv2): def _get_raw(self, item_id, vault=None, token=None): args = ["item", "get", item_id, "--format", "json"] if vault is not None: args = [*args, f"--vault={vault}"] if self.service_account_token: if vault is None: raise AnsibleLookupError( "'vault' is required with 'service_account_token'" ) environment_update = { "OP_SERVICE_ACCOUNT_TOKEN": self.service_account_token } return self._run(args, environment_update=environment_update) if token is not None: args = [*args, to_bytes("--session=") + token] return self._run(args) def get_ssh_key(self, item_id, vault=None, token=None, ssh_format=False): _, out, _ = self._get_raw(item_id, vault, token) data = json.loads(out) if data.get("category") != "SSH_KEY": raise AnsibleLookupError(f"Item {item_id} is not SSH Key") for field in data.get("fields", {}): if field.get("id") == "private_key" and field.get("type") == "SSHKEY": return ( field.get("ssh_formats", {}).get("openssh", {}).get("value", "") if ssh_format else field.get("value", "") ) class LookupModule(LookupBase): def run(self, terms, variables=None, **kwargs): self.set_options(var_options=variables, direct=kwargs) ssh_format = kwargs.get("ssh_format") vault = self.get_option("vault") subdomain = self.get_option("subdomain") domain = self.get_option("domain", "1password.com") username = self.get_option("username") secret_key = self.get_option("secret_key") master_password = self.get_option("master_password") service_account_token = self.get_option("service_account_token") account_id = self.get_option("account_id") connect_host = self.get_option("connect_host") connect_token = self.get_option("connect_token") op = OnePass( subdomain=subdomain, domain=domain, username=username, secret_key=secret_key, master_password=master_password, service_account_token=service_account_token, account_id=account_id, connect_host=connect_host, connect_token=connect_token, cli_class=OnePassCLIv2SSHKey, ) op.assert_logged_in() values = [] for term in terms: values.append(op._cli.get_ssh_key(term, vault, ssh_format=ssh_format)) return [values]