Refactor networked output device

All networking related stuff is moved to a separate class called NetworkClient for reusability.
As example it is now also used in the WIP CloudOutputDeviceManager to clean up network calling there.
This commit is contained in:
ChrisTerBeke 2018-11-20 23:44:28 +01:00
parent 07c9980d91
commit c7bb6931f4
No known key found for this signature in database
GPG key ID: A49F1AB9D7E0C263
3 changed files with 260 additions and 192 deletions

220
cura/NetworkClient.py Normal file
View file

@ -0,0 +1,220 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from time import time
from typing import Optional, Dict, Callable, List
from PyQt5.QtCore import QUrl
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QHttpMultiPart, QNetworkRequest, QHttpPart, \
QAuthenticator
from UM.Logger import Logger
from cura.CuraApplication import CuraApplication
## Abstraction of QNetworkAccessManager for easier networking in Cura.
# This was originally part of NetworkedPrinterOutputDevice but was moved out for re-use in other classes.
class NetworkClient:
def __init__(self, application: CuraApplication = None):
# Use the given application instance or get the singleton instance.
self._application = application or CuraApplication.getInstance()
# Network manager instance to use for this client.
self._manager = None # type: Optional[QNetworkAccessManager]
# Timings.
self._last_manager_create_time = None # type: Optional[float]
self._last_response_time = None # type: Optional[float]
self._last_request_time = None # type: Optional[float]
# The user agent of Cura.
self._user_agent = "%s/%s " % (self._application.getApplicationName(), self._application.getVersion())
# Uses to store callback methods for finished network requests.
# This allows us to register network calls with a callback directly instead of having to dissect the reply.
self._on_finished_callbacks = {} # type: Dict[str, Callable[[QNetworkReply], None]]
# QHttpMultiPart objects need to be kept alive and not garbage collected during the
# HTTP which uses them. We hold references to these QHttpMultiPart objects here.
self._kept_alive_multiparts = {} # type: Dict[QNetworkReply, QHttpMultiPart]
## Creates a network manager with all the required properties and event bindings.
def _createNetworkManager(self) -> None:
if self._manager:
self._manager.finished.disconnect(self.__handleOnFinished)
self._manager.authenticationRequired.disconnect(self._onAuthenticationRequired)
self._manager = QNetworkAccessManager()
self._manager.finished.connect(self.__handleOnFinished)
self._last_manager_create_time = time()
self._manager.authenticationRequired.connect(self._onAuthenticationRequired)
## Create a new empty network request.
# Automatically adds the required HTTP headers.
def _createEmptyRequest(self, url: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
request = QNetworkRequest(QUrl(url))
if content_type:
request.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
request.setHeader(QNetworkRequest.UserAgentHeader, self._user_agent)
return request
## Executes the correct callback method when a network request finishes.
def __handleOnFinished(self, reply: QNetworkReply) -> None:
# Due to garbage collection, we need to cache certain bits of post operations.
# As we don't want to keep them around forever, delete them if we get a reply.
if reply.operation() == QNetworkAccessManager.PostOperation:
self._clearCachedMultiPart(reply)
# No status code means it never even reached remote.
if reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) is None:
return
# Not used by this class itself, but children might need it for better network handling.
# An example of this is the _update method in the NetworkedPrinterOutputDevice.
self._last_response_time = time()
# Find the right callback and execute it.
# It always takes the full reply as single parameter.
callback_key = reply.url().toString() + str(reply.operation())
if callback_key in self._on_finished_callbacks:
self._on_finished_callbacks[callback_key](reply)
## Removes all cached Multi-Part items.
def _clearCachedMultiPart(self, reply: QNetworkReply) -> None:
if reply in self._kept_alive_multiparts:
del self._kept_alive_multiparts[reply]
## Makes sure the network manager is created.
def _validateManager(self) -> None:
if self._manager is None:
self._createNetworkManager()
assert (self._manager is not None)
## Callback for when the network manager detects that authentication is required but was not given.
@staticmethod
def _onAuthenticationRequired(reply: QNetworkReply, authenticator: QAuthenticator) -> None:
Logger.log("w", "Request to {} required authentication but was not given".format(reply.url().toString()))
## Register a method to be executed when the associated network request finishes.
def _registerOnFinishedCallback(self, reply: QNetworkReply,
on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
if on_finished is not None:
self._on_finished_callbacks[reply.url().toString() + str(reply.operation())] = on_finished
## Add a part to a Multi-Part form.
@staticmethod
def _createFormPart(content_header: str, data: bytes, content_type: Optional[str] = None) -> QHttpPart:
part = QHttpPart()
if not content_header.startswith("form-data;"):
content_header = "form_data; " + content_header
part.setHeader(QNetworkRequest.ContentDispositionHeader, content_header)
if content_type is not None:
part.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
part.setBody(data)
return part
## Public version of _createFormPart. Both are needed for backward compatibility with 3rd party plugins.
def createFormPart(self, content_header: str, data: bytes, content_type: Optional[str] = None) -> QHttpPart:
return self._createFormPart(content_header, data, content_type)
## Does a PUT request to the given URL.
def put(self, url: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
self._validateManager()
request = self._createEmptyRequest(url)
self._last_request_time = time()
if not self._manager:
Logger.log("e", "No network manager was created to execute the PUT call with.")
return
reply = self._manager.put(request, data.encode())
self._registerOnFinishedCallback(reply, on_finished)
## Does a DELETE request to the given URL.
def delete(self, url: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
self._validateManager()
request = self._createEmptyRequest(url)
self._last_request_time = time()
if not self._manager:
Logger.log("e", "No network manager was created to execute the DELETE call with.")
return
reply = self._manager.deleteResource(request)
self._registerOnFinishedCallback(reply, on_finished)
## Does a GET request to the given URL.
def get(self, url: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
self._validateManager()
request = self._createEmptyRequest(url)
self._last_request_time = time()
if not self._manager:
Logger.log("e", "No network manager was created to execute the GET call with.")
return
reply = self._manager.get(request)
self._registerOnFinishedCallback(reply, on_finished)
## Does a POST request to the given URL.
def post(self, url: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]],
on_progress: Callable = None) -> None:
self._validateManager()
request = self._createEmptyRequest(url)
self._last_request_time = time()
if not self._manager:
Logger.log("e", "No network manager was created to execute the GET call with.")
return
reply = self._manager.post(request, data.encode())
if on_progress is not None:
reply.uploadProgress.connect(on_progress)
self._registerOnFinishedCallback(reply, on_finished)
## Does a POST request with form data to the given URL.
def postForm(self, url: str, header_data: str, body_data: bytes,
on_finished: Optional[Callable[[QNetworkReply], None]],
on_progress: Callable = None) -> None:
post_part = QHttpPart()
post_part.setHeader(QNetworkRequest.ContentDispositionHeader, header_data)
post_part.setBody(body_data)
self.postFormWithParts(url, [post_part], on_finished, on_progress)
## Does a POST request with form parts to the given URL.
def postFormWithParts(self, target: str, parts: List[QHttpPart],
on_finished: Optional[Callable[[QNetworkReply], None]],
on_progress: Callable = None) -> None:
self._validateManager()
request = self._createEmptyRequest(target, content_type = None)
multi_post_part = QHttpMultiPart(QHttpMultiPart.FormDataType)
for part in parts:
multi_post_part.append(part)
self._last_request_time = time()
if not self._manager:
Logger.log("e", "No network manager was created to execute the POST call with.")
return
reply = self._manager.post(request, multi_post_part)
self._kept_alive_multiparts[reply] = multi_post_part
if on_progress is not None:
reply.uploadProgress.connect(on_progress)
self._registerOnFinishedCallback(reply, on_finished)

View file

@ -5,6 +5,7 @@ from UM.FileHandler.FileHandler import FileHandler #For typing.
from UM.Logger import Logger from UM.Logger import Logger
from UM.Scene.SceneNode import SceneNode #For typing. from UM.Scene.SceneNode import SceneNode #For typing.
from cura.CuraApplication import CuraApplication from cura.CuraApplication import CuraApplication
from cura.NetworkClient import NetworkClient
from cura.PrinterOutputDevice import PrinterOutputDevice, ConnectionState from cura.PrinterOutputDevice import PrinterOutputDevice, ConnectionState
@ -26,35 +27,29 @@ class AuthState(IntEnum):
AuthenticationReceived = 5 AuthenticationReceived = 5
class NetworkedPrinterOutputDevice(PrinterOutputDevice): class NetworkedPrinterOutputDevice(PrinterOutputDevice, NetworkClient):
authenticationStateChanged = pyqtSignal() authenticationStateChanged = pyqtSignal()
def __init__(self, device_id, address: str, properties: Dict[bytes, bytes], parent: QObject = None) -> None: def __init__(self, device_id, address: str, properties: Dict[bytes, bytes], parent: QObject = None) -> None:
super().__init__(device_id = device_id, parent = parent) PrinterOutputDevice.__init__(self, device_id = device_id, parent = parent)
self._manager = None # type: Optional[QNetworkAccessManager] NetworkClient.__init__(self)
self._last_manager_create_time = None # type: Optional[float]
self._recreate_network_manager_time = 30
self._timeout_time = 10 # After how many seconds of no response should a timeout occur?
self._last_response_time = None # type: Optional[float]
self._last_request_time = None # type: Optional[float]
self._api_prefix = "" self._api_prefix = ""
self._address = address self._address = address
self._properties = properties self._properties = properties
self._user_agent = "%s/%s " % (CuraApplication.getInstance().getApplicationName(), CuraApplication.getInstance().getVersion())
self._onFinishedCallbacks = {} # type: Dict[str, Callable[[QNetworkReply], None]]
self._authentication_state = AuthState.NotAuthenticated self._authentication_state = AuthState.NotAuthenticated
# QHttpMultiPart objects need to be kept alive and not garbage collected during the
# HTTP which uses them. We hold references to these QHttpMultiPart objects here.
self._kept_alive_multiparts = {} # type: Dict[QNetworkReply, QHttpMultiPart]
self._sending_gcode = False self._sending_gcode = False
self._compressing_gcode = False self._compressing_gcode = False
self._gcode = [] # type: List[str] self._gcode = [] # type: List[str]
self._connection_state_before_timeout = None # type: Optional[ConnectionState] self._connection_state_before_timeout = None # type: Optional[ConnectionState]
self._timeout_time = 10 # After how many seconds of no response should a timeout occur?
self._recreate_network_manager_time = 30
## Override creating empty request to compile the full URL.
# Needed to keep NetworkedPrinterOutputDevice backwards compatible after refactoring NetworkClient out of it.
def _createEmptyRequest(self, target: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
return super()._createEmptyRequest("http://" + self._address + self._api_prefix + target, content_type)
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None: def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
raise NotImplementedError("requestWrite needs to be implemented") raise NotImplementedError("requestWrite needs to be implemented")
@ -140,30 +135,6 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
self.setConnectionState(self._connection_state_before_timeout) self.setConnectionState(self._connection_state_before_timeout)
self._connection_state_before_timeout = None self._connection_state_before_timeout = None
def _createEmptyRequest(self, target: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
url = QUrl("http://" + self._address + self._api_prefix + target)
request = QNetworkRequest(url)
if content_type is not None:
request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
request.setHeader(QNetworkRequest.UserAgentHeader, self._user_agent)
return request
def createFormPart(self, content_header: str, data: bytes, content_type: Optional[str] = None) -> QHttpPart:
return self._createFormPart(content_header, data, content_type)
def _createFormPart(self, content_header: str, data: bytes, content_type: Optional[str] = None) -> QHttpPart:
part = QHttpPart()
if not content_header.startswith("form-data;"):
content_header = "form_data; " + content_header
part.setHeader(QNetworkRequest.ContentDispositionHeader, content_header)
if content_type is not None:
part.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
part.setBody(data)
return part
## Convenience function to get the username from the OS. ## Convenience function to get the username from the OS.
# The code was copied from the getpass module, as we try to use as little dependencies as possible. # The code was copied from the getpass module, as we try to use as little dependencies as possible.
def _getUserName(self) -> str: def _getUserName(self) -> str:
@ -173,130 +144,7 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
return user return user
return "Unknown User" # Couldn't find out username. return "Unknown User" # Couldn't find out username.
def _clearCachedMultiPart(self, reply: QNetworkReply) -> None: @pyqtSlot(str, result = str)
if reply in self._kept_alive_multiparts:
del self._kept_alive_multiparts[reply]
def _validateManager(self) -> None:
if self._manager is None:
self._createNetworkManager()
assert (self._manager is not None)
def put(self, target: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
self._validateManager()
request = self._createEmptyRequest(target)
self._last_request_time = time()
if self._manager is not None:
reply = self._manager.put(request, data.encode())
self._registerOnFinishedCallback(reply, on_finished)
else:
Logger.log("e", "Could not find manager.")
def delete(self, target: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
self._validateManager()
request = self._createEmptyRequest(target)
self._last_request_time = time()
if self._manager is not None:
reply = self._manager.deleteResource(request)
self._registerOnFinishedCallback(reply, on_finished)
else:
Logger.log("e", "Could not find manager.")
def get(self, target: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
self._validateManager()
request = self._createEmptyRequest(target)
self._last_request_time = time()
if self._manager is not None:
reply = self._manager.get(request)
self._registerOnFinishedCallback(reply, on_finished)
else:
Logger.log("e", "Could not find manager.")
def post(self, target: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None:
self._validateManager()
request = self._createEmptyRequest(target)
self._last_request_time = time()
if self._manager is not None:
reply = self._manager.post(request, data.encode())
if on_progress is not None:
reply.uploadProgress.connect(on_progress)
self._registerOnFinishedCallback(reply, on_finished)
else:
Logger.log("e", "Could not find manager.")
def postFormWithParts(self, target: str, parts: List[QHttpPart], on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> QNetworkReply:
self._validateManager()
request = self._createEmptyRequest(target, content_type=None)
multi_post_part = QHttpMultiPart(QHttpMultiPart.FormDataType)
for part in parts:
multi_post_part.append(part)
self._last_request_time = time()
if self._manager is not None:
reply = self._manager.post(request, multi_post_part)
self._kept_alive_multiparts[reply] = multi_post_part
if on_progress is not None:
reply.uploadProgress.connect(on_progress)
self._registerOnFinishedCallback(reply, on_finished)
return reply
else:
Logger.log("e", "Could not find manager.")
def postForm(self, target: str, header_data: str, body_data: bytes, on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None:
post_part = QHttpPart()
post_part.setHeader(QNetworkRequest.ContentDispositionHeader, header_data)
post_part.setBody(body_data)
self.postFormWithParts(target, [post_part], on_finished, on_progress)
def _onAuthenticationRequired(self, reply: QNetworkReply, authenticator: QAuthenticator) -> None:
Logger.log("w", "Request to {url} required authentication, which was not implemented".format(url = reply.url().toString()))
def _createNetworkManager(self) -> None:
Logger.log("d", "Creating network manager")
if self._manager:
self._manager.finished.disconnect(self.__handleOnFinished)
self._manager.authenticationRequired.disconnect(self._onAuthenticationRequired)
self._manager = QNetworkAccessManager()
self._manager.finished.connect(self.__handleOnFinished)
self._last_manager_create_time = time()
self._manager.authenticationRequired.connect(self._onAuthenticationRequired)
if self._properties.get(b"temporary", b"false") != b"true":
CuraApplication.getInstance().getMachineManager().checkCorrectGroupName(self.getId(), self.name)
def _registerOnFinishedCallback(self, reply: QNetworkReply, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
if on_finished is not None:
self._onFinishedCallbacks[reply.url().toString() + str(reply.operation())] = on_finished
def __handleOnFinished(self, reply: QNetworkReply) -> None:
# Due to garbage collection, we need to cache certain bits of post operations.
# As we don't want to keep them around forever, delete them if we get a reply.
if reply.operation() == QNetworkAccessManager.PostOperation:
self._clearCachedMultiPart(reply)
if reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) is None:
# No status code means it never even reached remote.
return
self._last_response_time = time()
if self._connection_state == ConnectionState.connecting:
self.setConnectionState(ConnectionState.connected)
callback_key = reply.url().toString() + str(reply.operation())
try:
if callback_key in self._onFinishedCallbacks:
self._onFinishedCallbacks[callback_key](reply)
except Exception:
Logger.logException("w", "something went wrong with callback")
@pyqtSlot(str, result=str)
def getProperty(self, key: str) -> str: def getProperty(self, key: str) -> str:
bytes_key = key.encode("utf-8") bytes_key = key.encode("utf-8")
if bytes_key in self._properties: if bytes_key in self._properties:
@ -332,7 +180,14 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
def printerType(self) -> str: def printerType(self) -> str:
return self._properties.get(b"printer_type", b"Unknown").decode("utf-8") return self._properties.get(b"printer_type", b"Unknown").decode("utf-8")
## IP adress of this printer ## IP address of this printer
@pyqtProperty(str, constant = True) @pyqtProperty(str, constant = True)
def ipAddress(self) -> str: def ipAddress(self) -> str:
return self._address return self._address
def __handleOnFinished(self, reply: QNetworkReply) -> None:
super().__handleOnFinished(reply)
# Since we got a reply from the network manager we can now be sure we are actually connected.
if self._connection_state == ConnectionState.connecting:
self.setConnectionState(ConnectionState.connected)

View file

@ -3,10 +3,10 @@
import json import json
from typing import TYPE_CHECKING, Dict, Optional from typing import TYPE_CHECKING, Dict, Optional
from PyQt5.QtCore import QUrl from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from UM.Logger import Logger from UM.Logger import Logger
from cura.NetworkClient import NetworkClient
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDevice import CloudOutputDevice from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDevice import CloudOutputDevice
@ -21,20 +21,17 @@ if TYPE_CHECKING:
# #
# TODO: figure out how to pair remote clusters, local networked clusters and local cura printer presets. # TODO: figure out how to pair remote clusters, local networked clusters and local cura printer presets.
# TODO: for now we just have multiple output devices if the cluster is available both locally and remote. # TODO: for now we just have multiple output devices if the cluster is available both locally and remote.
class CloudOutputDeviceManager: class CloudOutputDeviceManager(NetworkClient):
# The cloud URL to use for remote clusters. # The cloud URL to use for remote clusters.
API_ROOT_PATH = "https://api.ultimaker.com/connect/v1" API_ROOT_PATH = "https://api.ultimaker.com/connect/v1"
def __init__(self, application: "CuraApplication"): def __init__(self, application: "CuraApplication"):
self._application = application super().__init__(application)
self._output_device_manager = application.getOutputDeviceManager() self._output_device_manager = application.getOutputDeviceManager()
self._account = application.getCuraAPI().account self._account = application.getCuraAPI().account
# Network manager for getting the cluster list.
self._network_manager = QNetworkAccessManager()
self._network_manager.finished.connect(self._onNetworkRequestFinished)
# Persistent dict containing the remote clusters for the authenticated user. # Persistent dict containing the remote clusters for the authenticated user.
self._remote_clusters = {} # type: Dict[str, CloudOutputDevice] self._remote_clusters = {} # type: Dict[str, CloudOutputDevice]
@ -44,27 +41,24 @@ class CloudOutputDeviceManager:
# Fetch all remote clusters for the authenticated user. # Fetch all remote clusters for the authenticated user.
# TODO: update remote clusters periodically # TODO: update remote clusters periodically
self._account.loginStateChanged.connect(self._getRemoteClusters) self._account.loginStateChanged.connect(self._getRemoteClusters)
## Override _createEmptyRequest to add the needed authentication header for talking to the Ultimaker Cloud API.
def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
request = super()._createEmptyRequest(self.API_ROOT_PATH + path, content_type = content_type)
if self._account.isLoggedIn:
# TODO: add correct scopes to OAuth2 client to use remote connect API.
# TODO: don't create the client when not signed in?
request.setRawHeader(b"Authorization", "Bearer {}".format(self._account.accessToken).encode())
return request
## Gets all remote clusters from the API. ## Gets all remote clusters from the API.
def _getRemoteClusters(self): def _getRemoteClusters(self) -> None:
url = QUrl("{}/clusters".format(self.API_ROOT_PATH)) self.get("/clusters", on_finished = self._onGetRemoteClustersFinished)
request = QNetworkRequest(url)
request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
if not self._account.isLoggedIn:
# TODO: show message to user to sign in
Logger.log("w", "User is not signed in, cannot get remote print clusters")
return
request.setRawHeader(b"Authorization", "Bearer {}".format(self._account.accessToken).encode())
self._network_manager.get(request)
## Callback for network requests. ## Callback for when the request for getting the clusters. is finished.
def _onNetworkRequestFinished(self, reply: QNetworkReply): def _onGetRemoteClustersFinished(self, reply: QNetworkReply) -> None:
# TODO: right now we assume that each reply is from /clusters, we should fix this
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
if status_code != 200: if status_code != 200:
# TODO: add correct scopes to OAuth2 client to use remote connect API.
Logger.log("w", "Got unexpected response while trying to get cloud cluster data: {}, {}" Logger.log("w", "Got unexpected response while trying to get cloud cluster data: {}, {}"
.format(status_code, reply.readAll())) .format(status_code, reply.readAll()))
return return
@ -86,7 +80,6 @@ class CloudOutputDeviceManager:
def _parseStatusResponse(reply: QNetworkReply) -> Optional[dict]: def _parseStatusResponse(reply: QNetworkReply) -> Optional[dict]:
try: try:
result = json.loads(bytes(reply.readAll()).decode("utf-8")) result = json.loads(bytes(reply.readAll()).decode("utf-8"))
print("result=====", result)
# TODO: use model or named tuple here. # TODO: use model or named tuple here.
return result.data return result.data
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError: