mirror of
https://github.com/Motorhead1991/qemu.git
synced 2025-07-27 04:13:53 -06:00
python/aqmp: split _client_connected_cb() out as _incoming()
As part of disentangling the monolithic nature of _do_accept(), split out the incoming callback to prepare for factoring out the "wait for a peer" step. Namely, this means using an event signal we can wait on from outside of this method. Signed-off-by: John Snow <jsnow@redhat.com> Acked-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> Message-id: 20220225205948.3693480-5-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
parent
68a6cf3ffe
commit
830e6fd36e
1 changed files with 58 additions and 25 deletions
|
@ -242,6 +242,10 @@ class AsyncProtocol(Generic[T]):
|
||||||
# Workaround for bind()
|
# Workaround for bind()
|
||||||
self._sock: Optional[socket.socket] = None
|
self._sock: Optional[socket.socket] = None
|
||||||
|
|
||||||
|
# Server state for start_server() and _incoming()
|
||||||
|
self._server: Optional[asyncio.AbstractServer] = None
|
||||||
|
self._accepted: Optional[asyncio.Event] = None
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
cls_name = type(self).__name__
|
cls_name = type(self).__name__
|
||||||
tokens = []
|
tokens = []
|
||||||
|
@ -425,6 +429,54 @@ class AsyncProtocol(Generic[T]):
|
||||||
self._runstate_event.set()
|
self._runstate_event.set()
|
||||||
self._runstate_event.clear()
|
self._runstate_event.clear()
|
||||||
|
|
||||||
|
@bottom_half # However, it does not run from the R/W tasks.
|
||||||
|
async def _stop_server(self) -> None:
|
||||||
|
"""
|
||||||
|
Stop listening for / accepting new incoming connections.
|
||||||
|
"""
|
||||||
|
if self._server is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.logger.debug("Stopping server.")
|
||||||
|
self._server.close()
|
||||||
|
await self._server.wait_closed()
|
||||||
|
self.logger.debug("Server stopped.")
|
||||||
|
finally:
|
||||||
|
self._server = None
|
||||||
|
|
||||||
|
@bottom_half # However, it does not run from the R/W tasks.
|
||||||
|
async def _incoming(self,
|
||||||
|
reader: asyncio.StreamReader,
|
||||||
|
writer: asyncio.StreamWriter) -> None:
|
||||||
|
"""
|
||||||
|
Accept an incoming connection and signal the upper_half.
|
||||||
|
|
||||||
|
This method does the minimum necessary to accept a single
|
||||||
|
incoming connection. It signals back to the upper_half ASAP so
|
||||||
|
that any errors during session initialization can occur
|
||||||
|
naturally in the caller's stack.
|
||||||
|
|
||||||
|
:param reader: Incoming `asyncio.StreamReader`
|
||||||
|
:param writer: Incoming `asyncio.StreamWriter`
|
||||||
|
"""
|
||||||
|
peer = writer.get_extra_info('peername', 'Unknown peer')
|
||||||
|
self.logger.debug("Incoming connection from %s", peer)
|
||||||
|
|
||||||
|
if self._reader or self._writer:
|
||||||
|
# Sadly, we can have more than one pending connection
|
||||||
|
# because of https://bugs.python.org/issue46715
|
||||||
|
# Close any extra connections we don't actually want.
|
||||||
|
self.logger.warning("Extraneous connection inadvertently accepted")
|
||||||
|
writer.close()
|
||||||
|
return
|
||||||
|
|
||||||
|
# A connection has been accepted; stop listening for new ones.
|
||||||
|
assert self._accepted is not None
|
||||||
|
await self._stop_server()
|
||||||
|
self._reader, self._writer = (reader, writer)
|
||||||
|
self._accepted.set()
|
||||||
|
|
||||||
def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None:
|
def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None:
|
||||||
"""
|
"""
|
||||||
Used to create a socket in advance of accept().
|
Used to create a socket in advance of accept().
|
||||||
|
@ -469,30 +521,11 @@ class AsyncProtocol(Generic[T]):
|
||||||
self._set_state(Runstate.CONNECTING)
|
self._set_state(Runstate.CONNECTING)
|
||||||
|
|
||||||
self.logger.debug("Awaiting connection on %s ...", address)
|
self.logger.debug("Awaiting connection on %s ...", address)
|
||||||
connected = asyncio.Event()
|
self._accepted = asyncio.Event()
|
||||||
server: Optional[asyncio.AbstractServer] = None
|
|
||||||
|
|
||||||
async def _client_connected_cb(reader: asyncio.StreamReader,
|
|
||||||
writer: asyncio.StreamWriter) -> None:
|
|
||||||
"""Used to accept a single incoming connection, see below."""
|
|
||||||
nonlocal server
|
|
||||||
nonlocal connected
|
|
||||||
|
|
||||||
# A connection has been accepted; stop listening for new ones.
|
|
||||||
assert server is not None
|
|
||||||
server.close()
|
|
||||||
await server.wait_closed()
|
|
||||||
server = None
|
|
||||||
|
|
||||||
# Register this client as being connected
|
|
||||||
self._reader, self._writer = (reader, writer)
|
|
||||||
|
|
||||||
# Signal back: We've accepted a client!
|
|
||||||
connected.set()
|
|
||||||
|
|
||||||
if isinstance(address, tuple):
|
if isinstance(address, tuple):
|
||||||
coro = asyncio.start_server(
|
coro = asyncio.start_server(
|
||||||
_client_connected_cb,
|
self._incoming,
|
||||||
host=None if self._sock else address[0],
|
host=None if self._sock else address[0],
|
||||||
port=None if self._sock else address[1],
|
port=None if self._sock else address[1],
|
||||||
ssl=ssl,
|
ssl=ssl,
|
||||||
|
@ -502,7 +535,7 @@ class AsyncProtocol(Generic[T]):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
coro = asyncio.start_unix_server(
|
coro = asyncio.start_unix_server(
|
||||||
_client_connected_cb,
|
self._incoming,
|
||||||
path=None if self._sock else address,
|
path=None if self._sock else address,
|
||||||
ssl=ssl,
|
ssl=ssl,
|
||||||
backlog=1,
|
backlog=1,
|
||||||
|
@ -515,9 +548,9 @@ class AsyncProtocol(Generic[T]):
|
||||||
# otherwise yield.
|
# otherwise yield.
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
server = await coro # Starts listening
|
self._server = await coro # Starts listening
|
||||||
await connected.wait() # Waits for the callback to fire (and finish)
|
await self._accepted.wait() # Waits for the callback to finish
|
||||||
assert server is None
|
assert self._server is None
|
||||||
self._sock = None
|
self._sock = None
|
||||||
|
|
||||||
self.logger.debug("Connection accepted.")
|
self.logger.debug("Connection accepted.")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue