STAR-322: Mocking the CuraApp

This commit is contained in:
Daniel Schiavini 2018-12-17 14:41:28 +01:00
parent 3b367938de
commit 75d7d49349
6 changed files with 238 additions and 73 deletions

View file

@ -1,14 +1,14 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from unittest import TestCase
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from cura.CuraApplication import CuraApplication
from UM.OutputDevice.OutputDeviceManager import OutputDeviceManager
from cura.CuraConstants import CuraCloudAPIRoot
from src.Cloud.CloudOutputDevice import CloudOutputDevice
from src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager
from tests.Cloud.Fixtures import parseFixture, readFixture
from .NetworkManagerMock import NetworkManagerMock
from .NetworkManagerMock import NetworkManagerMock, FakeSignal
class TestCloudOutputDeviceManager(TestCase):
@ -18,9 +18,19 @@ class TestCloudOutputDeviceManager(TestCase):
def setUp(self):
super().setUp()
self.app = CuraApplication.getInstance()
self.app = MagicMock()
self.device_manager = OutputDeviceManager()
self.app.getOutputDeviceManager.return_value = self.device_manager
self.patches = [patch("UM.Qt.QtApplication.QtApplication.getInstance", return_value=self.app),
patch("UM.Application.Application.getInstance", return_value=self.app)]
for patched_method in self.patches:
patched_method.start()
self.network = NetworkManagerMock()
with patch("src.Cloud.CloudApiClient.QNetworkAccessManager", return_value = self.network):
self.timer = MagicMock(timeout = FakeSignal())
with patch("src.Cloud.CloudApiClient.QNetworkAccessManager", return_value = self.network), \
patch("src.Cloud.CloudOutputDeviceManager.QTimer", return_value = self.timer):
self.manager = CloudOutputDeviceManager()
self.clusters_response = parseFixture("getClusters")
self.network.prepareReply("GET", self.URL, 200, readFixture("getClusters"))
@ -28,7 +38,11 @@ class TestCloudOutputDeviceManager(TestCase):
def tearDown(self):
try:
self._beforeTearDown()
self.network.flushReplies()
self.manager.stop()
for patched_method in self.patches:
patched_method.stop()
finally:
super().tearDown()
@ -38,8 +52,7 @@ class TestCloudOutputDeviceManager(TestCase):
# let the network send replies
self.network.flushReplies()
# get the created devices
device_manager = self.app.getOutputDeviceManager()
devices = device_manager.getOutputDevices()
devices = self.device_manager.getOutputDevices()
# get the server data
clusters = self.clusters_response.get("data", [])
self.assertEqual([CloudOutputDevice] * len(clusters), [type(d) for d in devices])
@ -48,13 +61,12 @@ class TestCloudOutputDeviceManager(TestCase):
key=lambda device_dict: device_dict["host_version"]))
for device in clusters:
device_manager.getOutputDevice(device["cluster_id"]).close()
device_manager.removeOutputDevice(device["cluster_id"])
self.device_manager.getOutputDevice(device["cluster_id"]).close()
self.device_manager.removeOutputDevice(device["cluster_id"])
## Runs the initial request to retrieve the clusters.
def _loadData(self):
self.manager.start()
self.manager._onLoginStateChanged(is_logged_in = True)
self.network.flushReplies()
def test_device_is_created(self):
@ -79,22 +91,20 @@ class TestCloudOutputDeviceManager(TestCase):
self.manager._update_timer.timeout.emit()
@patch("cura.CuraApplication.CuraApplication.getGlobalContainerStack")
def test_device_connects_by_cluster_id(self, global_container_stack_mock):
active_machine_mock = global_container_stack_mock.return_value
def test_device_connects_by_cluster_id(self):
active_machine_mock = self.app.getGlobalContainerStack.return_value
cluster1, cluster2 = self.clusters_response["data"]
cluster_id = cluster1["cluster_id"]
active_machine_mock.getMetaDataEntry.side_effect = {"um_cloud_cluster_id": cluster_id}.get
self._loadData()
self.assertTrue(self.app.getOutputDeviceManager().getOutputDevice(cluster1["cluster_id"]).isConnected())
self.assertFalse(self.app.getOutputDeviceManager().getOutputDevice(cluster2["cluster_id"]).isConnected())
self.assertTrue(self.device_manager.getOutputDevice(cluster1["cluster_id"]).isConnected())
self.assertFalse(self.device_manager.getOutputDevice(cluster2["cluster_id"]).isConnected())
self.assertEquals([], active_machine_mock.setMetaDataEntry.mock_calls)
@patch("cura.CuraApplication.CuraApplication.getGlobalContainerStack")
def test_device_connects_by_network_key(self, global_container_stack_mock):
active_machine_mock = global_container_stack_mock.return_value
def test_device_connects_by_network_key(self):
active_machine_mock = self.app.getGlobalContainerStack.return_value
cluster1, cluster2 = self.clusters_response["data"]
network_key = cluster2["host_name"] + ".ultimaker.local"
@ -102,8 +112,9 @@ class TestCloudOutputDeviceManager(TestCase):
self._loadData()
self.assertFalse(self.app.getOutputDeviceManager().getOutputDevice(cluster1["cluster_id"]).isConnected())
self.assertTrue(self.app.getOutputDeviceManager().getOutputDevice(cluster2["cluster_id"]).isConnected())
self.assertEqual([False, True],
[self.device_manager.getOutputDevice(cluster["cluster_id"]).isConnected()
for cluster in (cluster1, cluster2)])
active_machine_mock.setMetaDataEntry.assert_called_with("um_cloud_cluster_id", cluster2["cluster_id"])