Fix the wrong types in this class

These mistakes were found by MyPy.

Contributes to issue CURA-5330.
This commit is contained in:
Ghostkeeper 2018-06-15 14:25:25 +02:00
parent 4c3d28709b
commit 0d52b03716
No known key found for this signature in database
GPG key ID: 5252B696FB5E7C7A

View file

@ -6,7 +6,7 @@ import os
from PyQt5.QtCore import QObject, QTimer, pyqtSlot
import sys
from time import time
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING
from typing import Any, cast, Dict, List, Optional, Set, TYPE_CHECKING
from UM.Backend.Backend import Backend, BackendState
from UM.Scene.SceneNode import SceneNode
@ -18,13 +18,14 @@ from UM.Resources import Resources
from UM.Platform import Platform
from UM.Qt.Duration import DurationFormat
from UM.Scene.Iterator.DepthFirstIterator import DepthFirstIterator
from UM.Settings.DefinitionContainerInterface import DefinitionContainerInterface
from UM.Settings.SettingInstance import SettingInstance #For typing.
from UM.Tool import Tool #For typing.
from cura.CuraApplication import CuraApplication
from cura.Settings.ExtruderManager import ExtruderManager
from . import ProcessSlicedLayersJob
from . import StartSliceJob
from .ProcessSlicedLayersJob import ProcessSlicedLayersJob
from .StartSliceJob import StartSliceJob, StartJobResult
import Arcus
@ -46,9 +47,8 @@ class CuraEngineBackend(QObject, Backend):
# This registers all the signal listeners and prepares for communication
# with the back-end in general.
# CuraEngineBackend is exposed to qml as well.
def __init__(self, parent = None) -> None:
super(QObject, self).__init__(parent = parent)
super(Backend, self).__init__()
def __init__(self) -> None:
super().__init__()
# Find out where the engine is located, and how it is called.
# This depends on how Cura is packaged and which OS we are running on.
executable_name = "CuraEngine"
@ -110,7 +110,7 @@ class CuraEngineBackend(QObject, Backend):
self._message_handlers["cura.proto.SlicingFinished"] = self._onSlicingFinishedMessage
self._start_slice_job = None #type: Optional[StartSliceJob]
self._start_slice_job_build_plate = None #type: Optional[StartSliceJob]
self._start_slice_job_build_plate = None #type: Optional[int]
self._slicing = False #type: bool # Are we currently slicing?
self._restart = False #type: bool # Back-end is currently restarting?
self._tool_active = False #type: bool # If a tool is active, some tasks do not have to do anything
@ -197,7 +197,7 @@ class CuraEngineBackend(QObject, Backend):
self._terminate()
self._createSocket()
if self._process_layers_job: # We were processing layers. Stop that, the layers are going to change soon.
if self._process_layers_job is not None: # We were processing layers. Stop that, the layers are going to change soon.
Logger.log("d", "Aborting process layers job...")
self._process_layers_job.abort()
self._process_layers_job = None
@ -225,7 +225,7 @@ class CuraEngineBackend(QObject, Backend):
return
if not hasattr(self._scene, "gcode_dict"):
self._scene.gcode_dict = {}
self._scene.gcode_dict = {} #type: ignore #Because we are creating the missing attribute here.
# see if we really have to slice
active_build_plate = self._application.getMultiBuildPlateModel().activeBuildPlate
@ -237,7 +237,7 @@ class CuraEngineBackend(QObject, Backend):
self._stored_optimized_layer_data[build_plate_to_be_sliced] = []
if build_plate_to_be_sliced not in num_objects or num_objects[build_plate_to_be_sliced] == 0:
self._scene.gcode_dict[build_plate_to_be_sliced] = []
self._scene.gcode_dict[build_plate_to_be_sliced] = [] #type: ignore #Because we created this attribute above.
Logger.log("d", "Build plate %s has no objects to be sliced, skipping", build_plate_to_be_sliced)
if self._build_plates_to_be_sliced:
self.slice()
@ -254,14 +254,14 @@ class CuraEngineBackend(QObject, Backend):
self.processingProgress.emit(0.0)
self.backendStateChange.emit(BackendState.NotStarted)
self._scene.gcode_dict[build_plate_to_be_sliced] = [] #[] indexed by build plate number
self._scene.gcode_dict[build_plate_to_be_sliced] = [] #type: ignore #[] indexed by build plate number
self._slicing = True
self.slicingStarted.emit()
self.determineAutoSlicing() # Switch timer on or off if appropriate
slice_message = self._socket.createMessage("cura.proto.Slice")
self._start_slice_job = StartSliceJob.StartSliceJob(slice_message)
self._start_slice_job = StartSliceJob(slice_message)
self._start_slice_job_build_plate = build_plate_to_be_sliced
self._start_slice_job.setBuildPlate(self._start_slice_job_build_plate)
self._start_slice_job.start()
@ -310,12 +310,12 @@ class CuraEngineBackend(QObject, Backend):
if self._start_slice_job is job:
self._start_slice_job = None
if job.isCancelled() or job.getError() or job.getResult() == StartSliceJob.StartJobResult.Error:
if job.isCancelled() or job.getError() or job.getResult() == StartJobResult.Error:
self.backendStateChange.emit(BackendState.Error)
self.backendError.emit(job)
return
if job.getResult() == StartSliceJob.StartJobResult.MaterialIncompatible:
if job.getResult() == StartJobResult.MaterialIncompatible:
if self._application.platformActivity:
self._error_message = Message(catalog.i18nc("@info:status",
"Unable to slice with the current material as it is incompatible with the selected machine or configuration."), title = catalog.i18nc("@info:title", "Unable to slice"))
@ -326,10 +326,10 @@ class CuraEngineBackend(QObject, Backend):
self.backendStateChange.emit(BackendState.NotStarted)
return
if job.getResult() == StartSliceJob.StartJobResult.SettingError:
if job.getResult() == StartJobResult.SettingError:
if self._application.platformActivity:
extruders = list(ExtruderManager.getInstance().getMachineExtruders(self._global_container_stack.getId()))
error_keys = []
error_keys = [] #type: List[str]
for extruder in extruders:
error_keys.extend(extruder.getErrorKeys())
if not extruders:
@ -337,7 +337,7 @@ class CuraEngineBackend(QObject, Backend):
error_labels = set()
for key in error_keys:
for stack in [self._global_container_stack] + extruders: #Search all container stacks for the definition of this setting. Some are only in an extruder stack.
definitions = stack.getBottom().findDefinitions(key = key)
definitions = cast(DefinitionContainerInterface, stack.getBottom()).findDefinitions(key = key)
if definitions:
break #Found it! No need to continue search.
else: #No stack has a definition for this setting.
@ -345,8 +345,7 @@ class CuraEngineBackend(QObject, Backend):
continue
error_labels.add(definitions[0].label)
error_labels = ", ".join(error_labels)
self._error_message = Message(catalog.i18nc("@info:status", "Unable to slice with the current settings. The following settings have errors: {0}").format(error_labels),
self._error_message = Message(catalog.i18nc("@info:status", "Unable to slice with the current settings. The following settings have errors: {0}").format(", ".join(error_labels)),
title = catalog.i18nc("@info:title", "Unable to slice"))
self._error_message.show()
self.backendStateChange.emit(BackendState.Error)
@ -355,28 +354,27 @@ class CuraEngineBackend(QObject, Backend):
self.backendStateChange.emit(BackendState.NotStarted)
return
elif job.getResult() == StartSliceJob.StartJobResult.ObjectSettingError:
elif job.getResult() == StartJobResult.ObjectSettingError:
errors = {}
for node in DepthFirstIterator(self._application.getController().getScene().getRoot()):
for node in DepthFirstIterator(self._application.getController().getScene().getRoot()): #type: ignore #Ignore type error because iter() should get called automatically by Python syntax.
stack = node.callDecoration("getStack")
if not stack:
continue
for key in stack.getErrorKeys():
definition = self._global_container_stack.getBottom().findDefinitions(key = key)
definition = cast(DefinitionContainerInterface, self._global_container_stack.getBottom()).findDefinitions(key = key)
if not definition:
Logger.log("e", "When checking settings for errors, unable to find definition for key {key} in per-object stack.".format(key = key))
continue
definition = definition[0]
errors[key] = definition.label
error_labels = ", ".join(errors.values())
self._error_message = Message(catalog.i18nc("@info:status", "Unable to slice due to some per-model settings. The following settings have errors on one or more models: {error_labels}").format(error_labels = error_labels),
self._error_message = Message(catalog.i18nc("@info:status", "Unable to slice due to some per-model settings. The following settings have errors on one or more models: {error_labels}").format(error_labels = ", ".join(errors.values())),
title = catalog.i18nc("@info:title", "Unable to slice"))
self._error_message.show()
self.backendStateChange.emit(BackendState.Error)
self.backendError.emit(job)
return
if job.getResult() == StartSliceJob.StartJobResult.BuildPlateError:
if job.getResult() == StartJobResult.BuildPlateError:
if self._application.platformActivity:
self._error_message = Message(catalog.i18nc("@info:status", "Unable to slice because the prime tower or prime position(s) are invalid."),
title = catalog.i18nc("@info:title", "Unable to slice"))
@ -386,7 +384,7 @@ class CuraEngineBackend(QObject, Backend):
else:
self.backendStateChange.emit(BackendState.NotStarted)
if job.getResult() == StartSliceJob.StartJobResult.ObjectsWithDisabledExtruder:
if job.getResult() == StartJobResult.ObjectsWithDisabledExtruder:
self._error_message = Message(catalog.i18nc("@info:status", "Unable to slice because there are objects associated with disabled Extruder %s." % job.getMessage()),
title = catalog.i18nc("@info:title", "Unable to slice"))
self._error_message.show()
@ -394,7 +392,7 @@ class CuraEngineBackend(QObject, Backend):
self.backendError.emit(job)
return
if job.getResult() == StartSliceJob.StartJobResult.NothingToSlice:
if job.getResult() == StartJobResult.NothingToSlice:
if self._application.platformActivity:
self._error_message = Message(catalog.i18nc("@info:status", "Nothing to slice because none of the models fit the build volume. Please scale or rotate models to fit."),
title = catalog.i18nc("@info:title", "Unable to slice"))
@ -424,14 +422,14 @@ class CuraEngineBackend(QObject, Backend):
if not self._application.getPreferences().getValue("general/auto_slice"):
enable_timer = False
for node in DepthFirstIterator(self._scene.getRoot()):
for node in DepthFirstIterator(self._scene.getRoot()): #type: ignore #Ignore type error because iter() should get called automatically by Python syntax.
if node.callDecoration("isBlockSlicing"):
enable_timer = False
self.backendStateChange.emit(BackendState.Disabled)
self._is_disabled = True
gcode_list = node.callDecoration("getGCodeList")
if gcode_list is not None:
self._scene.gcode_dict[node.callDecoration("getBuildPlateNumber")] = gcode_list
self._scene.gcode_dict[node.callDecoration("getBuildPlateNumber")] = gcode_list #type: ignore #Because we generate this attribute dynamically.
if self._use_timer == enable_timer:
return self._use_timer
@ -445,8 +443,8 @@ class CuraEngineBackend(QObject, Backend):
## Return a dict with number of objects per build plate
def _numObjectsPerBuildPlate(self) -> Dict[int, int]:
num_objects = defaultdict(int)
for node in DepthFirstIterator(self._scene.getRoot()):
num_objects = defaultdict(int) #type: Dict[int, int]
for node in DepthFirstIterator(self._scene.getRoot()): #type: ignore #Ignore type error because iter() should get called automatically by Python syntax.
# Only count sliceable objects
if node.callDecoration("isSliceable"):
build_plate_number = node.callDecoration("getBuildPlateNumber")
@ -529,7 +527,7 @@ class CuraEngineBackend(QObject, Backend):
## Remove old layer data (if any)
def _clearLayerData(self, build_plate_numbers: Set = None) -> None:
for node in DepthFirstIterator(self._scene.getRoot()):
for node in DepthFirstIterator(self._scene.getRoot()): #type: ignore #Ignore type error because iter() should get called automatically by Python syntax.
if node.callDecoration("getLayerData"):
if not build_plate_numbers or node.callDecoration("getBuildPlateNumber") in build_plate_numbers:
node.getParent().removeChild(node)
@ -611,7 +609,7 @@ class CuraEngineBackend(QObject, Backend):
self.backendStateChange.emit(BackendState.Done)
self.processingProgress.emit(1.0)
gcode_list = self._scene.gcode_dict[self._start_slice_job_build_plate]
gcode_list = self._scene.gcode_dict[self._start_slice_job_build_plate] #type: ignore #Because we generate this attribute dynamically.
for index, line in enumerate(gcode_list):
replaced = line.replace("{print_time}", str(self._application.getPrintInformation().currentPrintTime.getDisplayString(DurationFormat.Format.ISO8601)))
replaced = replaced.replace("{filament_amount}", str(self._application.getPrintInformation().materialLengths))
@ -649,18 +647,20 @@ class CuraEngineBackend(QObject, Backend):
#
# \param message The protobuf message containing g-code, encoded as UTF-8.
def _onGCodeLayerMessage(self, message: Arcus.PythonMessage) -> None:
self._scene.gcode_dict[self._start_slice_job_build_plate].append(message.data.decode("utf-8", "replace"))
self._scene.gcode_dict[self._start_slice_job_build_plate].append(message.data.decode("utf-8", "replace")) #type: ignore #Because we generate this attribute dynamically.
## Called when a g-code prefix message is received from the engine.
#
# \param message The protobuf message containing the g-code prefix,
# encoded as UTF-8.
def _onGCodePrefixMessage(self, message: Arcus.PythonMessage) -> None:
self._scene.gcode_dict[self._start_slice_job_build_plate].insert(0, message.data.decode("utf-8", "replace"))
self._scene.gcode_dict[self._start_slice_job_build_plate].insert(0, message.data.decode("utf-8", "replace")) #type: ignore #Because we generate this attribute dynamically.
## Creates a new socket connection.
def _createSocket(self) -> None:
super()._createSocket(os.path.abspath(os.path.join(PluginRegistry.getInstance().getPluginPath(self.getPluginId()), "Cura.proto")))
def _createSocket(self, protocol_file: str = None) -> None:
if not protocol_file:
protocol_file = os.path.abspath(os.path.join(PluginRegistry.getInstance().getPluginPath(self.getPluginId()), "Cura.proto"))
super()._createSocket(protocol_file)
self._engine_is_fresh = True
## Called when anything has changed to the stuff that needs to be sliced.
@ -745,7 +745,7 @@ class CuraEngineBackend(QObject, Backend):
self._onSceneChanged(source)
def _startProcessSlicedLayersJob(self, build_plate_number: int) -> None:
self._process_layers_job = ProcessSlicedLayersJob.ProcessSlicedLayersJob(self._stored_optimized_layer_data[build_plate_number])
self._process_layers_job = ProcessSlicedLayersJob(self._stored_optimized_layer_data[build_plate_number])
self._process_layers_job.setBuildPlate(build_plate_number)
self._process_layers_job.finished.connect(self._onProcessLayersFinished)
self._process_layers_job.start()