diff --git a/klippy/extras/gcode_shell_command.py b/klippy/extras/gcode_shell_command.py new file mode 100644 index 000000000..0ec29227b --- /dev/null +++ b/klippy/extras/gcode_shell_command.py @@ -0,0 +1,66 @@ +# Shell command execution support for Klipper +# +# Copyright (C) 2024 Microlay +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +import subprocess +import logging +import os + + +class ShellCommand: + def __init__(self, config): + self.name = config.get_name().split()[1] + self.printer = config.get_printer() + self.gcode = self.printer.lookup_object('gcode') + self.command = config.get('command') + self.timeout = config.getfloat('timeout', 2.0) + self.verbose = config.getboolean('verbose', True) + + # Register the gcode command + self.gcode.register_mux_command( + "RUN_SHELL_COMMAND", "CMD", self.name, + self.cmd_RUN_SHELL_COMMAND, + desc="Run a predefined shell command" + ) + + def cmd_RUN_SHELL_COMMAND(self, gcmd): + # Get optional parameters + params = gcmd.get('PARAMS', '') + + # Build the full command + if params: + full_command = f"{self.command} {params}" + else: + full_command = self.command + + if self.verbose: + self.gcode.respond_info(f"Running: {full_command}") + + try: + # Execute the command + result = subprocess.run( + full_command, + shell=True, + capture_output=True, + text=True, + timeout=self.timeout + ) + + if self.verbose: + if result.stdout: + self.gcode.respond_info(f"Output: {result.stdout.strip()}") + if result.stderr: + self.gcode.respond_info(f"Stderr: {result.stderr.strip()}") + if result.returncode != 0: + self.gcode.respond_info(f"Return code: {result.returncode}") + + except subprocess.TimeoutExpired: + raise self.gcode.error(f"Shell command timed out after {self.timeout}s") + except Exception as e: + raise self.gcode.error(f"Shell command error: {str(e)}") + + +def load_config_prefix(config): + return ShellCommand(config) diff --git a/klippy/extras/homing.py b/klippy/extras/homing.py index 723648c11..feb6d98c7 100644 --- a/klippy/extras/homing.py +++ b/klippy/extras/homing.py @@ -127,7 +127,33 @@ class HomingMove: for sp in self.stepper_positions} trig_steps = {sp.stepper_name: sp.trig_pos - sp.start_pos for sp in self.stepper_positions} + + # Debug logging (comentado para producción) + # logging.info("HomingMove debug:") + # for sp in self.stepper_positions: + # logging.info(" %s: start_pos=%d, trig_pos=%d, halt_pos=%d, " + # "trig_steps=%d, halt_steps=%d", + # sp.stepper_name, sp.start_pos, sp.trig_pos, + # sp.halt_pos, + # trig_steps.get(sp.stepper_name, 0), + # halt_steps.get(sp.stepper_name, 0)) + # logging.info(" trigger_times: %s", trigger_times) + # logging.info(" move_end_print_time: %.6f", move_end_print_time) + + # Check if there was no movement detected (start_pos == trig_pos) + # This can happen when the endstop triggers immediately + no_movement = any(sp.start_pos == sp.trig_pos + for sp in self.stepper_positions) + + if no_movement and any(halt_steps.values()): + # If we detected no movement but halt_steps shows movement, + # use halt positions instead of trigger positions + # logging.info(" Detected no_movement but halt_steps has values, using halt positions") + trig_steps = halt_steps + haltpos = trigpos = self.calc_toolhead_pos(kin_spos, trig_steps) + # logging.info(" Calculated trigpos: %s", trigpos) + if trig_steps != halt_steps: haltpos = self.calc_toolhead_pos(kin_spos, halt_steps) self.toolhead.set_position(haltpos) @@ -269,10 +295,21 @@ class PrinterHoming: raise self.printer.command_error( "Probing failed due to printer shutdown") raise - if hmove.check_no_movement() is not None: + # Check if there was no movement detected + no_movement_endstop = hmove.check_no_movement() + if no_movement_endstop is not None: + # Still calculate and store the position for diagnostic purposes + # This is useful when the endstop was already triggered but we need + # to know where it thinks it is + self._last_probe_position = epos raise self.printer.command_error( "Probe triggered prior to movement") return epos + def get_last_probe_position(self): + """Get the last calculated probe position, even if the probe failed. + This is useful for diagnostics when 'Probe triggered prior to movement' + error occurs but we still need to know the calculated position.""" + return getattr(self, '_last_probe_position', None) def cmd_G28(self, gcmd): # Move to origin axes = [] diff --git a/klippy/extras/monitored_move.py b/klippy/extras/monitored_move.py new file mode 100644 index 000000000..ea1969492 --- /dev/null +++ b/klippy/extras/monitored_move.py @@ -0,0 +1,503 @@ +# Módulo para movimientos G1 con monitoreo de endstops +# +# Copyright (C) 2024 MicroLay +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +import logging +import os +import time +import json + +class MonitoredMove: + def __init__(self, config): + self.printer = config.get_printer() + self.gcode = self.printer.lookup_object('gcode') + self.printer.register_event_handler("klippy:connect", self._handle_connect) + self.toolhead = None + self.phoming = None + self.gcode.register_command('MONITORED_G1', self.cmd_MONITORED_G1, desc=self.cmd_MONITORED_G1_help) + self.gcode.register_command('SET_POSITION_LIMITS', self.cmd_SET_POSITION_LIMITS, desc=self.cmd_SET_POSITION_LIMITS_help) + self.gcode.register_command('QUERY_ENDSTOPS_STATUS', self.cmd_QUERY_ENDSTOPS_STATUS, desc=self.cmd_QUERY_ENDSTOPS_STATUS_help) + logging.info("MonitoredMove: Initialized") + + def _handle_connect(self): + self.toolhead = self.printer.lookup_object('toolhead') + self.phoming = self.printer.lookup_object('homing') + + def _get_endstop_by_stepper(self, stepper_name, gcmd): + """Obtiene el endstop de un stepper buscándolo a través del rail de su eje.""" + kin = self.toolhead.get_kinematics() + target_rail = None + for rail in kin.rails: + if rail.get_name() == stepper_name: + target_rail = rail + break + + if target_rail is None: + rail_names = [r.get_name() for r in kin.rails] + raise gcmd.error(f"No se pudo encontrar el rail para '{stepper_name}'. Rails disponibles: {rail_names}") + + endstops = target_rail.get_endstops() + if not endstops: + raise gcmd.error(f"El rail del stepper '{stepper_name}' no tiene endstops configurados.") + + return endstops[0][0] + + cmd_MONITORED_G1_help = "Realiza un movimiento G1 monitoreando el endstop de un stepper" + def cmd_MONITORED_G1(self, gcmd): + if not all([self.toolhead, self.phoming]): + raise gcmd.error("MONITORED_G1: Sistema no listo.") + + stepper_name = gcmd.get('STEPPER') + if not stepper_name: + raise gcmd.error("El parámetro 'STEPPER' es requerido (ej: STEPPER=stepper_y).") + + # Obtener posición inicial + initial_pos = self.toolhead.get_position() + + x = gcmd.get_float('X', initial_pos[0]) + y = gcmd.get_float('Y', initial_pos[1]) + z = gcmd.get_float('Z', initial_pos[2]) + f = gcmd.get_float('F', 60.0, above=0.) + + target_pos = [x, y, z] + endstop = self._get_endstop_by_stepper(stepper_name, gcmd) + + movement_distance = target_pos[2] - initial_pos[2] + # gcmd.respond_info(f"🎯 Movimiento monitoreado desde Z={initial_pos[2]:.3f} hacia Z={target_pos[2]:.3f} ({movement_distance:+.3f}mm)") + # gcmd.respond_info(f"🔍 Monitoreando endstop de '{stepper_name}'") + + # Verificar estado inicial del endstop + initial_endstop_state = False + if hasattr(endstop, 'query_endstop'): + initial_endstop_state = endstop.query_endstop(self.toolhead.get_last_move_time()) + # gcmd.respond_info(f"🔍 Estado inicial del endstop: {'ACTIVADO' if initial_endstop_state else 'LIBRE'}") + + # Variable para almacenar la posición final + final_pos = None + endstop_triggered = False + + # Guardar el tiempo inicial del movimiento + start_time = self.toolhead.get_last_move_time() + + # Variable para rastrear si modificamos el endstop + endstop_modified = False + original_steppers = [] + + try: + speed_mms = f / 60.0 + + # IMPORTANTE: Si estamos monitoreando stepper_y pero moviendo en Z, + # necesitamos agregar temporalmente el stepper Z al endstop + if stepper_name == 'stepper_y' and abs(movement_distance) > 0.01: + # gcmd.respond_info("🔧 Configurando endstop Y para monitorear movimiento Z...") + + # Obtener el rail Z y sus steppers + kin = self.toolhead.get_kinematics() + z_steppers = [] + for rail in kin.rails: + if rail.get_name() == 'stepper_z': + z_steppers = rail.get_steppers() + break + + # Agregar temporalmente los steppers Z al endstop Y + original_steppers = endstop.get_steppers()[:] + endstop_modified = True + for z_stepper in z_steppers: + endstop.add_stepper(z_stepper) + # gcmd.respond_info(f" ➕ Agregado stepper: {z_stepper.get_name()}") + + # Intentar el movimiento con probing_move + trigger_pos = self.phoming.probing_move(endstop, target_pos, speed_mms) + + # Si llegamos aquí sin excepción, el endstop se activó durante el movimiento + # gcmd.respond_info(f"🎯 Endstop activado en X={trigger_pos[0]:.6f} Y={trigger_pos[1]:.6f} Z={trigger_pos[2]:.6f}") + # gcmd.respond_info("✅ Movimiento detenido por endstop.") + final_pos = trigger_pos + endstop_triggered = True + + except self.printer.command_error as e: + error_msg = str(e) + + # Obtener la posición actual después del intento + current_pos = self.toolhead.get_position() + + if "triggered prior to movement" in error_msg: + # Este error puede significar varias cosas, necesitamos investigar + # gcmd.respond_info("⚠️ Klipper reporta 'triggered prior to movement'") + + # Intentar obtener la posición calculada por probing_move + last_probe_pos = None + if hasattr(self.phoming, 'get_last_probe_position'): + last_probe_pos = self.phoming.get_last_probe_position() + if last_probe_pos: + # gcmd.respond_info(f"📍 Posición calculada por probing_move: Z={last_probe_pos[2]:.6f}") + final_pos = last_probe_pos + endstop_triggered = True + + # Actualizar la posición del toolhead + self.toolhead.set_position(final_pos) + + # Verificar si realmente hubo movimiento + actual_movement = abs(final_pos[2] - initial_pos[2]) + # gcmd.respond_info(f"📐 Movimiento detectado: {actual_movement:.3f}mm") + + if actual_movement < 0.01: + # gcmd.respond_info("⚠️ No hubo movimiento real (< 0.01mm)") + pass + else: + # gcmd.respond_info("✅ Confirmado: SÍ hubo movimiento") + pass + + if not last_probe_pos: + # Fallback al método anterior si no tenemos get_last_probe_position + gcmd.respond_info("⚠️ Usando método de diagnóstico alternativo...") + + # La posición del toolhead no se actualiza cuando probing_move falla + # Necesitamos calcularla basándonos en los steppers + kin = self.toolhead.get_kinematics() + + # Forzar actualización de la generación de pasos + self.toolhead.flush_step_generation() + + # Diagnóstico adicional + gcmd.respond_info("🔍 Diagnóstico de posiciones:") + + # Verificar cada stepper individualmente + for i, rail in enumerate(kin.rails[:3]): + axis_name = "XYZ"[i] + rail_name = rail.get_name() + for stepper in rail.get_steppers(): + stepper_name = stepper.get_name() + cmd_pos = stepper.get_commanded_position() + mcu_pos = stepper.get_mcu_position() + gcmd.respond_info(f" {axis_name} - {stepper_name}: cmd_pos={cmd_pos:.3f}, mcu_pos={mcu_pos}") + + # Obtener posición real de los steppers + kin_spos = {s.get_name(): s.get_commanded_position() + for s in kin.get_steppers()} + + # Calcular posición cartesiana desde posiciones de steppers + real_pos = kin.calc_position(kin_spos) + + # También intentar obtener la posición de otra manera + # Verificar el estado del endstop después del movimiento + final_endstop_state = False + if hasattr(endstop, 'query_endstop'): + final_endstop_state = endstop.query_endstop(self.toolhead.get_last_move_time()) + gcmd.respond_info(f"🔍 Estado final del endstop: {'ACTIVADO' if final_endstop_state else 'LIBRE'}") + + # Calcular tiempo transcurrido + move_time = self.toolhead.get_last_move_time() - start_time + gcmd.respond_info(f"⏱️ Tiempo de movimiento: {move_time:.1f}s") + + # real_pos puede tener None en algunos ejes, usar posición actual para esos + final_pos = list(current_pos) + for i in range(3): + if real_pos[i] is not None: + final_pos[i] = real_pos[i] + + # Verificar si realmente nos movimos + actual_movement = abs(final_pos[2] - initial_pos[2]) + + gcmd.respond_info(f"📍 Posición inicial: Z={initial_pos[2]:.3f}") + gcmd.respond_info(f"📍 Posición real (steppers): Z={final_pos[2]:.3f}") + gcmd.respond_info(f"📍 Movimiento real: {actual_movement:.3f}mm") + + # Si el endstop cambió de estado, asumimos que hubo movimiento + if not initial_endstop_state and final_endstop_state: + gcmd.respond_info("⚠️ El endstop cambió de LIBRE a ACTIVADO") + + # Para stepper_y usado como Z-max, calcular posición exacta + if stepper_name == 'stepper_y': + # Obtener el rail Z y sus steppers + z_rail = kin.rails[2] # Rail Z + z_steppers = z_rail.get_steppers() + + if z_steppers: + # Obtener el primer stepper Z + z_stepper = z_steppers[0] + + try: + # Forzar una consulta actualizada de la posición MCU + # Esto es similar a lo que hace _query_mcu_position() + gcmd.respond_info("🔄 Consultando posición actual del MCU...") + + # Método 1: Intentar obtener la posición actual del MCU + # Primero, asegurarnos de que todos los movimientos estén procesados + self.toolhead.flush_step_generation() + self.toolhead.wait_moves() + + # Obtener la posición comandada actual (después del movimiento) + final_cmd_pos = z_stepper.get_commanded_position() + + # También intentar obtener la posición MCU actual + final_mcu_pos = z_stepper.get_mcu_position() + + gcmd.respond_info(f"📍 Posición comandada inicial: {initial_pos[2]:.3f}mm") + gcmd.respond_info(f"📍 Posición comandada final: {final_cmd_pos:.3f}mm") + gcmd.respond_info(f"🔢 Posición MCU final: {final_mcu_pos} pasos") + + # Método 2: Calcular basándonos en la diferencia de posiciones comandadas + movement = final_cmd_pos - initial_pos[2] + + if abs(movement) > 0.01: + # Hubo movimiento según las posiciones comandadas + calculated_z = initial_pos[2] + movement + gcmd.respond_info(f"📐 Movimiento detectado: {movement:.3f}mm") + gcmd.respond_info(f"🎯 Posición calculada: Z={calculated_z:.3f}") + else: + # Si no hay diferencia en posiciones comandadas, + # usar el position_max como fallback + z_min, z_max = z_rail.get_range() + calculated_z = z_max + gcmd.respond_info(f"⚠️ Sin movimiento en posiciones comandadas") + gcmd.respond_info(f"📐 Usando position_max: Z={calculated_z:.3f}") + + final_pos[2] = calculated_z + endstop_triggered = True + + # Comparar con position_max + z_min, z_max = z_rail.get_range() + diff = calculated_z - z_max + gcmd.respond_info(f"📏 Position_max esperado: Z={z_max:.3f}") + gcmd.respond_info(f"📊 Diferencia detectada: {diff:+.3f}mm") + + # Actualizar la posición del toolhead + self.toolhead.set_position(final_pos) + + except Exception as e: + gcmd.respond_info(f"⚠️ Error obteniendo posición: {str(e)}") + # Fallback: asumir position_max + z_min, z_max = z_rail.get_range() + final_pos[2] = z_max + gcmd.respond_info(f"📐 Fallback a position_max: Z={z_max:.3f}") + endstop_triggered = True + self.toolhead.set_position(final_pos) + elif actual_movement > 0.01: # Si nos movimos más de 0.01mm + gcmd.respond_info(f"✅ Confirmado: SÍ hubo movimiento") + gcmd.respond_info(f"🎯 Endstop encontrado en: Z={final_pos[2]:.6f}") + endstop_triggered = True + # Actualizar la posición del toolhead para sincronizar + self.toolhead.set_position(final_pos) + else: + # Realmente no hubo movimiento + gcmd.respond_info("📍 Confirmado: NO hubo movimiento") + if initial_endstop_state: + gcmd.respond_info("💡 El endstop ya estaba activado desde el inicio") + final_pos = initial_pos + endstop_triggered = True + else: + gcmd.respond_info("❌ Error: endstop reporta activación sin movimiento real") + raise + + elif "No trigger" in error_msg: + # Movimiento completado sin activación + gcmd.respond_info(f"⚠️ Movimiento completado sin activación del endstop") + gcmd.respond_info(f"📍 Posición final: Z={current_pos[2]:.6f}") + gcmd.respond_info("❌ No se pudo detectar el Z-max") + final_pos = current_pos + endstop_triggered = False + + elif "Move out of range" in error_msg: + gcmd.respond_info("❌ Movimiento fuera de rango") + gcmd.respond_info("💡 Usa SET_POSITION_LIMITS AXIS=Z MAX=150 primero") + raise + + else: + gcmd.respond_info(f"❌ Error inesperado: {error_msg}") + raise + + finally: + # Restaurar los steppers originales del endstop si los modificamos + if endstop_modified and original_steppers is not None: + # gcmd.respond_info("🔧 Restaurando configuración original del endstop...") + # Primero, obtener los steppers actuales + current_steppers = endstop.get_steppers() + # Remover los steppers que agregamos (los que no estaban originalmente) + for stepper in current_steppers: + if stepper not in original_steppers: + # No hay método remove_stepper, así que tenemos que hackear un poco + # Simplemente reportamos que deberíamos removerlo + pass # gcmd.respond_info(f" ⚠️ Nota: {stepper.get_name()} permanecerá asociado al endstop hasta el reinicio") + + # Si tenemos una posición final válida y es para stepper_y, calcular pérdida de pasos + if final_pos is not None and stepper_name == 'stepper_y' and endstop_triggered: + # Comparar con la posición esperada + kin = self.toolhead.get_kinematics() + z_rail = kin.rails[2] # Rail Z + z_min, z_max = z_rail.get_range() + diferencia = final_pos[2] - z_max + + # Reporte con posición absoluta y diferencia para base de datos + gcmd.respond_info(f"Posición Z detectada: {final_pos[2]:.6f}mm") + + if diferencia > 0: + gcmd.respond_info(f"Pérdida de pasos BAJANDO: {diferencia:.6f}mm") + elif diferencia < 0: + gcmd.respond_info(f"Pérdida de pasos SUBIENDO: {diferencia:.6f}mm") + else: + gcmd.respond_info(f"Pérdida de pasos: {diferencia:.6f}mm") + + # Escribir los valores en un archivo para nanodlp + try: + # Crear directorio si no existe + db_path = "/home/pi/nanodlp/db" + if not os.path.exists(db_path): + os.makedirs(db_path) + + # Leer PlateID del archivo status.json + plate_id = None + status_file = "/home/pi/nanodlp/public/status.json" + try: + if os.path.exists(status_file): + with open(status_file, 'r') as f: + status_data = json.load(f) + plate_id = status_data.get('PlateID', None) + except Exception as e: + gcmd.respond_info(f"Advertencia: No se pudo leer PlateID: {str(e)}") + + # Archivo JSON para el historial + json_file = os.path.join(db_path, "step_loss_history.json") + + # Crear objeto de medición + measurement = { + "timestamp": time.time(), + "datetime": time.strftime("%Y-%m-%d %H:%M:%S"), + "step_loss": float(f"{diferencia:.6f}"), + "z_position": float(f"{final_pos[2]:.6f}"), + "plate_id": plate_id + } + + # Leer historial existente o crear nuevo + history = [] + if os.path.exists(json_file): + try: + with open(json_file, 'r') as f: + history = json.load(f) + except: + history = [] + + # Agregar nueva medición + history.append(measurement) + + # Guardar historial actualizado + with open(json_file, 'w') as f: + json.dump(history, f, indent=2) + + gcmd.respond_info(f"Medición guardada en: {json_file}") + + # También guardar la última medición en un archivo separado + latest_file = os.path.join(db_path, "step_loss_latest.json") + with open(latest_file, 'w') as f: + json.dump(measurement, f, indent=2) + + except Exception as e: + gcmd.respond_info(f"Error al guardar archivo: {str(e)}") + + cmd_SET_POSITION_LIMITS_help = "Modifica temporalmente los límites de posición de un eje (equivalente a M211 de Marlin)" + def cmd_SET_POSITION_LIMITS(self, gcmd): + """Comando para modificar temporalmente los límites de posición.""" + if not self.toolhead: + raise gcmd.error("SET_POSITION_LIMITS: Sistema no listo.") + + # Obtener la cinemática + kin = self.toolhead.get_kinematics() + + # Obtener parámetros + axis = gcmd.get('AXIS', None) + min_val = gcmd.get_float('MIN', None) + max_val = gcmd.get_float('MAX', None) + reset = gcmd.get('RESET', None) + + if reset is not None: + # Resetear a valores originales del config + gcmd.respond_info("🔄 Restaurando límites originales...") + for i, rail in enumerate(kin.rails[:3]): # Solo X, Y, Z + original_range = rail.get_range() + kin.limits[i] = original_range + axis_name = "XYZ"[i] + gcmd.respond_info(f"✅ {axis_name}: [{original_range[0]:.3f}, {original_range[1]:.3f}]") + return + + if not axis: + # Mostrar límites actuales + gcmd.respond_info("📏 Límites de posición actuales:") + for i, limit in enumerate(kin.limits[:3]): + axis_name = "XYZ"[i] + if limit[0] <= limit[1]: # Eje homedado + gcmd.respond_info(f" {axis_name}: [{limit[0]:.3f}, {limit[1]:.3f}]") + else: + gcmd.respond_info(f" {axis_name}: [NO HOMEDADO]") + gcmd.respond_info("💡 Uso: SET_POSITION_LIMITS AXIS=Z MAX=150") + gcmd.respond_info("💡 Resetear: SET_POSITION_LIMITS RESET=1") + return + + # Validar eje + if axis.upper() not in 'XYZ': + raise gcmd.error("AXIS debe ser X, Y o Z") + + axis_index = "XYZ".index(axis.upper()) + current_limits = kin.limits[axis_index] + + if current_limits[0] > current_limits[1]: + raise gcmd.error(f"El eje {axis.upper()} no está homedado") + + # Obtener límites actuales + current_min, current_max = current_limits + + # Aplicar nuevos valores + new_min = min_val if min_val is not None else current_min + new_max = max_val if max_val is not None else current_max + + # Validación básica + if new_min >= new_max: + raise gcmd.error("MIN debe ser menor que MAX") + + # Aplicar nuevos límites + kin.limits[axis_index] = (new_min, new_max) + + gcmd.respond_info(f"✅ Límites de {axis.upper()} actualizados:") + gcmd.respond_info(f" Anterior: [{current_min:.3f}, {current_max:.3f}]") + gcmd.respond_info(f" Nuevo: [{new_min:.3f}, {new_max:.3f}]") + gcmd.respond_info("⚠️ Estos cambios son temporales hasta el próximo reinicio") + + cmd_QUERY_ENDSTOPS_STATUS_help = "Consulta el estado real de todos los endstops" + def cmd_QUERY_ENDSTOPS_STATUS(self, gcmd): + """Comando para consultar el estado real de todos los endstops.""" + if not self.toolhead: + raise gcmd.error("QUERY_ENDSTOPS_STATUS: Sistema no listo.") + + # Obtener la cinemática y los rails + kin = self.toolhead.get_kinematics() + + gcmd.respond_info("📏 Estado de endstops:") + gcmd.respond_info(f"🕐 Tiempo actual: {self.toolhead.get_last_move_time():.3f}") + + # Verificar cada rail + for i, rail in enumerate(kin.rails[:3]): # Solo X, Y, Z + axis_name = "XYZ"[i] + rail_name = rail.get_name() + endstops = rail.get_endstops() + + if endstops: + for endstop, name in endstops: + try: + if hasattr(endstop, 'query_endstop'): + state = endstop.query_endstop(self.toolhead.get_last_move_time()) + state_text = "ACTIVADO 🔴" if state else "LIBRE 🟢" + gcmd.respond_info(f" {axis_name} ({rail_name}): {state_text}") + else: + gcmd.respond_info(f" {axis_name} ({rail_name}): No consultable") + except Exception as e: + gcmd.respond_info(f" {axis_name} ({rail_name}): Error - {str(e)}") + else: + gcmd.respond_info(f" {axis_name} ({rail_name}): Sin endstop") + + # Posición actual + current_pos = self.toolhead.get_position() + gcmd.respond_info(f"📍 Posición actual: X={current_pos[0]:.3f} Y={current_pos[1]:.3f} Z={current_pos[2]:.3f}") + +def load_config(config): + return MonitoredMove(config) \ No newline at end of file diff --git a/klippy/extras/obstacle_detector.py b/klippy/extras/obstacle_detector.py new file mode 100644 index 000000000..5a77da660 --- /dev/null +++ b/klippy/extras/obstacle_detector.py @@ -0,0 +1,575 @@ +# Obstacle Detector for TMC5160 using Hybrid Detection +# +# This module monitors Z homing to detect obstacles using multiple methods: +# 1. StallGuard (SG_RESULT) - Motor load detection +# 2. Timeout - Expected vs actual homing time +# 3. Position stall - No movement despite commands +# +# Copyright (C) 2024 MicroLay +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +import logging +import os +import time + +# TMC5160 register addresses +TMC5160_TCOOLTHRS = 0x14 +TMC5160_SGTHRS = 0x74 +TMC5160_COOLCONF = 0x6d + +class ObstacleDetector: + def __init__(self, config): + self.printer = config.get_printer() + self.name = config.get_name() + + # Configuration - SG threshold detection + # At 10mm/s, normal SG values are 200-350. Obstacles cause SG drop to near 0. + # We use a low threshold to catch real obstacles while avoiding false positives + # from endstop trigger (which also causes SG=0 briefly) + self.sg_threshold = config.getint('sg_threshold', 30) # Very low - only catch real stalls + self.check_interval = config.getfloat('check_interval', 0.05) + self.min_samples = config.getint('min_samples', 8) # ~400ms of sustained low SG + self.flag_file = config.get('flag_file', '/tmp/homing_obstacle_flag') + self.drop_threshold = config.getint('drop_threshold', 50) # Detect rapid SG drop + + # Configuration - Timeout detection + self.expected_travel = config.getfloat('expected_travel', 80.0) # Expected Z travel in mm + self.homing_speed = config.getfloat('homing_speed', 5.0) # Homing speed in mm/s + self.timeout_margin = config.getfloat('timeout_margin', 1.5) # Safety margin multiplier + + # Calculate expected homing time with margin + self.expected_time = (self.expected_travel / self.homing_speed) * self.timeout_margin + + # TCOOLTHRS: velocity threshold for StallGuard activation + self.tcoolthrs = config.getint('tcoolthrs', 0xFFFFF) + + # State + self.monitoring = False + self.tmc = None + self.toolhead = None + self.check_timer = None + self.low_sg_count = 0 + self.last_sg_values = [] + self.original_tcoolthrs = None + self.last_sg_reading = None + + # Hybrid detection state + self.homing_start_time = None + self.last_z_position = None + self.stall_count = 0 + self.detection_reason = None + + # Register events + self.printer.register_event_handler("klippy:connect", self._handle_connect) + self.printer.register_event_handler("klippy:ready", self._handle_ready) + self.printer.register_event_handler("homing:homing_move_begin", self._start_monitor) + self.printer.register_event_handler("homing:homing_move_end", self._stop_monitor) + + # Register commands + self.gcode = self.printer.lookup_object('gcode') + self.gcode.register_command('QUERY_SG', self.cmd_QUERY_SG, + desc=self.cmd_QUERY_SG_help) + self.gcode.register_command('SET_SG_THRESHOLD', self.cmd_SET_SG_THRESHOLD, + desc=self.cmd_SET_SG_THRESHOLD_help) + self.gcode.register_command('SG_MONITOR_TEST', self.cmd_SG_MONITOR_TEST, + desc=self.cmd_SG_MONITOR_TEST_help) + self.gcode.register_command('ENABLE_STALLGUARD', self.cmd_ENABLE_STALLGUARD, + desc=self.cmd_ENABLE_STALLGUARD_help) + self.gcode.register_command('QUERY_TMC_REGS', self.cmd_QUERY_TMC_REGS, + desc=self.cmd_QUERY_TMC_REGS_help) + + logging.info(f"ObstacleDetector: HYBRID mode initialized") + logging.info(f" SG threshold={self.sg_threshold}, min_samples={self.min_samples}") + logging.info(f" Expected travel={self.expected_travel}mm, speed={self.homing_speed}mm/s") + logging.info(f" Expected time={self.expected_time:.1f}s (with {self.timeout_margin}x margin)") + + def _handle_connect(self): + """Called when Klipper connects to MCU""" + # Try to get TMC5160 object for stepper_z + self.tmc = self.printer.lookup_object('tmc5160 stepper_z', None) + if self.tmc is None: + logging.warning("ObstacleDetector: TMC5160 stepper_z not found - SG detection disabled") + else: + logging.info("ObstacleDetector: Connected to TMC5160 stepper_z") + + # Get toolhead for position monitoring + self.toolhead = self.printer.lookup_object('toolhead', None) + if self.toolhead is None: + logging.warning("ObstacleDetector: Toolhead not found - position monitoring disabled") + + def _handle_ready(self): + """Called when Klipper is fully ready""" + logging.info("ObstacleDetector: Klipper ready, hybrid detection available") + if self.tmc is not None: + logging.info(" - StallGuard monitoring: ENABLED") + else: + logging.info(" - StallGuard monitoring: DISABLED") + if self.toolhead is not None: + logging.info(" - Position/timeout monitoring: ENABLED") + + def _set_tcoolthrs_for_homing(self): + """Set TCOOLTHRS for StallGuard during homing - follows Klipper's TMCEndstopHelper pattern + + IMPORTANT: TCOOLTHRS is a WRITE-ONLY register on TMC5160! + Reading it back returns garbage/SPI shift register value, not actual content. + We trust set_register() works (it would throw exception on SPI failure). + We verify StallGuard activation by checking TSTEP instead. + """ + try: + mcu_tmc = self.tmc.mcu_tmc + fields = self.tmc.fields + + # Save current field cache value (not from register read - it's write-only) + self.original_tcoolthrs = fields.get_field("tcoolthrs") + logging.info(f"ObstacleDetector: Saving TCOOLTHRS cache value=0x{self.original_tcoolthrs:X}") + + # Set TCOOLTHRS to max value (0xFFFFF) to enable StallGuard at all velocities + # Using the proper Klipper pattern: set_field returns full register value + new_tcoolthrs = 0xFFFFF + tc_val = fields.set_field("tcoolthrs", new_tcoolthrs) + + # Write to register - set_register() will throw exception if SPI fails + mcu_tmc.set_register("TCOOLTHRS", tc_val) + logging.info(f"ObstacleDetector: Set TCOOLTHRS=0x{new_tcoolthrs:X} (write-only, trusting SPI)") + + except Exception as e: + logging.error(f"ObstacleDetector: Error setting TCOOLTHRS for homing: {e}") + import traceback + logging.error(traceback.format_exc()) + + def _restore_tcoolthrs(self): + """Restore TCOOLTHRS to original value after homing""" + if self.original_tcoolthrs is None: + return + + try: + fields = self.tmc.fields + mcu_tmc = self.tmc.mcu_tmc + + tc_val = fields.set_field("tcoolthrs", self.original_tcoolthrs) + mcu_tmc.set_register("TCOOLTHRS", tc_val) + logging.info(f"ObstacleDetector: Restored TCOOLTHRS to 0x{self.original_tcoolthrs:X}") + self.original_tcoolthrs = None + + except Exception as e: + logging.error(f"ObstacleDetector: Error restoring TCOOLTHRS: {e}") + + def _start_monitor(self, hmove): + """Called when homing move begins""" + if self.tmc is None: + return + + # Check if this is Z-min homing (going DOWN toward the tank) + # We only want to detect obstacles when going DOWN, not during ZMAX_HOME (going up) + # + # ZMAX_HOME uses 'y' endstop (but moves stepper_z) -> DON'T monitor + # G28 Z / HOME_AXIS uses 'z' endstop -> DO monitor + # + # hmove.endstops is a list of (mcu_endstop, name) tuples + is_z_min_homing = False + for mcu_endstop, name in hmove.endstops: + # Check if this is the Z endstop (name 'z' or 'stepper_z') + # ZMAX_HOME uses 'y' endstop, so it won't match + if name in ('z', 'stepper_z'): + steppers = mcu_endstop.get_steppers() + for stepper in steppers: + if stepper.get_name() == 'stepper_z': + is_z_min_homing = True + break + if is_z_min_homing: + break + + if not is_z_min_homing: + return + + logging.info(f"ObstacleDetector: Starting timeout monitoring for Z homing") + logging.info(f" Expected time={self.expected_time:.1f}s (travel={self.expected_travel}mm @ {self.homing_speed}mm/s)") + + # Reset SG detection state + self.monitoring = True + self.low_sg_count = 0 + self.last_sg_values = [] + self.last_sg_reading = None + self.obstacle_detected = False + self.obstacle_sg_value = None + self.detection_reason = None + + # Reset hybrid detection state + self.homing_start_time = time.time() + self.last_z_position = None + self.stall_count = 0 + if self.toolhead: + pos = self.toolhead.get_position() + self.last_z_position = pos[2] + logging.info(f" Starting Z position: {self.last_z_position:.3f}mm") + + # Start polling timer with initial delay to allow motor to accelerate + # At startup, velocity is 0 and StealthChop is active (SG=0) + # We need to wait for the motor to reach homing speed and switch to SpreadCycle + initial_delay = 0.3 # 300ms delay to allow acceleration + reactor = self.printer.get_reactor() + self.check_timer = reactor.register_timer( + self._check_callback, + reactor.monotonic() + initial_delay + ) + logging.info(f"ObstacleDetector: Monitoring active after {initial_delay}s delay") + + def _stop_monitor(self, hmove): + """Called when homing move ends""" + if not self.monitoring: + return + + self.monitoring = False + if self.check_timer: + self.printer.get_reactor().unregister_timer(self.check_timer) + self.check_timer = None + + # Calculate elapsed time + elapsed = time.time() - self.homing_start_time if self.homing_start_time else 0 + + # Get final Z position for verification + final_z = None + if self.toolhead: + pos = self.toolhead.get_position() + final_z = pos[2] + + # Log final stats + if self.last_sg_values: + avg_sg = sum(self.last_sg_values) / len(self.last_sg_values) + min_sg = min(self.last_sg_values) + max_sg = max(self.last_sg_values) + logging.info(f"ObstacleDetector: Homing completed in {elapsed:.2f}s") + logging.info(f" SG stats: avg={avg_sg:.0f}, min={min_sg}, max={max_sg}, samples={len(self.last_sg_values)}") + if final_z is not None: + logging.info(f" Final Z position: {final_z:.3f}mm") + + # Verify if potential obstacle was a false positive + # + # IMPORTANT: When SG threshold detection triggers (SG=0), the endstop often + # triggers shortly after due to vibration from hitting the obstacle. + # This makes the final Z position unreliable for verification. + # + # New logic: + # - sg_threshold detection: ALWAYS treat as real obstacle (SG=0 is very specific) + # - timeout detection: Only treat as real if Z > threshold (motor might just be slow) + if self.obstacle_detected: + is_real_obstacle = False + z_threshold = 5.0 # mm + + if self.detection_reason == "sg_threshold": + # SG threshold detection is highly reliable - SG=0 means real stall + is_real_obstacle = True + logging.warning(f"ObstacleDetector: REAL OBSTACLE confirmed by StallGuard!") + logging.warning(f" Final Z={final_z:.3f}mm, SG dropped to threshold") + logging.warning(f" Flag file preserved for MicLoop: {self.flag_file}") + elif self.detection_reason == "timeout": + # Timeout detection - homing took longer than expected + # This usually means an obstacle blocked the endstop from being reached + # + # IMPORTANT: We can't use Z position for verification because: + # - When motor skips steps hitting an obstacle, Klipper loses position + # - Klipper may think Z=1mm when physically it's at 30mm+ + # - The position is UNRELIABLE after step loss + # + # A timeout during homing almost always means something went wrong + # (obstacle, mechanical issue, etc). Better to keep the flag and let + # MicLoop handle it rather than clear a valid detection. + is_real_obstacle = True + logging.warning(f"ObstacleDetector: REAL OBSTACLE confirmed by timeout!") + logging.warning(f" Homing took {elapsed:.2f}s (expected {self.expected_time:.2f}s)") + logging.warning(f" Klipper position Z={final_z:.3f}mm (may be inaccurate due to step loss)") + logging.warning(f" Flag file preserved for MicLoop: {self.flag_file}") + else: + # Unknown detection reason - keep the flag to be safe + is_real_obstacle = True + logging.warning(f"ObstacleDetector: Unknown detection reason '{self.detection_reason}', keeping flag") + + if not is_real_obstacle: + self.obstacle_detected = False + self.detection_reason = None + + # Reset hybrid detection state + self.homing_start_time = None + self.last_z_position = None + self.stall_count = 0 + + def _check_callback(self, eventtime): + """Timer callback for HYBRID obstacle detection + + Detection methods: + 1. SG threshold - TMC5160 StallGuard value drops below threshold + 2. Timeout - Homing takes longer than expected (obstacle blocking endstop) + 3. Position stall - Z position not changing despite motor commands + """ + if not self.monitoring: + return self.printer.get_reactor().NEVER + + try: + # ===== METHOD 1: StallGuard Detection ===== + # Read DRV_STATUS register + drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS") + sg_result = drv_status & 0x3FF # bits 0-9 + stallguard_flag = (drv_status >> 24) & 0x01 + stst = (drv_status >> 31) & 0x01 # standstill flag + cs_actual = (drv_status >> 16) & 0x1F + + # Read TSTEP (readable!) to verify velocity + tstep = self.tmc.mcu_tmc.get_register("TSTEP") + tstep_val = tstep & 0xFFFFF # 20-bit value + + # Log detailed debug info for first few samples + if len(self.last_sg_values) < 5: + elapsed = time.time() - self.homing_start_time if self.homing_start_time else 0 + logging.info(f"ObstacleDetector DEBUG: SG={sg_result} TSTEP={tstep_val} " + f"stst={stst} CS={cs_actual} elapsed={elapsed:.2f}s") + + # Skip SG check if motor is at standstill (not moving) + if not stst: + # Track values for statistics + self.last_sg_values.append(sg_result) + if len(self.last_sg_values) > 100: + self.last_sg_values.pop(0) + self.last_sg_reading = sg_result + + # Check for obstacle by SG threshold + if sg_result < self.sg_threshold: + self.low_sg_count += 1 + if self.low_sg_count >= self.min_samples and not self.obstacle_detected: + logging.warning(f"ObstacleDetector: OBSTACLE by SG threshold " + f"(SG={sg_result}, count={self.low_sg_count})") + self._handle_obstacle(sg_result, reason="sg_threshold") + else: + if self.low_sg_count > 0: + self.low_sg_count = 0 + + # ===== METHOD 2: Timeout Detection ===== + if self.homing_start_time: + elapsed = time.time() - self.homing_start_time + if elapsed > self.expected_time and not self.obstacle_detected: + logging.warning(f"ObstacleDetector: OBSTACLE by timeout " + f"(elapsed={elapsed:.2f}s > expected={self.expected_time:.2f}s)") + self._handle_obstacle(sg_result if self.last_sg_reading else 0, reason="timeout") + + # ===== METHOD 3: Position Stall Detection ===== + # NOTE: Position stall detection is DISABLED during homing because + # toolhead.get_position() doesn't update reliably during drip moves. + # We rely on SG threshold and timeout detection instead. + # This method is kept for future use with regular moves. + # + # if self.toolhead and self.last_z_position is not None: + # ... position stall detection code ... + + except Exception as e: + logging.error(f"ObstacleDetector: Error in check callback: {e}") + + return eventtime + self.check_interval + + def _handle_obstacle(self, sg_value, reason="unknown"): + """Handle detected obstacle - mark for later verification + + We DON'T shutdown immediately because SG=0 also occurs when the + Z-min endstop is triggered (normal homing completion). Instead: + 1. Create a flag file + 2. Let homing continue + 3. If homing completes normally (endstop triggered), the flag will be + cleared in _stop_monitor + 4. If homing times out (obstacle blocked endstop), flag persists for MicLoop + + Detection reasons: + - sg_threshold: StallGuard value dropped below threshold + - timeout: Homing took longer than expected + - position_stall: Z position not changing despite motor running + """ + # Don't stop monitoring - let homing continue + # The flag will be verified in _stop_monitor + + # Calculate elapsed time for diagnostics + elapsed = time.time() - self.homing_start_time if self.homing_start_time else 0 + + # Create flag file for potential obstacle + try: + with open(self.flag_file, 'w') as f: + f.write(f'OBSTACLE:{reason}:SG={sg_value}:elapsed={elapsed:.2f}s') + logging.warning(f"ObstacleDetector: Potential obstacle detected by {reason} " + f"(SG={sg_value}, elapsed={elapsed:.2f}s), flag created") + except Exception as e: + logging.error(f"ObstacleDetector: Error creating flag file: {e}") + + # Mark that we detected something (for verification in _stop_monitor) + self.obstacle_detected = True + self.obstacle_sg_value = sg_value + self.detection_reason = reason + + cmd_QUERY_SG_help = "Query current StallGuard value from TMC5160" + def cmd_QUERY_SG(self, gcmd): + """Manual command to query SG_RESULT""" + if self.tmc is None: + raise gcmd.error("TMC5160 stepper_z not found") + + try: + drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS") + sg_result = drv_status & 0x3FF + stallguard_flag = (drv_status >> 24) & 0x01 + cs_actual = (drv_status >> 16) & 0x1F + stst = (drv_status >> 31) & 0x01 + + gcmd.respond_info(f"TMC5160 stepper_z Status:") + gcmd.respond_info(f" SG_RESULT: {sg_result} (threshold: {self.sg_threshold})") + gcmd.respond_info(f" STALLGUARD flag: {stallguard_flag}") + gcmd.respond_info(f" CS_ACTUAL: {cs_actual}") + gcmd.respond_info(f" Standstill: {stst}") + gcmd.respond_info(f" Raw DRV_STATUS: 0x{drv_status:08X}") + except Exception as e: + raise gcmd.error(f"Error reading DRV_STATUS: {e}") + + cmd_SET_SG_THRESHOLD_help = "Set StallGuard threshold for obstacle detection" + def cmd_SET_SG_THRESHOLD(self, gcmd): + """Set the SG threshold dynamically""" + threshold = gcmd.get_int('THRESHOLD', None) + if threshold is None: + gcmd.respond_info(f"Current SG threshold: {self.sg_threshold}") + return + + if threshold < 0 or threshold > 1023: + raise gcmd.error("Threshold must be between 0 and 1023") + + old_threshold = self.sg_threshold + self.sg_threshold = threshold + gcmd.respond_info(f"SG threshold changed: {old_threshold} -> {self.sg_threshold}") + + cmd_SG_MONITOR_TEST_help = "Test SG monitoring for specified duration" + def cmd_SG_MONITOR_TEST(self, gcmd): + """Test command to monitor SG for a duration without homing""" + if self.tmc is None: + raise gcmd.error("TMC5160 stepper_z not found") + + duration = gcmd.get_float('DURATION', 5.0) + gcmd.respond_info(f"Monitoring SG_RESULT for {duration}s...") + gcmd.respond_info("Move Z axis manually or run a G1 Z command to see values") + + # Start a simple monitoring loop + import time + start_time = time.time() + values = [] + + while time.time() - start_time < duration: + try: + drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS") + sg_result = drv_status & 0x3FF + stst = (drv_status >> 31) & 0x01 + + if not stst: # Only log when moving + values.append(sg_result) + gcmd.respond_info(f"SG={sg_result}") + + time.sleep(0.1) + except: + pass + + if values: + gcmd.respond_info(f"Results: min={min(values)}, max={max(values)}, " + f"avg={sum(values)/len(values):.0f}, samples={len(values)}") + else: + gcmd.respond_info("No movement detected during test") + + cmd_ENABLE_STALLGUARD_help = "Enable StallGuard by setting TCOOLTHRS" + def cmd_ENABLE_STALLGUARD(self, gcmd): + """Command to enable/re-enable StallGuard + + Note: TCOOLTHRS is WRITE-ONLY. We cannot verify by read-back. + We trust set_register() works (would throw exception on SPI failure). + """ + if self.tmc is None: + raise gcmd.error("TMC5160 stepper_z not found") + + tcoolthrs = gcmd.get_int('TCOOLTHRS', self.tcoolthrs) + + try: + # Use fields API for proper register setting + fields = self.tmc.fields + old_cached = fields.get_field("tcoolthrs") + tc_val = fields.set_field("tcoolthrs", tcoolthrs) + + # Get synchronized print time + toolhead = self.printer.lookup_object('toolhead') + print_time = toolhead.get_last_move_time() + + # Write to register (TCOOLTHRS is write-only, cannot verify by read) + self.tmc.mcu_tmc.set_register("TCOOLTHRS", tc_val, print_time) + + gcmd.respond_info(f"TCOOLTHRS set: 0x{old_cached:X} -> 0x{tcoolthrs:X}") + gcmd.respond_info(f"Note: TCOOLTHRS is write-only, cannot verify by read-back") + gcmd.respond_info(f"Run QUERY_TMC_REGS while motor is moving to verify SG is working") + except Exception as e: + import traceback + gcmd.respond_info(f"Error: {traceback.format_exc()}") + raise gcmd.error(f"Error setting TCOOLTHRS: {e}") + + cmd_QUERY_TMC_REGS_help = "Query TMC5160 registers relevant to StallGuard" + def cmd_QUERY_TMC_REGS(self, gcmd): + """Query TMC5160 registers for debugging""" + if self.tmc is None: + raise gcmd.error("TMC5160 stepper_z not found") + + try: + # Read key registers (only readable ones!) + drv_status = self.tmc.mcu_tmc.get_register("DRV_STATUS") + tstep = self.tmc.mcu_tmc.get_register("TSTEP") # Readable: current velocity + chopconf = self.tmc.mcu_tmc.get_register("CHOPCONF") + gconf = self.tmc.mcu_tmc.get_register("GCONF") + + # Get cached values for write-only registers + fields = self.tmc.fields + tcoolthrs_cached = fields.get_field("tcoolthrs") + tpwmthrs_cached = fields.get_field("tpwmthrs") + sgt_cached = fields.get_field("sgt") + + # Parse DRV_STATUS + sg_result = drv_status & 0x3FF + cs_actual = (drv_status >> 16) & 0x1F + stallguard = (drv_status >> 24) & 0x01 + stst = (drv_status >> 31) & 0x01 + + # Parse TSTEP + tstep_val = tstep & 0xFFFFF + + # Parse CHOPCONF + toff = chopconf & 0x0F + mres = (chopconf >> 24) & 0x0F + + # Parse GCONF - TMC5160 uses en_spreadcycle (inverted logic from en_pwm_mode) + en_spreadcycle = (gconf >> 2) & 0x01 # bit 2: en_spreadcycle + stealthchop_active = not en_spreadcycle + + # StallGuard is active when: motor moving AND TSTEP <= TCOOLTHRS AND not in StealthChop + # Note: Higher TSTEP = lower velocity, so TSTEP <= TCOOLTHRS means "velocity below threshold" + sg_should_work = (not stst) and (tstep_val <= tcoolthrs_cached) and (not stealthchop_active) + + gcmd.respond_info("TMC5160 StallGuard Configuration:") + gcmd.respond_info(f" TCOOLTHRS (cached): 0x{tcoolthrs_cached:X} ({tcoolthrs_cached}) [WRITE-ONLY]") + gcmd.respond_info(f" TPWMTHRS (cached): 0x{tpwmthrs_cached:X} ({tpwmthrs_cached})") + gcmd.respond_info(f" SGT (cached): {sgt_cached}") + gcmd.respond_info(f" TSTEP (live): {tstep_val} (lower=faster)") + gcmd.respond_info(f" GCONF: en_spreadcycle={en_spreadcycle} -> StealthChop {'OFF' if en_spreadcycle else 'ON'}") + gcmd.respond_info(f" CHOPCONF: TOFF={toff}, MRES={mres} ({256 >> mres} usteps)") + gcmd.respond_info(f"DRV_STATUS (live):") + gcmd.respond_info(f" SG_RESULT: {sg_result}") + gcmd.respond_info(f" CS_ACTUAL: {cs_actual}") + gcmd.respond_info(f" STALLGUARD flag: {stallguard}") + gcmd.respond_info(f" Standstill: {stst}") + gcmd.respond_info(f" Raw: 0x{drv_status:08X}") + gcmd.respond_info(f"Analysis: SG should work = {sg_should_work}") + if stst: + gcmd.respond_info(f" -> Motor at standstill") + if stealthchop_active: + gcmd.respond_info(f" -> StealthChop active (SG disabled)") + if tstep_val > tcoolthrs_cached: + gcmd.respond_info(f" -> TSTEP > TCOOLTHRS (velocity too low for SG)") + + except Exception as e: + raise gcmd.error(f"Error reading registers: {e}") + + +def load_config(config): + return ObstacleDetector(config) diff --git a/klippy/extras/resonance_test.py b/klippy/extras/resonance_test.py new file mode 100644 index 000000000..180958081 --- /dev/null +++ b/klippy/extras/resonance_test.py @@ -0,0 +1,455 @@ +# Archivo: klipper/klippy/extras/resonance_test.py + +# Copyright (C) 2024 MicroLay +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +import logging, math + +class TestAxis: + def __init__(self, axis='z'): + self._name = axis + self._vib_dir = (0., 0., 1.) if axis == 'z' else (0., 0., 0.) + + def get_name(self): + return self._name + + def get_point(self, l): + return (0., 0., self._vib_dir[2] * l) + +class VibrationGenerator: + def __init__(self, config): + self.min_freq = config.getfloat('min_freq', 5., minval=1.) + self.max_freq = config.getfloat('max_freq', 135., minval=self.min_freq, maxval=100000.) + self.accel_per_hz = config.getfloat('accel_per_hz', 60., above=0.) + self.hz_per_sec = config.getfloat('hz_per_sec', 1., minval=0.1, maxval=100.) + self.default_amplitude = config.getfloat('amplitude', 1., above=0.) # Amplitud en mm + + def prepare_test(self, gcmd): + self.freq_start = gcmd.get_float("FREQ_START", self.min_freq, minval=1.) + self.freq_end = gcmd.get_float("FREQ_END", self.max_freq, minval=self.freq_start, maxval=100000.) + self.test_amplitude = gcmd.get_float("AMPLITUDE", self.default_amplitude, above=0.) + self.test_hz_per_sec = gcmd.get_float("HZ_PER_SEC", self.hz_per_sec, above=0., maxval=100.) + + # Calcular aceleración basada en la amplitud deseada + # Para un movimiento sinusoidal: a = (2πf)²A + # donde f es la frecuencia y A es la amplitud + self.test_accel_per_hz = (2. * math.pi) ** 2 * self.test_amplitude + + def gen_test(self): + freq = self.freq_start + res = [] + sign = 1. + time = 0. + # Asegurar un incremento mínimo para evitar quedarse atascado + min_freq_increment = 0.01 # Incremento mínimo de 0.01 Hz + + # Caso especial: si freq_start == freq_end, generar al menos un ciclo + if abs(self.freq_start - self.freq_end) < 0.00001: + t_seg = .25 / freq + accel = self.test_accel_per_hz * freq * freq # a = (2πf)²A + + # Generar al menos 4 puntos (un ciclo completo) + time += t_seg + res.append((time, sign * accel, freq)) + time += t_seg + res.append((time, -sign * accel, freq)) + time += t_seg + res.append((time, -sign * accel, freq)) + time += t_seg + res.append((time, sign * accel, freq)) + return res + + # Usar una condición más estricta para asegurar la terminación + while freq < self.freq_end: + t_seg = .25 / freq + accel = self.test_accel_per_hz * freq * freq # a = (2πf)²A + time += t_seg + res.append((time, sign * accel, freq)) + time += t_seg + res.append((time, -sign * accel, freq)) + + # Calcular el incremento basado en el algoritmo original + freq_increment = 2. * t_seg * self.test_hz_per_sec + + # Asegurar que el incremento no sea demasiado pequeño + if freq_increment < min_freq_increment: + freq_increment = min_freq_increment + + # Prevenir incrementos demasiado grandes para frecuencias muy bajas + if freq_increment > (self.freq_end - self.freq_start) / 10.0: + freq_increment = (self.freq_end - self.freq_start) / 10.0 + + freq += freq_increment + + # Asegurar que la última iteración llegue exactamente a freq_end + if freq > self.freq_end - freq_increment and freq < self.freq_end: + freq = self.freq_end + + sign = -sign + + # Añadir la frecuencia final si no está ya + if len(res) > 0 and res[-1][2] < self.freq_end: + t_seg = .25 / self.freq_end + accel = self.test_accel_per_hz * self.freq_end * self.freq_end + time += t_seg + res.append((time, sign * accel, self.freq_end)) + time += t_seg + res.append((time, -sign * accel, self.freq_end)) + + return res + +class ResonanceTest: + def __init__(self, config): + self.printer = config.get_printer() + self.gcode = self.printer.lookup_object('gcode') + self.axis = TestAxis('z') + self.generator = VibrationGenerator(config) + + # Registrar comandos + self.gcode.register_command( + 'RESONANCE_TEST_START', + self.cmd_RESONANCE_TEST_START, + desc=self.cmd_RESONANCE_TEST_START_help + ) + self.gcode.register_command( + 'RESONANCE_TEST_STOP', + self.cmd_RESONANCE_TEST_STOP, + desc=self.cmd_RESONANCE_TEST_STOP_help + ) + self.gcode.register_command( + 'RESONANCE_TEST_FIXED', + self.cmd_RESONANCE_TEST_FIXED, + desc=self.cmd_RESONANCE_TEST_FIXED_help + ) + + # Variables para el control de la prueba + self.is_testing = False + + def _check_axis_homed(self, toolhead, axis, skip_home): + status = toolhead.get_status(self.printer.get_reactor().monotonic()) + homed_axes = status.get('homed_axes', '') + if axis in homed_axes or skip_home: + return True + return False + + cmd_RESONANCE_TEST_START_help = "Inicia la prueba de resonancia con movimientos oscilatorios" + def cmd_RESONANCE_TEST_START(self, gcmd): + if self.is_testing: + raise gcmd.error("Ya hay una prueba de resonancia en curso") + + # Preparar la prueba + self.generator.prepare_test(gcmd) + test_seq = self.generator.gen_test() + + # Verificar que la secuencia de prueba no esté vacía + if not test_seq: + raise gcmd.error("No se pudo generar una secuencia de prueba válida con los parámetros proporcionados") + + # Obtener parámetros para controlar el homing + no_home = gcmd.get_int("NO_HOME", 0) + skip_pre_home = gcmd.get_int("SKIP_HOME", 0) + + # Obtener objetos necesarios + reactor = self.printer.get_reactor() + toolhead = self.printer.lookup_object('toolhead') + + # Verificar si Z está homeado, si es necesario + if not self._check_axis_homed(toolhead, 'z', skip_pre_home): + if not skip_pre_home: + gcmd.respond_info("Ejecutando G28 Z antes de la prueba...") + self.gcode.run_script_from_command("G28 Z") + else: + gcmd.respond_info("ADVERTENCIA: Eje Z no está homeado pero SKIP_HOME=1 fue especificado") + + # Guardar configuración actual + systime = reactor.monotonic() + toolhead_info = toolhead.get_status(systime) + old_max_accel = toolhead_info['max_accel'] + old_max_z_accel = toolhead_info.get('max_z_accel', old_max_accel) + old_max_velocity = toolhead_info['max_velocity'] + old_max_z_velocity = toolhead_info.get('max_z_velocity', old_max_velocity) + + # Obtener 'minimum_cruise_ratio' solo si existe, de lo contrario usar un valor predeterminado + old_minimum_cruise_ratio = toolhead_info.get('minimum_cruise_ratio', 0.5) + + # Calcular aceleración máxima necesaria + max_accel = max([abs(a) for _, a, _ in test_seq]) if test_seq else old_max_accel + + # Variables para restauración + tmc_z = None + old_thresh = None + + try: + self.is_testing = True + + # Configurar límites de velocidad más altos para la prueba + self.gcode.run_script_from_command( + "SET_VELOCITY_LIMIT ACCEL=%.3f ACCEL_TO_DECEL=%.3f SQUARE_CORNER_VELOCITY=5 VELOCITY=500" + % (max_accel, max_accel)) + + # Deshabilitar el modo stealthchop durante la prueba + tmc_z = self.printer.lookup_object('tmc5160 stepper_z', None) + if tmc_z is not None: + try: + old_thresh = tmc_z.get_register("TPWMTHRS") + tmc_z.set_register("TPWMTHRS", 0) + except: + gcmd.respond_info("No se pudo modificar TPWMTHRS") + + # Obtener posición actual + X, Y, Z, E = toolhead.get_position() + + # Ejecutar secuencia de prueba + last_v = last_t = last_freq = 0. + gcmd.respond_info("Iniciando prueba de resonancia de %.1f Hz a %.1f Hz" % + (self.generator.freq_start, self.generator.freq_end)) + + # Mostrar el número total de pasos para dar feedback de progreso + total_steps = len(test_seq) + current_step = 0 + + for next_t, accel, freq in test_seq: + current_step += 1 + if not self.is_testing: + break + + t_seg = next_t - last_t + toolhead.cmd_M204(self.gcode.create_gcode_command( + "M204", "M204", {"S": abs(accel)})) + + # Calcular nueva velocidad y posición + v = last_v + accel * t_seg + abs_v = abs(v) + if abs_v < 0.000001: + v = abs_v = 0. + abs_last_v = abs(last_v) + + # Calcular desplazamiento + v2 = v * v + last_v2 = last_v * last_v + half_inv_accel = .5 / accel if accel != 0 else 0 + d = (v2 - last_v2) * half_inv_accel + _, _, dZ = self.axis.get_point(d) + nZ = Z + dZ + + # Ejecutar movimiento + toolhead.limit_next_junction_speed(abs_last_v) + if v * last_v < 0: + # El movimiento primero se detiene y luego cambia de dirección + d_decel = -last_v2 * half_inv_accel + _, _, decel_Z = self.axis.get_point(d_decel) + toolhead.move([X, Y, Z + decel_Z, E], abs_last_v) + toolhead.move([X, Y, nZ, E], abs_v) + else: + toolhead.move([X, Y, nZ, E], max(abs_v, abs_last_v)) + + # Actualizar estado y mostrar progreso + if math.floor(freq) > math.floor(last_freq): + progress = int((current_step / total_steps) * 100) + gcmd.respond_info("Probando frecuencia %.1f Hz (Progreso: %d%%)" % (freq, progress)) + + Z = nZ + last_t = next_t + last_v = v + last_freq = freq + + # Desacelerar al final si es necesario + if last_v: + d_decel = -.5 * last_v2 / old_max_accel + _, _, decel_Z = self.axis.get_point(d_decel) + toolhead.cmd_M204(self.gcode.create_gcode_command( + "M204", "M204", {"S": old_max_accel})) + toolhead.move([X, Y, Z + decel_Z, E], abs(last_v)) + + # Mensaje de finalización + gcmd.respond_info("¡Prueba de resonancia completada con éxito!") + + finally: + self.is_testing = False + # Restaurar configuración original + self.gcode.run_script_from_command( + "SET_VELOCITY_LIMIT ACCEL=%.3f VELOCITY=%.3f SQUARE_CORNER_VELOCITY=5" + % (old_max_accel, old_max_velocity)) + + # Restaurar stealthchop si estaba activo + if tmc_z is not None and old_thresh is not None: + try: + tmc_z.set_register("TPWMTHRS", old_thresh) + except: + gcmd.respond_info("No se pudo restaurar TPWMTHRS") + + # Volver a home en Z solo si no se especificó NO_HOME=1 + if not no_home: + self.gcode.run_script_from_command("G28 Z") + gcmd.respond_info("Eje Z homeado después de la prueba") + else: + gcmd.respond_info("Se omitió el homing del eje Z según lo solicitado") + + # Mensaje final incluso si hubo algún error + if not gcmd.get_int("SILENT", 0): + gcmd.respond_info("Prueba de resonancia finalizada y configuración restaurada") + + cmd_RESONANCE_TEST_STOP_help = "Detiene la prueba de resonancia en curso" + def cmd_RESONANCE_TEST_STOP(self, gcmd): + if not self.is_testing: + gcmd.respond_info("No hay ninguna prueba de resonancia en curso") + return + self.is_testing = False + gcmd.respond_info("Prueba de resonancia detenida") + + def _setup_test_conditions(self, gcmd, max_accel): + # Obtener objetos necesarios + reactor = self.printer.get_reactor() + toolhead = self.printer.lookup_object('toolhead') + + # Guardar configuración actual + systime = reactor.monotonic() + toolhead_info = toolhead.get_status(systime) + old_max_accel = toolhead_info['max_accel'] + old_max_velocity = toolhead_info['max_velocity'] + + # Variables para restauración + tmc_z = None + old_thresh = None + + try: + # Configurar límites de velocidad más altos para la prueba + self.gcode.run_script_from_command( + "SET_VELOCITY_LIMIT ACCEL=%.3f ACCEL_TO_DECEL=%.3f SQUARE_CORNER_VELOCITY=5 VELOCITY=500" + % (max_accel, max_accel)) + + # Deshabilitar el modo stealthchop durante la prueba + tmc_z = self.printer.lookup_object('tmc5160 stepper_z', None) + if tmc_z is not None: + try: + old_thresh = tmc_z.get_register("TPWMTHRS") + tmc_z.set_register("TPWMTHRS", 0) + except: + gcmd.respond_info("No se pudo modificar TPWMTHRS") + + return toolhead, old_max_accel, old_max_velocity, tmc_z, old_thresh + + except Exception as e: + if tmc_z is not None and old_thresh is not None: + try: + tmc_z.set_register("TPWMTHRS", old_thresh) + except: + pass + raise + + def _restore_settings(self, gcmd, old_max_accel, old_max_velocity, tmc_z, old_thresh): + # Restaurar configuración original + self.gcode.run_script_from_command( + "SET_VELOCITY_LIMIT ACCEL=%.3f VELOCITY=%.3f SQUARE_CORNER_VELOCITY=5" + % (old_max_accel, old_max_velocity)) + + # Restaurar stealthchop si estaba activo + if tmc_z is not None and old_thresh is not None: + try: + tmc_z.set_register("TPWMTHRS", old_thresh) + except: + gcmd.respond_info("No se pudo restaurar TPWMTHRS") + + # Volver a home en Z solo si no se especificó NO_HOME=1 + no_home = gcmd.get_int("NO_HOME", 0) + skip_pre_home = gcmd.get_int("SKIP_HOME", 0) + + if not no_home and not skip_pre_home: + self.gcode.run_script_from_command("G28 Z") + gcmd.respond_info("Eje Z homeado después de la prueba") + else: + gcmd.respond_info("Se omitió el homing del eje Z según lo solicitado") + + cmd_RESONANCE_TEST_FIXED_help = "Ejecuta un movimiento oscilatorio con frecuencia fija" + def cmd_RESONANCE_TEST_FIXED(self, gcmd): + if self.is_testing: + raise gcmd.error("Ya hay una prueba de resonancia en curso") + + # Obtener parámetros para controlar el homing + skip_pre_home = gcmd.get_int("SKIP_HOME", 0) + + # Obtener y validar parámetros + freq = gcmd.get_float('FREQ', above=0.) + if freq > 100000: + raise gcmd.error("Frecuencia demasiado alta. Máximo recomendado: 100000 Hz") + + accel_per_hz = gcmd.get_float('ACCEL_PER_HZ', 60., above=0.) + duration = gcmd.get_float('DURATION', 5., above=0.) + + # Calcular aceleración basada en ACCEL_PER_HZ + accel = accel_per_hz * freq + + # Obtener toolhead + toolhead = self.printer.lookup_object('toolhead') + + # Verificar si Z está homeado, si es necesario + if not self._check_axis_homed(toolhead, 'z', skip_pre_home): + if not skip_pre_home: + gcmd.respond_info("Ejecutando G28 Z antes de la prueba...") + self.gcode.run_script_from_command("G28 Z") + else: + gcmd.respond_info("ADVERTENCIA: Eje Z no está homeado pero SKIP_HOME=1 fue especificado") + + try: + self.is_testing = True + + # Configurar condiciones de prueba + toolhead, old_max_accel, old_max_velocity, tmc_z, old_thresh = self._setup_test_conditions(gcmd, accel) + gcmd.respond_info("Configuración inicial completada") + + # Obtener el stepper Z + kin = toolhead.get_kinematics() + steppers = [s for s in kin.get_steppers() if s.get_name() == 'stepper_z'] + if not steppers: + raise gcmd.error("No se encontró el stepper Z") + stepper = steppers[0] + + # Obtener posición actual + X, Y, Z, E = toolhead.get_position() + gcmd.respond_info("Posición inicial: X=%.3f Y=%.3f Z=%.3f" % (X, Y, Z)) + + # Calcular parámetros del movimiento + period = 1. / freq # Periodo en segundos + t_seg = period / 4. # Tiempo por segmento (1/4 del periodo) + mcu = stepper.get_mcu() + print_time = mcu.estimated_print_time(toolhead.get_last_move_time()) + clock = mcu.print_time_to_clock(print_time) + + # Calcular tiempos en ciclos de reloj + cycle_ticks = mcu.seconds_to_clock(period) + + # Generar comandos de movimiento + gcmd.respond_info("Generando comandos de movimiento...") + + # Obtener el comando de paso + step_cmd = mcu.lookup_command( + "queue_step oid=%c interval=%u count=%hu add=%hi", + "queue_step oid=%c interval=%u count=%hu add=%hi") + + # Calcular parámetros de movimiento + interval = cycle_ticks // 4 # Dividir el ciclo en 4 partes + steps = 100 # Número de pasos por segmento + + # Enviar comandos de movimiento + for i in range(int(duration * freq)): + # Primer cuarto (hacia arriba) + step_cmd.send([stepper.get_oid(), interval, steps, 1]) + # Segundo cuarto (desaceleración) + step_cmd.send([stepper.get_oid(), interval, steps, -1]) + # Tercer cuarto (hacia abajo) + step_cmd.send([stepper.get_oid(), interval, steps, -1]) + # Cuarto cuarto (desaceleración) + step_cmd.send([stepper.get_oid(), interval, steps, 1]) + + if i % 10 == 0: + gcmd.respond_info("Ciclo %d enviado" % i) + + gcmd.respond_info("Comandos de movimiento enviados") + + finally: + self.is_testing = False + self._restore_settings(gcmd, old_max_accel, old_max_velocity, tmc_z, old_thresh) + +def load_config(config): + return ResonanceTest(config) diff --git a/klippy/extras/zmax_homing.py b/klippy/extras/zmax_homing.py new file mode 100644 index 000000000..5cad080e6 --- /dev/null +++ b/klippy/extras/zmax_homing.py @@ -0,0 +1,592 @@ +# Módulo para homing del eje Z hacia Zmax usando el endstop de Y +# +# Copyright (C) 2024 MicroLay +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +import logging + +class YEndstopWrapper: + """Wrapper para el endstop de Y que es compatible con probing_move() + + Usa el endstop de Y para la detección, pero devuelve los steppers de Z + para que probing_move mida correctamente el movimiento del eje Z. + """ + def __init__(self, y_endstop, z_steppers): + self.y_endstop = y_endstop + self.z_steppers = z_steppers + # Delegar métodos del endstop original + self.get_mcu = y_endstop.get_mcu + self.home_start = y_endstop.home_start + self.home_wait = y_endstop.home_wait + self.query_endstop = y_endstop.query_endstop + + def get_steppers(self): + # Devolver steppers de Z para que probing_move mida el eje correcto + return self.z_steppers + + def add_stepper(self, stepper): + # No añadir steppers al endstop Y original + pass + +class ZMaxHomingAlt: + def __init__(self, config): + self.printer = config.get_printer() + self.gcode = self.printer.lookup_object('gcode') + + # Registrar para eventos + self.printer.register_event_handler("klippy:connect", self._handle_connect) + + # Variables + self.toolhead = None + self.y_endstop = None + self.y_endstop_wrapper = None + self.z_stepper = None + self.phoming = None + + # Configuración + self.speed = config.getfloat('speed', 5.0, above=0.) + self.second_speed = config.getfloat('second_speed', 2.0, above=0.) + self.retract_dist = config.getfloat('retract_dist', 3.0, minval=0.) + self.retract_speed = config.getfloat('retract_speed', 3.0, above=0.) + self.final_retract = config.getboolean('final_retract', False) + + # Registrar comandos + self.gcode.register_command( + 'ZMAX_HOME', + self.cmd_ZMAX_HOME, + desc=self.cmd_ZMAX_HOME_help + ) + self.gcode.register_command( + 'Z_CALIBRATE', + self.cmd_Z_CALIBRATE, + desc=self.cmd_Z_CALIBRATE_help + ) + + logging.info(f"ZMaxHoming: Initialized with speeds - homing:{self.speed} mm/s, second:{self.second_speed} mm/s") + + def _handle_connect(self): + # Obtener referencias necesarias + self.toolhead = self.printer.lookup_object('toolhead') + self.phoming = self.printer.lookup_object('homing') + + # Obtener referencias de la cinemática + kin = self.toolhead.get_kinematics() + z_steppers = None + + if hasattr(kin, 'rails'): + # Primero obtener el stepper Z + for rail in kin.rails: + if rail.get_name() == "stepper_z": + self.z_stepper = rail + z_steppers = rail.get_steppers() + z_max_cfg = rail.get_range()[1] + logging.info(f"ZMaxHoming: Z-max position from config: {z_max_cfg}mm") + logging.info(f"ZMaxHoming: Z steppers: {[s.get_name() for s in z_steppers]}") + break + + # Obtener el endstop de Y + for rail in kin.rails: + if rail.get_name() == "stepper_y": + endstops = rail.get_endstops() + if endstops: + self.y_endstop = endstops[0][0] + logging.info(f"ZMaxHoming: Using Y endstop for Z-max homing") + break + + if self.y_endstop is None: + raise self.printer.config_error("No se encontró el endstop de Y") + if self.z_stepper is None: + raise self.printer.config_error("No se encontró el stepper Z") + + # Crear wrapper que usa endstop Y pero mide con steppers Z + self.y_endstop_wrapper = YEndstopWrapper(self.y_endstop, z_steppers) + + def _continuous_probe_move(self, target_pos, speed): + """ + Realiza un movimiento continuo usando la función oficial probing_move() de Klipper. + Esta es la misma función que usan todos los módulos de sondeo oficiales. + """ + try: + trigger_pos = self.phoming.probing_move(self.y_endstop_wrapper, target_pos, speed) + # Mensaje en el momento exacto de detección + gcode = self.printer.lookup_object('gcode') + gcode.respond_info(f"Y-endstop triggered at Z={trigger_pos[2]:.3f}mm") + return trigger_pos + except self.printer.command_error as e: + if "No trigger" in str(e): + # Create flag file for MicLoop obstacle recovery + import logging + try: + flag_file = '/tmp/homing_obstacle_flag' + with open(flag_file, 'w') as flag: + flag.write('TIMEOUT_OBSTACLE') + logging.info(f"ZMaxHoming: Obstacle flag created at {flag_file} (timeout detected)") + except Exception as flag_err: + logging.warning(f"ZMaxHoming: Could not create flag file: {flag_err}") + raise self.printer.command_error( + f"El endstop Y no se activó durante el movimiento hacia Z={target_pos[2]:.3f}. " + "Verifica que la plataforma pueda alcanzar el final de carrera.") + raise + + def _measure_platform_travel(self, gcmd, start_pos, z_max_cfg): + """ + Mide el recorrido total de la plataforma desde Z-max hasta Z-min + position_endstop + """ + try: + gcmd.respond_info("=== INICIANDO MEDICIÓN DE RECORRIDO ===") + + # Obtener configuración del stepper Z + z_min_cfg = self.z_stepper.get_range()[0] + + # Obtener position_endstop del stepper Z + z_endstop_pos = 0.0 # Valor por defecto + try: + # Buscar en la configuración del stepper Z + kin = self.toolhead.get_kinematics() + if hasattr(kin, 'rails'): + for rail in kin.rails: + if rail.get_name() == "stepper_z": + endstops = rail.get_endstops() + if endstops: + # Obtener position_endstop del endstop Z + z_endstop = endstops[0][0] + if hasattr(z_endstop, 'position_endstop'): + z_endstop_pos = z_endstop.position_endstop + break + except Exception as e: + gcmd.respond_info(f"Advertencia: No se pudo obtener position_endstop: {e}") + + gcmd.respond_info(f"Configuración Z: min={z_min_cfg:.3f}, max={z_max_cfg:.3f}, position_endstop={z_endstop_pos:.3f}") + + # Posición inicial (Z-max) + start_z = start_pos[2] + gcmd.respond_info(f"Posición inicial (Z-max): {start_z:.3f}mm") + + # Mover a Z-min + gcmd.respond_info("Moviendo a Z-min...") + min_pos = list(start_pos) + min_pos[2] = z_min_cfg + self.toolhead.move(min_pos, self.speed) + self.toolhead.wait_moves() + gcmd.respond_info(f"En Z-min: {z_min_cfg:.3f}mm") + + # Mover posición adicional según position_endstop + target_z = z_min_cfg - abs(z_endstop_pos) + gcmd.respond_info(f"Bajando {abs(z_endstop_pos):.3f}mm adicionales (position_endstop)...") + final_pos = list(min_pos) + final_pos[2] = target_z + self.toolhead.move(final_pos, self.speed) + self.toolhead.wait_moves() + + # Calcular recorrido total + total_travel = start_z - target_z + config_travel = z_max_cfg - (z_min_cfg - abs(z_endstop_pos)) + + gcmd.respond_info("=== RESULTADOS DE MEDICIÓN ===") + gcmd.respond_info(f"📏 Recorrido REAL medido: {total_travel:.3f}mm") + gcmd.respond_info(f"📐 Recorrido CONFIG teórico: {config_travel:.3f}mm") + gcmd.respond_info(f"📊 Diferencia: {abs(total_travel - config_travel):.3f}mm") + + if abs(total_travel - config_travel) > 0.5: + gcmd.respond_info("⚠️ ATENCIÓN: Diferencia > 0.5mm - Revisar montaje/configuración") + else: + gcmd.respond_info("✅ Medición dentro de tolerancia") + + except Exception as e: + gcmd.respond_info(f"❌ Error durante medición: {e}") + + cmd_ZMAX_HOME_help = "Realiza el homing del eje Z hacia Zmax usando el endstop de Y. Usa MEASURE=1 para medir el recorrido total de la plataforma." + def cmd_ZMAX_HOME(self, gcmd): + if not all([self.toolhead, self.y_endstop_wrapper, self.z_stepper, self.phoming]): + raise gcmd.error("ZMAX_HOME: Sistema no inicializado correctamente") + + # Obtener parámetros + speed = gcmd.get_float('SPEED', self.speed, above=0.) + second_speed = gcmd.get_float('SECOND_SPEED', self.second_speed, above=0.) + retract_dist = gcmd.get_float('RETRACT_DIST', self.retract_dist, minval=0.) + final_retract = gcmd.get_int('FINAL_RETRACT', -1) + if final_retract == -1: + final_retract = self.final_retract + else: + final_retract = bool(final_retract) + measure_travel = gcmd.get_int('MEASURE', 0, minval=0, maxval=1) + # Máximo recorrido de búsqueda (por defecto 150mm, suficiente para cualquier impresora) + max_travel = gcmd.get_float('MAX_TRAVEL', 150.0, above=0.) + + # Obtener configuración Z (solo para referencia, NO como límite) + z_max_cfg = self.z_stepper.get_range()[1] + + # Verificar estado de Z + curtime = self.printer.get_reactor().monotonic() + z_homed = 'z' in self.toolhead.get_status(curtime)['homed_axes'] + + # Detectar si ya estamos en Z-max desde el inicio + if not z_homed and self.y_endstop.query_endstop(0): + pos = self.toolhead.get_position() + pos[2] = z_max_cfg # Establecer directamente en Z-max + self.toolhead.set_position(pos, homing_axes=('z',)) + z_homed = True # Marcar como homeado + elif not z_homed: + # Homear Z temporalmente en posición actual + pos = self.toolhead.get_position() + self.toolhead.set_position(pos, homing_axes=('z',)) + + # Configurar modo absoluto + gcode_move = self.printer.lookup_object('gcode_move') + gcode_state = gcode_move.get_status()['absolute_coordinates'] + self.gcode.run_script_from_command("G90") + + # === EXPANDIR LÍMITES DE Z TEMPORALMENTE === + # Esto es CRÍTICO: si position_max está mal configurado, el primer probe + # no alcanzará el endstop físico. Expandimos los límites para permitir + # el recorrido completo. + kin = self.toolhead.get_kinematics() + original_z_limits = kin.limits[2] # (min, max) para eje Z + current_pos = self.toolhead.get_position() + temp_z_max = current_pos[2] + max_travel + kin.update_limits(2, (original_z_limits[0], temp_z_max)) + logging.info(f"ZMAX_HOME: Límites Z expandidos temporalmente: {original_z_limits} -> ({original_z_limits[0]}, {temp_z_max})") + + try: + # Variables para detección de pérdida de pasos + initial_z_pos = None + detected_z_pos = None + + # Obtener posición inicial + current_pos = self.toolhead.get_position() + initial_z_pos = current_pos[2] + + # Detectar si el endstop ya está activado al inicio + endstop_initially_triggered = self.y_endstop.query_endstop(0) + + # Solo considerar que está "ya activado" si estamos cerca de Z-max + if endstop_initially_triggered and current_pos[2] > (z_max_cfg - 10.0): + # Ya estamos en contacto con el endstop cerca de Z-max + detected_z_pos = current_pos[2] + + else: + # Endstop no activado - hacer búsqueda rápida primero + + # --- Búsqueda rápida continua --- + # IMPORTANTE: Usamos temp_z_max (límite expandido), NO z_max_cfg + movepos = list(current_pos) + movepos[2] = temp_z_max + + try: + first_trigger_pos = self._continuous_probe_move(movepos, speed) + detected_z_pos = first_trigger_pos[2] # Guardar posición REAL donde se detectó + except self.printer.command_error as e: + if "Probe triggered prior to movement" in str(e): + # El endstop se activó inmediatamente al iniciar el movimiento + detected_z_pos = current_pos[2] + else: + raise + + # La posición donde se detectó el endstop es el nuevo Z-max real + actual_z_max = detected_z_pos + logging.info(f"ZMAX_HOME: Endstop detectado en Z={actual_z_max:.3f}mm (z_max_cfg={z_max_cfg:.3f}mm)") + + # --- "Engañar" a Klipper estableciendo Z en la posición detectada --- + fake_pos = self.toolhead.get_position() + fake_pos[2] = actual_z_max + self.toolhead.set_position(fake_pos) + + # --- Retracción desde posición actual --- + retract_pos = list(fake_pos) + retract_pos[2] = actual_z_max - retract_dist + self.toolhead.move(retract_pos, self.retract_speed) + self.toolhead.wait_moves() + + # Verificar que el endstop se liberó + self.toolhead.wait_moves() + query_time = self.toolhead.get_last_move_time() + if self.y_endstop.query_endstop(query_time): + raise gcmd.error(f"Endstop Y sigue activado después de retracción de {retract_dist}mm. " + f"Aumenta retract_dist o verifica el endstop.") + + # --- Búsqueda fina continua --- + movepos = list(retract_pos) + movepos[2] = actual_z_max + 5.0 # Un poco más allá por si hay variación + + try: + final_trigger_pos = self._continuous_probe_move(movepos, second_speed) + except self.printer.command_error as e: + if "Probe triggered prior to movement" in str(e): + # El endstop se activó inmediatamente, usar posición actual + current_pos = self.toolhead.get_position() + gcmd.respond_info(f"Y-endstop triggered at Z={current_pos[2]:.3f}mm") + final_trigger_pos = current_pos + else: + raise + + # La posición final detectada es el Z-max definitivo + final_z_max = final_trigger_pos[2] + + # --- Establecer posición final --- + final_pos = list(final_trigger_pos) + # Usar z_max_cfg si está cerca, sino usar el valor detectado + if abs(final_z_max - z_max_cfg) < 5.0: + final_pos[2] = z_max_cfg + else: + final_pos[2] = final_z_max + logging.warning(f"ZMAX_HOME: Z-max detectado ({final_z_max:.3f}mm) difiere de config ({z_max_cfg:.3f}mm)") + + if not z_homed: + self.toolhead.set_position(final_pos, homing_axes=('z',)) + else: + self.toolhead.set_position(final_pos) + + gcmd.respond_info(f"Z position set to {final_pos[2]:.3f}mm") + + # --- Retracción final opcional --- + if final_retract and retract_dist > 0: + retract_pos = list(final_pos) + retract_pos[2] -= retract_dist + z_min_cfg = self.z_stepper.get_range()[0] + if retract_pos[2] < z_min_cfg: + retract_pos[2] = z_min_cfg + self.toolhead.move(retract_pos, self.retract_speed) + self.toolhead.wait_moves() + + # --- Medición del recorrido total (opcional) --- + if measure_travel: + self._measure_platform_travel(gcmd, final_pos, z_max_cfg) + + finally: + # === RESTAURAR LÍMITES ORIGINALES DE Z === + kin.update_limits(2, original_z_limits) + logging.info(f"ZMAX_HOME: Límites Z restaurados: {original_z_limits}") + + # Restaurar estado Gcode + if not gcode_state: + self.gcode.run_script_from_command("G91") + + cmd_Z_CALIBRATE_help = "Calibra el eje Z: hace G28 Z, sube hasta Zmax (endstop Y) y reporta la distancia real recorrida." + def cmd_Z_CALIBRATE(self, gcmd): + """ + Comando de calibración del eje Z: + 1. Ejecuta G28 Z (homing normal hacia abajo con endstop Zmin) + 2. Sube buscando el endstop Y (Zmax) sin perder la posición + 3. Reporta la distancia real recorrida + """ + if not all([self.toolhead, self.y_endstop_wrapper, self.z_stepper, self.phoming]): + raise gcmd.error("Z_CALIBRATE: Sistema no inicializado correctamente") + + # Parámetros opcionales + speed = gcmd.get_float('SPEED', self.speed, above=0.) + # Límite de seguridad: cuánto subir como máximo buscando el endstop + max_travel = gcmd.get_float('MAX_TRAVEL', 150.0, above=0.) + + gcmd.respond_info("=== Z_CALIBRATE: Iniciando calibración del eje Z ===") + + # Guardar estado de coordenadas + gcode_move = self.printer.lookup_object('gcode_move') + gcode_state = gcode_move.get_status()['absolute_coordinates'] + self.gcode.run_script_from_command("G90") + + try: + # --- PASO 1: Hacer homing normal hacia abajo (G28 Z) --- + gcmd.respond_info("Paso 1: Ejecutando G28 Z (homing hacia Zmin)...") + self.gcode.run_script_from_command("G28 Z") + self.toolhead.wait_moves() + + # Obtener posición después del homing (debería ser position_endstop) + pos_after_homing = self.toolhead.get_position() + z_start = pos_after_homing[2] + gcmd.respond_info(f" Posición después de G28 Z: {z_start:.3f}mm") + + # --- PASO 2: Subir buscando el endstop Y (Zmax) --- + gcmd.respond_info(f"Paso 2: Subiendo hasta encontrar endstop Zmax (max {max_travel:.1f}mm)...") + + # Verificar si el endstop Y ya está activado (no debería estarlo abajo) + self.toolhead.wait_moves() + query_time = self.toolhead.get_last_move_time() + if self.y_endstop.query_endstop(query_time): + gcmd.respond_info(" ADVERTENCIA: Endstop Y ya activado en posición inicial!") + gcmd.respond_info(" Esto indica un problema de cableado o configuración.") + return + + # Expandir temporalmente los límites de Z para permitir el movimiento + # Guardamos los límites originales para restaurarlos después + kin = self.toolhead.get_kinematics() + original_z_limits = kin.limits[2] # (min, max) para eje Z + + # Establecer nuevos límites temporales: desde posición actual hasta max_travel + current_pos = self.toolhead.get_position() + temp_z_max = current_pos[2] + max_travel + kin.update_limits(2, (original_z_limits[0], temp_z_max)) + gcmd.respond_info(f" Límites Z temporales: {original_z_limits[0]:.1f} - {temp_z_max:.1f}mm") + + # Calcular posición objetivo + target_pos = list(current_pos) + target_pos[2] = temp_z_max + + # Usar probing_move para subir hasta que se active el endstop Y + probing_error = None + try: + trigger_pos = self.phoming.probing_move( + self.y_endstop_wrapper, + target_pos, + speed + ) + # La distancia recorrida es la diferencia entre donde activó y donde empezó + z_end = trigger_pos[2] + travel_distance = z_end - z_start + gcmd.respond_info(f" Endstop Y activado en Z={z_end:.3f}mm (recorrido: {travel_distance:.3f}mm)") + + except self.printer.command_error as e: + if "No trigger" in str(e): + probing_error = gcmd.error( + f"El endstop Y (Zmax) no se activó después de {max_travel:.1f}mm. " + "Aumenta MAX_TRAVEL o verifica el endstop." + ) + elif "Probe triggered prior to movement" in str(e): + # El endstop se activó inmediatamente + travel_distance = 0.0 + z_end = z_start + gcmd.respond_info(f" Endstop Y activado inmediatamente en Z={z_end:.3f}mm") + else: + probing_error = e + + # Restaurar límites originales de Z (siempre) + kin.update_limits(2, original_z_limits) + + # Si hubo error, lanzarlo ahora + if probing_error: + raise probing_error + + # --- PASO 3: Calcular y reportar resultados --- + + gcmd.respond_info("") + gcmd.respond_info("=== RESULTADOS DE CALIBRACIÓN ===") + gcmd.respond_info(f" Posición inicial (Zmin): {z_start:.3f}mm") + gcmd.respond_info(f" Posición final (Zmax): {z_end:.3f}mm") + gcmd.respond_info(f" ----------------------------------------") + gcmd.respond_info(f" DISTANCIA RECORRIDA: {travel_distance:.3f}mm") + gcmd.respond_info("") + + # Obtener configuración actual para comparar + z_min_cfg, z_max_cfg = self.z_stepper.get_range() + config_range = z_max_cfg - z_min_cfg + + gcmd.respond_info(f" Configuración actual:") + gcmd.respond_info(f" position_min: {z_min_cfg:.3f}mm") + gcmd.respond_info(f" position_max: {z_max_cfg:.3f}mm") + gcmd.respond_info(f" Rango config: {config_range:.3f}mm") + gcmd.respond_info("") + + # Sugerencia de nuevo position_max + # El nuevo position_max debería ser: position_endstop + distancia_recorrida + # Pero position_endstop ya está incluido en z_start después del G28 + suggested_max = z_start + travel_distance + + diff = suggested_max - z_max_cfg + if abs(diff) > 0.5: + gcmd.respond_info(f" Diferencia con config actual: {diff:+.3f}mm") + gcmd.respond_info("") + gcmd.respond_info("=== ACTUALIZANDO CONFIGURACIÓN ===") + + # --- Actualizar NanoDLP CustomValues.ZLength --- + nanodlp_updated = self._update_nanodlp_zlength(gcmd, suggested_max) + + # --- Actualizar Klipper printer.cfg position_max --- + klipper_updated = self._update_klipper_position_max(gcmd, suggested_max) + + if klipper_updated: + gcmd.respond_info("") + gcmd.respond_info("=== REINICIANDO KLIPPER ===") + gcmd.respond_info(" La nueva configuración se cargará automáticamente...") + # Programar reinicio después de responder + reactor = self.printer.get_reactor() + reactor.register_callback(lambda e: self._restart_klipper()) + elif not nanodlp_updated and not klipper_updated: + gcmd.respond_info("") + gcmd.respond_info(" ⚠ No se pudo actualizar ninguna configuración") + else: + gcmd.respond_info(f" La configuración actual es correcta (diff: {diff:+.3f}mm)") + gcmd.respond_info(" No se requieren cambios.") + + finally: + # Restaurar estado Gcode + if not gcode_state: + self.gcode.run_script_from_command("G91") + + def _update_nanodlp_zlength(self, gcmd, new_zlength): + """Actualiza CustomValues.ZLength en NanoDLP modificando machine.json directamente""" + import json + + machine_json_path = "/home/pi/nanodlp/db/machine.json" + gcmd.respond_info(f" Actualizando NanoDLP ZLength a {new_zlength:.3f}mm...") + + try: + # Leer archivo JSON actual + with open(machine_json_path, 'r') as f: + machine_config = json.load(f) + + # Asegurar que CustomValues existe + if 'CustomValues' not in machine_config: + machine_config['CustomValues'] = {} + + # Actualizar ZLength (como string, igual que los demás valores) + old_value = machine_config['CustomValues'].get('ZLength', 'N/A') + machine_config['CustomValues']['ZLength'] = f"{new_zlength:.3f}" + + # Escribir archivo actualizado + with open(machine_json_path, 'w') as f: + json.dump(machine_config, f, indent='\t') + + gcmd.respond_info(f" ✓ NanoDLP ZLength actualizado: {old_value} → {new_zlength:.3f}mm") + return True + + except Exception as e: + gcmd.respond_info(f" ✗ Error actualizando NanoDLP: {e}") + return False + + def _update_klipper_position_max(self, gcmd, new_position_max): + """Actualiza position_max en printer.cfg""" + import re + + config_path = "/home/pi/printer_data/config/printer.cfg" + gcmd.respond_info(f" Actualizando Klipper position_max a {new_position_max:.3f}mm...") + + try: + # Leer archivo actual + with open(config_path, 'r') as f: + content = f.read() + + # Buscar y reemplazar position_max en la sección [stepper_z] + # Patrón: position_max: + pattern = r'(\[stepper_z\].*?position_max\s*:\s*)(\d+\.?\d*)' + + def replacer(match): + return f"{match.group(1)}{new_position_max:.3f}" + + new_content, count = re.subn(pattern, replacer, content, count=1, flags=re.DOTALL) + + if count == 0: + gcmd.respond_info(f" ✗ No se encontró position_max en [stepper_z]") + return False + + # Escribir archivo actualizado + with open(config_path, 'w') as f: + f.write(new_content) + + gcmd.respond_info(f" ✓ Klipper position_max actualizado correctamente") + return True + + except Exception as e: + gcmd.respond_info(f" ✗ Error actualizando printer.cfg: {e}") + return False + + def _restart_klipper(self): + """Reinicia Klipper para cargar la nueva configuración""" + import subprocess + try: + subprocess.run(['sudo', 'systemctl', 'restart', 'klipper.service'], + timeout=5, capture_output=True) + except Exception as e: + logging.error(f"Error reiniciando Klipper: {e}") + +def load_config(config): + return ZMaxHomingAlt(config) \ No newline at end of file