diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/clusters.json b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/clusters.json new file mode 100644 index 0000000000..79a4c30113 --- /dev/null +++ b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/clusters.json @@ -0,0 +1,17 @@ +{ + "data": [{ + "cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq", + "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", + "host_name": "ultimakersystem-ccbdd30044ec", + "host_version": "5.0.0.20170101", + "is_online": true, + "status": "active" + }, { + "cluster_id": "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8", + "host_guid": "e0ace90a-91ee-1257-4403-e8050a44c9b7", + "host_name": "ultimakersystem-ccbdd30044ec", + "host_version": "5.1.2.20180807", + "is_online": true, + "status": "active" + }] +} diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py b/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py index 25107971c0..fc84569fa4 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py @@ -1,11 +1,13 @@ # Copyright (c) 2018 Ultimaker B.V. # Cura is released under the terms of the LGPLv3 or higher. import json -from typing import Dict, Tuple +import os +from typing import Dict, Tuple, Optional from unittest.mock import MagicMock from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply +from UM.Logger import Logger from UM.Signal import Signal @@ -26,6 +28,7 @@ class NetworkManagerMock: "HEAD": QNetworkAccessManager.HeadOperation, } + ## Initializes the network manager mock. def __init__(self): # a dict with the prepared replies, using the format {(http_method, url): reply} self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply] @@ -48,33 +51,37 @@ class NetworkManagerMock: # \param method: The HTTP method. # \param url: The URL being requested. # \param status_code: The HTTP status code for the response. - # \param response: A dictionary with the response from the server (this is converted to JSON). - def prepareReply(self, method: str, url: str, status_code: int, response: dict) -> None: + # \param response: The response body from the server (generally json-encoded). + def prepareReply(self, method: str, url: str, status_code: int, response: bytes) -> None: reply_mock = MagicMock() reply_mock.url().toString.return_value = url reply_mock.operation.return_value = self._OPERATIONS[method] reply_mock.attribute.return_value = status_code - reply_mock.readAll.return_value = json.dumps(response).encode() + reply_mock.readAll.return_value = response self.replies[method, url] = reply_mock + Logger.log("i", "Prepared mock {}-response to {} {}", status_code, method, url) ## Prepares a reply for the API call to get clusters. - def prepareGetClusters(self) -> None: - self.prepareReply( - "GET", "https://api-staging.ultimaker.com/connect/v1/clusters", - 200, { - "data": [{ - "cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq", - "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", - "host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", - "is_online": False, "status": "inactive" - }, { - "cluster_id": "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", - "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", - "host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", - "is_online": True, "status": "active" - }] - } - ) + # \param data: The data the server should return. If not given, a default response will be used. + # \return The data in the response. + def prepareGetClusters(self, data: Optional[dict] = None) -> dict: + data, response = self._getResponseData("clusters", data) + self.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", 200, response) + return data + + ## Gets the data that should be in the server's response in both dictionary and JSON-encoded bytes format. + # \param fixture_name: The name of the fixture. + # \param data: The data that should be returned (optional) + # \return The server's response in both dictionary and JSON-encoded bytes format. + @staticmethod + def _getResponseData(fixture_name: str, data: Optional[dict] = None) -> Tuple[dict, bytes]: + if data is None: + with open("{}/Fixtures/{}.json".format(os.path.dirname(__file__), fixture_name), "rb") as f: + response = f.read() + data = json.loads(response.decode()) + else: + response = json.dumps(data).encode() + return data, response ## Emits the signal that the reply is ready to all prepared replies. def flushReplies(self): diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py index b22308ac1e..a043288e59 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py @@ -11,28 +11,56 @@ from plugins.UM3NetworkPrinting.tests.Cloud.NetworkManagerMock import NetworkMan @patch("cura.NetworkClient.QNetworkAccessManager") class TestCloudOutputDeviceManager(TestCase): - app = CuraApplication.getInstance() or CuraApplication() def setUp(self): super().setUp() - self.app.initialize() + self.app = CuraApplication.getInstance() + if not self.app: + self.app = CuraApplication() + self.app.initialize() self.network = NetworkManagerMock() - self.network.prepareGetClusters() + self.manager = CloudOutputDeviceManager() + self.clusters_response = self.network.prepareGetClusters() + ## In the tear down method we check whether the state of the output device manager is what we expect based on the + # mocked API response. def tearDown(self): super().tearDown() - - def test_device(self, network_mock): - network_mock.return_value = self.network - - manager = CloudOutputDeviceManager() - manager._account.loginStateChanged.emit(True) - manager._update_timer.timeout.emit() - + # let the network send replies self.network.flushReplies() - + # get the created devices devices = self.app.getOutputDeviceManager().getOutputDevices() - self.assertEqual([CloudOutputDevice], [type(d) for d in devices]) - self.assertEqual(["R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC"], [d.key for d in devices]) - self.assertEqual(["ultimakersystem-ccbdd30044ec"], [d.host_name for d in devices]) + # get the server data + clusters = self.clusters_response["data"] + self.assertEqual([CloudOutputDevice] * len(clusters), [type(d) for d in devices]) + self.assertEqual({cluster["cluster_id"] for cluster in clusters}, {device.key for device in devices}) + self.assertEqual({cluster["host_name"] for cluster in clusters}, {device.host_name for device in devices}) + + ## Runs the initial request to retrieve the clusters. + def _loadData(self, network_mock): + network_mock.return_value = self.network + self.manager._account.loginStateChanged.emit(True) + self.manager._update_timer.timeout.emit() + + def test_device_is_created(self, network_mock): + # just create the cluster, it is checked at tearDown + self._loadData(network_mock) + + def test_device_is_updated(self, network_mock): + self._loadData(network_mock) + + # update the cluster from member variable, which is checked at tearDown + self.clusters_response["data"][0]["host_name"] = "New host name" + self.network.prepareGetClusters(self.clusters_response) + + self.manager._update_timer.timeout.emit() + + def test_device_is_removed(self, network_mock): + self._loadData(network_mock) + + # delete the cluster from member variable, which is checked at tearDown + del self.clusters_response["data"][1] + self.network.prepareGetClusters(self.clusters_response) + + self.manager._update_timer.timeout.emit()