probe_eddy_current: Rework EddyGatherSamples()

Rework the internal EddyGatherSamples() class with a goal of making it
easier to add tap analysis in the future.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2026-01-22 10:26:38 -05:00 committed by KevinOConnor
parent 7f822f3a5c
commit 9957546ae0

View file

@ -34,8 +34,12 @@ class EddyCalibration:
gcode.register_command('Z_OFFSET_APPLY_PROBE',
self.cmd_Z_OFFSET_APPLY_PROBE,
desc=self.cmd_Z_OFFSET_APPLY_PROBE_help)
def is_calibrated(self):
return len(self.cal_freqs) > 2
def get_printer(self):
return self.printer
def verify_calibrated(self):
if len(self.cal_freqs) <= 2:
raise self.printer.command_error(
"Must calibrate probe_eddy_current first")
def load_calibration(self, cal):
cal = sorted([(c[1], c[0]) for c in cal])
self.cal_freqs = [c[0] for c in cal]
@ -278,56 +282,33 @@ class EddyCalibration:
# Tool to gather samples and convert them to probe positions
class EddyGatherSamples:
def __init__(self, printer, sensor_helper, calibration, offsets):
def __init__(self, printer, sensor_helper):
self._printer = printer
self._sensor_helper = sensor_helper
self._calibration = calibration
self._offsets = offsets
# Results storage
self._samples = []
self._probe_times = []
self._probe_results = []
# Sensor reading
self._sensor_messages = []
self._need_stop = False
# Probe request and results storage
self._probe_requests = []
self._analysis_results = []
# Start samples
if not self._calibration.is_calibrated():
raise self._printer.command_error(
"Must calibrate probe_eddy_current first")
sensor_helper.add_client(self._add_measurement)
def _add_measurement(self, msg):
sensor_helper.add_client(self._add_sensor_message)
# Sensor reading and measurement extraction
def _add_sensor_message(self, msg):
if self._need_stop:
del self._samples[:]
del self._sensor_messages[:]
return False
self._samples.append(msg)
self._check_samples()
self._sensor_messages.append(msg)
self._check_sensor_messages()
return True
def finish(self):
self._need_stop = True
def _await_samples(self):
# Make sure enough samples have been collected
reactor = self._printer.get_reactor()
mcu = self._sensor_helper.get_mcu()
while self._probe_times:
start_time, end_time, pos_time, toolhead_pos = self._probe_times[0]
systime = reactor.monotonic()
est_print_time = mcu.estimated_print_time(systime)
if est_print_time > end_time + 1.0:
raise self._printer.command_error(
"probe_eddy_current sensor outage")
if mcu.is_fileoutput():
# In debugging mode
if pos_time is not None:
toolhead_pos = self._lookup_toolhead_pos(pos_time)
self._probe_results.append((toolhead_pos[2], toolhead_pos))
self._probe_times.pop(0)
continue
reactor.pause(systime + 0.010)
def _pull_freq(self, start_time, end_time):
# Find average sensor frequency between time range
def _pull_measurements(self, start_time, end_time):
# Extract measurements from sensor messages for given time range
measures = []
msg_num = discard_msgs = 0
samp_sum = 0.
samp_count = 0
while msg_num < len(self._samples):
msg = self._samples[msg_num]
while msg_num < len(self._sensor_messages):
msg = self._sensor_messages[msg_num]
msg_num += 1
data = msg['data']
if data[0][0] > end_time:
@ -335,58 +316,78 @@ class EddyGatherSamples:
if data[-1][0] < start_time:
discard_msgs = msg_num
continue
for time, freq, z in data:
if time >= start_time and time <= end_time:
samp_sum += freq
samp_count += 1
del self._samples[:discard_msgs]
if not samp_count:
# No sensor readings - raise error in pull_probed()
return 0.
return samp_sum / samp_count
def _lookup_toolhead_pos(self, pos_time):
toolhead = self._printer.lookup_object('toolhead')
kin = toolhead.get_kinematics()
kin_spos = {s.get_name(): s.mcu_to_commanded_position(
s.get_past_mcu_position(pos_time))
for s in kin.get_steppers()}
return kin.calc_position(kin_spos)
def _check_samples(self):
while self._samples and self._probe_times:
start_time, end_time, pos_time, toolhead_pos = self._probe_times[0]
if self._samples[-1]['data'][-1][0] < end_time:
for measure in data:
time = measure[0]
if time < start_time:
continue
if time > end_time:
break
measures.append(measure)
del self._sensor_messages[:discard_msgs]
return measures
def _check_sensor_messages(self):
while self._sensor_messages and self._probe_requests:
cb, start_time, end_time, args = self._probe_requests[0]
if self._sensor_messages[-1]['data'][-1][0] < end_time:
break
freq = self._pull_freq(start_time, end_time)
if pos_time is not None:
toolhead_pos = self._lookup_toolhead_pos(pos_time)
sensor_z = None
if freq:
sensor_z = self._calibration.freq_to_height(freq)
self._probe_results.append((sensor_z, toolhead_pos))
self._probe_times.pop(0)
measures = self._pull_measurements(start_time, end_time)
errmsg = res = None
try:
# Call analysis callback to process measurements
res = cb(measures, *args)
except self._printer.command_error as e:
# Defer raising of errors to pull_probed()
errmsg = str(e)
self._analysis_results.append((res, errmsg))
self._probe_requests.pop(0)
def add_probe_request(self, cb, start_time, end_time, *args):
self._probe_requests.append((cb, start_time, end_time, args))
self._check_sensor_messages()
# Extract probe results
def _await_sensor_messages(self):
# Make sure enough samples have been collected
reactor = self._printer.get_reactor()
mcu = self._sensor_helper.get_mcu()
while self._probe_requests:
cb, start_time, end_time, args = self._probe_requests[0]
systime = reactor.monotonic()
est_print_time = mcu.estimated_print_time(systime)
if est_print_time > end_time + 1.0:
raise self._printer.command_error(
"probe_eddy_current sensor outage")
if mcu.is_fileoutput():
# In debugging mode - just create dummy response
dummy_pr = manual_probe.ProbeResult(0., 0., 0., 0., 0., 0.)
self._analysis_results.append((dummy_pr, None))
self._probe_requests.pop(0)
continue
reactor.pause(systime + 0.010)
def pull_probed(self):
self._await_samples()
self._await_sensor_messages()
results = []
for sensor_z, toolhead_pos in self._probe_results:
if sensor_z is None:
raise self._printer.command_error(
"Unable to obtain probe_eddy_current sensor readings")
if sensor_z <= -OUT_OF_RANGE or sensor_z >= OUT_OF_RANGE:
raise self._printer.command_error(
"probe_eddy_current sensor not in valid range")
res = manual_probe.ProbeResult(
toolhead_pos[0]+self._offsets[0],
toolhead_pos[1]+self._offsets[1], toolhead_pos[2]-sensor_z,
toolhead_pos[0], toolhead_pos[1], toolhead_pos[2])
for res, errmsg in self._analysis_results:
if errmsg is not None:
raise self._printer.command_error(errmsg)
results.append(res)
del self._probe_results[:]
del self._analysis_results[:]
return results
def note_probe(self, start_time, end_time, toolhead_pos):
self._probe_times.append((start_time, end_time, None, toolhead_pos))
self._check_samples()
def note_probe_and_position(self, start_time, end_time, pos_time):
self._probe_times.append((start_time, end_time, pos_time, None))
self._check_samples()
# Generate a ProbeResult from the average of a set of measurements
def probe_results_from_avg(measures, toolhead_pos, calibration, offsets):
cmderr = calibration.get_printer().command_error
if not measures:
raise cmderr("Unable to obtain probe_eddy_current sensor readings")
# Determine average of measurements
freq_sum = sum([m[1] for m in measures])
freq_avg = freq_sum / len(measures)
# Determine height associated with frequency
sensor_z = calibration.freq_to_height(freq_avg)
if sensor_z <= -OUT_OF_RANGE or sensor_z >= OUT_OF_RANGE:
raise cmderr("probe_eddy_current sensor not in valid range")
return manual_probe.ProbeResult(
toolhead_pos[0] + offsets[0], toolhead_pos[1] + offsets[1],
toolhead_pos[2] - sensor_z,
toolhead_pos[0], toolhead_pos[1], toolhead_pos[2])
MAX_VALID_RAW_VALUE=0x03ffffff
@ -396,7 +397,6 @@ class EddyDescend:
probe_offsets, param_helper):
self._printer = config.get_printer()
self._sensor_helper = sensor_helper
self._mcu = sensor_helper.get_mcu()
self._calibration = calibration
self._probe_offsets = probe_offsets
self._param_helper = param_helper
@ -413,10 +413,9 @@ class EddyDescend:
self._trigger_analog.set_trigger('gt', conv_freq)
# Probe session interface
def start_probe_session(self, gcmd):
self._calibration.verify_calibrated()
self._prep_trigger_analog()
offsets = self._probe_offsets.get_offsets()
self._gather = EddyGatherSamples(self._printer, self._sensor_helper,
self._calibration, offsets)
self._gather = EddyGatherSamples(self._printer, self._sensor_helper)
return self
def run_probe(self, gcmd):
toolhead = self._printer.lookup_object('toolhead')
@ -430,7 +429,10 @@ class EddyDescend:
start_time = self._trigger_analog.get_last_trigger_time() + 0.050
end_time = start_time + 0.100
toolhead_pos = toolhead.get_position()
self._gather.note_probe(start_time, end_time, toolhead_pos)
offsets = self._probe_offsets.get_offsets()
self._gather.add_probe_request(probe_results_from_avg,
start_time, end_time,
toolhead_pos, self._calibration, offsets)
def pull_probed_results(self):
return self._gather.pull_probed()
def end_probe_session(self):
@ -477,19 +479,31 @@ class EddyEndstopWrapper:
class EddyScanningProbe:
def __init__(self, printer, sensor_helper, calibration, probe_offsets,
gcmd):
calibration.verify_calibrated()
self._printer = printer
self._sensor_helper = sensor_helper
self._calibration = calibration
offsets = probe_offsets.get_offsets()
self._gather = EddyGatherSamples(printer, sensor_helper,
calibration, offsets)
self._offsets = probe_offsets.get_offsets()
self._gather = EddyGatherSamples(printer, sensor_helper)
self._sample_time_delay = 0.050
self._sample_time = gcmd.get_float("SAMPLE_TIME", 0.100, above=0.0)
self._is_rapid = gcmd.get("METHOD", "scan") == 'rapid_scan'
def _lookup_toolhead_pos(self, pos_time):
toolhead = self._printer.lookup_object('toolhead')
kin = toolhead.get_kinematics()
kin_spos = {s.get_name(): s.mcu_to_commanded_position(
s.get_past_mcu_position(pos_time))
for s in kin.get_steppers()}
return kin.calc_position(kin_spos)
def _analyze_scan(self, measures, pos_time):
toolhead_pos = self._lookup_toolhead_pos(pos_time)
return probe_results_from_avg(measures, toolhead_pos,
self._calibration, self._offsets)
def _rapid_lookahead_cb(self, printtime):
start_time = printtime - self._sample_time / 2
self._gather.note_probe_and_position(
start_time, start_time + self._sample_time, printtime)
end_time = start_time + self._sample_time
self._gather.add_probe_request(self._analyze_scan, start_time, end_time,
printtime)
def run_probe(self, gcmd):
toolhead = self._printer.lookup_object("toolhead")
if self._is_rapid:
@ -498,8 +512,9 @@ class EddyScanningProbe:
printtime = toolhead.get_last_move_time()
toolhead.dwell(self._sample_time_delay + self._sample_time)
start_time = printtime + self._sample_time_delay
self._gather.note_probe_and_position(
start_time, start_time + self._sample_time, start_time)
end_time = start_time + self._sample_time
self._gather.add_probe_request(self._analyze_scan, start_time, end_time,
start_time)
def pull_probed_results(self):
if self._is_rapid:
# Flush lookahead (so all lookahead callbacks are invoked)