Merge branch 'STAR-322_cloud-connection' of https://github.com/Ultimaker/Cura into STAR-322_cloud-connection

This commit is contained in:
ChrisTerBeke 2018-12-11 12:39:56 +01:00
commit c07fea1405
32 changed files with 461 additions and 335 deletions

View file

@ -10,12 +10,12 @@ from UM.Logger import Logger
from cura.API import Account
from cura.NetworkClient import NetworkClient
from ..Models import BaseModel
from .Models.CloudCluster import CloudCluster
from .Models.CloudClusterResponse import CloudClusterResponse
from .Models.CloudErrorObject import CloudErrorObject
from .Models.CloudClusterStatus import CloudClusterStatus
from .Models.CloudJobUploadRequest import CloudJobUploadRequest
from .Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from .Models.CloudPrintResponse import CloudPrintResponse
from .Models.CloudJobResponse import CloudJobResponse
from .Models.CloudPrintJobResponse import CloudPrintJobResponse
## The cloud API client is responsible for handling the requests and responses from the cloud.
@ -31,7 +31,7 @@ class CloudApiClient(NetworkClient):
## Initializes a new cloud API client.
# \param account: The user's account object
# \param on_error: The callback to be called whenever we receive errors from the server.
def __init__(self, account: Account, on_error: Callable[[List[CloudErrorObject]], None]):
def __init__(self, account: Account, on_error: Callable[[List[CloudErrorObject]], None]) -> None:
super().__init__()
self._account = account
self._on_error = on_error
@ -43,9 +43,9 @@ class CloudApiClient(NetworkClient):
## Retrieves all the clusters for the user that is currently logged in.
# \param on_finished: The function to be called after the result is parsed.
def getClusters(self, on_finished: Callable[[List[CloudCluster]], any]) -> None:
def getClusters(self, on_finished: Callable[[List[CloudClusterResponse]], any]) -> None:
url = "{}/clusters".format(self.CLUSTER_API_ROOT)
self.get(url, on_finished=self._wrapCallback(on_finished, CloudCluster))
self.get(url, on_finished=self._wrapCallback(on_finished, CloudClusterResponse))
## Retrieves the status of the given cluster.
# \param cluster_id: The ID of the cluster.
@ -57,10 +57,11 @@ class CloudApiClient(NetworkClient):
## Requests the cloud to register the upload of a print job mesh.
# \param request: The request object.
# \param on_finished: The function to be called after the result is parsed.
def requestUpload(self, request: CloudJobUploadRequest, on_finished: Callable[[CloudJobResponse], any]) -> None:
def requestUpload(self, request: CloudPrintJobUploadRequest, on_finished: Callable[[CloudPrintJobResponse], any]
) -> None:
url = "{}/jobs/upload".format(self.CURA_API_ROOT)
body = json.dumps({"data": request.__dict__})
self.put(url, body, on_finished=self._wrapCallback(on_finished, CloudJobResponse))
body = json.dumps({"data": request.toDict()})
self.put(url, body, on_finished=self._wrapCallback(on_finished, CloudPrintJobResponse))
## Requests the cloud to register the upload of a print job mesh.
# \param upload_response: The object received after requesting an upload with `self.requestUpload`.
@ -68,7 +69,7 @@ class CloudApiClient(NetworkClient):
# \param on_finished: The function to be called after the result is parsed. It receives the print job ID.
# \param on_progress: A function to be called during upload progress. It receives a percentage (0-100).
# \param on_error: A function to be called if the upload fails. It receives a dict with the error.
def uploadMesh(self, upload_response: CloudJobResponse, mesh: bytes, on_finished: Callable[[str], any],
def uploadMesh(self, upload_response: CloudPrintJobResponse, mesh: bytes, on_finished: Callable[[str], any],
on_progress: Callable[[int], any], on_error: Callable[[dict], any]):
def progressCallback(bytes_sent: int, bytes_total: int) -> None:
@ -126,13 +127,13 @@ class CloudApiClient(NetworkClient):
## Parses the given models and calls the correct callback depending on the result.
# \param response: The response from the server, after being converted to a dict.
# \param on_finished: The callback in case the response is successful.
# \param model: The type of the model to convert the response to. It may either be a single record or a list.
# \param model_class: The type of the model to convert the response to. It may either be a single record or a list.
def _parseModels(self, response: Dict[str, any],
on_finished: Callable[[Union[Model, List[Model]]], any],
model: Type[Model]) -> None:
model_class: Type[Model]) -> None:
if "data" in response:
data = response["data"]
result = [model(**c) for c in data] if isinstance(data, list) else model(**data)
result = [model_class(**c) for c in data] if isinstance(data, list) else model_class(**data)
on_finished(result)
elif "errors" in response:
self._on_error([CloudErrorObject(**error) for error in response["errors"]])

View file

@ -20,11 +20,11 @@ from ..MeshFormatHandler import MeshFormatHandler
from ..UM3PrintJobOutputModel import UM3PrintJobOutputModel
from .CloudApiClient import CloudApiClient
from .Models.CloudClusterStatus import CloudClusterStatus
from .Models.CloudJobUploadRequest import CloudJobUploadRequest
from .Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from .Models.CloudPrintResponse import CloudPrintResponse
from .Models.CloudJobResponse import CloudJobResponse
from .Models.CloudClusterPrinter import CloudClusterPrinter
from .Models.CloudClusterPrintJob import CloudClusterPrintJob
from .Models.CloudPrintJobResponse import CloudPrintJobResponse
from .Models.CloudClusterPrinterStatus import CloudClusterPrinterStatus
from .Models.CloudClusterPrintJobStatus import CloudClusterPrintJobStatus
from .Utils import findChanges, formatDateCompleted, formatTimeCompleted
@ -114,8 +114,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._progress_message = None # type: Optional[Message]
# Keep server string of the last generated time to avoid updating models more than once for the same response
self._received_printers = None # type: Optional[List[CloudClusterPrinter]]
self._received_print_jobs = None # type: Optional[List[CloudClusterPrintJob]]
self._received_printers = None # type: Optional[List[CloudClusterPrinterStatus]]
self._received_print_jobs = None # type: Optional[List[CloudClusterPrintJobStatus]]
# A set of the user's job IDs that have finished
self._finished_jobs = set() # type: Set[str]
@ -164,7 +164,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
mesh_bytes = mesh_format.getBytes(nodes)
request = CloudJobUploadRequest(
request = CloudPrintJobUploadRequest(
job_name = file_name,
file_size = len(mesh_bytes),
content_type = mesh_format.mime_type,
@ -197,9 +197,9 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Updates the local list of printers with the list received from the cloud.
# \param jobs: The printers received from the cloud.
def _updatePrinters(self, printers: List[CloudClusterPrinter]) -> None:
def _updatePrinters(self, printers: List[CloudClusterPrinterStatus]) -> None:
previous = {p.key: p for p in self._printers} # type: Dict[str, PrinterOutputModel]
received = {p.uuid: p for p in printers} # type: Dict[str, CloudClusterPrinter]
received = {p.uuid: p for p in printers} # type: Dict[str, CloudClusterPrinterStatus]
removed_printers, added_printers, updated_printers = findChanges(previous, received)
@ -222,8 +222,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Updates the local list of print jobs with the list received from the cloud.
# \param jobs: The print jobs received from the cloud.
def _updatePrintJobs(self, jobs: List[CloudClusterPrintJob]) -> None:
received = {j.uuid: j for j in jobs} # type: Dict[str, CloudClusterPrintJob]
def _updatePrintJobs(self, jobs: List[CloudClusterPrintJobStatus]) -> None:
received = {j.uuid: j for j in jobs} # type: Dict[str, CloudClusterPrintJobStatus]
previous = {j.key: j for j in self._print_jobs} # type: Dict[str, UM3PrintJobOutputModel]
removed_jobs, added_jobs, updated_jobs = findChanges(previous, received)
@ -246,7 +246,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Registers a new print job received via the cloud API.
# \param job: The print job received.
def _addPrintJob(self, job: CloudClusterPrintJob) -> None:
def _addPrintJob(self, job: CloudClusterPrintJobStatus) -> None:
model = job.createOutputModel(CloudOutputController(self))
model.stateChanged.connect(self._onPrintJobStateChanged)
if job.printer_uuid:
@ -282,7 +282,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Uploads the mesh when the print job was registered with the cloud API.
# \param mesh: The bytes to upload.
# \param job_response: The response received from the cloud API.
def _onPrintJobCreated(self, mesh: bytes, job_response: CloudJobResponse) -> None:
def _onPrintJobCreated(self, mesh: bytes, job_response: CloudPrintJobResponse) -> None:
self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._updateUploadProgress,
lambda _: self._onUploadError(T.UPLOAD_ERROR))

View file

@ -12,7 +12,7 @@ from cura.CuraApplication import CuraApplication
from cura.Settings.GlobalStack import GlobalStack
from .CloudApiClient import CloudApiClient
from .CloudOutputDevice import CloudOutputDevice
from .Models.CloudCluster import CloudCluster
from .Models.CloudClusterResponse import CloudClusterResponse
from .Models.CloudErrorObject import CloudErrorObject
from .Utils import findChanges
@ -72,8 +72,8 @@ class CloudOutputDeviceManager:
self._api.getClusters(self._onGetRemoteClustersFinished)
## Callback for when the request for getting the clusters. is finished.
def _onGetRemoteClustersFinished(self, clusters: List[CloudCluster]) -> None:
online_clusters = {c.cluster_id: c for c in clusters if c.is_online} # type: Dict[str, CloudCluster]
def _onGetRemoteClustersFinished(self, clusters: List[CloudClusterResponse]) -> None:
online_clusters = {c.cluster_id: c for c in clusters if c.is_online} # type: Dict[str, CloudClusterResponse]
removed_devices, added_clusters, updates = findChanges(self._remote_clusters, online_clusters)

View file

@ -1,17 +1,55 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from datetime import datetime, timezone
from typing import Dict, Union, TypeVar, Type, List, Any
from ...Models import BaseModel
## Base class for the models used in the interface with the Ultimaker cloud APIs.
class BaseCloudModel(BaseModel):
## Checks whether the two models are equal.
# \param other: The other model.
# \return True if they are equal, False if they are different.
def __eq__(self, other):
return type(self) == type(other) and self.__dict__ == other.__dict__
return type(self) == type(other) and self.toDict() == other.toDict()
def __ne__(self, other):
return type(self) != type(other) or self.__dict__ != other.__dict__
## Checks whether the two models are different.
# \param other: The other model.
# \return True if they are different, False if they are the same.
def __ne__(self, other) -> bool:
return type(self) != type(other) or self.toDict() != other.toDict()
## Converts the model into a serializable dictionary
def toDict(self) -> Dict[str, Any]:
return self.__dict__
# Type variable used in the parse methods below, which should be a subclass of BaseModel.
T = TypeVar("T", bound=BaseModel)
## Parses a single model.
# \param model_class: The model class.
# \param values: The value of the model, which is usually a dictionary, but may also be already parsed.
# \return An instance of the model_class given.
@staticmethod
def parseDate(date_str: str) -> datetime:
return datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
def parseModel(model_class: Type[T], values: Union[T, Dict[str, Any]]) -> T:
if isinstance(values, dict):
return model_class(**values)
return values
## Parses a list of models.
# \param model_class: The model class.
# \param values: The value of the list. Each value is usually a dictionary, but may also be already parsed.
# \return A list of instances of the model_class given.
@classmethod
def parseModels(cls, model_class: Type[T], values: List[Union[T, Dict[str, Any]]]) -> List[T]:
return [cls.parseModel(model_class, value) for value in values]
## Parses the given date string.
# \param date: The date to parse.
# \return The parsed date.
@staticmethod
def parseDate(date: Union[str, datetime]) -> datetime:
if isinstance(date, datetime):
return date
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

View file

@ -1,21 +0,0 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
## Class representing a cloud connected cluster.
class CloudCluster(BaseCloudModel):
def __init__(self, **kwargs):
self.cluster_id = None # type: str
self.host_guid = None # type: str
self.host_name = None # type: str
self.host_version = None # type: str
self.status = None # type: str
self.is_online = False # type: bool
super().__init__(**kwargs)
# Validates the model, raising an exception if the model is invalid.
def validate(self) -> None:
super().validate()
if not self.cluster_id:
raise ValueError("cluster_id is required on CloudCluster")

View file

@ -1,65 +0,0 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List
from cura.PrinterOutput.ConfigurationModel import ConfigurationModel
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController
from .CloudClusterPrinterConfiguration import CloudClusterPrinterConfiguration
from .CloudClusterPrintJobConstraint import CloudClusterPrintJobConstraint
from .BaseCloudModel import BaseCloudModel
## Class representing a print job
from plugins.UM3NetworkPrinting.src.UM3PrintJobOutputModel import UM3PrintJobOutputModel
class CloudClusterPrintJob(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.assigned_to = None # type: str
self.configuration = [] # type: List[CloudClusterPrinterConfiguration]
self.constraints = [] # type: List[CloudClusterPrintJobConstraint]
self.created_at = None # type: str
self.force = None # type: str
self.last_seen = None # type: str
self.machine_variant = None # type: str
self.name = None # type: str
self.network_error_count = None # type: int
self.owner = None # type: str
self.printer_uuid = None # type: str
self.started = None # type: str
self.status = None # type: str
self.time_elapsed = None # type: str
self.time_total = None # type: str
self.uuid = None # type: str
super().__init__(**kwargs)
self.configuration = [CloudClusterPrinterConfiguration(**c) if isinstance(c, dict) else c
for c in self.configuration]
self.constraints = [CloudClusterPrintJobConstraint(**p) if isinstance(p, dict) else p
for p in self.constraints]
## Creates an UM3 print job output model based on this cloud cluster print job.
# \param printer: The output model of the printer
def createOutputModel(self, controller: CloudOutputController) -> UM3PrintJobOutputModel:
model = UM3PrintJobOutputModel(controller, self.uuid, self.name)
self.updateOutputModel(model)
return model
## Creates a new configuration model
def _createConfigurationModel(self) -> ConfigurationModel:
extruders = [extruder.createConfigurationModel() for extruder in self.configuration or ()]
configuration = ConfigurationModel()
configuration.setExtruderConfigurations(extruders)
return configuration
## Updates an UM3 print job output model based on this cloud cluster print job.
# \param model: The model to update.
def updateOutputModel(self, model: UM3PrintJobOutputModel) -> None:
# TODO: Add `compatible_machine_families` to the cloud, than add model.setCompatibleMachineFamilies()
# TODO: Add `impediments_to_printing` to the cloud, see ClusterUM3OutputDevice._updatePrintJob
# TODO: Use model.updateConfigurationChanges, see ClusterUM3OutputDevice#_createConfigurationChanges
model.updateConfiguration(self._createConfigurationModel())
model.updateTimeTotal(self.time_total)
model.updateTimeElapsed(self.time_elapsed)
model.updateOwner(self.owner)
model.updateState(self.status)

View file

@ -1,10 +1,16 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
## Class representing a cloud cluster print job constraint
class CloudClusterPrintJobConstraint(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.require_printer_name = None # type: str
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintJobConstraints(BaseCloudModel):
## Creates a new print job constraint.
# \param require_printer_name: Unique name of the printer that this job should be printed on.
# Should be one of the unique_name field values in the cluster, e.g. 'ultimakersystem-ccbdd30044ec'
def __init__(self, require_printer_name: Optional[str] = None, **kwargs) -> None:
self.require_printer_name = require_printer_name
super().__init__(**kwargs)

View file

@ -0,0 +1,87 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List, Optional, Union, Dict, Any
from cura.PrinterOutput.ConfigurationModel import ConfigurationModel
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController
from .CloudClusterPrinterConfiguration import CloudClusterPrinterConfiguration
from .CloudClusterPrintJobConstraint import CloudClusterPrintJobConstraints
from .BaseCloudModel import BaseCloudModel
## Class representing a print job
from plugins.UM3NetworkPrinting.src.UM3PrintJobOutputModel import UM3PrintJobOutputModel
## Model for the status of a single print job in a cluster.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrintJobStatus(BaseCloudModel):
## Creates a new cloud print job status model.
# \param assigned_to: The name of the printer this job is assigned to while being queued.
# \param configuration: The required print core configurations of this print job.
# \param constraints: Print job constraints object.
# \param created_at: The timestamp when the job was created in Cura Connect.
# \param force: Allow this job to be printed despite of mismatching configurations.
# \param last_seen: The number of seconds since this job was checked.
# \param machine_variant: The machine type that this job should be printed on.Coincides with the machine_type field
# of the printer object.
# \param name: The name of the print job. Usually the name of the .gcode file.
# \param network_error_count: The number of errors encountered when requesting data for this print job.
# \param owner: The name of the user who added the print job to Cura Connect.
# \param printer_uuid: UUID of the printer that the job is currently printing on or assigned to.
# \param started: Whether the job has started printing or not.
# \param status: The status of the print job.
# \param time_elapsed: The remaining printing time in seconds.
# \param time_total: The total printing time in seconds.
# \param uuid: UUID of this print job. Should be used for identification purposes.
def __init__(self, created_at: str, force: bool, machine_variant: str, name: str, started: bool, status: str,
time_total: int, uuid: str,
configuration: List[Union[Dict[str, Any], CloudClusterPrinterConfiguration]],
constraints: List[Union[Dict[str, Any], CloudClusterPrintJobConstraints]],
last_seen: Optional[float] = None, network_error_count: Optional[int] = None,
owner: Optional[str] = None, printer_uuid: Optional[str] = None, time_elapsed: Optional[int] = None,
assigned_to: Optional[str] = None, **kwargs) -> None:
self.assigned_to = assigned_to
self.configuration = self.parseModels(CloudClusterPrinterConfiguration, configuration)
self.constraints = self.parseModels(CloudClusterPrintJobConstraints, constraints)
self.created_at = created_at
self.force = force
self.last_seen = last_seen
self.machine_variant = machine_variant
self.name = name
self.network_error_count = network_error_count
self.owner = owner
self.printer_uuid = printer_uuid
self.started = started
self.status = status
self.time_elapsed = time_elapsed
self.time_total = time_total
self.uuid = uuid
super().__init__(**kwargs)
## Creates an UM3 print job output model based on this cloud cluster print job.
# \param printer: The output model of the printer
def createOutputModel(self, controller: CloudOutputController) -> UM3PrintJobOutputModel:
model = UM3PrintJobOutputModel(controller, self.uuid, self.name)
self.updateOutputModel(model)
return model
## Creates a new configuration model
def _createConfigurationModel(self) -> ConfigurationModel:
extruders = [extruder.createConfigurationModel() for extruder in self.configuration or ()]
configuration = ConfigurationModel()
configuration.setExtruderConfigurations(extruders)
return configuration
## Updates an UM3 print job output model based on this cloud cluster print job.
# \param model: The model to update.
def updateOutputModel(self, model: UM3PrintJobOutputModel) -> None:
# TODO: Add `compatible_machine_families` to the cloud, than add model.setCompatibleMachineFamilies()
# TODO: Add `impediments_to_printing` to the cloud, see ClusterUM3OutputDevice._updatePrintJob
# TODO: Use model.updateConfigurationChanges, see ClusterUM3OutputDevice#_createConfigurationChanges
model.updateConfiguration(self._createConfigurationModel())
model.updateTimeTotal(self.time_total)
model.updateTimeElapsed(self.time_elapsed)
model.updateOwner(self.owner)
model.updateState(self.status)

View file

@ -1,49 +0,0 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List
from cura.PrinterOutput.ConfigurationModel import ConfigurationModel
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from .CloudClusterPrinterConfiguration import CloudClusterPrinterConfiguration
from .BaseCloudModel import BaseCloudModel
## Class representing a cluster printer
class CloudClusterPrinter(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.configuration = [] # type: List[CloudClusterPrinterConfiguration]
self.enabled = None # type: str
self.firmware_version = None # type: str
self.friendly_name = None # type: str
self.ip_address = None # type: str
self.machine_variant = None # type: str
self.status = None # type: str
self.unique_name = None # type: str
self.uuid = None # type: str
super().__init__(**kwargs)
self.configuration = [CloudClusterPrinterConfiguration(**c)
if isinstance(c, dict) else c for c in self.configuration]
## Creates a new output model.
# \param controller - The controller of the model.
def createOutputModel(self, controller: PrinterOutputController) -> PrinterOutputModel:
model = PrinterOutputModel(controller, len(self.configuration), firmware_version = self.firmware_version)
self.updateOutputModel(model)
return model
## Updates the given output model.
# \param model - The output model to update.
def updateOutputModel(self, model: PrinterOutputModel) -> None:
model.updateKey(self.uuid)
model.updateName(self.friendly_name)
model.updateType(self.machine_variant)
model.updateState(self.status if self.enabled else "disabled")
for configuration, extruder_output, extruder_config in \
zip(self.configuration, model.extruders, model.printerConfiguration.extruderConfigurations):
configuration.updateOutputModel(extruder_output)
configuration.updateConfigurationModel(extruder_config)
pass

View file

@ -1,5 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Union, Dict, Optional, Any
from cura.PrinterOutput.ExtruderConfigurationModel import ExtruderConfigurationModel
from cura.PrinterOutput.ExtruderOutputModel import ExtruderOutputModel
from .CloudClusterPrinterConfigurationMaterial import CloudClusterPrinterConfigurationMaterial
@ -7,25 +9,35 @@ from .BaseCloudModel import BaseCloudModel
## Class representing a cloud cluster printer configuration
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrinterConfiguration(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.extruder_index = None # type: int
self.material = None # type: CloudClusterPrinterConfigurationMaterial
self.nozzle_diameter = None # type: str
self.print_core_id = None # type: str
## Creates a new cloud cluster printer configuration object
# \param extruder_index: The position of the extruder on the machine as list index. Numbered from left to right.
# \param material: The material of a configuration object in a cluster printer. May be in a dict or an object.
# \param nozzle_diameter: The diameter of the print core at this position in millimeters, e.g. '0.4'.
# \param print_core_id: The type of print core inserted at this position, e.g. 'AA 0.4'.
def __init__(self, extruder_index: int,
material: Union[None, Dict[str, Any], CloudClusterPrinterConfigurationMaterial],
nozzle_diameter: Optional[str] = None, print_core_id: Optional[str] = None, **kwargs) -> None:
self.extruder_index = extruder_index
self.material = self.parseModel(CloudClusterPrinterConfigurationMaterial, material) if material else None
self.nozzle_diameter = nozzle_diameter
self.print_core_id = print_core_id
super().__init__(**kwargs)
if isinstance(self.material, dict):
self.material = CloudClusterPrinterConfigurationMaterial(**self.material)
## Updates the given output model.
# \param model - The output model to update.
def updateOutputModel(self, model: ExtruderOutputModel) -> None:
model.updateHotendID(self.print_core_id)
if self.print_core_id is not None:
model.updateHotendID(self.print_core_id)
if model.activeMaterial is None or model.activeMaterial.guid != self.material.guid:
material = self.material.createOutputModel()
model.updateActiveMaterial(material)
if self.material:
active_material = model.activeMaterial
if active_material is None or active_material.guid != self.material.guid:
material = self.material.createOutputModel()
model.updateActiveMaterial(material)
else:
model.updateActiveMaterial(None)
## Creates a configuration model
def createConfigurationModel(self) -> ExtruderConfigurationModel:

View file

@ -1,3 +1,5 @@
from typing import Optional
from UM.Logger import Logger
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.MaterialOutputModel import MaterialOutputModel
@ -5,12 +7,19 @@ from .BaseCloudModel import BaseCloudModel
## Class representing a cloud cluster printer configuration
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrinterConfigurationMaterial(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.guid = None # type: str
self.brand = None # type: str
self.color = None # type: str
self.material = None # type: str
## Creates a new material configuration model.
# \param brand: The brand of material in this print core, e.g. 'Ultimaker'.
# \param color: The color of material in this print core, e.g. 'Blue'.
# \param guid: he GUID of the material in this print core, e.g. '506c9f0d-e3aa-4bd4-b2d2-23e2425b1aa9'.
# \param material: The type of material in this print core, e.g. 'PLA'.
def __init__(self, brand: Optional[str] = None, color: Optional[str] = None, guid: Optional[str] = None,
material: Optional[str] = None, **kwargs) -> None:
self.guid = guid
self.brand = brand
self.color = color
self.material = material
super().__init__(**kwargs)
## Creates a material output model based on this cloud printer material.

View file

@ -0,0 +1,60 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List, Union, Dict, Optional, Any
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from .CloudClusterPrinterConfiguration import CloudClusterPrinterConfiguration
from .BaseCloudModel import BaseCloudModel
## Class representing a cluster printer
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterPrinterStatus(BaseCloudModel):
## Creates a new cluster printer status
# \param enabled: A printer can be disabled if it should not receive new jobs. By default every printer is enabled.
# \param firmware_version: Firmware version installed on the printer. Can differ for each printer in a cluster.
# \param friendly_name: Human readable name of the printer. Can be used for identification purposes.
# \param ip_address: The IP address of the printer in the local network.
# \param machine_variant: The type of printer. Can be 'Ultimaker 3' or 'Ultimaker 3ext'.
# \param status: The status of the printer.
# \param unique_name: The unique name of the printer in the network.
# \param uuid: The unique ID of the printer, also known as GUID.
# \param configuration: The active print core configurations of this printer.
# \param reserved_by: A printer can be claimed by a specific print job.
def __init__(self, enabled: bool, firmware_version: str, friendly_name: str, ip_address: str, machine_variant: str,
status: str, unique_name: str, uuid: str,
configuration: List[Union[Dict[str, Any], CloudClusterPrinterConfiguration]],
reserved_by: Optional[str] = None, **kwargs) -> None:
self.configuration = self.parseModels(CloudClusterPrinterConfiguration, configuration)
self.enabled = enabled
self.firmware_version = firmware_version
self.friendly_name = friendly_name
self.ip_address = ip_address
self.machine_variant = machine_variant
self.status = status
self.unique_name = unique_name
self.uuid = uuid
self.reserved_by = reserved_by
super().__init__(**kwargs)
## Creates a new output model.
# \param controller - The controller of the model.
def createOutputModel(self, controller: PrinterOutputController) -> PrinterOutputModel:
model = PrinterOutputModel(controller, len(self.configuration), firmware_version = self.firmware_version)
self.updateOutputModel(model)
return model
## Updates the given output model.
# \param model - The output model to update.
def updateOutputModel(self, model: PrinterOutputModel) -> None:
model.updateKey(self.uuid)
model.updateName(self.friendly_name)
model.updateType(self.machine_variant)
model.updateState(self.status if self.enabled else "disabled")
for configuration, extruder_output, extruder_config in \
zip(self.configuration, model.extruders, model.printerConfiguration.extruderConfigurations):
configuration.updateOutputModel(extruder_output)
configuration.updateConfigurationModel(extruder_config)

View file

@ -0,0 +1,32 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
## Class representing a cloud connected cluster.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterResponse(BaseCloudModel):
## Creates a new cluster response object.
# \param cluster_id: The secret unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='.
# \param host_guid: The unique identifier of the print cluster host, e.g. 'e90ae0ac-1257-4403-91ee-a44c9b7e8050'.
# \param host_name: The name of the printer as configured during the Wi-Fi setup. Used as identifier for end users.
# \param is_online: Whether this cluster is currently connected to the cloud.
# \param status: The status of the cluster authentication (active or inactive).
# \param host_version: The firmware version of the cluster host. This is where the Stardust client is running on.
def __init__(self, cluster_id: str, host_guid: str, host_name: str, is_online: bool, status: str,
host_version: Optional[str] = None, **kwargs) -> None:
self.cluster_id = cluster_id
self.host_guid = host_guid
self.host_name = host_name
self.status = status
self.is_online = is_online
self.host_version = host_version
super().__init__(**kwargs)
# Validates the model, raising an exception if the model is invalid.
def validate(self) -> None:
super().validate()
if not self.cluster_id:
raise ValueError("cluster_id is required on CloudCluster")

View file

@ -1,28 +1,26 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from datetime import datetime
from typing import List
from typing import List, Dict, Union, Any
from .CloudClusterPrinter import CloudClusterPrinter
from .CloudClusterPrintJob import CloudClusterPrintJob
from .CloudClusterPrinterStatus import CloudClusterPrinterStatus
from .CloudClusterPrintJobStatus import CloudClusterPrintJobStatus
from .BaseCloudModel import BaseCloudModel
# Model that represents the status of the cluster for the cloud
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudClusterStatus(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.generated_time = None # type: datetime
# a list of the printers
self.printers = [] # type: List[CloudClusterPrinter]
# a list of the print jobs
self.print_jobs = [] # type: List[CloudClusterPrintJob]
## Creates a new cluster status model object.
# \param printers: The latest status of each printer in the cluster.
# \param print_jobs: The latest status of each print job in the cluster.
# \param generated_time: The datetime when the object was generated on the server-side.
def __init__(self,
printers: List[Union[CloudClusterPrinterStatus, Dict[str, Any]]],
print_jobs: List[Union[CloudClusterPrintJobStatus, Dict[str, Any]]],
generated_time: Union[str, datetime],
**kwargs) -> None:
self.generated_time = self.parseDate(generated_time)
self.printers = self.parseModels(CloudClusterPrinterStatus, printers)
self.print_jobs = self.parseModels(CloudClusterPrintJobStatus, print_jobs)
super().__init__(**kwargs)
# converting any dictionaries into models
self.printers = [CloudClusterPrinter(**p) if isinstance(p, dict) else p for p in self.printers]
self.print_jobs = [CloudClusterPrintJob(**j) if isinstance(j, dict) else j for j in self.print_jobs]
# converting generated time into datetime
if isinstance(self.generated_time, str):
self.generated_time = self.parseDate(self.generated_time)

View file

@ -1,17 +1,28 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Dict
from typing import Dict, Optional, Any
from .BaseCloudModel import BaseCloudModel
## Class representing errors generated by the cloud servers, according to the json-api standard.
## Class representing errors generated by the cloud servers, according to the JSON-API standard.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudErrorObject(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.id = None # type: str
self.code = None # type: str
self.http_status = None # type: str
self.title = None # type: str
self.detail = None # type: str
self.meta = None # type: Dict[str, any]
## Creates a new error object.
# \param id: Unique identifier for this particular occurrence of the problem.
# \param title: A short, human-readable summary of the problem that SHOULD NOT change from occurrence to occurrence
# of the problem, except for purposes of localization.
# \param code: An application-specific error code, expressed as a string value.
# \param detail: A human-readable explanation specific to this occurrence of the problem. Like title, this field's
# value can be localized.
# \param http_status: The HTTP status code applicable to this problem, converted to string.
# \param meta: Non-standard meta-information about the error, depending on the error code.
def __init__(self, id: str, code: str, title: str, http_status: str, detail: Optional[str] = None,
meta: Optional[Dict[str, Any]] = None, **kwargs) -> None:
self.id = id
self.code = code
self.http_status = http_status
self.title = title
self.detail = detail
self.meta = meta
super().__init__(**kwargs)

View file

@ -1,16 +0,0 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
# Model that represents the response received from the cloud after requesting to upload a print job
class CloudJobResponse(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.download_url = None # type: str
self.job_id = None # type: str
self.job_name = None # type: str
self.slicing_details = None # type: str
self.status = None # type: str
self.upload_url = None # type: str
self.content_type = None # type: str
super().__init__(**kwargs)

View file

@ -1,12 +0,0 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
# Model that represents the request to upload a print job to the cloud
class CloudJobUploadRequest(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.file_size = None # type: int
self.job_name = None # type: str
self.content_type = None # type: str
super().__init__(**kwargs)

View file

@ -0,0 +1,33 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Optional
from .BaseCloudModel import BaseCloudModel
# Model that represents the response received from the cloud after requesting to upload a print job
# Spec: https://api-staging.ultimaker.com/cura/v1/spec
class CloudPrintJobResponse(BaseCloudModel):
## Creates a new print job response model.
# \param job_id: The job unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='.
# \param status: The status of the print job.
# \param status_description: Contains more details about the status, e.g. the cause of failures.
# \param download_url: A signed URL to download the resulting status. Only available when the job is finished.
# \param job_name: The name of the print job.
# \param slicing_details: Model for slice information.
# \param upload_url: The one-time use URL where the toolpath must be uploaded to (only if status is uploading).
# \param content_type: The content type of the print job (e.g. text/plain or application/gzip)
# \param generated_time: The datetime when the object was generated on the server-side.
def __init__(self, job_id: str, status: str, download_url: Optional[str] = None, job_name: Optional[str] = None,
upload_url: Optional[str] = None, content_type: Optional[str] = None,
status_description: Optional[str] = None, slicing_details: Optional[dict] = None, **kwargs) -> None:
self.job_id = job_id
self.status = status
self.download_url = download_url
self.job_name = job_name
self.upload_url = upload_url
self.content_type = content_type
self.status_description = status_description
# TODO: Implement slicing details
self.slicing_details = slicing_details
super().__init__(**kwargs)

View file

@ -0,0 +1,17 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from .BaseCloudModel import BaseCloudModel
# Model that represents the request to upload a print job to the cloud
# Spec: https://api-staging.ultimaker.com/cura/v1/spec
class CloudPrintJobUploadRequest(BaseCloudModel):
## Creates a new print job upload request.
# \param job_name: The name of the print job.
# \param file_size: The size of the file in bytes.
# \param content_type: The content type of the print job (e.g. text/plain or application/gzip)
def __init__(self, job_name: str, file_size: int, content_type: str, **kwargs) -> None:
self.job_name = job_name
self.file_size = file_size
self.content_type = content_type
super().__init__(**kwargs)

View file

@ -1,12 +1,23 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from datetime import datetime
from typing import Optional, Union
from .BaseCloudModel import BaseCloudModel
# Model that represents the responses received from the cloud after requesting a job to be printed.
# Spec: https://api-staging.ultimaker.com/connect/v1/spec
class CloudPrintResponse(BaseCloudModel):
def __init__(self, **kwargs) -> None:
self.cluster_job_id = None # type: str
self.job_id = None # type: str
self.status = None # type: str
## Creates a new print response object.
# \param job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect.
# \param status: The status of the print request (queued or failed).
# \param generated_time: The datetime when the object was generated on the server-side.
# \param cluster_job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect.
def __init__(self, job_id: str, status: str, generated_time: Union[str, datetime],
cluster_job_id: Optional[str] = None, **kwargs) -> None:
self.job_id = job_id
self.status = status
self.cluster_job_id = cluster_job_id
self.generated_time = self.parseDate(generated_time)
super().__init__(**kwargs)

View file

@ -56,7 +56,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._number_of_extruders = 2
self._dummy_lambdas = ("", {}, io.BytesIO()) #type: Tuple[str, Dict, Union[io.StringIO, io.BytesIO]]
self._dummy_lambdas = ("", {}, io.BytesIO()
) # type: Tuple[str, Dict[str, Union[str, int, bool]], Union[io.StringIO, io.BytesIO]]
self._print_jobs = [] # type: List[UM3PrintJobOutputModel]
self._received_print_jobs = False # type: bool
@ -254,7 +255,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
# timeout responses if this happens.
self._last_response_time = time()
if self._progress_message and new_progress > self._progress_message.getProgress():
old_progress = self._progress_message.getProgress()
if self._progress_message and (old_progress is None or new_progress > old_progress):
self._progress_message.show() # Ensure that the message is visible.
self._progress_message.setProgress(bytes_sent / bytes_total * 100)
@ -345,28 +347,6 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
def getDateCompleted(self, time_remaining: int) -> str:
return formatDateCompleted(time_remaining)
@pyqtSlot(int, result = str)
def getDateCompleted(self, time_remaining: int) -> str:
current_time = time()
completed = datetime.fromtimestamp(current_time + time_remaining)
today = datetime.fromtimestamp(current_time)
# If finishing date is more than 7 days out, using "Mon Dec 3 at HH:MM" format
if completed.toordinal() > today.toordinal() + 7:
return completed.strftime("%a %b ") + "{day}".format(day=completed.day)
# If finishing date is within the next week, use "Monday at HH:MM" format
elif completed.toordinal() > today.toordinal() + 1:
return completed.strftime("%a")
# If finishing tomorrow, use "tomorrow at HH:MM" format
elif completed.toordinal() > today.toordinal():
return "tomorrow"
# If finishing today, use "today at HH:MM" format
else:
return "today"
@pyqtSlot(str)
def sendJobToTop(self, print_job_uuid: str) -> None:
# This function is part of the output device (and not of the printjob output model) as this type of operation

View file

@ -1,7 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import io
from typing import Optional, Dict, Union, List
from typing import Optional, Dict, Union, List, cast
from UM.FileHandler.FileHandler import FileHandler
from UM.FileHandler.FileWriter import FileWriter
@ -26,7 +26,7 @@ class MeshFormatHandler:
def __init__(self, file_handler: Optional[FileHandler], firmware_version: str) -> None:
self._file_handler = file_handler or CuraApplication.getInstance().getMeshFileHandler()
self._preferred_format = self._getPreferredFormat(firmware_version)
self._writer = self._getWriter(self._preferred_format["mime_type"]) if self._preferred_format else None
self._writer = self._getWriter(self.mime_type) if self._preferred_format else None
@property
def is_valid(self) -> bool:
@ -47,32 +47,40 @@ class MeshFormatHandler:
@property
def mime_type(self) -> str:
return self._preferred_format["mime_type"]
return cast(str, self._preferred_format["mime_type"])
## Gets the file mode (FileWriter.OutputMode.TextMode or FileWriter.OutputMode.BinaryMode)
@property
def file_mode(self) -> int:
return self._preferred_format["mode"]
return cast(int, self._preferred_format["mode"])
## Gets the file extension
@property
def file_extension(self) -> str:
return self._preferred_format["extension"]
return cast(str, self._preferred_format["extension"])
## Creates the right kind of stream based on the preferred format.
def createStream(self) -> Union[io.BytesIO, io.StringIO]:
return io.StringIO() if self.file_mode == FileWriter.OutputMode.TextMode else io.BytesIO()
if self.file_mode == FileWriter.OutputMode.TextMode:
return io.StringIO()
else:
return io.BytesIO()
## Writes the mesh and returns its value.
def getBytes(self, nodes: List[SceneNode]) -> bytes:
if self.writer is None:
raise ValueError("There is no writer for the mesh format handler.")
stream = self.createStream()
self.writer.write(stream, nodes)
return stream.getvalue()
value = stream.getvalue()
if isinstance(value, str):
value = value.encode()
return value
## Chooses the preferred file format for the given file handler.
# \param firmware_version: The version of the firmware.
# \return A dict with the file format details.
def _getPreferredFormat(self, firmware_version: str) -> Optional[Dict[str, Union[str, int, bool]]]:
def _getPreferredFormat(self, firmware_version: str) -> Dict[str, Union[str, int, bool]]:
# Formats supported by this application (file types that we can actually write).
application = CuraApplication.getInstance()
@ -82,7 +90,7 @@ class MeshFormatHandler:
# Create a list from the supported file formats string.
if not global_stack:
Logger.log("e", "Missing global stack!")
return
return {}
machine_file_formats = global_stack.getMetaDataEntry("file_formats").split(";")
machine_file_formats = [file_type.strip() for file_type in machine_file_formats]

View file

@ -3,7 +3,7 @@
import json
import os
import urllib.parse
from typing import Dict, TYPE_CHECKING, Set
from typing import Dict, TYPE_CHECKING, Set, Optional
from PyQt5.QtNetwork import QNetworkReply, QNetworkRequest
@ -151,7 +151,7 @@ class SendMaterialJob(Job):
# \return a dictionary of ClusterMaterial objects by GUID
# \throw KeyError Raised when on of the materials does not include a valid guid
@classmethod
def _parseReply(cls, reply: QNetworkReply) -> Dict[str, ClusterMaterial]:
def _parseReply(cls, reply: QNetworkReply) -> Optional[Dict[str, ClusterMaterial]]:
try:
remote_materials = json.loads(reply.readAll().data().decode("utf-8"))
return {material["guid"]: ClusterMaterial(**material) for material in remote_materials}
@ -163,6 +163,7 @@ class SendMaterialJob(Job):
Logger.log("e", "Request material storage on printer: Printer's answer had an incorrect value.")
except TypeError:
Logger.log("e", "Request material storage on printer: Printer's answer was missing a required value.")
return None
## Retrieves a list of local materials
#
@ -184,7 +185,8 @@ class SendMaterialJob(Job):
local_material = LocalMaterial(**material)
if local_material.GUID not in result or \
local_material.version > result.get(local_material.GUID).version:
local_material.GUID not in result or \
local_material.version > result[local_material.GUID].version:
result[local_material.GUID] = local_material
except KeyError:

View file

@ -1,13 +1,12 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from PyQt5.QtCore import pyqtSignal, pyqtProperty, QObject, pyqtSlot
from typing import Optional, TYPE_CHECKING, List
from PyQt5.QtCore import QUrl
from PyQt5.QtGui import QImage
from typing import List
from PyQt5.QtCore import pyqtProperty, pyqtSignal
from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from .ConfigurationChangeModel import ConfigurationChangeModel

View file

@ -2,6 +2,7 @@
"data": {
"cluster_job_id": "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd",
"job_id": "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=",
"status": "queued"
"status": "queued",
"generated_time": "2018-12-10T08:23:55.110Z"
}
}

View file

@ -1,10 +1,9 @@
{
"data": {
"content_type": "text/plain",
"download_url": "https://api.ultimaker.com/print-job-download",
"job_id": "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=",
"job_name": "Ultimaker Robot v3.0",
"status": "queued",
"status": "uploading",
"upload_url": "https://api.ultimaker.com/print-job-upload"
}
}

View file

@ -1,7 +0,0 @@
{
"data": {
"cluster_job_id": "",
"job_id": "db34b096-c4d5-46f3-bea7-da6a19905e6c",
"status": "queued"
}
}

View file

@ -1,11 +0,0 @@
{
"data": {
"content_type": "text/plain",
"generated_time": "2018-12-10T09:33:00.009Z",
"job_id": "j9KUn4D6FRRRmdtbCo4OGAwUf6Ml3p3oU-Zv7RNRv92T",
"job_name": "job name",
"status": "uploading",
"status_description": "The print job has been created. Please upload the file.",
"upload_url": "https://www.googleapis.com/upload/storage/v1/b/ultimaker-storage-1/o?uploadType=resumable&upload_id=AEnB2Uqhg1H7BXQVeLJEWw6AheqMicydZVLuH9bnkh6Oge0e6i5X76MW3NZHWRmUTmjzulAF42mkczcC7rsAuPg1Nn8JeFpnNA"
}
}

View file

@ -0,0 +1,2 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.

View file

@ -14,9 +14,6 @@ from UM.Signal import Signal
# After patching the QNetworkManager class, requests are prepared before they can be executed.
# Any requests not prepared beforehand will cause KeyErrors.
class NetworkManagerMock:
# signals used in the network manager.
finished = Signal()
authenticationRequired = Signal()
# an enumeration of the supported operations and their code for the network access manager.
_OPERATIONS = {
@ -33,6 +30,10 @@ class NetworkManagerMock:
self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply]
self.request_bodies = {} # type: Dict[Tuple[str, str], bytes]
# signals used in the network manager.
self.finished = Signal()
self.authenticationRequired = Signal()
## Mock implementation of the get, post, put, delete and head methods from the network manager.
# Since the methods are very simple and the same it didn't make sense to repeat the code.
# \param method: The method being called.

View file

@ -1,23 +1,24 @@
# Copyright (c) 2018 Ultimaker B.V.
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
import os
from typing import List
from unittest import TestCase
from unittest.mock import patch, MagicMock
from cura.CuraApplication import CuraApplication
from src.Cloud.CloudApiClient import CloudApiClient
from src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager
from src.Cloud.Models.CloudJobResponse import CloudJobResponse
from src.Cloud.Models.CloudJobUploadRequest import CloudJobUploadRequest
from src.Cloud.Models.CloudPrintJobResponse import CloudPrintJobResponse
from src.Cloud.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from src.Cloud.Models.CloudErrorObject import CloudErrorObject
from tests.Cloud.Fixtures import readFixture, parseFixture
from .NetworkManagerMock import NetworkManagerMock
@patch("cura.NetworkClient.QNetworkAccessManager")
class TestCloudApiClient(TestCase):
def _errorHandler(self):
pass
def _errorHandler(self, errors: List[CloudErrorObject]):
raise Exception("Received unexpected error: {}".format(errors))
def setUp(self):
super().setUp()
@ -26,7 +27,6 @@ class TestCloudApiClient(TestCase):
self.app = CuraApplication.getInstance()
self.network = NetworkManagerMock()
self.manager = CloudOutputDeviceManager()
self.api = CloudApiClient(self.account, self._errorHandler)
def test_GetClusters(self, network_mock):
@ -71,12 +71,11 @@ class TestCloudApiClient(TestCase):
network_mock.return_value = self.network
results = []
with open("{}/Fixtures/requestUploadResponse.json".format(os.path.dirname(__file__)), "rb") as f:
response = f.read()
response = readFixture("putJobUploadResponse")
self.network.prepareReply("PUT", "https://api-staging.ultimaker.com/cura/v1/jobs/upload", 200, response)
self.api.requestUpload(CloudJobUploadRequest(job_name = "job name", file_size = 143234, content_type = "text/plain"),
lambda r: results.append(r))
request = CloudPrintJobUploadRequest(job_name = "job name", file_size = 143234, content_type = "text/plain")
self.api.requestUpload(request, lambda r: results.append(r))
self.network.flushReplies()
self.assertEqual(results[0].content_type, "text/plain")
@ -87,13 +86,11 @@ class TestCloudApiClient(TestCase):
results = []
progress = MagicMock()
with open("{}/Fixtures/requestUploadResponse.json".format(os.path.dirname(__file__)), "rb") as f:
thedata = json.loads(f.read().decode("ascii"))
data = thedata["data"]
upload_response = CloudJobResponse(**data)
data = parseFixture("putJobUploadResponse")["data"]
upload_response = CloudPrintJobResponse(**data)
self.network.prepareReply("PUT", upload_response.upload_url, 200,
'{ data : "" }') # Network client doesn't look into the reply
b'{ data : "" }') # Network client doesn't look into the reply
self.api.uploadMesh(upload_response, b'', lambda job_id: results.append(job_id),
progress.advance, progress.error)
@ -107,11 +104,11 @@ class TestCloudApiClient(TestCase):
network_mock.return_value = self.network
results = []
cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8"
job_id = "db34b096-c4d5-46f3-bea7-da6a19905e6c"
response = readFixture("postJobPrintResponse")
with open("{}/Fixtures/requestPrintResponse.json".format(os.path.dirname(__file__)), "rb") as f:
response = f.read()
cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8"
cluster_job_id = "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd"
job_id = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE="
self.network.prepareReply("POST",
"https://api-staging.ultimaker.com/connect/v1/clusters/{}/print/{}"
@ -123,5 +120,6 @@ class TestCloudApiClient(TestCase):
self.network.flushReplies()
self.assertEqual(len(results), 1)
self.assertEqual(results[0].job_id, "db34b096-c4d5-46f3-bea7-da6a19905e6c")
self.assertEqual(results[0].job_id, job_id)
self.assertEqual(results[0].cluster_job_id, cluster_job_id)
self.assertEqual(results[0].status, "queued")

View file

@ -106,7 +106,9 @@ class TestCloudOutputDeviceManager(TestCase):
@patch("UM.Message.Message.show")
def test_api_error(self, message_mock, network_mock):
self.clusters_response = {"errors": [{"id": "notFound", "title": "Not found!", "http_status": "404"}]}
self.clusters_response = {
"errors": [{"id": "notFound", "title": "Not found!", "http_status": "404", "code": "notFound"}]
}
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
self._loadData(network_mock)
self.network.flushReplies()