sos_filter: Handle fixed point conversion within MCU_SosFilter

Merge the logic from the FixedPointSosFilter class into the
MCU_SosFilter class.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2026-01-08 19:44:15 -05:00
parent 6413399784
commit 3b5045ed9e
2 changed files with 61 additions and 88 deletions

View file

@ -163,13 +163,9 @@ class ContinuousTareFilter:
# create a filter design from the parameters
def design_filter(self, error_func):
design = sos_filter.DigitalFilter(self.sps, error_func, self.drift,
return sos_filter.DigitalFilter(self.sps, error_func, self.drift,
self.drift_delay, self.buzz, self.buzz_delay, self.notches,
self.notch_quality)
fixed_filter = sos_filter.FixedPointSosFilter(
design.get_filter_sections(), design.get_initial_state(),
Q2_INT_BITS, Q16_INT_BITS)
return fixed_filter
# Combine ContinuousTareFilter and SosFilter into an easy-to-use class
@ -214,9 +210,11 @@ class ContinuousTareFilterHelper:
return ContinuousTareFilter(self._sps, drift, drift_delay, buzz,
buzz_delay, notches, notch_quality)
def _create_filter(self, fixed_filter, cmd_queue):
return sos_filter.MCU_SosFilter(self._sensor.get_mcu(), cmd_queue,
fixed_filter, 4)
def _create_filter(self, design, cmd_queue):
sf = sos_filter.MCU_SosFilter(self._sensor.get_mcu(), cmd_queue, 4,
Q2_INT_BITS, Q16_INT_BITS)
sf.set_filter_design(design)
return sf
def update_from_command(self, gcmd, cq=None):
gcmd_filter = self._build_filter(gcmd)
@ -224,8 +222,8 @@ class ContinuousTareFilterHelper:
if self._active_design == gcmd_filter:
return
# update MCU filter from GCode command
self._sos_filter.change_filter(
self._active_design.design_filter(gcmd.error))
design = self._active_design.design_filter(gcmd.error)
self._sos_filter.set_filter_design(design)
def get_sos_filter(self):
return self._sos_filter

View file

@ -63,41 +63,25 @@ class DigitalFilter:
def get_initial_state(self):
return self.initial_state
# container that accepts SciPy formatted SOS filter data and converts it to a
# selected fixed point representation. This data could come from DigitalFilter,
# static data, config etc.
class FixedPointSosFilter:
# filter_sections is an array of SciPy formatted SOS filter sections (sos)
# initial_state is an array of SciPy formatted SOS state sections (zi)
def __init__(self, filter_sections=None, initial_state=None,
coeff_int_bits=2, value_int_bits=15):
filter_sections = [] if filter_sections is None else filter_sections
initial_state = [] if initial_state is None else initial_state
num_sections = len(filter_sections)
num_state = len(initial_state)
if num_state != num_sections:
raise ValueError("The number of filter sections (%i) and state "
"sections (%i) must be equal" % (
num_sections, num_state))
self._coeff_int_bits = self._validate_int_bits(coeff_int_bits)
self._value_int_bits = self._validate_int_bits(value_int_bits)
self._filter = self._convert_filter(filter_sections)
self._state = self._convert_state(initial_state)
def get_filter_sections(self):
return self._filter
def get_initial_state(self):
return self._state
def get_coeff_int_bits(self):
return self._coeff_int_bits
def get_value_int_bits(self):
return self._value_int_bits
def get_num_sections(self):
return len(self._filter)
# Control an `sos_filter` object on the MCU
class MCU_SosFilter:
# max_sections should be the largest number of sections you expect
# to use at runtime.
def __init__(self, mcu, cmd_queue, max_sections,
coeff_int_bits=2, value_int_bits=15):
self._mcu = mcu
self._cmd_queue = cmd_queue
self._oid = self._mcu.create_oid()
self._max_sections = max_sections
self._coeff_int_bits = coeff_int_bits
self._value_int_bits = value_int_bits
self._design = None
self._set_section_cmd = self._set_state_cmd = self._set_active_cmd =None
self._last_sent_coeffs = [None] * self._max_sections
self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%d"
% (self._oid, self._max_sections))
self._mcu.register_config_callback(self._build_config)
def _validate_int_bits(self, int_bits):
if int_bits < 1 or int_bits > 30:
@ -105,8 +89,25 @@ class FixedPointSosFilter:
" value between 1 and 30" % (int_bits,))
return int_bits
def _build_config(self):
self._set_section_cmd = self._mcu.lookup_command(
"sos_filter_set_section oid=%c section_idx=%c"
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i", cq=self._cmd_queue)
self._set_state_cmd = self._mcu.lookup_command(
"sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i",
cq=self._cmd_queue)
self._set_active_cmd = self._mcu.lookup_command(
"sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c",
cq=self._cmd_queue)
def get_oid(self):
return self._oid
# convert the SciPi SOS filters to fixed point format
def _convert_filter(self, filter_sections):
def _convert_filter(self):
if self._design is None:
return []
filter_sections = self._design.get_filter_sections()
sos_fixed = []
for section in filter_sections:
nun_coeff = len(section)
@ -125,7 +126,10 @@ class FixedPointSosFilter:
return sos_fixed
# convert the SOS filter state matrix (zi) to fixed point format
def _convert_state(self, filter_state):
def _convert_state(self):
if self._design is None:
return []
filter_state = self._design.get_initial_state()
sos_state = []
for section in filter_state:
nun_states = len(section)
@ -139,54 +143,25 @@ class FixedPointSosFilter:
sos_state.append(fixed_state)
return sos_state
# Control an `sos_filter` object on the MCU
class MCU_SosFilter:
# fixed_point_filter should be an FixedPointSosFilter instance. A filter of
# size 0 will create a passthrough filter.
# max_sections should be the largest number of sections you expect
# to use at runtime. The default is the size of the fixed_point_filter.
def __init__(self, mcu, cmd_queue, fixed_point_filter, max_sections=None):
self._mcu = mcu
self._cmd_queue = cmd_queue
self._oid = self._mcu.create_oid()
self._filter = fixed_point_filter
self._max_sections = max_sections
if self._max_sections is None:
self._max_sections = self._filter.get_num_sections()
self._set_section_cmd = self._set_state_cmd = self._set_active_cmd =None
self._last_sent_coeffs = [None] * self._max_sections
self._mcu.add_config_cmd("config_sos_filter oid=%d max_sections=%d"
% (self._oid, self._max_sections))
self._mcu.register_config_callback(self._build_config)
def _build_config(self):
self._set_section_cmd = self._mcu.lookup_command(
"sos_filter_set_section oid=%c section_idx=%c"
" sos0=%i sos1=%i sos2=%i sos3=%i sos4=%i", cq=self._cmd_queue)
self._set_state_cmd = self._mcu.lookup_command(
"sos_filter_set_state oid=%c section_idx=%c state0=%i state1=%i",
cq=self._cmd_queue)
self._set_active_cmd = self._mcu.lookup_command(
"sos_filter_set_active oid=%c n_sections=%c coeff_int_bits=%c",
cq=self._cmd_queue)
def get_oid(self):
return self._oid
# Change the filter coefficients and state at runtime
# fixed_point_filter should be an FixedPointSosFilter instance
def change_filter(self, fixed_point_filter):
self._filter = fixed_point_filter
def set_filter_design(self, design):
self._design = design
# Resets the filter state back to initial conditions at runtime
def reset_filter(self):
num_sections = self._filter.get_num_sections()
# Generate filter parameters
sos_fixed = self._convert_filter()
sos_state = self._convert_state()
num_sections = len(sos_fixed)
if num_sections > self._max_sections:
raise ValueError("Too many filter sections: %i, The max is %i"
% (num_sections, self._max_sections,))
if len(sos_state) != num_sections:
raise ValueError("The number of filter sections (%i) and state "
"sections (%i) must be equal"
% (num_sections, len(sos_state)))
# Send section coefficients (if they have changed)
for i, section in enumerate(self._filter.get_filter_sections()):
for i, section in enumerate(sos_fixed):
args = (self._oid, i, section[0], section[1], section[2],
section[3], section[4])
if args == self._last_sent_coeffs[i]:
@ -194,8 +169,8 @@ class MCU_SosFilter:
self._set_section_cmd.send(args)
self._last_sent_coeffs[i] = args
# Send section initial states
for i, state in enumerate(self._filter.get_initial_state()):
for i, state in enumerate(sos_state):
self._set_state_cmd.send([self._oid, i, state[0], state[1]])
# Activate filter
self._set_active_cmd.send([self._oid, num_sections,
self._filter.get_coeff_int_bits()])
self._coeff_int_bits])