diff --git a/50_inst_per_sec.py b/50_inst_per_sec.py new file mode 100644 index 0000000000..58d446fdb4 --- /dev/null +++ b/50_inst_per_sec.py @@ -0,0 +1,399 @@ +#!/usr/bin/env python +import copy +import math +import os +import sys +import random +from typing import Dict, List, Optional, Tuple + + +# ==================================== +# Constants and Default Values +# ==================================== +DEFAULT_BUFFER_FILLING_RATE_IN_C_PER_MS = 50.0 / 1000.0 # The buffer filling rate in #commands/ms +DEFAULT_BUFFER_SIZE = 15 # The buffer size in #commands + + +def get_code_and_num(gcode_line: str) -> Tuple[str, str]: + """ + Gets the code and number from the given GCode line. + """ + gcode_line = gcode_line.strip() + cmd_code = gcode_line[0].upper() + cmd_num = str(gcode_line[1:]) + return cmd_code, cmd_num + + +def get_value_dict(parts: List[str]) -> Dict[str, str]: + """ + Fetches arguments such as X1 Y2 Z3 from the given part list and returns a dict. + """ + value_dict = {} + for p in parts: + p = p.strip() + if not p: + continue + code, num = get_code_and_num(p) + value_dict[code] = num + return value_dict + + +# ============================ +# Math Functions - Begin +# ============================ + +def calc_distance(pos1, pos2): + delta = {k: pos1[k] - pos2[k] for k in pos1} + distance = 0 + for value in delta.values(): + distance += value**2 + distance = math.sqrt(distance) + return distance + + +def calc_acceleration_distance(init_speed: float, target_speed: float, acceleration: float) -> float: + """ + Given the initial speed, the target speed, and the acceleration, calculate the distance that's needed for the + acceleration to finish. + """ + if acceleration == 0: + return 0.0 + return (target_speed**2 - init_speed**2) / (2 * acceleration) + + +def calc_travel_time(p0, p1, init_speed: float, target_speed: float, acceleration: float) -> float: + pass + + +class State: + + def __init__(self, previous_state: Optional["State"]) -> None: + self.X = 0.0 + self.Y = 0.0 + self.Z = 0.0 + self.E = 0.0 + self.F = 0.0 + self.speed = {"X": 0.0, + "Y": 0.0, + "Z": 0.0, + } + self.accelerations = {"XY": 0.0, + "Z": 0.0, + "S": 0.0, # printing + "T": 0.0, # travel + } + self.jerks = {"X": 0.0, + "Y": 0.0, + "Z": 0.0, + } + self.in_relative_positioning_mode = False # type: bool + self.in_relative_extrusion_mode = False # type: bool + + if previous_state is not None: + self.X = previous_state.X + self.Y = previous_state.Y + self.Z = previous_state.Z + self.E = previous_state.E + self.F = previous_state.F + self.speed = copy.deepcopy(previous_state.speed) + self.accelerations = copy.deepcopy(previous_state.accelerations) + self.jerks = copy.deepcopy(previous_state.jerks) + self.in_relative_positioning_mode = previous_state.in_relative_positioning_mode + self.in_relative_extrusion_mode = previous_state.in_relative_extrusion_mode + + +class Command: + + def __init__(self, cmd_str: str, previous_state: "State") -> None: + self._cmd_str = cmd_str # type: str + self._previous_state = previous_state # type: State + self._after_state = State(previous_state) # type: State + + self._distance_in_mm = 0.0 # type float + self._estimated_exec_time_in_ms = 0.0 # type: float + + self._cmd_process_function_map = { + "G": self._handle_g, + "M": self._handle_m, + "T": self._handle_t, + } + + self._is_comment = False # type: bool + self._is_empty = False # type: bool + + def get_after_state(self) -> State: + return self._after_state + + @property + def is_command(self) -> bool: + return not self._is_comment and not self._is_empty + + @property + def estimated_exec_time_in_ms(self) -> float: + return self._estimated_exec_time_in_ms + + def __str__(self) -> str: + if self._is_comment or self._is_empty: + return self._cmd_str + + distance_in_mm = round(self._distance_in_mm, 5) + + info = "d=%s f=%s t=%s" % (distance_in_mm, self._after_state.F, self._estimated_exec_time_in_ms) + + return self._cmd_str.strip() + " ; --- " + info + os.linesep + + def process(self) -> None: + """ + Estimates the execution time of this command and calculates the state after this command is executed. + """ + line = self._cmd_str.strip() + if not line: + self._is_empty = True + return + if line.startswith(";"): + self._is_comment = True + return + + # Remove comment + line = line.split(";", 1)[0].strip() + + parts = line.split(" ") + cmd_code, cmd_num = get_code_and_num(parts[0]) + cmd_num = int(cmd_num) + + func = self._cmd_process_function_map.get(cmd_code) + if func is None: + print("!!! no handle function for command type [%s]" % cmd_code) + return + func(cmd_num, parts) + + def _handle_g(self, cmd_num: int, parts: List[str]) -> None: + estimated_exec_time_in_ms = 0.0 + + # G0 and G1: Move + if cmd_num in (0, 1): + # Move + distance = 0.0 + if len(parts) > 0: + value_dict = get_value_dict(parts[1:]) + for key, value in value_dict.items(): + setattr(self._after_state, key, float(value)) + + current_position = {"X": self._previous_state.X, + "Y": self._previous_state.Y, + "Z": self._previous_state.Z, + } + new_position = copy.deepcopy(current_position) + for key in new_position: + new_value = float(value_dict.get(key, new_position[key])) + new_position[key] = new_value + + distance = calc_distance(current_position, new_position) + self._distance_in_mm = distance + travel_time_in_ms = distance / (self._after_state.F / 60.0) * 1000.0 + + estimated_exec_time_in_ms = travel_time_in_ms + + # TODO: take acceleration into account + + # G4: Dwell, pause the machine for a period of time. TODO + if cmd_num == 4: + # Pnnn is time to wait in milliseconds (P0 wait until all previous moves are finished) + cmd, num = get_code_and_num(parts[1]) + num = float(num) + if cmd == "P": + if num > 0: + estimated_exec_time_in_ms = num + + # G10: Retract. Assume 0.3 seconds for short retractions and 0.5 seconds for long retractions. + if cmd_num == 10: + # S0 is short retract (default), S1 is long retract + is_short_retract = True + if len(parts) > 1: + cmd, num = get_code_and_num(parts[1]) + if cmd == "S" and num == 1: + is_short_retract = False + estimated_exec_time_in_ms = (0.3 if is_short_retract else 0.5) * 1000 + + # G11: Unretract. Assume 0.5 seconds. + if cmd_num == 11: + estimated_exec_time_in_ms = 0.5 * 1000 + + # G90: Set to absolute positioning. Assume 0 seconds. + if cmd_num == 90: + self._after_state.in_relative_positioning_mode = False + estimated_exec_time_in_ms = 0.0 + + # G91: Set to relative positioning. Assume 0 seconds. + if cmd_num == 91: + self._after_state.in_relative_positioning_mode = True + estimated_exec_time_in_ms = 0.0 + + # G92: Set position. Assume 0 seconds. + if cmd_num == 92: + # TODO: check + value_dict = get_value_dict(parts[1:]) + for key, value in value_dict.items(): + setattr(self._previous_state, key, value) + + # G280: Prime. Assume 10 seconds for using blob and 5 seconds for no blob. + if cmd_num == 280: + use_blob = True + if len(parts) > 1: + cmd, num = get_code_and_num(parts[1]) + if cmd == "S" and num == 1: + use_blob = False + estimated_exec_time_in_ms = (10.0 if use_blob else 5.0) * 1000 + + # Update estimated execution time + self._estimated_exec_time_in_ms = round(estimated_exec_time_in_ms, 5) + + def _handle_m(self, cmd_num: int, parts: List[str]) -> None: + estimated_exec_time_in_ms = 0.0 + + # M82: Set extruder to absolute mode. Assume 0 execution time. + if cmd_num == 82: + self._after_state.in_relative_extrusion_mode = False + estimated_exec_time_in_ms = 0.0 + + # M83: Set extruder to relative mode. Assume 0 execution time. + if cmd_num == 83: + self._after_state.in_relative_extrusion_mode = True + estimated_exec_time_in_ms = 0.0 + + # M104: Set extruder temperature (no wait). Assume 0 execution time. + if cmd_num == 104: + estimated_exec_time_in_ms = 0.0 + + # M106: Set fan speed. Assume 0 execution time. + if cmd_num == 106: + estimated_exec_time_in_ms = 0.0 + + # M107: Turn fan off. Assume 0 execution time. + if cmd_num == 107: + estimated_exec_time_in_ms = 0.0 + + # M109: Set extruder temperature (wait). Uniformly random time between 30 - 90 seconds. + if cmd_num == 109: + estimated_exec_time_in_ms = random.uniform(30, 90) * 1000 # TODO: Check + + # M140: Set bed temperature (no wait). Assume 0 execution time. + if cmd_num == 140: + estimated_exec_time_in_ms = 0.0 + + # M204: Set default acceleration. Assume 0 execution time. + if cmd_num == 204: + value_dict = get_value_dict(parts[1:]) + for key, value in value_dict.items(): + self._after_state.accelerations[key] = float(value) + estimated_exec_time_in_ms = 0.0 + + # M205: Advanced settings, we only set jerks for Griffin. Assume 0 execution time. + if cmd_num == 205: + value_dict = get_value_dict(parts[1:]) + for key, value in value_dict.items(): + self._after_state.jerks[key] = float(value) + estimated_exec_time_in_ms = 0.0 + + self._estimated_exec_time_in_ms = estimated_exec_time_in_ms + + def _handle_t(self, cmd_num: int, parts: List[str]) -> None: + # Tn: Switching extruder. Assume 2 seconds. + estimated_exec_time_in_ms = 2.0 + + self._estimated_exec_time_in_ms = estimated_exec_time_in_ms + + +class CommandBuffer: + def __init__(self, all_lines: List[str], + buffer_filling_rate: float = DEFAULT_BUFFER_FILLING_RATE_IN_C_PER_MS, + buffer_size: int = DEFAULT_BUFFER_SIZE + ) -> None: + self._all_lines = all_lines + self._all_commands = list() + + self._buffer_filling_rate = buffer_filling_rate # type: float + self._buffer_size = buffer_size # type: int + + # If the buffer can depletes less than this amount time, it can be filled up in time. + lower_bound_buffer_depletion_time = self._buffer_size / self._buffer_filling_rate # type: float + + self._detection_time_frame = lower_bound_buffer_depletion_time + self._code_count_limit = self._buffer_size + print("Time Frame: %s" % self._detection_time_frame) + print("Code Limit: %s" % self._code_count_limit) + + self._bad_frame_ranges = [] + + def process(self) -> None: + previous_state = None + cmd0_idx = 0 + total_frame_time_in_ms = 0.0 + cmd_count = 0 + for idx, line in enumerate(self._all_lines): + cmd = Command(line, previous_state) + cmd.process() + self._all_commands.append(cmd) + previous_state = cmd.get_after_state() + + if not cmd.is_command: + continue + + cmd_count += 1 + if idx > cmd0_idx or idx == 0: + total_frame_time_in_ms += cmd.estimated_exec_time_in_ms + + if total_frame_time_in_ms > 1000.0: + # Find the next starting command which makes the total execution time of the frame to be less than + # 1 second. + cmd0_idx += 1 + total_frame_time_in_ms -= self._all_commands[cmd0_idx].estimated_exec_time_in_ms + cmd_count -= 1 + while total_frame_time_in_ms > 1000.0: + cmd0_idx += 1 + total_frame_time_in_ms -= self._all_commands[cmd0_idx].estimated_exec_time_in_ms + cmd_count -= 1 + + # If within the current time frame the code count exceeds the limit, record that. + if total_frame_time_in_ms <= self._detection_time_frame and cmd_count > self._code_count_limit: + need_to_append = True + if self._bad_frame_ranges: + last_item = self._bad_frame_ranges[-1] + if last_item["start_line"] == cmd0_idx: + last_item["end_line"] = idx + last_item["cmd_count"] = cmd_count + last_item["time_in_ms"] = total_frame_time_in_ms + need_to_append = False + if need_to_append: + self._bad_frame_ranges.append({"start_line": cmd0_idx, + "end_line": idx, + "cmd_count": cmd_count, + "time_in_ms": total_frame_time_in_ms}) + + def to_file(self, file_name: str) -> None: + all_lines = [str(c) for c in self._all_commands] + with open(file_name, "w", encoding = "utf-8") as f: + f.writelines(all_lines) + + def report(self) -> None: + for item in self._bad_frame_ranges: + print("!!!!! potential bad frame from line %s to %s, code count = %s, in %s ms" % ( + item["start_line"], item["end_line"], item["cmd_count"], round(item["time_in_ms"], 4))) + + +if __name__ == "__main__": + #if len(sys.argv) != 3: + # print("Usage: ") + # sys.exit(1) + #in_filename = sys.argv[1] + #out_filename = sys.argv[2] + + in_filename = "/home/lfei/UM3_elephant_pendant_2.gcode" + out_filename = "/home/lfei/UM3_elephant_pendant_2-annotated.gcode" + with open(in_filename, "r", encoding = "utf-8") as f: + all_lines = f.readlines() + + buf = CommandBuffer(all_lines) + buf.process() + buf.to_file(out_filename) + buf.report()