mcu: Add new MCURestartHelper helper class

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2025-09-22 15:47:57 -04:00
parent 64c155121e
commit de73e41d0f

View file

@ -557,72 +557,28 @@ MIN_SCHEDULE_TIME = 0.100
# Maximum time all MCUs can internally schedule into the future
MAX_NOMINAL_DURATION = 3.0
# Low-level mcu connection management helper
class MCUConnectHelper:
def __init__(self, config, mcu, clocksync):
self._mcu = mcu
self._clocksync = clocksync
# Support for restarting a micro-controller
class MCURestartHelper:
def __init__(self, config, conn_helper):
self._printer = printer = config.get_printer()
self._conn_helper = conn_helper
self._mcu = mcu = conn_helper.get_mcu()
self._serial = conn_helper.get_serial()
self._clocksync = conn_helper.get_clocksync()
self._reactor = printer.get_reactor()
self._name = name = mcu.get_name()
# Serial port
self._serial = serialhdl.SerialReader(self._reactor, mcu_name=name)
self._baud = 0
self._canbus_iface = None
canbus_uuid = config.get('canbus_uuid', None)
if canbus_uuid is not None:
self._serialport = canbus_uuid
self._canbus_iface = config.get('canbus_interface', 'can0')
cbid = self._printer.load_object(config, 'canbus_ids')
cbid.add_uuid(config, canbus_uuid, self._canbus_iface)
self._printer.load_object(config, 'canbus_stats %s' % (name,))
else:
self._serialport = config.get('serial')
if not (self._serialport.startswith("/dev/rpmsg_")
or self._serialport.startswith("/tmp/klipper_host_")):
self._baud = config.getint('baud', 250000, minval=2400)
# Restarts
self._name = mcu.get_name()
restart_methods = [None, 'arduino', 'cheetah', 'command', 'rpi_usb']
self._restart_method = 'command'
if self._baud:
serialport, baud = conn_helper.get_serialport()
if baud:
self._restart_method = config.getchoice('restart_method',
restart_methods, None)
self._reset_cmd = self._config_reset_cmd = None
self._is_mcu_bridge = False
self._emergency_stop_cmd = None
self._is_shutdown = self._is_timeout = False
self._shutdown_clock = 0
self._shutdown_msg = ""
# Register handlers
printer.register_event_handler("klippy:firmware_restart",
self._firmware_restart)
printer.register_event_handler("klippy:shutdown", self._shutdown)
printer.register_event_handler("klippy:disconnect", self._disconnect)
def get_mcu(self):
return self._mcu
def get_serial(self):
return self._serial
def get_clocksync(self):
return self._clocksync
def _handle_shutdown(self, params):
if self._is_shutdown:
return
self._is_shutdown = True
clock = params.get("clock")
if clock is not None:
self._shutdown_clock = self._mcu.clock32_to_clock64(clock)
self._shutdown_msg = msg = params['static_string_id']
event_type = params['#name']
self._printer.invoke_async_shutdown(
"MCU shutdown", {"reason": msg, "mcu": self._name,
"event_type": event_type})
logging.info("MCU '%s' %s: %s\n%s\n%s", self._name, event_type,
self._shutdown_msg, self._clocksync.dump_debug(),
self._serial.dump_debug())
def _handle_starting(self, params):
if not self._is_shutdown:
self._printer.invoke_async_shutdown("MCU '%s' spontaneous restart"
% (self._name,))
# Connection phase
def _check_restart(self, reason):
start_reason = self._printer.get_start_args().get("start_reason")
@ -639,63 +595,17 @@ class MCUConnectHelper:
if self._restart_method == 'rpi_usb':
# Only configure mcu after usb power reset
self._check_restart("full reset before config")
def _check_restart_on_attach(self):
def check_restart_on_attach(self):
resmeth = self._restart_method
if resmeth == 'rpi_usb' and not os.path.exists(self._serialport):
serialport, baud = self._conn_helper.get_serialport()
if resmeth == 'rpi_usb' and not os.path.exists(serialport):
# Try toggling usb power
self._check_restart("enable power")
def _lookup_attach_uart_rts(self):
def lookup_attach_uart_rts(self):
# Cheetah boards require RTS to be deasserted
# else a reset will trigger the built-in bootloader.
return (self._restart_method != "cheetah")
def log_info(self):
msgparser = self._serial.get_msgparser()
message_count = len(msgparser.get_messages())
version, build_versions = msgparser.get_version_info()
log_info = [
"Loaded MCU '%s' %d commands (%s / %s)"
% (self._name, message_count, version, build_versions),
"MCU '%s' config: %s" % (self._name, " ".join(
["%s=%s" % (k, v)
for k, v in msgparser.get_constants().items()]))]
return "\n".join(log_info)
def _attach_file(self):
# In a debugging mode. Open debug output file and read data dictionary
start_args = self._printer.get_start_args()
if self._name == 'mcu':
out_fname = start_args.get('debugoutput')
dict_fname = start_args.get('dictionary')
else:
out_fname = start_args.get('debugoutput') + "-" + self._name
dict_fname = start_args.get('dictionary_' + self._name)
outfile = open(out_fname, 'wb')
dfile = open(dict_fname, 'rb')
dict_data = dfile.read()
dfile.close()
self._serial.connect_file(outfile, dict_data)
self._clocksync.connect_file(self._serial)
def _attach(self):
self._check_restart_on_attach()
try:
if self._canbus_iface is not None:
cbid = self._printer.lookup_object('canbus_ids')
nodeid = cbid.get_nodeid(self._serialport)
self._serial.connect_canbus(self._serialport, nodeid,
self._canbus_iface)
elif self._baud:
rts = self._lookup_attach_uart_rts()
self._serial.connect_uart(self._serialport, self._baud, rts)
else:
self._serial.connect_pipe(self._serialport)
self._clocksync.connect(self._serial)
except serialhdl.error as e:
raise error(str(e))
def _post_attach_setup_shutdown(self):
self._emergency_stop_cmd = self._mcu.lookup_command("emergency_stop")
self._mcu.register_response(self._handle_shutdown, 'shutdown')
self._mcu.register_response(self._handle_shutdown, 'is_shutdown')
self._mcu.register_response(self._handle_starting, 'starting')
def _post_attach_setup_restart(self):
def post_attach_setup_restart(self):
self._reset_cmd = self._mcu.try_lookup_command("reset")
self._config_reset_cmd = self._mcu.try_lookup_command("config_reset")
ext_only = self._reset_cmd is None and self._config_reset_cmd is None
@ -707,30 +617,18 @@ class MCUConnectHelper:
self._is_mcu_bridge = True
self._printer.register_event_handler("klippy:firmware_restart",
self._firmware_restart_bridge)
def start_attach(self):
if self._mcu.is_fileoutput():
self._attach_file()
else:
self._attach()
logging.info(self.log_info())
self._post_attach_setup_shutdown()
self._post_attach_setup_restart()
# Restarts
def _disconnect(self):
self._serial.disconnect()
def _shutdown(self, force=False):
if (self._emergency_stop_cmd is None
or (self._is_shutdown and not force)):
return
self._emergency_stop_cmd.send()
def _restart_arduino(self):
logging.info("Attempting MCU '%s' reset", self._name)
self._disconnect()
serialhdl.arduino_reset(self._serialport, self._reactor)
serialport, baud = self._conn_helper.get_serialport()
serialhdl.arduino_reset(serialport, self._reactor)
def _restart_cheetah(self):
logging.info("Attempting MCU '%s' Cheetah-style reset", self._name)
self._disconnect()
serialhdl.cheetah_reset(self._serialport, self._reactor)
serialport, baud = self._conn_helper.get_serialport()
serialhdl.cheetah_reset(serialport, self._reactor)
def _restart_via_command(self):
if ((self._reset_cmd is None and self._config_reset_cmd is None)
or not self._clocksync.is_active()):
@ -740,8 +638,7 @@ class MCUConnectHelper:
if self._reset_cmd is None:
# Attempt reset via config_reset command
logging.info("Attempting MCU '%s' config_reset command", self._name)
self._is_shutdown = True
self._shutdown(force=True)
self._conn_helper.force_local_shutdown()
self._reactor.pause(self._reactor.monotonic() + 0.015)
self._config_reset_cmd.send()
else:
@ -769,6 +666,129 @@ class MCUConnectHelper:
self._restart_arduino()
def _firmware_restart_bridge(self):
self._firmware_restart(True)
# Low-level mcu connection management helper
class MCUConnectHelper:
def __init__(self, config, mcu, clocksync):
self._mcu = mcu
self._clocksync = clocksync
self._printer = printer = config.get_printer()
self._reactor = printer.get_reactor()
self._name = name = mcu.get_name()
# Serial port
self._serial = serialhdl.SerialReader(self._reactor, mcu_name=name)
self._baud = 0
self._canbus_iface = None
canbus_uuid = config.get('canbus_uuid', None)
if canbus_uuid is not None:
self._serialport = canbus_uuid
self._canbus_iface = config.get('canbus_interface', 'can0')
cbid = self._printer.load_object(config, 'canbus_ids')
cbid.add_uuid(config, canbus_uuid, self._canbus_iface)
self._printer.load_object(config, 'canbus_stats %s' % (name,))
else:
self._serialport = config.get('serial')
if not (self._serialport.startswith("/dev/rpmsg_")
or self._serialport.startswith("/tmp/klipper_host_")):
self._baud = config.getint('baud', 250000, minval=2400)
self._emergency_stop_cmd = None
self._is_shutdown = self._is_timeout = False
self._shutdown_clock = 0
self._shutdown_msg = ""
self._restart_helper = MCURestartHelper(config, self)
printer.register_event_handler("klippy:shutdown", self._shutdown)
def get_mcu(self):
return self._mcu
def get_serial(self):
return self._serial
def get_clocksync(self):
return self._clocksync
def get_serialport(self):
return self._serialport, self._baud
def get_restart_helper(self):
return self._restart_helper
def _handle_shutdown(self, params):
if self._is_shutdown:
return
self._is_shutdown = True
clock = params.get("clock")
if clock is not None:
self._shutdown_clock = self._mcu.clock32_to_clock64(clock)
self._shutdown_msg = msg = params['static_string_id']
event_type = params['#name']
self._printer.invoke_async_shutdown(
"MCU shutdown", {"reason": msg, "mcu": self._name,
"event_type": event_type})
logging.info("MCU '%s' %s: %s\n%s\n%s", self._name, event_type,
self._shutdown_msg, self._clocksync.dump_debug(),
self._serial.dump_debug())
def _handle_starting(self, params):
if not self._is_shutdown:
self._printer.invoke_async_shutdown("MCU '%s' spontaneous restart"
% (self._name,))
def log_info(self):
msgparser = self._serial.get_msgparser()
message_count = len(msgparser.get_messages())
version, build_versions = msgparser.get_version_info()
log_info = [
"Loaded MCU '%s' %d commands (%s / %s)"
% (self._name, message_count, version, build_versions),
"MCU '%s' config: %s" % (self._name, " ".join(
["%s=%s" % (k, v)
for k, v in msgparser.get_constants().items()]))]
return "\n".join(log_info)
def _attach_file(self):
# In a debugging mode. Open debug output file and read data dictionary
start_args = self._printer.get_start_args()
if self._name == 'mcu':
out_fname = start_args.get('debugoutput')
dict_fname = start_args.get('dictionary')
else:
out_fname = start_args.get('debugoutput') + "-" + self._name
dict_fname = start_args.get('dictionary_' + self._name)
outfile = open(out_fname, 'wb')
dfile = open(dict_fname, 'rb')
dict_data = dfile.read()
dfile.close()
self._serial.connect_file(outfile, dict_data)
self._clocksync.connect_file(self._serial)
def _attach(self):
self._restart_helper.check_restart_on_attach()
try:
if self._canbus_iface is not None:
cbid = self._printer.lookup_object('canbus_ids')
nodeid = cbid.get_nodeid(self._serialport)
self._serial.connect_canbus(self._serialport, nodeid,
self._canbus_iface)
elif self._baud:
rts = self._restart_helper.lookup_attach_uart_rts()
self._serial.connect_uart(self._serialport, self._baud, rts)
else:
self._serial.connect_pipe(self._serialport)
self._clocksync.connect(self._serial)
except serialhdl.error as e:
raise error(str(e))
def _post_attach_setup_shutdown(self):
self._emergency_stop_cmd = self._mcu.lookup_command("emergency_stop")
self._mcu.register_response(self._handle_shutdown, 'shutdown')
self._mcu.register_response(self._handle_shutdown, 'is_shutdown')
self._mcu.register_response(self._handle_starting, 'starting')
def start_attach(self):
if self._mcu.is_fileoutput():
self._attach_file()
else:
self._attach()
logging.info(self.log_info())
self._post_attach_setup_shutdown()
self._restart_helper.post_attach_setup_restart()
def _shutdown(self, force=False):
if (self._emergency_stop_cmd is None
or (self._is_shutdown and not force)):
return
self._emergency_stop_cmd.send()
def force_local_shutdown(self):
self._is_shutdown = True
self._shutdown(force=True)
def check_timeout(self, eventtime):
if (self._clocksync.is_active() or self._mcu.is_fileoutput()
or self._is_timeout):
@ -884,7 +904,8 @@ class MCUConfigHelper:
config_crc = zlib.crc32(encoded_config) & 0xffffffff
self.add_config_cmd("finalize_config crc=%d" % (config_crc,))
if prev_crc is not None and config_crc != prev_crc:
self._conn_helper.check_restart_on_crc_mismatch()
restart_helper = self._conn_helper.get_restart_helper()
restart_helper.check_restart_on_crc_mismatch()
raise error("MCU '%s' CRC does not match config" % (self._name,))
# Transmit config messages (if needed)
try:
@ -924,7 +945,8 @@ class MCUConfigHelper:
def _connect(self):
config_params = self._send_get_config()
if not config_params['is_config']:
self._conn_helper.check_restart_on_send_config()
restart_helper = self._conn_helper.get_restart_helper()
restart_helper.check_restart_on_send_config()
# Not configured - send config and issue get_config again
self._send_config(None)
config_params = self._send_get_config()