CI: add type checking (#10997)

* Set up type checking with mypy.

* Make mypy pass.

* Use list() instead of sorted().
This commit is contained in:
Felix Fontein
2025-10-29 18:13:38 +01:00
committed by GitHub
parent 831787619a
commit 6088b0cff5
73 changed files with 442 additions and 175 deletions

View File

@@ -106,10 +106,10 @@ class Connection(ConnectionBase):
host=self._instance())
self._connected = True
def _build_command(self, cmd) -> str:
def _build_command(self, cmd) -> list[str]:
"""build the command to execute on the incus host"""
exec_cmd = [
exec_cmd: list[str] = [
self._incus_cmd,
"--project", self.get_option("project"),
"exec",

View File

@@ -108,10 +108,10 @@ class Connection(ConnectionBase):
self._display.vvv(f"ESTABLISH LXD CONNECTION FOR USER: {self.get_option('remote_user')}", host=self._host())
self._connected = True
def _build_command(self, cmd) -> str:
def _build_command(self, cmd) -> list[str]:
"""build the command to execute on the lxd host"""
exec_cmd = [self._lxc_cmd]
exec_cmd: list[str] = [self._lxc_cmd]
if self.get_option("project"):
exec_cmd.extend(["--project", self.get_option("project")])

View File

@@ -106,7 +106,7 @@ class Connection(ConnectionBase):
super(Connection, self)._connect()
self._connected = True
@ensure_connect
@ensure_connect # type: ignore # TODO: for some reason, the type infos for ensure_connect suck...
def exec_command(self, cmd, in_data=None, sudoable=False):
"""Run specified command in a running QubesVM """
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)

View File

@@ -332,31 +332,23 @@ from ansible.utils.path import makedirs_safe
from binascii import hexlify
from subprocess import list2cmdline
PARAMIKO_IMPORT_ERR: str | None
try:
import paramiko
from paramiko import MissingHostKeyPolicy
PARAMIKO_IMPORT_ERR = None
except ImportError:
paramiko = None
PARAMIKO_IMPORT_ERR = traceback.format_exc()
if t.TYPE_CHECKING and PARAMIKO_IMPORT_ERR is None:
from paramiko import MissingHostKeyPolicy
from paramiko.client import SSHClient
from paramiko.pkey import PKey
else:
MissingHostKeyPolicy: type = object
SSHClient: type = object
PKey: type = object
MissingHostKeyPolicy = object # type: ignore
display = Display()
def authenticity_msg(hostname: str, ktype: str, fingerprint: str) -> str:
def authenticity_msg(hostname: str, ktype: str, fingerprint: bytes) -> str:
msg = f"""
paramiko: The authenticity of host '{hostname}' can't be established.
The {ktype} key fingerprint is {fingerprint}.
The {ktype} key fingerprint is {to_text(fingerprint)}.
Are you sure you want to continue connecting (yes/no)?
"""
return msg
@@ -376,7 +368,7 @@ class MyAddPolicy(MissingHostKeyPolicy):
self.connection = connection
self._options = connection._options
def missing_host_key(self, client: SSHClient, hostname: str, key: PKey) -> None:
def missing_host_key(self, client: paramiko.SSHClient, hostname: str, key: paramiko.PKey) -> None:
if all((self.connection.get_option('host_key_checking'), not self.connection.get_option('host_key_auto_add'))):
@@ -396,10 +388,10 @@ class MyAddPolicy(MissingHostKeyPolicy):
if inp.lower() not in ['yes', 'y', '']:
raise AnsibleError('host connection rejected by user')
key._added_by_ansible_this_time = True
key._added_by_ansible_this_time = True # type: ignore
# existing implementation below:
client._host_keys.add(hostname, key.get_name(), key)
client._host_keys.add(hostname, key.get_name(), key) # type: ignore[attr-defined] # TODO: figure out what _host_keys is!
# host keys are actually saved in close() function below
# in order to control ordering.
@@ -540,7 +532,7 @@ class Connection(ConnectionBase):
return self
def _any_keys_added(self) -> bool:
for hostname, keys in self.ssh._host_keys.items():
for hostname, keys in self.ssh._host_keys.items(): # type: ignore[attr-defined] # TODO: figure out what _host_keys is!
for keytype, key in keys.items():
added_this_time = getattr(key, '_added_by_ansible_this_time', False)
if added_this_time:
@@ -560,14 +552,14 @@ class Connection(ConnectionBase):
makedirs_safe(path)
with open(filename, 'w') as f:
for hostname, keys in self.ssh._host_keys.items():
for hostname, keys in self.ssh._host_keys.items(): # type: ignore[attr-defined] # TODO: figure out what _host_keys is!
for keytype, key in keys.items():
# was f.write
added_this_time = getattr(key, '_added_by_ansible_this_time', False)
if not added_this_time:
f.write(f'{hostname} {keytype} {key.get_base64()}\n')
for hostname, keys in self.ssh._host_keys.items():
for hostname, keys in self.ssh._host_keys.items(): # type: ignore[attr-defined] # TODO: figure out what _host_keys is!
for keytype, key in keys.items():
added_this_time = getattr(key, '_added_by_ansible_this_time', False)
if added_this_time:
@@ -595,13 +587,16 @@ class Connection(ConnectionBase):
cmd = self._build_wsl_command(cmd)
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable) # type: ignore[safe-super]
bufsize = 4096
try:
self.ssh.get_transport().set_keepalive(5)
chan = self.ssh.get_transport().open_session()
transport = self.ssh.get_transport()
if transport is None:
raise ValueError("Transport not available")
transport.set_keepalive(5)
chan = transport.open_session()
except Exception as e:
text_e = to_text(e)
msg = 'Failed to open session'
@@ -745,7 +740,7 @@ class Connection(ConnectionBase):
# just in case any were added recently
self.ssh.load_system_host_keys()
self.ssh._host_keys.update(self.ssh._system_host_keys)
self.ssh._host_keys.update(self.ssh._system_host_keys) # type: ignore[attr-defined] # TODO this is a HACK!
# gather information about the current key file, so
# we can ensure the new file has the correct mode/owner