mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-13 01:37:51 -06:00
Remove tests that are no longer relevant
This commit is contained in:
parent
11bd520c98
commit
f34fc7e6f9
12 changed files with 1 additions and 656 deletions
|
@ -121,8 +121,7 @@ class ZeroConfClient:
|
||||||
address = '.'.join(map(lambda n: str(n), info.address))
|
address = '.'.join(map(lambda n: str(n), info.address))
|
||||||
self.addedNetworkCluster.emit(str(name), address, info.properties)
|
self.addedNetworkCluster.emit(str(name), address, info.properties)
|
||||||
else:
|
else:
|
||||||
Logger.log("w",
|
Logger.log("w", "The type of the found device is '%s', not 'printer'." % type_of_device)
|
||||||
"The type of the found device is '%s', not 'printer'! Ignoring.." % type_of_device)
|
|
||||||
else:
|
else:
|
||||||
Logger.log("w", "Could not get information about %s" % name)
|
Logger.log("w", "Could not get information about %s" % name)
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
def readFixture(fixture_name: str) -> bytes:
|
|
||||||
with open("{}/{}.json".format(os.path.dirname(__file__), fixture_name), "rb") as f:
|
|
||||||
return f.read()
|
|
||||||
|
|
||||||
def parseFixture(fixture_name: str) -> dict:
|
|
||||||
return json.loads(readFixture(fixture_name).decode())
|
|
|
@ -1,95 +0,0 @@
|
||||||
{
|
|
||||||
"data": {
|
|
||||||
"generated_time": "2018-12-10T08:23:55.110Z",
|
|
||||||
"printers": [
|
|
||||||
{
|
|
||||||
"configuration": [
|
|
||||||
{
|
|
||||||
"extruder_index": 0,
|
|
||||||
"material": {
|
|
||||||
"material": "empty"
|
|
||||||
},
|
|
||||||
"print_core_id": "AA 0.4"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"extruder_index": 1,
|
|
||||||
"material": {
|
|
||||||
"material": "empty"
|
|
||||||
},
|
|
||||||
"print_core_id": "AA 0.4"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"enabled": true,
|
|
||||||
"firmware_version": "5.1.2.20180807",
|
|
||||||
"friendly_name": "Master-Luke",
|
|
||||||
"ip_address": "10.183.1.140",
|
|
||||||
"machine_variant": "Ultimaker 3",
|
|
||||||
"status": "maintenance",
|
|
||||||
"unique_name": "ultimakersystem-ccbdd30044ec",
|
|
||||||
"uuid": "b3a47ea3-1eeb-4323-9626-6f9c3c888f9e"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"configuration": [
|
|
||||||
{
|
|
||||||
"extruder_index": 0,
|
|
||||||
"material": {
|
|
||||||
"brand": "Generic",
|
|
||||||
"color": "Generic",
|
|
||||||
"guid": "506c9f0d-e3aa-4bd4-b2d2-23e2425b1aa9",
|
|
||||||
"material": "PLA"
|
|
||||||
},
|
|
||||||
"print_core_id": "AA 0.4"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"extruder_index": 1,
|
|
||||||
"material": {
|
|
||||||
"brand": "Ultimaker",
|
|
||||||
"color": "Red",
|
|
||||||
"guid": "9cfe5bf1-bdc5-4beb-871a-52c70777842d",
|
|
||||||
"material": "PLA"
|
|
||||||
},
|
|
||||||
"print_core_id": "AA 0.4"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"enabled": true,
|
|
||||||
"firmware_version": "4.3.3.20180529",
|
|
||||||
"friendly_name": "UM-Marijn",
|
|
||||||
"ip_address": "10.183.1.166",
|
|
||||||
"machine_variant": "Ultimaker 3",
|
|
||||||
"status": "idle",
|
|
||||||
"unique_name": "ultimakersystem-ccbdd30058ab",
|
|
||||||
"uuid": "6e62c40a-4601-4b0e-9fec-c7c02c59c30a"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"print_jobs": [
|
|
||||||
{
|
|
||||||
"assigned_to": "6e62c40a-4601-4b0e-9fec-c7c02c59c30a",
|
|
||||||
"configuration": [
|
|
||||||
{
|
|
||||||
"extruder_index": 0,
|
|
||||||
"material": {
|
|
||||||
"brand": "Ultimaker",
|
|
||||||
"color": "Black",
|
|
||||||
"guid": "3ee70a86-77d8-4b87-8005-e4a1bc57d2ce",
|
|
||||||
"material": "PLA"
|
|
||||||
},
|
|
||||||
"print_core_id": "AA 0.4"
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"constraints": {},
|
|
||||||
"created_at": "2018-12-10T08:28:04.108Z",
|
|
||||||
"force": false,
|
|
||||||
"last_seen": 500165.109491861,
|
|
||||||
"machine_variant": "Ultimaker 3",
|
|
||||||
"name": "UM3_dragon",
|
|
||||||
"network_error_count": 0,
|
|
||||||
"owner": "Daniel Testing",
|
|
||||||
"started": false,
|
|
||||||
"status": "queued",
|
|
||||||
"time_elapsed": 0,
|
|
||||||
"time_total": 14145,
|
|
||||||
"uuid": "d1c8bd52-5e9f-486a-8c25-a123cc8c7702"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,17 +0,0 @@
|
||||||
{
|
|
||||||
"data": [{
|
|
||||||
"cluster_id": "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq",
|
|
||||||
"host_guid": "e90ae0ac-1257-4403-91ee-a44c9b7e8050",
|
|
||||||
"host_name": "ultimakersystem-ccbdd30044ec",
|
|
||||||
"host_version": "5.0.0.20170101",
|
|
||||||
"is_online": true,
|
|
||||||
"status": "active"
|
|
||||||
}, {
|
|
||||||
"cluster_id": "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8",
|
|
||||||
"host_guid": "e0ace90a-91ee-1257-4403-e8050a44c9b7",
|
|
||||||
"host_name": "ultimakersystem-30044ecccbdd",
|
|
||||||
"host_version": "5.1.2.20180807",
|
|
||||||
"is_online": true,
|
|
||||||
"status": "active"
|
|
||||||
}]
|
|
||||||
}
|
|
|
@ -1,8 +0,0 @@
|
||||||
{
|
|
||||||
"data": {
|
|
||||||
"cluster_job_id": "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd",
|
|
||||||
"job_id": "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=",
|
|
||||||
"status": "queued",
|
|
||||||
"generated_time": "2018-12-10T08:23:55.110Z"
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,9 +0,0 @@
|
||||||
{
|
|
||||||
"data": {
|
|
||||||
"content_type": "text/plain",
|
|
||||||
"job_id": "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=",
|
|
||||||
"job_name": "Ultimaker Robot v3.0",
|
|
||||||
"status": "uploading",
|
|
||||||
"upload_url": "https://api.ultimaker.com/print-job-upload"
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,2 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
|
@ -1,105 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
||||||
import json
|
|
||||||
from typing import Dict, Tuple, Union, Optional, Any
|
|
||||||
from unittest.mock import MagicMock
|
|
||||||
|
|
||||||
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest
|
|
||||||
|
|
||||||
from UM.Logger import Logger
|
|
||||||
from UM.Signal import Signal
|
|
||||||
|
|
||||||
|
|
||||||
class FakeSignal:
|
|
||||||
def __init__(self):
|
|
||||||
self._callbacks = []
|
|
||||||
|
|
||||||
def connect(self, callback):
|
|
||||||
self._callbacks.append(callback)
|
|
||||||
|
|
||||||
def disconnect(self, callback):
|
|
||||||
self._callbacks.remove(callback)
|
|
||||||
|
|
||||||
def emit(self, *args, **kwargs):
|
|
||||||
for callback in self._callbacks:
|
|
||||||
callback(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
## This class can be used to mock the QNetworkManager class and test the code using it.
|
|
||||||
# After patching the QNetworkManager class, requests are prepared before they can be executed.
|
|
||||||
# Any requests not prepared beforehand will cause KeyErrors.
|
|
||||||
class NetworkManagerMock:
|
|
||||||
|
|
||||||
# An enumeration of the supported operations and their code for the network access manager.
|
|
||||||
_OPERATIONS = {
|
|
||||||
"GET": QNetworkAccessManager.GetOperation,
|
|
||||||
"POST": QNetworkAccessManager.PostOperation,
|
|
||||||
"PUT": QNetworkAccessManager.PutOperation,
|
|
||||||
"DELETE": QNetworkAccessManager.DeleteOperation,
|
|
||||||
"HEAD": QNetworkAccessManager.HeadOperation,
|
|
||||||
} # type: Dict[str, int]
|
|
||||||
|
|
||||||
## Initializes the network manager mock.
|
|
||||||
def __init__(self) -> None:
|
|
||||||
# A dict with the prepared replies, using the format {(http_method, url): reply}
|
|
||||||
self.replies = {} # type: Dict[Tuple[str, str], MagicMock]
|
|
||||||
self.request_bodies = {} # type: Dict[Tuple[str, str], bytes]
|
|
||||||
|
|
||||||
# Signals used in the network manager.
|
|
||||||
self.finished = Signal()
|
|
||||||
self.authenticationRequired = Signal()
|
|
||||||
|
|
||||||
## Mock implementation of the get, post, put, delete and head methods from the network manager.
|
|
||||||
# Since the methods are very simple and the same it didn't make sense to repeat the code.
|
|
||||||
# \param method: The method being called.
|
|
||||||
# \return The mocked function, if the method name is known. Defaults to the standard getattr function.
|
|
||||||
def __getattr__(self, method: str) -> Any:
|
|
||||||
## This mock implementation will simply return the reply from the prepared ones.
|
|
||||||
# it raises a KeyError if requests are done without being prepared.
|
|
||||||
def doRequest(request: QNetworkRequest, body: Optional[bytes] = None, *_):
|
|
||||||
key = method.upper(), request.url().toString()
|
|
||||||
if body:
|
|
||||||
self.request_bodies[key] = body
|
|
||||||
return self.replies[key]
|
|
||||||
|
|
||||||
operation = self._OPERATIONS.get(method.upper())
|
|
||||||
if operation:
|
|
||||||
return doRequest
|
|
||||||
|
|
||||||
# the attribute is not one of the implemented methods, default to the standard implementation.
|
|
||||||
return getattr(super(), method)
|
|
||||||
|
|
||||||
## Prepares a server reply for the given parameters.
|
|
||||||
# \param method: The HTTP method.
|
|
||||||
# \param url: The URL being requested.
|
|
||||||
# \param status_code: The HTTP status code for the response.
|
|
||||||
# \param response: The response body from the server (generally json-encoded).
|
|
||||||
def prepareReply(self, method: str, url: str, status_code: int, response: Union[bytes, dict]) -> None:
|
|
||||||
reply_mock = MagicMock()
|
|
||||||
reply_mock.url().toString.return_value = url
|
|
||||||
reply_mock.operation.return_value = self._OPERATIONS[method]
|
|
||||||
reply_mock.attribute.return_value = status_code
|
|
||||||
reply_mock.finished = FakeSignal()
|
|
||||||
reply_mock.isFinished.return_value = False
|
|
||||||
reply_mock.readAll.return_value = response if isinstance(response, bytes) else json.dumps(response).encode()
|
|
||||||
self.replies[method, url] = reply_mock
|
|
||||||
Logger.log("i", "Prepared mock {}-response to {} {}", status_code, method, url)
|
|
||||||
|
|
||||||
## Gets the request that was sent to the network manager for the given method and URL.
|
|
||||||
# \param method: The HTTP method.
|
|
||||||
# \param url: The URL.
|
|
||||||
def getRequestBody(self, method: str, url: str) -> Optional[bytes]:
|
|
||||||
return self.request_bodies.get((method.upper(), url))
|
|
||||||
|
|
||||||
## Emits the signal that the reply is ready to all prepared replies.
|
|
||||||
def flushReplies(self) -> None:
|
|
||||||
for key, reply in self.replies.items():
|
|
||||||
Logger.log("i", "Flushing reply to {} {}", *key)
|
|
||||||
reply.isFinished.return_value = True
|
|
||||||
reply.finished.emit()
|
|
||||||
self.finished.emit(reply)
|
|
||||||
self.reset()
|
|
||||||
|
|
||||||
## Deletes all prepared replies
|
|
||||||
def reset(self) -> None:
|
|
||||||
self.replies.clear()
|
|
|
@ -1,119 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
||||||
from typing import List
|
|
||||||
from unittest import TestCase
|
|
||||||
from unittest.mock import patch, MagicMock
|
|
||||||
|
|
||||||
from cura.UltimakerCloudAuthentication import CuraCloudAPIRoot
|
|
||||||
|
|
||||||
from ...src.Cloud import CloudApiClient
|
|
||||||
from ...src.Models.Http.CloudClusterResponse import CloudClusterResponse
|
|
||||||
from ...src.Models.Http.CloudClusterStatus import CloudClusterStatus
|
|
||||||
from ...src.Models.Http.CloudPrintJobResponse import CloudPrintJobResponse
|
|
||||||
from ...src.Models.Http.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
|
|
||||||
from ...src.Models.Http.CloudError import CloudError
|
|
||||||
|
|
||||||
from .Fixtures import readFixture, parseFixture
|
|
||||||
from .NetworkManagerMock import NetworkManagerMock
|
|
||||||
|
|
||||||
|
|
||||||
class TestCloudApiClient(TestCase):
|
|
||||||
maxDiff = None
|
|
||||||
|
|
||||||
def _errorHandler(self, errors: List[CloudError]):
|
|
||||||
raise Exception("Received unexpected error: {}".format(errors))
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.account = MagicMock()
|
|
||||||
self.account.isLoggedIn.return_value = True
|
|
||||||
|
|
||||||
self.network = NetworkManagerMock()
|
|
||||||
with patch.object(CloudApiClient, 'QNetworkAccessManager', return_value = self.network):
|
|
||||||
self.api = CloudApiClient.CloudApiClient(self.account, self._errorHandler)
|
|
||||||
|
|
||||||
def test_getClusters(self):
|
|
||||||
result = []
|
|
||||||
|
|
||||||
response = readFixture("getClusters")
|
|
||||||
data = parseFixture("getClusters")["data"]
|
|
||||||
|
|
||||||
self.network.prepareReply("GET", CuraCloudAPIRoot + "/connect/v1/clusters", 200, response)
|
|
||||||
# The callback is a function that adds the result of the call to getClusters to the result list
|
|
||||||
self.api.getClusters(lambda clusters: result.extend(clusters))
|
|
||||||
|
|
||||||
self.network.flushReplies()
|
|
||||||
|
|
||||||
self.assertEqual([CloudClusterResponse(**data[0]), CloudClusterResponse(**data[1])], result)
|
|
||||||
|
|
||||||
def test_getClusterStatus(self):
|
|
||||||
result = []
|
|
||||||
|
|
||||||
response = readFixture("getClusterStatusResponse")
|
|
||||||
data = parseFixture("getClusterStatusResponse")["data"]
|
|
||||||
|
|
||||||
url = CuraCloudAPIRoot + "/connect/v1/clusters/R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC/status"
|
|
||||||
self.network.prepareReply("GET", url, 200, response)
|
|
||||||
self.api.getClusterStatus("R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", lambda s: result.append(s))
|
|
||||||
|
|
||||||
self.network.flushReplies()
|
|
||||||
|
|
||||||
self.assertEqual([CloudClusterStatus(**data)], result)
|
|
||||||
|
|
||||||
def test_requestUpload(self):
|
|
||||||
|
|
||||||
results = []
|
|
||||||
|
|
||||||
response = readFixture("putJobUploadResponse")
|
|
||||||
|
|
||||||
self.network.prepareReply("PUT", CuraCloudAPIRoot + "/cura/v1/jobs/upload", 200, response)
|
|
||||||
request = CloudPrintJobUploadRequest(job_name = "job name", file_size = 143234, content_type = "text/plain")
|
|
||||||
self.api.requestUpload(request, lambda r: results.append(r))
|
|
||||||
self.network.flushReplies()
|
|
||||||
|
|
||||||
self.assertEqual(["text/plain"], [r.content_type for r in results])
|
|
||||||
self.assertEqual(["uploading"], [r.status for r in results])
|
|
||||||
|
|
||||||
def test_uploadToolPath(self):
|
|
||||||
|
|
||||||
results = []
|
|
||||||
progress = MagicMock()
|
|
||||||
|
|
||||||
data = parseFixture("putJobUploadResponse")["data"]
|
|
||||||
upload_response = CloudPrintJobResponse(**data)
|
|
||||||
|
|
||||||
# Network client doesn't look into the reply
|
|
||||||
self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')
|
|
||||||
|
|
||||||
mesh = ("1234" * 100000).encode()
|
|
||||||
self.api.uploadToolPath(upload_response, mesh, lambda: results.append("sent"), progress.advance, progress.error)
|
|
||||||
|
|
||||||
for _ in range(10):
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')
|
|
||||||
|
|
||||||
self.assertEqual(["sent"], results)
|
|
||||||
|
|
||||||
def test_requestPrint(self):
|
|
||||||
|
|
||||||
results = []
|
|
||||||
|
|
||||||
response = readFixture("postJobPrintResponse")
|
|
||||||
|
|
||||||
cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8"
|
|
||||||
cluster_job_id = "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd"
|
|
||||||
job_id = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE="
|
|
||||||
|
|
||||||
self.network.prepareReply("POST",
|
|
||||||
CuraCloudAPIRoot + "/connect/v1/clusters/{}/print/{}"
|
|
||||||
.format(cluster_id, job_id),
|
|
||||||
200, response)
|
|
||||||
|
|
||||||
self.api.requestPrint(cluster_id, job_id, lambda r: results.append(r))
|
|
||||||
|
|
||||||
self.network.flushReplies()
|
|
||||||
|
|
||||||
self.assertEqual([job_id], [r.job_id for r in results])
|
|
||||||
self.assertEqual([cluster_job_id], [r.cluster_job_id for r in results])
|
|
||||||
self.assertEqual(["queued"], [r.status for r in results])
|
|
|
@ -1,157 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
||||||
import json
|
|
||||||
from unittest import TestCase
|
|
||||||
from unittest.mock import patch, MagicMock
|
|
||||||
|
|
||||||
from UM.Scene.SceneNode import SceneNode
|
|
||||||
from cura.UltimakerCloudAuthentication import CuraCloudAPIRoot
|
|
||||||
from cura.PrinterOutput.Models.PrinterOutputModel import PrinterOutputModel
|
|
||||||
from ...src.Cloud import CloudApiClient
|
|
||||||
from ...src.Cloud.CloudOutputDevice import CloudOutputDevice
|
|
||||||
from ...src.Models.Http.CloudClusterResponse import CloudClusterResponse
|
|
||||||
from .Fixtures import readFixture, parseFixture
|
|
||||||
from .NetworkManagerMock import NetworkManagerMock
|
|
||||||
|
|
||||||
|
|
||||||
class TestCloudOutputDevice(TestCase):
|
|
||||||
maxDiff = None
|
|
||||||
|
|
||||||
CLUSTER_ID = "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq"
|
|
||||||
JOB_ID = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE="
|
|
||||||
HOST_NAME = "ultimakersystem-ccbdd30044ec"
|
|
||||||
HOST_GUID = "e90ae0ac-1257-4403-91ee-a44c9b7e8050"
|
|
||||||
HOST_VERSION = "5.2.0"
|
|
||||||
FRIENDLY_NAME = "My Friendly Printer"
|
|
||||||
|
|
||||||
STATUS_URL = "{}/connect/v1/clusters/{}/status".format(CuraCloudAPIRoot, CLUSTER_ID)
|
|
||||||
PRINT_URL = "{}/connect/v1/clusters/{}/print/{}".format(CuraCloudAPIRoot, CLUSTER_ID, JOB_ID)
|
|
||||||
REQUEST_UPLOAD_URL = "{}/cura/v1/jobs/upload".format(CuraCloudAPIRoot)
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.app = MagicMock()
|
|
||||||
|
|
||||||
self.patches = [patch("UM.Qt.QtApplication.QtApplication.getInstance", return_value=self.app),
|
|
||||||
patch("UM.Application.Application.getInstance", return_value=self.app)]
|
|
||||||
for patched_method in self.patches:
|
|
||||||
patched_method.start()
|
|
||||||
|
|
||||||
self.cluster = CloudClusterResponse(self.CLUSTER_ID, self.HOST_GUID, self.HOST_NAME, is_online=True,
|
|
||||||
status="active", host_version=self.HOST_VERSION,
|
|
||||||
friendly_name=self.FRIENDLY_NAME)
|
|
||||||
|
|
||||||
self.network = NetworkManagerMock()
|
|
||||||
self.account = MagicMock(isLoggedIn=True, accessToken="TestAccessToken")
|
|
||||||
self.onError = MagicMock()
|
|
||||||
with patch.object(CloudApiClient, "QNetworkAccessManager", return_value = self.network):
|
|
||||||
self._api = CloudApiClient.CloudApiClient(self.account, self.onError)
|
|
||||||
|
|
||||||
self.device = CloudOutputDevice(self._api, self.cluster)
|
|
||||||
self.cluster_status = parseFixture("getClusterStatusResponse")
|
|
||||||
self.network.prepareReply("GET", self.STATUS_URL, 200, readFixture("getClusterStatusResponse"))
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
try:
|
|
||||||
super().tearDown()
|
|
||||||
self.network.flushReplies()
|
|
||||||
finally:
|
|
||||||
for patched_method in self.patches:
|
|
||||||
patched_method.stop()
|
|
||||||
|
|
||||||
# We test for these in order to make sure the correct file type is selected depending on the firmware version.
|
|
||||||
def test_properties(self):
|
|
||||||
self.assertEqual(self.device.firmwareVersion, self.HOST_VERSION)
|
|
||||||
self.assertEqual(self.device.name, self.FRIENDLY_NAME)
|
|
||||||
|
|
||||||
def test_status(self):
|
|
||||||
self.device._update()
|
|
||||||
self.network.flushReplies()
|
|
||||||
|
|
||||||
self.assertEqual([PrinterOutputModel, PrinterOutputModel], [type(printer) for printer in self.device.printers])
|
|
||||||
|
|
||||||
controller_fields = {
|
|
||||||
"_output_device": self.device,
|
|
||||||
"can_abort": True,
|
|
||||||
"can_control_manually": False,
|
|
||||||
"can_pause": True,
|
|
||||||
"can_pre_heat_bed": False,
|
|
||||||
"can_pre_heat_hotends": False,
|
|
||||||
"can_send_raw_gcode": False,
|
|
||||||
"can_update_firmware": False,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.assertEqual({printer["uuid"] for printer in self.cluster_status["data"]["printers"]},
|
|
||||||
{printer.key for printer in self.device.printers})
|
|
||||||
self.assertEqual([controller_fields, controller_fields],
|
|
||||||
[printer.getController().__dict__ for printer in self.device.printers])
|
|
||||||
|
|
||||||
self.assertEqual(["UM3PrintJobOutputModel"], [type(printer).__name__ for printer in self.device.printJobs])
|
|
||||||
self.assertEqual({job["uuid"] for job in self.cluster_status["data"]["print_jobs"]},
|
|
||||||
{job.key for job in self.device.printJobs})
|
|
||||||
self.assertEqual({job["owner"] for job in self.cluster_status["data"]["print_jobs"]},
|
|
||||||
{job.owner for job in self.device.printJobs})
|
|
||||||
self.assertEqual({job["name"] for job in self.cluster_status["data"]["print_jobs"]},
|
|
||||||
{job.name for job in self.device.printJobs})
|
|
||||||
|
|
||||||
def test_remove_print_job(self):
|
|
||||||
self.device._update()
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.assertEqual(1, len(self.device.printJobs))
|
|
||||||
|
|
||||||
self.cluster_status["data"]["print_jobs"].clear()
|
|
||||||
self.network.prepareReply("GET", self.STATUS_URL, 200, self.cluster_status)
|
|
||||||
|
|
||||||
self.device._last_request_time = None
|
|
||||||
self.device._update()
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.assertEqual([], self.device.printJobs)
|
|
||||||
|
|
||||||
def test_remove_printers(self):
|
|
||||||
self.device._update()
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.assertEqual(2, len(self.device.printers))
|
|
||||||
|
|
||||||
self.cluster_status["data"]["printers"].clear()
|
|
||||||
self.network.prepareReply("GET", self.STATUS_URL, 200, self.cluster_status)
|
|
||||||
|
|
||||||
self.device._last_request_time = None
|
|
||||||
self.device._update()
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.assertEqual([], self.device.printers)
|
|
||||||
|
|
||||||
def test_print_to_cloud(self):
|
|
||||||
active_machine_mock = self.app.getGlobalContainerStack.return_value
|
|
||||||
active_machine_mock.getMetaDataEntry.side_effect = {"file_formats": "application/x-ufp"}.get
|
|
||||||
|
|
||||||
request_upload_response = parseFixture("putJobUploadResponse")
|
|
||||||
request_print_response = parseFixture("postJobPrintResponse")
|
|
||||||
self.network.prepareReply("PUT", self.REQUEST_UPLOAD_URL, 201, request_upload_response)
|
|
||||||
self.network.prepareReply("PUT", request_upload_response["data"]["upload_url"], 201, b"{}")
|
|
||||||
self.network.prepareReply("POST", self.PRINT_URL, 200, request_print_response)
|
|
||||||
|
|
||||||
file_handler = MagicMock()
|
|
||||||
file_handler.getSupportedFileTypesWrite.return_value = [{
|
|
||||||
"extension": "ufp",
|
|
||||||
"mime_type": "application/x-ufp",
|
|
||||||
"mode": 2
|
|
||||||
}, {
|
|
||||||
"extension": "gcode.gz",
|
|
||||||
"mime_type": "application/gzip",
|
|
||||||
"mode": 2,
|
|
||||||
}]
|
|
||||||
file_handler.getWriterByMimeType.return_value.write.side_effect = \
|
|
||||||
lambda stream, nodes: stream.write(str(nodes).encode())
|
|
||||||
|
|
||||||
scene_nodes = [SceneNode()]
|
|
||||||
expected_mesh = str(scene_nodes).encode()
|
|
||||||
self.device.requestWrite(scene_nodes, file_handler=file_handler, file_name="FileName")
|
|
||||||
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.assertEqual(
|
|
||||||
{"data": {"content_type": "application/x-ufp", "file_size": len(expected_mesh), "job_name": "FileName"}},
|
|
||||||
json.loads(self.network.getRequestBody("PUT", self.REQUEST_UPLOAD_URL).decode())
|
|
||||||
)
|
|
||||||
self.assertEqual(expected_mesh,
|
|
||||||
self.network.getRequestBody("PUT", request_upload_response["data"]["upload_url"]))
|
|
||||||
self.assertIsNone(self.network.getRequestBody("POST", self.PRINT_URL))
|
|
|
@ -1,128 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
||||||
from unittest import TestCase
|
|
||||||
from unittest.mock import patch, MagicMock
|
|
||||||
|
|
||||||
from UM.OutputDevice.OutputDeviceManager import OutputDeviceManager
|
|
||||||
from cura.UltimakerCloudAuthentication import CuraCloudAPIRoot
|
|
||||||
|
|
||||||
from ...src.Cloud import CloudApiClient
|
|
||||||
from ...src.Cloud import CloudOutputDeviceManager
|
|
||||||
from ...src.Models.Http.CloudClusterResponse import CloudClusterResponse
|
|
||||||
from .Fixtures import parseFixture, readFixture
|
|
||||||
|
|
||||||
from .NetworkManagerMock import NetworkManagerMock, FakeSignal
|
|
||||||
|
|
||||||
|
|
||||||
class TestCloudOutputDeviceManager(TestCase):
|
|
||||||
maxDiff = None
|
|
||||||
|
|
||||||
URL = CuraCloudAPIRoot + "/connect/v1/clusters"
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super().setUp()
|
|
||||||
self.app = MagicMock()
|
|
||||||
self.device_manager = OutputDeviceManager()
|
|
||||||
self.app.getOutputDeviceManager.return_value = self.device_manager
|
|
||||||
|
|
||||||
self.patches = [patch("UM.Qt.QtApplication.QtApplication.getInstance", return_value=self.app),
|
|
||||||
patch("UM.Application.Application.getInstance", return_value=self.app)]
|
|
||||||
for patched_method in self.patches:
|
|
||||||
patched_method.start()
|
|
||||||
|
|
||||||
self.network = NetworkManagerMock()
|
|
||||||
self.timer = MagicMock(timeout = FakeSignal())
|
|
||||||
with patch.object(CloudApiClient, "QNetworkAccessManager", return_value = self.network), \
|
|
||||||
patch.object(CloudOutputDeviceManager, "QTimer", return_value = self.timer):
|
|
||||||
self.manager = CloudOutputDeviceManager.CloudOutputDeviceManager()
|
|
||||||
self.clusters_response = parseFixture("getClusters")
|
|
||||||
self.network.prepareReply("GET", self.URL, 200, readFixture("getClusters"))
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
try:
|
|
||||||
self._beforeTearDown()
|
|
||||||
|
|
||||||
self.network.flushReplies()
|
|
||||||
self.manager.stop()
|
|
||||||
for patched_method in self.patches:
|
|
||||||
patched_method.stop()
|
|
||||||
finally:
|
|
||||||
super().tearDown()
|
|
||||||
|
|
||||||
## Before tear down method we check whether the state of the output device manager is what we expect based on the
|
|
||||||
# mocked API response.
|
|
||||||
def _beforeTearDown(self):
|
|
||||||
# let the network send replies
|
|
||||||
self.network.flushReplies()
|
|
||||||
# get the created devices
|
|
||||||
devices = self.device_manager.getOutputDevices()
|
|
||||||
# TODO: Check active device
|
|
||||||
|
|
||||||
response_clusters = []
|
|
||||||
for cluster in self.clusters_response.get("data", []):
|
|
||||||
response_clusters.append(CloudClusterResponse(**cluster).toDict())
|
|
||||||
manager_clusters = sorted([device.clusterData.toDict() for device in self.manager._remote_clusters.values()],
|
|
||||||
key=lambda cluster: cluster['cluster_id'], reverse=True)
|
|
||||||
self.assertEqual(response_clusters, manager_clusters)
|
|
||||||
|
|
||||||
## Runs the initial request to retrieve the clusters.
|
|
||||||
def _loadData(self):
|
|
||||||
self.manager.start()
|
|
||||||
self.network.flushReplies()
|
|
||||||
|
|
||||||
def test_device_is_created(self):
|
|
||||||
# just create the cluster, it is checked at tearDown
|
|
||||||
self._loadData()
|
|
||||||
|
|
||||||
def test_device_is_updated(self):
|
|
||||||
self._loadData()
|
|
||||||
|
|
||||||
# update the cluster from member variable, which is checked at tearDown
|
|
||||||
self.clusters_response["data"][0]["host_name"] = "New host name"
|
|
||||||
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
|
|
||||||
|
|
||||||
self.manager._update_timer.timeout.emit()
|
|
||||||
|
|
||||||
def test_device_is_removed(self):
|
|
||||||
self._loadData()
|
|
||||||
|
|
||||||
# delete the cluster from member variable, which is checked at tearDown
|
|
||||||
del self.clusters_response["data"][1]
|
|
||||||
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
|
|
||||||
|
|
||||||
self.manager._update_timer.timeout.emit()
|
|
||||||
|
|
||||||
def test_device_connects_by_cluster_id(self):
|
|
||||||
active_machine_mock = self.app.getGlobalContainerStack.return_value
|
|
||||||
cluster1, cluster2 = self.clusters_response["data"]
|
|
||||||
cluster_id = cluster1["cluster_id"]
|
|
||||||
active_machine_mock.getMetaDataEntry.side_effect = {"um_cloud_cluster_id": cluster_id}.get
|
|
||||||
|
|
||||||
self._loadData()
|
|
||||||
|
|
||||||
self.assertTrue(self.device_manager.getOutputDevice(cluster1["cluster_id"]).isConnected())
|
|
||||||
self.assertIsNone(self.device_manager.getOutputDevice(cluster2["cluster_id"]))
|
|
||||||
self.assertEqual([], active_machine_mock.setMetaDataEntry.mock_calls)
|
|
||||||
|
|
||||||
def test_device_connects_by_network_key(self):
|
|
||||||
active_machine_mock = self.app.getGlobalContainerStack.return_value
|
|
||||||
|
|
||||||
cluster1, cluster2 = self.clusters_response["data"]
|
|
||||||
network_key = cluster2["host_name"] + ".ultimaker.local"
|
|
||||||
active_machine_mock.getMetaDataEntry.side_effect = {"um_network_key": network_key}.get
|
|
||||||
|
|
||||||
self._loadData()
|
|
||||||
|
|
||||||
self.assertIsNone(self.device_manager.getOutputDevice(cluster1["cluster_id"]))
|
|
||||||
self.assertTrue(self.device_manager.getOutputDevice(cluster2["cluster_id"]).isConnected())
|
|
||||||
|
|
||||||
active_machine_mock.setMetaDataEntry.assert_called_with("um_cloud_cluster_id", cluster2["cluster_id"])
|
|
||||||
|
|
||||||
@patch.object(CloudOutputDeviceManager, "Message")
|
|
||||||
def test_api_error(self, message_mock):
|
|
||||||
self.clusters_response = {
|
|
||||||
"errors": [{"id": "notFound", "title": "Not found!", "http_status": "404", "code": "notFound"}]
|
|
||||||
}
|
|
||||||
self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
|
|
||||||
self._loadData()
|
|
||||||
message_mock.return_value.show.assert_called_once_with()
|
|
|
@ -1,2 +0,0 @@
|
||||||
# Copyright (c) 2018 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
Loading…
Add table
Add a link
Reference in a new issue