STAR-332: Fixing warnings

This commit is contained in:
Daniel Schiavini 2018-12-03 11:02:49 +01:00
parent 8b42b84618
commit 8066074a2f
4 changed files with 146 additions and 131 deletions

View file

@ -41,7 +41,8 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
self._api_prefix = "" self._api_prefix = ""
self._address = address self._address = address
self._properties = properties self._properties = properties
self._user_agent = "%s/%s " % (CuraApplication.getInstance().getApplicationName(), CuraApplication.getInstance().getVersion()) self._user_agent = "%s/%s " % (CuraApplication.getInstance().getApplicationName(),
CuraApplication.getInstance().getVersion())
self._onFinishedCallbacks = {} # type: Dict[str, Callable[[QNetworkReply], None]] self._onFinishedCallbacks = {} # type: Dict[str, Callable[[QNetworkReply], None]]
self._authentication_state = AuthState.NotAuthenticated self._authentication_state = AuthState.NotAuthenticated
@ -55,7 +56,8 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
self._gcode = [] # type: List[str] self._gcode = [] # type: List[str]
self._connection_state_before_timeout = None # type: Optional[ConnectionState] self._connection_state_before_timeout = None # type: Optional[ConnectionState]
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None: def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False,
file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
raise NotImplementedError("requestWrite needs to be implemented") raise NotImplementedError("requestWrite needs to be implemented")
def setAuthenticationState(self, authentication_state: AuthState) -> None: def setAuthenticationState(self, authentication_state: AuthState) -> None:

View file

@ -3,20 +3,20 @@
import io import io
import json import json
import os import os
from typing import List, Optional, Dict, cast from typing import List, Optional, Dict, cast, Union
from PyQt5.QtCore import QObject, pyqtSignal, QUrl, pyqtProperty from PyQt5.QtCore import QObject, pyqtSignal, QUrl, pyqtProperty
from PyQt5.QtNetwork import QNetworkReply, QNetworkRequest from PyQt5.QtNetwork import QNetworkReply, QNetworkRequest
from UM import i18nCatalog from UM import i18nCatalog
from UM.FileHandler import FileWriter from UM.FileHandler.FileWriter import FileWriter
from UM.FileHandler.FileHandler import FileHandler from UM.FileHandler.FileHandler import FileHandler
from UM.Logger import Logger from UM.Logger import Logger
from UM.OutputDevice import OutputDeviceError from UM.OutputDevice import OutputDeviceError
from UM.Scene.SceneNode import SceneNode from UM.Scene.SceneNode import SceneNode
from UM.Version import Version from UM.Version import Version
from cura.CuraApplication import CuraApplication from cura.CuraApplication import CuraApplication
from cura.PrinterOutput import PrinterOutputController, PrintJobOutputModel from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from cura.PrinterOutput.MaterialOutputModel import MaterialOutputModel from cura.PrinterOutput.MaterialOutputModel import MaterialOutputModel
from cura.PrinterOutput.NetworkedPrinterOutputDevice import NetworkedPrinterOutputDevice, AuthState from cura.PrinterOutput.NetworkedPrinterOutputDevice import NetworkedPrinterOutputDevice, AuthState
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
@ -38,7 +38,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
# The cloud URL to use for this remote cluster. # The cloud URL to use for this remote cluster.
# TODO: Make sure that this url goes to the live api before release # TODO: Make sure that this url goes to the live api before release
ROOT_PATH= "https://api-staging.ultimaker.com" ROOT_PATH = "https://api-staging.ultimaker.com"
CLUSTER_API_ROOT = "{}/connect/v1/".format(ROOT_PATH) CLUSTER_API_ROOT = "{}/connect/v1/".format(ROOT_PATH)
CURA_API_ROOT = "{}/cura/v1/".format(ROOT_PATH) CURA_API_ROOT = "{}/cura/v1/".format(ROOT_PATH)
CURA_DRIVE_API_ROOT = "{}/cura-drive/v1/".format(ROOT_PATH) CURA_DRIVE_API_ROOT = "{}/cura-drive/v1/".format(ROOT_PATH)
@ -66,10 +66,26 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._printers = {} # type: Dict[str, PrinterOutputModel] self._printers = {} # type: Dict[str, PrinterOutputModel]
self._print_jobs = {} # type: Dict[str, PrintJobOutputModel] self._print_jobs = {} # type: Dict[str, PrintJobOutputModel]
self._number_of_extruders = 2 # All networked printers are dual-extrusion Ultimaker machines. self._number_of_extruders = 2 # All networked printers are dual-extrusion Ultimaker machines.
@staticmethod
def _parseReply(reply: QNetworkReply) -> Tuple[int, Union[None, str, bytes]]:
"""
Parses a reply from the stardust server.
:param reply: The reply received from the server.
:return: The status code and the response dict.
"""
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
response = None
try:
response = bytes(reply.readAll()).decode("utf-8")
response = json.loads(response)
except JSONDecodeError:
Logger.logException("w", "Unable to decode JSON from reply.")
return status_code, response
## We need to override _createEmptyRequest to work for the cloud. ## We need to override _createEmptyRequest to work for the cloud.
def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest: def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
#url = QUrl(self.CLUSTER_API_ROOT_PATH_FORMAT.format(cluster_id = self._device_id) + path) # noinspection PyArgumentList
url = QUrl(path) url = QUrl(path)
request = QNetworkRequest(url) request = QNetworkRequest(url)
request.setHeader(QNetworkRequest.ContentTypeHeader, content_type) request.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
@ -98,8 +114,9 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mime_types: bool = False, def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mime_types: bool = False,
file_handler: Optional[FileHandler] = None, **kwargs: str) -> None: file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
self.writeStarted.emit(self) self.writeStarted.emit(self)
file_format = self._determineFileFormat(file_handler) file_format = self._determineFileFormat(file_handler)
writer = self._determineWriter(file_format) writer = self._determineWriter(file_handler, file_format)
# This function pauses with the yield, waiting on instructions on which printer it needs to print with. # This function pauses with the yield, waiting on instructions on which printer it needs to print with.
if not writer: if not writer:
@ -123,7 +140,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._addPrintJobToQueue(stream, request) self._addPrintJobToQueue(stream, request)
# TODO: This is yanked right out of ClusterUM3OoutputDevice, great candidate for a utility or base class # TODO: This is yanked right out of ClusterUM3OoutputDevice, great candidate for a utility or base class
def _determineFileFormat(self, file_handler) -> None: def _determineFileFormat(self, file_handler) -> Optional[Dict[str, Union[str, int]]]:
# Formats supported by this application (file types that we can actually write). # Formats supported by this application (file types that we can actually write).
if file_handler: if file_handler:
file_formats = file_handler.getSupportedFileTypesWrite() file_formats = file_handler.getSupportedFileTypesWrite()
@ -143,21 +160,28 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
machine_file_formats = ["application/x-ufp"] + machine_file_formats machine_file_formats = ["application/x-ufp"] + machine_file_formats
# Take the intersection between file_formats and machine_file_formats. # Take the intersection between file_formats and machine_file_formats.
format_by_mimetype = {format["mime_type"]: format for format in file_formats} format_by_mimetype = {f["mime_type"]: f for f in file_formats}
file_formats = [format_by_mimetype[mimetype] for mimetype in machine_file_formats] #Keep them ordered according to the preference in machine_file_formats.
# Keep them ordered according to the preference in machine_file_formats.
file_formats = [format_by_mimetype[mimetype] for mimetype in machine_file_formats]
if len(file_formats) == 0: if len(file_formats) == 0:
Logger.log("e", "There are no file formats available to write with!") Logger.log("e", "There are no file formats available to write with!")
raise OutputDeviceError.WriteRequestFailedError(self.I18N_CATALOG.i18nc("@info:status", "There are no file formats available to write with!")) raise OutputDeviceError.WriteRequestFailedError(
self.I18N_CATALOG.i18nc("@info:status", "There are no file formats available to write with!")
)
return file_formats[0] return file_formats[0]
# TODO: This is yanked right out of ClusterUM3OoutputDevice, great candidate for a utility or base class # TODO: This is yanked right out of ClusterUM3OoutputDevice, great candidate for a utility or base class
def _determineWriter(self, file_handler, file_format): @staticmethod
def _determineWriter(file_handler, file_format) -> Optional[FileWriter]:
# Just take the first file format available. # Just take the first file format available.
if file_handler is not None: if file_handler is not None:
writer = file_handler.getWriterByMimeType(cast(str, file_format["mime_type"])) writer = file_handler.getWriterByMimeType(cast(str, file_format["mime_type"]))
else: else:
writer = CuraApplication.getInstance().getMeshFileHandler().getWriterByMimeType(cast(str, file_format["mime_type"])) writer = CuraApplication.getInstance().getMeshFileHandler().getWriterByMimeType(
cast(str, file_format["mime_type"])
)
if not writer: if not writer:
Logger.log("e", "Unexpected error when trying to get the FileWriter") Logger.log("e", "Unexpected error when trying to get the FileWriter")
@ -173,7 +197,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Get remote print jobs. ## Get remote print jobs.
@pyqtProperty("QVariantList", notify = printJobsChanged) @pyqtProperty("QVariantList", notify = printJobsChanged)
def queuedPrintJobs(self) -> List[UM3PrintJobOutputModel]: def queuedPrintJobs(self) -> List[UM3PrintJobOutputModel]:
return [print_job for print_job in self._print_jobs if print_job.state == "queued" or print_job.state == "error"] return [print_job for print_job in self._print_jobs.values()
if print_job.state == "queued" or print_job.state == "error"]
## Called when the connection to the cluster changes. ## Called when the connection to the cluster changes.
def connect(self) -> None: def connect(self) -> None:
@ -182,20 +207,19 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Called when the network data should be updated. ## Called when the network data should be updated.
def _update(self) -> None: def _update(self) -> None:
super()._update() super()._update()
self.get("{root}/cluster/{cluster_id}/status".format(self.CLUSTER_API_ROOT, self._device_id), self.get("{root}/cluster/{cluster_id}/status".format(root=self.CLUSTER_API_ROOT, cluster_id=self._device_id),
on_finished = self._onStatusCallFinished) on_finished = self._onStatusCallFinished)
## Method called when HTTP request to status endpoint is finished. ## Method called when HTTP request to status endpoint is finished.
# Contains both printers and print jobs statuses in a single response. # Contains both printers and print jobs statuses in a single response.
def _onStatusCallFinished(self, reply: QNetworkReply) -> None: def _onStatusCallFinished(self, reply: QNetworkReply) -> None:
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) status_code, response = self._parseReply(reply)
if status_code != 200: if status_code > 204:
Logger.log("w", "Got unexpected response while trying to get cloud cluster data: {}, {}" Logger.log("w", "Got unexpected response while trying to get cloud cluster data: {}, {}"
.format(status_code, reply.readAll())) .format(status_code, status, response))
return return
printers, print_jobs = self._parseStatusResponse(reply) printers, print_jobs = self._parseStatusResponse(response)
if not printers and not print_jobs: if not printers and not print_jobs:
return return
@ -204,79 +228,69 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._updatePrintJobs(print_jobs) self._updatePrintJobs(print_jobs)
@staticmethod @staticmethod
def _parseStatusResponse(reply: QNetworkReply): # Optional[(CloudClusterPrinter, CloudClusterPrintJob)] doesn't work def _parseStatusResponse(response: dict) -> Optional[Tuple[CloudClusterPrinter, CloudClusterPrintJob]]:
printers = [] printers = []
print_jobs = [] print_jobs = []
s = ''
try:
s = json.loads(bytes(reply.readAll()).decode("utf-8"))
for p in s["printers"]: data = response["data"]
printer = CloudClusterPrinter(**p) for p in data["printers"]:
configuration = printer.configuration printer = CloudClusterPrinter(**p)
printer.configuration = [] configuration = printer.configuration
for c in configuration: printer.configuration = []
extruder = CloudClusterPrinterConfiguration(**c) for c in configuration:
extruder.material = CloudClusterPrinterConfigurationMaterial(extruder.material) extruder = CloudClusterPrinterConfiguration(**c)
printer.configuration.append(extruder) extruder.material = CloudClusterPrinterConfigurationMaterial(material=extruder.material)
printer.configuration.append(extruder)
printers.append(printer) printers.append(printer)
for j in s["print_jobs"]: for j in data["print_jobs"]:
job = CloudClusterPrintJob(**j) job = CloudClusterPrintJob(**j)
constraints = job.constraints constraints = job.constraints
job.constraints = [] job.constraints = []
for c in constraints: for c in constraints:
job.constraints.append(CloudClusterPrintJobConstraint(**c)) job.constraints.append(CloudClusterPrintJobConstraint(**c))
configuration = job.configuration configuration = job.configuration
job.configuration = [] job.configuration = []
for c in configuration: for c in configuration:
configuration = CloudClusterPrinterConfiguration(**c) configuration = CloudClusterPrinterConfiguration(**c)
configuration.material = CloudClusterPrinterConfigurationMaterial(configuration.material) configuration.material = CloudClusterPrinterConfigurationMaterial(material=configuration.material)
job.configuration.append(configuration) job.configuration.append(configuration)
print_jobs.append(job) print_jobs.append(job)
except json.decoder.JSONDecodeError:
Logger.logException("w", "Unable to decode JSON from reply.")
return None
return printers, print_jobs return printers, print_jobs
def _updatePrinters(self, printers: List[CloudClusterPrinter]) -> None: def _updatePrinters(self, printers: List[CloudClusterPrinter]) -> None:
remote_printers = {p.uuid: p for p in printers} remote_printers = {p.uuid: p for p in printers} # type: Dict[str, CloudClusterPrinter]
removed_printers = set(self._printers.keys()).difference(set(remote_printers.keys())) removed_printer_ids = set(self._printers).difference(remote_printers)
new_printers = set(remote_printers.keys()).difference(set(self._printers.keys())) new_printer_ids = set(remote_printers).difference(self._printers)
updated_printers = set(self._printers.keys()).intersection(set(remote_printers.keys())) updated_printer_ids = set(self._printers).intersection(remote_printers)
for p in removed_printers: for printer_guid in removed_printer_ids:
self._removePrinter(p) self._removePrinter(printer_guid)
for p in new_printers: for printer_guid in new_printer_ids:
self._addPrinter(printers[p]) self._addPrinter(remote_printers[printer_guid])
self._updatePrinter(printers[p]) self._updatePrinter(remote_printers[printer_guid])
for p in updated_printers: for printer_guid in updated_printer_ids:
self._updatePrinter(printers[p]) self._updatePrinter(remote_printers[printer_guid])
# TODO: properly handle removed and updated printers # TODO: properly handle removed and updated printers
self.printersChanged.emit() self.printersChanged.emit()
def _addPrinter(self, printer: CloudClusterPrinter) -> None:
def _addPrinter(self, printer): self._printers[printer.uuid] = self._createPrinterOutputModel(printer)
self._printers[printer.uuid] = self._createPrinterOutputModel(self, printer)
def _createPrinterOutputModel(self, printer: CloudClusterPrinter) -> PrinterOutputModel: def _createPrinterOutputModel(self, printer: CloudClusterPrinter) -> PrinterOutputModel:
return PrinterOutputModel(PrinterOutputController(self), len(printer.configuration), return PrinterOutputModel(PrinterOutputController(self), len(printer.configuration),
firmware_version=printer.firmware_version) firmware_version=printer.firmware_version)
def _updatePrinter(self, guid : str, printer : CloudClusterPrinter): def _updatePrinter(self, printer: CloudClusterPrinter) -> None:
self._printers[guid] = self._updatePrinterOutputModel(self, printer) model = self._printers[printer.uuid]
def _updatePrinterOutputModel(self, printer: CloudClusterPrinter, model : PrinterOutputModel) -> PrinterOutputModel:
model.updateKey(printer.uuid) model.updateKey(printer.uuid)
model.updateName(printer.friendly_name) model.updateName(printer.friendly_name)
model.updateType(printer.machine_variant) model.updateType(printer.machine_variant)
@ -291,43 +305,42 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
extruder.updateHotendID(extruder_data.print_core_id) extruder.updateHotendID(extruder_data.print_core_id)
material_data = extruder_data.material if extruder.activeMaterial is None or extruder.activeMaterial.guid != extruder_data.material.guid:
if extruder.activeMaterial is None or extruder.activeMaterial.guid != material.guid: material = self._createMaterialOutputModel(extruder_data.material)
material = self._createMaterialOutputModel(material_data)
extruder.updateActiveMaterial(material) extruder.updateActiveMaterial(material)
def _createMaterialOutputModel(self, material: CloudClusterPrinterConfigurationMaterial) -> MaterialOutputModel: @staticmethod
material_manager = CuraApplication.getInstance().getMaterialManager() def _createMaterialOutputModel(material: CloudClusterPrinterConfigurationMaterial) -> MaterialOutputModel:
material_group_list = material_manager.getMaterialGroupListByGUID(material.guid) or [] material_manager = CuraApplication.getInstance().getMaterialManager()
material_group_list = material_manager.getMaterialGroupListByGUID(material.guid) or []
# Sort the material groups by "is_read_only = True" first, and then the name alphabetically. # Sort the material groups by "is_read_only = True" first, and then the name alphabetically.
read_only_material_group_list = list(filter(lambda x: x.is_read_only, material_group_list)) read_only_material_group_list = list(filter(lambda x: x.is_read_only, material_group_list))
non_read_only_material_group_list = list(filter(lambda x: not x.is_read_only, material_group_list)) non_read_only_material_group_list = list(filter(lambda x: not x.is_read_only, material_group_list))
material_group = None material_group = None
if read_only_material_group_list: if read_only_material_group_list:
read_only_material_group_list = sorted(read_only_material_group_list, key=lambda x: x.name) read_only_material_group_list = sorted(read_only_material_group_list, key=lambda x: x.name)
material_group = read_only_material_group_list[0] material_group = read_only_material_group_list[0]
elif non_read_only_material_group_list: elif non_read_only_material_group_list:
non_read_only_material_group_list = sorted(non_read_only_material_group_list, key=lambda x: x.name) non_read_only_material_group_list = sorted(non_read_only_material_group_list, key=lambda x: x.name)
material_group = non_read_only_material_group_list[0] material_group = non_read_only_material_group_list[0]
if material_group: if material_group:
container = material_group.root_material_node.getContainer() container = material_group.root_material_node.getContainer()
color = container.getMetaDataEntry("color_code") color = container.getMetaDataEntry("color_code")
brand = container.getMetaDataEntry("brand") brand = container.getMetaDataEntry("brand")
material_type = container.getMetaDataEntry("material") material_type = container.getMetaDataEntry("material")
name = container.getName() name = container.getName()
else: else:
Logger.log("w", Logger.log("w",
"Unable to find material with guid {guid}. Using data as provided by cluster".format( "Unable to find material with guid {guid}. Using data as provided by cluster".format(
guid=material.guid)) guid=material.guid))
color = material.color color = material.color
brand = material.brand brand = material.brand
material_type = material.material material_type = material.material
name = "Empty" if material.material == "empty" else "Unknown" name = "Empty" if material.material == "empty" else "Unknown"
return MaterialOutputModel(guid=material.guid, type=material_type, brand=brand, color=color, name=name)
return MaterialOutputModel(guid=material.guid, type=material_type, brand=brand, color=color, name=name)
def _removePrinter(self, guid): def _removePrinter(self, guid):
del self._printers[guid] del self._printers[guid]
@ -346,53 +359,51 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._addPrintJob(jobs[j]) self._addPrintJob(jobs[j])
for j in updated_jobs: for j in updated_jobs:
self._updatePrintJob(jobs[j]) self._updatePrintJob(remote_jobs[j])
# TODO: properly handle removed and updated printers # TODO: properly handle removed and updated printers
self.printJobsChanged() self.printJobsChanged()
def _addPrintJob(self, job: CloudClusterPrintJob): def _addPrintJob(self, job: CloudClusterPrintJob) -> None:
self._print_jobs[job.uuid] = self._createPrintJobOutputModel(job) self._print_jobs[job.uuid] = self._createPrintJobOutputModel(job)
def _createPrintJobOutputModel(self, job:CloudClusterPrintJob) -> PrintJobOutputModel: def _createPrintJobOutputModel(self, job: CloudClusterPrintJob) -> PrintJobOutputModel:
controller = self._printers[job.printer_uuid]._controller # TODO: Can we access this property? controller = self._printers[job.printer_uuid]._controller # TODO: Can we access this property?
model = PrintJobOutputModel(controller, job.uuid, job.name) model = PrintJobOutputModel(controller, job.uuid, job.name)
assigned_printer = self._printes[job.printer_uuid] # TODO: Or do we have to use the assigned_to field? assigned_printer = self._printes[job.printer_uuid] # TODO: Or do we have to use the assigned_to field?
model.updateAssignedPrinter(assigned_printer) model.updateAssignedPrinter(assigned_printer)
return model
def _updatePrintJobOutputModel(self, guid: str, job:CloudClusterPrintJob): def _updatePrintJobOutputModel(self, guid: str, job: CloudClusterPrintJob) -> None:
model =self._print_jobs[guid] model = self._print_jobs[guid]
model.updateTimeTotal(job.time_total) model.updateTimeTotal(job.time_total)
model.updateTimeElapsed(job.time_elapsed) model.updateTimeElapsed(job.time_elapsed)
model.updateOwner(job.owner) model.updateOwner(job.owner)
model.updateState(job.status) model.updateState(job.status)
def _removePrintJob(self, guid:str): def _removePrintJob(self, guid: str):
del self._print_jobs[guid] del self._print_jobs[guid]
def _addPrintJobToQueue(self, stream, request:JobUploadRequest): def _addPrintJobToQueue(self, stream, request: JobUploadRequest):
self.put("{}/jobs/upload".format(self.CURA_API_ROOT), data = json.dumps(request.__dict__), self.put("{}/jobs/upload".format(self.CURA_API_ROOT), data = json.dumps(request.__dict__),
on_finished = lambda reply: self._onAddPrintJobToQueueFinished(stream, reply)) on_finished = lambda reply: self._onAddPrintJobToQueueFinished(stream, reply))
def _onAddPrintJobToQueueFinished(self, stream, reply: QNetworkReply) -> None: def _onAddPrintJobToQueueFinished(self, stream, reply: QNetworkReply) -> None:
s = json.loads(bytes(reply.readAll()).decode("utf-8")) status_code, response = self._parseReply(reply) # type: Tuple[int, dict]
if status_code > 204 or not isinstance(dict, response) or "data" not in response:
Logger.error()
return
self.put() job_response = JobUploadResponse(**response.get("data"))
self.put(job_response.upload_url, data=stream.getvalue(), on_finished=self._onPrintJobUploaded)
# try: def _onPrintJobUploaded(self, reply: QNetworkReply) -> None:
# r = requests.put(self._job.output_url, data=data) status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
# if r.status_code == 200: if status_code > 204:
# Logger.log("d", "Finished writing %s to remote URL %s", "", self._job.output_url) self.onWriteFailed.emit("Failed to export G-code to remote URL: {}".format(r.text))
# self.onWriteSuccess.emit(r.text) Logger.logException("w", "Received unexpected response from the job upload: %s, %s.", status_code,
# else: bytes(reply.readAll()).decode())
# Logger.log("d", "Error writing %s to remote URL %s", "", self._job.output_url) return
# self.onWriteFailed.emit("Failed to export G-code to remote URL: {}".format(r.text))
# except requests.ConnectionError as e:
# Logger.log("e", "There was a connection error when uploading the G-code to a remote URL: %s", e)
# self.onWriteFailed.emit("Failed to export G-code to remote URL: {}".format(e))
# except requests.HTTPError as e:
# Logger.log("e", "There was an HTTP error when uploading the G-code to a remote URL: %s", e)
# self.onWriteFailed.emit("Failed to export G-code to remote URL: {}".format(e))
pass self.onWriteSuccess.emit(r.text)

View file

@ -3,7 +3,7 @@
import json import json
from time import sleep from time import sleep
from threading import Thread from threading import Thread
from typing import Dict, Optional, List from typing import Dict, Optional
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply

View file

@ -1,5 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V. # Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher. # Cura is released under the terms of the LGPLv3 or higher.
from typing import List
from ..Models import BaseModel from ..Models import BaseModel
@ -41,7 +43,7 @@ class CloudClusterPrinterConfiguration(BaseModel):
## Class representing a cluster printer ## Class representing a cluster printer
class CloudClusterPrinter(BaseModel): class CloudClusterPrinter(BaseModel):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.configuration = None # type: CloudClusterPrinterConfiguration self.configuration = None # type: List[CloudClusterPrinterConfiguration]
self.enabled = None # type: str self.enabled = None # type: str
self.firmware_version = None # type: str self.firmware_version = None # type: str
self.friendly_name = None # type: str self.friendly_name = None # type: str
@ -56,7 +58,7 @@ class CloudClusterPrinter(BaseModel):
## Class representing a cloud cluster print job constraint ## Class representing a cloud cluster print job constraint
class CloudClusterPrintJobConstraint(BaseModel): class CloudClusterPrintJobConstraint(BaseModel):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.require_printer_name: None # type: str self.require_printer_name = None # type: str
super().__init__(**kwargs) super().__init__(**kwargs)
## Class representing a print job ## Class representing a print job