STAR-322: First tests for cloud output device

This commit is contained in:
Daniel Schiavini 2018-12-10 14:43:02 +01:00
parent e5124532f8
commit d482924ea2
8 changed files with 191 additions and 138 deletions

View file

@ -36,6 +36,11 @@ class CloudApiClient(NetworkClient):
self._account = account
self._on_error = on_error
## Gets the account used for the API.
@property
def account(self) -> Account:
return self._account
## Retrieves all the clusters for the user that is currently logged in.
# \param on_finished: The function to be called after the result is parsed.
def getClusters(self, on_finished: Callable[[List[CloudCluster]], any]) -> None:
@ -46,7 +51,7 @@ class CloudApiClient(NetworkClient):
# \param cluster_id: The ID of the cluster.
# \param on_finished: The function to be called after the result is parsed.
def getClusterStatus(self, cluster_id: str, on_finished: Callable[[CloudClusterStatus], any]) -> None:
url = "{}/cluster/{}/status".format(self.CLUSTER_API_ROOT, cluster_id)
url = "{}/clusters/{}/status".format(self.CLUSTER_API_ROOT, cluster_id)
self.get(url, on_finished=self._wrapCallback(on_finished, CloudClusterStatus))
## Requests the cloud to register the upload of a print job mesh.

View file

@ -13,7 +13,6 @@ from UM.Logger import Logger
from UM.Message import Message
from UM.Qt.Duration import Duration, DurationFormat
from UM.Scene.SceneNode import SceneNode
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.NetworkedPrinterOutputDevice import AuthState, NetworkedPrinterOutputDevice
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController
@ -93,7 +92,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._setInterfaceElements()
self._device_id = device_id
self._account = CuraApplication.getInstance().getCuraAPI().account
self._account = api_client.account
# We use the Cura Connect monitor tab to get most functionality right away.
self._monitor_view_qml_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
@ -174,10 +173,6 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
)
self._api.requestUpload(request, lambda response: self._onPrintJobCreated(mesh_bytes, response))
## Called when the connection to the cluster changes.
def connect(self) -> None:
super().connect()
## Called when the network data should be updated.
def _update(self) -> None:
super()._update()

View file

@ -1,8 +1,5 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List, Optional
from cura.PrinterOutput.ConfigurationModel import ConfigurationModel
from cura.PrinterOutput.ExtruderConfigurationModel import ExtruderConfigurationModel
from cura.PrinterOutput.ExtruderOutputModel import ExtruderOutputModel
from .CloudClusterPrinterConfigurationMaterial import CloudClusterPrinterConfigurationMaterial

View file

@ -1,6 +1,5 @@
{
"data": [
{
"data": {
"generated_time": "2018-12-10T08:23:55.110Z",
"printers": [
{
@ -93,5 +92,4 @@
}
]
}
]
}

View file

@ -1,8 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
import os
from typing import Dict, Tuple, Optional
from typing import Dict, Tuple, Union
from unittest.mock import MagicMock
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply
@ -52,38 +51,15 @@ class NetworkManagerMock:
# \param url: The URL being requested.
# \param status_code: The HTTP status code for the response.
# \param response: The response body from the server (generally json-encoded).
def prepareReply(self, method: str, url: str, status_code: int, response: bytes) -> None:
def prepareReply(self, method: str, url: str, status_code: int, response: Union[bytes, dict]) -> None:
reply_mock = MagicMock()
reply_mock.url().toString.return_value = url
reply_mock.operation.return_value = self._OPERATIONS[method]
reply_mock.attribute.return_value = status_code
reply_mock.readAll.return_value = response
reply_mock.readAll.return_value = response if isinstance(response, bytes) else json.dumps(response).encode()
self.replies[method, url] = reply_mock
Logger.log("i", "Prepared mock {}-response to {} {}", status_code, method, url)
## Prepares a reply for the API call to get clusters.
# \param data: The data the server should return. If not given, a default response will be used.
# \return The data in the response.
def prepareGetClusters(self, data: Optional[dict] = None) -> dict:
data, response = self._getResponseData("clusters", data)
status_code = 200 if "data" in data else int(data["errors"][0]["http_status"])
self.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", status_code, response)
return data
## Gets the data that should be in the server's response in both dictionary and JSON-encoded bytes format.
# \param fixture_name: The name of the fixture.
# \param data: The data that should be returned (optional)
# \return The server's response in both dictionary and JSON-encoded bytes format.
@staticmethod
def _getResponseData(fixture_name: str, data: Optional[dict] = None) -> Tuple[dict, bytes]:
if data is None:
with open("{}/Fixtures/{}.json".format(os.path.dirname(__file__), fixture_name), "rb") as f:
response = f.read()
data = json.loads(response.decode())
else:
response = json.dumps(data).encode()
return data, response
## Emits the signal that the reply is ready to all prepared replies.
def flushReplies(self):
for reply in self.replies.values():

View file

@ -0,0 +1,73 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
import os
from unittest import TestCase
from unittest.mock import patch, MagicMock
from cura.CuraApplication import CuraApplication
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from src.Cloud.CloudApiClient import CloudApiClient
from src.Cloud.CloudOutputController import CloudOutputController
from src.Cloud.CloudOutputDevice import CloudOutputDevice
from .NetworkManagerMock import NetworkManagerMock
@patch("cura.NetworkClient.QNetworkAccessManager")
class TestCloudOutputDevice(TestCase):
CLUSTER_ID = "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq"
HOST_NAME = "ultimakersystem-ccbdd30044ec"
URL = "https://api-staging.ultimaker.com/connect/v1/clusters/{}/status".format(CLUSTER_ID)
with open("{}/Fixtures/getClusterStatusResponse.json".format(os.path.dirname(__file__)), "rb") as f:
DEFAULT_RESPONSE = f.read()
def setUp(self):
super().setUp()
self.app = CuraApplication.getInstance()
self.network = NetworkManagerMock()
self.account = MagicMock(isLoggedIn=True, accessToken="TestAccessToken")
self.onError = MagicMock()
self.device = CloudOutputDevice(CloudApiClient(self.account, self.onError), self.CLUSTER_ID, self.HOST_NAME)
self.cluster_status = json.loads(self.DEFAULT_RESPONSE.decode())
self.network.prepareReply("GET", self.URL, 200, self.DEFAULT_RESPONSE)
def tearDown(self):
try:
self._beforeTearDown()
finally:
super().tearDown()
## Before tear down method we check whether the state of the output device manager is what we expect based on the
# mocked API response.
def _beforeTearDown(self):
# let the network send replies
self.network.flushReplies()
# TODO
def test_status(self, network_mock):
network_mock.return_value = self.network
self.device._update()
self.network.flushReplies()
self.assertEqual([PrinterOutputModel, PrinterOutputModel], [type(printer) for printer in self.device.printers])
controller_fields = {
"_output_device": self.device,
"can_abort": False,
"can_control_manually": False,
"can_pause": False,
"can_pre_heat_bed": False,
"can_pre_heat_hotends": False,
"can_send_raw_gcode": False,
"can_update_firmware": False,
}
self.assertEqual({printer["uuid"] for printer in self.cluster_status["data"]["printers"]},
{printer.key for printer in self.device.printers})
self.assertEqual([controller_fields, controller_fields],
[printer.getController().__dict__ for printer in self.device.printers])
self.assertEqual({job["uuid"] for job in self.cluster_status["data"]["print_jobs"]},
{job.key for job in self.device.printJobs})
self.assertEqual(["Daniel Testing"], [job.owner for job in self.device.printJobs])
self.assertEqual(["UM3_dragon"], [job.name for job in self.device.printJobs])

View file

@ -1,5 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
import os
from unittest import TestCase
from unittest.mock import patch
@ -11,13 +13,17 @@ from .NetworkManagerMock import NetworkManagerMock
@patch("cura.NetworkClient.QNetworkAccessManager")
class TestCloudOutputDeviceManager(TestCase):
URL = "https://api-staging.ultimaker.com/connect/v1/clusters"
with open("{}/Fixtures/clusters.json".format(os.path.dirname(__file__)), "rb") as f:
DEFAULT_RESPONSE = f.read()
def setUp(self):
super().setUp()
self.app = CuraApplication.getInstance()
self.network = NetworkManagerMock()
self.manager = CloudOutputDeviceManager()
self.clusters_response = self.network.prepareGetClusters()
self.clusters_response = json.loads(self.DEFAULT_RESPONSE.decode())
self.network.prepareReply("GET", self.URL, 200, self.DEFAULT_RESPONSE)
def tearDown(self):
try:
@ -58,7 +64,7 @@ class TestCloudOutputDeviceManager(TestCase):
# update the cluster from member variable, which is checked at tearDown
self.clusters_response["data"][0]["host_name"] = "New host name"
self.network.prepareGetClusters(self.clusters_response)
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
self.manager._update_timer.timeout.emit()
@ -67,7 +73,7 @@ class TestCloudOutputDeviceManager(TestCase):
# delete the cluster from member variable, which is checked at tearDown
del self.clusters_response["data"][1]
self.network.prepareGetClusters(self.clusters_response)
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
self.manager._update_timer.timeout.emit()
@ -104,7 +110,7 @@ class TestCloudOutputDeviceManager(TestCase):
@patch("UM.Message.Message.show")
def test_api_error(self, message_mock, network_mock):
self.clusters_response = {"errors": [{"id": "notFound", "title": "Not found!", "http_status": "404"}]}
self.network.prepareGetClusters(self.clusters_response)
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
self._loadData(network_mock)
self.network.flushReplies()
message_mock.assert_called_once_with()

View file

@ -1,13 +1,11 @@
# Copyright (c) 2018 Ultimaker B.V.
# Uranium is released under the terms of the LGPLv3 or higher.
# Cura is released under the terms of the LGPLv3 or higher.
import pytest
import Arcus #Prevents error: "PyCapsule_GetPointer called with incorrect name" with conflicting SIP configurations between Arcus and PyQt: Import Arcus first!
from UM.Qt.QtApplication import QtApplication # QT application import is required, even though it isn't used.
from UM.Application import Application
from UM.Signal import Signal
from cura.CuraApplication import CuraApplication
from cura.Machines.MaterialManager import MaterialManager
# This mock application must extend from Application and not QtApplication otherwise some QObjects are created and
@ -18,6 +16,11 @@ class FixtureApplication(CuraApplication):
super().initialize()
Signal._signalQueue = self
self.getPreferences().addPreference("cura/favorite_materials", "")
self._material_manager = MaterialManager(self._container_registry, parent = self)
self._material_manager.initialize()
def functionEvent(self, event):
event.call()