mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-10 08:17:49 -06:00
STAR-322: Using a test setup to run tests with Cura app
This commit is contained in:
parent
134f97d5f1
commit
f432d7c858
5 changed files with 58 additions and 17 deletions
|
@ -66,7 +66,8 @@ class NetworkManagerMock:
|
|||
# \return The data in the response.
|
||||
def prepareGetClusters(self, data: Optional[dict] = None) -> dict:
|
||||
data, response = self._getResponseData("clusters", data)
|
||||
self.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", 200, response)
|
||||
status_code = 200 if "data" in data else int(data["errors"][0]["http_status"])
|
||||
self.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", status_code, response)
|
||||
return data
|
||||
|
||||
## Gets the data that should be in the server's response in both dictionary and JSON-encoded bytes format.
|
||||
|
@ -87,6 +88,7 @@ class NetworkManagerMock:
|
|||
def flushReplies(self):
|
||||
for reply in self.replies.values():
|
||||
self.finished.emit(reply)
|
||||
self.reset()
|
||||
|
||||
## Deletes all prepared replies
|
||||
def reset(self):
|
||||
|
|
|
@ -95,9 +95,8 @@ class TestCloudApiClient(TestCase):
|
|||
def _callback(clusters):
|
||||
result.extend(clusters)
|
||||
|
||||
with mock.patch.object(Application, "getInstance", new = lambda: FixtureApplication()):
|
||||
api = CloudApiClient(account_mock, self._errorHandler)
|
||||
api.getClusters(_callback)
|
||||
api = CloudApiClient(account_mock, self._errorHandler)
|
||||
api.getClusters(_callback)
|
||||
|
||||
manager_mock.return_value.finished.emit(reply_mock)
|
||||
|
||||
|
|
|
@ -4,9 +4,9 @@ from unittest import TestCase
|
|||
from unittest.mock import patch
|
||||
|
||||
from cura.CuraApplication import CuraApplication
|
||||
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDevice import CloudOutputDevice
|
||||
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager
|
||||
from plugins.UM3NetworkPrinting.tests.Cloud.NetworkManagerMock import NetworkManagerMock
|
||||
from src.Cloud.CloudOutputDevice import CloudOutputDevice
|
||||
from src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager
|
||||
from .NetworkManagerMock import NetworkManagerMock
|
||||
|
||||
|
||||
@patch("cura.NetworkClient.QNetworkAccessManager")
|
||||
|
@ -15,18 +15,19 @@ class TestCloudOutputDeviceManager(TestCase):
|
|||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app = CuraApplication.getInstance()
|
||||
if not self.app:
|
||||
self.app = CuraApplication()
|
||||
self.app.initialize()
|
||||
|
||||
self.network = NetworkManagerMock()
|
||||
self.manager = CloudOutputDeviceManager()
|
||||
self.clusters_response = self.network.prepareGetClusters()
|
||||
|
||||
## In the tear down method we check whether the state of the output device manager is what we expect based on the
|
||||
# mocked API response.
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
try:
|
||||
self._beforeTearDown()
|
||||
finally:
|
||||
super().tearDown()
|
||||
|
||||
## Before tear down method we check whether the state of the output device manager is what we expect based on the
|
||||
# mocked API response.
|
||||
def _beforeTearDown(self):
|
||||
# let the network send replies
|
||||
self.network.flushReplies()
|
||||
# get the created devices
|
||||
|
@ -82,6 +83,7 @@ class TestCloudOutputDeviceManager(TestCase):
|
|||
|
||||
self.assertTrue(self.app.getOutputDeviceManager().getOutputDevice(cluster1["cluster_id"]).isConnected())
|
||||
self.assertFalse(self.app.getOutputDeviceManager().getOutputDevice(cluster2["cluster_id"]).isConnected())
|
||||
self.assertEquals([], active_machine_mock.setMetaDataEntry.mock_calls)
|
||||
|
||||
@patch("cura.CuraApplication.CuraApplication.getGlobalContainerStack")
|
||||
def test_device_connects_by_network_key(self, global_container_stack_mock, network_mock):
|
||||
|
@ -97,11 +99,12 @@ class TestCloudOutputDeviceManager(TestCase):
|
|||
self.assertFalse(self.app.getOutputDeviceManager().getOutputDevice(cluster1["cluster_id"]).isConnected())
|
||||
self.assertTrue(self.app.getOutputDeviceManager().getOutputDevice(cluster2["cluster_id"]).isConnected())
|
||||
|
||||
active_machine_mock.setMetaDataEntry.assert_called_once_with("um_cloud_cluster_id", cluster2["cluster_id"])
|
||||
active_machine_mock.setMetaDataEntry.assert_called_with("um_cloud_cluster_id", cluster2["cluster_id"])
|
||||
|
||||
@patch("UM.Message.Message.show")
|
||||
def test_api_error(self, message_mock, network_mock):
|
||||
self.clusters_response = {"errors": [{"id": "notFound"}]}
|
||||
self.clusters_response = {"errors": [{"id": "notFound", "title": "Not found!", "http_status": "404"}]}
|
||||
self.network.prepareGetClusters(self.clusters_response)
|
||||
self._loadData(network_mock)
|
||||
self.network.flushReplies()
|
||||
message_mock.assert_called_once_with()
|
||||
|
|
|
@ -10,7 +10,7 @@ from PyQt5.QtCore import QByteArray
|
|||
|
||||
from UM.MimeTypeDatabase import MimeType
|
||||
from UM.Application import Application
|
||||
from ..src.SendMaterialJob import SendMaterialJob
|
||||
from src.SendMaterialJob import SendMaterialJob
|
||||
|
||||
|
||||
@patch("builtins.open", lambda _, __: io.StringIO("<xml></xml>"))
|
||||
|
|
37
plugins/UM3NetworkPrinting/tests/conftest.py
Normal file
37
plugins/UM3NetworkPrinting/tests/conftest.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
# Copyright (c) 2018 Ultimaker B.V.
|
||||
# Uranium is released under the terms of the LGPLv3 or higher.
|
||||
|
||||
import pytest
|
||||
import Arcus #Prevents error: "PyCapsule_GetPointer called with incorrect name" with conflicting SIP configurations between Arcus and PyQt: Import Arcus first!
|
||||
from UM.Qt.QtApplication import QtApplication # QT application import is required, even though it isn't used.
|
||||
from UM.Application import Application
|
||||
from UM.Signal import Signal
|
||||
|
||||
from cura.CuraApplication import CuraApplication
|
||||
|
||||
|
||||
# This mock application must extend from Application and not QtApplication otherwise some QObjects are created and
|
||||
# a segfault is raised.
|
||||
class FixtureApplication(CuraApplication):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
super().initialize()
|
||||
Signal._signalQueue = self
|
||||
|
||||
def functionEvent(self, event):
|
||||
event.call()
|
||||
|
||||
def parseCommandLine(self):
|
||||
pass
|
||||
|
||||
def processEvents(self):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def application():
|
||||
# Since we need to use it more that once, we create the application the first time and use its instance the second
|
||||
application = FixtureApplication.getInstance()
|
||||
if application is None:
|
||||
application = FixtureApplication()
|
||||
return application
|
Loading…
Add table
Add a link
Reference in a new issue