mirror of
https://github.com/ansible-collections/community.general.git
synced 2026-05-07 22:02:50 +00:00
Add support for cliconf and netconf plugin (#25093)
* ansible-connection refactor and action plugin changes * Add cliconf plugin for eos, ios, iosxr, junos, nxos, vyos * Add netconf plugin for junos * Add jsonrpc support * Modify network_cli and netconf connection plugin * Fix py3 unit test failure * Fix review comment * Minor fixes * Fix ansible-connection review comments * Fix CI issue * platform_agnostic related changes
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# (c) 2016, Ansible, Inc. <support@ansible.com>
|
||||
# (c) 2017, Ansible, Inc. <support@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
@@ -33,18 +33,17 @@ import os
|
||||
import shlex
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
import syslog
|
||||
import datetime
|
||||
import logging
|
||||
import errno
|
||||
|
||||
from ansible import constants as C
|
||||
from ansible.module_utils._text import to_bytes, to_native
|
||||
from ansible.module_utils.six import PY3
|
||||
from ansible.module_utils.six.moves import cPickle
|
||||
from ansible.module_utils.connection import send_data, recv_data
|
||||
from ansible.playbook.play_context import PlayContext
|
||||
from ansible.plugins import connection_loader
|
||||
from ansible.utils.path import unfrackpath, makedirs_safe
|
||||
@@ -88,33 +87,11 @@ def do_fork():
|
||||
except OSError as e:
|
||||
sys.exit(1)
|
||||
|
||||
def send_data(s, data):
|
||||
packed_len = struct.pack('!Q', len(data))
|
||||
return s.sendall(packed_len + data)
|
||||
|
||||
def recv_data(s):
|
||||
header_len = 8 # size of a packed unsigned long long
|
||||
data = b""
|
||||
while len(data) < header_len:
|
||||
d = s.recv(header_len - len(data))
|
||||
if not d:
|
||||
return None
|
||||
data += d
|
||||
data_len = struct.unpack('!Q', data[:header_len])[0]
|
||||
data = data[header_len:]
|
||||
while len(data) < data_len:
|
||||
d = s.recv(data_len - len(data))
|
||||
if not d:
|
||||
return None
|
||||
data += d
|
||||
return data
|
||||
|
||||
|
||||
class Server():
|
||||
|
||||
def __init__(self, path, play_context):
|
||||
|
||||
self.path = path
|
||||
def __init__(self, socket_path, play_context):
|
||||
self.socket_path = socket_path
|
||||
self.play_context = play_context
|
||||
|
||||
display.display(
|
||||
@@ -123,135 +100,163 @@ class Server():
|
||||
log_only=True
|
||||
)
|
||||
|
||||
display.display('control socket path is %s' % path, log_only=True)
|
||||
display.display('control socket path is %s' % socket_path, log_only=True)
|
||||
display.display('current working directory is %s' % os.getcwd(), log_only=True)
|
||||
|
||||
self._start_time = datetime.datetime.now()
|
||||
|
||||
display.display("using connection plugin %s" % self.play_context.connection, log_only=True)
|
||||
|
||||
self.conn = connection_loader.get(play_context.connection, play_context, sys.stdin)
|
||||
self.conn._connect()
|
||||
if not self.conn.connected:
|
||||
self.connection = connection_loader.get(play_context.connection, play_context, sys.stdin)
|
||||
self.connection._connect()
|
||||
|
||||
if not self.connection.connected:
|
||||
raise AnsibleConnectionFailure('unable to connect to remote host %s' % self._play_context.remote_addr)
|
||||
|
||||
connection_time = datetime.datetime.now() - self._start_time
|
||||
display.display('connection established to %s in %s' % (play_context.remote_addr, connection_time), log_only=True)
|
||||
|
||||
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.socket.bind(path)
|
||||
self.socket.bind(self.socket_path)
|
||||
self.socket.listen(1)
|
||||
|
||||
signal.signal(signal.SIGALRM, self.alarm_handler)
|
||||
|
||||
def dispatch(self, obj, name, *args, **kwargs):
|
||||
meth = getattr(obj, name, None)
|
||||
if meth:
|
||||
return meth(*args, **kwargs)
|
||||
|
||||
def alarm_handler(self, signum, frame):
|
||||
'''
|
||||
Alarm handler
|
||||
'''
|
||||
# FIXME: this should also set internal flags for other
|
||||
# areas of code to check, so they can terminate
|
||||
# earlier than the socket going back to the accept
|
||||
# call and failing there.
|
||||
#
|
||||
# hooks the connection plugin to handle any cleanup
|
||||
self.dispatch(self.conn, 'alarm_handler', signum, frame)
|
||||
self.socket.close()
|
||||
display.display('local socket is set to listening', log_only=True)
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
while True:
|
||||
# set the alarm, if we don't get an accept before it
|
||||
# goes off we exit (via an exception caused by the socket
|
||||
# getting closed while waiting on accept())
|
||||
# FIXME: is this the best way to exit? as noted above in the
|
||||
# handler we should probably be setting a flag to check
|
||||
# here and in other parts of the code
|
||||
signal.signal(signal.SIGALRM, self.connect_timeout)
|
||||
signal.signal(signal.SIGTERM, self.handler)
|
||||
signal.alarm(C.PERSISTENT_CONNECT_TIMEOUT)
|
||||
try:
|
||||
(s, addr) = self.socket.accept()
|
||||
display.display('incoming request accepted on persistent socket', log_only=True)
|
||||
# clear the alarm
|
||||
# FIXME: potential race condition here between the accept and
|
||||
# time to this call.
|
||||
signal.alarm(0)
|
||||
except:
|
||||
break
|
||||
|
||||
(s, addr) = self.socket.accept()
|
||||
display.display('incoming request accepted on persistent socket', log_only=True)
|
||||
signal.alarm(0)
|
||||
|
||||
while True:
|
||||
data = recv_data(s)
|
||||
if not data:
|
||||
break
|
||||
|
||||
signal.signal(signal.SIGALRM, self.command_timeout)
|
||||
signal.alarm(self.play_context.timeout)
|
||||
|
||||
op = data.split(':')[0]
|
||||
display.display('socket operation is %s' % op, log_only=True)
|
||||
|
||||
method = getattr(self, 'do_%s' % op, None)
|
||||
|
||||
rc = 255
|
||||
try:
|
||||
if data.startswith(b'EXEC: '):
|
||||
display.display("socket operation is EXEC", log_only=True)
|
||||
cmd = data.split(b'EXEC: ')[1]
|
||||
(rc, stdout, stderr) = self.conn.exec_command(cmd)
|
||||
elif data.startswith(b'PUT: ') or data.startswith(b'FETCH: '):
|
||||
(op, src, dst) = shlex.split(to_native(data))
|
||||
stdout = stderr = ''
|
||||
try:
|
||||
if op == 'FETCH:':
|
||||
display.display("socket operation is FETCH", log_only=True)
|
||||
self.conn.fetch_file(src, dst)
|
||||
elif op == 'PUT:':
|
||||
display.display("socket operation is PUT", log_only=True)
|
||||
self.conn.put_file(src, dst)
|
||||
rc = 0
|
||||
except:
|
||||
pass
|
||||
elif data.startswith(b'CONTEXT: '):
|
||||
display.display("socket operation is CONTEXT", log_only=True)
|
||||
pc_data = data.split(b'CONTEXT: ', 1)[1]
|
||||
stdout = stderr = ''
|
||||
|
||||
if PY3:
|
||||
pc_data = cPickle.loads(pc_data, encoding='bytes')
|
||||
else:
|
||||
pc_data = cPickle.loads(pc_data)
|
||||
|
||||
pc = PlayContext()
|
||||
pc.deserialize(pc_data)
|
||||
|
||||
self.dispatch(self.conn, 'update_play_context', pc)
|
||||
continue
|
||||
else:
|
||||
display.display("socket operation is UNKNOWN", log_only=True)
|
||||
stdout = ''
|
||||
stderr = 'Invalid action specified'
|
||||
except:
|
||||
stdout = ''
|
||||
stderr = traceback.format_exc()
|
||||
if not method:
|
||||
stderr = 'Invalid action specified'
|
||||
else:
|
||||
rc, stdout, stderr = method(data)
|
||||
|
||||
signal.alarm(0)
|
||||
|
||||
display.display("socket operation completed with rc %s" % rc, log_only=True)
|
||||
display.display('socket operation completed with rc %s' % rc, log_only=True)
|
||||
|
||||
send_data(s, to_bytes(rc))
|
||||
send_data(s, to_bytes(stdout))
|
||||
send_data(s, to_bytes(stderr))
|
||||
|
||||
s.close()
|
||||
|
||||
except Exception as e:
|
||||
display.display(traceback.format_exc(), log_only=True)
|
||||
# socket.accept() will raise EINTR if the socket.close() is called
|
||||
if e.errno != errno.EINTR:
|
||||
display.display(traceback.format_exc(), log_only=True)
|
||||
|
||||
finally:
|
||||
# when done, close the connection properly and cleanup
|
||||
# the socket file so it can be recreated
|
||||
self.shutdown()
|
||||
end_time = datetime.datetime.now()
|
||||
delta = end_time - self._start_time
|
||||
display.display('shutting down control socket, connection was active for %s secs' % delta, log_only=True)
|
||||
try:
|
||||
self.conn.close()
|
||||
display.display('shutdown local socket, connection was active for %s secs' % delta, log_only=True)
|
||||
|
||||
def connect_timeout(self, signum, frame):
|
||||
display.display('connect timeout triggered, timeout value is %s secs' % C.PERSISTENT_CONNECT_TIMEOUT, log_only=True)
|
||||
self.shutdown()
|
||||
|
||||
def command_timeout(self, signum, frame):
|
||||
display.display('commnad timeout triggered, timeout value is %s secs' % self.play_context.timeout, log_only=True)
|
||||
self.shutdown()
|
||||
|
||||
def handler(self, signum, frame):
|
||||
display.display('signal handler called with signal %s' % signum, log_only=True)
|
||||
self.shutdown()
|
||||
|
||||
def shutdown(self):
|
||||
display.display('shutdown persistent connection requested', log_only=True)
|
||||
|
||||
if not os.path.exists(self.socket_path):
|
||||
display.display('persistent connection is not active', log_only=True)
|
||||
return
|
||||
|
||||
try:
|
||||
if self.socket:
|
||||
display.display('closing local listener', log_only=True)
|
||||
self.socket.close()
|
||||
except Exception as e:
|
||||
pass
|
||||
os.remove(self.path)
|
||||
if self.connection:
|
||||
display.display('closing the connection', log_only=True)
|
||||
self.close()
|
||||
except:
|
||||
pass
|
||||
finally:
|
||||
if os.path.exists(self.socket_path):
|
||||
display.display('removing the local control socket', log_only=True)
|
||||
os.remove(self.socket_path)
|
||||
|
||||
display.display('shutdown complete', log_only=True)
|
||||
|
||||
def do_EXEC(self, data):
|
||||
cmd = data.split(b'EXEC: ')[1]
|
||||
return self.connection.exec_command(cmd)
|
||||
|
||||
def do_PUT(self, data):
|
||||
(op, src, dst) = shlex.split(to_native(data))
|
||||
return self.connection.fetch_file(src, dst)
|
||||
|
||||
def do_FETCH(self, data):
|
||||
(op, src, dst) = shlex.split(to_native(data))
|
||||
return self.connection.put_file(src, dst)
|
||||
|
||||
def do_CONTEXT(self, data):
|
||||
pc_data = data.split(b'CONTEXT: ', 1)[1]
|
||||
|
||||
if PY3:
|
||||
pc_data = cPickle.loads(pc_data, encoding='bytes')
|
||||
else:
|
||||
pc_data = cPickle.loads(pc_data)
|
||||
|
||||
pc = PlayContext()
|
||||
pc.deserialize(pc_data)
|
||||
|
||||
try:
|
||||
self.connection.update_play_context(pc)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
return (0, 'ok', '')
|
||||
|
||||
def do_RUN(self, data):
|
||||
timeout = self.play_context.timeout
|
||||
while bool(timeout):
|
||||
if os.path.exists(self.socket_path):
|
||||
break
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
return (0, self.socket_path, '')
|
||||
|
||||
|
||||
def communicate(sock, data):
|
||||
send_data(sock, data)
|
||||
rc = int(recv_data(sock), 10)
|
||||
stdout = recv_data(sock)
|
||||
stderr = recv_data(sock)
|
||||
return (rc, stdout, stderr)
|
||||
|
||||
def main():
|
||||
# Need stdin as a byte stream
|
||||
@@ -279,30 +284,32 @@ def main():
|
||||
|
||||
pc = PlayContext()
|
||||
pc.deserialize(pc_data)
|
||||
|
||||
except Exception as e:
|
||||
# FIXME: better error message/handling/logging
|
||||
sys.stderr.write(traceback.format_exc())
|
||||
sys.exit("FAIL: %s" % e)
|
||||
|
||||
ssh = connection_loader.get('ssh', class_only=True)
|
||||
m = ssh._create_control_path(pc.remote_addr, pc.port, pc.remote_user)
|
||||
cp = ssh._create_control_path(pc.remote_addr, pc.connection, pc.remote_user)
|
||||
|
||||
# create the persistent connection dir if need be and create the paths
|
||||
# which we will be using later
|
||||
tmp_path = unfrackpath("$HOME/.ansible/pc")
|
||||
tmp_path = unfrackpath(C.PERSISTENT_CONTROL_PATH_DIR)
|
||||
makedirs_safe(tmp_path)
|
||||
lk_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path)
|
||||
sf_path = unfrackpath(m % dict(directory=tmp_path))
|
||||
lock_path = unfrackpath("%s/.ansible_pc_lock" % tmp_path)
|
||||
socket_path = unfrackpath(cp % dict(directory=tmp_path))
|
||||
|
||||
# if the socket file doesn't exist, spin up the daemon process
|
||||
lock_fd = os.open(lk_path, os.O_RDWR|os.O_CREAT, 0o600)
|
||||
lock_fd = os.open(lock_path, os.O_RDWR|os.O_CREAT, 0o600)
|
||||
fcntl.lockf(lock_fd, fcntl.LOCK_EX)
|
||||
if not os.path.exists(sf_path):
|
||||
|
||||
if not os.path.exists(socket_path):
|
||||
pid = do_fork()
|
||||
if pid == 0:
|
||||
rc = 0
|
||||
try:
|
||||
server = Server(sf_path, pc)
|
||||
server = Server(socket_path, pc)
|
||||
except AnsibleConnectionFailure as exc:
|
||||
display.display('connecting to host %s returned an error' % pc.remote_addr, log_only=True)
|
||||
display.display(str(exc), log_only=True)
|
||||
@@ -318,50 +325,57 @@ def main():
|
||||
sys.exit(rc)
|
||||
else:
|
||||
display.display('re-using existing socket for %s@%s:%s' % (pc.remote_user, pc.remote_addr, pc.port), log_only=True)
|
||||
|
||||
fcntl.lockf(lock_fd, fcntl.LOCK_UN)
|
||||
os.close(lock_fd)
|
||||
|
||||
timeout = pc.timeout
|
||||
while bool(timeout):
|
||||
if os.path.exists(socket_path):
|
||||
display.vvvv('connected to local socket in %s' % (pc.timeout - timeout), pc.remote_addr)
|
||||
break
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
else:
|
||||
raise AnsibleConnectionFailure('timeout waiting for local socket', pc.remote_addr)
|
||||
|
||||
# now connect to the daemon process
|
||||
# FIXME: if the socket file existed but the daemonized process was killed,
|
||||
# the connection will timeout here. Need to make this more resilient.
|
||||
rc = 0
|
||||
while rc == 0:
|
||||
while True:
|
||||
data = stdin.readline()
|
||||
if data == b'':
|
||||
break
|
||||
if data.strip() == b'':
|
||||
continue
|
||||
sf = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
attempts = 1
|
||||
while True:
|
||||
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
|
||||
attempts = C.PERSISTENT_CONNECT_RETRIES
|
||||
while bool(attempts):
|
||||
try:
|
||||
sf.connect(sf_path)
|
||||
sock.connect(socket_path)
|
||||
break
|
||||
except socket.error:
|
||||
# FIXME: better error handling/logging/message here
|
||||
time.sleep(C.PERSISTENT_CONNECT_INTERVAL)
|
||||
attempts += 1
|
||||
if attempts > C.PERSISTENT_CONNECT_RETRIES:
|
||||
display.display('number of connection attempts exceeded, unable to connect to control socket', pc.remote_addr, pc.remote_user, log_only=True)
|
||||
display.display('persistent_connect_interval=%s, persistent_connect_retries=%s' % (C.PERSISTENT_CONNECT_INTERVAL, C.PERSISTENT_CONNECT_RETRIES), pc.remote_addr, pc.remote_user, log_only=True)
|
||||
sys.stderr.write('failed to connect to control socket')
|
||||
sys.exit(255)
|
||||
attempts -= 1
|
||||
else:
|
||||
display.display('number of connection attempts exceeded, unable to connect to control socket', pc.remote_addr, pc.remote_user, log_only=True)
|
||||
display.display('persistent_connect_interval=%s, persistent_connect_retries=%s' % (C.PERSISTENT_CONNECT_INTERVAL, C.PERSISTENT_CONNECT_RETRIES), pc.remote_addr, pc.remote_user, log_only=True)
|
||||
sys.stderr.write('failed to connect to control socket')
|
||||
sys.exit(255)
|
||||
|
||||
# send the play_context back into the connection so the connection
|
||||
# can handle any privilege escalation activities
|
||||
pc_data = b'CONTEXT: %s' % init_data
|
||||
send_data(sf, pc_data)
|
||||
communicate(sock, pc_data)
|
||||
|
||||
send_data(sf, data.strip())
|
||||
|
||||
rc = int(recv_data(sf), 10)
|
||||
stdout = recv_data(sf)
|
||||
stderr = recv_data(sf)
|
||||
rc, stdout, stderr = communicate(sock, data.strip())
|
||||
|
||||
sys.stdout.write(to_native(stdout))
|
||||
sys.stderr.write(to_native(stderr))
|
||||
|
||||
sf.close()
|
||||
sock.close()
|
||||
break
|
||||
|
||||
sys.exit(rc)
|
||||
|
||||
Reference in New Issue
Block a user