Fix typing related to Network Printing

Contributes to issue CURA-5330.
This commit is contained in:
Ghostkeeper 2018-06-15 16:53:45 +02:00
parent b07db74011
commit e717abf499
No known key found for this signature in database
GPG key ID: 5252B696FB5E7C7A
4 changed files with 104 additions and 110 deletions

View file

@ -1,15 +1,12 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Generator
from typing import Set
from typing import Tuple
from typing import Union
from typing import Any, cast, Set, Tuple, Union
from UM.FileHandler.FileHandler import FileHandler
from UM.FileHandler.FileWriter import FileWriter #To choose based on the output file mode (text vs. binary).
from UM.FileHandler.WriteFileJob import WriteFileJob #To call the file writer asynchronously.
from UM.Logger import Logger
from UM.Application import Application
from UM.Settings.ContainerRegistry import ContainerRegistry
from UM.i18n import i18nCatalog
from UM.Message import Message
@ -18,6 +15,7 @@ from UM.OutputDevice import OutputDeviceError #To show that something went wrong
from UM.Scene.SceneNode import SceneNode #For typing.
from UM.Version import Version #To check against firmware versions for support.
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.NetworkedPrinterOutputDevice import NetworkedPrinterOutputDevice, AuthState
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel
@ -30,7 +28,7 @@ from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
from PyQt5.QtGui import QDesktopServices
from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal, pyqtProperty, QObject
from time import time, sleep
from time import time
from datetime import datetime
from typing import Optional, Dict, List
@ -55,7 +53,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._number_of_extruders = 2
self._dummy_lambdas = set() # type: Set[Tuple[str, Dict, Union[io.StringIO, io.BytesIO]]]
self._dummy_lambdas = ("", {}, io.BytesIO()) #type: Tuple[str, Dict, Union[io.StringIO, io.BytesIO]]
self._print_jobs = [] # type: List[PrintJobOutputModel]
@ -65,18 +63,18 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# See comments about this hack with the clusterPrintersChanged signal
self.printersChanged.connect(self.clusterPrintersChanged)
self._accepts_commands = True
self._accepts_commands = True #type: bool
# Cluster does not have authentication, so default to authenticated
self._authentication_state = AuthState.Authenticated
self._error_message = None
self._write_job_progress_message = None
self._progress_message = None
self._error_message = None #type: Optional[Message]
self._write_job_progress_message = None #type: Optional[Message]
self._progress_message = None #type: Optional[Message]
self._active_printer = None # type: Optional[PrinterOutputModel]
self._printer_selection_dialog = None
self._printer_selection_dialog = None #type: QObject
self.setPriority(3) # Make sure the output device gets selected above local file output
self.setName(self._id)
@ -91,7 +89,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._cluster_size = int(properties.get(b"cluster_size", 0))
self._latest_reply_handler = None
self._latest_reply_handler = None #type: Optional[QNetworkReply]
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
self.writeStarted.emit(self)
@ -100,10 +98,10 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
if file_handler:
file_formats = file_handler.getSupportedFileTypesWrite()
else:
file_formats = Application.getInstance().getMeshFileHandler().getSupportedFileTypesWrite()
file_formats = CuraApplication.getInstance().getMeshFileHandler().getSupportedFileTypesWrite()
#Create a list from the supported file formats string.
machine_file_formats = Application.getInstance().getGlobalContainerStack().getMetaDataEntry("file_formats").split(";")
machine_file_formats = CuraApplication.getInstance().getGlobalContainerStack().getMetaDataEntry("file_formats").split(";")
machine_file_formats = [file_type.strip() for file_type in machine_file_formats]
#Exception for UM3 firmware version >=4.4: UFP is now supported and should be the preferred file format.
if "application/x-ufp" not in machine_file_formats and self.printerType == "ultimaker3" and Version(self.firmwareVersion) >= Version("4.4"):
@ -120,9 +118,9 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
#Just take the first file format available.
if file_handler is not None:
writer = file_handler.getWriterByMimeType(preferred_format["mime_type"])
writer = file_handler.getWriterByMimeType(cast(str, preferred_format["mime_type"]))
else:
writer = Application.getInstance().getMeshFileHandler().getWriterByMimeType(preferred_format["mime_type"])
writer = CuraApplication.getInstance().getMeshFileHandler().getWriterByMimeType(cast(str, preferred_format["mime_type"]))
#This function pauses with the yield, waiting on instructions on which printer it needs to print with.
self._sending_job = self._sendPrintJob(writer, preferred_format, nodes)
@ -138,7 +136,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
def _spawnPrinterSelectionDialog(self):
if self._printer_selection_dialog is None:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "PrintWindow.qml")
self._printer_selection_dialog = Application.getInstance().createQmlComponent(path, {"OutputDevice": self})
self._printer_selection_dialog = CuraApplication.getInstance().createQmlComponent(path, {"OutputDevice": self})
if self._printer_selection_dialog is not None:
self._printer_selection_dialog.show()
@ -173,8 +171,6 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._error_message = Message(
i18n_catalog.i18nc("@info:status",
"Sending new jobs (temporarily) blocked, still sending the previous print job."))
assert(self._error_message is not None)
self._error_message.show()
yield #Wait on the user to select a target printer.
yield #Wait for the write job to be finished.
@ -195,8 +191,6 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._write_job_progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), lifetime = 0, dismissable = False, progress = -1,
title = i18n_catalog.i18nc("@info:title", "Sending Data"), use_inactivity_timer = False)
assert(self._write_job_progress_message is not None) # use for typing purposes
self._write_job_progress_message.show()
self._dummy_lambdas = (target_printer, preferred_format, stream)
@ -207,9 +201,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
yield True #Return that we had success!
yield #To prevent having to catch the StopIteration exception.
from cura.Utils.Threading import call_on_qt_thread
def _sendPrintJobWaitOnWriteJobFinished(self, job):
def _sendPrintJobWaitOnWriteJobFinished(self, job: WriteFileJob) -> None:
self._write_job_progress_message.hide()
self._progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), lifetime = 0, dismissable = False, progress = -1,
@ -230,34 +222,35 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# Add user name to the print_job
parts.append(self._createFormPart("name=owner", bytes(self._getUserName(), "utf-8"), "text/plain"))
file_name = Application.getInstance().getPrintInformation().jobName + "." + preferred_format["extension"]
file_name = CuraApplication.getInstance().getPrintInformation().jobName + "." + preferred_format["extension"]
output = stream.getvalue() #Either str or bytes depending on the output mode.
if isinstance(stream, io.StringIO):
output = output.encode("utf-8")
output = cast(str, output).encode("utf-8")
output = cast(bytes, output)
parts.append(self._createFormPart("name=\"file\"; filename=\"%s\"" % file_name, output))
self._latest_reply_handler = self.postFormWithParts("print_jobs/", parts, onFinished=self._onPostPrintJobFinished, onProgress=self._onUploadPrintJobProgress)
self._latest_reply_handler = self.postFormWithParts("print_jobs/", parts, on_finished = self._onPostPrintJobFinished, on_progress = self._onUploadPrintJobProgress)
@pyqtProperty(QObject, notify=activePrinterChanged)
@pyqtProperty(QObject, notify = activePrinterChanged)
def activePrinter(self) -> Optional[PrinterOutputModel]:
return self._active_printer
@pyqtSlot(QObject)
def setActivePrinter(self, printer: Optional[PrinterOutputModel]):
def setActivePrinter(self, printer: Optional[PrinterOutputModel]) -> None:
if self._active_printer != printer:
if self._active_printer and self._active_printer.camera:
self._active_printer.camera.stop()
self._active_printer = printer
self.activePrinterChanged.emit()
def _onPostPrintJobFinished(self, reply):
def _onPostPrintJobFinished(self, reply: QNetworkReply) -> None:
self._progress_message.hide()
self._compressing_gcode = False
self._sending_gcode = False
def _onUploadPrintJobProgress(self, bytes_sent:int, bytes_total:int):
def _onUploadPrintJobProgress(self, bytes_sent: int, bytes_total: int) -> None:
if bytes_total > 0:
new_progress = bytes_sent / bytes_total * 100
# Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
@ -284,14 +277,14 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._progress_message.setProgress(0)
self._progress_message.hide()
def _progressMessageActionTriggered(self, message_id: Optional[str]=None, action_id: Optional[str]=None) -> None:
def _progressMessageActionTriggered(self, message_id: Optional[str] = None, action_id: Optional[str] = None) -> None:
if action_id == "Abort":
Logger.log("d", "User aborted sending print to remote.")
if self._progress_message is not None:
self._progress_message.hide()
self._compressing_gcode = False
self._sending_gcode = False
Application.getInstance().getController().setActiveStage("PrepareStage")
CuraApplication.getInstance().getController().setActiveStage("PrepareStage")
# After compressing the sliced model Cura sends data to printer, to stop receiving updates from the request
# the "reply" should be disconnected
@ -299,9 +292,9 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._latest_reply_handler.disconnect()
self._latest_reply_handler = None
def _successMessageActionTriggered(self, message_id: Optional[str]=None, action_id: Optional[str]=None) -> None:
def _successMessageActionTriggered(self, message_id: Optional[str] = None, action_id: Optional[str] = None) -> None:
if action_id == "View":
Application.getInstance().getController().setActiveStage("MonitorStage")
CuraApplication.getInstance().getController().setActiveStage("MonitorStage")
@pyqtSlot()
def openPrintJobControlPanel(self) -> None:
@ -313,19 +306,19 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
Logger.log("d", "Opening printer control panel...")
QDesktopServices.openUrl(QUrl("http://" + self._address + "/printers"))
@pyqtProperty("QVariantList", notify=printJobsChanged)
def printJobs(self)-> List[PrintJobOutputModel] :
@pyqtProperty("QVariantList", notify = printJobsChanged)
def printJobs(self)-> List[PrintJobOutputModel]:
return self._print_jobs
@pyqtProperty("QVariantList", notify=printJobsChanged)
@pyqtProperty("QVariantList", notify = printJobsChanged)
def queuedPrintJobs(self) -> List[PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs if print_job.state == "queued"]
@pyqtProperty("QVariantList", notify=printJobsChanged)
@pyqtProperty("QVariantList", notify = printJobsChanged)
def activePrintJobs(self) -> List[PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is not None and print_job.state != "queued"]
@pyqtProperty("QVariantList", notify=clusterPrintersChanged)
@pyqtProperty("QVariantList", notify = clusterPrintersChanged)
def connectedPrintersTypeCount(self) -> List[Dict[str, str]]:
printer_count = {} # type: Dict[str, int]
for printer in self._printers:
@ -338,17 +331,17 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
result.append({"machine_type": machine_type, "count": str(printer_count[machine_type])})
return result
@pyqtSlot(int, result=str)
@pyqtSlot(int, result = str)
def formatDuration(self, seconds: int) -> str:
return Duration(seconds).getDisplayString(DurationFormat.Format.Short)
@pyqtSlot(int, result=str)
@pyqtSlot(int, result = str)
def getTimeCompleted(self, time_remaining: int) -> str:
current_time = time()
datetime_completed = datetime.fromtimestamp(current_time + time_remaining)
return "{hour:02d}:{minute:02d}".format(hour=datetime_completed.hour, minute=datetime_completed.minute)
@pyqtSlot(int, result=str)
@pyqtSlot(int, result = str)
def getDateCompleted(self, time_remaining: int) -> str:
current_time = time()
datetime_completed = datetime.fromtimestamp(current_time + time_remaining)
@ -379,8 +372,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
def _update(self) -> None:
super()._update()
self.get("printers/", on_finished=self._onGetPrintersDataFinished)
self.get("print_jobs/", on_finished=self._onGetPrintJobsFinished)
self.get("printers/", on_finished = self._onGetPrintersDataFinished)
self.get("print_jobs/", on_finished = self._onGetPrintJobsFinished)
def _onGetPrintJobsFinished(self, reply: QNetworkReply) -> None:
if not checkValidGetReply(reply):
@ -419,7 +412,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
removed_jobs = [print_job for print_job in self._print_jobs if print_job not in print_jobs_seen]
for removed_job in removed_jobs:
job_list_changed |= self._removeJob(removed_job)
job_list_changed = job_list_changed or self._removeJob(removed_job)
if job_list_changed:
self.printJobsChanged.emit() # Do a single emit for all print job changes.
@ -453,27 +446,27 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
if removed_printers or printer_list_changed:
self.printersChanged.emit()
def _createPrinterModel(self, data: Dict) -> PrinterOutputModel:
printer = PrinterOutputModel(output_controller=ClusterUM3PrinterOutputController(self),
number_of_extruders=self._number_of_extruders)
def _createPrinterModel(self, data: Dict[str, Any]) -> PrinterOutputModel:
printer = PrinterOutputModel(output_controller = ClusterUM3PrinterOutputController(self),
number_of_extruders = self._number_of_extruders)
printer.setCamera(NetworkCamera("http://" + data["ip_address"] + ":8080/?action=stream"))
self._printers.append(printer)
return printer
def _createPrintJobModel(self, data: Dict) -> PrintJobOutputModel:
def _createPrintJobModel(self, data: Dict[str, Any]) -> PrintJobOutputModel:
print_job = PrintJobOutputModel(output_controller=ClusterUM3PrinterOutputController(self),
key=data["uuid"], name= data["name"])
print_job.stateChanged.connect(self._printJobStateChanged)
self._print_jobs.append(print_job)
return print_job
def _updatePrintJob(self, print_job: PrintJobOutputModel, data: Dict) -> None:
def _updatePrintJob(self, print_job: PrintJobOutputModel, data: Dict[str, Any]) -> None:
print_job.updateTimeTotal(data["time_total"])
print_job.updateTimeElapsed(data["time_elapsed"])
print_job.updateState(data["status"])
print_job.updateOwner(data["owner"])
def _updatePrinter(self, printer: PrinterOutputModel, data: Dict) -> None:
def _updatePrinter(self, printer: PrinterOutputModel, data: Dict[str, Any]) -> None:
# For some unknown reason the cluster wants UUID for everything, except for sending a job directly to a printer.
# Then we suddenly need the unique name. So in order to not have to mess up all the other code, we save a mapping.
self._printer_uuid_to_unique_name_mapping[data["uuid"]] = data["unique_name"]
@ -489,7 +482,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
printer.updateKey(data["uuid"])
printer.updateType(data["machine_variant"])
# Do not store the buildplate information that comes from connect if the current printer has not buildplate information
# Do not store the build plate information that comes from connect if the current printer has not build plate information
if "build_plate" in data and machine_definition.getMetaDataEntry("has_variant_buildplates", False):
printer.updateBuildplateName(data["build_plate"]["type"])
if not data["enabled"]:
@ -528,7 +521,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
brand=brand, color=color, name=name)
extruder.updateActiveMaterial(material)
def _removeJob(self, job: PrintJobOutputModel):
def _removeJob(self, job: PrintJobOutputModel) -> bool:
if job not in self._print_jobs:
return False
@ -539,23 +532,23 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
return True
def _removePrinter(self, printer: PrinterOutputModel):
def _removePrinter(self, printer: PrinterOutputModel) -> None:
self._printers.remove(printer)
if self._active_printer == printer:
self._active_printer = None
self.activePrinterChanged.emit()
def loadJsonFromReply(reply):
def loadJsonFromReply(reply: QNetworkReply) -> Optional[List[Dict[str, Any]]]:
try:
result = json.loads(bytes(reply.readAll()).decode("utf-8"))
except json.decoder.JSONDecodeError:
Logger.logException("w", "Unable to decode JSON from reply.")
return
return None
return result
def checkValidGetReply(reply):
def checkValidGetReply(reply: QNetworkReply) -> bool:
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
if status_code != 200:
@ -564,7 +557,8 @@ def checkValidGetReply(reply):
return True
def findByKey(list, key):
def findByKey(list: List[Union[PrintJobOutputModel, PrinterOutputModel]], key: str) -> Optional[PrintJobOutputModel]:
for item in list:
if item.key == key:
return item
return item
return None