STAR-322: Fixing job uploads

This commit is contained in:
Daniel Schiavini 2018-12-04 17:37:58 +01:00
parent 02efc9e1a9
commit 8ea4edf67e
4 changed files with 49 additions and 54 deletions

View file

@ -60,7 +60,7 @@ class NetworkClient:
## Executes the correct callback method when a network request finishes. ## Executes the correct callback method when a network request finishes.
def __handleOnFinished(self, reply: QNetworkReply) -> None: def __handleOnFinished(self, reply: QNetworkReply) -> None:
# Due to garbage collection, we need to cache certain bits of post operations. # Due to garbage collection, we need to cache certain bits of post operations.
# As we don't want to keep them around forever, delete them if we get a reply. # As we don't want to keep them around forever, delete them if we get a reply.
if reply.operation() == QNetworkAccessManager.PostOperation: if reply.operation() == QNetworkAccessManager.PostOperation:
@ -79,6 +79,8 @@ class NetworkClient:
callback_key = reply.url().toString() + str(reply.operation()) callback_key = reply.url().toString() + str(reply.operation())
if callback_key in self._on_finished_callbacks: if callback_key in self._on_finished_callbacks:
self._on_finished_callbacks[callback_key](reply) self._on_finished_callbacks[callback_key](reply)
else:
Logger.log("w", "Received reply to URL %s but no callbacks are registered", reply.url())
## Removes all cached Multi-Part items. ## Removes all cached Multi-Part items.
def _clearCachedMultiPart(self, reply: QNetworkReply) -> None: def _clearCachedMultiPart(self, reply: QNetworkReply) -> None:

View file

@ -11,9 +11,7 @@ from cura.API import Account
from cura.NetworkClient import NetworkClient from cura.NetworkClient import NetworkClient
from plugins.UM3NetworkPrinting.src.Models import BaseModel from plugins.UM3NetworkPrinting.src.Models import BaseModel
from plugins.UM3NetworkPrinting.src.Cloud.Models import ( from plugins.UM3NetworkPrinting.src.Cloud.Models import (
CloudCluster, CloudErrorObject, CloudClusterStatus, CloudJobUploadRequest, CloudCluster, CloudErrorObject, CloudClusterStatus, CloudJobUploadRequest, CloudPrintResponse, CloudJobResponse
CloudJobResponse,
CloudPrintResponse
) )
@ -24,8 +22,8 @@ class CloudApiClient(NetworkClient):
# The cloud URL to use for this remote cluster. # The cloud URL to use for this remote cluster.
# TODO: Make sure that this URL goes to the live api before release # TODO: Make sure that this URL goes to the live api before release
ROOT_PATH = "https://api-staging.ultimaker.com" ROOT_PATH = "https://api-staging.ultimaker.com"
CLUSTER_API_ROOT = "{}/connect/v1/".format(ROOT_PATH) CLUSTER_API_ROOT = "{}/connect/v1".format(ROOT_PATH)
CURA_API_ROOT = "{}/cura/v1/".format(ROOT_PATH) CURA_API_ROOT = "{}/cura/v1".format(ROOT_PATH)
## Initializes a new cloud API client. ## Initializes a new cloud API client.
# \param account: The user's account object # \param account: The user's account object
@ -38,15 +36,15 @@ class CloudApiClient(NetworkClient):
## Retrieves all the clusters for the user that is currently logged in. ## Retrieves all the clusters for the user that is currently logged in.
# \param on_finished: The function to be called after the result is parsed. # \param on_finished: The function to be called after the result is parsed.
def getClusters(self, on_finished: Callable[[List[CloudCluster]], any]) -> None: def getClusters(self, on_finished: Callable[[List[CloudCluster]], any]) -> None:
url = "/clusters" url = "{}/clusters".format(self.CLUSTER_API_ROOT)
self.get(url, on_finished=self._createCallback(on_finished, CloudCluster)) self.get(url, on_finished=self._wrapCallback(on_finished, CloudCluster))
## Retrieves the status of the given cluster. ## Retrieves the status of the given cluster.
# \param cluster_id: The ID of the cluster. # \param cluster_id: The ID of the cluster.
# \param on_finished: The function to be called after the result is parsed. # \param on_finished: The function to be called after the result is parsed.
def getClusterStatus(self, cluster_id: str, on_finished: Callable[[CloudClusterStatus], any]) -> None: def getClusterStatus(self, cluster_id: str, on_finished: Callable[[CloudClusterStatus], any]) -> None:
url = "{}/cluster/{}/status".format(self.CLUSTER_API_ROOT, cluster_id) url = "{}/cluster/{}/status".format(self.CLUSTER_API_ROOT, cluster_id)
self.get(url, on_finished=self._createCallback(on_finished, CloudClusterStatus)) self.get(url, on_finished=self._wrapCallback(on_finished, CloudClusterStatus))
## Requests the cloud to register the upload of a print job mesh. ## Requests the cloud to register the upload of a print job mesh.
# \param request: The request object. # \param request: The request object.
@ -54,13 +52,16 @@ class CloudApiClient(NetworkClient):
def requestUpload(self, request: CloudJobUploadRequest, on_finished: Callable[[CloudJobResponse], any]) -> None: def requestUpload(self, request: CloudJobUploadRequest, on_finished: Callable[[CloudJobResponse], any]) -> None:
url = "{}/jobs/upload".format(self.CURA_API_ROOT) url = "{}/jobs/upload".format(self.CURA_API_ROOT)
body = json.dumps({"data": request.__dict__}) body = json.dumps({"data": request.__dict__})
self.put(url, body, on_finished=self._createCallback(on_finished, CloudJobResponse)) self.put(url, body, on_finished=self._wrapCallback(on_finished, CloudJobResponse))
## Requests the cloud to register the upload of a print job mesh. ## Requests the cloud to register the upload of a print job mesh.
# \param upload_response: The object received after requesting an upload with `self.requestUpload`. # \param upload_response: The object received after requesting an upload with `self.requestUpload`.
# \param mesh: The mesh data to be uploaded.
# \param on_finished: The function to be called after the result is parsed. It receives the print job ID. # \param on_finished: The function to be called after the result is parsed. It receives the print job ID.
# \param on_progress: A function to be called during upload progress. It receives a percentage (0-100).
# \param on_error: A function to be called if the upload fails. It receives a dict with the error.
def uploadMesh(self, upload_response: CloudJobResponse, mesh: bytes, on_finished: Callable[[str], any], def uploadMesh(self, upload_response: CloudJobResponse, mesh: bytes, on_finished: Callable[[str], any],
on_progress: Callable[[int], any]): on_progress: Callable[[int], any], on_error: Callable[[dict], any]):
def progressCallback(bytes_sent: int, bytes_total: int) -> None: def progressCallback(bytes_sent: int, bytes_total: int) -> None:
if bytes_total: if bytes_total:
@ -71,7 +72,8 @@ class CloudApiClient(NetworkClient):
if status_code < 300: if status_code < 300:
on_finished(upload_response.job_id) on_finished(upload_response.job_id)
else: else:
self._uploadMeshError(status_code, response) Logger.log("e", "Received unexpected response %s uploading mesh: %s", status_code, response)
on_error(response)
# TODO: Multipart upload # TODO: Multipart upload
self.put(upload_response.upload_url, data = mesh, content_type = upload_response.content_type, self.put(upload_response.upload_url, data = mesh, content_type = upload_response.content_type,
@ -81,9 +83,9 @@ class CloudApiClient(NetworkClient):
# \param cluster_id: The ID of the cluster. # \param cluster_id: The ID of the cluster.
# \param job_id: The ID of the print job. # \param job_id: The ID of the print job.
# \param on_finished: The function to be called after the result is parsed. # \param on_finished: The function to be called after the result is parsed.
def requestPrint(self, cluster_id: str, job_id: str, on_finished: Callable[[], any]) -> None: def requestPrint(self, cluster_id: str, job_id: str, on_finished: Callable[[CloudPrintResponse], any]) -> None:
url = "{}/cluster/{}/print/{}".format(self.CLUSTER_API_ROOT, cluster_id, job_id) url = "{}/cluster/{}/print/{}".format(self.CLUSTER_API_ROOT, cluster_id, job_id)
self.post(url, data = "", on_finished=self._createCallback(on_finished, CloudPrintResponse)) self.post(url, data = "", on_finished=self._wrapCallback(on_finished, CloudPrintResponse))
## We override _createEmptyRequest in order to add the user credentials. ## We override _createEmptyRequest in order to add the user credentials.
# \param url: The URL to request # \param url: The URL to request
@ -92,6 +94,7 @@ class CloudApiClient(NetworkClient):
request = super()._createEmptyRequest(path, content_type) request = super()._createEmptyRequest(path, content_type)
if self._account.isLoggedIn: if self._account.isLoggedIn:
request.setRawHeader(b"Authorization", "Bearer {}".format(self._account.accessToken).encode()) request.setRawHeader(b"Authorization", "Bearer {}".format(self._account.accessToken).encode())
Logger.log("i", "Created request for URL %s. Logged in = %s", path, self._account.isLoggedIn)
return request return request
## Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well. ## Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well.
@ -102,26 +105,13 @@ class CloudApiClient(NetworkClient):
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
try: try:
response = bytes(reply.readAll()).decode() response = bytes(reply.readAll()).decode()
Logger.log("i", "Received an HTTP %s from %s with %s", status_code, reply.url, response) Logger.log("i", "Received a reply %s from %s with %s", status_code, reply.url().toString(), response)
return status_code, json.loads(response) return status_code, json.loads(response)
except (UnicodeDecodeError, JSONDecodeError, ValueError) as err: except (UnicodeDecodeError, JSONDecodeError, ValueError) as err:
error = {"code": type(err).__name__, "title": str(err), "http_code": str(status_code)} error = {"code": type(err).__name__, "title": str(err), "http_code": str(status_code)}
Logger.logException("e", "Could not parse the stardust response: %s", error) Logger.logException("e", "Could not parse the stardust response: %s", error)
return status_code, {"errors": [error]} return status_code, {"errors": [error]}
## Calls the error handler that is responsible for handling errors uploading meshes.
# \param http_status - The status of the HTTP request.
# \param response - The response received from the upload endpoint. This is not formatted according to the standard
# JSON-api response.
def _uploadMeshError(self, http_status: int, response: Dict[str, any]) -> None:
error = CloudErrorObject(
code = "uploadError",
http_status = str(http_status),
title = "Could not upload the mesh",
meta = response
)
self._on_error([error])
## The generic type variable used to document the methods below. ## The generic type variable used to document the methods below.
Model = TypeVar("Model", bound=BaseModel) Model = TypeVar("Model", bound=BaseModel)
@ -141,14 +131,14 @@ class CloudApiClient(NetworkClient):
else: else:
Logger.log("e", "Cannot find data or errors in the cloud response: %s", response) Logger.log("e", "Cannot find data or errors in the cloud response: %s", response)
## Creates a callback function that includes the parsing of the response into the correct model. ## Wraps a callback function so that it includes the parsing of the response into the correct model.
# \param on_finished: The callback in case the response is successful. # \param on_finished: The callback in case the response is successful.
# \param model: The type of the model to convert the response to. It may either be a single record or a list. # \param model: The type of the model to convert the response to. It may either be a single record or a list.
# \return: A function that can be passed to the # \return: A function that can be passed to the
def _createCallback(self, def _wrapCallback(self,
on_finished: Callable[[Union[Model, List[Model]]], any], on_finished: Callable[[Union[Model, List[Model]]], any],
model: Type[Model], model: Type[Model],
) -> Callable[[QNetworkReply], None]: ) -> Callable[[QNetworkReply], None]:
def parse(reply: QNetworkReply) -> None: def parse(reply: QNetworkReply) -> None:
status_code, response = self._parseReply(reply) status_code, response = self._parseReply(reply)
return self._parseModels(response, on_finished, model) return self._parseModels(response, on_finished, model)

View file

@ -2,6 +2,7 @@
# Cura is released under the terms of the LGPLv3 or higher. # Cura is released under the terms of the LGPLv3 or higher.
import io import io
import os import os
from time import time
from typing import List, Optional, Dict, cast, Union, Set from typing import List, Optional, Dict, cast, Union, Set
from PyQt5.QtCore import QObject, pyqtSignal, QUrl, pyqtProperty, pyqtSlot from PyQt5.QtCore import QObject, pyqtSignal, QUrl, pyqtProperty, pyqtSlot
@ -23,11 +24,12 @@ from plugins.UM3NetworkPrinting.src.Cloud.CloudApiClient import CloudApiClient
from plugins.UM3NetworkPrinting.src.UM3PrintJobOutputModel import UM3PrintJobOutputModel from plugins.UM3NetworkPrinting.src.UM3PrintJobOutputModel import UM3PrintJobOutputModel
from .Models import ( from .Models import (
CloudClusterPrinter, CloudClusterPrintJob, CloudJobUploadRequest, CloudJobResponse, CloudClusterStatus, CloudClusterPrinter, CloudClusterPrintJob, CloudJobUploadRequest, CloudJobResponse, CloudClusterStatus,
CloudClusterPrinterConfigurationMaterial, CloudErrorObject CloudClusterPrinterConfigurationMaterial, CloudErrorObject,
CloudPrintResponse
) )
## Private class that contains all the translations for this component. ## Class that contains all the translations for this module.
class T: class T:
# The translation catalog for this device. # The translation catalog for this device.
@ -61,13 +63,20 @@ class T:
# TODO: figure our how the QML interface for the cluster networking should operate with this limited functionality. # TODO: figure our how the QML interface for the cluster networking should operate with this limited functionality.
class CloudOutputDevice(NetworkedPrinterOutputDevice): class CloudOutputDevice(NetworkedPrinterOutputDevice):
# The interval with which the remote clusters are checked
CHECK_CLUSTER_INTERVAL = 2.0 # seconds
# Signal triggered when the printers in the remote cluster were changed. # Signal triggered when the printers in the remote cluster were changed.
printersChanged = pyqtSignal() printersChanged = pyqtSignal()
# Signal triggered when the print jobs in the queue were changed. # Signal triggered when the print jobs in the queue were changed.
printJobsChanged = pyqtSignal() printJobsChanged = pyqtSignal()
def __init__(self, api_client: CloudApiClient, device_id: str, parent: QObject = None): ## Creates a new cloud output device
# \param api_client: The client that will run the API calls
# \param device_id: The ID of the device (i.e. the cluster_id for the cloud API)
# \param parent: The optional parent of this output device.
def __init__(self, api_client: CloudApiClient, device_id: str, parent: QObject = None) -> None:
super().__init__(device_id = device_id, address = "", properties = {}, parent = parent) super().__init__(device_id = device_id, address = "", properties = {}, parent = parent)
self._api = api_client self._api = api_client
@ -76,10 +85,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._device_id = device_id self._device_id = device_id
self._account = CuraApplication.getInstance().getCuraAPI().account self._account = CuraApplication.getInstance().getCuraAPI().account
# Cluster does not have authentication, so default to authenticated # We use the Cura Connect monitor tab to get most functionality right away.
self._authentication_state = AuthState.Authenticated
# We re-use the Cura Connect monitor tab to get the most functionality right away.
self._monitor_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self._monitor_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"../../resources/qml/ClusterMonitorItem.qml") "../../resources/qml/ClusterMonitorItem.qml")
self._control_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self._control_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
@ -118,11 +124,12 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
writer = self._determineWriter(file_handler, file_format) writer = self._determineWriter(file_handler, file_format)
if not writer: if not writer:
Logger.log("e", "Missing file or mesh writer!") Logger.log("e", "Missing file or mesh writer!")
self._onUploadError(T.COULD_NOT_EXPORT) return self._onUploadError(T.COULD_NOT_EXPORT)
return
stream = io.StringIO() if file_format["mode"] == FileWriter.OutputMode.TextMode else io.BytesIO() stream = io.StringIO() if file_format["mode"] == FileWriter.OutputMode.TextMode else io.BytesIO()
writer.write(stream, nodes) writer.write(stream, nodes)
# TODO: Remove extension from the file name, since we are using content types now
self._sendPrintJob(file_name + "." + file_format["extension"], file_format["mime_type"], stream) self._sendPrintJob(file_name + "." + file_format["extension"], file_format["mime_type"], stream)
# TODO: This is yanked right out of ClusterUM3OutputDevice, great candidate for a utility or base class # TODO: This is yanked right out of ClusterUM3OutputDevice, great candidate for a utility or base class
@ -202,7 +209,9 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Called when the network data should be updated. ## Called when the network data should be updated.
def _update(self) -> None: def _update(self) -> None:
super()._update() super()._update()
Logger.log("i", "Calling the cloud cluster") if self._last_response_time and time() - self._last_response_time < self.CHECK_CLUSTER_INTERVAL:
return # avoid calling the cloud too often
if self._account.isLoggedIn: if self._account.isLoggedIn:
self.setAuthenticationState(AuthState.Authenticated) self.setAuthenticationState(AuthState.Authenticated)
self._api.getClusterStatus(self._device_id, self._onStatusCallFinished) self._api.getClusterStatus(self._device_id, self._onStatusCallFinished)
@ -212,7 +221,6 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
## Method called when HTTP request to status endpoint is finished. ## Method called when HTTP request to status endpoint is finished.
# Contains both printers and print jobs statuses in a single response. # Contains both printers and print jobs statuses in a single response.
def _onStatusCallFinished(self, status: CloudClusterStatus) -> None: def _onStatusCallFinished(self, status: CloudClusterStatus) -> None:
Logger.log("d", "Got response form the cloud cluster: %s", status.__dict__)
# Update all data from the cluster. # Update all data from the cluster.
self._updatePrinters(status.printers) self._updatePrinters(status.printers)
self._updatePrintJobs(status.print_jobs) self._updatePrintJobs(status.print_jobs)
@ -342,21 +350,15 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
request.file_size = len(mesh) request.file_size = len(mesh)
request.content_type = content_type request.content_type = content_type
Logger.log("i", "Creating new cloud print job: %s", request.__dict__)
self._api.requestUpload(request, lambda response: self._onPrintJobCreated(mesh, response)) self._api.requestUpload(request, lambda response: self._onPrintJobCreated(mesh, response))
def _onPrintJobCreated(self, mesh: bytes, job_response: CloudJobResponse) -> None: def _onPrintJobCreated(self, mesh: bytes, job_response: CloudJobResponse) -> None:
Logger.log("i", "Print job created successfully: %s", job_response.__dict__) self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._updateUploadProgress,
self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._onUploadPrintJobProgress, lambda _: self._onUploadError(T.UPLOAD_ERROR))
lambda error: self._onUploadError(T.UPLOAD_ERROR))
def _onPrintJobUploaded(self, job_id: str) -> None: def _onPrintJobUploaded(self, job_id: str) -> None:
self._api.requestPrint(self._device_id, job_id, self._onUploadSuccess) self._api.requestPrint(self._device_id, job_id, self._onUploadSuccess)
def _onUploadPrintJobProgress(self, bytes_sent: int, bytes_total: int) -> None:
if bytes_total > 0:
self._updateUploadProgress(int((bytes_sent / bytes_total) * 100))
def _updateUploadProgress(self, progress: int): def _updateUploadProgress(self, progress: int):
if not self._progress_message: if not self._progress_message:
self._progress_message = Message( self._progress_message = Message(
@ -389,7 +391,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self.writeError.emit() self.writeError.emit()
# Shows a message when the upload has succeeded # Shows a message when the upload has succeeded
def _onUploadSuccess(self): def _onUploadSuccess(self, response: CloudPrintResponse):
Logger.log("i", "The cluster will be printing this print job with the ID %s", response.cluster_job_id)
self._resetUploadProgress() self._resetUploadProgress()
message = Message( message = Message(
text = T.UPLOAD_SUCCESS_TEXT, text = T.UPLOAD_SUCCESS_TEXT,

View file

@ -21,7 +21,7 @@ from .Models import CloudCluster, CloudErrorObject
class CloudOutputDeviceManager: class CloudOutputDeviceManager:
# The interval with which the remote clusters are checked # The interval with which the remote clusters are checked
CHECK_CLUSTER_INTERVAL = 5 # seconds CHECK_CLUSTER_INTERVAL = 5.0 # seconds
# The translation catalog for this device. # The translation catalog for this device.
I18N_CATALOG = i18nCatalog("cura") I18N_CATALOG = i18nCatalog("cura")