Add typing for all functions and fields

Lots of typing so that MyPy can then check whether the functions are safe to call.

Contributes to issue CURA-5330.
This commit is contained in:
Ghostkeeper 2018-06-15 14:07:33 +02:00
parent 8256788ed8
commit 4c3d28709b
No known key found for this signature in database
GPG key ID: 5252B696FB5E7C7A

View file

@ -1,6 +1,13 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from collections import defaultdict
import os
from PyQt5.QtCore import QObject, QTimer, pyqtSlot
import sys
from time import time
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING
from UM.Backend.Backend import Backend, BackendState
from UM.Scene.SceneNode import SceneNode
from UM.Signal import Signal
@ -9,30 +16,29 @@ from UM.Message import Message
from UM.PluginRegistry import PluginRegistry
from UM.Resources import Resources
from UM.Platform import Platform
from UM.Scene.Iterator.DepthFirstIterator import DepthFirstIterator
from UM.Qt.Duration import DurationFormat
from PyQt5.QtCore import QObject, pyqtSlot
from UM.Scene.Iterator.DepthFirstIterator import DepthFirstIterator
from UM.Settings.SettingInstance import SettingInstance #For typing.
from UM.Tool import Tool #For typing.
from collections import defaultdict
from cura.CuraApplication import CuraApplication
from cura.Settings.ExtruderManager import ExtruderManager
from . import ProcessSlicedLayersJob
from . import StartSliceJob
import os
import sys
from time import time
from PyQt5.QtCore import QTimer
import Arcus
if TYPE_CHECKING:
from cura.Machines.Models.MultiBuildPlateModel import MultiBuildPlateModel
from cura.Machines.MachineErrorChecker import MachineErrorChecker
from UM.Scene.Scene import Scene
from UM.Settings.ContainerStack import ContainerStack
from UM.i18n import i18nCatalog
catalog = i18nCatalog("cura")
class CuraEngineBackend(QObject, Backend):
backendError = Signal()
## Starts the back-end plug-in.
@ -40,8 +46,9 @@ class CuraEngineBackend(QObject, Backend):
# This registers all the signal listeners and prepares for communication
# with the back-end in general.
# CuraEngineBackend is exposed to qml as well.
def __init__(self, parent = None):
super().__init__(parent = parent)
def __init__(self, parent = None) -> None:
super(QObject, self).__init__(parent = parent)
super(Backend, self).__init__()
# Find out where the engine is located, and how it is called.
# This depends on how Cura is packaged and which OS we are running on.
executable_name = "CuraEngine"
@ -61,9 +68,9 @@ class CuraEngineBackend(QObject, Backend):
default_engine_location = execpath
break
self._application = CuraApplication.getInstance()
self._multi_build_plate_model = None
self._machine_error_checker = None
self._application = CuraApplication.getInstance() #type: CuraApplication
self._multi_build_plate_model = None #type: MultiBuildPlateModel
self._machine_error_checker = None #type: MachineErrorChecker
if not default_engine_location:
raise EnvironmentError("Could not find CuraEngine")
@ -74,13 +81,13 @@ class CuraEngineBackend(QObject, Backend):
self._application.getPreferences().addPreference("backend/location", default_engine_location)
# Workaround to disable layer view processing if layer view is not active.
self._layer_view_active = False
self._layer_view_active = False #type: bool
self._onActiveViewChanged()
self._stored_layer_data = []
self._stored_optimized_layer_data = {} # key is build plate number, then arrays are stored until they go to the ProcessSlicesLayersJob
self._stored_layer_data = [] #type: List[Arcus.PythonMessage]
self._stored_optimized_layer_data = {} #type: Dict[int, List[Arcus.PythonMessage]] # key is build plate number, then arrays are stored until they go to the ProcessSlicesLayersJob
self._scene = self._application.getController().getScene()
self._scene = self._application.getController().getScene() #type: Scene
self._scene.sceneChanged.connect(self._onSceneChanged)
# Triggers for auto-slicing. Auto-slicing is triggered as follows:
@ -91,7 +98,7 @@ class CuraEngineBackend(QObject, Backend):
# If there is an error check, stop the auto-slicing timer, and only wait for the error check to be finished
# to start the auto-slicing timer again.
#
self._global_container_stack = None
self._global_container_stack = None #type: Optional[ContainerStack]
# Listeners for receiving messages from the back-end.
self._message_handlers["cura.proto.Layer"] = self._onLayerMessage
@ -102,31 +109,31 @@ class CuraEngineBackend(QObject, Backend):
self._message_handlers["cura.proto.PrintTimeMaterialEstimates"] = self._onPrintTimeMaterialEstimates
self._message_handlers["cura.proto.SlicingFinished"] = self._onSlicingFinishedMessage
self._start_slice_job = None
self._start_slice_job_build_plate = None
self._slicing = False # Are we currently slicing?
self._restart = False # Back-end is currently restarting?
self._tool_active = False # If a tool is active, some tasks do not have to do anything
self._always_restart = True # Always restart the engine when starting a new slice. Don't keep the process running. TODO: Fix engine statelessness.
self._process_layers_job = None # The currently active job to process layers, or None if it is not processing layers.
self._build_plates_to_be_sliced = [] # what needs slicing?
self._engine_is_fresh = True # Is the newly started engine used before or not?
self._start_slice_job = None #type: Optional[StartSliceJob]
self._start_slice_job_build_plate = None #type: Optional[StartSliceJob]
self._slicing = False #type: bool # Are we currently slicing?
self._restart = False #type: bool # Back-end is currently restarting?
self._tool_active = False #type: bool # If a tool is active, some tasks do not have to do anything
self._always_restart = True #type: bool # Always restart the engine when starting a new slice. Don't keep the process running. TODO: Fix engine statelessness.
self._process_layers_job = None #type: Optional[ProcessSlicedLayersJob] # The currently active job to process layers, or None if it is not processing layers.
self._build_plates_to_be_sliced = [] #type: List[int] # what needs slicing?
self._engine_is_fresh = True #type: bool # Is the newly started engine used before or not?
self._backend_log_max_lines = 20000 # Maximum number of lines to buffer
self._error_message = None # Pop-up message that shows errors.
self._last_num_objects = defaultdict(int) # Count number of objects to see if there is something changed
self._postponed_scene_change_sources = [] # scene change is postponed (by a tool)
self._backend_log_max_lines = 20000 #type: int # Maximum number of lines to buffer
self._error_message = None #type: Message # Pop-up message that shows errors.
self._last_num_objects = defaultdict(int) #type: Dict[int, int] # Count number of objects to see if there is something changed
self._postponed_scene_change_sources = [] #type: List[SceneNode] # scene change is postponed (by a tool)
self._slice_start_time = None
self._is_disabled = False
self._slice_start_time = None #type: Optional[float]
self._is_disabled = False #type: bool
self._application.getPreferences().addPreference("general/auto_slice", False)
self._use_timer = False
self._use_timer = False #type: bool
# When you update a setting and other settings get changed through inheritance, many propertyChanged signals are fired.
# This timer will group them up, and only slice for the last setting changed signal.
# TODO: Properly group propertyChanged signals by whether they are triggered by the same user interaction.
self._change_timer = QTimer()
self._change_timer = QTimer() #type: QTimer
self._change_timer.setSingleShot(True)
self._change_timer.setInterval(500)
self.determineAutoSlicing()
@ -134,7 +141,7 @@ class CuraEngineBackend(QObject, Backend):
self._application.initializationFinished.connect(self.initialize)
def initialize(self):
def initialize(self) -> None:
self._multi_build_plate_model = self._application.getMultiBuildPlateModel()
self._application.getController().activeViewChanged.connect(self._onActiveViewChanged)
@ -160,14 +167,14 @@ class CuraEngineBackend(QObject, Backend):
#
# This function should terminate the engine process.
# Called when closing the application.
def close(self):
def close(self) -> None:
# Terminate CuraEngine if it is still running at this point
self._terminate()
## Get the command that is used to call the engine.
# This is useful for debugging and used to actually start the engine.
# \return list of commands and args / parameters.
def getEngineCommand(self):
def getEngineCommand(self) -> List[str]:
json_path = Resources.getPath(Resources.DefinitionContainers, "fdmprinter.def.json")
return [self._application.getPreferences().getValue("backend/location"), "connect", "127.0.0.1:{0}".format(self._port), "-j", json_path, ""]
@ -184,7 +191,7 @@ class CuraEngineBackend(QObject, Backend):
slicingCancelled = Signal()
@pyqtSlot()
def stopSlicing(self):
def stopSlicing(self) -> None:
self.backendStateChange.emit(BackendState.NotStarted)
if self._slicing: # We were already slicing. Stop the old job.
self._terminate()
@ -200,12 +207,12 @@ class CuraEngineBackend(QObject, Backend):
## Manually triggers a reslice
@pyqtSlot()
def forceSlice(self):
def forceSlice(self) -> None:
self.markSliceAll()
self.slice()
## Perform a slice of the scene.
def slice(self):
def slice(self) -> None:
Logger.log("d", "Starting to slice...")
self._slice_start_time = time()
if not self._build_plates_to_be_sliced:
@ -262,7 +269,7 @@ class CuraEngineBackend(QObject, Backend):
## Terminate the engine process.
# Start the engine process by calling _createSocket()
def _terminate(self):
def _terminate(self) -> None:
self._slicing = False
self._stored_layer_data = []
if self._start_slice_job_build_plate in self._stored_optimized_layer_data:
@ -295,7 +302,7 @@ class CuraEngineBackend(QObject, Backend):
# bootstrapping of a slice job.
#
# \param job The start slice job that was just finished.
def _onStartSliceCompleted(self, job):
def _onStartSliceCompleted(self, job: StartSliceJob) -> None:
if self._error_message:
self._error_message.hide()
@ -411,7 +418,7 @@ class CuraEngineBackend(QObject, Backend):
# It disables when
# - preference auto slice is off
# - decorator isBlockSlicing is found (used in g-code reader)
def determineAutoSlicing(self):
def determineAutoSlicing(self) -> bool:
enable_timer = True
self._is_disabled = False
@ -437,7 +444,7 @@ class CuraEngineBackend(QObject, Backend):
return False
## Return a dict with number of objects per build plate
def _numObjectsPerBuildPlate(self):
def _numObjectsPerBuildPlate(self) -> Dict[int, int]:
num_objects = defaultdict(int)
for node in DepthFirstIterator(self._scene.getRoot()):
# Only count sliceable objects
@ -451,7 +458,7 @@ class CuraEngineBackend(QObject, Backend):
# This should start a slice if the scene is now ready to slice.
#
# \param source The scene node that was changed.
def _onSceneChanged(self, source):
def _onSceneChanged(self, source: SceneNode) -> None:
if not isinstance(source, SceneNode):
return
@ -506,7 +513,7 @@ class CuraEngineBackend(QObject, Backend):
## Called when an error occurs in the socket connection towards the engine.
#
# \param error The exception that occurred.
def _onSocketError(self, error):
def _onSocketError(self, error: Arcus.Error) -> None:
if self._application.isShuttingDown():
return
@ -521,19 +528,19 @@ class CuraEngineBackend(QObject, Backend):
Logger.log("w", "A socket error caused the connection to be reset")
## Remove old layer data (if any)
def _clearLayerData(self, build_plate_numbers = set()):
def _clearLayerData(self, build_plate_numbers: Set = None) -> None:
for node in DepthFirstIterator(self._scene.getRoot()):
if node.callDecoration("getLayerData"):
if not build_plate_numbers or node.callDecoration("getBuildPlateNumber") in build_plate_numbers:
node.getParent().removeChild(node)
def markSliceAll(self):
def markSliceAll(self) -> None:
for build_plate_number in range(self._application.getMultiBuildPlateModel().maxBuildPlate + 1):
if build_plate_number not in self._build_plates_to_be_sliced:
self._build_plates_to_be_sliced.append(build_plate_number)
## Convenient function: mark everything to slice, emit state and clear layer data
def needsSlicing(self):
def needsSlicing(self) -> None:
self.stopSlicing()
self.markSliceAll()
self.processingProgress.emit(0.0)
@ -545,7 +552,7 @@ class CuraEngineBackend(QObject, Backend):
## A setting has changed, so check if we must reslice.
# \param instance The setting instance that has changed.
# \param property The property of the setting instance that has changed.
def _onSettingChanged(self, instance, property):
def _onSettingChanged(self, instance: SettingInstance, property: str) -> None:
if property == "value": # Only reslice if the value has changed.
self.needsSlicing()
self._onChanged()
@ -554,7 +561,7 @@ class CuraEngineBackend(QObject, Backend):
if self._use_timer:
self._change_timer.stop()
def _onStackErrorCheckFinished(self):
def _onStackErrorCheckFinished(self) -> None:
self.determineAutoSlicing()
if self._is_disabled:
return
@ -566,13 +573,13 @@ class CuraEngineBackend(QObject, Backend):
## Called when a sliced layer data message is received from the engine.
#
# \param message The protobuf message containing sliced layer data.
def _onLayerMessage(self, message):
def _onLayerMessage(self, message: Arcus.PythonMessage) -> None:
self._stored_layer_data.append(message)
## Called when an optimized sliced layer data message is received from the engine.
#
# \param message The protobuf message containing sliced layer data.
def _onOptimizedLayerMessage(self, message):
def _onOptimizedLayerMessage(self, message: Arcus.PythonMessage) -> None:
if self._start_slice_job_build_plate not in self._stored_optimized_layer_data:
self._stored_optimized_layer_data[self._start_slice_job_build_plate] = []
self._stored_optimized_layer_data[self._start_slice_job_build_plate].append(message)
@ -580,11 +587,11 @@ class CuraEngineBackend(QObject, Backend):
## Called when a progress message is received from the engine.
#
# \param message The protobuf message containing the slicing progress.
def _onProgressMessage(self, message):
def _onProgressMessage(self, message: Arcus.PythonMessage) -> None:
self.processingProgress.emit(message.amount)
self.backendStateChange.emit(BackendState.Processing)
def _invokeSlice(self):
def _invokeSlice(self) -> None:
if self._use_timer:
# if the error check is scheduled, wait for the error check finish signal to trigger auto-slice,
# otherwise business as usual
@ -600,7 +607,7 @@ class CuraEngineBackend(QObject, Backend):
## Called when the engine sends a message that slicing is finished.
#
# \param message The protobuf message signalling that slicing is finished.
def _onSlicingFinishedMessage(self, message):
def _onSlicingFinishedMessage(self, message: Arcus.PythonMessage) -> None:
self.backendStateChange.emit(BackendState.Done)
self.processingProgress.emit(1.0)
@ -641,25 +648,25 @@ class CuraEngineBackend(QObject, Backend):
## Called when a g-code message is received from the engine.
#
# \param message The protobuf message containing g-code, encoded as UTF-8.
def _onGCodeLayerMessage(self, message):
def _onGCodeLayerMessage(self, message: Arcus.PythonMessage) -> None:
self._scene.gcode_dict[self._start_slice_job_build_plate].append(message.data.decode("utf-8", "replace"))
## Called when a g-code prefix message is received from the engine.
#
# \param message The protobuf message containing the g-code prefix,
# encoded as UTF-8.
def _onGCodePrefixMessage(self, message):
def _onGCodePrefixMessage(self, message: Arcus.PythonMessage) -> None:
self._scene.gcode_dict[self._start_slice_job_build_plate].insert(0, message.data.decode("utf-8", "replace"))
## Creates a new socket connection.
def _createSocket(self):
def _createSocket(self) -> None:
super()._createSocket(os.path.abspath(os.path.join(PluginRegistry.getInstance().getPluginPath(self.getPluginId()), "Cura.proto")))
self._engine_is_fresh = True
## Called when anything has changed to the stuff that needs to be sliced.
#
# This indicates that we should probably re-slice soon.
def _onChanged(self, *args, **kwargs):
def _onChanged(self, *args: Any, **kwargs: Any) -> None:
self.needsSlicing()
if self._use_timer:
# if the error check is scheduled, wait for the error check finish signal to trigger auto-slice,
@ -677,7 +684,7 @@ class CuraEngineBackend(QObject, Backend):
#
# \param message The protobuf message containing the print time per feature and
# material amount per extruder
def _onPrintTimeMaterialEstimates(self, message):
def _onPrintTimeMaterialEstimates(self, message: Arcus.PythonMessage) -> None:
material_amounts = []
for index in range(message.repeatedMessageCount("materialEstimates")):
material_amounts.append(message.getRepeatedMessage("materialEstimates", index).material_amount)
@ -688,7 +695,7 @@ class CuraEngineBackend(QObject, Backend):
## Called for parsing message to retrieve estimated time per feature
#
# \param message The protobuf message containing the print time per feature
def _parseMessagePrintTimes(self, message):
def _parseMessagePrintTimes(self, message: Arcus.PythonMessage) -> Dict[str, float]:
result = {
"inset_0": message.time_inset_0,
"inset_x": message.time_inset_x,
@ -705,7 +712,7 @@ class CuraEngineBackend(QObject, Backend):
return result
## Called when the back-end connects to the front-end.
def _onBackendConnected(self):
def _onBackendConnected(self) -> None:
if self._restart:
self._restart = False
self._onChanged()
@ -716,7 +723,7 @@ class CuraEngineBackend(QObject, Backend):
# continuously slicing while the user is dragging some tool handle.
#
# \param tool The tool that the user is using.
def _onToolOperationStarted(self, tool):
def _onToolOperationStarted(self, tool: Tool) -> None:
self._tool_active = True # Do not react on scene change
self.disableTimer()
# Restart engine as soon as possible, we know we want to slice afterwards
@ -729,7 +736,7 @@ class CuraEngineBackend(QObject, Backend):
# This indicates that we can safely start slicing again.
#
# \param tool The tool that the user was using.
def _onToolOperationStopped(self, tool):
def _onToolOperationStopped(self, tool: Tool) -> None:
self._tool_active = False # React on scene change again
self.determineAutoSlicing() # Switch timer on if appropriate
# Process all the postponed scene changes
@ -737,14 +744,14 @@ class CuraEngineBackend(QObject, Backend):
source = self._postponed_scene_change_sources.pop(0)
self._onSceneChanged(source)
def _startProcessSlicedLayersJob(self, build_plate_number):
def _startProcessSlicedLayersJob(self, build_plate_number: int) -> None:
self._process_layers_job = ProcessSlicedLayersJob.ProcessSlicedLayersJob(self._stored_optimized_layer_data[build_plate_number])
self._process_layers_job.setBuildPlate(build_plate_number)
self._process_layers_job.finished.connect(self._onProcessLayersFinished)
self._process_layers_job.start()
## Called when the user changes the active view mode.
def _onActiveViewChanged(self):
def _onActiveViewChanged(self) -> None:
view = self._application.getController().getActiveView()
if view:
active_build_plate = self._application.getMultiBuildPlateModel().activeBuildPlate
@ -765,14 +772,14 @@ class CuraEngineBackend(QObject, Backend):
## Called when the back-end self-terminates.
#
# We should reset our state and start listening for new connections.
def _onBackendQuit(self):
def _onBackendQuit(self) -> None:
if not self._restart:
if self._process:
Logger.log("d", "Backend quit with return code %s. Resetting process and socket.", self._process.wait())
self._process = None
## Called when the global container stack changes
def _onGlobalStackChanged(self):
def _onGlobalStackChanged(self) -> None:
if self._global_container_stack:
self._global_container_stack.propertyChanged.disconnect(self._onSettingChanged)
self._global_container_stack.containersChanged.disconnect(self._onChanged)
@ -793,26 +800,26 @@ class CuraEngineBackend(QObject, Backend):
extruder.containersChanged.connect(self._onChanged)
self._onChanged()
def _onProcessLayersFinished(self, job):
def _onProcessLayersFinished(self, job: ProcessSlicedLayersJob) -> None:
del self._stored_optimized_layer_data[job.getBuildPlate()]
self._process_layers_job = None
Logger.log("d", "See if there is more to slice(2)...")
self._invokeSlice()
## Connect slice function to timer.
def enableTimer(self):
def enableTimer(self) -> None:
if not self._use_timer:
self._change_timer.timeout.connect(self.slice)
self._use_timer = True
## Disconnect slice function from timer.
# This means that slicing will not be triggered automatically
def disableTimer(self):
def disableTimer(self) -> None:
if self._use_timer:
self._use_timer = False
self._change_timer.timeout.disconnect(self.slice)
def _onPreferencesChanged(self, preference):
def _onPreferencesChanged(self, preference: str) -> None:
if preference != "general/auto_slice":
return
auto_slice = self.determineAutoSlicing()
@ -820,11 +827,11 @@ class CuraEngineBackend(QObject, Backend):
self._change_timer.start()
## Tickle the backend so in case of auto slicing, it starts the timer.
def tickle(self):
def tickle(self) -> None:
if self._use_timer:
self._change_timer.start()
def _extruderChanged(self):
def _extruderChanged(self) -> None:
for build_plate_number in range(self._multi_build_plate_model.maxBuildPlate + 1):
if build_plate_number not in self._build_plates_to_be_sliced:
self._build_plates_to_be_sliced.append(build_plate_number)