Cura/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDeviceManager.py

111 lines
5.2 KiB
Python

# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
from typing import Dict, List, Optional

from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply

from UM.Logger import Logger
from cura.CuraApplication import CuraApplication
from cura.NetworkClient import NetworkClient
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDevice import CloudOutputDevice
from .Models import Cluster
## The cloud output device manager is responsible for using the Ultimaker Cloud APIs to manage remote clusters.
# Keeping all cloud related logic in this class instead of the UM3OutputDevicePlugin results in more readable code.
#
# API spec is available on https://api.ultimaker.com/docs/connect/spec/.
#
# TODO: figure out how to pair remote clusters, local networked clusters and local cura printer presets.
# TODO: for now we just have multiple output devices if the cluster is available both locally and remote.
class CloudOutputDeviceManager(NetworkClient):

    # The cloud URL to use for remote clusters.
    API_ROOT_PATH = "https://api.ultimaker.com/connect/v1"

    ## Creates the manager and subscribes to the application and account signals it reacts to.
    def __init__(self) -> None:
        super().__init__()

        # Persistent dict containing the remote clusters for the authenticated user, keyed by cluster ID.
        self._remote_clusters = {}  # type: Dict[str, CloudOutputDevice]

        application = CuraApplication.getInstance()
        self._output_device_manager = application.getOutputDeviceManager()
        self._account = application.getCuraAPI().account

        # When switching machines we check if we have to activate a remote cluster.
        application.globalContainerStackChanged.connect(self._activeMachineChanged)

        # Fetch all remote clusters for the authenticated user.
        # TODO: update remote clusters periodically
        self._account.loginStateChanged.connect(self._getRemoteClusters)

    ## Override _createEmptyRequest to add the needed authentication header for talking to the Ultimaker Cloud API.
    #  \param path The API path, relative to API_ROOT_PATH.
    #  \param content_type The content type that is passed on to the base implementation.
    #  \return The request, carrying a bearer token only when the account is logged in.
    def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
        request = super()._createEmptyRequest(self.API_ROOT_PATH + path, content_type = content_type)
        if self._account.isLoggedIn:
            # TODO: add correct scopes to OAuth2 client to use remote connect API.
            # TODO: don't create the client when not signed in?
            request.setRawHeader(b"Authorization", "Bearer {}".format(self._account.accessToken).encode())
        return request

    ## Gets all remote clusters from the API.
    #  The reply is handled in _onGetRemoteClustersFinished.
    def _getRemoteClusters(self) -> None:
        self.get("/clusters", on_finished = self._onGetRemoteClustersFinished)

    ## Callback for when the request for getting the clusters is finished.
    #  \param reply The network reply of the "/clusters" request.
    def _onGetRemoteClustersFinished(self, reply: QNetworkReply) -> None:
        status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
        if status_code != 200:
            Logger.log("w", "Got unexpected response while trying to get cloud cluster data: {}, {}"
                       .format(status_code, reply.readAll()))
            return

        # Parse the response into cluster models; None or an empty list means there is nothing to add.
        clusters = self._parseStatusResponse(reply)
        if not clusters:
            return

        # Add an output device for each remote cluster.
        for cluster in clusters:
            self._addCloudOutputDevice(cluster)

        # # For testing we add a dummy device:
        # self._addCloudOutputDevice({ "cluster_id": "LJ0tciiuZZjarrXAvFLEZ6ox4Cvx8FvtXUlQv4vIhV6w" })

    ## Parses the body of a clusters reply into a list of Cluster models.
    #  Both a bare JSON array of cluster objects and an object that wraps that array in a "data" field are accepted.
    #  \param reply The network reply whose body should be parsed.
    #  \return The parsed clusters, or None if the body could not be decoded, was not valid JSON, had an
    #          unexpected shape or contained entries that could not be turned into a Cluster.
    @staticmethod
    def _parseStatusResponse(reply: QNetworkReply) -> Optional[List[Cluster]]:
        try:
            payload = json.loads(reply.readAll().data().decode("utf-8"))
        except UnicodeDecodeError:
            Logger.log("e", "Unable to read server response")
            return None
        except json.JSONDecodeError:
            Logger.logException("w", "Unable to decode JSON from reply.")
            return None

        # NOTE(review): the surrounding comments say the clusters live in a "data" field of the body, while the
        # previous implementation iterated the body directly. Unwrap the envelope when present so both work.
        if isinstance(payload, dict):
            payload = payload.get("data")
        if not isinstance(payload, list):
            Logger.log("w", "Unexpected format of cloud cluster data: {}".format(type(payload).__name__))
            return None

        try:
            return [Cluster(**entry) for entry in payload]
        except TypeError:
            # Raised when an entry is not a JSON object; presumably also when its fields do not match the
            # Cluster constructor.
            Logger.logException("w", "Unable to create cluster models from reply.")
            return None

    ## Adds a CloudOutputDevice for each entry in the remote cluster list from the API.
    #  \param cluster The remote cluster to create and register an output device for.
    def _addCloudOutputDevice(self, cluster: Cluster) -> None:
        Logger.log("d", "Adding cloud output device for cluster: {}".format(cluster))
        # NOTE(review): this relies on Cluster supporting subscript access - confirm against the Cluster model.
        cluster_id = cluster["cluster_id"]
        device = CloudOutputDevice(cluster_id)
        self._output_device_manager.addOutputDevice(device)
        self._remote_clusters[cluster_id] = device

    ## Callback for when the active machine was changed by the user.
    #  Connects to the remote cluster stored in the metadata of the active machine, if that cluster is known.
    def _activeMachineChanged(self) -> None:
        active_machine = CuraApplication.getInstance().getGlobalContainerStack()
        if not active_machine:
            return

        stored_cluster_id = active_machine.getMetaDataEntry("um_cloud_cluster_id")
        device = self._remote_clusters.get(stored_cluster_id)
        if device is None:
            # Currently authenticated user does not have access to stored cluster or no user is signed in.
            return

        # We found the active machine as remote cluster so let's connect to it.
        device.connect()