diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py index b3abc74ff4..448aa4d2e7 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py @@ -92,7 +92,7 @@ class CloudApiClient(NetworkClient): # \param job_id: The ID of the print job. # \param on_finished: The function to be called after the result is parsed. def requestPrint(self, cluster_id: str, job_id: str, on_finished: Callable[[CloudPrintResponse], any]) -> None: - url = "{}/cluster/{}/print/{}".format(self.CLUSTER_API_ROOT, cluster_id, job_id) + url = "{}/clusters/{}/print/{}".format(self.CLUSTER_API_ROOT, cluster_id, job_id) self.post(url, data = "", on_finished=self._wrapCallback(on_finished, CloudPrintResponse)) ## We override _createEmptyRequest in order to add the user credentials. diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py index a137e5261f..17acbe2e3f 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py @@ -217,7 +217,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): printer.updateOutputModel(model) # Always have an active printer - if not self._active_printer: + if self._printers and not self._active_printer: self.setActivePrinter(self._printers[0]) self.printersChanged.emit() # TODO: Make this more efficient by not updating every request diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/__init__.py b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/__init__.py new file mode 100644 index 0000000000..777afc92c2 --- /dev/null +++ b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2018 Ultimaker B.V. +# Cura is released under the terms of the LGPLv3 or higher. +import json +import os + + +def readFixture(fixture_name: str) -> bytes: + with open("{}/{}.json".format(os.path.dirname(__file__), fixture_name), "rb") as f: + return f.read() + +def parseFixture(fixture_name: str) -> dict: + return json.loads(readFixture(fixture_name).decode()) diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/clusters.json b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/getClusters.json similarity index 100% rename from plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/clusters.json rename to plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/getClusters.json diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/postJobPrintResponse.json b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/postJobPrintResponse.json new file mode 100644 index 0000000000..8b9574359f --- /dev/null +++ b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/postJobPrintResponse.json @@ -0,0 +1,7 @@ +{ + "data": { + "cluster_job_id": "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd", + "job_id": "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=", + "status": "queued" + } +} diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/putJobUploadResponse.json b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/putJobUploadResponse.json new file mode 100644 index 0000000000..0474862720 --- /dev/null +++ b/plugins/UM3NetworkPrinting/tests/Cloud/Fixtures/putJobUploadResponse.json @@ -0,0 +1,10 @@ +{ + "data": { + "content_type": "text/plain", + "download_url": "https://api.ultimaker.com/print-job-download", + "job_id": "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=", + "job_name": "Ultimaker Robot v3.0", + "status": "queued", + "upload_url": "https://api.ultimaker.com/print-job-upload" + } +} diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py b/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py index c7dc1bac35..5a76672b83 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py @@ -1,10 +1,10 @@ # Copyright (c) 2018 Ultimaker B.V. # Cura is released under the terms of the LGPLv3 or higher. import json -from typing import Dict, Tuple, Union +from typing import Dict, Tuple, Union, Optional from unittest.mock import MagicMock -from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply +from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QNetworkRequest from UM.Logger import Logger from UM.Signal import Signal @@ -25,23 +25,30 @@ class NetworkManagerMock: "PUT": QNetworkAccessManager.PutOperation, "DELETE": QNetworkAccessManager.DeleteOperation, "HEAD": QNetworkAccessManager.HeadOperation, - } + } # type: Dict[str, int] ## Initializes the network manager mock. - def __init__(self): + def __init__(self) -> None: # a dict with the prepared replies, using the format {(http_method, url): reply} self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply] + self.request_bodies = {} # type: Dict[Tuple[str, str], bytes] ## Mock implementation of the get, post, put, delete and head methods from the network manager. # Since the methods are very simple and the same it didn't make sense to repeat the code. # \param method: The method being called. # \return The mocked function, if the method name is known. Defaults to the standard getattr function. - def __getattr__(self, method: str): + def __getattr__(self, method: str) -> any: + ## This mock implementation will simply return the reply from the prepared ones. + # it raises a KeyError if requests are done without being prepared. + def doRequest(request: QNetworkRequest, body: Optional[bytes] = None, *_): + key = method.upper(), request.url().toString() + if body: + self.request_bodies[key] = body + return self.replies[key] + operation = self._OPERATIONS.get(method.upper()) if operation: - # this mock implementation will simply return the reply from the prepared ones. - # it raises a KeyError if requests are done without being prepared. - return lambda request, *_: self.replies[method.upper(), request.url().toString()] + return doRequest # the attribute is not one of the implemented methods, default to the standard implementation. return getattr(super(), method) @@ -60,12 +67,18 @@ class NetworkManagerMock: self.replies[method, url] = reply_mock Logger.log("i", "Prepared mock {}-response to {} {}", status_code, method, url) + ## Gets the request that was sent to the network manager for the given method and URL. + # \param method: The HTTP method. + # \param url: The URL. + def getRequestBody(self, method: str, url: str) -> Optional[bytes]: + return self.request_bodies.get((method.upper(), url)) + ## Emits the signal that the reply is ready to all prepared replies. - def flushReplies(self): + def flushReplies(self) -> None: for reply in self.replies.values(): self.finished.emit(reply) self.reset() ## Deletes all prepared replies - def reset(self): + def reset(self) -> None: self.replies.clear() diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDevice.py b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDevice.py index 4ed2767288..6eca5d250d 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDevice.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDevice.py @@ -1,25 +1,28 @@ # Copyright (c) 2018 Ultimaker B.V. # Cura is released under the terms of the LGPLv3 or higher. import json -import os from unittest import TestCase from unittest.mock import patch, MagicMock +from UM.Scene.SceneNode import SceneNode from cura.CuraApplication import CuraApplication from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel from src.Cloud.CloudApiClient import CloudApiClient -from src.Cloud.CloudOutputController import CloudOutputController from src.Cloud.CloudOutputDevice import CloudOutputDevice +from tests.Cloud.Fixtures import readFixture, parseFixture from .NetworkManagerMock import NetworkManagerMock @patch("cura.NetworkClient.QNetworkAccessManager") class TestCloudOutputDevice(TestCase): CLUSTER_ID = "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq" + JOB_ID = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=" HOST_NAME = "ultimakersystem-ccbdd30044ec" - URL = "https://api-staging.ultimaker.com/connect/v1/clusters/{}/status".format(CLUSTER_ID) - with open("{}/Fixtures/getClusterStatusResponse.json".format(os.path.dirname(__file__)), "rb") as f: - DEFAULT_RESPONSE = f.read() + + BASE_URL = "https://api-staging.ultimaker.com" + STATUS_URL = "{}/connect/v1/clusters/{}/status".format(BASE_URL, CLUSTER_ID) + PRINT_URL = "{}/connect/v1/clusters/{}/print/{}".format(BASE_URL, CLUSTER_ID, JOB_ID) + REQUEST_UPLOAD_URL = "{}/cura/v1/jobs/upload".format(BASE_URL) def setUp(self): super().setUp() @@ -28,21 +31,12 @@ class TestCloudOutputDevice(TestCase): self.account = MagicMock(isLoggedIn=True, accessToken="TestAccessToken") self.onError = MagicMock() self.device = CloudOutputDevice(CloudApiClient(self.account, self.onError), self.CLUSTER_ID, self.HOST_NAME) - self.cluster_status = json.loads(self.DEFAULT_RESPONSE.decode()) - self.network.prepareReply("GET", self.URL, 200, self.DEFAULT_RESPONSE) + self.cluster_status = parseFixture("getClusterStatusResponse") + self.network.prepareReply("GET", self.STATUS_URL, 200, readFixture("getClusterStatusResponse")) def tearDown(self): - try: - self._beforeTearDown() - finally: - super().tearDown() - - ## Before tear down method we check whether the state of the output device manager is what we expect based on the - # mocked API response. - def _beforeTearDown(self): - # let the network send replies + super().tearDown() self.network.flushReplies() - # TODO def test_status(self, network_mock): network_mock.return_value = self.network @@ -67,7 +61,66 @@ class TestCloudOutputDevice(TestCase): self.assertEqual([controller_fields, controller_fields], [printer.getController().__dict__ for printer in self.device.printers]) + self.assertEqual(["UM3PrintJobOutputModel"], [type(printer).__name__ for printer in self.device.printJobs]) self.assertEqual({job["uuid"] for job in self.cluster_status["data"]["print_jobs"]}, {job.key for job in self.device.printJobs}) - self.assertEqual(["Daniel Testing"], [job.owner for job in self.device.printJobs]) - self.assertEqual(["UM3_dragon"], [job.name for job in self.device.printJobs]) + self.assertEqual({job["owner"] for job in self.cluster_status["data"]["print_jobs"]}, + {job.owner for job in self.device.printJobs}) + self.assertEqual({job["name"] for job in self.cluster_status["data"]["print_jobs"]}, + {job.name for job in self.device.printJobs}) + + def test_remove_print_job(self, network_mock): + network_mock.return_value = self.network + self.device._update() + self.network.flushReplies() + self.assertEqual(1, len(self.device.printJobs)) + + self.cluster_status["data"]["print_jobs"].clear() + self.network.prepareReply("GET", self.STATUS_URL, 200, self.cluster_status) + self.device._update() + self.network.flushReplies() + self.assertEqual([], self.device.printJobs) + + def test_remove_printers(self, network_mock): + network_mock.return_value = self.network + self.device._update() + self.network.flushReplies() + self.assertEqual(2, len(self.device.printers)) + + self.cluster_status["data"]["printers"].clear() + self.network.prepareReply("GET", self.STATUS_URL, 200, self.cluster_status) + self.device._update() + self.network.flushReplies() + self.assertEqual([], self.device.printers) + + @patch("cura.CuraApplication.CuraApplication.getGlobalContainerStack") + def test_print_to_cloud(self, global_container_stack_mock, network_mock): + active_machine_mock = global_container_stack_mock.return_value + active_machine_mock.getMetaDataEntry.side_effect = {"file_formats": "application/gzip"}.get + + request_upload_response = parseFixture("putJobUploadResponse") + request_print_response = parseFixture("postJobPrintResponse") + self.network.prepareReply("PUT", self.REQUEST_UPLOAD_URL, 201, request_upload_response) + self.network.prepareReply("PUT", request_upload_response["data"]["upload_url"], 201, b"{}") + self.network.prepareReply("POST", self.PRINT_URL, 200, request_print_response) + + network_mock.return_value = self.network + file_handler = MagicMock() + file_handler.getSupportedFileTypesWrite.return_value = [{ + "extension": "gcode.gz", + "mime_type": "application/gzip", + "mode": 2, + }] + file_handler.getWriterByMimeType.return_value.write.side_effect = \ + lambda stream, nodes: stream.write(str(nodes).encode()) + + scene_nodes = [SceneNode()] + self.device.requestWrite(scene_nodes, file_handler=file_handler, file_name="FileName") + + self.network.flushReplies() + self.assertEqual({"data": {"content_type": "application/gzip", "file_size": 57, "job_name": "FileName"}}, + json.loads(self.network.getRequestBody("PUT", self.REQUEST_UPLOAD_URL).decode())) + self.assertEqual(str(scene_nodes).encode(), + self.network.getRequestBody("PUT", request_upload_response["data"]["upload_url"])) + + self.assertIsNone(self.network.getRequestBody("POST", self.PRINT_URL)) diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py index 9e980a8681..420d71d0fe 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudOutputDeviceManager.py @@ -1,29 +1,26 @@ # Copyright (c) 2018 Ultimaker B.V. # Cura is released under the terms of the LGPLv3 or higher. -import json -import os from unittest import TestCase from unittest.mock import patch from cura.CuraApplication import CuraApplication from src.Cloud.CloudOutputDevice import CloudOutputDevice from src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager +from tests.Cloud.Fixtures import parseFixture, readFixture from .NetworkManagerMock import NetworkManagerMock @patch("cura.NetworkClient.QNetworkAccessManager") class TestCloudOutputDeviceManager(TestCase): URL = "https://api-staging.ultimaker.com/connect/v1/clusters" - with open("{}/Fixtures/clusters.json".format(os.path.dirname(__file__)), "rb") as f: - DEFAULT_RESPONSE = f.read() def setUp(self): super().setUp() self.app = CuraApplication.getInstance() self.network = NetworkManagerMock() self.manager = CloudOutputDeviceManager() - self.clusters_response = json.loads(self.DEFAULT_RESPONSE.decode()) - self.network.prepareReply("GET", self.URL, 200, self.DEFAULT_RESPONSE) + self.clusters_response = parseFixture("getClusters") + self.network.prepareReply("GET", self.URL, 200, readFixture("getClusters")) def tearDown(self): try: