Start inheriting both output devices from a base device

This commit is contained in:
ChrisTerBeke 2019-07-26 17:05:39 +02:00
parent bd4f0c4e25
commit 4268c011a7
5 changed files with 79 additions and 140 deletions

View file

@ -3,7 +3,7 @@
import os
from time import time
from typing import Dict, List, Optional, Set, cast
from typing import List, Optional, Set, cast
from PyQt5.QtCore import QObject, QUrl, pyqtProperty, pyqtSignal, pyqtSlot
from PyQt5.QtGui import QDesktopServices
@ -14,14 +14,13 @@ from UM.FileHandler.FileHandler import FileHandler
from UM.Logger import Logger
from UM.Message import Message
from UM.PluginRegistry import PluginRegistry
from UM.Qt.Duration import Duration, DurationFormat
from UM.Scene.SceneNode import SceneNode
from UM.Version import Version
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.NetworkedPrinterOutputDevice import AuthState, NetworkedPrinterOutputDevice
from cura.PrinterOutput.Models.PrinterOutputModel import PrinterOutputModel
from cura.PrinterOutput.NetworkedPrinterOutputDevice import AuthState
from cura.PrinterOutput.PrinterOutputDevice import ConnectionType
from plugins.UM3NetworkPrinting.src.UltimakerNetworkedPrinterOutputDevice import UltimakerNetworkedPrinterOutputDevice
from .CloudOutputController import CloudOutputController
from ..MeshFormatHandler import MeshFormatHandler
@ -35,7 +34,7 @@ from plugins.UM3NetworkPrinting.src.Models.CloudPrintResponse import CloudPrintR
from plugins.UM3NetworkPrinting.src.Models.CloudPrintJobResponse import CloudPrintJobResponse
from plugins.UM3NetworkPrinting.src.Models.CloudClusterPrinterStatus import CloudClusterPrinterStatus
from plugins.UM3NetworkPrinting.src.Models.CloudClusterPrintJobStatus import CloudClusterPrintJobStatus
from .Utils import formatDateCompleted, formatTimeCompleted
I18N_CATALOG = i18nCatalog("cura")
@ -44,7 +43,8 @@ I18N_CATALOG = i18nCatalog("cura")
# Currently it only supports viewing the printer and print job status and adding a new job to the queue.
# As such, those methods have been implemented here.
# Note that this device represents a single remote cluster, not a list of multiple clusters.
class CloudOutputDevice(NetworkedPrinterOutputDevice):
class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
# The interval with which the remote clusters are checked
CHECK_CLUSTER_INTERVAL = 10.0 # seconds
@ -81,12 +81,11 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
super().__init__(device_id=cluster.cluster_id, address="",
connection_type=ConnectionType.CloudConnection, properties=properties, parent=parent)
self._api = api_client
self._account = api_client.account
self._cluster = cluster
self._setInterfaceElements()
self._account = api_client.account
# We use the Cura Connect monitor tab to get most functionality right away.
if PluginRegistry.getInstance() is not None:
plugin_path = PluginRegistry.getInstance().getPluginPath("UM3NetworkPrinting")
@ -98,13 +97,6 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
# Trigger the printersChanged signal when the private signal is triggered.
self.printersChanged.connect(self._clusterPrintersChanged)
# We keep track of which printer is visible in the monitor page.
self._active_printer = None # type: Optional[PrinterOutputModel]
# Properties to populate later on with received cloud data.
self._print_jobs = [] # type: List[UM3PrintJobOutputModel]
self._number_of_extruders = 2 # All networked printers are dual-extrusion Ultimaker machines.
# We only allow a single upload at a time.
self._progress = CloudProgressMessage()
@ -382,98 +374,27 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
firmware_version = Version([version_number[0], version_number[1], version_number[2]])
return firmware_version >= self.PRINT_JOB_ACTIONS_MIN_VERSION
## Gets the number of printers in the cluster.
# We use a minimum of 1 because cloud devices are always a cluster and printer discovery needs it.
@pyqtProperty(int, notify=_clusterPrintersChanged)
def clusterSize(self) -> int:
return max(1, len(self._printers))
## Gets the remote printers.
@pyqtProperty("QVariantList", notify=_clusterPrintersChanged)
def printers(self) -> List[PrinterOutputModel]:
return self._printers
## Get the active printer in the UI (monitor page).
@pyqtProperty(QObject, notify=activePrinterChanged)
def activePrinter(self) -> Optional[PrinterOutputModel]:
return self._active_printer
## Set the active printer in the UI (monitor page).
@pyqtSlot(QObject)
def setActivePrinter(self, printer: Optional[PrinterOutputModel] = None) -> None:
if printer != self._active_printer:
self._active_printer = printer
self.activePrinterChanged.emit()
## Get remote print jobs.
@pyqtProperty("QVariantList", notify=printJobsChanged)
def printJobs(self) -> List[UM3PrintJobOutputModel]:
return self._print_jobs
## Get remote print jobs that are still in the print queue.
@pyqtProperty("QVariantList", notify=printJobsChanged)
def queuedPrintJobs(self) -> List[UM3PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs
if print_job.state == "queued" or print_job.state == "error"]
## Get remote print jobs that are assigned to a printer.
@pyqtProperty("QVariantList", notify=printJobsChanged)
def activePrintJobs(self) -> List[UM3PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs if
print_job.assignedPrinter is not None and print_job.state != "queued"]
## Set the remote print job state.
def setJobState(self, print_job_uuid: str, state: str) -> None:
self._api.doPrintJobAction(self._cluster.cluster_id, print_job_uuid, state)
@pyqtSlot(str)
@pyqtSlot(str, name="sendJobToTop")
def sendJobToTop(self, print_job_uuid: str) -> None:
self._api.doPrintJobAction(self._cluster.cluster_id, print_job_uuid, "move",
{"list": "queued", "to_position": 0})
@pyqtSlot(str)
@pyqtSlot(str, name="deleteJobFromQueue")
def deleteJobFromQueue(self, print_job_uuid: str) -> None:
self._api.doPrintJobAction(self._cluster.cluster_id, print_job_uuid, "remove")
@pyqtSlot(str)
@pyqtSlot(str, name="forceSendJob")
def forceSendJob(self, print_job_uuid: str) -> None:
self._api.doPrintJobAction(self._cluster.cluster_id, print_job_uuid, "force")
@pyqtSlot(int, result=str)
def formatDuration(self, seconds: int) -> str:
return Duration(seconds).getDisplayString(DurationFormat.Format.Short)
@pyqtSlot(int, result=str)
def getTimeCompleted(self, time_remaining: int) -> str:
return formatTimeCompleted(time_remaining)
@pyqtSlot(int, result=str)
def getDateCompleted(self, time_remaining: int) -> str:
return formatDateCompleted(time_remaining)
@pyqtProperty(bool, notify=printJobsChanged)
def receivedPrintJobs(self) -> bool:
return bool(self._print_jobs)
@pyqtSlot()
@pyqtSlot(name="openPrintJobControlPanel")
def openPrintJobControlPanel(self) -> None:
QDesktopServices.openUrl(QUrl("https://mycloud.ultimaker.com"))
@pyqtSlot()
@pyqtSlot(name="openPrinterControlPanel")
def openPrinterControlPanel(self) -> None:
QDesktopServices.openUrl(QUrl("https://mycloud.ultimaker.com"))
## TODO: The following methods are required by the monitor page QML, but are not actually available using cloud.
# TODO: We fake the methods here to not break the monitor page.
@pyqtProperty(QUrl, notify=_clusterPrintersChanged)
def activeCameraUrl(self) -> "QUrl":
return QUrl()
@pyqtSlot(QUrl)
def setActiveCameraUrl(self, camera_url: "QUrl") -> None:
pass
@pyqtProperty("QVariantList", notify=_clusterPrintersChanged)
def connectedPrintersTypeCount(self) -> List[Dict[str, str]]:
return []

View file

@ -15,7 +15,7 @@ from .CloudApiClient import CloudApiClient
from .CloudOutputDevice import CloudOutputDevice
from plugins.UM3NetworkPrinting.src.Models.CloudClusterResponse import CloudClusterResponse
from plugins.UM3NetworkPrinting.src.Models.CloudError import CloudError
from .Utils import findChanges
from plugins.UM3NetworkPrinting.src.Utils import findChanges
## The cloud output device manager is responsible for using the Ultimaker Cloud APIs to manage remote clusters.

View file

@ -1,54 +0,0 @@
from datetime import datetime, timedelta
from typing import TypeVar, Dict, Tuple, List
from UM import i18nCatalog
T = TypeVar("T")
U = TypeVar("U")
## Splits the given dictionaries into three lists (in a tuple):
# - `removed`: Items that were in the first argument but removed in the second one.
# - `added`: Items that were not in the first argument but were included in the second one.
# - `updated`: Items that were in both dictionaries. Both values are given in a tuple.
# \param previous: The previous items
# \param received: The received items
# \return: The tuple (removed, added, updated) as explained above.
def findChanges(previous: Dict[str, T], received: Dict[str, U]) -> Tuple[List[T], List[U], List[Tuple[T, U]]]:
previous_ids = set(previous)
received_ids = set(received)
removed_ids = previous_ids.difference(received_ids)
new_ids = received_ids.difference(previous_ids)
updated_ids = received_ids.intersection(previous_ids)
removed = [previous[removed_id] for removed_id in removed_ids]
added = [received[new_id] for new_id in new_ids]
updated = [(previous[updated_id], received[updated_id]) for updated_id in updated_ids]
return removed, added, updated
def formatTimeCompleted(seconds_remaining: int) -> str:
completed = datetime.now() + timedelta(seconds=seconds_remaining)
return "{hour:02d}:{minute:02d}".format(hour = completed.hour, minute = completed.minute)
def formatDateCompleted(seconds_remaining: int) -> str:
now = datetime.now()
completed = now + timedelta(seconds=seconds_remaining)
days = (completed.date() - now.date()).days
i18n = i18nCatalog("cura")
# If finishing date is more than 7 days out, using "Mon Dec 3 at HH:MM" format
if days >= 7:
return completed.strftime("%a %b ") + "{day}".format(day = completed.day)
# If finishing date is within the next week, use "Monday at HH:MM" format
elif days >= 2:
return completed.strftime("%a")
# If finishing tomorrow, use "tomorrow at HH:MM" format
elif days >= 1:
return i18n.i18nc("@info:status", "tomorrow")
# If finishing today, use "today at HH:MM" format
else:
return i18n.i18nc("@info:status", "today")