This commit is contained in:
Microlay 2025-12-22 17:32:58 +01:00 committed by GitHub
commit c8409b10e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 2229 additions and 1 deletions

View file

@ -0,0 +1,66 @@
# Shell command execution support for Klipper
#
# Copyright (C) 2024 Microlay
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import subprocess
import logging
import os
class ShellCommand:
def __init__(self, config):
self.name = config.get_name().split()[1]
self.printer = config.get_printer()
self.gcode = self.printer.lookup_object('gcode')
self.command = config.get('command')
self.timeout = config.getfloat('timeout', 2.0)
self.verbose = config.getboolean('verbose', True)
# Register the gcode command
self.gcode.register_mux_command(
"RUN_SHELL_COMMAND", "CMD", self.name,
self.cmd_RUN_SHELL_COMMAND,
desc="Run a predefined shell command"
)
def cmd_RUN_SHELL_COMMAND(self, gcmd):
# Get optional parameters
params = gcmd.get('PARAMS', '')
# Build the full command
if params:
full_command = f"{self.command} {params}"
else:
full_command = self.command
if self.verbose:
self.gcode.respond_info(f"Running: {full_command}")
try:
# Execute the command
result = subprocess.run(
full_command,
shell=True,
capture_output=True,
text=True,
timeout=self.timeout
)
if self.verbose:
if result.stdout:
self.gcode.respond_info(f"Output: {result.stdout.strip()}")
if result.stderr:
self.gcode.respond_info(f"Stderr: {result.stderr.strip()}")
if result.returncode != 0:
self.gcode.respond_info(f"Return code: {result.returncode}")
except subprocess.TimeoutExpired:
raise self.gcode.error(f"Shell command timed out after {self.timeout}s")
except Exception as e:
raise self.gcode.error(f"Shell command error: {str(e)}")
def load_config_prefix(config):
return ShellCommand(config)

View file

@ -127,7 +127,33 @@ class HomingMove:
for sp in self.stepper_positions}
trig_steps = {sp.stepper_name: sp.trig_pos - sp.start_pos
for sp in self.stepper_positions}
# Debug logging (comentado para producción)
# logging.info("HomingMove debug:")
# for sp in self.stepper_positions:
# logging.info(" %s: start_pos=%d, trig_pos=%d, halt_pos=%d, "
# "trig_steps=%d, halt_steps=%d",
# sp.stepper_name, sp.start_pos, sp.trig_pos,
# sp.halt_pos,
# trig_steps.get(sp.stepper_name, 0),
# halt_steps.get(sp.stepper_name, 0))
# logging.info(" trigger_times: %s", trigger_times)
# logging.info(" move_end_print_time: %.6f", move_end_print_time)
# Check if there was no movement detected (start_pos == trig_pos)
# This can happen when the endstop triggers immediately
no_movement = any(sp.start_pos == sp.trig_pos
for sp in self.stepper_positions)
if no_movement and any(halt_steps.values()):
# If we detected no movement but halt_steps shows movement,
# use halt positions instead of trigger positions
# logging.info(" Detected no_movement but halt_steps has values, using halt positions")
trig_steps = halt_steps
haltpos = trigpos = self.calc_toolhead_pos(kin_spos, trig_steps)
# logging.info(" Calculated trigpos: %s", trigpos)
if trig_steps != halt_steps:
haltpos = self.calc_toolhead_pos(kin_spos, halt_steps)
self.toolhead.set_position(haltpos)
@ -269,10 +295,21 @@ class PrinterHoming:
raise self.printer.command_error(
"Probing failed due to printer shutdown")
raise
if hmove.check_no_movement() is not None:
# Check if there was no movement detected
no_movement_endstop = hmove.check_no_movement()
if no_movement_endstop is not None:
# Still calculate and store the position for diagnostic purposes
# This is useful when the endstop was already triggered but we need
# to know where it thinks it is
self._last_probe_position = epos
raise self.printer.command_error(
"Probe triggered prior to movement")
return epos
def get_last_probe_position(self):
"""Get the last calculated probe position, even if the probe failed.
This is useful for diagnostics when 'Probe triggered prior to movement'
error occurs but we still need to know the calculated position."""
return getattr(self, '_last_probe_position', None)
def cmd_G28(self, gcmd):
# Move to origin
axes = []

View file

@ -0,0 +1,503 @@
# Módulo para movimientos G1 con monitoreo de endstops
#
# Copyright (C) 2024 MicroLay
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
import os
import time
import json
class MonitoredMove:
def __init__(self, config):
self.printer = config.get_printer()
self.gcode = self.printer.lookup_object('gcode')
self.printer.register_event_handler("klippy:connect", self._handle_connect)
self.toolhead = None
self.phoming = None
self.gcode.register_command('MONITORED_G1', self.cmd_MONITORED_G1, desc=self.cmd_MONITORED_G1_help)
self.gcode.register_command('SET_POSITION_LIMITS', self.cmd_SET_POSITION_LIMITS, desc=self.cmd_SET_POSITION_LIMITS_help)
self.gcode.register_command('QUERY_ENDSTOPS_STATUS', self.cmd_QUERY_ENDSTOPS_STATUS, desc=self.cmd_QUERY_ENDSTOPS_STATUS_help)
logging.info("MonitoredMove: Initialized")
def _handle_connect(self):
self.toolhead = self.printer.lookup_object('toolhead')
self.phoming = self.printer.lookup_object('homing')
def _get_endstop_by_stepper(self, stepper_name, gcmd):
"""Obtiene el endstop de un stepper buscándolo a través del rail de su eje."""
kin = self.toolhead.get_kinematics()
target_rail = None
for rail in kin.rails:
if rail.get_name() == stepper_name:
target_rail = rail
break
if target_rail is None:
rail_names = [r.get_name() for r in kin.rails]
raise gcmd.error(f"No se pudo encontrar el rail para '{stepper_name}'. Rails disponibles: {rail_names}")
endstops = target_rail.get_endstops()
if not endstops:
raise gcmd.error(f"El rail del stepper '{stepper_name}' no tiene endstops configurados.")
return endstops[0][0]
cmd_MONITORED_G1_help = "Realiza un movimiento G1 monitoreando el endstop de un stepper"
def cmd_MONITORED_G1(self, gcmd):
if not all([self.toolhead, self.phoming]):
raise gcmd.error("MONITORED_G1: Sistema no listo.")
stepper_name = gcmd.get('STEPPER')
if not stepper_name:
raise gcmd.error("El parámetro 'STEPPER' es requerido (ej: STEPPER=stepper_y).")
# Obtener posición inicial
initial_pos = self.toolhead.get_position()
x = gcmd.get_float('X', initial_pos[0])
y = gcmd.get_float('Y', initial_pos[1])
z = gcmd.get_float('Z', initial_pos[2])
f = gcmd.get_float('F', 60.0, above=0.)
target_pos = [x, y, z]
endstop = self._get_endstop_by_stepper(stepper_name, gcmd)
movement_distance = target_pos[2] - initial_pos[2]
# gcmd.respond_info(f"🎯 Movimiento monitoreado desde Z={initial_pos[2]:.3f} hacia Z={target_pos[2]:.3f} ({movement_distance:+.3f}mm)")
# gcmd.respond_info(f"🔍 Monitoreando endstop de '{stepper_name}'")
# Verificar estado inicial del endstop
initial_endstop_state = False
if hasattr(endstop, 'query_endstop'):
initial_endstop_state = endstop.query_endstop(self.toolhead.get_last_move_time())
# gcmd.respond_info(f"🔍 Estado inicial del endstop: {'ACTIVADO' if initial_endstop_state else 'LIBRE'}")
# Variable para almacenar la posición final
final_pos = None
endstop_triggered = False
# Guardar el tiempo inicial del movimiento
start_time = self.toolhead.get_last_move_time()
# Variable para rastrear si modificamos el endstop
endstop_modified = False
original_steppers = []
try:
speed_mms = f / 60.0
# IMPORTANTE: Si estamos monitoreando stepper_y pero moviendo en Z,
# necesitamos agregar temporalmente el stepper Z al endstop
if stepper_name == 'stepper_y' and abs(movement_distance) > 0.01:
# gcmd.respond_info("🔧 Configurando endstop Y para monitorear movimiento Z...")
# Obtener el rail Z y sus steppers
kin = self.toolhead.get_kinematics()
z_steppers = []
for rail in kin.rails:
if rail.get_name() == 'stepper_z':
z_steppers = rail.get_steppers()
break
# Agregar temporalmente los steppers Z al endstop Y
original_steppers = endstop.get_steppers()[:]
endstop_modified = True
for z_stepper in z_steppers:
endstop.add_stepper(z_stepper)
# gcmd.respond_info(f" Agregado stepper: {z_stepper.get_name()}")
# Intentar el movimiento con probing_move
trigger_pos = self.phoming.probing_move(endstop, target_pos, speed_mms)
# Si llegamos aquí sin excepción, el endstop se activó durante el movimiento
# gcmd.respond_info(f"🎯 Endstop activado en X={trigger_pos[0]:.6f} Y={trigger_pos[1]:.6f} Z={trigger_pos[2]:.6f}")
# gcmd.respond_info("✅ Movimiento detenido por endstop.")
final_pos = trigger_pos
endstop_triggered = True
except self.printer.command_error as e:
error_msg = str(e)
# Obtener la posición actual después del intento
current_pos = self.toolhead.get_position()
if "triggered prior to movement" in error_msg:
# Este error puede significar varias cosas, necesitamos investigar
# gcmd.respond_info("⚠️ Klipper reporta 'triggered prior to movement'")
# Intentar obtener la posición calculada por probing_move
last_probe_pos = None
if hasattr(self.phoming, 'get_last_probe_position'):
last_probe_pos = self.phoming.get_last_probe_position()
if last_probe_pos:
# gcmd.respond_info(f"📍 Posición calculada por probing_move: Z={last_probe_pos[2]:.6f}")
final_pos = last_probe_pos
endstop_triggered = True
# Actualizar la posición del toolhead
self.toolhead.set_position(final_pos)
# Verificar si realmente hubo movimiento
actual_movement = abs(final_pos[2] - initial_pos[2])
# gcmd.respond_info(f"📐 Movimiento detectado: {actual_movement:.3f}mm")
if actual_movement < 0.01:
# gcmd.respond_info("⚠️ No hubo movimiento real (< 0.01mm)")
pass
else:
# gcmd.respond_info("✅ Confirmado: SÍ hubo movimiento")
pass
if not last_probe_pos:
# Fallback al método anterior si no tenemos get_last_probe_position
gcmd.respond_info("⚠️ Usando método de diagnóstico alternativo...")
# La posición del toolhead no se actualiza cuando probing_move falla
# Necesitamos calcularla basándonos en los steppers
kin = self.toolhead.get_kinematics()
# Forzar actualización de la generación de pasos
self.toolhead.flush_step_generation()
# Diagnóstico adicional
gcmd.respond_info("🔍 Diagnóstico de posiciones:")
# Verificar cada stepper individualmente
for i, rail in enumerate(kin.rails[:3]):
axis_name = "XYZ"[i]
rail_name = rail.get_name()
for stepper in rail.get_steppers():
stepper_name = stepper.get_name()
cmd_pos = stepper.get_commanded_position()
mcu_pos = stepper.get_mcu_position()
gcmd.respond_info(f" {axis_name} - {stepper_name}: cmd_pos={cmd_pos:.3f}, mcu_pos={mcu_pos}")
# Obtener posición real de los steppers
kin_spos = {s.get_name(): s.get_commanded_position()
for s in kin.get_steppers()}
# Calcular posición cartesiana desde posiciones de steppers
real_pos = kin.calc_position(kin_spos)
# También intentar obtener la posición de otra manera
# Verificar el estado del endstop después del movimiento
final_endstop_state = False
if hasattr(endstop, 'query_endstop'):
final_endstop_state = endstop.query_endstop(self.toolhead.get_last_move_time())
gcmd.respond_info(f"🔍 Estado final del endstop: {'ACTIVADO' if final_endstop_state else 'LIBRE'}")
# Calcular tiempo transcurrido
move_time = self.toolhead.get_last_move_time() - start_time
gcmd.respond_info(f"⏱️ Tiempo de movimiento: {move_time:.1f}s")
# real_pos puede tener None en algunos ejes, usar posición actual para esos
final_pos = list(current_pos)
for i in range(3):
if real_pos[i] is not None:
final_pos[i] = real_pos[i]
# Verificar si realmente nos movimos
actual_movement = abs(final_pos[2] - initial_pos[2])
gcmd.respond_info(f"📍 Posición inicial: Z={initial_pos[2]:.3f}")
gcmd.respond_info(f"📍 Posición real (steppers): Z={final_pos[2]:.3f}")
gcmd.respond_info(f"📍 Movimiento real: {actual_movement:.3f}mm")
# Si el endstop cambió de estado, asumimos que hubo movimiento
if not initial_endstop_state and final_endstop_state:
gcmd.respond_info("⚠️ El endstop cambió de LIBRE a ACTIVADO")
# Para stepper_y usado como Z-max, calcular posición exacta
if stepper_name == 'stepper_y':
# Obtener el rail Z y sus steppers
z_rail = kin.rails[2] # Rail Z
z_steppers = z_rail.get_steppers()
if z_steppers:
# Obtener el primer stepper Z
z_stepper = z_steppers[0]
try:
# Forzar una consulta actualizada de la posición MCU
# Esto es similar a lo que hace _query_mcu_position()
gcmd.respond_info("🔄 Consultando posición actual del MCU...")
# Método 1: Intentar obtener la posición actual del MCU
# Primero, asegurarnos de que todos los movimientos estén procesados
self.toolhead.flush_step_generation()
self.toolhead.wait_moves()
# Obtener la posición comandada actual (después del movimiento)
final_cmd_pos = z_stepper.get_commanded_position()
# También intentar obtener la posición MCU actual
final_mcu_pos = z_stepper.get_mcu_position()
gcmd.respond_info(f"📍 Posición comandada inicial: {initial_pos[2]:.3f}mm")
gcmd.respond_info(f"📍 Posición comandada final: {final_cmd_pos:.3f}mm")
gcmd.respond_info(f"🔢 Posición MCU final: {final_mcu_pos} pasos")
# Método 2: Calcular basándonos en la diferencia de posiciones comandadas
movement = final_cmd_pos - initial_pos[2]
if abs(movement) > 0.01:
# Hubo movimiento según las posiciones comandadas
calculated_z = initial_pos[2] + movement
gcmd.respond_info(f"📐 Movimiento detectado: {movement:.3f}mm")
gcmd.respond_info(f"🎯 Posición calculada: Z={calculated_z:.3f}")
else:
# Si no hay diferencia en posiciones comandadas,
# usar el position_max como fallback
z_min, z_max = z_rail.get_range()
calculated_z = z_max
gcmd.respond_info(f"⚠️ Sin movimiento en posiciones comandadas")
gcmd.respond_info(f"📐 Usando position_max: Z={calculated_z:.3f}")
final_pos[2] = calculated_z
endstop_triggered = True
# Comparar con position_max
z_min, z_max = z_rail.get_range()
diff = calculated_z - z_max
gcmd.respond_info(f"📏 Position_max esperado: Z={z_max:.3f}")
gcmd.respond_info(f"📊 Diferencia detectada: {diff:+.3f}mm")
# Actualizar la posición del toolhead
self.toolhead.set_position(final_pos)
except Exception as e:
gcmd.respond_info(f"⚠️ Error obteniendo posición: {str(e)}")
# Fallback: asumir position_max
z_min, z_max = z_rail.get_range()
final_pos[2] = z_max
gcmd.respond_info(f"📐 Fallback a position_max: Z={z_max:.3f}")
endstop_triggered = True
self.toolhead.set_position(final_pos)
elif actual_movement > 0.01: # Si nos movimos más de 0.01mm
gcmd.respond_info(f"✅ Confirmado: SÍ hubo movimiento")
gcmd.respond_info(f"🎯 Endstop encontrado en: Z={final_pos[2]:.6f}")
endstop_triggered = True
# Actualizar la posición del toolhead para sincronizar
self.toolhead.set_position(final_pos)
else:
# Realmente no hubo movimiento
gcmd.respond_info("📍 Confirmado: NO hubo movimiento")
if initial_endstop_state:
gcmd.respond_info("💡 El endstop ya estaba activado desde el inicio")
final_pos = initial_pos
endstop_triggered = True
else:
gcmd.respond_info("❌ Error: endstop reporta activación sin movimiento real")
raise
elif "No trigger" in error_msg:
# Movimiento completado sin activación
gcmd.respond_info(f"⚠️ Movimiento completado sin activación del endstop")
gcmd.respond_info(f"📍 Posición final: Z={current_pos[2]:.6f}")
gcmd.respond_info("❌ No se pudo detectar el Z-max")
final_pos = current_pos
endstop_triggered = False
elif "Move out of range" in error_msg:
gcmd.respond_info("❌ Movimiento fuera de rango")
gcmd.respond_info("💡 Usa SET_POSITION_LIMITS AXIS=Z MAX=150 primero")
raise
else:
gcmd.respond_info(f"❌ Error inesperado: {error_msg}")
raise
finally:
# Restaurar los steppers originales del endstop si los modificamos
if endstop_modified and original_steppers is not None:
# gcmd.respond_info("🔧 Restaurando configuración original del endstop...")
# Primero, obtener los steppers actuales
current_steppers = endstop.get_steppers()
# Remover los steppers que agregamos (los que no estaban originalmente)
for stepper in current_steppers:
if stepper not in original_steppers:
# No hay método remove_stepper, así que tenemos que hackear un poco
# Simplemente reportamos que deberíamos removerlo
pass # gcmd.respond_info(f" ⚠️ Nota: {stepper.get_name()} permanecerá asociado al endstop hasta el reinicio")
# Si tenemos una posición final válida y es para stepper_y, calcular pérdida de pasos
if final_pos is not None and stepper_name == 'stepper_y' and endstop_triggered:
# Comparar con la posición esperada
kin = self.toolhead.get_kinematics()
z_rail = kin.rails[2] # Rail Z
z_min, z_max = z_rail.get_range()
diferencia = final_pos[2] - z_max
# Reporte con posición absoluta y diferencia para base de datos
gcmd.respond_info(f"Posición Z detectada: {final_pos[2]:.6f}mm")
if diferencia > 0:
gcmd.respond_info(f"Pérdida de pasos BAJANDO: {diferencia:.6f}mm")
elif diferencia < 0:
gcmd.respond_info(f"Pérdida de pasos SUBIENDO: {diferencia:.6f}mm")
else:
gcmd.respond_info(f"Pérdida de pasos: {diferencia:.6f}mm")
# Escribir los valores en un archivo para nanodlp
try:
# Crear directorio si no existe
db_path = "/home/pi/nanodlp/db"
if not os.path.exists(db_path):
os.makedirs(db_path)
# Leer PlateID del archivo status.json
plate_id = None
status_file = "/home/pi/nanodlp/public/status.json"
try:
if os.path.exists(status_file):
with open(status_file, 'r') as f:
status_data = json.load(f)
plate_id = status_data.get('PlateID', None)
except Exception as e:
gcmd.respond_info(f"Advertencia: No se pudo leer PlateID: {str(e)}")
# Archivo JSON para el historial
json_file = os.path.join(db_path, "step_loss_history.json")
# Crear objeto de medición
measurement = {
"timestamp": time.time(),
"datetime": time.strftime("%Y-%m-%d %H:%M:%S"),
"step_loss": float(f"{diferencia:.6f}"),
"z_position": float(f"{final_pos[2]:.6f}"),
"plate_id": plate_id
}
# Leer historial existente o crear nuevo
history = []
if os.path.exists(json_file):
try:
with open(json_file, 'r') as f:
history = json.load(f)
except:
history = []
# Agregar nueva medición
history.append(measurement)
# Guardar historial actualizado
with open(json_file, 'w') as f:
json.dump(history, f, indent=2)
gcmd.respond_info(f"Medición guardada en: {json_file}")
# También guardar la última medición en un archivo separado
latest_file = os.path.join(db_path, "step_loss_latest.json")
with open(latest_file, 'w') as f:
json.dump(measurement, f, indent=2)
except Exception as e:
gcmd.respond_info(f"Error al guardar archivo: {str(e)}")
cmd_SET_POSITION_LIMITS_help = "Modifica temporalmente los límites de posición de un eje (equivalente a M211 de Marlin)"
def cmd_SET_POSITION_LIMITS(self, gcmd):
"""Comando para modificar temporalmente los límites de posición."""
if not self.toolhead:
raise gcmd.error("SET_POSITION_LIMITS: Sistema no listo.")
# Obtener la cinemática
kin = self.toolhead.get_kinematics()
# Obtener parámetros
axis = gcmd.get('AXIS', None)
min_val = gcmd.get_float('MIN', None)
max_val = gcmd.get_float('MAX', None)
reset = gcmd.get('RESET', None)
if reset is not None:
# Resetear a valores originales del config
gcmd.respond_info("🔄 Restaurando límites originales...")
for i, rail in enumerate(kin.rails[:3]): # Solo X, Y, Z
original_range = rail.get_range()
kin.limits[i] = original_range
axis_name = "XYZ"[i]
gcmd.respond_info(f"{axis_name}: [{original_range[0]:.3f}, {original_range[1]:.3f}]")
return
if not axis:
# Mostrar límites actuales
gcmd.respond_info("📏 Límites de posición actuales:")
for i, limit in enumerate(kin.limits[:3]):
axis_name = "XYZ"[i]
if limit[0] <= limit[1]: # Eje homedado
gcmd.respond_info(f" {axis_name}: [{limit[0]:.3f}, {limit[1]:.3f}]")
else:
gcmd.respond_info(f" {axis_name}: [NO HOMEDADO]")
gcmd.respond_info("💡 Uso: SET_POSITION_LIMITS AXIS=Z MAX=150")
gcmd.respond_info("💡 Resetear: SET_POSITION_LIMITS RESET=1")
return
# Validar eje
if axis.upper() not in 'XYZ':
raise gcmd.error("AXIS debe ser X, Y o Z")
axis_index = "XYZ".index(axis.upper())
current_limits = kin.limits[axis_index]
if current_limits[0] > current_limits[1]:
raise gcmd.error(f"El eje {axis.upper()} no está homedado")
# Obtener límites actuales
current_min, current_max = current_limits
# Aplicar nuevos valores
new_min = min_val if min_val is not None else current_min
new_max = max_val if max_val is not None else current_max
# Validación básica
if new_min >= new_max:
raise gcmd.error("MIN debe ser menor que MAX")
# Aplicar nuevos límites
kin.limits[axis_index] = (new_min, new_max)
gcmd.respond_info(f"✅ Límites de {axis.upper()} actualizados:")
gcmd.respond_info(f" Anterior: [{current_min:.3f}, {current_max:.3f}]")
gcmd.respond_info(f" Nuevo: [{new_min:.3f}, {new_max:.3f}]")
gcmd.respond_info("⚠️ Estos cambios son temporales hasta el próximo reinicio")
cmd_QUERY_ENDSTOPS_STATUS_help = "Consulta el estado real de todos los endstops"
def cmd_QUERY_ENDSTOPS_STATUS(self, gcmd):
"""Comando para consultar el estado real de todos los endstops."""
if not self.toolhead:
raise gcmd.error("QUERY_ENDSTOPS_STATUS: Sistema no listo.")
# Obtener la cinemática y los rails
kin = self.toolhead.get_kinematics()
gcmd.respond_info("📏 Estado de endstops:")
gcmd.respond_info(f"🕐 Tiempo actual: {self.toolhead.get_last_move_time():.3f}")
# Verificar cada rail
for i, rail in enumerate(kin.rails[:3]): # Solo X, Y, Z
axis_name = "XYZ"[i]
rail_name = rail.get_name()
endstops = rail.get_endstops()
if endstops:
for endstop, name in endstops:
try:
if hasattr(endstop, 'query_endstop'):
state = endstop.query_endstop(self.toolhead.get_last_move_time())
state_text = "ACTIVADO 🔴" if state else "LIBRE 🟢"
gcmd.respond_info(f" {axis_name} ({rail_name}): {state_text}")
else:
gcmd.respond_info(f" {axis_name} ({rail_name}): No consultable")
except Exception as e:
gcmd.respond_info(f" {axis_name} ({rail_name}): Error - {str(e)}")
else:
gcmd.respond_info(f" {axis_name} ({rail_name}): Sin endstop")
# Posición actual
current_pos = self.toolhead.get_position()
gcmd.respond_info(f"📍 Posición actual: X={current_pos[0]:.3f} Y={current_pos[1]:.3f} Z={current_pos[2]:.3f}")
def load_config(config):
return MonitoredMove(config)

View file

@ -0,0 +1,575 @@
# Obstacle Detector for TMC5160 using Hybrid Detection
#
# This module monitors Z homing to detect obstacles using multiple methods:
# 1. StallGuard (SG_RESULT) - Motor load detection
# 2. Timeout - Expected vs actual homing time
# 3. Position stall - No movement despite commands
#
# Copyright (C) 2024 MicroLay
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
import os
import time
# TMC5160 register addresses
TMC5160_TCOOLTHRS = 0x14
TMC5160_SGTHRS = 0x74
TMC5160_COOLCONF = 0x6d
class ObstacleDetector:
def __init__(self, config):
self.printer = config.get_printer()
self.name = config.get_name()
# Configuration - SG threshold detection
# At 10mm/s, normal SG values are 200-350. Obstacles cause SG drop to near 0.
# We use a low threshold to catch real obstacles while avoiding false positives
# from endstop trigger (which also causes SG=0 briefly)
self.sg_threshold = config.getint('sg_threshold', 30) # Very low - only catch real stalls
self.check_interval = config.getfloat('check_interval', 0.05)
self.min_samples = config.getint('min_samples', 8) # ~400ms of sustained low SG
self.flag_file = config.get('flag_file', '/tmp/homing_obstacle_flag')
self.drop_threshold = config.getint('drop_threshold', 50) # Detect rapid SG drop
# Configuration - Timeout detection
self.expected_travel = config.getfloat('expected_travel', 80.0) # Expected Z travel in mm
self.homing_speed = config.getfloat('homing_speed', 5.0) # Homing speed in mm/s
self.timeout_margin = config.getfloat('timeout_margin', 1.5) # Safety margin multiplier
# Calculate expected homing time with margin
self.expected_time = (self.expected_travel / self.homing_speed) * self.timeout_margin
# TCOOLTHRS: velocity threshold for StallGuard activation
self.tcoolthrs = config.getint('tcoolthrs', 0xFFFFF)
# State
self.monitoring = False
self.tmc = None
self.toolhead = None
self.check_timer = None
self.low_sg_count = 0
self.last_sg_values = []
self.original_tcoolthrs = None
self.last_sg_reading = None
# Hybrid detection state
self.homing_start_time = None
self.last_z_position = None
self.stall_count = 0
self.detection_reason = None
# Register events
self.printer.register_event_handler("klippy:connect", self._handle_connect)
self.printer.register_event_handler("klippy:ready", self._handle_ready)
self.printer.register_event_handler("homing:homing_move_begin", self._start_monitor)
self.printer.register_event_handler("homing:homing_move_end", self._stop_monitor)
# Register commands
self.gcode = self.printer.lookup_object('gcode')
self.gcode.register_command('QUERY_SG', self.cmd_QUERY_SG,
desc=self.cmd_QUERY_SG_help)
self.gcode.register_command('SET_SG_THRESHOLD', self.cmd_SET_SG_THRESHOLD,
desc=self.cmd_SET_SG_THRESHOLD_help)
self.gcode.register_command('SG_MONITOR_TEST', self.cmd_SG_MONITOR_TEST,
desc=self.cmd_SG_MONITOR_TEST_help)
self.gcode.register_command('ENABLE_STALLGUARD', self.cmd_ENABLE_STALLGUARD,
desc=self.cmd_ENABLE_STALLGUARD_help)
self.gcode.register_command('QUERY_TMC_REGS', self.cmd_QUERY_TMC_REGS,
desc=self.cmd_QUERY_TMC_REGS_help)
logging.info(f"ObstacleDetector: HYBRID mode initialized")
logging.info(f" SG threshold={self.sg_threshold}, min_samples={self.min_samples}")
logging.info(f" Expected travel={self.expected_travel}mm, speed={self.homing_speed}mm/s")
logging.info(f" Expected time={self.expected_time:.1f}s (with {self.timeout_margin}x margin)")
def _handle_connect(self):
"""Called when Klipper connects to MCU"""
# Try to get TMC5160 object for stepper_z
self.tmc = self.printer.lookup_object('tmc5160 stepper_z', None)
if self.tmc is None:
logging.warning("ObstacleDetector: TMC5160 stepper_z not found - SG detection disabled")
else:
logging.info("ObstacleDetector: Connected to TMC5160 stepper_z")
# Get toolhead for position monitoring
self.toolhead = self.printer.lookup_object('toolhead', None)
if self.toolhead is None:
logging.warning("ObstacleDetector: Toolhead not found - position monitoring disabled")
def _handle_ready(self):
"""Called when Klipper is fully ready"""
logging.info("ObstacleDetector: Klipper ready, hybrid detection available")
if self.tmc is not None:
logging.info(" - StallGuard monitoring: ENABLED")
else:
logging.info(" - StallGuard monitoring: DISABLED")
if self.toolhead is not None:
logging.info(" - Position/timeout monitoring: ENABLED")
def _set_tcoolthrs_for_homing(self):
"""Set TCOOLTHRS for StallGuard during homing - follows Klipper's TMCEndstopHelper pattern
IMPORTANT: TCOOLTHRS is a WRITE-ONLY register on TMC5160!
Reading it back returns garbage/SPI shift register value, not actual content.
We trust set_register() works (it would throw exception on SPI failure).
We verify StallGuard activation by checking TSTEP instead.
"""
try:
mcu_tmc = self.tmc.mcu_tmc
fields = self.tmc.fields
# Save current field cache value (not from register read - it's write-only)
self.original_tcoolthrs = fields.get_field("tcoolthrs")
logging.info(f"ObstacleDetector: Saving TCOOLTHRS cache value=0x{self.original_tcoolthrs:X}")
# Set TCOOLTHRS to max value (0xFFFFF) to enable StallGuard at all velocities
# Using the proper Klipper pattern: set_field returns full register value
new_tcoolthrs = 0xFFFFF
tc_val = fields.set_field("tcoolthrs", new_tcoolthrs)
# Write to register - set_register() will throw exception if SPI fails
mcu_tmc.set_register("TCOOLTHRS", tc_val)
logging.info(f"ObstacleDetector: Set TCOOLTHRS=0x{new_tcoolthrs:X} (write-only, trusting SPI)")
except Exception as e:
logging.error(f"ObstacleDetector: Error setting TCOOLTHRS for homing: {e}")
import traceback
logging.error(traceback.format_exc())
def _restore_tcoolthrs(self):
"""Restore TCOOLTHRS to original value after homing"""
if self.original_tcoolthrs is None:
return
try:
fields = self.tmc.fields
mcu_tmc = self.tmc.mcu_tmc
tc_val = fields.set_field("tcoolthrs", self.original_tcoolthrs)
mcu_tmc.set_register("TCOOLTHRS", tc_val)
logging.info(f"ObstacleDetector: Restored TCOOLTHRS to 0x{self.original_tcoolthrs:X}")
self.original_tcoolthrs = None
except Exception as e:
logging.error(f"ObstacleDetector: Error restoring TCOOLTHRS: {e}")
def _start_monitor(self, hmove):
"""Called when homing move begins"""
if self.tmc is None:
return
# Check if this is Z-min homing (going DOWN toward the tank)
# We only want to detect obstacles when going DOWN, not during ZMAX_HOME (going up)
#
# ZMAX_HOME uses 'y' endstop (but moves stepper_z) -> DON'T monitor
# G28 Z / HOME_AXIS uses 'z' endstop -> DO monitor
#
# hmove.endstops is a list of (mcu_endstop, name) tuples
is_z_min_homing = False
for mcu_endstop, name in hmove.endstops:
# Check if this is the Z endstop (name 'z' or 'stepper_z')
# ZMAX_HOME uses 'y' endstop, so it won't match
if name in ('z', 'stepper_z'):
steppers = mcu_endstop.get_steppers()
for stepper in steppers:
if stepper.get_name() == 'stepper_z':
is_z_min_homing = True
break
if is_z_min_homing:
break
if not is_z_min_homing:
return
logging.info(f"ObstacleDetector: Starting timeout monitoring for Z homing")
logging.info(f" Expected time={self.expected_time:.1f}s (travel={self.expected_travel}mm @ {self.homing_speed}mm/s)")
# Reset SG detection state
self.monitoring = True
self.low_sg_count = 0
self.last_sg_values = []
self.last_sg_reading = None
self.obstacle_detected = False
self.obstacle_sg_value = None
self.detection_reason = None
# Reset hybrid detection state
self.homing_start_time = time.time()
self.last_z_position = None
self.stall_count = 0
if self.toolhead:
pos = self.toolhead.get_position()
self.last_z_position = pos[2]
logging.info(f" Starting Z position: {self.last_z_position:.3f}mm")
# Start polling timer with initial delay to allow motor to accelerate
# At startup, velocity is 0 and StealthChop is active (SG=0)
# We need to wait for the motor to reach homing speed and switch to SpreadCycle
initial_delay = 0.3 # 300ms delay to allow acceleration
reactor = self.printer.get_reactor()
self.check_timer = reactor.register_timer(
self._check_callback,
reactor.monotonic() + initial_delay
)
logging.info(f"ObstacleDetector: Monitoring active after {initial_delay}s delay")
def _stop_monitor(self, hmove):
"""Called when homing move ends"""
if not self.monitoring:
return
self.monitoring = False
if self.check_timer:
self.printer.get_reactor().unregister_timer(self.check_timer)
self.check_timer = None
# Calculate elapsed time
elapsed = time.time() - self.homing_start_time if self.homing_start_time else 0
# Get final Z position for verification
final_z = None
if self.toolhead:
pos = self.toolhead.get_position()
final_z = pos[2]
# Log final stats
if self.last_sg_values:
avg_sg = sum(self.last_sg_values) / len(self.last_sg_values)
min_sg = min(self.last_sg_values)
max_sg = max(self.last_sg_values)
logging.info(f"ObstacleDetector: Homing completed in {elapsed:.2f}s")
logging.info(f" SG stats: avg={avg_sg:.0f}, min={min_sg}, max={max_sg}, samples={len(self.last_sg_values)}")
if final_z is not None:
logging.info(f" Final Z position: {final_z:.3f}mm")
# Verify if potential obstacle was a false positive
#
# IMPORTANT: When SG threshold detection triggers (SG=0), the endstop often
# triggers shortly after due to vibration from hitting the obstacle.
# This makes the final Z position unreliable for verification.
#
# New logic:
# - sg_threshold detection: ALWAYS treat as real obstacle (SG=0 is very specific)
# - timeout detection: Only treat as real if Z > threshold (motor might just be slow)
if self.obstacle_detected:
is_real_obstacle = False
z_threshold = 5.0 # mm
if self.detection_reason == "sg_threshold":
# SG threshold detection is highly reliable - SG=0 means real stall
is_real_obstacle = True
logging.warning(f"ObstacleDetector: REAL OBSTACLE confirmed by StallGuard!")
logging.warning(f" Final Z={final_z:.3f}mm, SG dropped to threshold")
logging.warning(f" Flag file preserved for MicLoop: {self.flag_file}")
elif self.detection_reason == "timeout":
# Timeout detection - homing took longer than expected
# This usually means an obstacle blocked the endstop from being reached
#
# IMPORTANT: We can't use Z position for verification because:
# - When motor skips steps hitting an obstacle, Klipper loses position
# - Klipper may think Z=1mm when physically it's at 30mm+
# - The position is UNRELIABLE after step loss
#
# A timeout during homing almost always means something went wrong
# (obstacle, mechanical issue, etc). Better to keep the flag and let
# MicLoop handle it rather than clear a valid detection.
is_real_obstacle = True
logging.warning(f"ObstacleDetector: REAL OBSTACLE confirmed by timeout!")
logging.warning(f" Homing took {elapsed:.2f}s (expected {self.expected_time:.2f}s)")
logging.warning(f" Klipper position Z={final_z:.3f}mm (may be inaccurate due to step loss)")
logging.warning(f" Flag file preserved for MicLoop: {self.flag_file}")
else:
# Unknown detection reason - keep the flag to be safe
is_real_obstacle = True
logging.warning(f"ObstacleDetector: Unknown detection reason '{self.detection_reason}', keeping flag")
if not is_real_obstacle:
self.obstacle_detected = False
self.detection_reason = None
# Reset hybrid detection state
self.homing_start_time = None
self.last_z_position = None
self.stall_count = 0
def _check_callback(self, eventtime):
"""Timer callback for HYBRID obstacle detection
Detection methods:
1. SG threshold - TMC5160 StallGuard value drops below threshold
2. Timeout - Homing takes longer than expected (obstacle blocking endstop)
3. Position stall - Z position not changing despite motor commands
"""
if not self.monitoring:
return self.printer.get_reactor().NEVER
try:
# ===== METHOD 1: StallGuard Detection =====
# Read DRV_STATUS register
drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS")
sg_result = drv_status & 0x3FF # bits 0-9
stallguard_flag = (drv_status >> 24) & 0x01
stst = (drv_status >> 31) & 0x01 # standstill flag
cs_actual = (drv_status >> 16) & 0x1F
# Read TSTEP (readable!) to verify velocity
tstep = self.tmc.mcu_tmc.get_register("TSTEP")
tstep_val = tstep & 0xFFFFF # 20-bit value
# Log detailed debug info for first few samples
if len(self.last_sg_values) < 5:
elapsed = time.time() - self.homing_start_time if self.homing_start_time else 0
logging.info(f"ObstacleDetector DEBUG: SG={sg_result} TSTEP={tstep_val} "
f"stst={stst} CS={cs_actual} elapsed={elapsed:.2f}s")
# Skip SG check if motor is at standstill (not moving)
if not stst:
# Track values for statistics
self.last_sg_values.append(sg_result)
if len(self.last_sg_values) > 100:
self.last_sg_values.pop(0)
self.last_sg_reading = sg_result
# Check for obstacle by SG threshold
if sg_result < self.sg_threshold:
self.low_sg_count += 1
if self.low_sg_count >= self.min_samples and not self.obstacle_detected:
logging.warning(f"ObstacleDetector: OBSTACLE by SG threshold "
f"(SG={sg_result}, count={self.low_sg_count})")
self._handle_obstacle(sg_result, reason="sg_threshold")
else:
if self.low_sg_count > 0:
self.low_sg_count = 0
# ===== METHOD 2: Timeout Detection =====
if self.homing_start_time:
elapsed = time.time() - self.homing_start_time
if elapsed > self.expected_time and not self.obstacle_detected:
logging.warning(f"ObstacleDetector: OBSTACLE by timeout "
f"(elapsed={elapsed:.2f}s > expected={self.expected_time:.2f}s)")
self._handle_obstacle(sg_result if self.last_sg_reading else 0, reason="timeout")
# ===== METHOD 3: Position Stall Detection =====
# NOTE: Position stall detection is DISABLED during homing because
# toolhead.get_position() doesn't update reliably during drip moves.
# We rely on SG threshold and timeout detection instead.
# This method is kept for future use with regular moves.
#
# if self.toolhead and self.last_z_position is not None:
# ... position stall detection code ...
except Exception as e:
logging.error(f"ObstacleDetector: Error in check callback: {e}")
return eventtime + self.check_interval
def _handle_obstacle(self, sg_value, reason="unknown"):
"""Handle detected obstacle - mark for later verification
We DON'T shutdown immediately because SG=0 also occurs when the
Z-min endstop is triggered (normal homing completion). Instead:
1. Create a flag file
2. Let homing continue
3. If homing completes normally (endstop triggered), the flag will be
cleared in _stop_monitor
4. If homing times out (obstacle blocked endstop), flag persists for MicLoop
Detection reasons:
- sg_threshold: StallGuard value dropped below threshold
- timeout: Homing took longer than expected
- position_stall: Z position not changing despite motor running
"""
# Don't stop monitoring - let homing continue
# The flag will be verified in _stop_monitor
# Calculate elapsed time for diagnostics
elapsed = time.time() - self.homing_start_time if self.homing_start_time else 0
# Create flag file for potential obstacle
try:
with open(self.flag_file, 'w') as f:
f.write(f'OBSTACLE:{reason}:SG={sg_value}:elapsed={elapsed:.2f}s')
logging.warning(f"ObstacleDetector: Potential obstacle detected by {reason} "
f"(SG={sg_value}, elapsed={elapsed:.2f}s), flag created")
except Exception as e:
logging.error(f"ObstacleDetector: Error creating flag file: {e}")
# Mark that we detected something (for verification in _stop_monitor)
self.obstacle_detected = True
self.obstacle_sg_value = sg_value
self.detection_reason = reason
cmd_QUERY_SG_help = "Query current StallGuard value from TMC5160"
def cmd_QUERY_SG(self, gcmd):
"""Manual command to query SG_RESULT"""
if self.tmc is None:
raise gcmd.error("TMC5160 stepper_z not found")
try:
drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS")
sg_result = drv_status & 0x3FF
stallguard_flag = (drv_status >> 24) & 0x01
cs_actual = (drv_status >> 16) & 0x1F
stst = (drv_status >> 31) & 0x01
gcmd.respond_info(f"TMC5160 stepper_z Status:")
gcmd.respond_info(f" SG_RESULT: {sg_result} (threshold: {self.sg_threshold})")
gcmd.respond_info(f" STALLGUARD flag: {stallguard_flag}")
gcmd.respond_info(f" CS_ACTUAL: {cs_actual}")
gcmd.respond_info(f" Standstill: {stst}")
gcmd.respond_info(f" Raw DRV_STATUS: 0x{drv_status:08X}")
except Exception as e:
raise gcmd.error(f"Error reading DRV_STATUS: {e}")
cmd_SET_SG_THRESHOLD_help = "Set StallGuard threshold for obstacle detection"
def cmd_SET_SG_THRESHOLD(self, gcmd):
"""Set the SG threshold dynamically"""
threshold = gcmd.get_int('THRESHOLD', None)
if threshold is None:
gcmd.respond_info(f"Current SG threshold: {self.sg_threshold}")
return
if threshold < 0 or threshold > 1023:
raise gcmd.error("Threshold must be between 0 and 1023")
old_threshold = self.sg_threshold
self.sg_threshold = threshold
gcmd.respond_info(f"SG threshold changed: {old_threshold} -> {self.sg_threshold}")
cmd_SG_MONITOR_TEST_help = "Test SG monitoring for specified duration"
def cmd_SG_MONITOR_TEST(self, gcmd):
"""Test command to monitor SG for a duration without homing"""
if self.tmc is None:
raise gcmd.error("TMC5160 stepper_z not found")
duration = gcmd.get_float('DURATION', 5.0)
gcmd.respond_info(f"Monitoring SG_RESULT for {duration}s...")
gcmd.respond_info("Move Z axis manually or run a G1 Z command to see values")
# Start a simple monitoring loop
import time
start_time = time.time()
values = []
while time.time() - start_time < duration:
try:
drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS")
sg_result = drv_status & 0x3FF
stst = (drv_status >> 31) & 0x01
if not stst: # Only log when moving
values.append(sg_result)
gcmd.respond_info(f"SG={sg_result}")
time.sleep(0.1)
except:
pass
if values:
gcmd.respond_info(f"Results: min={min(values)}, max={max(values)}, "
f"avg={sum(values)/len(values):.0f}, samples={len(values)}")
else:
gcmd.respond_info("No movement detected during test")
cmd_ENABLE_STALLGUARD_help = "Enable StallGuard by setting TCOOLTHRS"
def cmd_ENABLE_STALLGUARD(self, gcmd):
"""Command to enable/re-enable StallGuard
Note: TCOOLTHRS is WRITE-ONLY. We cannot verify by read-back.
We trust set_register() works (would throw exception on SPI failure).
"""
if self.tmc is None:
raise gcmd.error("TMC5160 stepper_z not found")
tcoolthrs = gcmd.get_int('TCOOLTHRS', self.tcoolthrs)
try:
# Use fields API for proper register setting
fields = self.tmc.fields
old_cached = fields.get_field("tcoolthrs")
tc_val = fields.set_field("tcoolthrs", tcoolthrs)
# Get synchronized print time
toolhead = self.printer.lookup_object('toolhead')
print_time = toolhead.get_last_move_time()
# Write to register (TCOOLTHRS is write-only, cannot verify by read)
self.tmc.mcu_tmc.set_register("TCOOLTHRS", tc_val, print_time)
gcmd.respond_info(f"TCOOLTHRS set: 0x{old_cached:X} -> 0x{tcoolthrs:X}")
gcmd.respond_info(f"Note: TCOOLTHRS is write-only, cannot verify by read-back")
gcmd.respond_info(f"Run QUERY_TMC_REGS while motor is moving to verify SG is working")
except Exception as e:
import traceback
gcmd.respond_info(f"Error: {traceback.format_exc()}")
raise gcmd.error(f"Error setting TCOOLTHRS: {e}")
cmd_QUERY_TMC_REGS_help = "Query TMC5160 registers relevant to StallGuard"
def cmd_QUERY_TMC_REGS(self, gcmd):
"""Query TMC5160 registers for debugging"""
if self.tmc is None:
raise gcmd.error("TMC5160 stepper_z not found")
try:
# Read key registers (only readable ones!)
drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS")
tstep = self.tmc.mcu_tmc.get_register("TSTEP") # Readable: current velocity
chopconf = self.tmc.mcu_tmc.get_register("CHOPCONF")
gconf = self.tmc.mcu_tmc.get_register("GCONF")
# Get cached values for write-only registers
fields = self.tmc.fields
tcoolthrs_cached = fields.get_field("tcoolthrs")
tpwmthrs_cached = fields.get_field("tpwmthrs")
sgt_cached = fields.get_field("sgt")
# Parse DRV_STATUS
sg_result = drv_status & 0x3FF
cs_actual = (drv_status >> 16) & 0x1F
stallguard = (drv_status >> 24) & 0x01
stst = (drv_status >> 31) & 0x01
# Parse TSTEP
tstep_val = tstep & 0xFFFFF
# Parse CHOPCONF
toff = chopconf & 0x0F
mres = (chopconf >> 24) & 0x0F
# Parse GCONF - TMC5160 uses en_spreadcycle (inverted logic from en_pwm_mode)
en_spreadcycle = (gconf >> 2) & 0x01 # bit 2: en_spreadcycle
stealthchop_active = not en_spreadcycle
# StallGuard is active when: motor moving AND TSTEP <= TCOOLTHRS AND not in StealthChop
# Note: Higher TSTEP = lower velocity, so TSTEP <= TCOOLTHRS means "velocity below threshold"
sg_should_work = (not stst) and (tstep_val <= tcoolthrs_cached) and (not stealthchop_active)
gcmd.respond_info("TMC5160 StallGuard Configuration:")
gcmd.respond_info(f" TCOOLTHRS (cached): 0x{tcoolthrs_cached:X} ({tcoolthrs_cached}) [WRITE-ONLY]")
gcmd.respond_info(f" TPWMTHRS (cached): 0x{tpwmthrs_cached:X} ({tpwmthrs_cached})")
gcmd.respond_info(f" SGT (cached): {sgt_cached}")
gcmd.respond_info(f" TSTEP (live): {tstep_val} (lower=faster)")
gcmd.respond_info(f" GCONF: en_spreadcycle={en_spreadcycle} -> StealthChop {'OFF' if en_spreadcycle else 'ON'}")
gcmd.respond_info(f" CHOPCONF: TOFF={toff}, MRES={mres} ({256 >> mres} usteps)")
gcmd.respond_info(f"DRV_STATUS (live):")
gcmd.respond_info(f" SG_RESULT: {sg_result}")
gcmd.respond_info(f" CS_ACTUAL: {cs_actual}")
gcmd.respond_info(f" STALLGUARD flag: {stallguard}")
gcmd.respond_info(f" Standstill: {stst}")
gcmd.respond_info(f" Raw: 0x{drv_status:08X}")
gcmd.respond_info(f"Analysis: SG should work = {sg_should_work}")
if stst:
gcmd.respond_info(f" -> Motor at standstill")
if stealthchop_active:
gcmd.respond_info(f" -> StealthChop active (SG disabled)")
if tstep_val > tcoolthrs_cached:
gcmd.respond_info(f" -> TSTEP > TCOOLTHRS (velocity too low for SG)")
except Exception as e:
raise gcmd.error(f"Error reading registers: {e}")
def load_config(config):
return ObstacleDetector(config)

View file

@ -0,0 +1,455 @@
# Archivo: klipper/klippy/extras/resonance_test.py
# Copyright (C) 2024 MicroLay
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, math
class TestAxis:
def __init__(self, axis='z'):
self._name = axis
self._vib_dir = (0., 0., 1.) if axis == 'z' else (0., 0., 0.)
def get_name(self):
return self._name
def get_point(self, l):
return (0., 0., self._vib_dir[2] * l)
class VibrationGenerator:
def __init__(self, config):
self.min_freq = config.getfloat('min_freq', 5., minval=1.)
self.max_freq = config.getfloat('max_freq', 135., minval=self.min_freq, maxval=100000.)
self.accel_per_hz = config.getfloat('accel_per_hz', 60., above=0.)
self.hz_per_sec = config.getfloat('hz_per_sec', 1., minval=0.1, maxval=100.)
self.default_amplitude = config.getfloat('amplitude', 1., above=0.) # Amplitud en mm
def prepare_test(self, gcmd):
self.freq_start = gcmd.get_float("FREQ_START", self.min_freq, minval=1.)
self.freq_end = gcmd.get_float("FREQ_END", self.max_freq, minval=self.freq_start, maxval=100000.)
self.test_amplitude = gcmd.get_float("AMPLITUDE", self.default_amplitude, above=0.)
self.test_hz_per_sec = gcmd.get_float("HZ_PER_SEC", self.hz_per_sec, above=0., maxval=100.)
# Calcular aceleración basada en la amplitud deseada
# Para un movimiento sinusoidal: a = (2πf)²A
# donde f es la frecuencia y A es la amplitud
self.test_accel_per_hz = (2. * math.pi) ** 2 * self.test_amplitude
def gen_test(self):
freq = self.freq_start
res = []
sign = 1.
time = 0.
# Asegurar un incremento mínimo para evitar quedarse atascado
min_freq_increment = 0.01 # Incremento mínimo de 0.01 Hz
# Caso especial: si freq_start == freq_end, generar al menos un ciclo
if abs(self.freq_start - self.freq_end) < 0.00001:
t_seg = .25 / freq
accel = self.test_accel_per_hz * freq * freq # a = (2πf)²A
# Generar al menos 4 puntos (un ciclo completo)
time += t_seg
res.append((time, sign * accel, freq))
time += t_seg
res.append((time, -sign * accel, freq))
time += t_seg
res.append((time, -sign * accel, freq))
time += t_seg
res.append((time, sign * accel, freq))
return res
# Usar una condición más estricta para asegurar la terminación
while freq < self.freq_end:
t_seg = .25 / freq
accel = self.test_accel_per_hz * freq * freq # a = (2πf)²A
time += t_seg
res.append((time, sign * accel, freq))
time += t_seg
res.append((time, -sign * accel, freq))
# Calcular el incremento basado en el algoritmo original
freq_increment = 2. * t_seg * self.test_hz_per_sec
# Asegurar que el incremento no sea demasiado pequeño
if freq_increment < min_freq_increment:
freq_increment = min_freq_increment
# Prevenir incrementos demasiado grandes para frecuencias muy bajas
if freq_increment > (self.freq_end - self.freq_start) / 10.0:
freq_increment = (self.freq_end - self.freq_start) / 10.0
freq += freq_increment
# Asegurar que la última iteración llegue exactamente a freq_end
if freq > self.freq_end - freq_increment and freq < self.freq_end:
freq = self.freq_end
sign = -sign
# Añadir la frecuencia final si no está ya
if len(res) > 0 and res[-1][2] < self.freq_end:
t_seg = .25 / self.freq_end
accel = self.test_accel_per_hz * self.freq_end * self.freq_end
time += t_seg
res.append((time, sign * accel, self.freq_end))
time += t_seg
res.append((time, -sign * accel, self.freq_end))
return res
class ResonanceTest:
def __init__(self, config):
self.printer = config.get_printer()
self.gcode = self.printer.lookup_object('gcode')
self.axis = TestAxis('z')
self.generator = VibrationGenerator(config)
# Registrar comandos
self.gcode.register_command(
'RESONANCE_TEST_START',
self.cmd_RESONANCE_TEST_START,
desc=self.cmd_RESONANCE_TEST_START_help
)
self.gcode.register_command(
'RESONANCE_TEST_STOP',
self.cmd_RESONANCE_TEST_STOP,
desc=self.cmd_RESONANCE_TEST_STOP_help
)
self.gcode.register_command(
'RESONANCE_TEST_FIXED',
self.cmd_RESONANCE_TEST_FIXED,
desc=self.cmd_RESONANCE_TEST_FIXED_help
)
# Variables para el control de la prueba
self.is_testing = False
def _check_axis_homed(self, toolhead, axis, skip_home):
status = toolhead.get_status(self.printer.get_reactor().monotonic())
homed_axes = status.get('homed_axes', '')
if axis in homed_axes or skip_home:
return True
return False
cmd_RESONANCE_TEST_START_help = "Inicia la prueba de resonancia con movimientos oscilatorios"
def cmd_RESONANCE_TEST_START(self, gcmd):
if self.is_testing:
raise gcmd.error("Ya hay una prueba de resonancia en curso")
# Preparar la prueba
self.generator.prepare_test(gcmd)
test_seq = self.generator.gen_test()
# Verificar que la secuencia de prueba no esté vacía
if not test_seq:
raise gcmd.error("No se pudo generar una secuencia de prueba válida con los parámetros proporcionados")
# Obtener parámetros para controlar el homing
no_home = gcmd.get_int("NO_HOME", 0)
skip_pre_home = gcmd.get_int("SKIP_HOME", 0)
# Obtener objetos necesarios
reactor = self.printer.get_reactor()
toolhead = self.printer.lookup_object('toolhead')
# Verificar si Z está homeado, si es necesario
if not self._check_axis_homed(toolhead, 'z', skip_pre_home):
if not skip_pre_home:
gcmd.respond_info("Ejecutando G28 Z antes de la prueba...")
self.gcode.run_script_from_command("G28 Z")
else:
gcmd.respond_info("ADVERTENCIA: Eje Z no está homeado pero SKIP_HOME=1 fue especificado")
# Guardar configuración actual
systime = reactor.monotonic()
toolhead_info = toolhead.get_status(systime)
old_max_accel = toolhead_info['max_accel']
old_max_z_accel = toolhead_info.get('max_z_accel', old_max_accel)
old_max_velocity = toolhead_info['max_velocity']
old_max_z_velocity = toolhead_info.get('max_z_velocity', old_max_velocity)
# Obtener 'minimum_cruise_ratio' solo si existe, de lo contrario usar un valor predeterminado
old_minimum_cruise_ratio = toolhead_info.get('minimum_cruise_ratio', 0.5)
# Calcular aceleración máxima necesaria
max_accel = max([abs(a) for _, a, _ in test_seq]) if test_seq else old_max_accel
# Variables para restauración
tmc_z = None
old_thresh = None
try:
self.is_testing = True
# Configurar límites de velocidad más altos para la prueba
self.gcode.run_script_from_command(
"SET_VELOCITY_LIMIT ACCEL=%.3f ACCEL_TO_DECEL=%.3f SQUARE_CORNER_VELOCITY=5 VELOCITY=500"
% (max_accel, max_accel))
# Deshabilitar el modo stealthchop durante la prueba
tmc_z = self.printer.lookup_object('tmc5160 stepper_z', None)
if tmc_z is not None:
try:
old_thresh = tmc_z.get_register("TPWMTHRS")
tmc_z.set_register("TPWMTHRS", 0)
except:
gcmd.respond_info("No se pudo modificar TPWMTHRS")
# Obtener posición actual
X, Y, Z, E = toolhead.get_position()
# Ejecutar secuencia de prueba
last_v = last_t = last_freq = 0.
gcmd.respond_info("Iniciando prueba de resonancia de %.1f Hz a %.1f Hz" %
(self.generator.freq_start, self.generator.freq_end))
# Mostrar el número total de pasos para dar feedback de progreso
total_steps = len(test_seq)
current_step = 0
for next_t, accel, freq in test_seq:
current_step += 1
if not self.is_testing:
break
t_seg = next_t - last_t
toolhead.cmd_M204(self.gcode.create_gcode_command(
"M204", "M204", {"S": abs(accel)}))
# Calcular nueva velocidad y posición
v = last_v + accel * t_seg
abs_v = abs(v)
if abs_v < 0.000001:
v = abs_v = 0.
abs_last_v = abs(last_v)
# Calcular desplazamiento
v2 = v * v
last_v2 = last_v * last_v
half_inv_accel = .5 / accel if accel != 0 else 0
d = (v2 - last_v2) * half_inv_accel
_, _, dZ = self.axis.get_point(d)
nZ = Z + dZ
# Ejecutar movimiento
toolhead.limit_next_junction_speed(abs_last_v)
if v * last_v < 0:
# El movimiento primero se detiene y luego cambia de dirección
d_decel = -last_v2 * half_inv_accel
_, _, decel_Z = self.axis.get_point(d_decel)
toolhead.move([X, Y, Z + decel_Z, E], abs_last_v)
toolhead.move([X, Y, nZ, E], abs_v)
else:
toolhead.move([X, Y, nZ, E], max(abs_v, abs_last_v))
# Actualizar estado y mostrar progreso
if math.floor(freq) > math.floor(last_freq):
progress = int((current_step / total_steps) * 100)
gcmd.respond_info("Probando frecuencia %.1f Hz (Progreso: %d%%)" % (freq, progress))
Z = nZ
last_t = next_t
last_v = v
last_freq = freq
# Desacelerar al final si es necesario
if last_v:
d_decel = -.5 * last_v2 / old_max_accel
_, _, decel_Z = self.axis.get_point(d_decel)
toolhead.cmd_M204(self.gcode.create_gcode_command(
"M204", "M204", {"S": old_max_accel}))
toolhead.move([X, Y, Z + decel_Z, E], abs(last_v))
# Mensaje de finalización
gcmd.respond_info("¡Prueba de resonancia completada con éxito!")
finally:
self.is_testing = False
# Restaurar configuración original
self.gcode.run_script_from_command(
"SET_VELOCITY_LIMIT ACCEL=%.3f VELOCITY=%.3f SQUARE_CORNER_VELOCITY=5"
% (old_max_accel, old_max_velocity))
# Restaurar stealthchop si estaba activo
if tmc_z is not None and old_thresh is not None:
try:
tmc_z.set_register("TPWMTHRS", old_thresh)
except:
gcmd.respond_info("No se pudo restaurar TPWMTHRS")
# Volver a home en Z solo si no se especificó NO_HOME=1
if not no_home:
self.gcode.run_script_from_command("G28 Z")
gcmd.respond_info("Eje Z homeado después de la prueba")
else:
gcmd.respond_info("Se omitió el homing del eje Z según lo solicitado")
# Mensaje final incluso si hubo algún error
if not gcmd.get_int("SILENT", 0):
gcmd.respond_info("Prueba de resonancia finalizada y configuración restaurada")
cmd_RESONANCE_TEST_STOP_help = "Detiene la prueba de resonancia en curso"
def cmd_RESONANCE_TEST_STOP(self, gcmd):
if not self.is_testing:
gcmd.respond_info("No hay ninguna prueba de resonancia en curso")
return
self.is_testing = False
gcmd.respond_info("Prueba de resonancia detenida")
def _setup_test_conditions(self, gcmd, max_accel):
# Obtener objetos necesarios
reactor = self.printer.get_reactor()
toolhead = self.printer.lookup_object('toolhead')
# Guardar configuración actual
systime = reactor.monotonic()
toolhead_info = toolhead.get_status(systime)
old_max_accel = toolhead_info['max_accel']
old_max_velocity = toolhead_info['max_velocity']
# Variables para restauración
tmc_z = None
old_thresh = None
try:
# Configurar límites de velocidad más altos para la prueba
self.gcode.run_script_from_command(
"SET_VELOCITY_LIMIT ACCEL=%.3f ACCEL_TO_DECEL=%.3f SQUARE_CORNER_VELOCITY=5 VELOCITY=500"
% (max_accel, max_accel))
# Deshabilitar el modo stealthchop durante la prueba
tmc_z = self.printer.lookup_object('tmc5160 stepper_z', None)
if tmc_z is not None:
try:
old_thresh = tmc_z.get_register("TPWMTHRS")
tmc_z.set_register("TPWMTHRS", 0)
except:
gcmd.respond_info("No se pudo modificar TPWMTHRS")
return toolhead, old_max_accel, old_max_velocity, tmc_z, old_thresh
except Exception as e:
if tmc_z is not None and old_thresh is not None:
try:
tmc_z.set_register("TPWMTHRS", old_thresh)
except:
pass
raise
def _restore_settings(self, gcmd, old_max_accel, old_max_velocity, tmc_z, old_thresh):
# Restaurar configuración original
self.gcode.run_script_from_command(
"SET_VELOCITY_LIMIT ACCEL=%.3f VELOCITY=%.3f SQUARE_CORNER_VELOCITY=5"
% (old_max_accel, old_max_velocity))
# Restaurar stealthchop si estaba activo
if tmc_z is not None and old_thresh is not None:
try:
tmc_z.set_register("TPWMTHRS", old_thresh)
except:
gcmd.respond_info("No se pudo restaurar TPWMTHRS")
# Volver a home en Z solo si no se especificó NO_HOME=1
no_home = gcmd.get_int("NO_HOME", 0)
skip_pre_home = gcmd.get_int("SKIP_HOME", 0)
if not no_home and not skip_pre_home:
self.gcode.run_script_from_command("G28 Z")
gcmd.respond_info("Eje Z homeado después de la prueba")
else:
gcmd.respond_info("Se omitió el homing del eje Z según lo solicitado")
cmd_RESONANCE_TEST_FIXED_help = "Ejecuta un movimiento oscilatorio con frecuencia fija"
def cmd_RESONANCE_TEST_FIXED(self, gcmd):
if self.is_testing:
raise gcmd.error("Ya hay una prueba de resonancia en curso")
# Obtener parámetros para controlar el homing
skip_pre_home = gcmd.get_int("SKIP_HOME", 0)
# Obtener y validar parámetros
freq = gcmd.get_float('FREQ', above=0.)
if freq > 100000:
raise gcmd.error("Frecuencia demasiado alta. Máximo recomendado: 100000 Hz")
accel_per_hz = gcmd.get_float('ACCEL_PER_HZ', 60., above=0.)
duration = gcmd.get_float('DURATION', 5., above=0.)
# Calcular aceleración basada en ACCEL_PER_HZ
accel = accel_per_hz * freq
# Obtener toolhead
toolhead = self.printer.lookup_object('toolhead')
# Verificar si Z está homeado, si es necesario
if not self._check_axis_homed(toolhead, 'z', skip_pre_home):
if not skip_pre_home:
gcmd.respond_info("Ejecutando G28 Z antes de la prueba...")
self.gcode.run_script_from_command("G28 Z")
else:
gcmd.respond_info("ADVERTENCIA: Eje Z no está homeado pero SKIP_HOME=1 fue especificado")
try:
self.is_testing = True
# Configurar condiciones de prueba
toolhead, old_max_accel, old_max_velocity, tmc_z, old_thresh = self._setup_test_conditions(gcmd, accel)
gcmd.respond_info("Configuración inicial completada")
# Obtener el stepper Z
kin = toolhead.get_kinematics()
steppers = [s for s in kin.get_steppers() if s.get_name() == 'stepper_z']
if not steppers:
raise gcmd.error("No se encontró el stepper Z")
stepper = steppers[0]
# Obtener posición actual
X, Y, Z, E = toolhead.get_position()
gcmd.respond_info("Posición inicial: X=%.3f Y=%.3f Z=%.3f" % (X, Y, Z))
# Calcular parámetros del movimiento
period = 1. / freq # Periodo en segundos
t_seg = period / 4. # Tiempo por segmento (1/4 del periodo)
mcu = stepper.get_mcu()
print_time = mcu.estimated_print_time(toolhead.get_last_move_time())
clock = mcu.print_time_to_clock(print_time)
# Calcular tiempos en ciclos de reloj
cycle_ticks = mcu.seconds_to_clock(period)
# Generar comandos de movimiento
gcmd.respond_info("Generando comandos de movimiento...")
# Obtener el comando de paso
step_cmd = mcu.lookup_command(
"queue_step oid=%c interval=%u count=%hu add=%hi",
"queue_step oid=%c interval=%u count=%hu add=%hi")
# Calcular parámetros de movimiento
interval = cycle_ticks // 4 # Dividir el ciclo en 4 partes
steps = 100 # Número de pasos por segmento
# Enviar comandos de movimiento
for i in range(int(duration * freq)):
# Primer cuarto (hacia arriba)
step_cmd.send([stepper.get_oid(), interval, steps, 1])
# Segundo cuarto (desaceleración)
step_cmd.send([stepper.get_oid(), interval, steps, -1])
# Tercer cuarto (hacia abajo)
step_cmd.send([stepper.get_oid(), interval, steps, -1])
# Cuarto cuarto (desaceleración)
step_cmd.send([stepper.get_oid(), interval, steps, 1])
if i % 10 == 0:
gcmd.respond_info("Ciclo %d enviado" % i)
gcmd.respond_info("Comandos de movimiento enviados")
finally:
self.is_testing = False
self._restore_settings(gcmd, old_max_accel, old_max_velocity, tmc_z, old_thresh)
def load_config(config):
return ResonanceTest(config)

View file

@ -0,0 +1,592 @@
# Módulo para homing del eje Z hacia Zmax usando el endstop de Y
#
# Copyright (C) 2024 MicroLay
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
class YEndstopWrapper:
"""Wrapper para el endstop de Y que es compatible con probing_move()
Usa el endstop de Y para la detección, pero devuelve los steppers de Z
para que probing_move mida correctamente el movimiento del eje Z.
"""
def __init__(self, y_endstop, z_steppers):
self.y_endstop = y_endstop
self.z_steppers = z_steppers
# Delegar métodos del endstop original
self.get_mcu = y_endstop.get_mcu
self.home_start = y_endstop.home_start
self.home_wait = y_endstop.home_wait
self.query_endstop = y_endstop.query_endstop
def get_steppers(self):
# Devolver steppers de Z para que probing_move mida el eje correcto
return self.z_steppers
def add_stepper(self, stepper):
# No añadir steppers al endstop Y original
pass
class ZMaxHomingAlt:
def __init__(self, config):
self.printer = config.get_printer()
self.gcode = self.printer.lookup_object('gcode')
# Registrar para eventos
self.printer.register_event_handler("klippy:connect", self._handle_connect)
# Variables
self.toolhead = None
self.y_endstop = None
self.y_endstop_wrapper = None
self.z_stepper = None
self.phoming = None
# Configuración
self.speed = config.getfloat('speed', 5.0, above=0.)
self.second_speed = config.getfloat('second_speed', 2.0, above=0.)
self.retract_dist = config.getfloat('retract_dist', 3.0, minval=0.)
self.retract_speed = config.getfloat('retract_speed', 3.0, above=0.)
self.final_retract = config.getboolean('final_retract', False)
# Registrar comandos
self.gcode.register_command(
'ZMAX_HOME',
self.cmd_ZMAX_HOME,
desc=self.cmd_ZMAX_HOME_help
)
self.gcode.register_command(
'Z_CALIBRATE',
self.cmd_Z_CALIBRATE,
desc=self.cmd_Z_CALIBRATE_help
)
logging.info(f"ZMaxHoming: Initialized with speeds - homing:{self.speed} mm/s, second:{self.second_speed} mm/s")
def _handle_connect(self):
# Obtener referencias necesarias
self.toolhead = self.printer.lookup_object('toolhead')
self.phoming = self.printer.lookup_object('homing')
# Obtener referencias de la cinemática
kin = self.toolhead.get_kinematics()
z_steppers = None
if hasattr(kin, 'rails'):
# Primero obtener el stepper Z
for rail in kin.rails:
if rail.get_name() == "stepper_z":
self.z_stepper = rail
z_steppers = rail.get_steppers()
z_max_cfg = rail.get_range()[1]
logging.info(f"ZMaxHoming: Z-max position from config: {z_max_cfg}mm")
logging.info(f"ZMaxHoming: Z steppers: {[s.get_name() for s in z_steppers]}")
break
# Obtener el endstop de Y
for rail in kin.rails:
if rail.get_name() == "stepper_y":
endstops = rail.get_endstops()
if endstops:
self.y_endstop = endstops[0][0]
logging.info(f"ZMaxHoming: Using Y endstop for Z-max homing")
break
if self.y_endstop is None:
raise self.printer.config_error("No se encontró el endstop de Y")
if self.z_stepper is None:
raise self.printer.config_error("No se encontró el stepper Z")
# Crear wrapper que usa endstop Y pero mide con steppers Z
self.y_endstop_wrapper = YEndstopWrapper(self.y_endstop, z_steppers)
def _continuous_probe_move(self, target_pos, speed):
"""
Realiza un movimiento continuo usando la función oficial probing_move() de Klipper.
Esta es la misma función que usan todos los módulos de sondeo oficiales.
"""
try:
trigger_pos = self.phoming.probing_move(self.y_endstop_wrapper, target_pos, speed)
# Mensaje en el momento exacto de detección
gcode = self.printer.lookup_object('gcode')
gcode.respond_info(f"Y-endstop triggered at Z={trigger_pos[2]:.3f}mm")
return trigger_pos
except self.printer.command_error as e:
if "No trigger" in str(e):
# Create flag file for MicLoop obstacle recovery
import logging
try:
flag_file = '/tmp/homing_obstacle_flag'
with open(flag_file, 'w') as flag:
flag.write('TIMEOUT_OBSTACLE')
logging.info(f"ZMaxHoming: Obstacle flag created at {flag_file} (timeout detected)")
except Exception as flag_err:
logging.warning(f"ZMaxHoming: Could not create flag file: {flag_err}")
raise self.printer.command_error(
f"El endstop Y no se activó durante el movimiento hacia Z={target_pos[2]:.3f}. "
"Verifica que la plataforma pueda alcanzar el final de carrera.")
raise
def _measure_platform_travel(self, gcmd, start_pos, z_max_cfg):
"""
Mide el recorrido total de la plataforma desde Z-max hasta Z-min + position_endstop
"""
try:
gcmd.respond_info("=== INICIANDO MEDICIÓN DE RECORRIDO ===")
# Obtener configuración del stepper Z
z_min_cfg = self.z_stepper.get_range()[0]
# Obtener position_endstop del stepper Z
z_endstop_pos = 0.0 # Valor por defecto
try:
# Buscar en la configuración del stepper Z
kin = self.toolhead.get_kinematics()
if hasattr(kin, 'rails'):
for rail in kin.rails:
if rail.get_name() == "stepper_z":
endstops = rail.get_endstops()
if endstops:
# Obtener position_endstop del endstop Z
z_endstop = endstops[0][0]
if hasattr(z_endstop, 'position_endstop'):
z_endstop_pos = z_endstop.position_endstop
break
except Exception as e:
gcmd.respond_info(f"Advertencia: No se pudo obtener position_endstop: {e}")
gcmd.respond_info(f"Configuración Z: min={z_min_cfg:.3f}, max={z_max_cfg:.3f}, position_endstop={z_endstop_pos:.3f}")
# Posición inicial (Z-max)
start_z = start_pos[2]
gcmd.respond_info(f"Posición inicial (Z-max): {start_z:.3f}mm")
# Mover a Z-min
gcmd.respond_info("Moviendo a Z-min...")
min_pos = list(start_pos)
min_pos[2] = z_min_cfg
self.toolhead.move(min_pos, self.speed)
self.toolhead.wait_moves()
gcmd.respond_info(f"En Z-min: {z_min_cfg:.3f}mm")
# Mover posición adicional según position_endstop
target_z = z_min_cfg - abs(z_endstop_pos)
gcmd.respond_info(f"Bajando {abs(z_endstop_pos):.3f}mm adicionales (position_endstop)...")
final_pos = list(min_pos)
final_pos[2] = target_z
self.toolhead.move(final_pos, self.speed)
self.toolhead.wait_moves()
# Calcular recorrido total
total_travel = start_z - target_z
config_travel = z_max_cfg - (z_min_cfg - abs(z_endstop_pos))
gcmd.respond_info("=== RESULTADOS DE MEDICIÓN ===")
gcmd.respond_info(f"📏 Recorrido REAL medido: {total_travel:.3f}mm")
gcmd.respond_info(f"📐 Recorrido CONFIG teórico: {config_travel:.3f}mm")
gcmd.respond_info(f"📊 Diferencia: {abs(total_travel - config_travel):.3f}mm")
if abs(total_travel - config_travel) > 0.5:
gcmd.respond_info("⚠️ ATENCIÓN: Diferencia > 0.5mm - Revisar montaje/configuración")
else:
gcmd.respond_info("✅ Medición dentro de tolerancia")
except Exception as e:
gcmd.respond_info(f"❌ Error durante medición: {e}")
cmd_ZMAX_HOME_help = "Realiza el homing del eje Z hacia Zmax usando el endstop de Y. Usa MEASURE=1 para medir el recorrido total de la plataforma."
def cmd_ZMAX_HOME(self, gcmd):
if not all([self.toolhead, self.y_endstop_wrapper, self.z_stepper, self.phoming]):
raise gcmd.error("ZMAX_HOME: Sistema no inicializado correctamente")
# Obtener parámetros
speed = gcmd.get_float('SPEED', self.speed, above=0.)
second_speed = gcmd.get_float('SECOND_SPEED', self.second_speed, above=0.)
retract_dist = gcmd.get_float('RETRACT_DIST', self.retract_dist, minval=0.)
final_retract = gcmd.get_int('FINAL_RETRACT', -1)
if final_retract == -1:
final_retract = self.final_retract
else:
final_retract = bool(final_retract)
measure_travel = gcmd.get_int('MEASURE', 0, minval=0, maxval=1)
# Máximo recorrido de búsqueda (por defecto 150mm, suficiente para cualquier impresora)
max_travel = gcmd.get_float('MAX_TRAVEL', 150.0, above=0.)
# Obtener configuración Z (solo para referencia, NO como límite)
z_max_cfg = self.z_stepper.get_range()[1]
# Verificar estado de Z
curtime = self.printer.get_reactor().monotonic()
z_homed = 'z' in self.toolhead.get_status(curtime)['homed_axes']
# Detectar si ya estamos en Z-max desde el inicio
if not z_homed and self.y_endstop.query_endstop(0):
pos = self.toolhead.get_position()
pos[2] = z_max_cfg # Establecer directamente en Z-max
self.toolhead.set_position(pos, homing_axes=('z',))
z_homed = True # Marcar como homeado
elif not z_homed:
# Homear Z temporalmente en posición actual
pos = self.toolhead.get_position()
self.toolhead.set_position(pos, homing_axes=('z',))
# Configurar modo absoluto
gcode_move = self.printer.lookup_object('gcode_move')
gcode_state = gcode_move.get_status()['absolute_coordinates']
self.gcode.run_script_from_command("G90")
# === EXPANDIR LÍMITES DE Z TEMPORALMENTE ===
# Esto es CRÍTICO: si position_max está mal configurado, el primer probe
# no alcanzará el endstop físico. Expandimos los límites para permitir
# el recorrido completo.
kin = self.toolhead.get_kinematics()
original_z_limits = kin.limits[2] # (min, max) para eje Z
current_pos = self.toolhead.get_position()
temp_z_max = current_pos[2] + max_travel
kin.update_limits(2, (original_z_limits[0], temp_z_max))
logging.info(f"ZMAX_HOME: Límites Z expandidos temporalmente: {original_z_limits} -> ({original_z_limits[0]}, {temp_z_max})")
try:
# Variables para detección de pérdida de pasos
initial_z_pos = None
detected_z_pos = None
# Obtener posición inicial
current_pos = self.toolhead.get_position()
initial_z_pos = current_pos[2]
# Detectar si el endstop ya está activado al inicio
endstop_initially_triggered = self.y_endstop.query_endstop(0)
# Solo considerar que está "ya activado" si estamos cerca de Z-max
if endstop_initially_triggered and current_pos[2] > (z_max_cfg - 10.0):
# Ya estamos en contacto con el endstop cerca de Z-max
detected_z_pos = current_pos[2]
else:
# Endstop no activado - hacer búsqueda rápida primero
# --- Búsqueda rápida continua ---
# IMPORTANTE: Usamos temp_z_max (límite expandido), NO z_max_cfg
movepos = list(current_pos)
movepos[2] = temp_z_max
try:
first_trigger_pos = self._continuous_probe_move(movepos, speed)
detected_z_pos = first_trigger_pos[2] # Guardar posición REAL donde se detectó
except self.printer.command_error as e:
if "Probe triggered prior to movement" in str(e):
# El endstop se activó inmediatamente al iniciar el movimiento
detected_z_pos = current_pos[2]
else:
raise
# La posición donde se detectó el endstop es el nuevo Z-max real
actual_z_max = detected_z_pos
logging.info(f"ZMAX_HOME: Endstop detectado en Z={actual_z_max:.3f}mm (z_max_cfg={z_max_cfg:.3f}mm)")
# --- "Engañar" a Klipper estableciendo Z en la posición detectada ---
fake_pos = self.toolhead.get_position()
fake_pos[2] = actual_z_max
self.toolhead.set_position(fake_pos)
# --- Retracción desde posición actual ---
retract_pos = list(fake_pos)
retract_pos[2] = actual_z_max - retract_dist
self.toolhead.move(retract_pos, self.retract_speed)
self.toolhead.wait_moves()
# Verificar que el endstop se liberó
self.toolhead.wait_moves()
query_time = self.toolhead.get_last_move_time()
if self.y_endstop.query_endstop(query_time):
raise gcmd.error(f"Endstop Y sigue activado después de retracción de {retract_dist}mm. "
f"Aumenta retract_dist o verifica el endstop.")
# --- Búsqueda fina continua ---
movepos = list(retract_pos)
movepos[2] = actual_z_max + 5.0 # Un poco más allá por si hay variación
try:
final_trigger_pos = self._continuous_probe_move(movepos, second_speed)
except self.printer.command_error as e:
if "Probe triggered prior to movement" in str(e):
# El endstop se activó inmediatamente, usar posición actual
current_pos = self.toolhead.get_position()
gcmd.respond_info(f"Y-endstop triggered at Z={current_pos[2]:.3f}mm")
final_trigger_pos = current_pos
else:
raise
# La posición final detectada es el Z-max definitivo
final_z_max = final_trigger_pos[2]
# --- Establecer posición final ---
final_pos = list(final_trigger_pos)
# Usar z_max_cfg si está cerca, sino usar el valor detectado
if abs(final_z_max - z_max_cfg) < 5.0:
final_pos[2] = z_max_cfg
else:
final_pos[2] = final_z_max
logging.warning(f"ZMAX_HOME: Z-max detectado ({final_z_max:.3f}mm) difiere de config ({z_max_cfg:.3f}mm)")
if not z_homed:
self.toolhead.set_position(final_pos, homing_axes=('z',))
else:
self.toolhead.set_position(final_pos)
gcmd.respond_info(f"Z position set to {final_pos[2]:.3f}mm")
# --- Retracción final opcional ---
if final_retract and retract_dist > 0:
retract_pos = list(final_pos)
retract_pos[2] -= retract_dist
z_min_cfg = self.z_stepper.get_range()[0]
if retract_pos[2] < z_min_cfg:
retract_pos[2] = z_min_cfg
self.toolhead.move(retract_pos, self.retract_speed)
self.toolhead.wait_moves()
# --- Medición del recorrido total (opcional) ---
if measure_travel:
self._measure_platform_travel(gcmd, final_pos, z_max_cfg)
finally:
# === RESTAURAR LÍMITES ORIGINALES DE Z ===
kin.update_limits(2, original_z_limits)
logging.info(f"ZMAX_HOME: Límites Z restaurados: {original_z_limits}")
# Restaurar estado Gcode
if not gcode_state:
self.gcode.run_script_from_command("G91")
cmd_Z_CALIBRATE_help = "Calibra el eje Z: hace G28 Z, sube hasta Zmax (endstop Y) y reporta la distancia real recorrida."
def cmd_Z_CALIBRATE(self, gcmd):
"""
Comando de calibración del eje Z:
1. Ejecuta G28 Z (homing normal hacia abajo con endstop Zmin)
2. Sube buscando el endstop Y (Zmax) sin perder la posición
3. Reporta la distancia real recorrida
"""
if not all([self.toolhead, self.y_endstop_wrapper, self.z_stepper, self.phoming]):
raise gcmd.error("Z_CALIBRATE: Sistema no inicializado correctamente")
# Parámetros opcionales
speed = gcmd.get_float('SPEED', self.speed, above=0.)
# Límite de seguridad: cuánto subir como máximo buscando el endstop
max_travel = gcmd.get_float('MAX_TRAVEL', 150.0, above=0.)
gcmd.respond_info("=== Z_CALIBRATE: Iniciando calibración del eje Z ===")
# Guardar estado de coordenadas
gcode_move = self.printer.lookup_object('gcode_move')
gcode_state = gcode_move.get_status()['absolute_coordinates']
self.gcode.run_script_from_command("G90")
try:
# --- PASO 1: Hacer homing normal hacia abajo (G28 Z) ---
gcmd.respond_info("Paso 1: Ejecutando G28 Z (homing hacia Zmin)...")
self.gcode.run_script_from_command("G28 Z")
self.toolhead.wait_moves()
# Obtener posición después del homing (debería ser position_endstop)
pos_after_homing = self.toolhead.get_position()
z_start = pos_after_homing[2]
gcmd.respond_info(f" Posición después de G28 Z: {z_start:.3f}mm")
# --- PASO 2: Subir buscando el endstop Y (Zmax) ---
gcmd.respond_info(f"Paso 2: Subiendo hasta encontrar endstop Zmax (max {max_travel:.1f}mm)...")
# Verificar si el endstop Y ya está activado (no debería estarlo abajo)
self.toolhead.wait_moves()
query_time = self.toolhead.get_last_move_time()
if self.y_endstop.query_endstop(query_time):
gcmd.respond_info(" ADVERTENCIA: Endstop Y ya activado en posición inicial!")
gcmd.respond_info(" Esto indica un problema de cableado o configuración.")
return
# Expandir temporalmente los límites de Z para permitir el movimiento
# Guardamos los límites originales para restaurarlos después
kin = self.toolhead.get_kinematics()
original_z_limits = kin.limits[2] # (min, max) para eje Z
# Establecer nuevos límites temporales: desde posición actual hasta max_travel
current_pos = self.toolhead.get_position()
temp_z_max = current_pos[2] + max_travel
kin.update_limits(2, (original_z_limits[0], temp_z_max))
gcmd.respond_info(f" Límites Z temporales: {original_z_limits[0]:.1f} - {temp_z_max:.1f}mm")
# Calcular posición objetivo
target_pos = list(current_pos)
target_pos[2] = temp_z_max
# Usar probing_move para subir hasta que se active el endstop Y
probing_error = None
try:
trigger_pos = self.phoming.probing_move(
self.y_endstop_wrapper,
target_pos,
speed
)
# La distancia recorrida es la diferencia entre donde activó y donde empezó
z_end = trigger_pos[2]
travel_distance = z_end - z_start
gcmd.respond_info(f" Endstop Y activado en Z={z_end:.3f}mm (recorrido: {travel_distance:.3f}mm)")
except self.printer.command_error as e:
if "No trigger" in str(e):
probing_error = gcmd.error(
f"El endstop Y (Zmax) no se activó después de {max_travel:.1f}mm. "
"Aumenta MAX_TRAVEL o verifica el endstop."
)
elif "Probe triggered prior to movement" in str(e):
# El endstop se activó inmediatamente
travel_distance = 0.0
z_end = z_start
gcmd.respond_info(f" Endstop Y activado inmediatamente en Z={z_end:.3f}mm")
else:
probing_error = e
# Restaurar límites originales de Z (siempre)
kin.update_limits(2, original_z_limits)
# Si hubo error, lanzarlo ahora
if probing_error:
raise probing_error
# --- PASO 3: Calcular y reportar resultados ---
gcmd.respond_info("")
gcmd.respond_info("=== RESULTADOS DE CALIBRACIÓN ===")
gcmd.respond_info(f" Posición inicial (Zmin): {z_start:.3f}mm")
gcmd.respond_info(f" Posición final (Zmax): {z_end:.3f}mm")
gcmd.respond_info(f" ----------------------------------------")
gcmd.respond_info(f" DISTANCIA RECORRIDA: {travel_distance:.3f}mm")
gcmd.respond_info("")
# Obtener configuración actual para comparar
z_min_cfg, z_max_cfg = self.z_stepper.get_range()
config_range = z_max_cfg - z_min_cfg
gcmd.respond_info(f" Configuración actual:")
gcmd.respond_info(f" position_min: {z_min_cfg:.3f}mm")
gcmd.respond_info(f" position_max: {z_max_cfg:.3f}mm")
gcmd.respond_info(f" Rango config: {config_range:.3f}mm")
gcmd.respond_info("")
# Sugerencia de nuevo position_max
# El nuevo position_max debería ser: position_endstop + distancia_recorrida
# Pero position_endstop ya está incluido en z_start después del G28
suggested_max = z_start + travel_distance
diff = suggested_max - z_max_cfg
if abs(diff) > 0.5:
gcmd.respond_info(f" Diferencia con config actual: {diff:+.3f}mm")
gcmd.respond_info("")
gcmd.respond_info("=== ACTUALIZANDO CONFIGURACIÓN ===")
# --- Actualizar NanoDLP CustomValues.ZLength ---
nanodlp_updated = self._update_nanodlp_zlength(gcmd, suggested_max)
# --- Actualizar Klipper printer.cfg position_max ---
klipper_updated = self._update_klipper_position_max(gcmd, suggested_max)
if klipper_updated:
gcmd.respond_info("")
gcmd.respond_info("=== REINICIANDO KLIPPER ===")
gcmd.respond_info(" La nueva configuración se cargará automáticamente...")
# Programar reinicio después de responder
reactor = self.printer.get_reactor()
reactor.register_callback(lambda e: self._restart_klipper())
elif not nanodlp_updated and not klipper_updated:
gcmd.respond_info("")
gcmd.respond_info(" ⚠ No se pudo actualizar ninguna configuración")
else:
gcmd.respond_info(f" La configuración actual es correcta (diff: {diff:+.3f}mm)")
gcmd.respond_info(" No se requieren cambios.")
finally:
# Restaurar estado Gcode
if not gcode_state:
self.gcode.run_script_from_command("G91")
def _update_nanodlp_zlength(self, gcmd, new_zlength):
"""Actualiza CustomValues.ZLength en NanoDLP modificando machine.json directamente"""
import json
machine_json_path = "/home/pi/nanodlp/db/machine.json"
gcmd.respond_info(f" Actualizando NanoDLP ZLength a {new_zlength:.3f}mm...")
try:
# Leer archivo JSON actual
with open(machine_json_path, 'r') as f:
machine_config = json.load(f)
# Asegurar que CustomValues existe
if 'CustomValues' not in machine_config:
machine_config['CustomValues'] = {}
# Actualizar ZLength (como string, igual que los demás valores)
old_value = machine_config['CustomValues'].get('ZLength', 'N/A')
machine_config['CustomValues']['ZLength'] = f"{new_zlength:.3f}"
# Escribir archivo actualizado
with open(machine_json_path, 'w') as f:
json.dump(machine_config, f, indent='\t')
gcmd.respond_info(f" ✓ NanoDLP ZLength actualizado: {old_value}{new_zlength:.3f}mm")
return True
except Exception as e:
gcmd.respond_info(f" ✗ Error actualizando NanoDLP: {e}")
return False
def _update_klipper_position_max(self, gcmd, new_position_max):
"""Actualiza position_max en printer.cfg"""
import re
config_path = "/home/pi/printer_data/config/printer.cfg"
gcmd.respond_info(f" Actualizando Klipper position_max a {new_position_max:.3f}mm...")
try:
# Leer archivo actual
with open(config_path, 'r') as f:
content = f.read()
# Buscar y reemplazar position_max en la sección [stepper_z]
# Patrón: position_max: <número>
pattern = r'(\[stepper_z\].*?position_max\s*:\s*)(\d+\.?\d*)'
def replacer(match):
return f"{match.group(1)}{new_position_max:.3f}"
new_content, count = re.subn(pattern, replacer, content, count=1, flags=re.DOTALL)
if count == 0:
gcmd.respond_info(f" ✗ No se encontró position_max en [stepper_z]")
return False
# Escribir archivo actualizado
with open(config_path, 'w') as f:
f.write(new_content)
gcmd.respond_info(f" ✓ Klipper position_max actualizado correctamente")
return True
except Exception as e:
gcmd.respond_info(f" ✗ Error actualizando printer.cfg: {e}")
return False
def _restart_klipper(self):
"""Reinicia Klipper para cargar la nueva configuración"""
import subprocess
try:
subprocess.run(['sudo', 'systemctl', 'restart', 'klipper.service'],
timeout=5, capture_output=True)
except Exception as e:
logging.error(f"Error reiniciando Klipper: {e}")
def load_config(config):
return ZMaxHomingAlt(config)