From 3b5045ed9e0217c93ca984d56d866c2826c564c6 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Thu, 8 Jan 2026 19:44:15 -0500 Subject: [PATCH] sos_filter: Handle fixed point conversion within MCU_SosFilter Merge the logic from the FixedPointSosFilter class into the MCU_SosFilter class. Signed-off-by: Kevin O'Connor --- klippy/extras/load_cell_probe.py | 18 ++--- klippy/extras/sos_filter.py | 131 +++++++++++++------------------ 2 files changed, 61 insertions(+), 88 deletions(-) diff --git a/klippy/extras/load_cell_probe.py b/klippy/extras/load_cell_probe.py index 0ee7b77ea..3b08b4510 100644 --- a/klippy/extras/load_cell_probe.py +++ b/klippy/extras/load_cell_probe.py @@ -163,13 +163,9 @@ class ContinuousTareFilter: # create a filter design from the parameters def design_filter(self, error_func): - design = sos_filter.DigitalFilter(self.sps, error_func, self.drift, + return sos_filter.DigitalFilter(self.sps, error_func, self.drift, self.drift_delay, self.buzz, self.buzz_delay, self.notches, self.notch_quality) - fixed_filter = sos_filter.FixedPointSosFilter( - design.get_filter_sections(), design.get_initial_state(), - Q2_INT_BITS, Q16_INT_BITS) - return fixed_filter # Combine ContinuousTareFilter and SosFilter into an easy-to-use class @@ -214,9 +210,11 @@ class ContinuousTareFilterHelper: return ContinuousTareFilter(self._sps, drift, drift_delay, buzz, buzz_delay, notches, notch_quality) - def _create_filter(self, fixed_filter, cmd_queue): - return sos_filter.MCU_SosFilter(self._sensor.get_mcu(), cmd_queue, - fixed_filter, 4) + def _create_filter(self, design, cmd_queue): + sf = sos_filter.MCU_SosFilter(self._sensor.get_mcu(), cmd_queue, 4, + Q2_INT_BITS, Q16_INT_BITS) + sf.set_filter_design(design) + return sf def update_from_command(self, gcmd, cq=None): gcmd_filter = self._build_filter(gcmd) @@ -224,8 +222,8 @@ class ContinuousTareFilterHelper: if self._active_design == gcmd_filter: return # update MCU filter from GCode command - self._sos_filter.change_filter( - self._active_design.design_filter(gcmd.error)) + design = self._active_design.design_filter(gcmd.error) + self._sos_filter.set_filter_design(design) def get_sos_filter(self): return self._sos_filter diff --git a/klippy/extras/sos_filter.py b/klippy/extras/sos_filter.py index dd9725d06..9c9cda499 100644 --- a/klippy/extras/sos_filter.py +++ b/klippy/extras/sos_filter.py @@ -63,41 +63,25 @@ class DigitalFilter: def get_initial_state(self): return self.initial_state -# container that accepts SciPy formatted SOS filter data and converts it to a -# selected fixed point representation. This data could come from DigitalFilter, -# static data, config etc. -class FixedPointSosFilter: - # filter_sections is an array of SciPy formatted SOS filter sections (sos) - # initial_state is an array of SciPy formatted SOS state sections (zi) - def __init__(self, filter_sections=None, initial_state=None, - coeff_int_bits=2, value_int_bits=15): - filter_sections = [] if filter_sections is None else filter_sections - initial_state = [] if initial_state is None else initial_state - num_sections = len(filter_sections) - num_state = len(initial_state) - if num_state != num_sections: - raise ValueError("The number of filter sections (%i) and state " - "sections (%i) must be equal" % ( - num_sections, num_state)) - self._coeff_int_bits = self._validate_int_bits(coeff_int_bits) - self._value_int_bits = self._validate_int_bits(value_int_bits) - self._filter = self._convert_filter(filter_sections) - self._state = self._convert_state(initial_state) - def get_filter_sections(self): - return self._filter - - def get_initial_state(self): - return self._state - - def get_coeff_int_bits(self): - return self._coeff_int_bits - - def get_value_int_bits(self): - return self._value_int_bits - - def get_num_sections(self): - return len(self._filter) +# Control an `sos_filter` object on the MCU +class MCU_SosFilter: + # max_sections should be the largest number of sections you expect + # to use at runtime. + def __init__(self, mcu, cmd_queue, max_sections, + coeff_int_bits=2, value_int_bits=15): + self._mcu = mcu + self._cmd_queue = cmd_queue + self._oid = self._mcu.create_oid() + self._max_sections = max_sections + self._coeff_int_bits = coeff_int_bits + self._value_int_bits = value_int_bits + self._design = None + self._set_section_cmd = self._set_state_cmd = self._set_active_cmd =None + self._last_sent_coeffs = [None] * self._max_sections + self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%d" + % (self._oid, self._max_sections)) + self._mcu.register_config_callback(self._build_config) def _validate_int_bits(self, int_bits): if int_bits < 1 or int_bits > 30: @@ -105,8 +89,25 @@ class FixedPointSosFilter: " value between 1 and 30" % (int_bits,)) return int_bits + def _build_config(self): + self._set_section_cmd = self._mcu.lookup_command( + "sos_filter_set_section oid=%c section_idx=%c" + " sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i", cq=self._cmd_queue) + self._set_state_cmd = self._mcu.lookup_command( + "sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i", + cq=self._cmd_queue) + self._set_active_cmd = self._mcu.lookup_command( + "sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c", + cq=self._cmd_queue) + + def get_oid(self): + return self._oid + # convert the SciPi SOS filters to fixed point format - def _convert_filter(self, filter_sections): + def _convert_filter(self): + if self._design is None: + return [] + filter_sections = self._design.get_filter_sections() sos_fixed = [] for section in filter_sections: nun_coeff = len(section) @@ -125,7 +126,10 @@ class FixedPointSosFilter: return sos_fixed # convert the SOS filter state matrix (zi) to fixed point format - def _convert_state(self, filter_state): + def _convert_state(self): + if self._design is None: + return [] + filter_state = self._design.get_initial_state() sos_state = [] for section in filter_state: nun_states = len(section) @@ -139,54 +143,25 @@ class FixedPointSosFilter: sos_state.append(fixed_state) return sos_state - -# Control an `sos_filter` object on the MCU -class MCU_SosFilter: - # fixed_point_filter should be an FixedPointSosFilter instance. A filter of - # size 0 will create a passthrough filter. - # max_sections should be the largest number of sections you expect - # to use at runtime. The default is the size of the fixed_point_filter. - def __init__(self, mcu, cmd_queue, fixed_point_filter, max_sections=None): - self._mcu = mcu - self._cmd_queue = cmd_queue - self._oid = self._mcu.create_oid() - self._filter = fixed_point_filter - self._max_sections = max_sections - if self._max_sections is None: - self._max_sections = self._filter.get_num_sections() - self._set_section_cmd = self._set_state_cmd = self._set_active_cmd =None - self._last_sent_coeffs = [None] * self._max_sections - self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%d" - % (self._oid, self._max_sections)) - self._mcu.register_config_callback(self._build_config) - - def _build_config(self): - self._set_section_cmd = self._mcu.lookup_command( - "sos_filter_set_section oid=%c section_idx=%c" - " sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i", cq=self._cmd_queue) - self._set_state_cmd = self._mcu.lookup_command( - "sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i", - cq=self._cmd_queue) - self._set_active_cmd = self._mcu.lookup_command( - "sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c", - cq=self._cmd_queue) - - def get_oid(self): - return self._oid - # Change the filter coefficients and state at runtime - # fixed_point_filter should be an FixedPointSosFilter instance - def change_filter(self, fixed_point_filter): - self._filter = fixed_point_filter + def set_filter_design(self, design): + self._design = design # Resets the filter state back to initial conditions at runtime def reset_filter(self): - num_sections = self._filter.get_num_sections() + # Generate filter parameters + sos_fixed = self._convert_filter() + sos_state = self._convert_state() + num_sections = len(sos_fixed) if num_sections > self._max_sections: raise ValueError("Too many filter sections: %i, The max is %i" % (num_sections, self._max_sections,)) + if len(sos_state) != num_sections: + raise ValueError("The number of filter sections (%i) and state " + "sections (%i) must be equal" + % (num_sections, len(sos_state))) # Send section coefficients (if they have changed) - for i, section in enumerate(self._filter.get_filter_sections()): + for i, section in enumerate(sos_fixed): args = (self._oid, i, section[0], section[1], section[2], section[3], section[4]) if args == self._last_sent_coeffs[i]: @@ -194,8 +169,8 @@ class MCU_SosFilter: self._set_section_cmd.send(args) self._last_sent_coeffs[i] = args # Send section initial states - for i, state in enumerate(self._filter.get_initial_state()): + for i, state in enumerate(sos_state): self._set_state_cmd.send([self._oid, i, state[0], state[1]]) # Activate filter self._set_active_cmd.send([self._oid, num_sections, - self._filter.get_coeff_int_bits()]) + self._coeff_int_bits])