From ad3c5466d1078640f3b509f570aa3b0fde99ec6b Mon Sep 17 00:00:00 2001 From: Jaime van Kessel Date: Wed, 14 Mar 2018 14:45:01 +0100 Subject: [PATCH] Added bunch of typing to clusteroutput device --- .../ClusterUM3OutputDevice.py | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/plugins/UM3NetworkPrinting/ClusterUM3OutputDevice.py b/plugins/UM3NetworkPrinting/ClusterUM3OutputDevice.py index d69d787eab..c19c86d6ce 100644 --- a/plugins/UM3NetworkPrinting/ClusterUM3OutputDevice.py +++ b/plugins/UM3NetworkPrinting/ClusterUM3OutputDevice.py @@ -22,7 +22,7 @@ from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal, pyqtProperty, QObject from time import time from datetime import datetime -from typing import Optional +from typing import Optional, Dict, List import json import os @@ -79,7 +79,6 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): self._latest_reply_handler = None - def requestWrite(self, nodes, file_name=None, filter_by_machine=False, file_handler=None, **kwargs): self.writeStarted.emit(self) @@ -116,7 +115,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): @pyqtSlot() @pyqtSlot(str) - def sendPrintJob(self, target_printer = ""): + def sendPrintJob(self, target_printer: str = ""): Logger.log("i", "Sending print job to printer.") if self._sending_gcode: self._error_message = Message( @@ -157,11 +156,11 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): return True @pyqtProperty(QObject, notify=activePrinterChanged) - def activePrinter(self) -> Optional["PrinterOutputModel"]: + def activePrinter(self) -> Optional[PrinterOutputModel]: return self._active_printer @pyqtSlot(QObject) - def setActivePrinter(self, printer): + def setActivePrinter(self, printer: Optional[PrinterOutputModel]): if self._active_printer != printer: if self._active_printer and self._active_printer.camera: self._active_printer.camera.stop() @@ -173,7 +172,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): self._compressing_gcode = False self._sending_gcode = False - def _onUploadPrintJobProgress(self, bytes_sent, bytes_total): + def _onUploadPrintJobProgress(self, bytes_sent:int, bytes_total:int): if bytes_total > 0: new_progress = bytes_sent / bytes_total * 100 # Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get @@ -186,7 +185,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): self._progress_message.setProgress(0) self._progress_message.hide() - def _progressMessageActionTriggered(self, message_id=None, action_id=None): + def _progressMessageActionTriggered(self, message_id: Optional[str]=None, action_id: Optional[str]=None) -> None: if action_id == "Abort": Logger.log("d", "User aborted sending print to remote.") self._progress_message.hide() @@ -202,29 +201,29 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): @pyqtSlot() - def openPrintJobControlPanel(self): + def openPrintJobControlPanel(self) -> None: Logger.log("d", "Opening print job control panel...") QDesktopServices.openUrl(QUrl("http://" + self._address + "/print_jobs")) @pyqtSlot() - def openPrinterControlPanel(self): + def openPrinterControlPanel(self) -> None: Logger.log("d", "Opening printer control panel...") QDesktopServices.openUrl(QUrl("http://" + self._address + "/printers")) @pyqtProperty("QVariantList", notify=printJobsChanged) - def printJobs(self): + def printJobs(self)-> List[PrintJobOutputModel] : return self._print_jobs @pyqtProperty("QVariantList", notify=printJobsChanged) - def queuedPrintJobs(self): + def queuedPrintJobs(self) -> List[PrintJobOutputModel]: return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is None or print_job.state == "queued"] @pyqtProperty("QVariantList", notify=printJobsChanged) - def activePrintJobs(self): + def activePrintJobs(self) -> List[PrintJobOutputModel]: return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is not None and print_job.state != "queued"] @pyqtProperty("QVariantList", notify=clusterPrintersChanged) - def connectedPrintersTypeCount(self): + def connectedPrintersTypeCount(self) -> List[PrinterOutputModel]: printer_count = {} for printer in self._printers: if printer.type in printer_count: @@ -237,22 +236,22 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): return result @pyqtSlot(int, result=str) - def formatDuration(self, seconds): + def formatDuration(self, seconds: int) -> str: return Duration(seconds).getDisplayString(DurationFormat.Format.Short) @pyqtSlot(int, result=str) - def getTimeCompleted(self, time_remaining): + def getTimeCompleted(self, time_remaining: int) -> str: current_time = time() datetime_completed = datetime.fromtimestamp(current_time + time_remaining) return "{hour:02d}:{minute:02d}".format(hour=datetime_completed.hour, minute=datetime_completed.minute) @pyqtSlot(int, result=str) - def getDateCompleted(self, time_remaining): + def getDateCompleted(self, time_remaining: int) -> str: current_time = time() datetime_completed = datetime.fromtimestamp(current_time + time_remaining) return (datetime_completed.strftime("%a %b ") + "{day}".format(day=datetime_completed.day)).upper() - def _printJobStateChanged(self): + def _printJobStateChanged(self) -> None: username = self._getUserName() if username is None: @@ -275,13 +274,13 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): # Keep a list of all completed jobs so we know if something changed next time. self._finished_jobs = finished_jobs - def _update(self): + def _update(self) -> None: if not super()._update(): return self.get("printers/", onFinished=self._onGetPrintersDataFinished) self.get("print_jobs/", onFinished=self._onGetPrintJobsFinished) - def _onGetPrintJobsFinished(self, reply: QNetworkReply): + def _onGetPrintJobsFinished(self, reply: QNetworkReply) -> None: if not checkValidGetReply(reply): return @@ -323,7 +322,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): if job_list_changed: self.printJobsChanged.emit() # Do a single emit for all print job changes. - def _onGetPrintersDataFinished(self, reply: QNetworkReply): + def _onGetPrintersDataFinished(self, reply: QNetworkReply) -> None: if not checkValidGetReply(reply): return @@ -352,27 +351,27 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): if removed_printers or printer_list_changed: self.printersChanged.emit() - def _createPrinterModel(self, data): + def _createPrinterModel(self, data: Dict) -> PrinterOutputModel: printer = PrinterOutputModel(output_controller=ClusterUM3PrinterOutputController(self), number_of_extruders=self._number_of_extruders) printer.setCamera(NetworkCamera("http://" + data["ip_address"] + ":8080/?action=stream")) self._printers.append(printer) return printer - def _createPrintJobModel(self, data): + def _createPrintJobModel(self, data: Dict) -> PrintJobOutputModel: print_job = PrintJobOutputModel(output_controller=ClusterUM3PrinterOutputController(self), key=data["uuid"], name= data["name"]) print_job.stateChanged.connect(self._printJobStateChanged) self._print_jobs.append(print_job) return print_job - def _updatePrintJob(self, print_job, data): + def _updatePrintJob(self, print_job: PrintJobOutputModel, data: Dict) -> None: print_job.updateTimeTotal(data["time_total"]) print_job.updateTimeElapsed(data["time_elapsed"]) print_job.updateState(data["status"]) print_job.updateOwner(data["owner"]) - def _updatePrinter(self, printer, data) -> None: + def _updatePrinter(self, printer: PrinterOutputModel, data: Dict) -> None: # For some unknown reason the cluster wants UUID for everything, except for sending a job directly to a printer. # Then we suddenly need the unique name. So in order to not have to mess up all the other code, we save a mapping. self._printer_uuid_to_unique_name_mapping[data["uuid"]] = data["unique_name"] @@ -427,7 +426,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): brand=brand, color=color, name=name) extruder.updateActiveMaterial(material) - def _removeJob(self, job): + def _removeJob(self, job: PrintJobOutputModel): if job not in self._print_jobs: return False @@ -438,7 +437,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice): return True - def _removePrinter(self, printer): + def _removePrinter(self, printer: PrinterOutputModel): self._printers.remove(printer) if self._active_printer == printer: self._active_printer = None