From 8ea4edf67e576014c9ec321e5f847f3659d13e00 Mon Sep 17 00:00:00 2001 From: Daniel Schiavini Date: Tue, 4 Dec 2018 17:37:58 +0100 Subject: [PATCH] STAR-322: Fixing job uploads --- cura/NetworkClient.py | 4 +- .../src/Cloud/CloudApiClient.py | 54 ++++++++----------- .../src/Cloud/CloudOutputDevice.py | 43 ++++++++------- .../src/Cloud/CloudOutputDeviceManager.py | 2 +- 4 files changed, 49 insertions(+), 54 deletions(-) diff --git a/cura/NetworkClient.py b/cura/NetworkClient.py index 8a321b6af4..5294813fb7 100644 --- a/cura/NetworkClient.py +++ b/cura/NetworkClient.py @@ -60,7 +60,7 @@ class NetworkClient: ## Executes the correct callback method when a network request finishes. def __handleOnFinished(self, reply: QNetworkReply) -> None: - + # Due to garbage collection, we need to cache certain bits of post operations. # As we don't want to keep them around forever, delete them if we get a reply. if reply.operation() == QNetworkAccessManager.PostOperation: @@ -79,6 +79,8 @@ class NetworkClient: callback_key = reply.url().toString() + str(reply.operation()) if callback_key in self._on_finished_callbacks: self._on_finished_callbacks[callback_key](reply) + else: + Logger.log("w", "Received reply to URL %s but no callbacks are registered", reply.url()) ## Removes all cached Multi-Part items. def _clearCachedMultiPart(self, reply: QNetworkReply) -> None: diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py index 1d2de1d9bf..d6c20d387b 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py @@ -11,9 +11,7 @@ from cura.API import Account from cura.NetworkClient import NetworkClient from plugins.UM3NetworkPrinting.src.Models import BaseModel from plugins.UM3NetworkPrinting.src.Cloud.Models import ( - CloudCluster, CloudErrorObject, CloudClusterStatus, CloudJobUploadRequest, - CloudJobResponse, - CloudPrintResponse + CloudCluster, CloudErrorObject, CloudClusterStatus, CloudJobUploadRequest, CloudPrintResponse, CloudJobResponse ) @@ -24,8 +22,8 @@ class CloudApiClient(NetworkClient): # The cloud URL to use for this remote cluster. # TODO: Make sure that this URL goes to the live api before release ROOT_PATH = "https://api-staging.ultimaker.com" - CLUSTER_API_ROOT = "{}/connect/v1/".format(ROOT_PATH) - CURA_API_ROOT = "{}/cura/v1/".format(ROOT_PATH) + CLUSTER_API_ROOT = "{}/connect/v1".format(ROOT_PATH) + CURA_API_ROOT = "{}/cura/v1".format(ROOT_PATH) ## Initializes a new cloud API client. # \param account: The user's account object @@ -38,15 +36,15 @@ class CloudApiClient(NetworkClient): ## Retrieves all the clusters for the user that is currently logged in. # \param on_finished: The function to be called after the result is parsed. def getClusters(self, on_finished: Callable[[List[CloudCluster]], any]) -> None: - url = "/clusters" - self.get(url, on_finished=self._createCallback(on_finished, CloudCluster)) + url = "{}/clusters".format(self.CLUSTER_API_ROOT) + self.get(url, on_finished=self._wrapCallback(on_finished, CloudCluster)) ## Retrieves the status of the given cluster. # \param cluster_id: The ID of the cluster. # \param on_finished: The function to be called after the result is parsed. def getClusterStatus(self, cluster_id: str, on_finished: Callable[[CloudClusterStatus], any]) -> None: url = "{}/cluster/{}/status".format(self.CLUSTER_API_ROOT, cluster_id) - self.get(url, on_finished=self._createCallback(on_finished, CloudClusterStatus)) + self.get(url, on_finished=self._wrapCallback(on_finished, CloudClusterStatus)) ## Requests the cloud to register the upload of a print job mesh. # \param request: The request object. @@ -54,13 +52,16 @@ class CloudApiClient(NetworkClient): def requestUpload(self, request: CloudJobUploadRequest, on_finished: Callable[[CloudJobResponse], any]) -> None: url = "{}/jobs/upload".format(self.CURA_API_ROOT) body = json.dumps({"data": request.__dict__}) - self.put(url, body, on_finished=self._createCallback(on_finished, CloudJobResponse)) + self.put(url, body, on_finished=self._wrapCallback(on_finished, CloudJobResponse)) ## Requests the cloud to register the upload of a print job mesh. # \param upload_response: The object received after requesting an upload with `self.requestUpload`. + # \param mesh: The mesh data to be uploaded. # \param on_finished: The function to be called after the result is parsed. It receives the print job ID. + # \param on_progress: A function to be called during upload progress. It receives a percentage (0-100). + # \param on_error: A function to be called if the upload fails. It receives a dict with the error. def uploadMesh(self, upload_response: CloudJobResponse, mesh: bytes, on_finished: Callable[[str], any], - on_progress: Callable[[int], any]): + on_progress: Callable[[int], any], on_error: Callable[[dict], any]): def progressCallback(bytes_sent: int, bytes_total: int) -> None: if bytes_total: @@ -71,7 +72,8 @@ class CloudApiClient(NetworkClient): if status_code < 300: on_finished(upload_response.job_id) else: - self._uploadMeshError(status_code, response) + Logger.log("e", "Received unexpected response %s uploading mesh: %s", status_code, response) + on_error(response) # TODO: Multipart upload self.put(upload_response.upload_url, data = mesh, content_type = upload_response.content_type, @@ -81,9 +83,9 @@ class CloudApiClient(NetworkClient): # \param cluster_id: The ID of the cluster. # \param job_id: The ID of the print job. # \param on_finished: The function to be called after the result is parsed. - def requestPrint(self, cluster_id: str, job_id: str, on_finished: Callable[[], any]) -> None: + def requestPrint(self, cluster_id: str, job_id: str, on_finished: Callable[[CloudPrintResponse], any]) -> None: url = "{}/cluster/{}/print/{}".format(self.CLUSTER_API_ROOT, cluster_id, job_id) - self.post(url, data = "", on_finished=self._createCallback(on_finished, CloudPrintResponse)) + self.post(url, data = "", on_finished=self._wrapCallback(on_finished, CloudPrintResponse)) ## We override _createEmptyRequest in order to add the user credentials. # \param url: The URL to request @@ -92,6 +94,7 @@ class CloudApiClient(NetworkClient): request = super()._createEmptyRequest(path, content_type) if self._account.isLoggedIn: request.setRawHeader(b"Authorization", "Bearer {}".format(self._account.accessToken).encode()) + Logger.log("i", "Created request for URL %s. Logged in = %s", path, self._account.isLoggedIn) return request ## Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well. @@ -102,26 +105,13 @@ class CloudApiClient(NetworkClient): status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) try: response = bytes(reply.readAll()).decode() - Logger.log("i", "Received an HTTP %s from %s with %s", status_code, reply.url, response) + Logger.log("i", "Received a reply %s from %s with %s", status_code, reply.url().toString(), response) return status_code, json.loads(response) except (UnicodeDecodeError, JSONDecodeError, ValueError) as err: error = {"code": type(err).__name__, "title": str(err), "http_code": str(status_code)} Logger.logException("e", "Could not parse the stardust response: %s", error) return status_code, {"errors": [error]} - ## Calls the error handler that is responsible for handling errors uploading meshes. - # \param http_status - The status of the HTTP request. - # \param response - The response received from the upload endpoint. This is not formatted according to the standard - # JSON-api response. - def _uploadMeshError(self, http_status: int, response: Dict[str, any]) -> None: - error = CloudErrorObject( - code = "uploadError", - http_status = str(http_status), - title = "Could not upload the mesh", - meta = response - ) - self._on_error([error]) - ## The generic type variable used to document the methods below. Model = TypeVar("Model", bound=BaseModel) @@ -141,14 +131,14 @@ class CloudApiClient(NetworkClient): else: Logger.log("e", "Cannot find data or errors in the cloud response: %s", response) - ## Creates a callback function that includes the parsing of the response into the correct model. + ## Wraps a callback function so that it includes the parsing of the response into the correct model. # \param on_finished: The callback in case the response is successful. # \param model: The type of the model to convert the response to. It may either be a single record or a list. # \return: A function that can be passed to the - def _createCallback(self, - on_finished: Callable[[Union[Model, List[Model]]], any], - model: Type[Model], - ) -> Callable[[QNetworkReply], None]: + def _wrapCallback(self, + on_finished: Callable[[Union[Model, List[Model]]], any], + model: Type[Model], + ) -> Callable[[QNetworkReply], None]: def parse(reply: QNetworkReply) -> None: status_code, response = self._parseReply(reply) return self._parseModels(response, on_finished, model) diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py index af9324c9b0..27bf3a821e 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py @@ -2,6 +2,7 @@ # Cura is released under the terms of the LGPLv3 or higher. import io import os +from time import time from typing import List, Optional, Dict, cast, Union, Set from PyQt5.QtCore import QObject, pyqtSignal, QUrl, pyqtProperty, pyqtSlot @@ -23,11 +24,12 @@ from plugins.UM3NetworkPrinting.src.Cloud.CloudApiClient import CloudApiClient from plugins.UM3NetworkPrinting.src.UM3PrintJobOutputModel import UM3PrintJobOutputModel from .Models import ( CloudClusterPrinter, CloudClusterPrintJob, CloudJobUploadRequest, CloudJobResponse, CloudClusterStatus, - CloudClusterPrinterConfigurationMaterial, CloudErrorObject + CloudClusterPrinterConfigurationMaterial, CloudErrorObject, + CloudPrintResponse ) -## Private class that contains all the translations for this component. +## Class that contains all the translations for this module. class T: # The translation catalog for this device. @@ -61,13 +63,20 @@ class T: # TODO: figure our how the QML interface for the cluster networking should operate with this limited functionality. class CloudOutputDevice(NetworkedPrinterOutputDevice): + # The interval with which the remote clusters are checked + CHECK_CLUSTER_INTERVAL = 2.0 # seconds + # Signal triggered when the printers in the remote cluster were changed. printersChanged = pyqtSignal() # Signal triggered when the print jobs in the queue were changed. printJobsChanged = pyqtSignal() - def __init__(self, api_client: CloudApiClient, device_id: str, parent: QObject = None): + ## Creates a new cloud output device + # \param api_client: The client that will run the API calls + # \param device_id: The ID of the device (i.e. the cluster_id for the cloud API) + # \param parent: The optional parent of this output device. + def __init__(self, api_client: CloudApiClient, device_id: str, parent: QObject = None) -> None: super().__init__(device_id = device_id, address = "", properties = {}, parent = parent) self._api = api_client @@ -76,10 +85,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): self._device_id = device_id self._account = CuraApplication.getInstance().getCuraAPI().account - # Cluster does not have authentication, so default to authenticated - self._authentication_state = AuthState.Authenticated - - # We re-use the Cura Connect monitor tab to get the most functionality right away. + # We use the Cura Connect monitor tab to get most functionality right away. self._monitor_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../resources/qml/ClusterMonitorItem.qml") self._control_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), @@ -118,11 +124,12 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): writer = self._determineWriter(file_handler, file_format) if not writer: Logger.log("e", "Missing file or mesh writer!") - self._onUploadError(T.COULD_NOT_EXPORT) - return + return self._onUploadError(T.COULD_NOT_EXPORT) stream = io.StringIO() if file_format["mode"] == FileWriter.OutputMode.TextMode else io.BytesIO() writer.write(stream, nodes) + + # TODO: Remove extension from the file name, since we are using content types now self._sendPrintJob(file_name + "." + file_format["extension"], file_format["mime_type"], stream) # TODO: This is yanked right out of ClusterUM3OutputDevice, great candidate for a utility or base class @@ -202,7 +209,9 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): ## Called when the network data should be updated. def _update(self) -> None: super()._update() - Logger.log("i", "Calling the cloud cluster") + if self._last_response_time and time() - self._last_response_time < self.CHECK_CLUSTER_INTERVAL: + return # avoid calling the cloud too often + if self._account.isLoggedIn: self.setAuthenticationState(AuthState.Authenticated) self._api.getClusterStatus(self._device_id, self._onStatusCallFinished) @@ -212,7 +221,6 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): ## Method called when HTTP request to status endpoint is finished. # Contains both printers and print jobs statuses in a single response. def _onStatusCallFinished(self, status: CloudClusterStatus) -> None: - Logger.log("d", "Got response form the cloud cluster: %s", status.__dict__) # Update all data from the cluster. self._updatePrinters(status.printers) self._updatePrintJobs(status.print_jobs) @@ -342,21 +350,15 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): request.file_size = len(mesh) request.content_type = content_type - Logger.log("i", "Creating new cloud print job: %s", request.__dict__) self._api.requestUpload(request, lambda response: self._onPrintJobCreated(mesh, response)) def _onPrintJobCreated(self, mesh: bytes, job_response: CloudJobResponse) -> None: - Logger.log("i", "Print job created successfully: %s", job_response.__dict__) - self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._onUploadPrintJobProgress, - lambda error: self._onUploadError(T.UPLOAD_ERROR)) + self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._updateUploadProgress, + lambda _: self._onUploadError(T.UPLOAD_ERROR)) def _onPrintJobUploaded(self, job_id: str) -> None: self._api.requestPrint(self._device_id, job_id, self._onUploadSuccess) - def _onUploadPrintJobProgress(self, bytes_sent: int, bytes_total: int) -> None: - if bytes_total > 0: - self._updateUploadProgress(int((bytes_sent / bytes_total) * 100)) - def _updateUploadProgress(self, progress: int): if not self._progress_message: self._progress_message = Message( @@ -389,7 +391,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): self.writeError.emit() # Shows a message when the upload has succeeded - def _onUploadSuccess(self): + def _onUploadSuccess(self, response: CloudPrintResponse): + Logger.log("i", "The cluster will be printing this print job with the ID %s", response.cluster_job_id) self._resetUploadProgress() message = Message( text = T.UPLOAD_SUCCESS_TEXT, diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDeviceManager.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDeviceManager.py index 5440795e5d..772d40edd4 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDeviceManager.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDeviceManager.py @@ -21,7 +21,7 @@ from .Models import CloudCluster, CloudErrorObject class CloudOutputDeviceManager: # The interval with which the remote clusters are checked - CHECK_CLUSTER_INTERVAL = 5 # seconds + CHECK_CLUSTER_INTERVAL = 5.0 # seconds # The translation catalog for this device. I18N_CATALOG = i18nCatalog("cura")