# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import copy
import math
import os
import sys
import random
from typing import Dict, List, Optional, Tuple
# ====================================
# Constants and Default Values
# ====================================
# Defaults for CommandBuffer. The filling rate works out to 50 commands per
# second, expressed per millisecond because all time estimates in this file
# are in milliseconds.
DEFAULT_BUFFER_FILLING_RATE_IN_C_PER_MS = 50.0 / 1000.0 # The buffer filling rate in #commands/ms
DEFAULT_BUFFER_SIZE = 15 # The buffer size in #commands
#Values for Ultimaker S5.
# NOTE(review): the units of the limits below are not stated in this file;
# presumably mm/s for feedrates and jerks and mm/s^2 for accelerations --
# confirm against the machine definition.
# Per-axis maximum feedrates.
MACHINE_MAX_FEEDRATE_X = 300
MACHINE_MAX_FEEDRATE_Y = 300
MACHINE_MAX_FEEDRATE_Z = 40
MACHINE_MAX_FEEDRATE_E = 45
# Per-axis maximum accelerations; a move's acceleration is capped by these.
MACHINE_MAX_ACCELERATION_X = 9000
MACHINE_MAX_ACCELERATION_Y = 9000
MACHINE_MAX_ACCELERATION_Z = 100
MACHINE_MAX_ACCELERATION_E = 10000
# Maximum jerks; used to derive the junction speed of a move.
MACHINE_MAX_JERK_XY = 20
MACHINE_MAX_JERK_Z = 0.4
MACHINE_MAX_JERK_E = 5
# Lower bound that a move's feedrate is clamped to.
MACHINE_MINIMUM_FEEDRATE = 0
# Acceleration a move starts out with before the per-axis caps are applied.
MACHINE_ACCELERATION = 3000
##  Gets the code and number from the given g-code word.
#
#   A word such as "G1" or "x12.5" is split into its upper-cased first
#   character and the remaining text. The remaining text is returned as-is; it
#   is not converted to a number.
#
#   \param gcode_line The g-code word to split. Surrounding whitespace is
#   ignored.
#   \return A tuple of the upper-cased first character and the rest of the
#   word. Both are empty strings if the word is empty or only whitespace.
def get_code_and_num(gcode_line: str) -> Tuple[str, str]:
    word = gcode_line.strip()
    if not word:
        # Nothing to split; indexing word[0] would raise an IndexError.
        return "", ""
    return word[0].upper(), word[1:]
##  Collects g-code arguments such as X1 Y2 Z3 into a dictionary.
#
#   \param parts The argument words of a g-code line. Words that are empty or
#   only whitespace are skipped.
#   \return A dictionary that maps the upper-cased letter of each word to the
#   text that follows it. If a letter occurs more than once, the last
#   occurrence wins.
def get_value_dict(parts: List[str]) -> Dict[str, str]:
    words = (part.strip() for part in parts)
    return dict(get_code_and_num(word) for word in words if word)
# ============================
# Math Functions - Begin
# ============================
##  Calculates the Euclidean distance between two positions.
#
#   \param pos1 A mapping from axis name to coordinate. Its keys determine
#   which axes take part in the calculation.
#   \param pos2 A mapping that has a coordinate for every key of pos1. Extra
#   keys are ignored.
#   \return The square root of the sum of the squared per-axis differences.
def calc_distance(pos1, pos2):
    squared_sum = sum((pos1[axis] - pos2[axis]) ** 2 for axis in pos1)
    return math.sqrt(squared_sum)
##  Calculates the distance that is needed to change speed.
#
#   \param init_speed The speed at the start of the acceleration.
#   \param target_speed The speed at the end of the acceleration.
#   \param acceleration The (signed) rate at which the speed changes.
#   \return The distance covered while going from init_speed to target_speed,
#   or 0.0 if the acceleration is zero.
def calc_acceleration_distance(init_speed: float, target_speed: float, acceleration: float) -> float:
    if acceleration != 0:
        squared_speed_change = target_speed ** 2 - init_speed ** 2
        return squared_speed_change / (2 * acceleration)
    return 0.0
##  Placeholder for calculating the travel time between two points.
#
#   This function is not implemented: it ignores all of its arguments and
#   returns None, despite the float return annotation.
def calc_travel_time(p0, p1, init_speed: float, target_speed: float, acceleration: float) -> float:
    return None
##  Calculates the point at which braking has to start.
#
#   For a move that starts at `initial_feedrate`, accelerates, then
#   decelerates at a rate of `-acceleration` and has to end at
#   `final_feedrate` after travelling `distance` in total, this gives the
#   distance from the start of the move at which the acceleration stops and
#   the deceleration begins. It is the intersection of the acceleration and
#   deceleration ramps for a trapezoid without a plateau, i.e. a move that
#   never reaches its maximum speed.
#
#   \param initial_feedrate The speed at the start of the move.
#   \param final_feedrate The speed required at the end of the move.
#   \param acceleration The rate of acceleration and of deceleration.
#   \param distance The total length of the move.
#   \return The distance from the start at which to start braking, or 0 if
#   the acceleration is zero.
def calc_intersection_distance(initial_feedrate: float, final_feedrate: float, acceleration: float, distance: float) -> float:
    if acceleration != 0:
        numerator = 2 * acceleration * distance
        numerator -= initial_feedrate * initial_feedrate
        numerator += final_feedrate * final_feedrate
        return numerator / (4 * acceleration)
    return 0
##  The machine state at one point in the g-code.
#
#   It holds the X, Y, Z and E positions, the feedrate F, per-axis speeds,
#   accelerations and jerks, and the two relative/absolute mode flags.
class State:
    ##  Creates a state, either with default values or as a copy.
    #
    #   \param previous_state The state to copy all values from. The
    #   dictionaries are deep-copied so the two states do not share them. If
    #   None, everything starts at zero and both modes are absolute.
    def __init__(self, previous_state: Optional["State"]) -> None:
        if previous_state is None:
            axes = ("X", "Y", "Z", "E")
            self.X = 0.0
            self.Y = 0.0
            self.Z = 0.0
            self.E = 0.0
            self.F = 0.0

            self.speed = {axis: 0.0 for axis in axes}
            # Besides the axes there is "S" for printing and "T" for travel.
            self.accelerations = {key: 0.0 for key in axes + ("S", "T")}
            self.jerks = {axis: 0.0 for axis in axes}

            self.in_relative_positioning_mode = False  # type: bool
            self.in_relative_extrusion_mode = False  # type: bool
        else:
            self.X = previous_state.X
            self.Y = previous_state.Y
            self.Z = previous_state.Z
            self.E = previous_state.E
            self.F = previous_state.F

            self.speed = copy.deepcopy(previous_state.speed)
            self.accelerations = copy.deepcopy(previous_state.accelerations)
            self.jerks = copy.deepcopy(previous_state.jerks)

            self.in_relative_positioning_mode = previous_state.in_relative_positioning_mode  # type: bool
            self.in_relative_extrusion_mode = previous_state.in_relative_extrusion_mode  # type: bool
##  A single line of g-code and the estimate of how long it takes to execute.
#
#   A command is created from the raw line and the machine state that
#   precedes it. Calling parse() fills in the estimated execution time and
#   the machine state after the command.
class Command:
    ##  Creates a command for one line of g-code.
    #
    #   \param cmd_str The raw line, including any trailing comment.
    #   \param previous_state The machine state before this line is executed.
    #   If None, a fresh default state is used.
    def __init__(self, cmd_str: str, previous_state: Optional["State"]) -> None:
        if previous_state is None:
            # The first line of a file has no predecessor. The move handler
            # reads positions from the previous state, so it needs a real one.
            previous_state = State(None)

        self._cmd_str = cmd_str  # type: str
        self._previous_state = previous_state  # type: State
        self._after_state = State(previous_state)  # type: State

        self._distance_in_mm = 0.0  # type: float
        self._estimated_exec_time_in_ms = 0.0  # type: float

        # Dispatch on the first letter of the command word.
        self._cmd_process_function_map = {
            "G": self._handle_g,
            "M": self._handle_m,
            "T": self._handle_t,
        }

        self._is_comment = False  # type: bool
        self._is_empty = False  # type: bool

        #Fields taken from CuraEngine's implementation.
        self._recalculate = False
        self._accelerate_until = 0
        self._decelerate_after = 0
        self._initial_feedrate = 0
        self._final_feedrate = 0
        self._entry_speed = 0
        self._max_entry_speed = 0
        self._nominal_length = False
        self._nominal_feedrate = 0
        self._max_travel = 0
        self._distance = 0
        self._acceleration = 0
        # One entry per axis in the order X, Y, Z, E, like the move handler
        # stores them.
        self._delta = [0, 0, 0, 0]
        self._abs_delta = [0, 0, 0, 0]

    ##  Calculate the velocity-time trapezoid function for this move.
    #
    #   Each move has a three-part function mapping time to velocity:
    #   accelerating, cruising at the nominal feedrate and decelerating. This
    #   stores the distances at which the acceleration ends and the
    #   deceleration begins, together with the entry and exit feedrates.
    #
    #   \param entry_factor Fraction of the nominal feedrate at which the
    #   move is entered.
    #   \param exit_factor Fraction of the nominal feedrate at which the move
    #   is left.
    def calculate_trapezoid(self, entry_factor: float, exit_factor: float) -> None:
        initial_feedrate = self._nominal_feedrate * entry_factor
        final_feedrate = self._nominal_feedrate * exit_factor

        #How far are we accelerating and how far are we decelerating?
        accelerate_distance = calc_acceleration_distance(initial_feedrate, self._nominal_feedrate, self._acceleration)
        decelerate_distance = calc_acceleration_distance(self._nominal_feedrate, final_feedrate, -self._acceleration)
        plateau_distance = self._distance - accelerate_distance - decelerate_distance #And how far in between at max speed?

        #Is the plateau negative size? That means no cruising, and we'll have to
        #use intersection_distance to calculate when to abort acceleration and
        #start braking in order to reach the final_rate exactly at the end of
        #this command.
        if plateau_distance < 0:
            accelerate_distance = calc_intersection_distance(initial_feedrate, final_feedrate, self._acceleration, self._distance)
            accelerate_distance = max(accelerate_distance, 0) #Due to rounding errors.
            accelerate_distance = min(accelerate_distance, self._distance)
            plateau_distance = 0

        self._accelerate_until = accelerate_distance
        self._decelerate_after = accelerate_distance + plateau_distance
        self._initial_feedrate = initial_feedrate
        self._final_feedrate = final_feedrate

    ##  Gets the machine state after this command has been executed.
    #
    #   \return The state after this command. It is only up to date once
    #   parse() has been called.
    def get_after_state(self) -> State:
        return self._after_state

    ##  Whether this line is an actual command rather than a comment or an
    #   empty line.
    @property
    def is_command(self) -> bool:
        return not self._is_comment and not self._is_empty

    ##  The estimated time it takes to execute this command, in milliseconds.
    @property
    def estimated_exec_time_in_ms(self) -> float:
        return self._estimated_exec_time_in_ms

    ##  Formats the line for the annotated output file.
    #
    #   \return Comments and empty lines are returned unchanged. Commands are
    #   returned stripped, followed by a comment with the travelled distance,
    #   the feedrate and the estimated time, and a newline.
    def __str__(self) -> str:
        if self._is_comment or self._is_empty:
            return self._cmd_str

        distance_in_mm = round(self._distance_in_mm, 5)
        info = "d=%s f=%s t=%s" % (distance_in_mm, self._after_state.F, self._estimated_exec_time_in_ms)
        # Use "\n" rather than os.linesep: the result is written to a file in
        # text mode, which already translates "\n" to the platform's line
        # ending. os.linesep would turn into "\r\r\n" on Windows.
        return self._cmd_str.strip() + " ; --- " + info + "\n"

    ##  Estimates the execution time of this command and calculates the state
    #   after this command is executed.
    #
    #   Empty lines and lines that only hold a comment are flagged as such
    #   and otherwise ignored. Lines whose command letter has no handler, or
    #   whose command number is not an integer, are reported on stdout and
    #   keep an estimated time of 0.
    def parse(self) -> None:
        line = self._cmd_str.strip()
        if not line:
            self._is_empty = True
            return
        if line.startswith(";"):
            self._is_comment = True
            return

        # Remove comment
        line = line.split(";", 1)[0].strip()
        # Split on any run of whitespace so that repeated spaces or tabs do
        # not produce empty parts.
        parts = line.split()

        cmd_code, cmd_num_text = get_code_and_num(parts[0])
        func = self._cmd_process_function_map.get(cmd_code)
        if func is None:
            print("!!! no handle function for command type [%s]" % cmd_code)
            return
        try:
            cmd_num = int(cmd_num_text)
        except ValueError:
            print("!!! cannot parse command number in [%s]" % parts[0])
            return
        func(cmd_num, parts)

    ##  Tells whether an on/off argument such as S1 is given with the value 1.
    #
    #   \param parts The words of the g-code line, starting with the command.
    #   \param letter The upper-case letter of the argument to look for.
    #   \return True if the argument is present and its value equals 1. False
    #   if it is missing, has another value or is not a number.
    @staticmethod
    def _is_flag_set(parts: List[str], letter: str) -> bool:
        value = get_value_dict(parts[1:]).get(letter)
        if value is None:
            return False
        try:
            # The value is text; it has to be parsed before comparing it.
            return float(value) == 1
        except ValueError:
            return False

    ##  Handles the G commands.
    #
    #   G numbers that are not listed below get an estimated time of 0.
    #
    #   \param cmd_num The number of the command, e.g. 1 for G1.
    #   \param parts The words of the g-code line, starting with the command.
    def _handle_g(self, cmd_num: int, parts: List[str]) -> None:
        estimated_exec_time_in_ms = 0.0

        # G0 and G1: Move
        if cmd_num in (0, 1):
            estimated_exec_time_in_ms = self._handle_move(parts)
            # TODO: take acceleration into account

        # G4: Dwell, pause the machine for a period of time. TODO
        elif cmd_num == 4:
            # Pnnn is time to wait in milliseconds (P0 wait until all previous moves are finished)
            value_dict = get_value_dict(parts[1:])
            if "P" in value_dict:
                wait_time_in_ms = float(value_dict["P"])
                if wait_time_in_ms > 0:
                    estimated_exec_time_in_ms = wait_time_in_ms

        # G10: Retract. Assume 0.3 seconds for short retractions and 0.5 seconds for long retractions.
        elif cmd_num == 10:
            # S0 is short retract (default), S1 is long retract
            is_short_retract = not self._is_flag_set(parts, "S")
            estimated_exec_time_in_ms = (0.3 if is_short_retract else 0.5) * 1000

        # G11: Unretract. Assume 0.5 seconds.
        elif cmd_num == 11:
            estimated_exec_time_in_ms = 0.5 * 1000

        # G90: Set to absolute positioning. Assume 0 seconds.
        elif cmd_num == 90:
            self._after_state.in_relative_positioning_mode = False

        # G91: Set to relative positioning. Assume 0 seconds.
        elif cmd_num == 91:
            self._after_state.in_relative_positioning_mode = True

        # G92: Set position. Assume 0 seconds.
        elif cmd_num == 92:
            # The new position applies from this command onwards, so it
            # belongs in the state after this command, stored as numbers like
            # the other positions.
            for key, value in get_value_dict(parts[1:]).items():
                setattr(self._after_state, key, float(value))

        # G280: Prime. Assume 10 seconds for using blob and 5 seconds for no blob.
        elif cmd_num == 280:
            use_blob = not self._is_flag_set(parts, "S")
            estimated_exec_time_in_ms = (10.0 if use_blob else 5.0) * 1000

        # Update estimated execution time
        self._estimated_exec_time_in_ms = round(estimated_exec_time_in_ms, 5)

    ##  Handles a G0 or G1 move.
    #
    #   Updates the state after this command with all given arguments and
    #   stores the travelled distance and per-axis deltas.
    #
    #   NOTE(review): in_relative_positioning_mode and
    #   in_relative_extrusion_mode are tracked in the state but are not
    #   applied here; coordinates are always treated as absolute.
    #
    #   \param parts The words of the g-code line, starting with the command.
    #   \return The estimated travel time in milliseconds, which is the
    #   distance divided by the feedrate. This is 0.0 if no positive feedrate
    #   is known yet.
    def _handle_move(self, parts: List[str]) -> float:
        value_dict = get_value_dict(parts[1:])
        for key, value in value_dict.items():
            setattr(self._after_state, key, float(value))

        current_position = {"X": self._previous_state.X,
                            "Y": self._previous_state.Y,
                            "Z": self._previous_state.Z,
                            "E": self._previous_state.E,
                            }
        # Axes that are not mentioned in the command keep their position.
        new_position = {axis: float(value_dict.get(axis, coordinate))
                        for axis, coordinate in current_position.items()}

        distance = calc_distance(current_position, new_position)
        self._distance_in_mm = distance
        self._delta = [new_position[axis] - current_position[axis] for axis in ("X", "Y", "Z", "E")]
        self._abs_delta = [abs(d) for d in self._delta]
        self._max_travel = max(self._abs_delta)
        if self._max_travel > 0:
            self._plan_move(value_dict)

        # F is divided by 60 to get a rate per second. Without a positive
        # feedrate (e.g. a move before any F was given) the duration cannot
        # be estimated and the division would fail, so count it as 0.
        feedrate_per_minute = self._after_state.F
        if feedrate_per_minute <= 0:
            return 0.0
        return distance / (feedrate_per_minute / 60.0) * 1000.0

    ##  Fills in the planner fields of a move that actually travels.
    #
    #   This sets the nominal feedrate, the planner distance and the
    #   acceleration. It requires self._delta and self._abs_delta to be set
    #   and at least one of the deltas to be non-zero.
    #
    #   \param value_dict The arguments of the move command.
    def _plan_move(self, value_dict: Dict[str, str]) -> None:
        feedrate = self._previous_state.F
        if "F" in value_dict:
            # Arguments are text; parse it so it can be compared and scaled.
            feedrate = float(value_dict["F"])
        if feedrate < MACHINE_MINIMUM_FEEDRATE:
            feedrate = MACHINE_MINIMUM_FEEDRATE
        self._nominal_feedrate = feedrate

        self._distance = math.sqrt(self._abs_delta[0] ** 2 + self._abs_delta[1] ** 2 + self._abs_delta[2] ** 2)
        if self._distance == 0:
            # Extrusion-only move: use the length of filament instead.
            self._distance = self._abs_delta[3]

        current_feedrate = [d * feedrate / self._distance for d in self._delta]
        current_abs_feedrate = [abs(f) for f in current_feedrate]
        # NOTE(review): this compares 1.0 with the machine limits themselves,
        # not with the ratio between each limit and the axis' feedrate, so
        # with the current limits the factor is always 1.0 -- confirm the
        # intended formula and the units of F versus the limits.
        feedrate_factor = min(1.0,
                              MACHINE_MAX_FEEDRATE_X,
                              MACHINE_MAX_FEEDRATE_Y,
                              MACHINE_MAX_FEEDRATE_Z,
                              MACHINE_MAX_FEEDRATE_E)
        #TODO: XY_FREQUENCY_LIMIT

        current_feedrate = [f * feedrate_factor for f in current_feedrate]
        current_abs_feedrate = [f * feedrate_factor for f in current_abs_feedrate]
        self._nominal_feedrate *= feedrate_factor

        # Start from the default acceleration and cap it with the limit of
        # every axis whose share of the acceleration exceeds that limit.
        self._acceleration = MACHINE_ACCELERATION
        max_accelerations = [MACHINE_MAX_ACCELERATION_X, MACHINE_MAX_ACCELERATION_Y, MACHINE_MAX_ACCELERATION_Z, MACHINE_MAX_ACCELERATION_E]
        for axis_abs_delta, max_acceleration in zip(self._abs_delta, max_accelerations):
            if self._acceleration * axis_abs_delta / self._distance > max_acceleration:
                self._acceleration = max_acceleration

        # The values below are not used yet; they are kept for the TODO at
        # the end of this function.
        vmax_junction = MACHINE_MAX_JERK_XY / 2
        vmax_junction_factor = 1.0
        if current_abs_feedrate[2] > MACHINE_MAX_JERK_Z / 2:
            vmax_junction = min(vmax_junction, MACHINE_MAX_JERK_Z)
        if current_abs_feedrate[3] > MACHINE_MAX_JERK_E / 2:
            vmax_junction = min(vmax_junction, MACHINE_MAX_JERK_E)
        vmax_junction = min(vmax_junction, self._nominal_feedrate)
        safe_speed = vmax_junction
        #TODO: Compute junction maximum speed factor and apply this to entry speed, set flags and calculate trapezoid.

    ##  Handles the M commands.
    #
    #   Every M command is estimated to take 0 milliseconds, except for M109
    #   which waits for the extruder to heat up. This includes M104 (set
    #   extruder temperature), M106 (set fan speed), M107 (fan off) and M140
    #   (set bed temperature), which don't wait.
    #
    #   \param cmd_num The number of the command, e.g. 104 for M104.
    #   \param parts The words of the g-code line, starting with the command.
    def _handle_m(self, cmd_num: int, parts: List[str]) -> None:
        estimated_exec_time_in_ms = 0.0

        # M82: Set extruder to absolute mode. Assume 0 execution time.
        if cmd_num == 82:
            self._after_state.in_relative_extrusion_mode = False

        # M83: Set extruder to relative mode. Assume 0 execution time.
        elif cmd_num == 83:
            self._after_state.in_relative_extrusion_mode = True

        # M109: Set extruder temperature (wait). Uniformly random time between 30 - 90 seconds.
        elif cmd_num == 109:
            estimated_exec_time_in_ms = random.uniform(30, 90) * 1000  # TODO: Check

        # M204: Set default acceleration. Assume 0 execution time.
        elif cmd_num == 204:
            for key, value in get_value_dict(parts[1:]).items():
                self._after_state.accelerations[key] = float(value)

        # M205: Advanced settings, we only set jerks for Griffin. Assume 0 execution time.
        elif cmd_num == 205:
            for key, value in get_value_dict(parts[1:]).items():
                self._after_state.jerks[key] = float(value)

        self._estimated_exec_time_in_ms = estimated_exec_time_in_ms

    ##  Handles the T commands.
    #
    #   \param cmd_num The number of the extruder to switch to.
    #   \param parts The words of the g-code line, starting with the command.
    def _handle_t(self, cmd_num: int, parts: List[str]) -> None:
        # Tn: Switching extruder. Assume 2 seconds.
        estimated_exec_time_in_ms = 2.0 * 1000

        self._estimated_exec_time_in_ms = estimated_exec_time_in_ms
##  Checks a g-code file for places where the printer's command buffer could
#   run empty.
#
#   All lines are turned into commands with an estimated execution time. A
#   sliding window of at most one second of commands is moved over them.
#   Whenever the window holds more commands than fit in the buffer while
#   taking no longer to execute than it takes to fill the buffer, the window
#   is recorded as a potentially bad frame.
class CommandBuffer:
    # The maximum total execution time of the commands in the sliding window.
    _MAX_FRAME_TIME_IN_MS = 1000.0

    ##  Creates the buffer check. This prints the detection settings.
    #
    #   \param all_lines The lines of the g-code file.
    #   \param buffer_filling_rate The rate at which the buffer is filled, in
    #   commands per millisecond. Must not be zero.
    #   \param buffer_size The number of commands that fit in the buffer.
    def __init__(self, all_lines: List[str],
                 buffer_filling_rate: float = DEFAULT_BUFFER_FILLING_RATE_IN_C_PER_MS,
                 buffer_size: int = DEFAULT_BUFFER_SIZE
                 ) -> None:
        self._all_lines = all_lines
        self._all_commands = []  # type: List[Command]

        self._buffer_filling_rate = buffer_filling_rate  # type: float
        self._buffer_size = buffer_size  # type: int

        # The time it takes to fill a whole buffer at the filling rate. A
        # frame is only suspicious if it executes in at most this time.
        lower_bound_buffer_depletion_time = self._buffer_size / self._buffer_filling_rate  # type: float

        self._detection_time_frame = lower_bound_buffer_depletion_time
        self._code_count_limit = self._buffer_size
        print("Time Frame: %s" % self._detection_time_frame)
        print("Code Limit: %s" % self._code_count_limit)

        self._bad_frame_ranges = []  # type: list

    ##  Parses all lines and records the potentially bad frames.
    #
    #   Results of an earlier call are discarded. Indices that are recorded
    #   are 0-based indices into the list of lines; the start index is the
    #   first command inside the frame.
    def process(self) -> None:
        self._all_commands = []
        self._bad_frame_ranges = []

        previous_state = None
        # The window covers the commands from window_start up to and
        # including the current index. total_frame_time_in_ms and cmd_count
        # describe the actual commands in it; comments and empty lines have
        # no execution time and are not counted.
        window_start = 0
        total_frame_time_in_ms = 0.0
        cmd_count = 0
        for idx, line in enumerate(self._all_lines):
            cmd = Command(line, previous_state)
            cmd.parse()
            self._all_commands.append(cmd)
            previous_state = cmd.get_after_state()

            if not cmd.is_command:
                continue

            cmd_count += 1
            total_frame_time_in_ms += cmd.estimated_exec_time_in_ms

            # Drop commands from the start of the window until the total
            # execution time of the frame is at most the maximum again.
            while total_frame_time_in_ms > self._MAX_FRAME_TIME_IN_MS and window_start <= idx:
                leaving_cmd = self._all_commands[window_start]
                window_start += 1
                if leaving_cmd.is_command:
                    total_frame_time_in_ms -= leaving_cmd.estimated_exec_time_in_ms
                    cmd_count -= 1

            if window_start > idx:
                # Even the current command alone takes too long, so the
                # window is empty. Clear what floating point rounding left.
                total_frame_time_in_ms = 0.0
                cmd_count = 0
                continue

            # Let the window start at an actual command. This terminates
            # because the command at idx is one.
            while not self._all_commands[window_start].is_command:
                window_start += 1

            # If within the current time frame the code count exceeds the limit, record that.
            if total_frame_time_in_ms <= self._detection_time_frame and cmd_count > self._code_count_limit:
                last_item = self._bad_frame_ranges[-1] if self._bad_frame_ranges else None
                if last_item is not None and last_item["start_line"] == window_start:
                    # Same frame as last time, grown by one command.
                    last_item["end_line"] = idx
                    last_item["cmd_count"] = cmd_count
                    last_item["time_in_ms"] = total_frame_time_in_ms
                else:
                    self._bad_frame_ranges.append({"start_line": window_start,
                                                   "end_line": idx,
                                                   "cmd_count": cmd_count,
                                                   "time_in_ms": total_frame_time_in_ms})

    ##  Writes all lines, annotated with their estimates, to a file.
    #
    #   \param file_name The path of the file to write. It is overwritten if
    #   it exists.
    def to_file(self, file_name: str) -> None:
        all_lines = [str(c) for c in self._all_commands]
        with open(file_name, "w", encoding = "utf-8") as f:
            f.writelines(all_lines)

    ##  Prints every potentially bad frame that process() recorded.
    def report(self) -> None:
        for item in self._bad_frame_ranges:
            print("!!!!! potential bad frame from line %s to %s, code count = %s, in %s ms" % (
                item["start_line"], item["end_line"], item["cmd_count"], round(item["time_in_ms"], 4)))
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: