mcu: Pass conn_helper to CommandWrapper and CommandQueryWrapper

Pass the low-level MCUConnectHelper class to these helper classes.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2026-01-19 20:40:40 -05:00
parent e590bc87d8
commit 1cde5e2018

View file

@ -60,14 +60,14 @@ class RetryAsyncCommand:
# Wrapper around query commands
class CommandQueryWrapper:
def __init__(self, serial, msgformat, respformat, oid=None,
cmd_queue=None, is_async=False, error=serialhdl.error):
self._serial = serial
def __init__(self, conn_helper, msgformat, respformat, oid=None,
cmd_queue=None, is_async=False):
self._serial = serial = conn_helper.get_serial()
self._cmd = serial.get_msgparser().lookup_command(msgformat)
serial.get_msgparser().lookup_command(respformat)
self._response = respformat.split()[0]
self._oid = oid
self._error = error
self._error = conn_helper.get_mcu().get_printer().command_error
self._xmit_helper = serialhdl.SerialRetryCommand
if is_async:
self._xmit_helper = RetryAsyncCommand
@ -92,15 +92,15 @@ class CommandQueryWrapper:
# Wrapper around command sending
class CommandWrapper:
def __init__(self, serial, msgformat, cmd_queue=None, debugoutput=False):
self._serial = serial
def __init__(self, conn_helper, msgformat, cmd_queue=None):
self._serial = serial = conn_helper.get_serial()
msgparser = serial.get_msgparser()
self._cmd = msgparser.lookup_command(msgformat)
if cmd_queue is None:
cmd_queue = serial.get_default_command_queue()
self._cmd_queue = cmd_queue
self._msgtag = msgparser.lookup_msgid(msgformat) & 0xffffffff
if debugoutput:
if conn_helper.get_mcu().is_fileoutput():
# Can't use send_wait_ack when in debugging mode
self.send_wait_ack = self.send
def send(self, data=(), minclock=0, reqclock=0):
@ -1096,12 +1096,11 @@ class MCU:
def max_nominal_duration(self):
return MAX_NOMINAL_DURATION
def lookup_command(self, msgformat, cq=None):
return CommandWrapper(self._serial, msgformat, cq,
debugoutput=self.is_fileoutput())
return CommandWrapper(self._conn_helper, msgformat, cq)
def lookup_query_command(self, msgformat, respformat, oid=None,
cq=None, is_async=False):
return CommandQueryWrapper(self._serial, msgformat, respformat, oid,
cq, is_async, self._printer.command_error)
return CommandQueryWrapper(self._conn_helper, msgformat, respformat,
oid, cq, is_async)
def try_lookup_command(self, msgformat):
try:
return self.lookup_command(msgformat)