diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/requestPrintResponse.json b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/requestPrintResponse.json new file mode 100644 index 0000000000..e69589784d --- /dev/null +++ b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/requestPrintResponse.json @@ -0,0 +1,7 @@ +{ + "data": { + "cluster_job_id": "", + "job_id": "db34b096-c4d5-46f3-bea7-da6a19905e6c", + "status": "queued" + } +} diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py index 91f367f9ad..07c7733ac1 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py @@ -2,134 +2,126 @@ # Copyright (c) 2018 Ultimaker B.V. # Cura is released under the terms of the LGPLv3 or higher. import json -from typing import Dict, Tuple -from unittest import TestCase, mock +import os +from unittest import TestCase from unittest.mock import patch, MagicMock -from PyQt5.QtCore import QByteArray -from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply - -from UM.Application import Application -from UM.Signal import Signal from cura.CuraApplication import CuraApplication -from plugins.UM3NetworkPrinting.src.Cloud.CloudApiClient import CloudApiClient -from plugins.UM3NetworkPrinting.src.Cloud.Models import CloudCluster, CloudErrorObject - -# This mock application must extend from Application and not QtApplication otherwise some QObjects are created and -# a segfault is raised. -class FixtureApplication(Application): - def __init__(self): - super().__init__(name = "test", version = "1.0", api_version = "5.0.0") - super().initialize() - Signal._signalQueue = self - - def functionEvent(self, event): - event.call() - - def parseCommandLine(self): - pass - - def processEvents(self): - pass - - def getRenderer(self): - return MagicMock() - -class ManagerMock: - finished = Signal() - authenticationRequired = Signal() - - def __init__(self, reply): - self.reply = reply - - def get(self, request): - self.reply.url.return_value = request.url() - - return self.reply - -class ManagerMock2: - finished = Signal() - authenticationRequired = Signal() - - def get(self, request): - reply_mock = MagicMock() - reply_mock.url = request.url - reply_mock.operation.return_value = QNetworkAccessManager.GetOperation - return reply_mock - - @staticmethod - def createReply(method: str, url: str, status_code: int, response: dict): - reply_mock = MagicMock() - reply_mock.url().toString.return_value = url - reply_mock.operation.return_value = { - "GET": QNetworkAccessManager.GetOperation, - "POST": QNetworkAccessManager.PostOperation, - "PUT": QNetworkAccessManager.PutOperation, - "DELETE": QNetworkAccessManager.DeleteOperation, - "HEAD": QNetworkAccessManager.HeadOperation, - }[method] - reply_mock.attribute.return_value = status_code - reply_mock.readAll.return_value = json.dumps(response).encode() - return reply_mock +from src.Cloud.CloudApiClient import CloudApiClient +from src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager +from src.Cloud.Models.CloudJobResponse import CloudJobResponse +from src.Cloud.Models.CloudJobUploadRequest import CloudJobUploadRequest +from .NetworkManagerMock import NetworkManagerMock +@patch("cura.NetworkClient.QNetworkAccessManager") class TestCloudApiClient(TestCase): - - app = CuraApplication.getInstance() or CuraApplication - - def _errorHandler(self, errors: [CloudErrorObject]): + def _errorHandler(self): pass - @patch("cura.NetworkClient.QNetworkAccessManager") - @patch("cura.API.Account") - def test_GetClusters(self, account_mock, manager_mock): - reply_mock = MagicMock() - reply_mock.operation.return_value = 2 - reply_mock.attribute.return_value = 200 - reply_mock.readAll.return_value = b'{"data": [{"cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq", "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", "host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", "is_online": false, "status": "inactive"}, {"cluster_id": "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", "host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", "is_online": true, "status": "active"}]}' - manager_mock.return_value = ManagerMock(reply_mock) - account_mock.isLoggedIn.return_value = True + def setUp(self): + super().setUp() + self.account = MagicMock() + self.account.isLoggedIn.return_value = True + + self.app = CuraApplication.getInstance() + self.network = NetworkManagerMock() + self.manager = CloudOutputDeviceManager() + self.api = CloudApiClient(self.account, self._errorHandler) + + def test_GetClusters(self, network_mock): + network_mock.return_value = self.network result = [] - def _callback(clusters): - result.extend(clusters) + with open("{}/Fixtures/getClusters.json".format(os.path.dirname(__file__)), "rb") as f: + response = f.read() - api = CloudApiClient(account_mock, self._errorHandler) - api.getClusters(_callback) + self.network.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", 200, response) + # the callback is a function that adds the result of the call to getClusters to the result list + self.api.getClusters(lambda clusters: result.extend(clusters)) - manager_mock.return_value.finished.emit(reply_mock) + self.network.flushReplies() self.assertEqual(2, len(result)) - @patch("cura.NetworkClient.QNetworkAccessManager") - @patch("cura.API.Account") - def test_GetClusters2(self, account_mock, manager_mock): - manager = ManagerMock2() - manager_mock.return_value = manager - account_mock.isLoggedIn.return_value = True + def test_getClusterStatus(self, network_mock): + network_mock.return_value = self.network result = [] - # with mock.patch.object(Application, "getInstance", new = lambda: FixtureApplication()): - api = CloudApiClient(account_mock, self._errorHandler) - api.getClusters(lambda clusters: result.extend(clusters)) + with open("{}/Fixtures/getClusterStatusResponse.json".format(os.path.dirname(__file__)), "rb") as f: + response = f.read() - manager.finished.emit(ManagerMock2.createReply( - "GET", "https://api-staging.ultimaker.com/connect/v1/clusters", - 200, { - "data": [{ - "cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq", - "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", - "host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", - "is_online": False, "status": "inactive" - }, { - "cluster_id": "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", - "host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050", - "host_name": "ultimakersystem-ccbdd30044ec", "host_version": "5.1.2.20180807", - "is_online": True, "status": "active" - }] - } - )) + self.network.prepareReply("GET", + "https://api-staging.ultimaker.com/connect/v1/clusters/R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC/status", + 200, response + ) + self.api.getClusterStatus("R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", lambda status: result.append(status)) - self.assertEqual(2, len(result)) + self.network.flushReplies() + + self.assertEqual(len(result), 1) + status = result[0] + + self.assertEqual(len(status.printers), 2) + self.assertEqual(len(status.print_jobs), 1) + + def test_requestUpload(self, network_mock): + network_mock.return_value = self.network + results = [] + + with open("{}/Fixtures/requestUploadResponse.json".format(os.path.dirname(__file__)), "rb") as f: + response = f.read() + + self.network.prepareReply("PUT", "https://api-staging.ultimaker.com/cura/v1/jobs/upload", 200, response) + self.api.requestUpload(CloudJobUploadRequest(job_name = "job name", file_size = 143234, content_type = "text/plain"), + lambda r: results.append(r)) + self.network.flushReplies() + + self.assertEqual(results[0].content_type, "text/plain") + self.assertEqual(results[0].status, "uploading") + + def test_uploadMesh(self, network_mock): + network_mock.return_value = self.network + results = [] + progress = MagicMock() + + with open("{}/Fixtures/requestUploadResponse.json".format(os.path.dirname(__file__)), "rb") as f: + thedata = json.loads(f.read().decode("ascii")) + data = thedata["data"] + upload_response = CloudJobResponse(**data) + + self.network.prepareReply("PUT", upload_response.upload_url, 200, + '{ data : "" }') # Network client doesn't look into the reply + + self.api.uploadMesh(upload_response, b'', lambda job_id: results.append(job_id), + progress.advance, progress.error) + + self.network.flushReplies() + + self.assertEqual(len(results), 1) + self.assertEqual(results[0], upload_response.job_id) + + def test_requestPrint(self, network_mock): + network_mock.return_value = self.network + results = [] + + cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8" + job_id = "db34b096-c4d5-46f3-bea7-da6a19905e6c" + + with open("{}/Fixtures/requestPrintResponse.json".format(os.path.dirname(__file__)), "rb") as f: + response = f.read() + + self.network.prepareReply("POST", + "https://api-staging.ultimaker.com/connect/v1/clusters/{}/print/{}" + .format(cluster_id, job_id), + 200, response) + + self.api.requestPrint(cluster_id, job_id, lambda r: results.append(r)) + + self.network.flushReplies() + + self.assertEqual(len(results), 1) + self.assertEqual(results[0].job_id, "db34b096-c4d5-46f3-bea7-da6a19905e6c") + self.assertEqual(results[0].status, "queued")