python/aqmp: Add message routing to QMP protocol

Add the ability to handle and route messages in qmp_protocol.py. The
interface for actually sending anything still isn't added until next
commit.

Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210915162955.333025-20-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
John Snow 2021-09-15 12:29:47 -04:00
parent 4cd17f375d
commit 577737be55

View file

@ -7,15 +7,19 @@ used to either connect to a listening server, or used to listen and
accept an incoming connection from that server. accept an incoming connection from that server.
""" """
# The import workarounds here are fixed in the next commit.
import asyncio # pylint: disable=unused-import # noqa
import logging import logging
from typing import ( from typing import (
Dict, Dict,
List, List,
Mapping, Mapping,
Optional, Optional,
Union,
cast,
) )
from .error import ProtocolError from .error import AQMPError, ProtocolError
from .events import Events from .events import Events
from .message import Message from .message import Message
from .models import Greeting from .models import Greeting
@ -61,6 +65,53 @@ class NegotiationError(_WrappedProtocolError):
""" """
class ExecInterruptedError(AQMPError):
"""
Exception raised when an RPC is interrupted.
This error is raised when an execute() statement could not be
completed. This can occur because the connection itself was
terminated before a reply was received.
The true cause of the interruption will be available via `disconnect()`.
"""
class _MsgProtocolError(ProtocolError):
"""
Abstract error class for protocol errors that have a `Message` object.
This Exception class is used for protocol errors where the `Message`
was mechanically understood, but was found to be inappropriate or
malformed.
:param error_message: Human-readable string describing the error.
:param msg: The QMP `Message` that caused the error.
"""
def __init__(self, error_message: str, msg: Message):
super().__init__(error_message)
#: The received `Message` that caused the error.
self.msg: Message = msg
def __str__(self) -> str:
return "\n".join([
super().__str__(),
f" Message was: {str(self.msg)}\n",
])
class ServerParseError(_MsgProtocolError):
"""
The Server sent a `Message` indicating parsing failure.
i.e. A reply has arrived from the server, but it is missing the "ID"
field, indicating a parsing error.
:param error_message: Human-readable string describing the error.
:param msg: The QMP `Message` that caused the error.
"""
class QMPClient(AsyncProtocol[Message], Events): class QMPClient(AsyncProtocol[Message], Events):
""" """
Implements a QMP client connection. Implements a QMP client connection.
@ -106,6 +157,9 @@ class QMPClient(AsyncProtocol[Message], Events):
# Read buffer limit; large enough to accept query-qmp-schema # Read buffer limit; large enough to accept query-qmp-schema
_limit = (256 * 1024) _limit = (256 * 1024)
# Type alias for pending execute() result items
_PendingT = Union[Message, ExecInterruptedError]
def __init__(self, name: Optional[str] = None) -> None: def __init__(self, name: Optional[str] = None) -> None:
super().__init__(name) super().__init__(name)
Events.__init__(self) Events.__init__(self)
@ -120,6 +174,12 @@ class QMPClient(AsyncProtocol[Message], Events):
# Cached Greeting, if one was awaited. # Cached Greeting, if one was awaited.
self._greeting: Optional[Greeting] = None self._greeting: Optional[Greeting] = None
# Incoming RPC reply messages.
self._pending: Dict[
Union[str, None],
'asyncio.Queue[QMPClient._PendingT]'
] = {}
@upper_half @upper_half
async def _establish_session(self) -> None: async def _establish_session(self) -> None:
""" """
@ -132,6 +192,9 @@ class QMPClient(AsyncProtocol[Message], Events):
:raise EOFError: When the server unexpectedly hangs up. :raise EOFError: When the server unexpectedly hangs up.
:raise OSError: For underlying stream errors. :raise OSError: For underlying stream errors.
""" """
self._greeting = None
self._pending = {}
if self.await_greeting or self.negotiate: if self.await_greeting or self.negotiate:
self._greeting = await self._get_greeting() self._greeting = await self._get_greeting()
@ -203,10 +266,33 @@ class QMPClient(AsyncProtocol[Message], Events):
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback()) self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
raise raise
@bottom_half
async def _bh_disconnect(self) -> None:
try:
await super()._bh_disconnect()
finally:
if self._pending:
self.logger.debug("Cancelling pending executions")
keys = self._pending.keys()
for key in keys:
self.logger.debug("Cancelling execution '%s'", key)
self._pending[key].put_nowait(
ExecInterruptedError("Disconnected")
)
self.logger.debug("QMP Disconnected.")
@upper_half
def _cleanup(self) -> None:
super()._cleanup()
assert not self._pending
@bottom_half @bottom_half
async def _on_message(self, msg: Message) -> None: async def _on_message(self, msg: Message) -> None:
""" """
Add an incoming message to the appropriate queue/handler. Add an incoming message to the appropriate queue/handler.
:raise ServerParseError: When Message indicates server parse failure.
""" """
# Incoming messages are not fully parsed/validated here; # Incoming messages are not fully parsed/validated here;
# do only light peeking to know how to route the messages. # do only light peeking to know how to route the messages.
@ -216,7 +302,39 @@ class QMPClient(AsyncProtocol[Message], Events):
return return
# Below, we assume everything left is an execute/exec-oob response. # Below, we assume everything left is an execute/exec-oob response.
# ... Which we'll implement in the next commit!
exec_id = cast(Optional[str], msg.get('id'))
if exec_id in self._pending:
await self._pending[exec_id].put(msg)
return
# We have a message we can't route back to a caller.
is_error = 'error' in msg
has_id = 'id' in msg
if is_error and not has_id:
# This is very likely a server parsing error.
# It doesn't inherently belong to any pending execution.
# Instead of performing clever recovery, just terminate.
# See "NOTE" in qmp-spec.txt, section 2.4.2
raise ServerParseError(
("Server sent an error response without an ID, "
"but there are no ID-less executions pending. "
"Assuming this is a server parser failure."),
msg
)
# qmp-spec.txt, section 2.4:
# 'Clients should drop all the responses
# that have an unknown "id" field.'
self.logger.log(
logging.ERROR if is_error else logging.WARNING,
"Unknown ID '%s', message dropped.",
exec_id,
)
self.logger.debug("Unroutable message: %s", str(msg))
@upper_half @upper_half
@bottom_half @bottom_half