STAR-322: Testing more of the device manager

This commit is contained in:
Daniel Schiavini 2018-12-10 10:43:22 +01:00
parent c495ade2d3
commit 75c2e25a01
3 changed files with 88 additions and 36 deletions

View file

@ -0,0 +1,17 @@
{
"data": [{
"cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq",
"host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050",
"host_name": "ultimakersystem-ccbdd30044ec",
"host_version": "5.0.0.20170101",
"is_online": true,
"status": "active"
}, {
"cluster_id": "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8",
"host_guid": "e0ace90a-91ee-1257-4403-e8050a44c9b7",
"host_name": "ultimakersystem-ccbdd30044ec",
"host_version": "5.1.2.20180807",
"is_online": true,
"status": "active"
}]
}

View file

@ -1,11 +1,13 @@
# Copyright (c) 2018 Ultimaker B.V. # Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher. # Cura is released under the terms of the LGPLv3 or higher.
import json import json
from typing import Dict, Tuple import os
from typing import Dict, Tuple, Optional
from unittest.mock import MagicMock from unittest.mock import MagicMock
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply
from UM.Logger import Logger
from UM.Signal import Signal from UM.Signal import Signal
@ -26,6 +28,7 @@ class NetworkManagerMock:
"HEAD": QNetworkAccessManager.HeadOperation, "HEAD": QNetworkAccessManager.HeadOperation,
} }
## Initializes the network manager mock.
def __init__(self): def __init__(self):
# a dict with the prepared replies, using the format {(http_method, url): reply} # a dict with the prepared replies, using the format {(http_method, url): reply}
self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply] self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply]
@ -48,33 +51,37 @@ class NetworkManagerMock:
# \param method: The HTTP method. # \param method: The HTTP method.
# \param url: The URL being requested. # \param url: The URL being requested.
# \param status_code: The HTTP status code for the response. # \param status_code: The HTTP status code for the response.
# \param response: A dictionary with the response from the server (this is converted to JSON). # \param response: The response body from the server (generally json-encoded).
def prepareReply(self, method: str, url: str, status_code: int, response: dict) -> None: def prepareReply(self, method: str, url: str, status_code: int, response: bytes) -> None:
reply_mock = MagicMock() reply_mock = MagicMock()
reply_mock.url().toString.return_value = url reply_mock.url().toString.return_value = url
reply_mock.operation.return_value = self._OPERATIONS[method] reply_mock.operation.return_value = self._OPERATIONS[method]
reply_mock.attribute.return_value = status_code reply_mock.attribute.return_value = status_code
reply_mock.readAll.return_value = json.dumps(response).encode() reply_mock.readAll.return_value = response
self.replies[method, url] = reply_mock self.replies[method, url] = reply_mock
Logger.log("i", "Prepared mock {}-response to {} {}", status_code, method, url)
## Prepares a reply for the API call to get clusters. ## Prepares a reply for the API call to get clusters.
def prepareGetClusters(self) -> None: # \param data: The data the server should return. If not given, a default response will be used.
self.prepareReply( # \return The data in the response.
"GET", "https://api-staging.ultimaker.com/connect/v1/clusters", def prepareGetClusters(self, data: Optional[dict] = None) -> dict:
200, { data, response = self._getResponseData("clusters", data)
"data": [{ self.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", 200, response)
"cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq", return data
"host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050",
"host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", ## Gets the data that should be in the server's response in both dictionary and JSON-encoded bytes format.
"is_online": False, "status": "inactive" # \param fixture_name: The name of the fixture.
}, { # \param data: The data that should be returned (optional)
"cluster_id": "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", # \return The server's response in both dictionary and JSON-encoded bytes format.
"host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", @staticmethod
"host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", def _getResponseData(fixture_name: str, data: Optional[dict] = None) -> Tuple[dict, bytes]:
"is_online": True, "status": "active" if data is None:
}] with open("{}/Fixtures/{}.json".format(os.path.dirname(__file__), fixture_name), "rb") as f:
} response = f.read()
) data = json.loads(response.decode())
else:
response = json.dumps(data).encode()
return data, response
## Emits the signal that the reply is ready to all prepared replies. ## Emits the signal that the reply is ready to all prepared replies.
def flushReplies(self): def flushReplies(self):

View file

@ -11,28 +11,56 @@ from plugins.UM3NetworkPrinting.tests.Cloud.NetworkManagerMock import NetworkMan
@patch("cura.NetworkClient.QNetworkAccessManager") @patch("cura.NetworkClient.QNetworkAccessManager")
class TestCloudOutputDeviceManager(TestCase): class TestCloudOutputDeviceManager(TestCase):
app = CuraApplication.getInstance() or CuraApplication()
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.app = CuraApplication.getInstance()
if not self.app:
self.app = CuraApplication()
self.app.initialize() self.app.initialize()
self.network = NetworkManagerMock() self.network = NetworkManagerMock()
self.network.prepareGetClusters() self.manager = CloudOutputDeviceManager()
self.clusters_response = self.network.prepareGetClusters()
## In the tear down method we check whether the state of the output device manager is what we expect based on the
# mocked API response.
def tearDown(self): def tearDown(self):
super().tearDown() super().tearDown()
# let the network send replies
def test_device(self, network_mock):
network_mock.return_value = self.network
manager = CloudOutputDeviceManager()
manager._account.loginStateChanged.emit(True)
manager._update_timer.timeout.emit()
self.network.flushReplies() self.network.flushReplies()
# get the created devices
devices = self.app.getOutputDeviceManager().getOutputDevices() devices = self.app.getOutputDeviceManager().getOutputDevices()
self.assertEqual([CloudOutputDevice], [type(d) for d in devices]) # get the server data
self.assertEqual(["R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC"], [d.key for d in devices]) clusters = self.clusters_response["data"]
self.assertEqual(["ultimakersystem-ccbdd30044ec"], [d.host_name for d in devices]) self.assertEqual([CloudOutputDevice] * len(clusters), [type(d) for d in devices])
self.assertEqual({cluster["cluster_id"] for cluster in clusters}, {device.key for device in devices})
self.assertEqual({cluster["host_name"] for cluster in clusters}, {device.host_name for device in devices})
## Runs the initial request to retrieve the clusters.
def _loadData(self, network_mock):
network_mock.return_value = self.network
self.manager._account.loginStateChanged.emit(True)
self.manager._update_timer.timeout.emit()
def test_device_is_created(self, network_mock):
# just create the cluster, it is checked at tearDown
self._loadData(network_mock)
def test_device_is_updated(self, network_mock):
self._loadData(network_mock)
# update the cluster from member variable, which is checked at tearDown
self.clusters_response["data"][0]["host_name"] = "New host name"
self.network.prepareGetClusters(self.clusters_response)
self.manager._update_timer.timeout.emit()
def test_device_is_removed(self, network_mock):
self._loadData(network_mock)
# delete the cluster from member variable, which is checked at tearDown
del self.clusters_response["data"][1]
self.network.prepareGetClusters(self.clusters_response)
self.manager._update_timer.timeout.emit()