CURA-5330 Fix typing and code style in the UM3NetworkPrinting plugin

This commit is contained in:
Diego Prado Gesto 2018-06-14 15:41:12 +02:00
parent b331736cb2
commit 7eba868574
3 changed files with 37 additions and 23 deletions

View file

@ -207,7 +207,7 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
reply = self._manager.get(request) reply = self._manager.get(request)
self._registerOnFinishedCallback(reply, on_finished) self._registerOnFinishedCallback(reply, on_finished)
def post(self, target: str, data: str, onFinished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None: def post(self, target: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None:
if self._manager is None: if self._manager is None:
self._createNetworkManager() self._createNetworkManager()
assert(self._manager is not None) assert(self._manager is not None)
@ -216,7 +216,7 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
reply = self._manager.post(request, data) reply = self._manager.post(request, data)
if on_progress is not None: if on_progress is not None:
reply.uploadProgress.connect(on_progress) reply.uploadProgress.connect(on_progress)
self._registerOnFinishedCallback(reply, onFinished) self._registerOnFinishedCallback(reply, on_finished)
def postFormWithParts(self, target:str, parts: List[QHttpPart], on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None: def postFormWithParts(self, target:str, parts: List[QHttpPart], on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None:
if self._manager is None: if self._manager is None:

View file

@ -1,6 +1,11 @@
# Copyright (c) 2018 Ultimaker B.V. # Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher. # Cura is released under the terms of the LGPLv3 or higher.
from typing import Generator
from typing import Set
from typing import Tuple
from typing import Union
from UM.FileHandler.FileHandler import FileHandler
from UM.FileHandler.FileWriter import FileWriter #To choose based on the output file mode (text vs. binary). from UM.FileHandler.FileWriter import FileWriter #To choose based on the output file mode (text vs. binary).
from UM.FileHandler.WriteFileJob import WriteFileJob #To call the file writer asynchronously. from UM.FileHandler.WriteFileJob import WriteFileJob #To call the file writer asynchronously.
from UM.Logger import Logger from UM.Logger import Logger
@ -44,15 +49,15 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# Inheritance doesn't seem to work. Tying them together does work, but i'm open for better suggestions. # Inheritance doesn't seem to work. Tying them together does work, but i'm open for better suggestions.
clusterPrintersChanged = pyqtSignal() clusterPrintersChanged = pyqtSignal()
def __init__(self, device_id, address, properties, parent = None): def __init__(self, device_id, address, properties, parent = None) -> None:
super().__init__(device_id = device_id, address = address, properties=properties, parent = parent) super().__init__(device_id = device_id, address = address, properties=properties, parent = parent)
self._api_prefix = "/cluster-api/v1/" self._api_prefix = "/cluster-api/v1/"
self._number_of_extruders = 2 self._number_of_extruders = 2
self._dummy_lambdas = set() self._dummy_lambdas = set() # type: Set[Tuple[str, Dict, Union[io.StringIO, io.BytesIO]]]
self._print_jobs = [] self._print_jobs = [] # type: List[PrintJobOutputModel]
self._monitor_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ClusterMonitorItem.qml") self._monitor_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ClusterMonitorItem.qml")
self._control_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ClusterControlItem.qml") self._control_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ClusterControlItem.qml")
@ -80,15 +85,15 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self.setConnectionText(i18n_catalog.i18nc("@info:status", "Connected over the network")) self.setConnectionText(i18n_catalog.i18nc("@info:status", "Connected over the network"))
self._printer_uuid_to_unique_name_mapping = {} self._printer_uuid_to_unique_name_mapping = {} # type: Dict[str, str]
self._finished_jobs = [] self._finished_jobs = [] # type: List[PrintJobOutputModel]
self._cluster_size = int(properties.get(b"cluster_size", 0)) self._cluster_size = int(properties.get(b"cluster_size", 0))
self._latest_reply_handler = None self._latest_reply_handler = None
def requestWrite(self, nodes: List[SceneNode], file_name=None, filter_by_machine=False, file_handler=None, **kwargs): def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
self.writeStarted.emit(self) self.writeStarted.emit(self)
#Formats supported by this application (file types that we can actually write). #Formats supported by this application (file types that we can actually write).
@ -168,6 +173,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._error_message = Message( self._error_message = Message(
i18n_catalog.i18nc("@info:status", i18n_catalog.i18nc("@info:status",
"Sending new jobs (temporarily) blocked, still sending the previous print job.")) "Sending new jobs (temporarily) blocked, still sending the previous print job."))
assert(self._error_message is not None)
self._error_message.show() self._error_message.show()
yield #Wait on the user to select a target printer. yield #Wait on the user to select a target printer.
yield #Wait for the write job to be finished. yield #Wait for the write job to be finished.
@ -179,15 +186,17 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
target_printer = yield #Potentially wait on the user to select a target printer. target_printer = yield #Potentially wait on the user to select a target printer.
# Using buffering greatly reduces the write time for many lines of gcode # Using buffering greatly reduces the write time for many lines of gcode
stream = io.BytesIO() # type: Union[io.BytesIO, io.StringIO]# Binary mode.
if preferred_format["mode"] == FileWriter.OutputMode.TextMode: if preferred_format["mode"] == FileWriter.OutputMode.TextMode:
stream = io.StringIO() stream = io.StringIO()
else: #Binary mode.
stream = io.BytesIO()
job = WriteFileJob(writer, stream, nodes, preferred_format["mode"]) job = WriteFileJob(writer, stream, nodes, preferred_format["mode"])
self._write_job_progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), lifetime = 0, dismissable = False, progress = -1, self._write_job_progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), lifetime = 0, dismissable = False, progress = -1,
title = i18n_catalog.i18nc("@info:title", "Sending Data"), use_inactivity_timer = False) title = i18n_catalog.i18nc("@info:title", "Sending Data"), use_inactivity_timer = False)
assert(self._write_job_progress_message is not None) # use for typing purposes
self._write_job_progress_message.show() self._write_job_progress_message.show()
self._dummy_lambdas = (target_printer, preferred_format, stream) self._dummy_lambdas = (target_printer, preferred_format, stream)
@ -254,7 +263,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get # Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
# timeout responses if this happens. # timeout responses if this happens.
self._last_response_time = time() self._last_response_time = time()
if new_progress > self._progress_message.getProgress(): if self._progress_message and new_progress > self._progress_message.getProgress():
self._progress_message.show() # Ensure that the message is visible. self._progress_message.show() # Ensure that the message is visible.
self._progress_message.setProgress(bytes_sent / bytes_total * 100) self._progress_message.setProgress(bytes_sent / bytes_total * 100)
@ -271,13 +280,15 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._success_message.actionTriggered.connect(self._successMessageActionTriggered) self._success_message.actionTriggered.connect(self._successMessageActionTriggered)
self._success_message.show() self._success_message.show()
else: else:
self._progress_message.setProgress(0) if self._progress_message is not None:
self._progress_message.hide() self._progress_message.setProgress(0)
self._progress_message.hide()
def _progressMessageActionTriggered(self, message_id: Optional[str]=None, action_id: Optional[str]=None) -> None: def _progressMessageActionTriggered(self, message_id: Optional[str]=None, action_id: Optional[str]=None) -> None:
if action_id == "Abort": if action_id == "Abort":
Logger.log("d", "User aborted sending print to remote.") Logger.log("d", "User aborted sending print to remote.")
self._progress_message.hide() if self._progress_message is not None:
self._progress_message.hide()
self._compressing_gcode = False self._compressing_gcode = False
self._sending_gcode = False self._sending_gcode = False
Application.getInstance().getController().setActiveStage("PrepareStage") Application.getInstance().getController().setActiveStage("PrepareStage")
@ -315,8 +326,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is not None and print_job.state != "queued"] return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is not None and print_job.state != "queued"]
@pyqtProperty("QVariantList", notify=clusterPrintersChanged) @pyqtProperty("QVariantList", notify=clusterPrintersChanged)
def connectedPrintersTypeCount(self) -> List[PrinterOutputModel]: def connectedPrintersTypeCount(self) -> List[Dict[str, str]]:
printer_count = {} printer_count = {} # type: Dict[str, int]
for printer in self._printers: for printer in self._printers:
if printer.type in printer_count: if printer.type in printer_count:
printer_count[printer.type] += 1 printer_count[printer.type] += 1
@ -324,7 +335,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
printer_count[printer.type] = 1 printer_count[printer.type] = 1
result = [] result = []
for machine_type in printer_count: for machine_type in printer_count:
result.append({"machine_type": machine_type, "count": printer_count[machine_type]}) result.append({"machine_type": machine_type, "count": str(printer_count[machine_type])})
return result return result
@pyqtSlot(int, result=str) @pyqtSlot(int, result=str)
@ -367,10 +378,9 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._finished_jobs = finished_jobs self._finished_jobs = finished_jobs
def _update(self) -> None: def _update(self) -> None:
if not super()._update(): super()._update()
return self.get("printers/", on_finished=self._onGetPrintersDataFinished)
self.get("printers/", onFinished=self._onGetPrintersDataFinished) self.get("print_jobs/", on_finished=self._onGetPrintJobsFinished)
self.get("print_jobs/", onFinished=self._onGetPrintJobsFinished)
def _onGetPrintJobsFinished(self, reply: QNetworkReply) -> None: def _onGetPrintJobsFinished(self, reply: QNetworkReply) -> None:
if not checkValidGetReply(reply): if not checkValidGetReply(reply):

View file

@ -1,3 +1,7 @@
from typing import List, Optional
from UM.FileHandler.FileHandler import FileHandler
from UM.Scene.SceneNode import SceneNode
from cura.PrinterOutput.NetworkedPrinterOutputDevice import NetworkedPrinterOutputDevice, AuthState from cura.PrinterOutput.NetworkedPrinterOutputDevice import NetworkedPrinterOutputDevice, AuthState
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel
@ -39,7 +43,7 @@ i18n_catalog = i18nCatalog("cura")
# 4. At this point the machine either has the state Authenticated or AuthenticationDenied. # 4. At this point the machine either has the state Authenticated or AuthenticationDenied.
# 5. As a final step, we verify the authentication, as this forces the QT manager to setup the authenticator. # 5. As a final step, we verify the authentication, as this forces the QT manager to setup the authenticator.
class LegacyUM3OutputDevice(NetworkedPrinterOutputDevice): class LegacyUM3OutputDevice(NetworkedPrinterOutputDevice):
def __init__(self, device_id, address: str, properties, parent = None): def __init__(self, device_id, address: str, properties, parent = None) -> None:
super().__init__(device_id = device_id, address = address, properties = properties, parent = parent) super().__init__(device_id = device_id, address = address, properties = properties, parent = parent)
self._api_prefix = "/api/v1/" self._api_prefix = "/api/v1/"
self._number_of_extruders = 2 self._number_of_extruders = 2
@ -168,7 +172,7 @@ class LegacyUM3OutputDevice(NetworkedPrinterOutputDevice):
# NotImplementedError. We can simply ignore these. # NotImplementedError. We can simply ignore these.
pass pass
def requestWrite(self, nodes, file_name=None, filter_by_machine=False, file_handler=None, **kwargs): def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
if not self.activePrinter: if not self.activePrinter:
# No active printer. Unable to write # No active printer. Unable to write
return return