python/qemu: Add mypy type annotations

These should all be purely annotations with no changes in behavior at
all. You need to be in the python folder, but you should be able to
confirm that these annotations are correct (or at least self-consistent)
by running `mypy --strict qemu`.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Kevin Wolf <kwolf@redhat.com>
Message-id: 20201006235817.3280413-12-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
John Snow 2020-10-06 19:58:08 -04:00
parent 090744d552
commit f12a282ff4
4 changed files with 101 additions and 75 deletions

View file

@ -15,6 +15,7 @@ from types import TracebackType
from typing import (
Any,
Dict,
List,
Optional,
TextIO,
Tuple,
@ -90,7 +91,9 @@ class QEMUMonitorProtocol:
#: Logger object for debugging messages
logger = logging.getLogger('QMP')
def __init__(self, address, server=False, nickname=None):
def __init__(self, address: SocketAddrT,
server: bool = False,
nickname: Optional[str] = None):
"""
Create a QEMUMonitorProtocol class.
@ -102,7 +105,7 @@ class QEMUMonitorProtocol:
@note No connection is established, this is done by the connect() or
accept() methods
"""
self.__events = []
self.__events: List[QMPMessage] = []
self.__address = address
self.__sock = self.__get_sock()
self.__sockfile: Optional[TextIO] = None
@ -114,14 +117,14 @@ class QEMUMonitorProtocol:
self.__sock.bind(self.__address)
self.__sock.listen(1)
def __get_sock(self):
def __get_sock(self) -> socket.socket:
if isinstance(self.__address, tuple):
family = socket.AF_INET
else:
family = socket.AF_UNIX
return socket.socket(family, socket.SOCK_STREAM)
def __negotiate_capabilities(self):
def __negotiate_capabilities(self) -> QMPMessage:
greeting = self.__json_read()
if greeting is None or "QMP" not in greeting:
raise QMPConnectError
@ -131,7 +134,7 @@ class QEMUMonitorProtocol:
return greeting
raise QMPCapabilitiesError
def __json_read(self, only_event=False):
def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
assert self.__sockfile is not None
while True:
data = self.__sockfile.readline()
@ -148,7 +151,7 @@ class QEMUMonitorProtocol:
continue
return resp
def __get_events(self, wait=False):
def __get_events(self, wait: Union[bool, float] = False) -> None:
"""
Check for new events in the stream and cache them in __events.
@ -186,7 +189,7 @@ class QEMUMonitorProtocol:
raise QMPConnectError("Error while reading from socket")
self.__sock.settimeout(None)
def __enter__(self):
def __enter__(self) -> 'QEMUMonitorProtocol':
# Implement context manager enter function.
return self
@ -199,7 +202,7 @@ class QEMUMonitorProtocol:
# Implement context manager exit function.
self.close()
def connect(self, negotiate=True):
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
"""
Connect to the QMP Monitor and perform capabilities negotiation.
@ -214,7 +217,7 @@ class QEMUMonitorProtocol:
return self.__negotiate_capabilities()
return None
def accept(self, timeout=15.0):
def accept(self, timeout: float = 15.0) -> QMPMessage:
"""
Await connection from QMP Monitor and perform capabilities negotiation.
@ -250,7 +253,9 @@ class QEMUMonitorProtocol:
self.logger.debug("<<< %s", resp)
return resp
def cmd(self, name, args=None, cmd_id=None):
def cmd(self, name: str,
args: Optional[Dict[str, Any]] = None,
cmd_id: Optional[Any] = None) -> QMPMessage:
"""
Build a QMP command and send it to the QMP Monitor.
@ -258,14 +263,14 @@ class QEMUMonitorProtocol:
@param args: command arguments (dict)
@param cmd_id: command id (dict, list, string or int)
"""
qmp_cmd = {'execute': name}
qmp_cmd: QMPMessage = {'execute': name}
if args:
qmp_cmd['arguments'] = args
if cmd_id:
qmp_cmd['id'] = cmd_id
return self.cmd_obj(qmp_cmd)
def command(self, cmd, **kwds):
def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
"""
Build and send a QMP command to the monitor, report errors if any
"""
@ -278,7 +283,8 @@ class QEMUMonitorProtocol:
)
return cast(QMPReturnValue, ret['return'])
def pull_event(self, wait=False):
def pull_event(self,
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
"""
Pulls a single event.
@ -298,7 +304,7 @@ class QEMUMonitorProtocol:
return self.__events.pop(0)
return None
def get_events(self, wait=False):
def get_events(self, wait: bool = False) -> List[QMPMessage]:
"""
Get a list of available QMP events.
@ -315,13 +321,13 @@ class QEMUMonitorProtocol:
self.__get_events(wait)
return self.__events
def clear_events(self):
def clear_events(self) -> None:
"""
Clear current list of pending events.
"""
self.__events = []
def close(self):
def close(self) -> None:
"""
Close the socket and socket file.
"""
@ -330,7 +336,7 @@ class QEMUMonitorProtocol:
if self.__sockfile:
self.__sockfile.close()
def settimeout(self, timeout):
def settimeout(self, timeout: float) -> None:
"""
Set the socket timeout.
@ -339,7 +345,7 @@ class QEMUMonitorProtocol:
"""
self.__sock.settimeout(timeout)
def get_sock_fd(self):
def get_sock_fd(self) -> int:
"""
Get the socket file descriptor.
@ -347,7 +353,7 @@ class QEMUMonitorProtocol:
"""
return self.__sock.fileno()
def is_scm_available(self):
def is_scm_available(self) -> bool:
"""
Check if the socket allows for SCM_RIGHTS.