mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-07 23:17:32 -06:00
STAR-322: Fixing the multipart upload
This commit is contained in:
parent
fed779d0d2
commit
4dc8edb996
6 changed files with 105 additions and 80 deletions
|
@ -1,7 +1,7 @@
|
|||
# Copyright (c) 2018 Ultimaker B.V.
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
from time import time
|
||||
from typing import Optional, Dict, Callable, List, Union
|
||||
from typing import Optional, Dict, Callable, List, Union, Tuple
|
||||
|
||||
from PyQt5.QtCore import QUrl
|
||||
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QHttpMultiPart, QNetworkRequest, QHttpPart, \
|
||||
|
@ -16,50 +16,63 @@ from UM.Logger import Logger
|
|||
class NetworkClient:
|
||||
|
||||
def __init__(self) -> None:
|
||||
|
||||
|
||||
# Network manager instance to use for this client.
|
||||
self._manager = None # type: Optional[QNetworkAccessManager]
|
||||
|
||||
|
||||
# Timings.
|
||||
self._last_manager_create_time = None # type: Optional[float]
|
||||
self._last_response_time = None # type: Optional[float]
|
||||
self._last_request_time = None # type: Optional[float]
|
||||
|
||||
|
||||
# The user agent of Cura.
|
||||
application = Application.getInstance()
|
||||
self._user_agent = "%s/%s " % (application.getApplicationName(), application.getVersion())
|
||||
|
||||
# Uses to store callback methods for finished network requests.
|
||||
# This allows us to register network calls with a callback directly instead of having to dissect the reply.
|
||||
self._on_finished_callbacks = {} # type: Dict[str, Callable[[QNetworkReply], None]]
|
||||
# The key is created out of a tuple (operation, url)
|
||||
self._on_finished_callbacks = {} # type: Dict[Tuple[int, str], Callable[[QNetworkReply], None]]
|
||||
|
||||
# QHttpMultiPart objects need to be kept alive and not garbage collected during the
|
||||
# HTTP which uses them. We hold references to these QHttpMultiPart objects here.
|
||||
self._kept_alive_multiparts = {} # type: Dict[QNetworkReply, QHttpMultiPart]
|
||||
|
||||
## Creates a network manager with all the required properties and event bindings.
|
||||
def _createNetworkManager(self) -> None:
|
||||
## Creates a network manager if needed, with all the required properties and event bindings.
|
||||
def start(self) -> None:
|
||||
if self._manager:
|
||||
self._manager.finished.disconnect(self.__handleOnFinished)
|
||||
self._manager.authenticationRequired.disconnect(self._onAuthenticationRequired)
|
||||
return
|
||||
self._manager = QNetworkAccessManager()
|
||||
self._manager.finished.connect(self.__handleOnFinished)
|
||||
self._manager.finished.connect(self._handleOnFinished)
|
||||
self._last_manager_create_time = time()
|
||||
self._manager.authenticationRequired.connect(self._onAuthenticationRequired)
|
||||
|
||||
## Destroys the network manager and event bindings.
|
||||
def stop(self) -> None:
|
||||
if not self._manager:
|
||||
return
|
||||
self._manager.finished.disconnect(self._handleOnFinished)
|
||||
self._manager.authenticationRequired.disconnect(self._onAuthenticationRequired)
|
||||
self._manager = None
|
||||
|
||||
## Create a new empty network request.
|
||||
# Automatically adds the required HTTP headers.
|
||||
# \param url: The URL to request
|
||||
# \param content_type: The type of the body contents.
|
||||
def _createEmptyRequest(self, url: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
|
||||
if not self._manager:
|
||||
self.start() # make sure the manager is created
|
||||
request = QNetworkRequest(QUrl(url))
|
||||
if content_type:
|
||||
request.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
|
||||
request.setHeader(QNetworkRequest.UserAgentHeader, self._user_agent)
|
||||
self._last_request_time = time()
|
||||
return request
|
||||
|
||||
## Executes the correct callback method when a network request finishes.
|
||||
def __handleOnFinished(self, reply: QNetworkReply) -> None:
|
||||
def _handleOnFinished(self, reply: QNetworkReply) -> None:
|
||||
|
||||
Logger.log("i", "On finished %s %s", reply.attribute(QNetworkRequest.HttpStatusCodeAttribute), reply.url().toString())
|
||||
|
||||
# Due to garbage collection, we need to cache certain bits of post operations.
|
||||
# As we don't want to keep them around forever, delete them if we get a reply.
|
||||
|
@ -76,7 +89,7 @@ class NetworkClient:
|
|||
|
||||
# Find the right callback and execute it.
|
||||
# It always takes the full reply as single parameter.
|
||||
callback_key = reply.url().toString() + str(reply.operation())
|
||||
callback_key = reply.operation(), reply.url().toString()
|
||||
if callback_key in self._on_finished_callbacks:
|
||||
self._on_finished_callbacks[callback_key](reply)
|
||||
else:
|
||||
|
@ -87,12 +100,6 @@ class NetworkClient:
|
|||
if reply in self._kept_alive_multiparts:
|
||||
del self._kept_alive_multiparts[reply]
|
||||
|
||||
## Makes sure the network manager is created.
|
||||
def _validateManager(self) -> None:
|
||||
if self._manager is None:
|
||||
self._createNetworkManager()
|
||||
assert self._manager is not None
|
||||
|
||||
## Callback for when the network manager detects that authentication is required but was not given.
|
||||
@staticmethod
|
||||
def _onAuthenticationRequired(reply: QNetworkReply, authenticator: QAuthenticator) -> None:
|
||||
|
@ -102,7 +109,7 @@ class NetworkClient:
|
|||
def _registerOnFinishedCallback(self, reply: QNetworkReply,
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
|
||||
if on_finished is not None:
|
||||
self._on_finished_callbacks[reply.url().toString() + str(reply.operation())] = on_finished
|
||||
self._on_finished_callbacks[reply.operation(), reply.url().toString()] = on_finished
|
||||
|
||||
## Add a part to a Multi-Part form.
|
||||
@staticmethod
|
||||
|
@ -133,33 +140,23 @@ class NetworkClient:
|
|||
def put(self, url: str, data: Union[str, bytes], content_type: Optional[str] = None,
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]] = None,
|
||||
on_progress: Optional[Callable[[int, int], None]] = None) -> None:
|
||||
self._validateManager()
|
||||
|
||||
request = self._createEmptyRequest(url, content_type = content_type)
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
return Logger.log("e", "No network manager was created to execute the PUT call with.")
|
||||
|
||||
body = data if isinstance(data, bytes) else data.encode() # type: bytes
|
||||
reply = self._manager.put(request, body)
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
if on_progress is not None:
|
||||
# TODO: Do we need to disconnect() as well?
|
||||
reply.uploadProgress.connect(on_progress)
|
||||
reply.finished.connect(lambda r: Logger.log("i", "On finished %s %s", url, r))
|
||||
reply.error.connect(lambda r: Logger.log("i", "On error %s %s", url, r))
|
||||
|
||||
## Sends a delete request to the given path.
|
||||
# url: The path after the API prefix.
|
||||
# on_finished: The function to be call when the response is received.
|
||||
def delete(self, url: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
|
||||
self._validateManager()
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
return Logger.log("e", "No network manager was created to execute the DELETE call with.")
|
||||
|
||||
reply = self._manager.deleteResource(request)
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
|
@ -167,14 +164,7 @@ class NetworkClient:
|
|||
# \param url: The path after the API prefix.
|
||||
# \param on_finished: The function to be call when the response is received.
|
||||
def get(self, url: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
|
||||
self._validateManager()
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
return Logger.log("e", "No network manager was created to execute the GET call with.")
|
||||
|
||||
reply = self._manager.get(request)
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
|
@ -186,13 +176,7 @@ class NetworkClient:
|
|||
def post(self, url: str, data: Union[str, bytes],
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]],
|
||||
on_progress: Optional[Callable[[int, int], None]] = None) -> None:
|
||||
self._validateManager()
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
return Logger.log("e", "Could not find manager.")
|
||||
|
||||
body = data if isinstance(data, bytes) else data.encode() # type: bytes
|
||||
reply = self._manager.post(request, body)
|
||||
|
@ -213,20 +197,12 @@ class NetworkClient:
|
|||
def postFormWithParts(self, target: str, parts: List[QHttpPart],
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]],
|
||||
on_progress: Optional[Callable[[int, int], None]] = None) -> Optional[QNetworkReply]:
|
||||
self._validateManager()
|
||||
|
||||
request = self._createEmptyRequest(target, content_type = None)
|
||||
multi_post_part = QHttpMultiPart(QHttpMultiPart.FormDataType)
|
||||
|
||||
for part in parts:
|
||||
multi_post_part.append(part)
|
||||
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
Logger.log("e", "No network manager was created to execute the POST call with.")
|
||||
return None
|
||||
|
||||
reply = self._manager.post(request, multi_post_part)
|
||||
|
||||
self._kept_alive_multiparts[reply] = multi_post_part
|
||||
|
|
|
@ -54,6 +54,8 @@ class AuthorizationService:
|
|||
self._user_profile = self._parseJWT()
|
||||
if not self._user_profile:
|
||||
# If there is still no user profile from the JWT, we have to log in again.
|
||||
Logger.log("w", "The user profile could not be loaded. The user must log in again!")
|
||||
self.deleteAuthData()
|
||||
return None
|
||||
|
||||
return self._user_profile
|
||||
|
|
|
@ -66,7 +66,7 @@ class T:
|
|||
class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
||||
|
||||
# The interval with which the remote clusters are checked
|
||||
CHECK_CLUSTER_INTERVAL = 5.0 # seconds
|
||||
CHECK_CLUSTER_INTERVAL = 50.0 # seconds
|
||||
|
||||
# Signal triggered when the print jobs in the queue were changed.
|
||||
printJobsChanged = pyqtSignal()
|
||||
|
@ -150,7 +150,6 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
|||
return
|
||||
|
||||
# Indicate we have started sending a job.
|
||||
self._sending_job = True
|
||||
self.writeStarted.emit(self)
|
||||
|
||||
mesh_format = MeshFormatHandler(file_handler, self.firmwareVersion)
|
||||
|
@ -173,6 +172,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
|||
if self._last_response_time and time() - self._last_response_time < self.CHECK_CLUSTER_INTERVAL:
|
||||
return # avoid calling the cloud too often
|
||||
|
||||
Logger.log("i", "Requesting update for %s after %s", self._device_id,
|
||||
self._last_response_time and time() - self._last_response_time)
|
||||
if self._account.isLoggedIn:
|
||||
self.setAuthenticationState(AuthState.Authenticated)
|
||||
self._api.getClusterStatus(self._device_id, self._onStatusCallFinished)
|
||||
|
@ -183,6 +184,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
|||
# Contains both printers and print jobs statuses in a single response.
|
||||
def _onStatusCallFinished(self, status: CloudClusterStatus) -> None:
|
||||
# Update all data from the cluster.
|
||||
self._last_response_time = time()
|
||||
if self._received_printers != status.printers:
|
||||
self._received_printers = status.printers
|
||||
self._updatePrinters(status.printers)
|
||||
|
@ -289,7 +291,8 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
|||
## Requests the print to be sent to the printer when we finished uploading the mesh.
|
||||
# \param job_id: The ID of the job.
|
||||
def _onPrintJobUploaded(self, job_id: str) -> None:
|
||||
self._api.requestPrint(self._device_id, job_id, self._onUploadSuccess)
|
||||
self._progress.update(100)
|
||||
self._api.requestPrint(self._device_id, job_id, self._onPrintRequested)
|
||||
|
||||
## Displays the given message if uploading the mesh has failed
|
||||
# \param message: The message to display.
|
||||
|
@ -304,7 +307,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
|||
|
||||
## Shows a message when the upload has succeeded
|
||||
# \param response: The response from the cloud API.
|
||||
def _onUploadSuccess(self, response: CloudPrintResponse) -> None:
|
||||
def _onPrintRequested(self, response: CloudPrintResponse) -> None:
|
||||
Logger.log("i", "The cluster will be printing this print job with the ID %s", response.cluster_job_id)
|
||||
self._progress.hide()
|
||||
Message(
|
||||
|
@ -312,7 +315,6 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
|
|||
title = T.UPLOAD_SUCCESS_TITLE,
|
||||
lifetime = 5
|
||||
).show()
|
||||
self._sending_job = False # the upload has finished so we're not sending a job anymore
|
||||
self.writeFinished.emit()
|
||||
|
||||
## Gets the remote printers.
|
||||
|
|
|
@ -26,7 +26,7 @@ class CloudOutputDeviceManager:
|
|||
META_CLUSTER_ID = "um_cloud_cluster_id"
|
||||
|
||||
# The interval with which the remote clusters are checked
|
||||
CHECK_CLUSTER_INTERVAL = 5.0 # seconds
|
||||
CHECK_CLUSTER_INTERVAL = 50.0 # seconds
|
||||
|
||||
# The translation catalog for this device.
|
||||
I18N_CATALOG = i18nCatalog("cura")
|
||||
|
@ -39,26 +39,22 @@ class CloudOutputDeviceManager:
|
|||
self._output_device_manager = application.getOutputDeviceManager()
|
||||
|
||||
self._account = application.getCuraAPI().account # type: Account
|
||||
self._account.loginStateChanged.connect(self._onLoginStateChanged)
|
||||
self._api = CloudApiClient(self._account, self._onApiError)
|
||||
|
||||
# When switching machines we check if we have to activate a remote cluster.
|
||||
application.globalContainerStackChanged.connect(self._connectToActiveMachine)
|
||||
|
||||
# create a timer to update the remote cluster list
|
||||
self._update_timer = QTimer(application)
|
||||
self._update_timer.setInterval(int(self.CHECK_CLUSTER_INTERVAL * 1000))
|
||||
self._update_timer.setSingleShot(False)
|
||||
self._update_timer.timeout.connect(self._getRemoteClusters)
|
||||
|
||||
# Make sure the timer is started in case we missed the loginChanged signal
|
||||
self._onLoginStateChanged(self._account.isLoggedIn)
|
||||
self._running = False
|
||||
|
||||
# Called when the uses logs in or out
|
||||
def _onLoginStateChanged(self, is_logged_in: bool) -> None:
|
||||
Logger.log("i", "Log in state changed to %s", is_logged_in)
|
||||
if is_logged_in:
|
||||
if not self._update_timer.isActive():
|
||||
self._update_timer.start()
|
||||
self._getRemoteClusters()
|
||||
else:
|
||||
if self._update_timer.isActive():
|
||||
self._update_timer.stop()
|
||||
|
@ -136,3 +132,25 @@ class CloudOutputDeviceManager:
|
|||
lifetime = 10,
|
||||
dismissable = True
|
||||
).show()
|
||||
|
||||
def start(self):
|
||||
if self._running:
|
||||
return
|
||||
application = CuraApplication.getInstance()
|
||||
self._account.loginStateChanged.connect(self._onLoginStateChanged)
|
||||
# When switching machines we check if we have to activate a remote cluster.
|
||||
application.globalContainerStackChanged.connect(self._connectToActiveMachine)
|
||||
self._update_timer.timeout.connect(self._getRemoteClusters)
|
||||
self._api.start()
|
||||
self._onLoginStateChanged(is_logged_in = self._account.isLoggedIn)
|
||||
|
||||
def stop(self):
|
||||
if not self._running:
|
||||
return
|
||||
application = CuraApplication.getInstance()
|
||||
self._account.loginStateChanged.disconnect(self._onLoginStateChanged)
|
||||
# When switching machines we check if we have to activate a remote cluster.
|
||||
application.globalContainerStackChanged.disconnect(self._connectToActiveMachine)
|
||||
self._update_timer.timeout.disconnect(self._getRemoteClusters)
|
||||
self._api.stop()
|
||||
self._onLoginStateChanged(is_logged_in = False)
|
||||
|
|
|
@ -51,44 +51,69 @@ class ResumableUpload(NetworkClient):
|
|||
return self._sent_bytes, last_byte
|
||||
|
||||
def start(self) -> None:
|
||||
super().start()
|
||||
if self._finished:
|
||||
self._sent_bytes = 0
|
||||
self._retries = 0
|
||||
self._finished = False
|
||||
self._uploadChunk()
|
||||
|
||||
def stop(self):
|
||||
super().stop()
|
||||
Logger.log("i", "Stopped uploading")
|
||||
self._finished = True
|
||||
|
||||
def _uploadChunk(self) -> None:
|
||||
if self._finished:
|
||||
raise ValueError("The upload is already finished")
|
||||
|
||||
first_byte, last_byte = self._chunkRange()
|
||||
Logger.log("i", "PUT %s - %s", first_byte, last_byte)
|
||||
self.put(self._url, data = self._data[first_byte:last_byte], content_type = self._content_type,
|
||||
on_finished = self.finishedCallback, on_progress = self.progressCallback)
|
||||
# self.put(self._url, data = self._data[first_byte:last_byte], content_type = self._content_type,
|
||||
# on_finished = self.finishedCallback, on_progress = self._progressCallback)
|
||||
request = self._createEmptyRequest(self._url, content_type=self._content_type)
|
||||
|
||||
def progressCallback(self, bytes_sent: int, bytes_total: int) -> None:
|
||||
reply = self._manager.put(request, self._data[first_byte:last_byte])
|
||||
reply.finished.connect(lambda: self._finishedCallback(reply))
|
||||
reply.uploadProgress.connect(self._progressCallback)
|
||||
reply.error.connect(self._errorCallback)
|
||||
if reply.isFinished():
|
||||
self._finishedCallback(reply)
|
||||
|
||||
def _progressCallback(self, bytes_sent: int, bytes_total: int) -> None:
|
||||
Logger.log("i", "Progress callback %s / %s", bytes_sent, bytes_total)
|
||||
if bytes_total:
|
||||
self._on_progress(int((self._sent_bytes + bytes_sent) / len(self._data) * 100))
|
||||
|
||||
def finishedCallback(self, reply: QNetworkReply) -> None:
|
||||
def _errorCallback(self, reply: QNetworkReply) -> None:
|
||||
body = bytes(reply.readAll()).decode()
|
||||
Logger.log("e", "Received error while uploading: %s", body)
|
||||
self.stop()
|
||||
self._on_error()
|
||||
|
||||
def _finishedCallback(self, reply: QNetworkReply) -> None:
|
||||
Logger.log("i", "Finished callback %s %s",
|
||||
reply.attribute(QNetworkRequest.HttpStatusCodeAttribute), reply.url().toString())
|
||||
|
||||
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
|
||||
|
||||
if self._retries < self.MAX_RETRIES and status_code in self.RETRY_HTTP_CODES:
|
||||
self._retries += 1
|
||||
Logger.log("i", "Retrying %s/%s request %s", tries, self.MAX_RETRIES, request.url)
|
||||
Logger.log("i", "Retrying %s/%s request %s", self._retries, self.MAX_RETRIES, reply.url().toString())
|
||||
self._uploadChunk()
|
||||
return
|
||||
|
||||
if status_code > 308:
|
||||
self._errorCallback(reply)
|
||||
return
|
||||
|
||||
body = bytes(reply.readAll()).decode()
|
||||
Logger.log("w", "status_code: %s, Headers: %s, body: %s", status_code,
|
||||
[bytes(header).decode() for header in reply.rawHeaderList()], body)
|
||||
|
||||
if status_code > 308:
|
||||
self._finished = True
|
||||
Logger.log("e", "Received error while uploading: %s", body)
|
||||
self._on_error()
|
||||
return
|
||||
|
||||
first_byte, last_byte = self._chunkRange()
|
||||
self._sent_bytes += last_byte - first_byte
|
||||
self._finished = self._sent_bytes >= len(self._data)
|
||||
if self._finished:
|
||||
if self._sent_bytes >= len(self._data):
|
||||
self.stop()
|
||||
self._on_finished()
|
||||
else:
|
||||
self._uploadChunk()
|
||||
|
|
|
@ -86,6 +86,7 @@ class UM3OutputDevicePlugin(OutputDevicePlugin):
|
|||
## Start looking for devices on network.
|
||||
def start(self):
|
||||
self.startDiscovery()
|
||||
self._cloud_output_device_manager.start()
|
||||
|
||||
def startDiscovery(self):
|
||||
self.stop()
|
||||
|
@ -142,6 +143,7 @@ class UM3OutputDevicePlugin(OutputDevicePlugin):
|
|||
if self._zero_conf is not None:
|
||||
Logger.log("d", "zeroconf close...")
|
||||
self._zero_conf.close()
|
||||
self._cloud_output_device_manager.stop()
|
||||
|
||||
def removeManualDevice(self, key, address = None):
|
||||
if key in self._discovered_devices:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue