STAR-322: Fixing style errors

This commit is contained in:
Daniel Schiavini 2018-12-11 11:56:36 +01:00
parent 7668801564
commit d54fc4182b
15 changed files with 69 additions and 72 deletions

View file

@ -31,7 +31,7 @@ class CloudApiClient(NetworkClient):
## Initializes a new cloud API client.
# \param account: The user's account object
# \param on_error: The callback to be called whenever we receive errors from the server.
def __init__(self, account: Account, on_error: Callable[[List[CloudErrorObject]], None]):
def __init__(self, account: Account, on_error: Callable[[List[CloudErrorObject]], None]) -> None:
super().__init__()
self._account = account
self._on_error = on_error

View file

@ -1,7 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from datetime import datetime, timezone
from typing import Dict, Union, TypeVar, Type, List
from typing import Dict, Union, TypeVar, Type, List, Any
from ...Models import BaseModel
@ -21,7 +21,7 @@ class BaseCloudModel(BaseModel):
return type(self) != type(other) or self.toDict() != other.toDict()
## Converts the model into a serializable dictionary
def toDict(self) -> Dict[str, any]:
def toDict(self) -> Dict[str, Any]:
return self.__dict__
# Type variable used in the parse methods below, which should be a subclass of BaseModel.
@ -32,7 +32,7 @@ class BaseCloudModel(BaseModel):
# \param values: The value of the model, which is usually a dictionary, but may also be already parsed.
# \return An instance of the model_class given.
@staticmethod
def parseModel(model_class: Type[T], values: Union[T, Dict[str, any]]) -> T:
def parseModel(model_class: Type[T], values: Union[T, Dict[str, Any]]) -> T:
if isinstance(values, dict):
return model_class(**values)
return values
@ -42,7 +42,7 @@ class BaseCloudModel(BaseModel):
# \param values: The value of the list. Each value is usually a dictionary, but may also be already parsed.
# \return A list of instances of the model_class given.
@classmethod
def parseModels(cls, model_class: Type[T], values: List[Union[T, Dict[str, any]]]) -> List[T]:
def parseModels(cls, model_class: Type[T], values: List[Union[T, Dict[str, Any]]]) -> List[T]:
return [cls.parseModel(model_class, value) for value in values]
## Parses the given date string.

View file

@ -1,6 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List, Optional, Union, Dict
from typing import List, Optional, Union, Dict, Any
from cura.PrinterOutput.ConfigurationModel import ConfigurationModel
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController
@ -36,12 +36,12 @@ class CloudClusterPrintJobStatus(BaseCloudModel):
# \param uuid: UUID of this print job. Should be used for identification purposes.
def __init__(self, created_at: str, force: bool, machine_variant: str, name: str, started: bool, status: str,
time_total: int, uuid: str,
configuration: List[Union[Dict[str, any], CloudClusterPrinterConfiguration]],
constraints: List[Union[Dict[str, any], CloudClusterPrintJobConstraints]],
configuration: List[Union[Dict[str, Any], CloudClusterPrinterConfiguration]],
constraints: List[Union[Dict[str, Any], CloudClusterPrintJobConstraints]],
last_seen: Optional[float] = None, network_error_count: Optional[int] = None,
owner: Optional[str] = None, printer_uuid: Optional[str] = None, time_elapsed: Optional[int] = None,
assigned_to: Optional[str] = None, **kwargs) -> None:
self.assigned_to = assigned_to # type: str
self.assigned_to = assigned_to
self.configuration = self.parseModels(CloudClusterPrinterConfiguration, configuration)
self.constraints = self.parseModels(CloudClusterPrintJobConstraints, constraints)
self.created_at = created_at

View file

@ -1,6 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Union, Dict, Optional
from typing import Union, Dict, Optional, Any
from cura.PrinterOutput.ExtruderConfigurationModel import ExtruderConfigurationModel
from cura.PrinterOutput.ExtruderOutputModel import ExtruderOutputModel
@ -17,10 +17,10 @@ class CloudClusterPrinterConfiguration(BaseCloudModel):
# \param nozzle_diameter: The diameter of the print core at this position in millimeters, e.g. '0.4'.
# \param print_core_id: The type of print core inserted at this position, e.g. 'AA 0.4'.
def __init__(self, extruder_index: int,
material: Union[None, Dict[str, any], CloudClusterPrinterConfigurationMaterial],
material: Union[None, Dict[str, Any], CloudClusterPrinterConfigurationMaterial],
nozzle_diameter: Optional[str] = None, print_core_id: Optional[str] = None, **kwargs) -> None:
self.extruder_index = extruder_index
self.material = self.parseModel(CloudClusterPrinterConfigurationMaterial, material)
self.material = self.parseModel(CloudClusterPrinterConfigurationMaterial, material) if material else None
self.nozzle_diameter = nozzle_diameter
self.print_core_id = print_core_id
super().__init__(**kwargs)
@ -28,11 +28,16 @@ class CloudClusterPrinterConfiguration(BaseCloudModel):
## Updates the given output model.
# \param model - The output model to update.
def updateOutputModel(self, model: ExtruderOutputModel) -> None:
if self.print_core_id is not None:
model.updateHotendID(self.print_core_id)
if model.activeMaterial is None or model.activeMaterial.guid != self.material.guid:
if self.material:
active_material = model.activeMaterial
if active_material is None or active_material.guid != self.material.guid:
material = self.material.createOutputModel()
model.updateActiveMaterial(material)
else:
model.updateActiveMaterial(None)
## Creates a configuration model
def createConfigurationModel(self) -> ExtruderConfigurationModel:

View file

@ -1,6 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import List, Union, Dict, Optional
from typing import List, Union, Dict, Optional, Any
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
@ -24,7 +24,7 @@ class CloudClusterPrinterStatus(BaseCloudModel):
# \param reserved_by: A printer can be claimed by a specific print job.
def __init__(self, enabled: bool, firmware_version: str, friendly_name: str, ip_address: str, machine_variant: str,
status: str, unique_name: str, uuid: str,
configuration: List[Union[Dict[str, any], CloudClusterPrinterConfiguration]],
configuration: List[Union[Dict[str, Any], CloudClusterPrinterConfiguration]],
reserved_by: Optional[str] = None, **kwargs) -> None:
self.configuration = self.parseModels(CloudClusterPrinterConfiguration, configuration)

View file

@ -16,7 +16,7 @@ class CloudClusterResponse(BaseCloudModel):
# \param status: The status of the cluster authentication (active or inactive).
# \param host_version: The firmware version of the cluster host. This is where the Stardust client is running on.
def __init__(self, cluster_id: str, host_guid: str, host_name: str, is_online: bool, status: str,
host_version: Optional[str] = None, **kwargs):
host_version: Optional[str] = None, **kwargs) -> None:
self.cluster_id = cluster_id
self.host_guid = host_guid
self.host_name = host_name

View file

@ -1,7 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from datetime import datetime
from typing import List, Dict, Union
from typing import List, Dict, Union, Any
from .CloudClusterPrinterStatus import CloudClusterPrinterStatus
from .CloudClusterPrintJobStatus import CloudClusterPrintJobStatus
@ -16,8 +16,8 @@ class CloudClusterStatus(BaseCloudModel):
# \param print_jobs: The latest status of each print job in the cluster.
# \param generated_time: The datetime when the object was generated on the server-side.
def __init__(self,
printers: List[Union[CloudClusterPrinterStatus, Dict[str, any]]],
print_jobs: List[Union[CloudClusterPrintJobStatus, Dict[str, any]]],
printers: List[Union[CloudClusterPrinterStatus, Dict[str, Any]]],
print_jobs: List[Union[CloudClusterPrintJobStatus, Dict[str, Any]]],
generated_time: Union[str, datetime],
**kwargs) -> None:
self.generated_time = self.parseDate(generated_time)

View file

@ -1,6 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from typing import Dict, Optional
from typing import Dict, Optional, Any
from .BaseCloudModel import BaseCloudModel
@ -18,7 +18,7 @@ class CloudErrorObject(BaseCloudModel):
# \param http_status: The HTTP status code applicable to this problem, converted to string.
# \param meta: Non-standard meta-information about the error, depending on the error code.
def __init__(self, id: str, code: str, title: str, http_status: str, detail: Optional[str] = None,
meta: Optional[Dict[str, any]] = None, **kwargs) -> None:
meta: Optional[Dict[str, Any]] = None, **kwargs) -> None:
self.id = id
self.code = code
self.http_status = http_status

View file

@ -56,7 +56,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
self._number_of_extruders = 2
self._dummy_lambdas = ("", {}, io.BytesIO()) #type: Tuple[str, Dict, Union[io.StringIO, io.BytesIO]]
self._dummy_lambdas = ("", {}, io.BytesIO()
) # type: Tuple[str, Dict[str, Union[str, int, bool]], Union[io.StringIO, io.BytesIO]]
self._print_jobs = [] # type: List[UM3PrintJobOutputModel]
self._received_print_jobs = False # type: bool
@ -254,7 +255,8 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
# Treat upload progress as response. Uploading can take more than 10 seconds, so if we don't, we can get
# timeout responses if this happens.
self._last_response_time = time()
if self._progress_message and new_progress > self._progress_message.getProgress():
old_progress = self._progress_message.getProgress()
if self._progress_message and (old_progress is None or new_progress > old_progress):
self._progress_message.show() # Ensure that the message is visible.
self._progress_message.setProgress(bytes_sent / bytes_total * 100)
@ -345,28 +347,6 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
def getDateCompleted(self, time_remaining: int) -> str:
return formatDateCompleted(time_remaining)
@pyqtSlot(int, result = str)
def getDateCompleted(self, time_remaining: int) -> str:
current_time = time()
completed = datetime.fromtimestamp(current_time + time_remaining)
today = datetime.fromtimestamp(current_time)
# If finishing date is more than 7 days out, using "Mon Dec 3 at HH:MM" format
if completed.toordinal() > today.toordinal() + 7:
return completed.strftime("%a %b ") + "{day}".format(day=completed.day)
# If finishing date is within the next week, use "Monday at HH:MM" format
elif completed.toordinal() > today.toordinal() + 1:
return completed.strftime("%a")
# If finishing tomorrow, use "tomorrow at HH:MM" format
elif completed.toordinal() > today.toordinal():
return "tomorrow"
# If finishing today, use "today at HH:MM" format
else:
return "today"
@pyqtSlot(str)
def sendJobToTop(self, print_job_uuid: str) -> None:
# This function is part of the output device (and not of the printjob output model) as this type of operation

View file

@ -1,7 +1,7 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import io
from typing import Optional, Dict, Union, List
from typing import Optional, Dict, Union, List, cast
from UM.FileHandler.FileHandler import FileHandler
from UM.FileHandler.FileWriter import FileWriter
@ -26,7 +26,7 @@ class MeshFormatHandler:
def __init__(self, file_handler: Optional[FileHandler], firmware_version: str) -> None:
self._file_handler = file_handler or CuraApplication.getInstance().getMeshFileHandler()
self._preferred_format = self._getPreferredFormat(firmware_version)
self._writer = self._getWriter(self._preferred_format["mime_type"]) if self._preferred_format else None
self._writer = self._getWriter(self.mime_type) if self._preferred_format else None
@property
def is_valid(self) -> bool:
@ -47,32 +47,40 @@ class MeshFormatHandler:
@property
def mime_type(self) -> str:
return self._preferred_format["mime_type"]
return cast(str, self._preferred_format["mime_type"])
## Gets the file mode (FileWriter.OutputMode.TextMode or FileWriter.OutputMode.BinaryMode)
@property
def file_mode(self) -> int:
return self._preferred_format["mode"]
return cast(int, self._preferred_format["mode"])
## Gets the file extension
@property
def file_extension(self) -> str:
return self._preferred_format["extension"]
return cast(str, self._preferred_format["extension"])
## Creates the right kind of stream based on the preferred format.
def createStream(self) -> Union[io.BytesIO, io.StringIO]:
return io.StringIO() if self.file_mode == FileWriter.OutputMode.TextMode else io.BytesIO()
if self.file_mode == FileWriter.OutputMode.TextMode:
return io.StringIO()
else:
return io.BytesIO()
## Writes the mesh and returns its value.
def getBytes(self, nodes: List[SceneNode]) -> bytes:
if self.writer is None:
raise ValueError("There is no writer for the mesh format handler.")
stream = self.createStream()
self.writer.write(stream, nodes)
return stream.getvalue()
value = stream.getvalue()
if isinstance(value, str):
value = value.encode()
return value
## Chooses the preferred file format for the given file handler.
# \param firmware_version: The version of the firmware.
# \return A dict with the file format details.
def _getPreferredFormat(self, firmware_version: str) -> Optional[Dict[str, Union[str, int, bool]]]:
def _getPreferredFormat(self, firmware_version: str) -> Dict[str, Union[str, int, bool]]:
# Formats supported by this application (file types that we can actually write).
application = CuraApplication.getInstance()
@ -82,7 +90,7 @@ class MeshFormatHandler:
# Create a list from the supported file formats string.
if not global_stack:
Logger.log("e", "Missing global stack!")
return
return {}
machine_file_formats = global_stack.getMetaDataEntry("file_formats").split(";")
machine_file_formats = [file_type.strip() for file_type in machine_file_formats]

View file

@ -3,7 +3,7 @@
import json
import os
import urllib.parse
from typing import Dict, TYPE_CHECKING, Set
from typing import Dict, TYPE_CHECKING, Set, Optional
from PyQt5.QtNetwork import QNetworkReply, QNetworkRequest
@ -151,7 +151,7 @@ class SendMaterialJob(Job):
# \return a dictionary of ClusterMaterial objects by GUID
# \throw KeyError Raised when on of the materials does not include a valid guid
@classmethod
def _parseReply(cls, reply: QNetworkReply) -> Dict[str, ClusterMaterial]:
def _parseReply(cls, reply: QNetworkReply) -> Optional[Dict[str, ClusterMaterial]]:
try:
remote_materials = json.loads(reply.readAll().data().decode("utf-8"))
return {material["guid"]: ClusterMaterial(**material) for material in remote_materials}
@ -163,6 +163,7 @@ class SendMaterialJob(Job):
Logger.log("e", "Request material storage on printer: Printer's answer had an incorrect value.")
except TypeError:
Logger.log("e", "Request material storage on printer: Printer's answer was missing a required value.")
return None
## Retrieves a list of local materials
#
@ -184,7 +185,8 @@ class SendMaterialJob(Job):
local_material = LocalMaterial(**material)
if local_material.GUID not in result or \
local_material.version > result.get(local_material.GUID).version:
local_material.GUID not in result or \
local_material.version > result[local_material.GUID].version:
result[local_material.GUID] = local_material
except KeyError:

View file

@ -1,13 +1,12 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from PyQt5.QtCore import pyqtSignal, pyqtProperty, QObject, pyqtSlot
from typing import Optional, TYPE_CHECKING, List
from PyQt5.QtCore import QUrl
from PyQt5.QtGui import QImage
from typing import List
from PyQt5.QtCore import pyqtProperty, pyqtSignal
from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel
from cura.PrinterOutput.PrinterOutputController import PrinterOutputController
from .ConfigurationChangeModel import ConfigurationChangeModel

View file

@ -0,0 +1,2 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.

View file

@ -14,9 +14,6 @@ from UM.Signal import Signal
# After patching the QNetworkManager class, requests are prepared before they can be executed.
# Any requests not prepared beforehand will cause KeyErrors.
class NetworkManagerMock:
# signals used in the network manager.
finished = Signal()
authenticationRequired = Signal()
# an enumeration of the supported operations and their code for the network access manager.
_OPERATIONS = {
@ -33,6 +30,10 @@ class NetworkManagerMock:
self.replies = {} # type: Dict[Tuple[str, str], QNetworkReply]
self.request_bodies = {} # type: Dict[Tuple[str, str], bytes]
# signals used in the network manager.
self.finished = Signal()
self.authenticationRequired = Signal()
## Mock implementation of the get, post, put, delete and head methods from the network manager.
# Since the methods are very simple and the same it didn't make sense to repeat the code.
# \param method: The method being called.

View file

@ -2,22 +2,23 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import os
from typing import List
from unittest import TestCase
from unittest.mock import patch, MagicMock
from cura.CuraApplication import CuraApplication
from src.Cloud.CloudApiClient import CloudApiClient
from src.Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager
from src.Cloud.Models.CloudPrintJobResponse import CloudPrintJobResponse
from src.Cloud.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from src.Cloud.Models.CloudErrorObject import CloudErrorObject
from tests.Cloud.Fixtures import readFixture, parseFixture
from .NetworkManagerMock import NetworkManagerMock
@patch("cura.NetworkClient.QNetworkAccessManager")
class TestCloudApiClient(TestCase):
def _errorHandler(self):
pass
def _errorHandler(self, errors: List[CloudErrorObject]):
raise Exception("Received unexpected error: {}".format(errors))
def setUp(self):
super().setUp()
@ -26,7 +27,6 @@ class TestCloudApiClient(TestCase):
self.app = CuraApplication.getInstance()
self.network = NetworkManagerMock()
self.manager = CloudOutputDeviceManager()
self.api = CloudApiClient(self.account, self._errorHandler)
def test_GetClusters(self, network_mock):