mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-16 03:07:53 -06:00
STAR-322: First test cloud output device manager
This commit is contained in:
parent
e0b159e2ad
commit
45f51c3588
3 changed files with 71 additions and 14 deletions
|
@ -1,9 +1,10 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
# Copyright (c) 2018 Ultimaker B.V.
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
# Cura is released under the terms of the LGPLv3 or higher.
|
||||||
import json
|
import json
|
||||||
|
from typing import Dict, Tuple
|
||||||
from unittest.mock import MagicMock
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QNetworkRequest
|
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply
|
||||||
|
|
||||||
from UM.Signal import Signal
|
from UM.Signal import Signal
|
||||||
|
|
||||||
|
@ -20,23 +21,26 @@ class NetworkManagerMock:
|
||||||
"HEAD": QNetworkAccessManager.HeadOperation,
|
"HEAD": QNetworkAccessManager.HeadOperation,
|
||||||
}
|
}
|
||||||
|
|
||||||
def __getattr__(self, item):
|
def __init__(self):
|
||||||
operation = self._OPERATIONS.get(item.upper())
|
self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply]
|
||||||
|
|
||||||
|
def __getattr__(self, method):
|
||||||
|
operation = self._OPERATIONS.get(method.upper())
|
||||||
if operation:
|
if operation:
|
||||||
return lambda request, *_: self._fakeReply(operation, request)
|
return lambda request, *_: self.replies[method.upper(), request.url().toString()]
|
||||||
return super().__getattribute__(item)
|
return super().__getattribute__(method)
|
||||||
|
|
||||||
def _fakeReply(self, operation: QNetworkAccessManager.Operation, request: QNetworkRequest) -> QNetworkReply:
|
def prepareResponse(self, method: str, url: str, status_code: int, response: dict) -> None:
|
||||||
reply_mock = MagicMock()
|
|
||||||
reply_mock.url = request.url
|
|
||||||
reply_mock.operation.return_value = operation
|
|
||||||
return reply_mock
|
|
||||||
|
|
||||||
def respond(self, method: str, url: str, status_code: int, response: dict) -> QNetworkReply:
|
|
||||||
reply_mock = MagicMock()
|
reply_mock = MagicMock()
|
||||||
reply_mock.url().toString.return_value = url
|
reply_mock.url().toString.return_value = url
|
||||||
reply_mock.operation.return_value = self._OPERATIONS[method]
|
reply_mock.operation.return_value = self._OPERATIONS[method]
|
||||||
reply_mock.attribute.return_value = status_code
|
reply_mock.attribute.return_value = status_code
|
||||||
reply_mock.readAll.return_value = json.dumps(response).encode()
|
reply_mock.readAll.return_value = json.dumps(response).encode()
|
||||||
self.finished.emit(reply_mock)
|
self.replies[method, url] = reply_mock
|
||||||
return reply_mock
|
|
||||||
|
def flushReplies(self):
|
||||||
|
for reply in self.replies.values():
|
||||||
|
self.finished.emit(reply)
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self.replies.clear()
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
# Copyright (c) 2018 Ultimaker B.V.
|
||||||
|
# Cura is released under the terms of the LGPLv3 or higher.
|
||||||
|
from unittest import TestCase
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from cura.CuraApplication import CuraApplication
|
||||||
|
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDevice import CloudOutputDevice
|
||||||
|
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager
|
||||||
|
from plugins.UM3NetworkPrinting.tests.Cloud.NetworkManagerMock import NetworkManagerMock
|
||||||
|
|
||||||
|
|
||||||
|
@patch("cura.NetworkClient.QNetworkAccessManager")
|
||||||
|
class TestCloudOutputDeviceManager(TestCase):
|
||||||
|
app = CuraApplication.getInstance() or CuraApplication()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.app.initialize()
|
||||||
|
|
||||||
|
self.network = NetworkManagerMock()
|
||||||
|
self.network.prepareResponse(
|
||||||
|
"GET", "https://api-staging.ultimaker.com/connect/v1/clusters",
|
||||||
|
200, {
|
||||||
|
"data": [{
|
||||||
|
"cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq",
|
||||||
|
"host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050",
|
||||||
|
"host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807",
|
||||||
|
"is_online": False, "status": "inactive"
|
||||||
|
}, {
|
||||||
|
"cluster_id": "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC",
|
||||||
|
"host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050",
|
||||||
|
"host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807",
|
||||||
|
"is_online": True, "status": "active"
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super().tearDown()
|
||||||
|
|
||||||
|
def test_device(self, network_mock):
|
||||||
|
network_mock.return_value = self.network
|
||||||
|
|
||||||
|
manager = CloudOutputDeviceManager()
|
||||||
|
manager._account.loginStateChanged.emit(True)
|
||||||
|
manager._update_timer.timeout.emit()
|
||||||
|
|
||||||
|
self.network.flushReplies()
|
||||||
|
|
||||||
|
devices = self.app.getOutputDeviceManager().getOutputDevices()
|
||||||
|
self.assertEqual([CloudOutputDevice], list(map(type, devices)))
|
2
plugins/UM3NetworkPrinting/tests/Cloud/__init__.py
Normal file
2
plugins/UM3NetworkPrinting/tests/Cloud/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
# Copyright (c) 2018 Ultimaker B.V.
|
||||||
|
# Cura is released under the terms of the LGPLv3 or higher.
|
Loading…
Add table
Add a link
Reference in a new issue