klipper/klippy/extras/trigger_analog.py
Kevin O'Connor 147022dee2 trigger_analog: Update to support generic trigger types
Rework the trigger_analog code to support different "trigger"
conditions.  This merges in features of ldc1612.c into
trigger_analog.c, such as error code reporting in the MCU.  This is in
preparation for using trigger_analog with ldc1612.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
2026-01-25 12:49:01 -05:00

355 lines
15 KiB
Python

# Wrapper around mcu trigger_analog objects
#
# Copyright (C) 2025 Gareth Farrington <gareth@waves.ky>
# Copyright (C) 2026 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import mcu
######################################################################
# SOS filters (Second Order Sectional)
######################################################################
MAX_INT32 = (2 ** 31)
MIN_INT32 = -(2 ** 31) - 1
def assert_is_int32(value, frac_bits):
if value > MAX_INT32 or value < MIN_INT32:
raise OverflowError("Fixed point Q%d.%d overflow"
% (31-frac_bits, frac_bits))
return value
# convert a floating point value to a 32 bit fixed point representation
# checks for overflow
def to_fixed_32(value, frac_bits):
fixed_val = int(value * (2**frac_bits))
return assert_is_int32(fixed_val, frac_bits)
# Digital filter designer and container
class DigitalFilter:
def __init__(self, sps, cfg_error, highpass=None, highpass_order=1,
lowpass=None, lowpass_order=1, notches=None, notch_quality=2.0):
self.filter_sections = []
self.initial_state = []
self.sample_frequency = sps
# an empty filter can be created without SciPi/numpy
if not (highpass or lowpass or notches):
return
try:
import scipy.signal as signal
except:
raise cfg_error("DigitalFilter require the SciPy module")
if highpass:
self.filter_sections.extend(
self._butter(highpass, "highpass", highpass_order))
if lowpass:
self.filter_sections.extend(
self._butter(lowpass, "lowpass", lowpass_order))
if notches is None:
notches = []
for notch_freq in notches:
self.filter_sections.append(self._notch(notch_freq, notch_quality))
if len(self.filter_sections) > 0:
self.initial_state = signal.sosfilt_zi(self.filter_sections)
def _butter(self, frequency, btype, order):
import scipy.signal as signal
return signal.butter(order, Wn=frequency, btype=btype,
fs=self.sample_frequency, output='sos')
def _notch(self, freq, quality):
import scipy.signal as signal
b, a = signal.iirnotch(freq, Q=quality, fs=self.sample_frequency)
return signal.tf2sos(b, a)[0]
def get_filter_sections(self):
return self.filter_sections
def get_initial_state(self):
return self.initial_state
# Control an `sos_filter` object on the MCU
class MCU_SosFilter:
# max_sections should be the largest number of sections you expect
# to use at runtime.
def __init__(self, mcu, cmd_queue, max_sections, coeff_frac_bits=29):
self._mcu = mcu
self._cmd_queue = cmd_queue
self._oid = self._mcu.create_oid()
self._max_sections = max_sections
self._coeff_frac_bits = coeff_frac_bits
self._value_frac_bits = self._scale_frac_bits = 0
self._design = None
self._offset = 0
self._scale = 1
self._set_section_cmd = self._set_state_cmd = None
self._set_active_cmd = self._set_offset_scale_cmd = None
self._last_sent_coeffs = [None] * self._max_sections
self._last_sent_offset_scale = None
self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%d"
% (self._oid, self._max_sections))
self._mcu.register_config_callback(self._build_config)
def _validate_frac_bits(self, frac_bits):
if frac_bits < 0 or frac_bits > 31:
raise ValueError("The number of fractional bits (%i) must be a"
" value between 0 and 31" % (frac_bits,))
return frac_bits
def _build_config(self):
self._set_section_cmd = self._mcu.lookup_command(
"sos_filter_set_section oid=%c section_idx=%c"
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i", cq=self._cmd_queue)
self._set_state_cmd = self._mcu.lookup_command(
"sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i",
cq=self._cmd_queue)
self._set_offset_scale_cmd = self._mcu.lookup_command(
"sos_filter_set_offset_scale oid=%c offset=%i"
" scale=%i scale_frac_bits=%c", cq=self._cmd_queue)
self._set_active_cmd = self._mcu.lookup_command(
"sos_filter_set_active oid=%c n_sections=%c coeff_frac_bits=%c",
cq=self._cmd_queue)
def get_oid(self):
return self._oid
def convert_value(self, val):
return to_fixed_32(val, self._value_frac_bits)
# convert the SciPi SOS filters to fixed point format
def _convert_filter(self):
if self._design is None:
return []
filter_sections = self._design.get_filter_sections()
sos_fixed = []
for section in filter_sections:
nun_coeff = len(section)
if nun_coeff != 6:
raise ValueError("The number of filter coefficients is %i"
", must be 6" % (nun_coeff,))
fixed_section = []
for col, coeff in enumerate(section):
if col != 3: # omit column 3
fixed_coeff = to_fixed_32(coeff, self._coeff_frac_bits)
fixed_section.append(fixed_coeff)
elif coeff != 1.0: # double check column 3 is always 1.0
raise ValueError("Coefficient 3 is expected to be 1.0"
" but was %f" % (coeff,))
sos_fixed.append(fixed_section)
return sos_fixed
# convert the SOS filter state matrix (zi) to fixed point format
def _convert_state(self):
if self._design is None:
return []
filter_state = self._design.get_initial_state()
sos_state = []
for section in filter_state:
nun_states = len(section)
if nun_states != 2:
raise ValueError(
"The number of state elements is %i, must be 2"
% (nun_states,))
fixed_state = []
for col, value in enumerate(section):
fixed_state.append(to_fixed_32(value, self._value_frac_bits))
sos_state.append(fixed_state)
return sos_state
# Set conversion of a raw value 1 to a 1.0 value processed by sos filter
def set_offset_scale(self, offset, scale, scale_frac_bits=0,
value_frac_bits=0):
self._offset = offset
self._value_frac_bits = value_frac_bits
scale_mult = scale * float(1 << value_frac_bits)
self._scale = to_fixed_32(scale_mult, scale_frac_bits)
self._scale_frac_bits = scale_frac_bits
# Change the filter coefficients and state at runtime
def set_filter_design(self, design):
self._design = design
# Resets the filter state back to initial conditions at runtime
def reset_filter(self):
# Generate filter parameters
sos_fixed = self._convert_filter()
sos_state = self._convert_state()
num_sections = len(sos_fixed)
if num_sections > self._max_sections:
raise ValueError("Too many filter sections: %i, The max is %i"
% (num_sections, self._max_sections,))
if len(sos_state) != num_sections:
raise ValueError("The number of filter sections (%i) and state "
"sections (%i) must be equal"
% (num_sections, len(sos_state)))
# Send section coefficients (if they have changed)
for i, section in enumerate(sos_fixed):
args = (self._oid, i, section[0], section[1], section[2],
section[3], section[4])
if args == self._last_sent_coeffs[i]:
continue
self._set_section_cmd.send(args)
self._last_sent_coeffs[i] = args
# Send section initial states
for i, state in enumerate(sos_state):
self._set_state_cmd.send([self._oid, i, state[0], state[1]])
# Send offset/scale (if they have changed)
args = (self._oid, self._offset, self._scale, self._scale_frac_bits)
if args != self._last_sent_offset_scale:
self._set_offset_scale_cmd.send(args)
self._last_sent_offset_scale = args
# Activate filter
if self._max_sections:
self._set_active_cmd.send([self._oid, num_sections,
self._coeff_frac_bits])
######################################################################
# Trigger Analog
######################################################################
# MCU_trigger_analog is the interface to `trigger_analog` on the MCU
class MCU_trigger_analog:
MONITOR_MAX = 3
REASON_TRIGGER_ANALOG = mcu.MCU_trsync.REASON_COMMS_TIMEOUT + 1
def __init__(self, sensor_inst):
self._printer = sensor_inst.get_mcu().get_printer()
self._sensor = sensor_inst
self._mcu = self._sensor.get_mcu()
self._sos_filter = None
self._dispatch = mcu.TriggerDispatch(self._mcu)
self._last_trigger_time = 0.
# Raw range checking
self._raw_min = self._raw_max = 0
self._last_range_args = None
# Trigger type
self._trigger_type = "unspecified"
self._trigger_value = 0.
self._last_trigger_args = None
# Error codes from MCU
self._error_map = {}
self._sensor_specific_error = 0
# Configure MCU objects
self._oid = self._mcu.create_oid()
self._home_cmd = self._query_state_cmd = None
self._set_raw_range_cmd = self._set_trigger_cmd = None
self._mcu.register_config_callback(self._build_config)
def setup_sos_filter(self, sos_filter):
self._sos_filter = sos_filter
def _build_config(self):
self._sensor.setup_trigger_analog(self._oid)
cmd_queue = self._dispatch.get_command_queue()
if self._sos_filter is None:
self.setup_sos_filter(MCU_SosFilter(self._mcu, cmd_queue, 0, 0))
self._mcu.add_config_cmd(
"config_trigger_analog oid=%d sos_filter_oid=%d" % (
self._oid, self._sos_filter.get_oid()))
# Lookup commands
self._query_state_cmd = self._mcu.lookup_query_command(
"trigger_analog_query_state oid=%c",
"trigger_analog_state oid=%c homing=%c trigger_clock=%u",
oid=self._oid, cq=cmd_queue)
self._set_raw_range_cmd = self._mcu.lookup_command(
"trigger_analog_set_raw_range oid=%c raw_min=%i raw_max=%i",
cq=cmd_queue)
self._set_trigger_cmd = self._mcu.lookup_command(
"trigger_analog_set_trigger oid=%c trigger_analog_type=%c"
" trigger_value=%i", cq=cmd_queue)
self._home_cmd = self._mcu.lookup_command(
"trigger_analog_home oid=%c trsync_oid=%c trigger_reason=%c"
" error_reason=%c clock=%u monitor_ticks=%u monitor_max=%u",
cq=cmd_queue)
# Load errors from mcu
errors = self._mcu.get_enumerations().get("trigger_analog_error:", {})
self._error_map = {v: k for k, v in errors.items()}
self._sensor_specific_error = errors.get("SENSOR_SPECIFIC", 0)
def get_oid(self):
return self._oid
def get_mcu(self):
return self._mcu
def get_sos_filter(self):
return self._sos_filter
def get_dispatch(self):
return self._dispatch
def get_last_trigger_time(self):
return self._last_trigger_time
def set_trigger(self, trigger_type, trigger_value):
self._trigger_type = trigger_type
self._trigger_value = trigger_value
def set_raw_range(self, raw_min, raw_max):
self._raw_min = raw_min
self._raw_max = raw_max
def _reset_filter(self):
# Update raw range parameters in mcu (if they have changed)
args = [self._oid, self._raw_min, self._raw_max]
if args != self._last_range_args:
self._set_raw_range_cmd.send(args)
self._last_range_args = args
# Update trigger in mcu (if it has changed)
tval32 = self._sos_filter.convert_value(self._trigger_value)
args = [self._oid, self._trigger_type, tval32]
if args != self._last_trigger_args:
self._set_trigger_cmd.send(args)
self._last_trigger_args = args
# Update sos filter in mcu
self._sos_filter.reset_filter()
def _clear_home(self):
self._home_cmd.send([self._oid, 0, 0, 0, 0, 0, 0, 0])
params = self._query_state_cmd.send([self._oid])
trigger_ticks = self._mcu.clock32_to_clock64(params['trigger_clock'])
return self._mcu.clock_to_print_time(trigger_ticks)
def get_steppers(self):
return self._dispatch.get_steppers()
def home_start(self, print_time, sample_time, sample_count, rest_time,
triggered=True):
self._last_trigger_time = 0.
self._reset_filter()
trigger_completion = self._dispatch.start(print_time)
clock = self._mcu.print_time_to_clock(print_time)
sensor_update = 1. / self._sensor.get_samples_per_second()
sm_ticks = self._mcu.seconds_to_clock(sensor_update)
self._home_cmd.send([self._oid, self._dispatch.get_oid(),
mcu.MCU_trsync.REASON_ENDSTOP_HIT, self.REASON_TRIGGER_ANALOG,
clock, sm_ticks, self.MONITOR_MAX], reqclock=clock)
return trigger_completion
def home_wait(self, home_end_time):
self._dispatch.wait_end(home_end_time)
# trigger has happened, now to find out why...
res = self._dispatch.stop()
# clear the homing state so it stops processing samples
trigger_time = self._clear_home()
if res >= mcu.MCU_trsync.REASON_COMMS_TIMEOUT:
if res == mcu.MCU_trsync.REASON_COMMS_TIMEOUT:
raise self._printer.command_error(
"Communication timeout during homing")
error_code = res - self.REASON_TRIGGER_ANALOG
if error_code >= self._sensor_specific_error:
sensor_err = error_code - self._sensor_specific_error
error_msg = self._sensor.lookup_sensor_error(sensor_err)
else:
defmsg = "Unknown code %i" % (error_code,)
error_msg = self._error_map.get(error_code, defmsg)
raise self._printer.command_error("Trigger analog error: %s"
% (error_msg,))
if res != mcu.MCU_trsync.REASON_ENDSTOP_HIT:
return 0.
if self._mcu.is_fileoutput():
trigger_time = home_end_time
self._last_trigger_time = trigger_time
return trigger_time