adxl345: Move g-code commands to new helper class

Separate out the G-Code command handlers to a new ADXLCommandHelper()
class.  This helps separate the sensing code from the user interface
code.

Deprecate the RATE parameter of the ACCELEROMETER_MEASURE command.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-08-11 21:54:19 -04:00
parent 6999ff6256
commit e03e0e9dac
3 changed files with 95 additions and 105 deletions

View file

@ -96,10 +96,85 @@ class ADXL345Results:
write_proc.daemon = True
write_proc.start()
# Helper class for G-Code commands
class ADXLCommandHelper:
def __init__(self, config, chip):
self.printer = config.get_printer()
self.chip = chip
self.name = "default"
if len(config.get_name().split()) > 1:
self.name = config.get_name().split()[1]
self.register_commands(self.name)
if self.name == "default":
self.register_commands(None)
def register_commands(self, name):
# Register commands
gcode = self.printer.lookup_object('gcode')
gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", name,
self.cmd_ACCELEROMETER_MEASURE,
desc=self.cmd_ACCELEROMETER_MEASURE_help)
gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", name,
self.cmd_ACCELEROMETER_QUERY,
desc=self.cmd_ACCELEROMETER_QUERY_help)
gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", name,
self.cmd_ADXL345_DEBUG_READ,
desc=self.cmd_ADXL345_DEBUG_READ_help)
gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", name,
self.cmd_ADXL345_DEBUG_WRITE,
desc=self.cmd_ADXL345_DEBUG_WRITE_help)
cmd_ACCELEROMETER_MEASURE_help = "Start/stop accelerometer"
def cmd_ACCELEROMETER_MEASURE(self, gcmd):
if not self.chip.is_measuring():
# Start measurements
self.chip.start_measurements()
gcmd.respond_info("adxl345 measurements started")
return
# End measurements
name = gcmd.get("NAME", time.strftime("%Y%m%d_%H%M%S"))
if not name.replace('-', '').replace('_', '').isalnum():
raise gcmd.error("Invalid adxl345 NAME parameter")
res = self.chip.finish_measurements()
# Write data to file
if self.name == "default":
filename = "/tmp/adxl345-%s.csv" % (name,)
else:
filename = "/tmp/adxl345-%s-%s.csv" % (self.name, name,)
res.write_to_file(filename)
gcmd.respond_info("Writing raw accelerometer data to %s file"
% (filename,))
cmd_ACCELEROMETER_QUERY_help = "Query accelerometer for the current values"
def cmd_ACCELEROMETER_QUERY(self, gcmd):
if self.chip.is_measuring():
raise gcmd.error("adxl345 measurements in progress")
self.chip.start_measurements()
self.printer.lookup_object('toolhead').dwell(1.)
result = self.chip.finish_measurements()
values = result.decode_samples()
if not values:
raise gcmd.error("No adxl345 measurements found")
_, accel_x, accel_y, accel_z = values[-1]
gcmd.respond_info("adxl345 values (x, y, z): %.6f, %.6f, %.6f"
% (accel_x, accel_y, accel_z))
cmd_ADXL345_DEBUG_READ_help = "Query accelerometer register (for debugging)"
def cmd_ADXL345_DEBUG_READ(self, gcmd):
if self.chip.is_measuring():
raise gcmd.error("adxl345 measurements in progress")
reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0))
val = self.chip.read_reg(reg)
gcmd.respond_info("ADXL345 REG[0x%x] = 0x%x" % (reg, val))
cmd_ADXL345_DEBUG_WRITE_help = "Set accelerometer register (for debugging)"
def cmd_ADXL345_DEBUG_WRITE(self, gcmd):
if self.chip.is_measuring():
raise gcmd.error("adxl345 measurements in progress")
reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0))
val = gcmd.get("VAL", minval=0, maxval=255, parser=lambda x: int(x, 0))
self.chip.set_reg(reg, val)
# Printer class that controls ADXL345 chip
class ADXL345:
def __init__(self, config):
self.printer = config.get_printer()
ADXLCommandHelper(config, self)
self.query_rate = 0
self.last_tx_time = 0.
am = {'x': (0, SCALE), 'y': (1, SCALE), 'z': (2, SCALE),
@ -127,34 +202,6 @@ class ADXL345:
mcu.register_config_callback(self._build_config)
mcu.register_response(self._handle_adxl345_start, "adxl345_start", oid)
mcu.register_response(self._handle_adxl345_data, "adxl345_data", oid)
# Register commands
self.name = "default"
if len(config.get_name().split()) > 1:
self.name = config.get_name().split()[1]
gcode = self.printer.lookup_object('gcode')
gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", self.name,
self.cmd_ACCELEROMETER_MEASURE,
desc=self.cmd_ACCELEROMETER_MEASURE_help)
gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", self.name,
self.cmd_ACCELEROMETER_QUERY,
desc=self.cmd_ACCELEROMETER_QUERY_help)
gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", self.name,
self.cmd_ADXL345_DEBUG_READ,
desc=self.cmd_ADXL345_DEBUG_READ_help)
gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", self.name,
self.cmd_ADXL345_DEBUG_WRITE,
desc=self.cmd_ADXL345_DEBUG_WRITE_help)
if self.name == "default":
gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", None,
self.cmd_ACCELEROMETER_MEASURE)
gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", None,
self.cmd_ACCELEROMETER_QUERY)
gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", None,
self.cmd_ADXL345_DEBUG_READ,
desc=self.cmd_ADXL345_DEBUG_READ_help)
gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", None,
self.cmd_ADXL345_DEBUG_WRITE,
desc=self.cmd_ADXL345_DEBUG_WRITE_help)
def is_initialized(self):
# In case of miswiring, testing ADXL345 device ID prevents treating
# noise or wrong signal as a correctly initialized device
@ -213,10 +260,9 @@ class ADXL345:
reg, val, stored_val))
def is_measuring(self):
return self.query_rate > 0
def start_measurements(self, rate=None):
def start_measurements(self):
if self.is_measuring():
return
rate = rate or self.data_rate
if not self.is_initialized():
self.initialize()
# Setup chip in requested query rate
@ -225,7 +271,7 @@ class ADXL345:
clock = self.mcu.print_time_to_clock(self.last_tx_time)
self.set_reg(REG_POWER_CTL, 0x00, minclock=clock)
self.set_reg(REG_FIFO_CTL, 0x00)
self.set_reg(REG_BW_RATE, QUERY_RATES[rate])
self.set_reg(REG_BW_RATE, QUERY_RATES[self.data_rate])
self.set_reg(REG_FIFO_CTL, 0x80)
# Setup samples
print_time = self.printer.lookup_object('toolhead').get_last_move_time()
@ -234,9 +280,9 @@ class ADXL345:
self.samples_start1 = self.samples_start2 = print_time
# Start bulk reading
reqclock = self.mcu.print_time_to_clock(print_time)
rest_ticks = self.mcu.seconds_to_clock(4. / rate)
rest_ticks = self.mcu.seconds_to_clock(4. / self.data_rate)
self.last_tx_time = print_time
self.query_rate = rate
self.query_rate = self.data_rate
self.query_adxl345_cmd.send([self.oid, reqclock, rest_ticks],
reqclock=reqclock)
def finish_measurements(self):
@ -263,64 +309,6 @@ class ADXL345:
logging.info("ADXL345 finished %d measurements: %s",
res.total_count, res.get_stats())
return res
def end_query(self, name, gcmd):
if not self.is_measuring():
return
res = self.finish_measurements()
# Write data to file
if self.name == "default":
filename = "/tmp/adxl345-%s.csv" % (name,)
else:
filename = "/tmp/adxl345-%s-%s.csv" % (self.name, name,)
res.write_to_file(filename)
gcmd.respond_info(
"Writing raw accelerometer data to %s file" % (filename,))
cmd_ACCELEROMETER_MEASURE_help = "Start/stop accelerometer"
def cmd_ACCELEROMETER_MEASURE(self, gcmd):
if self.is_measuring():
name = gcmd.get("NAME", time.strftime("%Y%m%d_%H%M%S"))
if not name.replace('-', '').replace('_', '').isalnum():
raise gcmd.error("Invalid adxl345 NAME parameter")
self.end_query(name, gcmd)
gcmd.respond_info("adxl345 measurements stopped")
else:
rate = gcmd.get_int("RATE", self.data_rate)
if rate not in QUERY_RATES:
raise gcmd.error("Not a valid adxl345 query rate: %d" % (rate,))
self.start_measurements(rate)
gcmd.respond_info("adxl345 measurements started")
cmd_ACCELEROMETER_QUERY_help = "Query accelerometer for the current values"
def cmd_ACCELEROMETER_QUERY(self, gcmd):
if self.is_measuring():
raise gcmd.error("adxl345 measurements in progress")
self.start_measurements()
reactor = self.printer.get_reactor()
eventtime = starttime = reactor.monotonic()
while not self.raw_samples:
eventtime = reactor.pause(eventtime + .1)
if eventtime > starttime + 3.:
# Try to shutdown the measurements
self.finish_measurements()
raise gcmd.error("Timeout reading adxl345 data")
result = self.finish_measurements()
values = result.decode_samples()
_, accel_x, accel_y, accel_z = values[-1]
gcmd.respond_info("adxl345 values (x, y, z): %.6f, %.6f, %.6f" % (
accel_x, accel_y, accel_z))
cmd_ADXL345_DEBUG_READ_help = "Query accelerometer register (for debugging)"
def cmd_ADXL345_DEBUG_READ(self, gcmd):
if self.is_measuring():
raise gcmd.error("adxl345 measurements in progress")
reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0))
val = self.read_reg(reg)
gcmd.respond_info("ADXL345 REG[0x%x] = 0x%x" % (reg, val))
cmd_ADXL345_DEBUG_WRITE_help = "Set accelerometer register (for debugging)"
def cmd_ADXL345_DEBUG_WRITE(self, gcmd):
if self.is_measuring():
raise gcmd.error("adxl345 measurements in progress")
reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0))
val = gcmd.get("VAL", minval=0, maxval=255, parser=lambda x: int(x, 0))
self.set_reg(reg, val)
def load_config(config):
return ADXL345(config)