python/aqmp: add logging to AsyncProtocol

Give the connection and the reader/writer tasks nicknames, and add
logging statements throughout.

Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-id: 20210915162955.333025-9-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
John Snow 2021-09-15 12:29:36 -04:00
parent c1408345af
commit 50e533061f

View file

@ -14,6 +14,7 @@ import asyncio
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
from enum import Enum from enum import Enum
from functools import wraps from functools import wraps
import logging
from ssl import SSLContext from ssl import SSLContext
from typing import ( from typing import (
Any, Any,
@ -32,8 +33,10 @@ from .error import AQMPError
from .util import ( from .util import (
bottom_half, bottom_half,
create_task, create_task,
exception_summary,
flush, flush,
is_closing, is_closing,
pretty_traceback,
upper_half, upper_half,
wait_closed, wait_closed,
) )
@ -174,14 +177,28 @@ class AsyncProtocol(Generic[T]):
can be written after the super() call. can be written after the super() call.
- `_on_message`: - `_on_message`:
Actions to be performed when a message is received. Actions to be performed when a message is received.
:param name:
Name used for logging messages, if any. By default, messages
will log to 'qemu.aqmp.protocol', but each individual connection
can be given its own logger by giving it a name; messages will
then log to 'qemu.aqmp.protocol.${name}'.
""" """
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
#: Logger object for debugging messages from this connection.
logger = logging.getLogger(__name__)
# ------------------------- # -------------------------
# Section: Public interface # Section: Public interface
# ------------------------- # -------------------------
def __init__(self) -> None: def __init__(self, name: Optional[str] = None) -> None:
#: The nickname for this connection, if any.
self.name: Optional[str] = name
if self.name is not None:
self.logger = self.logger.getChild(self.name)
# stream I/O # stream I/O
self._reader: Optional[StreamReader] = None self._reader: Optional[StreamReader] = None
self._writer: Optional[StreamWriter] = None self._writer: Optional[StreamWriter] = None
@ -205,6 +222,14 @@ class AsyncProtocol(Generic[T]):
self._runstate = Runstate.IDLE self._runstate = Runstate.IDLE
self._runstate_changed: Optional[asyncio.Event] = None self._runstate_changed: Optional[asyncio.Event] = None
def __repr__(self) -> str:
cls_name = type(self).__name__
tokens = []
if self.name is not None:
tokens.append(f"name={self.name!r}")
tokens.append(f"runstate={self.runstate.name}")
return f"<{cls_name} {' '.join(tokens)}>"
@property # @upper_half @property # @upper_half
def runstate(self) -> Runstate: def runstate(self) -> Runstate:
"""The current `Runstate` of the connection.""" """The current `Runstate` of the connection."""
@ -246,6 +271,7 @@ class AsyncProtocol(Generic[T]):
:raise Exception: When the reader or writer terminate unexpectedly. :raise Exception: When the reader or writer terminate unexpectedly.
""" """
self.logger.debug("disconnect() called.")
self._schedule_disconnect() self._schedule_disconnect()
await self._wait_disconnect() await self._wait_disconnect()
@ -273,6 +299,8 @@ class AsyncProtocol(Generic[T]):
if state == self._runstate: if state == self._runstate:
return return
self.logger.debug("Transitioning from '%s' to '%s'.",
str(self._runstate), str(state))
self._runstate = state self._runstate = state
self._runstate_event.set() self._runstate_event.set()
self._runstate_event.clear() self._runstate_event.clear()
@ -312,8 +340,15 @@ class AsyncProtocol(Generic[T]):
except BaseException as err: except BaseException as err:
emsg = f"Failed to establish {phase}" emsg = f"Failed to establish {phase}"
self.logger.error("%s: %s", emsg, exception_summary(err))
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
try:
# Reset from CONNECTING back to IDLE. # Reset from CONNECTING back to IDLE.
await self.disconnect() await self.disconnect()
except:
emsg = "Unexpected bottom half exception"
self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
raise
# NB: CancelledError is not a BaseException before Python 3.8 # NB: CancelledError is not a BaseException before Python 3.8
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
@ -363,12 +398,16 @@ class AsyncProtocol(Generic[T]):
:raise OSError: For stream-related errors. :raise OSError: For stream-related errors.
""" """
self.logger.debug("Connecting to %s ...", address)
if isinstance(address, tuple): if isinstance(address, tuple):
connect = asyncio.open_connection(address[0], address[1], ssl=ssl) connect = asyncio.open_connection(address[0], address[1], ssl=ssl)
else: else:
connect = asyncio.open_unix_connection(path=address, ssl=ssl) connect = asyncio.open_unix_connection(path=address, ssl=ssl)
self._reader, self._writer = await connect self._reader, self._writer = await connect
self.logger.debug("Connected.")
@upper_half @upper_half
async def _establish_session(self) -> None: async def _establish_session(self) -> None:
""" """
@ -382,8 +421,8 @@ class AsyncProtocol(Generic[T]):
self._outgoing = asyncio.Queue() self._outgoing = asyncio.Queue()
reader_coro = self._bh_loop_forever(self._bh_recv_message) reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
writer_coro = self._bh_loop_forever(self._bh_send_message) writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
self._reader_task = create_task(reader_coro) self._reader_task = create_task(reader_coro)
self._writer_task = create_task(writer_coro) self._writer_task = create_task(writer_coro)
@ -410,6 +449,7 @@ class AsyncProtocol(Generic[T]):
""" """
if not self._dc_task: if not self._dc_task:
self._set_state(Runstate.DISCONNECTING) self._set_state(Runstate.DISCONNECTING)
self.logger.debug("Scheduling disconnect.")
self._dc_task = create_task(self._bh_disconnect()) self._dc_task = create_task(self._bh_disconnect())
@upper_half @upper_half
@ -492,30 +532,39 @@ class AsyncProtocol(Generic[T]):
# Try to flush the writer, if possible: # Try to flush the writer, if possible:
if not error_pathway: if not error_pathway:
await self._bh_flush_writer() await self._bh_flush_writer()
except: except BaseException as err:
error_pathway = True error_pathway = True
emsg = "Failed to flush the writer"
self.logger.error("%s: %s", emsg, exception_summary(err))
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
raise raise
finally: finally:
# Cancel any still-running tasks: # Cancel any still-running tasks:
if self._writer_task is not None and not self._writer_task.done(): if self._writer_task is not None and not self._writer_task.done():
self.logger.debug("Cancelling writer task.")
self._writer_task.cancel() self._writer_task.cancel()
if self._reader_task is not None and not self._reader_task.done(): if self._reader_task is not None and not self._reader_task.done():
self.logger.debug("Cancelling reader task.")
self._reader_task.cancel() self._reader_task.cancel()
# Close out the tasks entirely (Won't raise): # Close out the tasks entirely (Won't raise):
if tasks: if tasks:
self.logger.debug("Waiting for tasks to complete ...")
await asyncio.wait(tasks) await asyncio.wait(tasks)
# Lastly, close the stream itself. (May raise): # Lastly, close the stream itself. (May raise):
await self._bh_close_stream(error_pathway) await self._bh_close_stream(error_pathway)
self.logger.debug("Disconnected.")
@bottom_half @bottom_half
async def _bh_flush_writer(self) -> None: async def _bh_flush_writer(self) -> None:
if not self._writer_task: if not self._writer_task:
return return
self.logger.debug("Draining the outbound queue ...")
await self._outgoing.join() await self._outgoing.join()
if self._writer is not None: if self._writer is not None:
self.logger.debug("Flushing the StreamWriter ...")
await flush(self._writer) await flush(self._writer)
@bottom_half @bottom_half
@ -525,8 +574,10 @@ class AsyncProtocol(Generic[T]):
return return
if not is_closing(self._writer): if not is_closing(self._writer):
self.logger.debug("Closing StreamWriter.")
self._writer.close() self._writer.close()
self.logger.debug("Waiting for StreamWriter to close ...")
try: try:
await wait_closed(self._writer) await wait_closed(self._writer)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
@ -541,13 +592,18 @@ class AsyncProtocol(Generic[T]):
# just trust that the Exception we already have is the # just trust that the Exception we already have is the
# better one to present to the user, even if we don't # better one to present to the user, even if we don't
# genuinely *know* the relationship between the two. # genuinely *know* the relationship between the two.
pass self.logger.debug(
"Discarding Exception from wait_closed:\n%s\n",
pretty_traceback(),
)
else: else:
# Oops, this is a brand-new error! # Oops, this is a brand-new error!
raise raise
finally:
self.logger.debug("StreamWriter closed.")
@bottom_half @bottom_half
async def _bh_loop_forever(self, async_fn: _TaskFN) -> None: async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
""" """
Run one of the bottom-half methods in a loop forever. Run one of the bottom-half methods in a loop forever.
@ -555,16 +611,24 @@ class AsyncProtocol(Generic[T]):
disconnect that will terminate the entire loop. disconnect that will terminate the entire loop.
:param async_fn: The bottom-half method to run in a loop. :param async_fn: The bottom-half method to run in a loop.
:param name: The name of this task, used for logging.
""" """
try: try:
while True: while True:
await async_fn() await async_fn()
except asyncio.CancelledError: except asyncio.CancelledError:
# We have been cancelled by _bh_disconnect, exit gracefully. # We have been cancelled by _bh_disconnect, exit gracefully.
self.logger.debug("Task.%s: cancelled.", name)
return return
except BaseException: except BaseException as err:
self.logger.error("Task.%s: %s",
name, exception_summary(err))
self.logger.debug("Task.%s: failure:\n%s\n",
name, pretty_traceback())
self._schedule_disconnect() self._schedule_disconnect()
raise raise
finally:
self.logger.debug("Task.%s: exiting.", name)
@bottom_half @bottom_half
async def _bh_send_message(self) -> None: async def _bh_send_message(self) -> None: