Merge more stuff, re-use models for local networking as well

This commit is contained in:
ChrisTerBeke 2019-07-29 14:53:50 +02:00
parent 8f37c83b9c
commit 4b212d6c05
31 changed files with 688 additions and 975 deletions

View file

@ -186,17 +186,7 @@ Item
}
printJob: modelData
}
model:
{
// When printing over the cloud we don't recieve print jobs until there is one, so
// unless there's at least one print job we'll be stuck with skeleton loading
// indefinitely.
if (Cura.MachineManager.activeMachineIsUsingCloudConnection || OutputDevice.receivedPrintJobs)
{
return OutputDevice.queuedPrintJobs
}
return [null, null]
}
model: OutputDevice.queuedPrintJobs
spacing: 6 // TODO: Theme!
}
}

View file

@ -50,17 +50,7 @@ Component
MonitorCarousel
{
id: carousel
printers:
{
// When printing over the cloud we don't recieve print jobs until there is one, so
// unless there's at least one print job we'll be stuck with skeleton loading
// indefinitely.
if (Cura.MachineManager.activeMachineIsUsingCloudConnection || OutputDevice.receivedPrintJobs)
{
return OutputDevice.printers
}
return [null]
}
printers: OutputDevice.printers
}
}

View file

@ -11,14 +11,15 @@ from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply, QNetworkAccessManage
from UM.Logger import Logger
from cura import UltimakerCloudAuthentication
from cura.API import Account
from .ToolPathUploader import ToolPathUploader
from ..Models.BaseModel import BaseModel
from plugins.UM3NetworkPrinting.src.Models.CloudClusterResponse import CloudClusterResponse
from plugins.UM3NetworkPrinting.src.Models.CloudError import CloudError
from plugins.UM3NetworkPrinting.src.Models.CloudClusterStatus import CloudClusterStatus
from plugins.UM3NetworkPrinting.src.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from plugins.UM3NetworkPrinting.src.Models.CloudPrintResponse import CloudPrintResponse
from plugins.UM3NetworkPrinting.src.Models.CloudPrintJobResponse import CloudPrintJobResponse
from ..Models.Http.CloudClusterResponse import CloudClusterResponse
from ..Models.Http.CloudError import CloudError
from ..Models.Http.CloudClusterStatus import CloudClusterStatus
from ..Models.Http.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from ..Models.Http.CloudPrintResponse import CloudPrintResponse
from ..Models.Http.CloudPrintJobResponse import CloudPrintJobResponse
## The generic type variable used to document the methods below.

View file

@ -1,7 +1,5 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import os
from time import time
from typing import List, Optional, Set, cast
@ -13,27 +11,24 @@ from UM.Backend.Backend import BackendState
from UM.FileHandler.FileHandler import FileHandler
from UM.Logger import Logger
from UM.Message import Message
from UM.PluginRegistry import PluginRegistry
from UM.Scene.SceneNode import SceneNode
from UM.Version import Version
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.NetworkedPrinterOutputDevice import AuthState
from cura.PrinterOutput.PrinterOutputDevice import ConnectionType
from plugins.UM3NetworkPrinting.src.UltimakerNetworkedPrinterOutputDevice import UltimakerNetworkedPrinterOutputDevice
from .CloudOutputController import CloudOutputController
from ..MeshFormatHandler import MeshFormatHandler
from plugins.UM3NetworkPrinting.src.Models.UM3PrintJobOutputModel import UM3PrintJobOutputModel
from .CloudProgressMessage import CloudProgressMessage
from .CloudApiClient import CloudApiClient
from plugins.UM3NetworkPrinting.src.Models.CloudClusterResponse import CloudClusterResponse
from plugins.UM3NetworkPrinting.src.Models.CloudClusterStatus import CloudClusterStatus
from plugins.UM3NetworkPrinting.src.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from plugins.UM3NetworkPrinting.src.Models.CloudPrintResponse import CloudPrintResponse
from plugins.UM3NetworkPrinting.src.Models.CloudPrintJobResponse import CloudPrintJobResponse
from plugins.UM3NetworkPrinting.src.Models.CloudClusterPrinterStatus import CloudClusterPrinterStatus
from plugins.UM3NetworkPrinting.src.Models.CloudClusterPrintJobStatus import CloudClusterPrintJobStatus
from ..UltimakerNetworkedPrinterOutputDevice import UltimakerNetworkedPrinterOutputDevice
from ..MeshFormatHandler import MeshFormatHandler
from ..Models.Http.CloudClusterResponse import CloudClusterResponse
from ..Models.Http.CloudClusterStatus import CloudClusterStatus
from ..Models.Http.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from ..Models.Http.CloudPrintResponse import CloudPrintResponse
from ..Models.Http.CloudPrintJobResponse import CloudPrintJobResponse
from ..Models.Http.ClusterPrinterStatus import ClusterPrinterStatus
from ..Models.Http.ClusterPrintJobStatus import ClusterPrintJobStatus
I18N_CATALOG = i18nCatalog("cura")
@ -78,22 +73,21 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
b"cluster_size": b"1" # cloud devices are always clusters of at least one
}
super().__init__(device_id=cluster.cluster_id, address="",
connection_type=ConnectionType.CloudConnection, properties=properties, parent=parent)
super().__init__(
device_id=cluster.cluster_id,
address="",
connection_type=ConnectionType.CloudConnection,
properties=properties,
parent=parent
)
self._api = api_client
self._account = api_client.account
self._cluster = cluster
self.setAuthenticationState(AuthState.NotAuthenticated)
self._setInterfaceElements()
# We use the Cura Connect monitor tab to get most functionality right away.
if PluginRegistry.getInstance() is not None:
plugin_path = PluginRegistry.getInstance().getPluginPath("UM3NetworkPrinting")
if plugin_path is None:
Logger.log("e", "Cloud not find plugin path for plugin UM3NetworkPrnting")
raise RuntimeError("Cloud not find plugin path for plugin UM3NetworkPrnting")
self._monitor_view_qml_path = os.path.join(plugin_path, "resources", "qml", "MonitorStage.qml")
# Trigger the printersChanged signal when the private signal is triggered.
self.printersChanged.connect(self._clusterPrintersChanged)
@ -101,11 +95,8 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._progress = CloudProgressMessage()
# Keep server string of the last generated time to avoid updating models more than once for the same response
self._received_printers = None # type: Optional[List[CloudClusterPrinterStatus]]
self._received_print_jobs = None # type: Optional[List[CloudClusterPrintJobStatus]]
# A set of the user's job IDs that have finished
self._finished_jobs = set() # type: Set[str]
self._received_printers = None # type: Optional[List[ClusterPrinterStatus]]
self._received_print_jobs = None # type: Optional[List[ClusterPrintJobStatus]]
# Reference to the uploaded print job / mesh
self._tool_path = None # type: Optional[bytes]
@ -130,33 +121,21 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._tool_path = None
self._uploaded_print_job = None
## Gets the cluster response from which this device was created.
@property
def clusterData(self) -> CloudClusterResponse:
return self._cluster
## Updates the cluster data from the cloud.
@clusterData.setter
def clusterData(self, value: CloudClusterResponse) -> None:
self._cluster = value
## Checks whether the given network key is found in the cloud's host name
def matchesNetworkKey(self, network_key: str) -> bool:
# Typically, a network key looks like "ultimakersystem-aabbccdd0011._ultimaker._tcp.local."
# the host name should then be "ultimakersystem-aabbccdd0011"
if network_key.startswith(self.clusterData.host_name):
return True
# However, for manually added printers, the local IP address is used in lieu of a proper
# network key, so check for that as well
if self.clusterData.host_internal_ip is not None and network_key.find(self.clusterData.host_internal_ip):
return True
return False
## Set all the interface elements and texts for this output device.
def _setInterfaceElements(self) -> None:
self.setPriority(2) # Make sure we end up below the local networking and above 'save to file'
self.setPriority(2) # Make sure we end up below the local networking and above 'save to file'.
self.setName(self._id)
self.setShortDescription(I18N_CATALOG.i18nc("@action:button", "Print via Cloud"))
self.setDescription(I18N_CATALOG.i18nc("@properties:tooltip", "Print via Cloud"))
@ -227,105 +206,6 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._received_print_jobs = status.print_jobs
self._updatePrintJobs(status.print_jobs)
## Updates the local list of printers with the list received from the cloud.
# \param remote_printers: The printers received from the cloud.
def _updatePrinters(self, remote_printers: List[CloudClusterPrinterStatus]) -> None:
# Keep track of the new printers to show.
# We create a new list instead of changing the existing one to get the correct order.
new_printers = []
# Check which printers need to be created or updated.
for index, printer_data in enumerate(remote_printers):
printer = next(iter(printer for printer in self._printers if printer.key == printer_data.uuid), None)
if not printer:
new_printers.append(printer_data.createOutputModel(CloudOutputController(self)))
else:
printer_data.updateOutputModel(printer)
new_printers.append(printer)
# Check which printers need to be removed (de-referenced).
remote_printers_keys = [printer_data.uuid for printer_data in remote_printers]
removed_printers = [printer for printer in self._printers if printer.key not in remote_printers_keys]
for removed_printer in removed_printers:
if self._active_printer and self._active_printer.key == removed_printer.key:
self.setActivePrinter(None)
self._printers = new_printers
if self._printers and not self.activePrinter:
self.setActivePrinter(self._printers[0])
self.printersChanged.emit()
## Updates the local list of print jobs with the list received from the cloud.
# \param remote_jobs: The print jobs received from the cloud.
def _updatePrintJobs(self, remote_jobs: List[CloudClusterPrintJobStatus]) -> None:
# Keep track of the new print jobs to show.
# We create a new list instead of changing the existing one to get the correct order.
new_print_jobs = []
# Check which print jobs need to be created or updated.
for index, print_job_data in enumerate(remote_jobs):
print_job = next(
iter(print_job for print_job in self._print_jobs if print_job.key == print_job_data.uuid), None)
if not print_job:
new_print_jobs.append(self._createPrintJobModel(print_job_data))
else:
print_job_data.updateOutputModel(print_job)
if print_job_data.printer_uuid:
self._updateAssignedPrinter(print_job, print_job_data.printer_uuid)
new_print_jobs.append(print_job)
# Check which print job need to be removed (de-referenced).
remote_job_keys = [print_job_data.uuid for print_job_data in remote_jobs]
removed_jobs = [print_job for print_job in self._print_jobs if print_job.key not in remote_job_keys]
for removed_job in removed_jobs:
if removed_job.assignedPrinter:
removed_job.assignedPrinter.updateActivePrintJob(None)
removed_job.stateChanged.disconnect(self._onPrintJobStateChanged)
self._print_jobs = new_print_jobs
self.printJobsChanged.emit()
## Create a new print job model based on the remote status of the job.
# \param remote_job: The remote print job data.
def _createPrintJobModel(self, remote_job: CloudClusterPrintJobStatus) -> UM3PrintJobOutputModel:
model = remote_job.createOutputModel(CloudOutputController(self))
model.stateChanged.connect(self._onPrintJobStateChanged)
if remote_job.printer_uuid:
self._updateAssignedPrinter(model, remote_job.printer_uuid)
return model
## Handles the event of a change in a print job state
def _onPrintJobStateChanged(self) -> None:
user_name = self._getUserName()
# TODO: confirm that notifications in Cura are still required
for job in self._print_jobs:
if job.state == "wait_cleanup" and job.key not in self._finished_jobs and job.owner == user_name:
self._finished_jobs.add(job.key)
Message(
title=I18N_CATALOG.i18nc("@info:status", "Print finished"),
text=(I18N_CATALOG.i18nc("@info:status",
"Printer '{printer_name}' has finished printing '{job_name}'.").format(
printer_name=job.assignedPrinter.name,
job_name=job.name
) if job.assignedPrinter else
I18N_CATALOG.i18nc("@info:status", "The print job '{job_name}' was finished.").format(
job_name=job.name
)),
).show()
## Updates the printer assignment for the given print job model.
def _updateAssignedPrinter(self, model: UM3PrintJobOutputModel, printer_uuid: str) -> None:
printer = next((p for p in self._printers if printer_uuid == p.key), None)
if not printer:
Logger.log("w", "Missing printer %s for job %s in %s", model.assignedPrinter, model.key,
[p.key for p in self._printers])
return
printer.updateActivePrintJob(model)
model.updateAssignedPrinter(printer)
## Uploads the mesh when the print job was registered with the cloud API.
# \param job_response: The response received from the cloud API.
def _onPrintJobCreated(self, job_response: CloudPrintJobResponse) -> None:
@ -398,3 +278,13 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
@pyqtSlot(name="openPrinterControlPanel")
def openPrinterControlPanel(self) -> None:
QDesktopServices.openUrl(QUrl("https://mycloud.ultimaker.com"))
## Gets the cluster response from which this device was created.
@property
def clusterData(self) -> CloudClusterResponse:
return self._cluster
## Updates the cluster data from the cloud.
@clusterData.setter
def clusterData(self, value: CloudClusterResponse) -> None:
self._cluster = value

View file

@ -11,11 +11,12 @@ from UM.Signal import Signal
from cura.API import Account
from cura.CuraApplication import CuraApplication
from cura.Settings.GlobalStack import GlobalStack
from .CloudApiClient import CloudApiClient
from .CloudOutputDevice import CloudOutputDevice
from plugins.UM3NetworkPrinting.src.Models.CloudClusterResponse import CloudClusterResponse
from plugins.UM3NetworkPrinting.src.Models.CloudError import CloudError
from plugins.UM3NetworkPrinting.src.Utils import findChanges
from ..Models.Http.CloudClusterResponse import CloudClusterResponse
from ..Models.Http.CloudError import CloudError
from ..Utils import findChanges
## The cloud output device manager is responsible for using the Ultimaker Cloud APIs to manage remote clusters.
@ -186,14 +187,9 @@ class CloudOutputDeviceManager:
## Handles an API error received from the cloud.
# \param errors: The errors received
def _onApiError(self, errors: List[CloudError] = None) -> None:
@staticmethod
def _onApiError(errors: List[CloudError] = None) -> None:
Logger.log("w", str(errors))
message = Message(
text = self.I18N_CATALOG.i18nc("@info:description", "There was an error connecting to the cloud."),
title = self.I18N_CATALOG.i18nc("@info:title", "Error"),
lifetime = 10
)
message.show()
## Starts running the cloud output device manager, thus periodically requesting cloud data.
def start(self):

View file

@ -6,7 +6,8 @@ from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply, QNetworkAccessManage
from typing import Optional, Callable, Any, Tuple, cast
from UM.Logger import Logger
from plugins.UM3NetworkPrinting.src.Models.CloudPrintJobResponse import CloudPrintJobResponse
from ..Models.Http.CloudPrintJobResponse import CloudPrintJobResponse
## Class responsible for uploading meshes to the cloud in separate requests.

View file

@ -2,18 +2,13 @@
# Cura is released under the terms of the LGPLv3 or higher.
from cura.PrinterOutput.Models.PrintJobOutputModel import PrintJobOutputModel
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .CloudOutputDevice import CloudOutputDevice
from cura.PrinterOutput.PrinterOutputDevice import PrinterOutputDevice
class CloudOutputController(PrinterOutputController):
def __init__(self, output_device: "CloudOutputDevice") -> None:
class ClusterOutputController(PrinterOutputController):
def __init__(self, output_device: PrinterOutputDevice) -> None:
super().__init__(output_device)
# The cloud connection only supports fetching the printer and queue status and adding a job to the queue.
# To let the UI know this we mark all features below as False.
self.can_pause = True
self.can_abort = True
self.can_pre_heat_bed = False
@ -22,5 +17,5 @@ class CloudOutputController(PrinterOutputController):
self.can_control_manually = False
self.can_update_firmware = False
def setJobState(self, job: "PrintJobOutputModel", state: str):
def setJobState(self, job: PrintJobOutputModel, state: str):
self._output_device.setJobState(job.key, state)

View file

@ -1,30 +0,0 @@
from typing import List, Any, Dict
from PyQt5.QtCore import QUrl
from cura.PrinterOutput.Models.PrinterOutputModel import PrinterOutputModel
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from plugins.UM3NetworkPrinting.src.Models.ConfigurationChangeModel import ConfigurationChangeModel
class PrinterModelFactory:
CAMERA_URL_FORMAT = "http://{ip_address}:8080/?action=stream"
# Create a printer output model from some data.
@classmethod
def createPrinter(cls, output_controller: PrinterOutputController, ip_address: str, extruder_count: int = 2
) -> PrinterOutputModel:
printer = PrinterOutputModel(output_controller=output_controller, number_of_extruders=extruder_count)
printer.setCameraUrl(QUrl(cls.CAMERA_URL_FORMAT.format(ip_address=ip_address)))
return printer
# Create a list of configuration change models.
@classmethod
def createConfigurationChanges(cls, data: List[Dict[str, Any]]) -> List[ConfigurationChangeModel]:
return [ConfigurationChangeModel(
type_of_change=change.get("type_of_change"),
index=change.get("index"),
target_name=change.get("target_name"),
origin_name=change.get("origin_name")
) for change in data]

View file

@ -1,55 +0,0 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from datetime import datetime, timezone
from typing import Dict, Union, TypeVar, Type, List, Any
from plugins.UM3NetworkPrinting.src.Models.BaseModel import BaseModel
## Base class for the models used in the interface with the Ultimaker cloud APIs.
class BaseCloudModel(BaseModel):
## Checks whether the two models are equal.
# \param other: The other model.
# \return True if they are equal, False if they are different.
def __eq__(self, other):
return type(self) == type(other) and self.toDict() == other.toDict()
## Checks whether the two models are different.
# \param other: The other model.
# \return True if they are different, False if they are the same.
def __ne__(self, other) -> bool:
return type(self) != type(other) or self.toDict() != other.toDict()
## Converts the model into a serializable dictionary
def toDict(self) -> Dict[str, Any]:
return self.__dict__
# Type variable used in the parse methods below, which should be a subclass of BaseModel.
T = TypeVar("T", bound=BaseModel)
## Parses a single model.
# \param model_class: The model class.
# \param values: The value of the model, which is usually a dictionary, but may also be already parsed.
# \return An instance of the model_class given.
@staticmethod
def parseModel(model_class: Type[T], values: Union[T, Dict[str, Any]]) -> T:
if isinstance(values, dict):
return model_class(**values)
return values
## Parses a list of models.
# \param model_class: The model class.
# \param values: The value of the list. Each value is usually a dictionary, but may also be already parsed.
# \return A list of instances of the model_class given.
@classmethod
def parseModels(cls, model_class: Type[T], values: List[Union[T, Dict[str, Any]]]) -> List[T]:
return [cls.parseModel(model_class, value) for value in values]
## Parses the given date string.
# \param date: The date to parse.
# \return The parsed date.
@staticmethod
def parseDate(date: Union[str, datetime]) -> datetime:
if isinstance(date, datetime):
return date
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

View file

@ -1,5 +1,10 @@
## Base model that maps kwargs to instance attributes.
from datetime import datetime, timezone
from typing import TypeVar, Dict, List, Any, Type, Union
class BaseModel:
def __init__(self, **kwargs) -> None:
self.__dict__.update(kwargs)
self.validate()
@ -7,3 +12,49 @@ class BaseModel:
# Validates the model, raising an exception if the model is invalid.
def validate(self) -> None:
pass
## Checks whether the two models are equal.
# \param other: The other model.
# \return True if they are equal, False if they are different.
def __eq__(self, other):
return type(self) == type(other) and self.toDict() == other.toDict()
## Checks whether the two models are different.
# \param other: The other model.
# \return True if they are different, False if they are the same.
def __ne__(self, other) -> bool:
return type(self) != type(other) or self.toDict() != other.toDict()
## Converts the model into a serializable dictionary
def toDict(self) -> Dict[str, Any]:
return self.__dict__
# Type variable used in the parse methods below, which should be a subclass of BaseModel.
T = TypeVar("T", bound="BaseModel")
## Parses a single model.
# \param model_class: The model class.
# \param values: The value of the model, which is usually a dictionary, but may also be already parsed.
# \return An instance of the model_class given.
@staticmethod
def parseModel(model_class: Type[T], values: Union[T, Dict[str, Any]]) -> T:
if isinstance(values, dict):
return model_class(**values)
return values
## Parses a list of models.
# \param model_class: The model class.
# \param values: The value of the list. Each value is usually a dictionary, but may also be already parsed.
# \return A list of instances of the model_class given.
@classmethod
def parseModels(cls, model_class: Type[T], values: List[Union[T, Dict[str, Any]]]) -> List[T]:
return [cls.parseModel(model_class, value) for value in values]
## Parses the given date string.
# \param date: The date to parse.
# \return The parsed date.
@staticmethod
def parseDate(date: Union[str, datetime]) -> datetime:
if isinstance(date, datetime):
return date
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

View file

@ -2,12 +2,12 @@
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Class representing a cloud connected cluster.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterResponse(BaseCloudModel):
class CloudClusterResponse(BaseModel):
## Creates a new cluster response object.
# \param cluster_id: The secret unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='.
# \param host_guid: The unique identifier of the print cluster host, e.g. 'e90ae0ac-1257-4403-91ee-a44c9b7e8050'.

View file

@ -3,24 +3,24 @@
from datetime import datetime
from typing import List, Dict, Union, Any
from .CloudClusterPrinterStatus import CloudClusterPrinterStatus
from .CloudClusterPrintJobStatus import CloudClusterPrintJobStatus
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
from .ClusterPrinterStatus import ClusterPrinterStatus
from .ClusterPrintJobStatus import ClusterPrintJobStatus
# Model that represents the status of the cluster for the cloud
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterStatus(BaseCloudModel):
class CloudClusterStatus(BaseModel):
## Creates a new cluster status model object.
# \param printers: The latest status of each printer in the cluster.
# \param print_jobs: The latest status of each print job in the cluster.
# \param generated_time: The datetime when the object was generated on the server-side.
def __init__(self,
printers: List[Union[CloudClusterPrinterStatus, Dict[str, Any]]],
print_jobs: List[Union[CloudClusterPrintJobStatus, Dict[str, Any]]],
printers: List[Union[ClusterPrinterStatus, Dict[str, Any]]],
print_jobs: List[Union[ClusterPrintJobStatus, Dict[str, Any]]],
generated_time: Union[str, datetime],
**kwargs) -> None:
self.generated_time = self.parseDate(generated_time)
self.printers = self.parseModels(CloudClusterPrinterStatus, printers)
self.print_jobs = self.parseModels(CloudClusterPrintJobStatus, print_jobs)
self.printers = self.parseModels(ClusterPrinterStatus, printers)
self.print_jobs = self.parseModels(ClusterPrintJobStatus, print_jobs)
super().__init__(**kwargs)

View file

@ -2,12 +2,12 @@
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Dict, Optional, Any
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Class representing errors generated by the cloud servers, according to the JSON-API standard.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudError(BaseCloudModel):
class CloudError(BaseModel):
## Creates a new error object.
# \param id: Unique identifier for this particular occurrence of the problem.
# \param title: A short, human-readable summary of the problem that SHOULD NOT change from occurrence to occurrence

View file

@ -2,12 +2,12 @@
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
# Model that represents the response received from the cloud after requesting to upload a print job
# Spec: https://api-staging.ultimaker.com/cura/v1/spec
class CloudPrintJobResponse(BaseCloudModel):
class CloudPrintJobResponse(BaseModel):
## Creates a new print job response model.
# \param job_id: The job unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='.
# \param status: The status of the print job.

View file

@ -1,11 +1,11 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
# Model that represents the request to upload a print job to the cloud
# Spec: https://api-staging.ultimaker.com/cura/v1/spec
class CloudPrintJobUploadRequest(BaseCloudModel):
class CloudPrintJobUploadRequest(BaseModel):
## Creates a new print job upload request.
# \param job_name: The name of the print job.
# \param file_size: The size of the file in bytes.

View file

@ -3,12 +3,12 @@
from datetime import datetime
from typing import Optional, Union
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
# Model that represents the responses received from the cloud after requesting a job to be printed.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudPrintResponse(BaseCloudModel):
class CloudPrintResponse(BaseModel):
## Creates a new print response object.
# \param job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect.
# \param status: The status of the print request (queued or failed).

View file

@ -1,11 +1,11 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Class representing a cluster printer
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterBuildPlate(BaseCloudModel):
class ClusterBuildPlate(BaseModel):
## Create a new build plate
# \param type: The type of build plate glass or aluminium
def __init__(self, type: str = "glass", **kwargs) -> None:

View file

@ -4,23 +4,23 @@ from typing import Union, Dict, Optional, Any
from cura.PrinterOutput.Models.ExtruderConfigurationModel import ExtruderConfigurationModel
from cura.PrinterOutput.Models.ExtruderOutputModel import ExtruderOutputModel
from .CloudClusterPrinterConfigurationMaterial import CloudClusterPrinterConfigurationMaterial
from .BaseCloudModel import BaseCloudModel
from .ClusterPrinterConfigurationMaterial import ClusterPrinterConfigurationMaterial
from ..BaseModel import BaseModel
## Class representing a cloud cluster printer configuration
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintCoreConfiguration(BaseCloudModel):
class ClusterPrintCoreConfiguration(BaseModel):
## Creates a new cloud cluster printer configuration object
# \param extruder_index: The position of the extruder on the machine as list index. Numbered from left to right.
# \param material: The material of a configuration object in a cluster printer. May be in a dict or an object.
# \param nozzle_diameter: The diameter of the print core at this position in millimeters, e.g. '0.4'.
# \param print_core_id: The type of print core inserted at this position, e.g. 'AA 0.4'.
def __init__(self, extruder_index: int,
material: Union[None, Dict[str, Any], CloudClusterPrinterConfigurationMaterial],
material: Union[None, Dict[str, Any], ClusterPrinterConfigurationMaterial],
print_core_id: Optional[str] = None, **kwargs) -> None:
self.extruder_index = extruder_index
self.material = self.parseModel(CloudClusterPrinterConfigurationMaterial, material) if material else None
self.material = self.parseModel(ClusterPrinterConfigurationMaterial, material) if material else None
self.print_core_id = print_core_id
super().__init__(**kwargs)

View file

@ -2,12 +2,12 @@
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Model for the types of changes that are needed before a print job can start
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintJobConfigurationChange(BaseCloudModel):
class ClusterPrintJobConfigurationChange(BaseModel):
## Creates a new print job constraint.
# \param type_of_change: The type of configuration change, one of: "material", "print_core_change"
# \param index: The hotend slot or extruder index to change

View file

@ -2,12 +2,12 @@
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Class representing a cloud cluster print job constraint
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintJobConstraints(BaseCloudModel):
class ClusterPrintJobConstraints(BaseModel):
## Creates a new print job constraint.
# \param require_printer_name: Unique name of the printer that this job should be printed on.
# Should be one of the unique_name field values in the cluster, e.g. 'ultimakersystem-ccbdd30044ec'

View file

@ -1,13 +1,14 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Class representing the reasons that prevent this job from being printed on the associated printer
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintJobImpediment(BaseCloudModel):
class ClusterPrintJobImpediment(BaseModel):
## Creates a new print job constraint.
# \param translation_key: A string indicating a reason the print cannot be printed, such as 'does_not_fit_in_build_volume'
# \param translation_key: A string indicating a reason the print cannot be printed,
# such as 'does_not_fit_in_build_volume'
# \param severity: A number indicating the severity of the problem, with higher being more severe
def __init__(self, translation_key: str, severity: int, **kwargs) -> None:
self.translation_key = translation_key

View file

@ -3,20 +3,21 @@
from typing import List, Optional, Union, Dict, Any
from cura.PrinterOutput.Models.PrinterConfigurationModel import PrinterConfigurationModel
from plugins.UM3NetworkPrinting.src.Models.UM3PrintJobOutputModel import UM3PrintJobOutputModel
from plugins.UM3NetworkPrinting.src.Models.ConfigurationChangeModel import ConfigurationChangeModel
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController
from .BaseCloudModel import BaseCloudModel
from .CloudClusterBuildPlate import CloudClusterBuildPlate
from .CloudClusterPrintJobConfigurationChange import CloudClusterPrintJobConfigurationChange
from .CloudClusterPrintJobImpediment import CloudClusterPrintJobImpediment
from .CloudClusterPrintCoreConfiguration import CloudClusterPrintCoreConfiguration
from .CloudClusterPrintJobConstraint import CloudClusterPrintJobConstraints
from .ClusterBuildPlate import ClusterBuildPlate
from .ClusterPrintJobConfigurationChange import ClusterPrintJobConfigurationChange
from .ClusterPrintJobImpediment import ClusterPrintJobImpediment
from .ClusterPrintCoreConfiguration import ClusterPrintCoreConfiguration
from .ClusterPrintJobConstraint import ClusterPrintJobConstraints
from ..UM3PrintJobOutputModel import UM3PrintJobOutputModel
from ..ConfigurationChangeModel import ConfigurationChangeModel
from ..BaseModel import BaseModel
from ...ClusterOutputController import ClusterOutputController
## Model for the status of a single print job in a cluster.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintJobStatus(BaseCloudModel):
class ClusterPrintJobStatus(BaseModel):
## Creates a new cloud print job status model.
# \param assigned_to: The name of the printer this job is assigned to while being queued.
# \param configuration: The required print core configurations of this print job.
@ -45,21 +46,21 @@ class CloudClusterPrintJobStatus(BaseCloudModel):
# printer
def __init__(self, created_at: str, force: bool, machine_variant: str, name: str, started: bool, status: str,
time_total: int, uuid: str,
configuration: List[Union[Dict[str, Any], CloudClusterPrintCoreConfiguration]],
constraints: List[Union[Dict[str, Any], CloudClusterPrintJobConstraints]],
configuration: List[Union[Dict[str, Any], ClusterPrintCoreConfiguration]],
constraints: List[Union[Dict[str, Any], ClusterPrintJobConstraints]],
last_seen: Optional[float] = None, network_error_count: Optional[int] = None,
owner: Optional[str] = None, printer_uuid: Optional[str] = None, time_elapsed: Optional[int] = None,
assigned_to: Optional[str] = None, deleted_at: Optional[str] = None,
printed_on_uuid: Optional[str] = None,
configuration_changes_required: List[
Union[Dict[str, Any], CloudClusterPrintJobConfigurationChange]] = None,
build_plate: Union[Dict[str, Any], CloudClusterBuildPlate] = None,
Union[Dict[str, Any], ClusterPrintJobConfigurationChange]] = None,
build_plate: Union[Dict[str, Any], ClusterBuildPlate] = None,
compatible_machine_families: List[str] = None,
impediments_to_printing: List[Union[Dict[str, Any], CloudClusterPrintJobImpediment]] = None,
impediments_to_printing: List[Union[Dict[str, Any], ClusterPrintJobImpediment]] = None,
**kwargs) -> None:
self.assigned_to = assigned_to
self.configuration = self.parseModels(CloudClusterPrintCoreConfiguration, configuration)
self.constraints = self.parseModels(CloudClusterPrintJobConstraints, constraints)
self.configuration = self.parseModels(ClusterPrintCoreConfiguration, configuration)
self.constraints = self.parseModels(ClusterPrintJobConstraints, constraints)
self.created_at = created_at
self.force = force
self.last_seen = last_seen
@ -76,19 +77,19 @@ class CloudClusterPrintJobStatus(BaseCloudModel):
self.deleted_at = deleted_at
self.printed_on_uuid = printed_on_uuid
self.configuration_changes_required = self.parseModels(CloudClusterPrintJobConfigurationChange,
self.configuration_changes_required = self.parseModels(ClusterPrintJobConfigurationChange,
configuration_changes_required) \
if configuration_changes_required else []
self.build_plate = self.parseModel(CloudClusterBuildPlate, build_plate) if build_plate else None
self.build_plate = self.parseModel(ClusterBuildPlate, build_plate) if build_plate else None
self.compatible_machine_families = compatible_machine_families if compatible_machine_families else []
self.impediments_to_printing = self.parseModels(CloudClusterPrintJobImpediment, impediments_to_printing) \
self.impediments_to_printing = self.parseModels(ClusterPrintJobImpediment, impediments_to_printing) \
if impediments_to_printing else []
super().__init__(**kwargs)
## Creates an UM3 print job output model based on this cloud cluster print job.
# \param printer: The output model of the printer
def createOutputModel(self, controller: CloudOutputController) -> UM3PrintJobOutputModel:
def createOutputModel(self, controller: ClusterOutputController) -> UM3PrintJobOutputModel:
model = UM3PrintJobOutputModel(controller, self.uuid, self.name)
self.updateOutputModel(model)
return model

View file

@ -3,12 +3,13 @@ from typing import Optional
from UM.Logger import Logger
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.Models.MaterialOutputModel import MaterialOutputModel
from .BaseCloudModel import BaseCloudModel
from ..BaseModel import BaseModel
## Class representing a cloud cluster printer configuration
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrinterConfigurationMaterial(BaseCloudModel):
class ClusterPrinterConfigurationMaterial(BaseModel):
## Creates a new material configuration model.
# \param brand: The brand of material in this print core, e.g. 'Ultimaker'.
# \param color: The color of material in this print core, e.g. 'Blue'.

View file

@ -4,14 +4,14 @@ from typing import List, Union, Dict, Optional, Any
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from cura.PrinterOutput.Models.PrinterOutputModel import PrinterOutputModel
from .CloudClusterBuildPlate import CloudClusterBuildPlate
from .CloudClusterPrintCoreConfiguration import CloudClusterPrintCoreConfiguration
from .BaseCloudModel import BaseCloudModel
from .ClusterBuildPlate import ClusterBuildPlate
from .ClusterPrintCoreConfiguration import ClusterPrintCoreConfiguration
from ..BaseModel import BaseModel
## Class representing a cluster printer
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrinterStatus(BaseCloudModel):
class ClusterPrinterStatus(BaseModel):
## Creates a new cluster printer status
# \param enabled: A printer can be disabled if it should not receive new jobs. By default every printer is enabled.
# \param firmware_version: Firmware version installed on the printer. Can differ for each printer in a cluster.
@ -30,12 +30,12 @@ class CloudClusterPrinterStatus(BaseCloudModel):
# \param build_plate: The build plate that is on the printer
def __init__(self, enabled: bool, firmware_version: str, friendly_name: str, ip_address: str, machine_variant: str,
status: str, unique_name: str, uuid: str,
configuration: List[Union[Dict[str, Any], CloudClusterPrintCoreConfiguration]],
configuration: List[Union[Dict[str, Any], ClusterPrintCoreConfiguration]],
reserved_by: Optional[str] = None, maintenance_required: Optional[bool] = None,
firmware_update_status: Optional[str] = None, latest_available_firmware: Optional[str] = None,
build_plate: Union[Dict[str, Any], CloudClusterBuildPlate] = None, **kwargs) -> None:
build_plate: Union[Dict[str, Any], ClusterBuildPlate] = None, **kwargs) -> None:
self.configuration = self.parseModels(CloudClusterPrintCoreConfiguration, configuration)
self.configuration = self.parseModels(ClusterPrintCoreConfiguration, configuration)
self.enabled = enabled
self.firmware_version = firmware_version
self.friendly_name = friendly_name
@ -48,7 +48,7 @@ class CloudClusterPrinterStatus(BaseCloudModel):
self.maintenance_required = maintenance_required
self.firmware_update_status = firmware_update_status
self.latest_available_firmware = latest_available_firmware
self.build_plate = self.parseModel(CloudClusterBuildPlate, build_plate) if build_plate else None
self.build_plate = self.parseModel(ClusterBuildPlate, build_plate) if build_plate else None
super().__init__(**kwargs)
## Creates a new output model.

View file

@ -3,13 +3,13 @@ from plugins.UM3NetworkPrinting.src.Models.BaseModel import BaseModel
class LocalMaterial(BaseModel):
def __init__(self, GUID: str, id: str, version: int, **kwargs) -> None:
self.GUID = GUID # type: str
self.id = id # type: str
self.version = version # type: int
super().__init__(**kwargs)
#
def validate(self) -> None:
super().validate()
if not self.GUID:

View file

@ -1,10 +1,21 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Callable, List, Optional
import json
from json import JSONDecodeError
from typing import Callable, List, Optional, Dict, Union, Any, Type, cast, TypeVar, Tuple
from PyQt5.QtCore import QUrl
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from UM.Logger import Logger
from plugins.UM3NetworkPrinting.src.Models.BaseModel import BaseModel
## The generic type variable used to document the methods below.
from plugins.UM3NetworkPrinting.src.Models.Http.ClusterPrinterStatus import ClusterPrinterStatus
ClusterApiClientModel = TypeVar("ClusterApiClientModel", bound=BaseModel)
## The ClusterApiClient is responsible for all network calls to local network clusters.
class ClusterApiClient:
@ -30,32 +41,95 @@ class ClusterApiClient:
## Get printer system information.
# \param on_finished: The callback in case the response is successful.
def getSystem(self, on_finished: Callable) -> None:
url = f"{self.PRINTER_API_PREFIX}/system"
url = f"{self.PRINTER_API_PREFIX}/system/"
self._manager.get(self._createEmptyRequest(url))
## Get the printers in the cluster.
# \param on_finished: The callback in case the response is successful.
def getPrinters(self, on_finished: Callable[[List[ClusterPrinterStatus]], Any]) -> None:
url = f"{self.CLUSTER_API_PREFIX}/printers/"
reply = self._manager.get(self._createEmptyRequest(url))
self._addCallback(reply, on_finished)
self._addCallback(reply, on_finished, ClusterPrinterStatus)
## Get the print jobs in the cluster.
# \param on_finished: The callback in case the response is successful.
def getPrintJobs(self, on_finished: Callable) -> None:
url = f"{self.CLUSTER_API_PREFIX}/print_jobs/"
# reply = self._manager.get(self._createEmptyRequest(url))
# self._addCallback(reply, on_finished)
def requestPrint(self) -> None:
pass
## Send a print job action to the cluster.
# \param print_job_uuid: The UUID of the print job to perform the action on.
# \param action: The action to perform.
# \param data: The optional data to send along, used for 'move' and 'duplicate'.
def doPrintJobAction(self, print_job_uuid: str, action: str, data: Optional[Dict[str, Union[str, int]]] = None
) -> None:
url = f"{self.CLUSTER_API_PREFIX}/print_jobs/{print_job_uuid}/action/{action}/"
body = json.loads(data).encode() if data else b""
self._manager.put(self._createEmptyRequest(url), body)
## We override _createEmptyRequest in order to add the user credentials.
# \param url: The URL to request
# \param content_type: The type of the body contents.
def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
request = QNetworkRequest(QUrl(self._address + path))
url = QUrl("http://" + self._address + path)
request = QNetworkRequest(url)
if content_type:
request.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
return request
## Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well.
# \param reply: The reply from the server.
# \return A tuple with a status code and a dictionary.
@staticmethod
def _parseReply(reply: QNetworkReply) -> Tuple[int, Dict[str, Any]]:
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
try:
response = bytes(reply.readAll()).decode()
return status_code, json.loads(response)
except (UnicodeDecodeError, JSONDecodeError, ValueError) as err:
Logger.logException("e", "Could not parse the cluster response: %s", err)
return status_code, {"errors": [err]}
## Parses the given models and calls the correct callback depending on the result.
# \param response: The response from the server, after being converted to a dict.
# \param on_finished: The callback in case the response is successful.
# \param model_class: The type of the model to convert the response to. It may either be a single record or a list.
def _parseModels(self, response: Dict[str, Any],
on_finished: Union[Callable[[ClusterApiClientModel], Any],
Callable[[List[ClusterApiClientModel]], Any]],
model_class: Type[ClusterApiClientModel]) -> None:
if isinstance(response, list):
results = [model_class(**c) for c in response] # type: List[ClusterApiClientModel]
on_finished_list = cast(Callable[[List[ClusterApiClientModel]], Any], on_finished)
on_finished_list(results)
else:
result = model_class(**response) # type: ClusterApiClientModel
on_finished_item = cast(Callable[[ClusterApiClientModel], Any], on_finished)
on_finished_item(result)
## Creates a callback function so that it includes the parsing of the response into the correct model.
# The callback is added to the 'finished' signal of the reply.
# \param reply: The reply that should be listened to.
# \param on_finished: The callback in case the response is successful.
def _addCallback(self, reply: QNetworkReply, on_finished: Callable) -> None:
def _addCallback(self,
reply: QNetworkReply,
on_finished: Union[Callable[[ClusterApiClientModel], Any],
Callable[[List[ClusterApiClientModel]], Any]],
model: Type[ClusterApiClientModel],
) -> None:
def parse() -> None:
# Don't try to parse the reply if we didn't get one
if reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) is None:
return
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
response = bytes(reply.readAll()).decode()
status_code, response = self._parseReply(reply)
self._anti_gc_callbacks.remove(parse)
on_finished(int(status_code), response)
if on_finished:
self._parseModels(response, on_finished, model)
return
self._anti_gc_callbacks.append(parse)
if on_finished:
reply.finished.connect(parse)

View file

@ -1,218 +1,73 @@
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Any, cast, Tuple, Union, Optional, Dict, List
from time import time
from typing import Optional, Dict, List, Any
import io # To create the correct buffers for sending data to the printer.
import json
import os
from PyQt5.QtGui import QDesktopServices
from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal, pyqtProperty
from UM.FileHandler.FileHandler import FileHandler
from UM.FileHandler.WriteFileJob import WriteFileJob # To call the file writer asynchronously.
from UM.i18n import i18nCatalog
from UM.Logger import Logger
from UM.Message import Message
from UM.PluginRegistry import PluginRegistry
from UM.Scene.SceneNode import SceneNode # For typing.
from UM.Settings.ContainerRegistry import ContainerRegistry
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.Models.PrinterConfigurationModel import PrinterConfigurationModel
from cura.PrinterOutput.Models.ExtruderConfigurationModel import ExtruderConfigurationModel
from cura.PrinterOutput.Models.PrinterOutputModel import PrinterOutputModel
from cura.PrinterOutput.Models.MaterialOutputModel import MaterialOutputModel
from UM.i18n import i18nCatalog
from UM.Scene.SceneNode import SceneNode
from cura.PrinterOutput.NetworkedPrinterOutputDevice import AuthState
from cura.PrinterOutput.PrinterOutputDevice import ConnectionType
from plugins.UM3NetworkPrinting.src.Factories.PrinterModelFactory import PrinterModelFactory
from plugins.UM3NetworkPrinting.src.UltimakerNetworkedPrinterOutputDevice import UltimakerNetworkedPrinterOutputDevice
from plugins.UM3NetworkPrinting.src.Network.ClusterUM3PrinterOutputController import ClusterUM3PrinterOutputController
from plugins.UM3NetworkPrinting.src.MeshFormatHandler import MeshFormatHandler
from plugins.UM3NetworkPrinting.src.SendMaterialJob import SendMaterialJob
from plugins.UM3NetworkPrinting.src.Models.UM3PrintJobOutputModel import UM3PrintJobOutputModel
from .ClusterApiClient import ClusterApiClient
from ..SendMaterialJob import SendMaterialJob
from ..UltimakerNetworkedPrinterOutputDevice import UltimakerNetworkedPrinterOutputDevice
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
from PyQt5.QtGui import QDesktopServices, QImage
from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal, pyqtProperty, QObject
i18n_catalog = i18nCatalog("cura")
I18N_CATALOG = i18nCatalog("cura")
class ClusterUM3OutputDevice(UltimakerNetworkedPrinterOutputDevice):
activeCameraUrlChanged = pyqtSignal()
def __init__(self, device_id, address, properties, parent = None) -> None:
super().__init__(device_id = device_id, address = address, properties=properties, connection_type = ConnectionType.NetworkConnection, parent = parent)
self._api_prefix = "/cluster-api/v1/"
def __init__(self, device_id: str, address: str, properties: Dict[bytes, bytes], parent=None) -> None:
self._application = CuraApplication.getInstance()
super().__init__(
device_id=device_id,
address=address,
properties=properties,
connection_type=ConnectionType.NetworkConnection,
parent=parent
)
self._number_of_extruders = 2
# API client for making requests to the print cluster.
self._cluster_api = ClusterApiClient(address, on_error=self._onNetworkError)
# We don't have authentication over local networking, so we're always authenticated.
self.setAuthenticationState(AuthState.Authenticated)
self._dummy_lambdas = (
"", {}, io.BytesIO()
) # type: Tuple[Optional[str], Dict[str, Union[str, int, bool]], Union[io.StringIO, io.BytesIO]]
if PluginRegistry.getInstance() is not None:
plugin_path = PluginRegistry.getInstance().getPluginPath("UM3NetworkPrinting")
if plugin_path is None:
Logger.log("e", "Cloud not find plugin path for plugin UM3NetworkPrnting")
raise RuntimeError("Cloud not find plugin path for plugin UM3NetworkPrnting")
self._monitor_view_qml_path = os.path.join(plugin_path, "resources", "qml", "MonitorStage.qml")
# Trigger the printersChanged signal when the private signal is triggered
self.printersChanged.connect(self._clusterPrintersChanged)
self._accepts_commands = True # type: bool
self._error_message = None # type: Optional[Message]
self._write_job_progress_message = None # type: Optional[Message]
self._progress_message = None # type: Optional[Message]
self._printer_selection_dialog = None # type: QObject
self.setPriority(3) # Make sure the output device gets selected above local file output
self.setName(self._id)
self.setShortDescription(i18n_catalog.i18nc("@action:button Preceded by 'Ready to'.", "Print over network"))
self.setDescription(i18n_catalog.i18nc("@properties:tooltip", "Print over network"))
self.setConnectionText(i18n_catalog.i18nc("@info:status", "Connected over the network"))
self._printer_uuid_to_unique_name_mapping = {} # type: Dict[str, str]
self._finished_jobs = [] # type: List[UM3PrintJobOutputModel]
self._cluster_size = int(properties.get(b"cluster_size", 0)) # type: int
self._latest_reply_handler = None # type: Optional[QNetworkReply]
self._sending_job = None
self._setInterfaceElements()
self._active_camera_url = QUrl() # type: QUrl
def requestWrite(self, nodes: List["SceneNode"], file_name: Optional[str] = None, limit_mimetypes: bool = False,
file_handler: Optional["FileHandler"] = None, filter_by_machine: bool = False, **kwargs) -> None:
self.writeStarted.emit(self)
# self._number_of_extruders = 2
# self._dummy_lambdas = (
# "", {}, io.BytesIO()
# ) # type: Tuple[Optional[str], Dict[str, Union[str, int, bool]], Union[io.StringIO, io.BytesIO]]
# self._error_message = None # type: Optional[Message]
# self._write_job_progress_message = None # type: Optional[Message]
# self._progress_message = None # type: Optional[Message]
# self._printer_selection_dialog = None # type: QObject
# self._printer_uuid_to_unique_name_mapping = {} # type: Dict[str, str]
# self._finished_jobs = [] # type: List[UM3PrintJobOutputModel]
# self._sending_job = None
## Set all the interface elements and texts for this output device.
def _setInterfaceElements(self) -> None:
self.setPriority(3) # Make sure the output device gets selected above local file output
self.setName(self._id)
self.setShortDescription(I18N_CATALOG.i18nc("@action:button Preceded by 'Ready to'.", "Print over network"))
self.setDescription(I18N_CATALOG.i18nc("@properties:tooltip", "Print over network"))
self.setConnectionText(I18N_CATALOG.i18nc("@info:status", "Connected over the network"))
## Called when the connection to the cluster changes.
def connect(self) -> None:
super().connect()
self.sendMaterialProfiles()
mesh_format = MeshFormatHandler(file_handler, self.firmwareVersion)
# This function pauses with the yield, waiting on instructions on which printer it needs to print with.
if not mesh_format.is_valid:
Logger.log("e", "Missing file or mesh writer!")
return
self._sending_job = self._sendPrintJob(mesh_format, nodes)
if self._sending_job is not None:
self._sending_job.send(None) # Start the generator.
if len(self._printers) > 1: # We need to ask the user.
self._spawnPrinterSelectionDialog()
else: # Just immediately continue.
self._sending_job.send("") # No specifically selected printer.
self._sending_job.send(None)
def _spawnPrinterSelectionDialog(self):
if self._printer_selection_dialog is None:
if PluginRegistry.getInstance() is not None:
path = os.path.join(
PluginRegistry.getInstance().getPluginPath("UM3NetworkPrinting"),
"resources", "qml", "PrintWindow.qml"
)
self._printer_selection_dialog = self._application.createQmlComponent(path, {"OutputDevice": self})
if self._printer_selection_dialog is not None:
self._printer_selection_dialog.show()
## Allows the user to choose a printer to print with from the printer
# selection dialogue.
# \param target_printer The name of the printer to target.
@pyqtSlot(str)
def selectPrinter(self, target_printer: str = "") -> None:
if self._sending_job is not None:
self._sending_job.send(target_printer)
@pyqtSlot()
def cancelPrintSelection(self) -> None:
self._sending_gcode = False
## Greenlet to send a job to the printer over the network.
#
# This greenlet gets called asynchronously in requestWrite. It is a
# greenlet in order to optionally wait for selectPrinter() to select a
# printer.
# The greenlet yields exactly three times: First time None,
# \param mesh_format Object responsible for choosing the right kind of format to write with.
def _sendPrintJob(self, mesh_format: MeshFormatHandler, nodes: List[SceneNode]):
Logger.log("i", "Sending print job to printer.")
if self._sending_gcode:
self._error_message = Message(
i18n_catalog.i18nc("@info:status",
"Sending new jobs (temporarily) blocked, still sending the previous print job."))
self._error_message.show()
yield #Wait on the user to select a target printer.
yield #Wait for the write job to be finished.
yield False #Return whether this was a success or not.
yield #Prevent StopIteration.
self._sending_gcode = True
# Potentially wait on the user to select a target printer.
target_printer = yield # type: Optional[str]
# Using buffering greatly reduces the write time for many lines of gcode
stream = mesh_format.createStream()
job = WriteFileJob(mesh_format.writer, stream, nodes, mesh_format.file_mode)
self._write_job_progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"),
lifetime = 0, dismissable = False, progress = -1,
title = i18n_catalog.i18nc("@info:title", "Sending Data"),
use_inactivity_timer = False)
self._write_job_progress_message.show()
if mesh_format.preferred_format is not None:
self._dummy_lambdas = (target_printer, mesh_format.preferred_format, stream)
job.finished.connect(self._sendPrintJobWaitOnWriteJobFinished)
job.start()
yield True # Return that we had success!
yield # To prevent having to catch the StopIteration exception.
def _sendPrintJobWaitOnWriteJobFinished(self, job: WriteFileJob) -> None:
if self._write_job_progress_message:
self._write_job_progress_message.hide()
self._progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), lifetime = 0,
dismissable = False, progress = -1,
title = i18n_catalog.i18nc("@info:title", "Sending Data"))
self._progress_message.addAction("Abort", i18n_catalog.i18nc("@action:button", "Cancel"), icon = "",
description = "")
self._progress_message.actionTriggered.connect(self._progressMessageActionTriggered)
self._progress_message.show()
parts = []
target_printer, preferred_format, stream = self._dummy_lambdas
# If a specific printer was selected, it should be printed with that machine.
if target_printer:
target_printer = self._printer_uuid_to_unique_name_mapping[target_printer]
parts.append(self._createFormPart("name=require_printer_name", bytes(target_printer, "utf-8"), "text/plain"))
# Add user name to the print_job
parts.append(self._createFormPart("name=owner", bytes(self._getUserName(), "utf-8"), "text/plain"))
file_name = self._application.getPrintInformation().jobName + "." + preferred_format["extension"]
output = stream.getvalue() # Either str or bytes depending on the output mode.
if isinstance(stream, io.StringIO):
output = cast(str, output).encode("utf-8")
output = cast(bytes, output)
parts.append(self._createFormPart("name=\"file\"; filename=\"%s\"" % file_name, output))
self._latest_reply_handler = self.postFormWithParts("print_jobs/", parts,
on_finished = self._onPostPrintJobFinished,
on_progress = self._onUploadPrintJobProgress)
@pyqtProperty(QUrl, notify=activeCameraUrlChanged)
def activeCameraUrl(self) -> QUrl:
return self._active_camera_url
@ -223,63 +78,6 @@ class ClusterUM3OutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._active_camera_url = camera_url
self.activeCameraUrlChanged.emit()
## The IP address of the printer.
@pyqtProperty(str, constant = True)
def address(self) -> str:
return self._address
def _onPostPrintJobFinished(self, reply: QNetworkReply) -> None:
if self._progress_message:
self._progress_message.hide()
self._compressing_gcode = False
self._sending_gcode = False
def _onUploadPrintJobProgress(self, bytes_sent: int, bytes_total: int) -> None:
if bytes_total > 0:
new_progress = bytes_sent / bytes_total * 100
# Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
# timeout responses if this happens.
self._last_response_time = time()
if self._progress_message is not None and new_progress != self._progress_message.getProgress():
self._progress_message.show() # Ensure that the message is visible.
self._progress_message.setProgress(bytes_sent / bytes_total * 100)
# If successfully sent:
if bytes_sent == bytes_total:
# Show a confirmation to the user so they know the job was sucessful and provide the option to switch to
# the monitor tab.
self._success_message = Message(
i18n_catalog.i18nc("@info:status", "Print job was successfully sent to the printer."),
lifetime=5, dismissable=True,
title=i18n_catalog.i18nc("@info:title", "Data Sent"))
self._success_message.addAction("View", i18n_catalog.i18nc("@action:button", "View in Monitor"), icon = "",
description="")
self._success_message.actionTriggered.connect(self._successMessageActionTriggered)
self._success_message.show()
else:
if self._progress_message is not None:
self._progress_message.setProgress(0)
self._progress_message.hide()
def _progressMessageActionTriggered(self, message_id: Optional[str] = None, action_id: Optional[str] = None) -> None:
if action_id == "Abort":
Logger.log("d", "User aborted sending print to remote.")
if self._progress_message is not None:
self._progress_message.hide()
self._compressing_gcode = False
self._sending_gcode = False
self._application.getController().setActiveStage("PrepareStage")
# After compressing the sliced model Cura sends data to printer, to stop receiving updates from the request
# the "reply" should be disconnected
if self._latest_reply_handler:
self._latest_reply_handler.disconnect()
self._latest_reply_handler = None
def _successMessageActionTriggered(self, message_id: Optional[str] = None, action_id: Optional[str] = None) -> None:
if action_id == "View":
self._application.getController().setActiveStage("MonitorStage")
@pyqtSlot(name="openPrintJobControlPanel")
def openPrintJobControlPanel(self) -> None:
QDesktopServices.openUrl(QUrl("http://" + self._address + "/print_jobs"))
@ -290,338 +88,282 @@ class ClusterUM3OutputDevice(UltimakerNetworkedPrinterOutputDevice):
@pyqtSlot(str, name="sendJobToTop")
def sendJobToTop(self, print_job_uuid: str) -> None:
# This function is part of the output device (and not of the printjob output model) as this type of operation
# is a modification of the cluster queue and not of the actual job.
data = "{\"to_position\": 0}"
self.put("print_jobs/{uuid}/move_to_position".format(uuid = print_job_uuid), data, on_finished=None)
self._cluster_api.doPrintJobAction(print_job_uuid, "move", {"to_position": 0, "list": "queued"})
@pyqtSlot(str, name="deleteJobFromQueue")
def deleteJobFromQueue(self, print_job_uuid: str) -> None:
# This function is part of the output device (and not of the printjob output model) as this type of operation
# is a modification of the cluster queue and not of the actual job.
self.delete("print_jobs/{uuid}".format(uuid = print_job_uuid), on_finished=None)
self._cluster_api.doPrintJobAction(print_job_uuid, "delete")
@pyqtSlot(str, name="forceSendJob")
def forceSendJob(self, print_job_uuid: str) -> None:
data = "{\"force\": true}"
self.put("print_jobs/{uuid}".format(uuid=print_job_uuid), data, on_finished=None)
self._cluster_api.doPrintJobAction(print_job_uuid, "force")
# Set the remote print job state.
def setJobState(self, print_job_uuid: str, state: str) -> None:
# We rewrite 'resume' to 'print' here because we are using the old print job action endpoints.
action = "print" if state == "resume" else state
data = "{\"action\": \"%s\"}" % action
self.put("print_jobs/%s/action" % print_job_uuid, data, on_finished=None)
## Set the remote print job state.
# \param print_job_uuid: The UUID of the print job to set the state for.
# \param action: The action to undertake ('pause', 'resume', 'abort').
def setJobState(self, print_job_uuid: str, action: str) -> None:
self._cluster_api.doPrintJobAction(print_job_uuid, action)
def _printJobStateChanged(self) -> None:
username = self._getUserName()
if username is None:
return # We only want to show notifications if username is set.
finished_jobs = [job for job in self._print_jobs if job.state == "wait_cleanup"]
newly_finished_jobs = [job for job in finished_jobs if job not in self._finished_jobs and job.owner == username]
for job in newly_finished_jobs:
if job.assignedPrinter:
job_completed_text = i18n_catalog.i18nc("@info:status", "Printer '{printer_name}' has finished printing '{job_name}'.").format(printer_name=job.assignedPrinter.name, job_name = job.name)
else:
job_completed_text = i18n_catalog.i18nc("@info:status", "The print job '{job_name}' was finished.").format(job_name = job.name)
job_completed_message = Message(text=job_completed_text, title = i18n_catalog.i18nc("@info:status", "Print finished"))
job_completed_message.show()
# Ensure UI gets updated
self.printJobsChanged.emit()
# Keep a list of all completed jobs so we know if something changed next time.
self._finished_jobs = finished_jobs
## Called when the connection to the cluster changes.
def connect(self) -> None:
super().connect()
self.sendMaterialProfiles()
def _onGetPreviewImageFinished(self, reply: QNetworkReply) -> None:
reply_url = reply.url().toString()
uuid = reply_url[reply_url.find("print_jobs/")+len("print_jobs/"):reply_url.rfind("/preview_image")]
print_job = findByKey(self._print_jobs, uuid)
if print_job:
image = QImage()
image.loadFromData(reply.readAll())
print_job.updatePreviewImage(image)
## Handle network errors.
@staticmethod
def _onNetworkError(errors: Dict[str, Any]):
Logger.log("w", str(errors))
def _update(self) -> None:
super()._update()
self.get("printers/", on_finished = self._onGetPrintersDataFinished)
self.get("print_jobs/", on_finished = self._onGetPrintJobsFinished)
for print_job in self._print_jobs:
if print_job.getPreviewImage() is None:
self.get("print_jobs/{uuid}/preview_image".format(uuid=print_job.key), on_finished=self._onGetPreviewImageFinished)
def _onGetPrintJobsFinished(self, reply: QNetworkReply) -> None:
if not checkValidGetReply(reply):
return
result = loadJsonFromReply(reply)
if result is None:
return
print_jobs_seen = []
job_list_changed = False
for idx, print_job_data in enumerate(result):
print_job = findByKey(self._print_jobs, print_job_data["uuid"])
if print_job is None:
print_job = self._createPrintJobModel(print_job_data)
job_list_changed = True
elif not job_list_changed:
# Check if the order of the jobs has changed since the last check
if self._print_jobs.index(print_job) != idx:
job_list_changed = True
self._updatePrintJob(print_job, print_job_data)
if print_job.state != "queued" and print_job.state != "error": # Print job should be assigned to a printer.
if print_job.state in ["failed", "finished", "aborted", "none"]:
# Print job was already completed, so don't attach it to a printer.
printer = None
else:
printer = self._getPrinterByKey(print_job_data["printer_uuid"])
else: # The job can "reserve" a printer if some changes are required.
printer = self._getPrinterByKey(print_job_data["assigned_to"])
if printer:
printer.updateActivePrintJob(print_job)
print_jobs_seen.append(print_job)
# Check what jobs need to be removed.
removed_jobs = [print_job for print_job in self._print_jobs if print_job not in print_jobs_seen]
for removed_job in removed_jobs:
job_list_changed = job_list_changed or self._removeJob(removed_job)
if job_list_changed:
# Override the old list with the new list (either because jobs were removed / added or order changed)
self._print_jobs = print_jobs_seen
self.printJobsChanged.emit() # Do a single emit for all print job changes.
def _onGetPrintersDataFinished(self, reply: QNetworkReply) -> None:
if not checkValidGetReply(reply):
return
result = loadJsonFromReply(reply)
if result is None:
return
printer_list_changed = False
printers_seen = []
for printer_data in result:
printer = findByKey(self._printers, printer_data["uuid"])
if printer is None:
output_controller = ClusterUM3PrinterOutputController(self)
printer = PrinterModelFactory.createPrinter(output_controller=output_controller,
ip_address=printer_data.get("ip_address", ""),
extruder_count=self._number_of_extruders)
self._printers.append(printer)
printer_list_changed = True
printers_seen.append(printer)
self._updatePrinter(printer, printer_data)
removed_printers = [printer for printer in self._printers if printer not in printers_seen]
for printer in removed_printers:
self._removePrinter(printer)
if removed_printers or printer_list_changed:
self.printersChanged.emit()
def _createPrintJobModel(self, data: Dict[str, Any]) -> UM3PrintJobOutputModel:
print_job = UM3PrintJobOutputModel(output_controller=ClusterUM3PrinterOutputController(self),
key=data["uuid"], name= data["name"])
configuration = PrinterConfigurationModel()
extruders = [ExtruderConfigurationModel(position = idx) for idx in range(0, self._number_of_extruders)]
for index in range(0, self._number_of_extruders):
try:
extruder_data = data["configuration"][index]
except IndexError:
continue
extruder = extruders[int(data["configuration"][index]["extruder_index"])]
extruder.setHotendID(extruder_data.get("print_core_id", ""))
extruder.setMaterial(self._createMaterialOutputModel(extruder_data.get("material", {})))
configuration.setExtruderConfigurations(extruders)
configuration.setPrinterType(data.get("machine_variant", ""))
print_job.updateConfiguration(configuration)
print_job.setCompatibleMachineFamilies(data.get("compatible_machine_families", []))
print_job.stateChanged.connect(self._printJobStateChanged)
return print_job
def _updatePrintJob(self, print_job: UM3PrintJobOutputModel, data: Dict[str, Any]) -> None:
print_job.updateTimeTotal(data["time_total"])
print_job.updateTimeElapsed(data["time_elapsed"])
impediments_to_printing = data.get("impediments_to_printing", [])
print_job.updateOwner(data["owner"])
status_set_by_impediment = False
for impediment in impediments_to_printing:
if impediment["severity"] == "UNFIXABLE":
status_set_by_impediment = True
print_job.updateState("error")
break
if not status_set_by_impediment:
print_job.updateState(data["status"])
configuration_changes = PrinterModelFactory.createConfigurationChanges(data.get("configuration_changes_required"))
print_job.updateConfigurationChanges(configuration_changes)
def _createMaterialOutputModel(self, material_data: Dict[str, Any]) -> "MaterialOutputModel":
material_manager = self._application.getMaterialManager()
material_group_list = None
# Avoid crashing if there is no "guid" field in the metadata
material_guid = material_data.get("guid")
if material_guid:
material_group_list = material_manager.getMaterialGroupListByGUID(material_guid)
# This can happen if the connected machine has no material in one or more extruders (if GUID is empty), or the
# material is unknown to Cura, so we should return an "empty" or "unknown" material model.
if material_group_list is None:
material_name = i18n_catalog.i18nc("@label:material", "Empty") if len(material_data.get("guid", "")) == 0 \
else i18n_catalog.i18nc("@label:material", "Unknown")
return MaterialOutputModel(guid = material_data.get("guid", ""),
type = material_data.get("material", ""),
color = material_data.get("color", ""),
brand = material_data.get("brand", ""),
name = material_data.get("name", material_name)
)
# Sort the material groups by "is_read_only = True" first, and then the name alphabetically.
read_only_material_group_list = list(filter(lambda x: x.is_read_only, material_group_list))
non_read_only_material_group_list = list(filter(lambda x: not x.is_read_only, material_group_list))
material_group = None
if read_only_material_group_list:
read_only_material_group_list = sorted(read_only_material_group_list, key = lambda x: x.name)
material_group = read_only_material_group_list[0]
elif non_read_only_material_group_list:
non_read_only_material_group_list = sorted(non_read_only_material_group_list, key = lambda x: x.name)
material_group = non_read_only_material_group_list[0]
if material_group:
container = material_group.root_material_node.getContainer()
color = container.getMetaDataEntry("color_code")
brand = container.getMetaDataEntry("brand")
material_type = container.getMetaDataEntry("material")
name = container.getName()
else:
Logger.log("w",
"Unable to find material with guid {guid}. Using data as provided by cluster".format(
guid=material_data["guid"]))
color = material_data["color"]
brand = material_data["brand"]
material_type = material_data["material"]
name = i18n_catalog.i18nc("@label:material", "Empty") if material_data["material"] == "empty" \
else i18n_catalog.i18nc("@label:material", "Unknown")
return MaterialOutputModel(guid = material_data["guid"], type = material_type,
brand = brand, color = color, name = name)
def _updatePrinter(self, printer: PrinterOutputModel, data: Dict[str, Any]) -> None:
# For some unknown reason the cluster wants UUID for everything, except for sending a job directly to a printer.
# Then we suddenly need the unique name. So in order to not have to mess up all the other code, we save a mapping.
self._printer_uuid_to_unique_name_mapping[data["uuid"]] = data["unique_name"]
definitions = ContainerRegistry.getInstance().findDefinitionContainers(name = data["machine_variant"])
if not definitions:
Logger.log("w", "Unable to find definition for machine variant %s", data["machine_variant"])
return
machine_definition = definitions[0]
printer.updateName(data["friendly_name"])
printer.updateKey(data["uuid"])
printer.updateType(data["machine_variant"])
if data["status"] != "unreachable":
self._application.getDiscoveredPrintersModel().updateDiscoveredPrinter(data["ip_address"],
name = data["friendly_name"],
machine_type = data["machine_variant"])
# Do not store the build plate information that comes from connect if the current printer has not build plate information
if "build_plate" in data and machine_definition.getMetaDataEntry("has_variant_buildplates", False):
printer.updateBuildplate(data["build_plate"]["type"])
if not data["enabled"]:
printer.updateState("disabled")
else:
printer.updateState(data["status"])
for index in range(0, self._number_of_extruders):
extruder = printer.extruders[index]
try:
extruder_data = data["configuration"][index]
except IndexError:
break
extruder.updateHotendID(extruder_data.get("print_core_id", ""))
material_data = extruder_data["material"]
if extruder.activeMaterial is None or extruder.activeMaterial.guid != material_data["guid"]:
material = self._createMaterialOutputModel(material_data)
extruder.updateActiveMaterial(material)
def _removeJob(self, job: UM3PrintJobOutputModel) -> bool:
if job not in self._print_jobs:
return False
if job.assignedPrinter:
job.assignedPrinter.updateActivePrintJob(None)
job.stateChanged.disconnect(self._printJobStateChanged)
self._print_jobs.remove(job)
return True
def _removePrinter(self, printer: PrinterOutputModel) -> None:
self._printers.remove(printer)
if self._active_printer == printer:
self._active_printer = None
self.activePrinterChanged.emit()
self._cluster_api.getPrinters(self._updatePrinters)
self._cluster_api.getPrintJobs(self._updatePrintJobs)
# for print_job in self._print_jobs:
# if print_job.getPreviewImage() is None:
# self.get("print_jobs/{uuid}/preview_image".format(uuid=print_job.key), on_finished=self._onGetPreviewImageFinished)
## Sync the material profiles in Cura with the printer.
#
# This gets called when connecting to a printer as well as when sending a
# print.
# This gets called when connecting to a printer as well as when sending a print.
def sendMaterialProfiles(self) -> None:
job = SendMaterialJob(device=self)
job.run()
def loadJsonFromReply(reply: QNetworkReply) -> Optional[List[Dict[str, Any]]]:
try:
result = json.loads(bytes(reply.readAll()).decode("utf-8"))
except json.decoder.JSONDecodeError:
Logger.logException("w", "Unable to decode JSON from reply.")
return None
return result
# TODO FROM HERE
def checkValidGetReply(reply: QNetworkReply) -> bool:
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
if status_code != 200:
Logger.log("w", "Got status code {status_code} while trying to get data".format(status_code=status_code))
return False
return True
def requestWrite(self, nodes: List["SceneNode"], file_name: Optional[str] = None, limit_mimetypes: bool = False,
file_handler: Optional["FileHandler"] = None, filter_by_machine: bool = False, **kwargs) -> None:
pass
# self.writeStarted.emit(self)
#
# self.sendMaterialProfiles()
#
# mesh_format = MeshFormatHandler(file_handler, self.firmwareVersion)
#
# # This function pauses with the yield, waiting on instructions on which printer it needs to print with.
# if not mesh_format.is_valid:
# Logger.log("e", "Missing file or mesh writer!")
# return
# self._sending_job = self._sendPrintJob(mesh_format, nodes)
# if self._sending_job is not None:
# self._sending_job.send(None) # Start the generator.
#
# if len(self._printers) > 1: # We need to ask the user.
# self._spawnPrinterSelectionDialog()
# else: # Just immediately continue.
# self._sending_job.send("") # No specifically selected printer.
# self._sending_job.send(None)
#
# def _spawnPrinterSelectionDialog(self):
# if self._printer_selection_dialog is None:
# if PluginRegistry.getInstance() is not None:
# path = os.path.join(
# PluginRegistry.getInstance().getPluginPath("UM3NetworkPrinting"),
# "resources", "qml", "PrintWindow.qml"
# )
# self._printer_selection_dialog = self._application.createQmlComponent(path, {"OutputDevice": self})
# if self._printer_selection_dialog is not None:
# self._printer_selection_dialog.show()
# ## Allows the user to choose a printer to print with from the printer
# # selection dialogue.
# # \param target_printer The name of the printer to target.
# @pyqtSlot(str)
# def selectPrinter(self, target_printer: str = "") -> None:
# if self._sending_job is not None:
# self._sending_job.send(target_printer)
def findByKey(lst: List[Union[UM3PrintJobOutputModel, PrinterOutputModel]], key: str) -> Optional[UM3PrintJobOutputModel]:
for item in lst:
if item.key == key:
return item
return None
# @pyqtSlot()
# def cancelPrintSelection(self) -> None:
# self._sending_gcode = False
# ## Greenlet to send a job to the printer over the network.
# #
# # This greenlet gets called asynchronously in requestWrite. It is a
# # greenlet in order to optionally wait for selectPrinter() to select a
# # printer.
# # The greenlet yields exactly three times: First time None,
# # \param mesh_format Object responsible for choosing the right kind of format to write with.
# def _sendPrintJob(self, mesh_format: MeshFormatHandler, nodes: List[SceneNode]):
# Logger.log("i", "Sending print job to printer.")
# if self._sending_gcode:
# self._error_message = Message(
# I18N_CATALOG.i18nc("@info:status",
# "Sending new jobs (temporarily) blocked, still sending the previous print job."))
# self._error_message.show()
# yield #Wait on the user to select a target printer.
# yield #Wait for the write job to be finished.
# yield False #Return whether this was a success or not.
# yield #Prevent StopIteration.
#
# self._sending_gcode = True
#
# # Potentially wait on the user to select a target printer.
# target_printer = yield # type: Optional[str]
#
# # Using buffering greatly reduces the write time for many lines of gcode
#
# stream = mesh_format.createStream()
#
# job = WriteFileJob(mesh_format.writer, stream, nodes, mesh_format.file_mode)
#
# self._write_job_progress_message = Message(I18N_CATALOG.i18nc("@info:status", "Sending data to printer"),
# lifetime = 0, dismissable = False, progress = -1,
# title = I18N_CATALOG.i18nc("@info:title", "Sending Data"),
# use_inactivity_timer = False)
# self._write_job_progress_message.show()
#
# if mesh_format.preferred_format is not None:
# self._dummy_lambdas = (target_printer, mesh_format.preferred_format, stream)
# job.finished.connect(self._sendPrintJobWaitOnWriteJobFinished)
# job.start()
# yield True # Return that we had success!
# yield # To prevent having to catch the StopIteration exception.
# def _sendPrintJobWaitOnWriteJobFinished(self, job: WriteFileJob) -> None:
# if self._write_job_progress_message:
# self._write_job_progress_message.hide()
#
# self._progress_message = Message(I18N_CATALOG.i18nc("@info:status", "Sending data to printer"), lifetime = 0,
# dismissable = False, progress = -1,
# title = I18N_CATALOG.i18nc("@info:title", "Sending Data"))
# self._progress_message.addAction("Abort", I18N_CATALOG.i18nc("@action:button", "Cancel"), icon = "",
# description = "")
# self._progress_message.actionTriggered.connect(self._progressMessageActionTriggered)
# self._progress_message.show()
# parts = []
#
# target_printer, preferred_format, stream = self._dummy_lambdas
#
# # If a specific printer was selected, it should be printed with that machine.
# if target_printer:
# target_printer = self._printer_uuid_to_unique_name_mapping[target_printer]
# parts.append(self._createFormPart("name=require_printer_name", bytes(target_printer, "utf-8"), "text/plain"))
#
# # Add user name to the print_job
# parts.append(self._createFormPart("name=owner", bytes(self._getUserName(), "utf-8"), "text/plain"))
#
# file_name = self._application.getPrintInformation().jobName + "." + preferred_format["extension"]
#
# output = stream.getvalue() # Either str or bytes depending on the output mode.
# if isinstance(stream, io.StringIO):
# output = cast(str, output).encode("utf-8")
# output = cast(bytes, output)
#
# parts.append(self._createFormPart("name=\"file\"; filename=\"%s\"" % file_name, output))
#
# self._latest_reply_handler = self.postFormWithParts("print_jobs/", parts,
# on_finished = self._onPostPrintJobFinished,
# on_progress = self._onUploadPrintJobProgress)
# def _onPostPrintJobFinished(self, reply: QNetworkReply) -> None:
# if self._progress_message:
# self._progress_message.hide()
# self._compressing_gcode = False
# self._sending_gcode = False
# def _onUploadPrintJobProgress(self, bytes_sent: int, bytes_total: int) -> None:
# if bytes_total > 0:
# new_progress = bytes_sent / bytes_total * 100
# # Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
# # timeout responses if this happens.
# self._last_response_time = time()
# if self._progress_message is not None and new_progress != self._progress_message.getProgress():
# self._progress_message.show() # Ensure that the message is visible.
# self._progress_message.setProgress(bytes_sent / bytes_total * 100)
#
# # If successfully sent:
# if bytes_sent == bytes_total:
# # Show a confirmation to the user so they know the job was sucessful and provide the option to switch to
# # the monitor tab.
# self._success_message = Message(
# I18N_CATALOG.i18nc("@info:status", "Print job was successfully sent to the printer."),
# lifetime=5, dismissable=True,
# title=I18N_CATALOG.i18nc("@info:title", "Data Sent"))
# self._success_message.addAction("View", I18N_CATALOG.i18nc("@action:button", "View in Monitor"), icon = "",
# description="")
# self._success_message.actionTriggered.connect(self._successMessageActionTriggered)
# self._success_message.show()
# else:
# if self._progress_message is not None:
# self._progress_message.setProgress(0)
# self._progress_message.hide()
# def _progressMessageActionTriggered(self, message_id: Optional[str] = None, action_id: Optional[str] = None) -> None:
# if action_id == "Abort":
# Logger.log("d", "User aborted sending print to remote.")
# if self._progress_message is not None:
# self._progress_message.hide()
# self._compressing_gcode = False
# self._sending_gcode = False
# self._application.getController().setActiveStage("PrepareStage")
#
# # After compressing the sliced model Cura sends data to printer, to stop receiving updates from the request
# # the "reply" should be disconnected
# if self._latest_reply_handler:
# self._latest_reply_handler.disconnect()
# self._latest_reply_handler = None
# def _successMessageActionTriggered(self, message_id: Optional[str] = None, action_id: Optional[str] = None) -> None:
# if action_id == "View":
# self._application.getController().setActiveStage("MonitorStage")
# def _onGetPreviewImageFinished(self, reply: QNetworkReply) -> None:
# reply_url = reply.url().toString()
#
# uuid = reply_url[reply_url.find("print_jobs/")+len("print_jobs/"):reply_url.rfind("/preview_image")]
#
# print_job = findByKey(self._print_jobs, uuid)
# if print_job:
# image = QImage()
# image.loadFromData(reply.readAll())
# print_job.updatePreviewImage(image)
# def _createMaterialOutputModel(self, material_data: Dict[str, Any]) -> "MaterialOutputModel":
# material_manager = self._application.getMaterialManager()
# material_group_list = None
#
# # Avoid crashing if there is no "guid" field in the metadata
# material_guid = material_data.get("guid")
# if material_guid:
# material_group_list = material_manager.getMaterialGroupListByGUID(material_guid)
#
# # This can happen if the connected machine has no material in one or more extruders (if GUID is empty), or the
# # material is unknown to Cura, so we should return an "empty" or "unknown" material model.
# if material_group_list is None:
# material_name = I18N_CATALOG.i18nc("@label:material", "Empty") if len(material_data.get("guid", "")) == 0 \
# else I18N_CATALOG.i18nc("@label:material", "Unknown")
#
# return MaterialOutputModel(guid = material_data.get("guid", ""),
# type = material_data.get("material", ""),
# color = material_data.get("color", ""),
# brand = material_data.get("brand", ""),
# name = material_data.get("name", material_name)
# )
#
# # Sort the material groups by "is_read_only = True" first, and then the name alphabetically.
# read_only_material_group_list = list(filter(lambda x: x.is_read_only, material_group_list))
# non_read_only_material_group_list = list(filter(lambda x: not x.is_read_only, material_group_list))
# material_group = None
# if read_only_material_group_list:
# read_only_material_group_list = sorted(read_only_material_group_list, key = lambda x: x.name)
# material_group = read_only_material_group_list[0]
# elif non_read_only_material_group_list:
# non_read_only_material_group_list = sorted(non_read_only_material_group_list, key = lambda x: x.name)
# material_group = non_read_only_material_group_list[0]
#
# if material_group:
# container = material_group.root_material_node.getContainer()
# color = container.getMetaDataEntry("color_code")
# brand = container.getMetaDataEntry("brand")
# material_type = container.getMetaDataEntry("material")
# name = container.getName()
# else:
# Logger.log("w",
# "Unable to find material with guid {guid}. Using data as provided by cluster".format(
# guid=material_data["guid"]))
# color = material_data["color"]
# brand = material_data["brand"]
# material_type = material_data["material"]
# name = I18N_CATALOG.i18nc("@label:material", "Empty") if material_data["material"] == "empty" \
# else I18N_CATALOG.i18nc("@label:material", "Unknown")
# return MaterialOutputModel(guid = material_data["guid"], type = material_type,
# brand = brand, color = color, name = name)

View file

@ -1,20 +0,0 @@
# Copyright (c) 2017 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
MYPY = False
if MYPY:
from cura.PrinterOutput.Models.PrintJobOutputModel import PrintJobOutputModel
class ClusterUM3PrinterOutputController(PrinterOutputController):
def __init__(self, output_device):
super().__init__(output_device)
self.can_pre_heat_bed = False
self.can_pre_heat_hotends = False
self.can_control_manually = False
self.can_send_raw_gcode = False
def setJobState(self, job: "PrintJobOutputModel", state: str) -> None:
self._output_device.setJobState(job.key, state)

View file

@ -1,23 +0,0 @@
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
## The network API client is responsible for handling requests and responses to printer over the local network (LAN).
class NetworkApiClient:
API_PREFIX = "/cluster-api/v1/"
def __init__(self) -> None:
pass
def getPrinters(self):
pass
def getPrintJobs(self):
pass
def requestPrint(self):
pass
def doPrintJobAction(self):
pass

View file

@ -61,7 +61,6 @@ class NetworkOutputDeviceManager:
return
um_network_key = active_machine.getMetaDataEntry("um_network_key")
for key in self._discovered_devices:
if key == um_network_key:
if not self._discovered_devices[key].isConnected():
@ -229,6 +228,7 @@ class NetworkOutputDeviceManager:
global_container_stack = CuraApplication.getInstance().getGlobalContainerStack()
if global_container_stack and device.getId() == global_container_stack.getMetaDataEntry("um_network_key"):
# Ensure that the configured connection type is set.
CuraApplication.getInstance().getOutputDeviceManager().addOutputDevice(device)
global_container_stack.addConfiguredConnectionType(device.connectionType.value)
device.connect()
device.connectionStateChanged.connect(self._onDeviceConnectionStateChanged)
@ -371,8 +371,6 @@ class NetworkOutputDeviceManager:
# Ensure that these containers do know that they are configured for network connection
machine.addConfiguredConnectionType(printer_device.connectionType.value)
self.refreshConnections()
## Create a machine instance based on the discovered network printer.
def _createMachineFromDiscoveredPrinter(self, key: str) -> None:
discovered_device = self._discovered_devices.get(key)

View file

@ -1,14 +1,22 @@
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import os
from typing import List, Optional, Dict
from PyQt5.QtCore import pyqtProperty, pyqtSignal, QObject, pyqtSlot, QUrl
from UM.Logger import Logger
from UM.Qt.Duration import Duration, DurationFormat
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.Models.PrinterOutputModel import PrinterOutputModel
from cura.PrinterOutput.NetworkedPrinterOutputDevice import NetworkedPrinterOutputDevice, AuthState
from plugins.UM3NetworkPrinting.src.Utils import formatTimeCompleted, formatDateCompleted
from plugins.UM3NetworkPrinting.src.Models.UM3PrintJobOutputModel import UM3PrintJobOutputModel
from cura.PrinterOutput.PrinterOutputDevice import ConnectionType
from plugins.UM3NetworkPrinting.src.Models.Http.ClusterPrintJobStatus import ClusterPrintJobStatus
from .Utils import formatTimeCompleted, formatDateCompleted
from .ClusterOutputController import ClusterOutputController
from .Models.UM3PrintJobOutputModel import UM3PrintJobOutputModel
from .Models.Http.ClusterPrinterStatus import ClusterPrinterStatus
## Output device class that forms the basis of Ultimaker networked printer output devices.
@ -29,13 +37,17 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice):
# States indicating if a print job is queued.
QUEUED_PRINT_JOBS_STATES = {"queued", "error"}
def __init__(self, device_id, address, properties, connection_type, parent=None) -> None:
def __init__(self, device_id: str, address: str, properties: Dict[bytes, bytes], connection_type: ConnectionType,
parent=None) -> None:
super().__init__(device_id=device_id, address=address, properties=properties, connection_type=connection_type,
parent=parent)
# Trigger the printersChanged signal when the private signal is triggered.
self.printersChanged.connect(self._clusterPrintersChanged)
# Keeps track of all printers in the cluster.
self._printers = [] # type: List[PrinterOutputModel]
# Keeps track of all print jobs in the cluster.
self._print_jobs = [] # type: List[UM3PrintJobOutputModel]
@ -45,6 +57,14 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice):
# By default we are not authenticated. This state will be changed later.
self._authentication_state = AuthState.NotAuthenticated
# Load the Monitor UI elements.
self._loadMonitorTab()
## The IP address of the printer.
@pyqtProperty(str, constant=True)
def address(self) -> str:
return self._address
# Get all print jobs for this cluster.
@pyqtProperty("QVariantList", notify=printJobsChanged)
def printJobs(self) -> List[UM3PrintJobOutputModel]:
@ -150,3 +170,93 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice):
@pyqtSlot(int, result=str, name="formatDuration")
def formatDuration(self, seconds: int) -> str:
return Duration(seconds).getDisplayString(DurationFormat.Format.Short)
## Load Monitor tab QML.
def _loadMonitorTab(self):
plugin_registry = CuraApplication.getInstance().getPluginRegistry()
if not plugin_registry:
Logger.log("e", "Could not get plugin registry")
return
plugin_path = plugin_registry.getPluginPath("UM3NetworkPrinting")
if not plugin_path:
Logger.log("e", "Could not get plugin path")
return
self._monitor_view_qml_path = os.path.join(plugin_path, "resources", "qml", "MonitorStage.qml")
def _updatePrinters(self, remote_printers: List[ClusterPrinterStatus]) -> None:
# Keep track of the new printers to show.
# We create a new list instead of changing the existing one to get the correct order.
new_printers = []
# Check which printers need to be created or updated.
for index, printer_data in enumerate(remote_printers):
printer = next(iter(printer for printer in self._printers if printer.key == printer_data.uuid), None)
if not printer:
printer = printer_data.createOutputModel(ClusterOutputController(self))
else:
printer_data.updateOutputModel(printer)
new_printers.append(printer)
# Check which printers need to be removed (de-referenced).
remote_printers_keys = [printer_data.uuid for printer_data in remote_printers]
removed_printers = [printer for printer in self._printers if printer.key not in remote_printers_keys]
for removed_printer in removed_printers:
if self._active_printer and self._active_printer.key == removed_printer.key:
self.setActivePrinter(None)
self._printers = new_printers
if self._printers and not self.activePrinter:
self.setActivePrinter(self._printers[0])
self.printersChanged.emit()
## Updates the local list of print jobs with the list received from the cloud.
# \param remote_jobs: The print jobs received from the cloud.
def _updatePrintJobs(self, remote_jobs: List[ClusterPrintJobStatus]) -> None:
# Keep track of the new print jobs to show.
# We create a new list instead of changing the existing one to get the correct order.
new_print_jobs = []
# Check which print jobs need to be created or updated.
for index, print_job_data in enumerate(remote_jobs):
print_job = next(
iter(print_job for print_job in self._print_jobs if print_job.key == print_job_data.uuid), None)
if not print_job:
new_print_jobs.append(self._createPrintJobModel(print_job_data))
else:
print_job_data.updateOutputModel(print_job)
if print_job_data.printer_uuid:
self._updateAssignedPrinter(print_job, print_job_data.printer_uuid)
new_print_jobs.append(print_job)
# Check which print job need to be removed (de-referenced).
remote_job_keys = [print_job_data.uuid for print_job_data in remote_jobs]
removed_jobs = [print_job for print_job in self._print_jobs if print_job.key not in remote_job_keys]
for removed_job in removed_jobs:
if removed_job.assignedPrinter:
removed_job.assignedPrinter.updateActivePrintJob(None)
removed_job.stateChanged.disconnect(self._onPrintJobStateChanged)
self._print_jobs = new_print_jobs
self.printJobsChanged.emit()
## Create a new print job model based on the remote status of the job.
# \param remote_job: The remote print job data.
def _createPrintJobModel(self, remote_job: ClusterPrintJobStatus) -> UM3PrintJobOutputModel:
model = remote_job.createOutputModel(ClusterOutputController(self))
model.stateChanged.connect(self._onPrintJobStateChanged)
if remote_job.printer_uuid:
self._updateAssignedPrinter(model, remote_job.printer_uuid)
return model
## Updates the printer assignment for the given print job model.
def _updateAssignedPrinter(self, model: UM3PrintJobOutputModel, printer_uuid: str) -> None:
printer = next((p for p in self._printers if printer_uuid == p.key), None)
if not printer:
Logger.log("w", "Missing printer %s for job %s in %s", model.assignedPrinter, model.key,
[p.key for p in self._printers])
return
printer.updateActivePrintJob(model)
model.updateAssignedPrinter(printer)