Added bunch of typing to clusteroutput device

This commit is contained in:
Jaime van Kessel 2018-03-14 14:45:01 +01:00
parent 962c5f3260
commit ad3c5466d1

View file

@ -22,7 +22,7 @@ from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal, pyqtProperty, QObject
from time import time from time import time
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional, Dict, List
import json import json
import os import os
@ -79,7 +79,6 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._latest_reply_handler = None self._latest_reply_handler = None
def requestWrite(self, nodes, file_name=None, filter_by_machine=False, file_handler=None, **kwargs): def requestWrite(self, nodes, file_name=None, filter_by_machine=False, file_handler=None, **kwargs):
self.writeStarted.emit(self) self.writeStarted.emit(self)
@ -116,7 +115,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
@pyqtSlot() @pyqtSlot()
@pyqtSlot(str) @pyqtSlot(str)
def sendPrintJob(self, target_printer = ""): def sendPrintJob(self, target_printer: str = ""):
Logger.log("i", "Sending print job to printer.") Logger.log("i", "Sending print job to printer.")
if self._sending_gcode: if self._sending_gcode:
self._error_message = Message( self._error_message = Message(
@ -157,11 +156,11 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
return True return True
@pyqtProperty(QObject, notify=activePrinterChanged) @pyqtProperty(QObject, notify=activePrinterChanged)
def activePrinter(self) -> Optional["PrinterOutputModel"]: def activePrinter(self) -> Optional[PrinterOutputModel]:
return self._active_printer return self._active_printer
@pyqtSlot(QObject) @pyqtSlot(QObject)
def setActivePrinter(self, printer): def setActivePrinter(self, printer: Optional[PrinterOutputModel]):
if self._active_printer != printer: if self._active_printer != printer:
if self._active_printer and self._active_printer.camera: if self._active_printer and self._active_printer.camera:
self._active_printer.camera.stop() self._active_printer.camera.stop()
@ -173,7 +172,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._compressing_gcode = False self._compressing_gcode = False
self._sending_gcode = False self._sending_gcode = False
def _onUploadPrintJobProgress(self, bytes_sent, bytes_total): def _onUploadPrintJobProgress(self, bytes_sent:int, bytes_total:int):
if bytes_total > 0: if bytes_total > 0:
new_progress = bytes_sent / bytes_total * 100 new_progress = bytes_sent / bytes_total * 100
# Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get # Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
@ -186,7 +185,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._progress_message.setProgress(0) self._progress_message.setProgress(0)
self._progress_message.hide() self._progress_message.hide()
def _progressMessageActionTriggered(self, message_id=None, action_id=None): def _progressMessageActionTriggered(self, message_id: Optional[str]=None, action_id: Optional[str]=None) -> None:
if action_id == "Abort": if action_id == "Abort":
Logger.log("d", "User aborted sending print to remote.") Logger.log("d", "User aborted sending print to remote.")
self._progress_message.hide() self._progress_message.hide()
@ -202,29 +201,29 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
@pyqtSlot() @pyqtSlot()
def openPrintJobControlPanel(self): def openPrintJobControlPanel(self) -> None:
Logger.log("d", "Opening print job control panel...") Logger.log("d", "Opening print job control panel...")
QDesktopServices.openUrl(QUrl("http://" + self._address + "/print_jobs")) QDesktopServices.openUrl(QUrl("http://" + self._address + "/print_jobs"))
@pyqtSlot() @pyqtSlot()
def openPrinterControlPanel(self): def openPrinterControlPanel(self) -> None:
Logger.log("d", "Opening printer control panel...") Logger.log("d", "Opening printer control panel...")
QDesktopServices.openUrl(QUrl("http://" + self._address + "/printers")) QDesktopServices.openUrl(QUrl("http://" + self._address + "/printers"))
@pyqtProperty("QVariantList", notify=printJobsChanged) @pyqtProperty("QVariantList", notify=printJobsChanged)
def printJobs(self): def printJobs(self)-> List[PrintJobOutputModel] :
return self._print_jobs return self._print_jobs
@pyqtProperty("QVariantList", notify=printJobsChanged) @pyqtProperty("QVariantList", notify=printJobsChanged)
def queuedPrintJobs(self): def queuedPrintJobs(self) -> List[PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is None or print_job.state == "queued"] return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is None or print_job.state == "queued"]
@pyqtProperty("QVariantList", notify=printJobsChanged) @pyqtProperty("QVariantList", notify=printJobsChanged)
def activePrintJobs(self): def activePrintJobs(self) -> List[PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is not None and print_job.state != "queued"] return [print_job for print_job in self._print_jobs if print_job.assignedPrinter is not None and print_job.state != "queued"]
@pyqtProperty("QVariantList", notify=clusterPrintersChanged) @pyqtProperty("QVariantList", notify=clusterPrintersChanged)
def connectedPrintersTypeCount(self): def connectedPrintersTypeCount(self) -> List[PrinterOutputModel]:
printer_count = {} printer_count = {}
for printer in self._printers: for printer in self._printers:
if printer.type in printer_count: if printer.type in printer_count:
@ -237,22 +236,22 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
return result return result
@pyqtSlot(int, result=str) @pyqtSlot(int, result=str)
def formatDuration(self, seconds): def formatDuration(self, seconds: int) -> str:
return Duration(seconds).getDisplayString(DurationFormat.Format.Short) return Duration(seconds).getDisplayString(DurationFormat.Format.Short)
@pyqtSlot(int, result=str) @pyqtSlot(int, result=str)
def getTimeCompleted(self, time_remaining): def getTimeCompleted(self, time_remaining: int) -> str:
current_time = time() current_time = time()
datetime_completed = datetime.fromtimestamp(current_time + time_remaining) datetime_completed = datetime.fromtimestamp(current_time + time_remaining)
return "{hour:02d}:{minute:02d}".format(hour=datetime_completed.hour, minute=datetime_completed.minute) return "{hour:02d}:{minute:02d}".format(hour=datetime_completed.hour, minute=datetime_completed.minute)
@pyqtSlot(int, result=str) @pyqtSlot(int, result=str)
def getDateCompleted(self, time_remaining): def getDateCompleted(self, time_remaining: int) -> str:
current_time = time() current_time = time()
datetime_completed = datetime.fromtimestamp(current_time + time_remaining) datetime_completed = datetime.fromtimestamp(current_time + time_remaining)
return (datetime_completed.strftime("%a %b ") + "{day}".format(day=datetime_completed.day)).upper() return (datetime_completed.strftime("%a %b ") + "{day}".format(day=datetime_completed.day)).upper()
def _printJobStateChanged(self): def _printJobStateChanged(self) -> None:
username = self._getUserName() username = self._getUserName()
if username is None: if username is None:
@ -275,13 +274,13 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# Keep a list of all completed jobs so we know if something changed next time. # Keep a list of all completed jobs so we know if something changed next time.
self._finished_jobs = finished_jobs self._finished_jobs = finished_jobs
def _update(self): def _update(self) -> None:
if not super()._update(): if not super()._update():
return return
self.get("printers/", onFinished=self._onGetPrintersDataFinished) self.get("printers/", onFinished=self._onGetPrintersDataFinished)
self.get("print_jobs/", onFinished=self._onGetPrintJobsFinished) self.get("print_jobs/", onFinished=self._onGetPrintJobsFinished)
def _onGetPrintJobsFinished(self, reply: QNetworkReply): def _onGetPrintJobsFinished(self, reply: QNetworkReply) -> None:
if not checkValidGetReply(reply): if not checkValidGetReply(reply):
return return
@ -323,7 +322,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
if job_list_changed: if job_list_changed:
self.printJobsChanged.emit() # Do a single emit for all print job changes. self.printJobsChanged.emit() # Do a single emit for all print job changes.
def _onGetPrintersDataFinished(self, reply: QNetworkReply): def _onGetPrintersDataFinished(self, reply: QNetworkReply) -> None:
if not checkValidGetReply(reply): if not checkValidGetReply(reply):
return return
@ -352,27 +351,27 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
if removed_printers or printer_list_changed: if removed_printers or printer_list_changed:
self.printersChanged.emit() self.printersChanged.emit()
def _createPrinterModel(self, data): def _createPrinterModel(self, data: Dict) -> PrinterOutputModel:
printer = PrinterOutputModel(output_controller=ClusterUM3PrinterOutputController(self), printer = PrinterOutputModel(output_controller=ClusterUM3PrinterOutputController(self),
number_of_extruders=self._number_of_extruders) number_of_extruders=self._number_of_extruders)
printer.setCamera(NetworkCamera("http://" + data["ip_address"] + ":8080/?action=stream")) printer.setCamera(NetworkCamera("http://" + data["ip_address"] + ":8080/?action=stream"))
self._printers.append(printer) self._printers.append(printer)
return printer return printer
def _createPrintJobModel(self, data): def _createPrintJobModel(self, data: Dict) -> PrintJobOutputModel:
print_job = PrintJobOutputModel(output_controller=ClusterUM3PrinterOutputController(self), print_job = PrintJobOutputModel(output_controller=ClusterUM3PrinterOutputController(self),
key=data["uuid"], name= data["name"]) key=data["uuid"], name= data["name"])
print_job.stateChanged.connect(self._printJobStateChanged) print_job.stateChanged.connect(self._printJobStateChanged)
self._print_jobs.append(print_job) self._print_jobs.append(print_job)
return print_job return print_job
def _updatePrintJob(self, print_job, data): def _updatePrintJob(self, print_job: PrintJobOutputModel, data: Dict) -> None:
print_job.updateTimeTotal(data["time_total"]) print_job.updateTimeTotal(data["time_total"])
print_job.updateTimeElapsed(data["time_elapsed"]) print_job.updateTimeElapsed(data["time_elapsed"])
print_job.updateState(data["status"]) print_job.updateState(data["status"])
print_job.updateOwner(data["owner"]) print_job.updateOwner(data["owner"])
def _updatePrinter(self, printer, data) -> None: def _updatePrinter(self, printer: PrinterOutputModel, data: Dict) -> None:
# For some unknown reason the cluster wants UUID for everything, except for sending a job directly to a printer. # For some unknown reason the cluster wants UUID for everything, except for sending a job directly to a printer.
# Then we suddenly need the unique name. So in order to not have to mess up all the other code, we save a mapping. # Then we suddenly need the unique name. So in order to not have to mess up all the other code, we save a mapping.
self._printer_uuid_to_unique_name_mapping[data["uuid"]] = data["unique_name"] self._printer_uuid_to_unique_name_mapping[data["uuid"]] = data["unique_name"]
@ -427,7 +426,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
brand=brand, color=color, name=name) brand=brand, color=color, name=name)
extruder.updateActiveMaterial(material) extruder.updateActiveMaterial(material)
def _removeJob(self, job): def _removeJob(self, job: PrintJobOutputModel):
if job not in self._print_jobs: if job not in self._print_jobs:
return False return False
@ -438,7 +437,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
return True return True
def _removePrinter(self, printer): def _removePrinter(self, printer: PrinterOutputModel):
self._printers.remove(printer) self._printers.remove(printer)
if self._active_printer == printer: if self._active_printer == printer:
self._active_printer = None self._active_printer = None