From 165fe1730d9d9ec902d8345b415fd50d3956f8ef Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Tue, 20 Jan 2026 19:16:31 -0500 Subject: [PATCH] load_cell_probe: Move phoming.probing_move() interface to MCU_trigger_analog Signed-off-by: Kevin O'Connor --- klippy/extras/load_cell_probe.py | 127 ++++++++++++++----------------- 1 file changed, 58 insertions(+), 69 deletions(-) diff --git a/klippy/extras/load_cell_probe.py b/klippy/extras/load_cell_probe.py index 4daeec2a2..ea4aaf767 100644 --- a/klippy/extras/load_cell_probe.py +++ b/klippy/extras/load_cell_probe.py @@ -242,7 +242,6 @@ class LoadCellProbeConfigHelper: self._printer = config.get_printer() self._load_cell = load_cell_inst self._sensor = load_cell_inst.get_sensor() - self._rest_time = 1. / float(self._sensor.get_samples_per_second()) # Collect 4 x 60hz power cycles of data to average across power noise self._tare_time_param = floatParamHelper(config, 'tare_time', default=4. / 60., minval=0.01, maxval=1.0) @@ -263,9 +262,6 @@ class LoadCellProbeConfigHelper: def get_safety_limit_grams(self, gcmd=None): return self._force_safety_limit_param.get(gcmd) - def get_rest_time(self): - return self._rest_time - def get_safety_range(self, gcmd=None): counts_per_gram = self._load_cell.get_counts_per_gram() # calculate the safety band @@ -294,16 +290,21 @@ class LoadCellProbeConfigHelper: # MCU_trigger_analog is the interface to `trigger_analog` on the MCU # This also manages the SosFilter so all commands use one command queue class MCU_trigger_analog: - WATCHDOG_MAX = 3 + MONITOR_MAX = 3 ERROR_SAFETY_RANGE = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 1 ERROR_OVERFLOW = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 2 ERROR_WATCHDOG = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 3 + ERROR_MAP = { + mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during " + "homing", + ERROR_SAFETY_RANGE: "sensor exceeds safety limit", + ERROR_OVERFLOW: "fixed point math overflow", + ERROR_WATCHDOG: "timed out waiting for sensor data" + } - def __init__(self, config, sensor_inst, sos_filter_inst, config_helper, - trigger_dispatch): - self._printer = config.get_printer() + def __init__(self, sensor_inst, sos_filter_inst, trigger_dispatch): + self._printer = sensor_inst.get_mcu().get_printer() self._sos_filter = sos_filter_inst - self._config_helper = config_helper self._sensor = sensor_inst self._mcu = self._sensor.get_mcu() # configure MCU objects @@ -313,6 +314,7 @@ class MCU_trigger_analog: self._raw_min = self._raw_max = 0 self._last_range_args = None self._trigger_value = 0. + self._last_trigger_time = 0. self._config_commands() self._home_cmd = None self._query_cmd = None @@ -355,6 +357,9 @@ class MCU_trigger_analog: def get_dispatch(self): return self._dispatch + def get_last_trigger_time(self): + return self._last_trigger_time + def set_trigger_value(self, trigger_value): self._trigger_value = trigger_value @@ -362,7 +367,7 @@ class MCU_trigger_analog: self._raw_min = raw_min self._raw_max = raw_max - def reset_filter(self): + def _reset_filter(self): # Update parameters in mcu (if they have changed) tval32 = self._sos_filter.convert_value(self._trigger_value) args = [self._oid, self._raw_min, self._raw_max, tval32] @@ -372,15 +377,7 @@ class MCU_trigger_analog: # Update sos filter in mcu self._sos_filter.reset_filter() - def home_start(self, print_time): - clock = self._mcu.print_time_to_clock(print_time) - rest_time = self._config_helper.get_rest_time() - rest_ticks = self._mcu.seconds_to_clock(rest_time) - self._home_cmd.send([self._oid, self._dispatch.get_oid(), - mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.ERROR_SAFETY_RANGE, clock, - rest_ticks, self.WATCHDOG_MAX], reqclock=clock) - - def clear_home(self): + def _clear_home(self): params = self._query_cmd.send([self._oid]) # The time of the first sample that triggered is in "trigger_ticks" trigger_ticks = self._mcu.clock32_to_clock64(params['trigger_ticks']) @@ -388,20 +385,43 @@ class MCU_trigger_analog: self._home_cmd.send([self._oid, 0, 0, 0, 0, 0, 0, 0]) return self._mcu.clock_to_print_time(trigger_ticks) + def get_steppers(self): + return self._dispatch.get_steppers() + + def home_start(self, print_time, sample_time, sample_count, rest_time, + triggered=True): + self._last_trigger_time = 0. + self._reset_filter() + trigger_completion = self._dispatch.start(print_time) + clock = self._mcu.print_time_to_clock(print_time) + sensor_update = 1. / self._sensor.get_samples_per_second() + sm_ticks = self._mcu.seconds_to_clock(sensor_update) + self._home_cmd.send([self._oid, self._dispatch.get_oid(), + mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.ERROR_SAFETY_RANGE, clock, + sm_ticks, self.MONITOR_MAX], reqclock=clock) + return trigger_completion + + def home_wait(self, home_end_time): + self._dispatch.wait_end(home_end_time) + # trigger has happened, now to find out why... + res = self._dispatch.stop() + # clear the homing state so it stops processing samples + trigger_time = self._clear_home() + if res >= mcu.MCU_trsync.REASON_COMMS_TIMEOUT: + defmsg = "unknown reason code %i" % (res,) + error_msg = self.ERROR_MAP.get(res, defmsg) + raise self._printer.command_error("Trigger analog error: %s" + % (error_msg,)) + if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT: + return 0. + if self._mcu.is_fileoutput(): + trigger_time = home_end_time + self._last_trigger_time = trigger_time + return trigger_time + # Execute probing moves using the MCU_trigger_analog class LoadCellProbingMove: - ERROR_MAP = { - mcu.MCU_trsync.REASON_COMMS_TIMEOUT: "Communication timeout during " - "homing", - MCU_trigger_analog.ERROR_SAFETY_RANGE: "Load Cell Probe Error: load " - "exceeds safety limit", - MCU_trigger_analog.ERROR_OVERFLOW: "Load Cell Probe Error: fixed point " - "math overflow", - MCU_trigger_analog.ERROR_WATCHDOG: "Load Cell Probe Error: timed out " - "waiting for sensor data" - } - def __init__(self, config, load_cell_inst, mcu_trigger_analog, param_helper, continuous_tare_filter_helper, config_helper): self._printer = config.get_printer() @@ -416,7 +436,6 @@ class LoadCellProbingMove: probe.LookupZSteppers(config, self._dispatch.add_stepper) # internal state tracking self._tare_counts = 0 - self._last_trigger_time = 0 def _start_collector(self): toolhead = self._printer.lookup_object('toolhead') @@ -451,37 +470,6 @@ class LoadCellProbingMove: Q17_14_FRAC_BITS = 14 sos_filter.set_offset_scale(int(-tare_counts), gpc, Q17_14_FRAC_BITS, Q16_15_FRAC_BITS) - self._mcu_trigger_analog.reset_filter() - - def _home_start(self, print_time): - # start trsync - trigger_completion = self._dispatch.start(print_time) - self._mcu_trigger_analog.home_start(print_time) - return trigger_completion - - def home_start(self, print_time, sample_time, sample_count, rest_time, - triggered=True): - return self._home_start(print_time) - - def home_wait(self, home_end_time): - self._dispatch.wait_end(home_end_time) - # trigger has happened, now to find out why... - res = self._dispatch.stop() - # clear the homing state so it stops processing samples - self._last_trigger_time = self._mcu_trigger_analog.clear_home() - if self._mcu.is_fileoutput(): - self._last_trigger_time = home_end_time - if res >= mcu.MCU_trsync.REASON_COMMS_TIMEOUT: - error = "Load Cell Probe Error: unknown reason code %i" % (res,) - if res in self.ERROR_MAP: - error = self.ERROR_MAP[res] - raise self._printer.command_error(error) - if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT: - return 0. - return self._last_trigger_time - - def get_steppers(self): - return self._dispatch.get_steppers() # Probe towards z_min until the trigger_analog on the MCU triggers def probing_move(self, gcmd): @@ -499,20 +487,22 @@ class LoadCellProbingMove: # start collector after tare samples are consumed collector = self._start_collector() # do homing move - return phoming.probing_move(self, pos, speed), collector + epos = phoming.probing_move(self._mcu_trigger_analog, pos, speed) + return epos, collector # Wait for the MCU to trigger with no movement def probing_test(self, gcmd, timeout): self._pause_and_tare(gcmd) toolhead = self._printer.lookup_object('toolhead') print_time = toolhead.get_last_move_time() - self._home_start(print_time) - return self.home_wait(print_time + timeout) + self._mcu_trigger_analog.home_start(print_time, 0., 0, 0.) + return self._mcu_trigger_analog.home_wait(print_time + timeout) def get_status(self, eventtime): + trig_time = self._mcu_trigger_analog.get_last_trigger_time() return { 'tare_counts': self._tare_counts, - 'last_trigger_time': self._last_trigger_time, + 'last_trigger_time': trig_time, } @@ -642,9 +632,8 @@ class LoadCellPrinterProbe: self._param_helper = probe.ProbeParameterHelper(config) self._cmd_helper = probe.ProbeCommandHelper(config, self) self._probe_offsets = probe.ProbeOffsetsHelper(config) - self._mcu_trigger_analog = MCU_trigger_analog(config, sensor, - continuous_tare_filter_helper.get_sos_filter(), config_helper, - trigger_dispatch) + self._mcu_trigger_analog = MCU_trigger_analog(sensor, + continuous_tare_filter_helper.get_sos_filter(), trigger_dispatch) load_cell_probing_move = LoadCellProbingMove(config, self._load_cell, self._mcu_trigger_analog, self._param_helper, continuous_tare_filter_helper, config_helper)