diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py index 1c9670d87f..65dde1e519 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py @@ -20,13 +20,15 @@ from ..Models.Http.CloudPrintJobResponse import CloudPrintJobResponse from ..Models.Http.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest from ..Models.Http.CloudPrintResponse import CloudPrintResponse -## The generic type variable used to document the methods below. CloudApiClientModel = TypeVar("CloudApiClientModel", bound=BaseModel) +"""The generic type variable used to document the methods below.""" -## The cloud API client is responsible for handling the requests and responses from the cloud. -# Each method should only handle models instead of exposing Any HTTP details. class CloudApiClient: + """The cloud API client is responsible for handling the requests and responses from the cloud. + + Each method should only handle models instead of exposing Any HTTP details. + """ # The cloud URL to use for this remote cluster. ROOT_PATH = UltimakerCloudAuthentication.CuraCloudAPIRoot @@ -36,54 +38,70 @@ class CloudApiClient: # In order to avoid garbage collection we keep the callbacks in this list. _anti_gc_callbacks = [] # type: List[Callable[[], None]] - ## Initializes a new cloud API client. - # \param account: The user's account object - # \param on_error: The callback to be called whenever we receive errors from the server. def __init__(self, account: Account, on_error: Callable[[List[CloudError]], None]) -> None: + """Initializes a new cloud API client. + + :param account: The user's account object + :param on_error: The callback to be called whenever we receive errors from the server. + """ super().__init__() self._manager = QNetworkAccessManager() self._account = account self._on_error = on_error self._upload = None # type: Optional[ToolPathUploader] - ## Gets the account used for the API. @property def account(self) -> Account: + """Gets the account used for the API.""" + return self._account - ## Retrieves all the clusters for the user that is currently logged in. - # \param on_finished: The function to be called after the result is parsed. def getClusters(self, on_finished: Callable[[List[CloudClusterResponse]], Any], failed: Callable) -> None: + """Retrieves all the clusters for the user that is currently logged in. + + :param on_finished: The function to be called after the result is parsed. + """ + url = "{}/clusters?status=active".format(self.CLUSTER_API_ROOT) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished, CloudClusterResponse, failed) - ## Retrieves the status of the given cluster. - # \param cluster_id: The ID of the cluster. - # \param on_finished: The function to be called after the result is parsed. def getClusterStatus(self, cluster_id: str, on_finished: Callable[[CloudClusterStatus], Any]) -> None: + """Retrieves the status of the given cluster. + + :param cluster_id: The ID of the cluster. + :param on_finished: The function to be called after the result is parsed. + """ + url = "{}/clusters/{}/status".format(self.CLUSTER_API_ROOT, cluster_id) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished, CloudClusterStatus) - ## Requests the cloud to register the upload of a print job mesh. - # \param request: The request object. - # \param on_finished: The function to be called after the result is parsed. def requestUpload(self, request: CloudPrintJobUploadRequest, on_finished: Callable[[CloudPrintJobResponse], Any]) -> None: + + """Requests the cloud to register the upload of a print job mesh. + + :param request: The request object. + :param on_finished: The function to be called after the result is parsed. + """ + url = "{}/jobs/upload".format(self.CURA_API_ROOT) body = json.dumps({"data": request.toDict()}) reply = self._manager.put(self._createEmptyRequest(url), body.encode()) self._addCallback(reply, on_finished, CloudPrintJobResponse) - ## Uploads a print job tool path to the cloud. - # \param print_job: The object received after requesting an upload with `self.requestUpload`. - # \param mesh: The tool path data to be uploaded. - # \param on_finished: The function to be called after the upload is successful. - # \param on_progress: A function to be called during upload progress. It receives a percentage (0-100). - # \param on_error: A function to be called if the upload fails. def uploadToolPath(self, print_job: CloudPrintJobResponse, mesh: bytes, on_finished: Callable[[], Any], on_progress: Callable[[int], Any], on_error: Callable[[], Any]): + """Uploads a print job tool path to the cloud. + + :param print_job: The object received after requesting an upload with `self.requestUpload`. + :param mesh: The tool path data to be uploaded. + :param on_finished: The function to be called after the upload is successful. + :param on_progress: A function to be called during upload progress. It receives a percentage (0-100). + :param on_error: A function to be called if the upload fails. + """ + self._upload = ToolPathUploader(self._manager, print_job, mesh, on_finished, on_progress, on_error) self._upload.start() @@ -96,20 +114,27 @@ class CloudApiClient: reply = self._manager.post(self._createEmptyRequest(url), b"") self._addCallback(reply, on_finished, CloudPrintResponse) - ## Send a print job action to the cluster for the given print job. - # \param cluster_id: The ID of the cluster. - # \param cluster_job_id: The ID of the print job within the cluster. - # \param action: The name of the action to execute. def doPrintJobAction(self, cluster_id: str, cluster_job_id: str, action: str, data: Optional[Dict[str, Any]] = None) -> None: + + """Send a print job action to the cluster for the given print job. + + :param cluster_id: The ID of the cluster. + :param cluster_job_id: The ID of the print job within the cluster. + :param action: The name of the action to execute. + """ + body = json.dumps({"data": data}).encode() if data else b"" url = "{}/clusters/{}/print_jobs/{}/action/{}".format(self.CLUSTER_API_ROOT, cluster_id, cluster_job_id, action) self._manager.post(self._createEmptyRequest(url), body) - ## We override _createEmptyRequest in order to add the user credentials. - # \param url: The URL to request - # \param content_type: The type of the body contents. def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest: + """We override _createEmptyRequest in order to add the user credentials. + + :param url: The URL to request + :param content_type: The type of the body contents. + """ + request = QNetworkRequest(QUrl(path)) if content_type: request.setHeader(QNetworkRequest.ContentTypeHeader, content_type) @@ -118,11 +143,14 @@ class CloudApiClient: request.setRawHeader(b"Authorization", "Bearer {}".format(access_token).encode()) return request - ## Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well. - # \param reply: The reply from the server. - # \return A tuple with a status code and a dictionary. @staticmethod def _parseReply(reply: QNetworkReply) -> Tuple[int, Dict[str, Any]]: + """Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well. + + :param reply: The reply from the server. + :return: A tuple with a status code and a dictionary. + """ + status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) try: response = bytes(reply.readAll()).decode() @@ -133,14 +161,15 @@ class CloudApiClient: Logger.logException("e", "Could not parse the stardust response: %s", error.toDict()) return status_code, {"errors": [error.toDict()]} - ## Parses the given models and calls the correct callback depending on the result. - # \param response: The response from the server, after being converted to a dict. - # \param on_finished: The callback in case the response is successful. - # \param model_class: The type of the model to convert the response to. It may either be a single record or a list. - def _parseModels(self, response: Dict[str, Any], - on_finished: Union[Callable[[CloudApiClientModel], Any], - Callable[[List[CloudApiClientModel]], Any]], - model_class: Type[CloudApiClientModel]) -> None: + def _parseModels(self, response: Dict[str, Any], on_finished: Union[Callable[[CloudApiClientModel], Any], + Callable[[List[CloudApiClientModel]], Any]], model_class: Type[CloudApiClientModel]) -> None: + """Parses the given models and calls the correct callback depending on the result. + + :param response: The response from the server, after being converted to a dict. + :param on_finished: The callback in case the response is successful. + :param model_class: The type of the model to convert the response to. It may either be a single record or a list. + """ + if "data" in response: data = response["data"] if isinstance(data, list): @@ -156,18 +185,21 @@ class CloudApiClient: else: Logger.log("e", "Cannot find data or errors in the cloud response: %s", response) - ## Creates a callback function so that it includes the parsing of the response into the correct model. - # The callback is added to the 'finished' signal of the reply. - # \param reply: The reply that should be listened to. - # \param on_finished: The callback in case the response is successful. Depending on the endpoint it will be either - # a list or a single item. - # \param model: The type of the model to convert the response to. def _addCallback(self, reply: QNetworkReply, on_finished: Union[Callable[[CloudApiClientModel], Any], Callable[[List[CloudApiClientModel]], Any]], model: Type[CloudApiClientModel], on_error: Optional[Callable] = None) -> None: + """Creates a callback function so that it includes the parsing of the response into the correct model. + + The callback is added to the 'finished' signal of the reply. + :param reply: The reply that should be listened to. + :param on_finished: The callback in case the response is successful. Depending on the endpoint it will be either + a list or a single item. + :param model: The type of the model to convert the response to. + """ + def parse() -> None: self._anti_gc_callbacks.remove(parse) diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py index 64638a0a1e..939b9cce23 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py @@ -35,11 +35,13 @@ from ..Models.Http.ClusterPrintJobStatus import ClusterPrintJobStatus I18N_CATALOG = i18nCatalog("cura") -## The cloud output device is a network output device that works remotely but has limited functionality. -# Currently it only supports viewing the printer and print job status and adding a new job to the queue. -# As such, those methods have been implemented here. -# Note that this device represents a single remote cluster, not a list of multiple clusters. class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): + """The cloud output device is a network output device that works remotely but has limited functionality. + + Currently it only supports viewing the printer and print job status and adding a new job to the queue. + As such, those methods have been implemented here. + Note that this device represents a single remote cluster, not a list of multiple clusters. + """ # The interval with which the remote cluster is checked. # We can do this relatively often as this API call is quite fast. @@ -56,11 +58,13 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): # Therefore we create a private signal used to trigger the printersChanged signal. _cloudClusterPrintersChanged = pyqtSignal() - ## Creates a new cloud output device - # \param api_client: The client that will run the API calls - # \param cluster: The device response received from the cloud API. - # \param parent: The optional parent of this output device. def __init__(self, api_client: CloudApiClient, cluster: CloudClusterResponse, parent: QObject = None) -> None: + """Creates a new cloud output device + + :param api_client: The client that will run the API calls + :param cluster: The device response received from the cloud API. + :param parent: The optional parent of this output device. + """ # The following properties are expected on each networked output device. # Because the cloud connection does not off all of these, we manually construct this version here. @@ -99,8 +103,9 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): self._tool_path = None # type: Optional[bytes] self._uploaded_print_job = None # type: Optional[CloudPrintJobResponse] - ## Connects this device. def connect(self) -> None: + """Connects this device.""" + if self.isConnected(): return super().connect() @@ -108,21 +113,24 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): CuraApplication.getInstance().getBackend().backendStateChange.connect(self._onBackendStateChange) self._update() - ## Disconnects the device def disconnect(self) -> None: + """Disconnects the device""" + if not self.isConnected(): return super().disconnect() Logger.log("i", "Disconnected from cluster %s", self.key) CuraApplication.getInstance().getBackend().backendStateChange.disconnect(self._onBackendStateChange) - ## Resets the print job that was uploaded to force a new upload, runs whenever the user re-slices. def _onBackendStateChange(self, _: BackendState) -> None: + """Resets the print job that was uploaded to force a new upload, runs whenever the user re-slices.""" + self._tool_path = None self._uploaded_print_job = None - ## Checks whether the given network key is found in the cloud's host name def matchesNetworkKey(self, network_key: str) -> bool: + """Checks whether the given network key is found in the cloud's host name""" + # Typically, a network key looks like "ultimakersystem-aabbccdd0011._ultimaker._tcp.local." # the host name should then be "ultimakersystem-aabbccdd0011" if network_key.startswith(str(self.clusterData.host_name or "")): @@ -133,15 +141,17 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): return True return False - ## Set all the interface elements and texts for this output device. def _setInterfaceElements(self) -> None: + """Set all the interface elements and texts for this output device.""" + self.setPriority(2) # Make sure we end up below the local networking and above 'save to file'. self.setShortDescription(I18N_CATALOG.i18nc("@action:button", "Print via Cloud")) self.setDescription(I18N_CATALOG.i18nc("@properties:tooltip", "Print via Cloud")) self.setConnectionText(I18N_CATALOG.i18nc("@info:status", "Connected via Cloud")) - ## Called when the network data should be updated. def _update(self) -> None: + """Called when the network data should be updated.""" + super()._update() if time() - self._time_of_last_request < self.CHECK_CLUSTER_INTERVAL: return # avoid calling the cloud too often @@ -153,9 +163,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): else: self.setAuthenticationState(AuthState.NotAuthenticated) - ## Method called when HTTP request to status endpoint is finished. - # Contains both printers and print jobs statuses in a single response. def _onStatusCallFinished(self, status: CloudClusterStatus) -> None: + """Method called when HTTP request to status endpoint is finished. + + Contains both printers and print jobs statuses in a single response. + """ self._responseReceived() if status.printers != self._received_printers: self._received_printers = status.printers @@ -164,10 +176,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): self._received_print_jobs = status.print_jobs self._updatePrintJobs(status.print_jobs) - ## Called when Cura requests an output device to receive a (G-code) file. def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, filter_by_machine: bool = False, **kwargs) -> None: + """Called when Cura requests an output device to receive a (G-code) file.""" + # Show an error message if we're already sending a job. if self._progress.visible: PrintJobUploadBlockedMessage().show() @@ -187,9 +200,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): job.finished.connect(self._onPrintJobCreated) job.start() - ## Handler for when the print job was created locally. - # It can now be sent over the cloud. def _onPrintJobCreated(self, job: ExportFileJob) -> None: + """Handler for when the print job was created locally. + + It can now be sent over the cloud. + """ output = job.getOutput() self._tool_path = output # store the tool path to prevent re-uploading when printing the same file again file_name = job.getFileName() @@ -200,9 +215,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): ) self._api.requestUpload(request, self._uploadPrintJob) - ## Uploads the mesh when the print job was registered with the cloud API. - # \param job_response: The response received from the cloud API. def _uploadPrintJob(self, job_response: CloudPrintJobResponse) -> None: + """Uploads the mesh when the print job was registered with the cloud API. + + :param job_response: The response received from the cloud API. + """ if not self._tool_path: return self._onUploadError() self._progress.show() @@ -210,38 +227,45 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): self._api.uploadToolPath(job_response, self._tool_path, self._onPrintJobUploaded, self._progress.update, self._onUploadError) - ## Requests the print to be sent to the printer when we finished uploading the mesh. def _onPrintJobUploaded(self) -> None: + """Requests the print to be sent to the printer when we finished uploading the mesh.""" + self._progress.update(100) print_job = cast(CloudPrintJobResponse, self._uploaded_print_job) self._api.requestPrint(self.key, print_job.job_id, self._onPrintUploadCompleted) - ## Shows a message when the upload has succeeded - # \param response: The response from the cloud API. def _onPrintUploadCompleted(self, response: CloudPrintResponse) -> None: + """Shows a message when the upload has succeeded + + :param response: The response from the cloud API. + """ self._progress.hide() PrintJobUploadSuccessMessage().show() self.writeFinished.emit() - ## Displays the given message if uploading the mesh has failed - # \param message: The message to display. def _onUploadError(self, message: str = None) -> None: + """Displays the given message if uploading the mesh has failed + + :param message: The message to display. + """ self._progress.hide() self._uploaded_print_job = None PrintJobUploadErrorMessage(message).show() self.writeError.emit() - ## Whether the printer that this output device represents supports print job actions via the cloud. @pyqtProperty(bool, notify=_cloudClusterPrintersChanged) def supportsPrintJobActions(self) -> bool: + """Whether the printer that this output device represents supports print job actions via the cloud.""" + if not self._printers: return False version_number = self.printers[0].firmwareVersion.split(".") firmware_version = Version([version_number[0], version_number[1], version_number[2]]) return firmware_version >= self.PRINT_JOB_ACTIONS_MIN_VERSION - ## Set the remote print job state. def setJobState(self, print_job_uuid: str, state: str) -> None: + """Set the remote print job state.""" + self._api.doPrintJobAction(self._cluster.cluster_id, print_job_uuid, state) @pyqtSlot(str, name="sendJobToTop") @@ -265,18 +289,21 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice): def openPrinterControlPanel(self) -> None: QDesktopServices.openUrl(QUrl(self.clusterCloudUrl)) - ## Gets the cluster response from which this device was created. @property def clusterData(self) -> CloudClusterResponse: + """Gets the cluster response from which this device was created.""" + return self._cluster - ## Updates the cluster data from the cloud. @clusterData.setter def clusterData(self, value: CloudClusterResponse) -> None: + """Updates the cluster data from the cloud.""" + self._cluster = value - ## Gets the URL on which to monitor the cluster via the cloud. @property def clusterCloudUrl(self) -> str: + """Gets the URL on which to monitor the cluster via the cloud.""" + root_url_prefix = "-staging" if self._account.is_staging else "" return "https://mycloud{}.ultimaker.com/app/jobs/{}".format(root_url_prefix, self.clusterData.cluster_id) diff --git a/plugins/UM3NetworkPrinting/src/Cloud/ToolPathUploader.py b/plugins/UM3NetworkPrinting/src/Cloud/ToolPathUploader.py index 6aa341c0e5..5de2378042 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/ToolPathUploader.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/ToolPathUploader.py @@ -10,8 +10,9 @@ from UM.Logger import Logger from ..Models.Http.CloudPrintJobResponse import CloudPrintJobResponse -## Class responsible for uploading meshes to the cloud in separate requests. class ToolPathUploader: + """Class responsible for uploading meshes to the cloud in separate requests.""" + # The maximum amount of times to retry if the server returns one of the RETRY_HTTP_CODES MAX_RETRIES = 10 @@ -22,16 +23,19 @@ class ToolPathUploader: # The amount of bytes to send per request BYTES_PER_REQUEST = 256 * 1024 - ## Creates a mesh upload object. - # \param manager: The network access manager that will handle the HTTP requests. - # \param print_job: The print job response that was returned by the cloud after registering the upload. - # \param data: The mesh bytes to be uploaded. - # \param on_finished: The method to be called when done. - # \param on_progress: The method to be called when the progress changes (receives a percentage 0-100). - # \param on_error: The method to be called when an error occurs. def __init__(self, manager: QNetworkAccessManager, print_job: CloudPrintJobResponse, data: bytes, on_finished: Callable[[], Any], on_progress: Callable[[int], Any], on_error: Callable[[], Any] ) -> None: + """Creates a mesh upload object. + + :param manager: The network access manager that will handle the HTTP requests. + :param print_job: The print job response that was returned by the cloud after registering the upload. + :param data: The mesh bytes to be uploaded. + :param on_finished: The method to be called when done. + :param on_progress: The method to be called when the progress changes (receives a percentage 0-100). + :param on_error: The method to be called when an error occurs. + """ + self._manager = manager self._print_job = print_job self._data = data @@ -45,13 +49,15 @@ class ToolPathUploader: self._finished = False self._reply = None # type: Optional[QNetworkReply] - ## Returns the print job for which this object was created. @property def printJob(self): + """Returns the print job for which this object was created.""" + return self._print_job - ## Creates a network request to the print job upload URL, adding the needed content range header. def _createRequest(self) -> QNetworkRequest: + """Creates a network request to the print job upload URL, adding the needed content range header.""" + request = QNetworkRequest(QUrl(self._print_job.upload_url)) request.setHeader(QNetworkRequest.ContentTypeHeader, self._print_job.content_type) @@ -62,14 +68,17 @@ class ToolPathUploader: return request - ## Determines the bytes that should be uploaded next. - # \return: A tuple with the first and the last byte to upload. def _chunkRange(self) -> Tuple[int, int]: + """Determines the bytes that should be uploaded next. + + :return: A tuple with the first and the last byte to upload. + """ last_byte = min(len(self._data), self._sent_bytes + self.BYTES_PER_REQUEST) return self._sent_bytes, last_byte - ## Starts uploading the mesh. def start(self) -> None: + """Starts uploading the mesh.""" + if self._finished: # reset state. self._sent_bytes = 0 @@ -77,13 +86,15 @@ class ToolPathUploader: self._finished = False self._uploadChunk() - ## Stops uploading the mesh, marking it as finished. def stop(self): + """Stops uploading the mesh, marking it as finished.""" + Logger.log("i", "Stopped uploading") self._finished = True - ## Uploads a chunk of the mesh to the cloud. def _uploadChunk(self) -> None: + """Uploads a chunk of the mesh to the cloud.""" + if self._finished: raise ValueError("The upload is already finished") @@ -96,25 +107,29 @@ class ToolPathUploader: self._reply.uploadProgress.connect(self._progressCallback) self._reply.error.connect(self._errorCallback) - ## Handles an update to the upload progress - # \param bytes_sent: The amount of bytes sent in the current request. - # \param bytes_total: The amount of bytes to send in the current request. def _progressCallback(self, bytes_sent: int, bytes_total: int) -> None: + """Handles an update to the upload progress + + :param bytes_sent: The amount of bytes sent in the current request. + :param bytes_total: The amount of bytes to send in the current request. + """ Logger.log("i", "Progress callback %s / %s", bytes_sent, bytes_total) if bytes_total: total_sent = self._sent_bytes + bytes_sent self._on_progress(int(total_sent / len(self._data) * 100)) - ## Handles an error uploading. def _errorCallback(self) -> None: + """Handles an error uploading.""" + reply = cast(QNetworkReply, self._reply) body = bytes(reply.readAll()).decode() Logger.log("e", "Received error while uploading: %s", body) self.stop() self._on_error() - ## Checks whether a chunk of data was uploaded successfully, starting the next chunk if needed. def _finishedCallback(self) -> None: + """Checks whether a chunk of data was uploaded successfully, starting the next chunk if needed.""" + reply = cast(QNetworkReply, self._reply) Logger.log("i", "Finished callback %s %s", reply.attribute(QNetworkRequest.HttpStatusCodeAttribute), reply.url().toString()) @@ -140,8 +155,9 @@ class ToolPathUploader: [bytes(header).decode() for header in reply.rawHeaderList()], bytes(reply.readAll()).decode()) self._chunkUploaded() - ## Handles a chunk of data being uploaded, starting the next chunk if needed. def _chunkUploaded(self) -> None: + """Handles a chunk of data being uploaded, starting the next chunk if needed.""" + # We got a successful response. Let's start the next chunk or report the upload is finished. first_byte, last_byte = self._chunkRange() self._sent_bytes += last_byte - first_byte diff --git a/plugins/UM3NetworkPrinting/src/ExportFileJob.py b/plugins/UM3NetworkPrinting/src/ExportFileJob.py index 56d15bc835..6fde08cc5f 100644 --- a/plugins/UM3NetworkPrinting/src/ExportFileJob.py +++ b/plugins/UM3NetworkPrinting/src/ExportFileJob.py @@ -9,8 +9,8 @@ from cura.CuraApplication import CuraApplication from .MeshFormatHandler import MeshFormatHandler -## Job that exports the build plate to the correct file format for the target cluster. class ExportFileJob(WriteFileJob): + """Job that exports the build plate to the correct file format for the target cluster.""" def __init__(self, file_handler: Optional[FileHandler], nodes: List[SceneNode], firmware_version: str) -> None: @@ -27,12 +27,14 @@ class ExportFileJob(WriteFileJob): extension = self._mesh_format_handler.preferred_format.get("extension", "") self.setFileName("{}.{}".format(job_name, extension)) - ## Get the mime type of the selected export file type. def getMimeType(self) -> str: + """Get the mime type of the selected export file type.""" + return self._mesh_format_handler.mime_type - ## Get the job result as bytes as that is what we need to upload to the cluster. def getOutput(self) -> bytes: + """Get the job result as bytes as that is what we need to upload to the cluster.""" + output = self.getStream().getvalue() if isinstance(output, str): output = output.encode("utf-8") diff --git a/plugins/UM3NetworkPrinting/src/MeshFormatHandler.py b/plugins/UM3NetworkPrinting/src/MeshFormatHandler.py index 9927bf744e..0287d72eb6 100644 --- a/plugins/UM3NetworkPrinting/src/MeshFormatHandler.py +++ b/plugins/UM3NetworkPrinting/src/MeshFormatHandler.py @@ -16,8 +16,9 @@ from cura.CuraApplication import CuraApplication I18N_CATALOG = i18nCatalog("cura") -## This class is responsible for choosing the formats used by the connected clusters. class MeshFormatHandler: + """This class is responsible for choosing the formats used by the connected clusters.""" + def __init__(self, file_handler: Optional[FileHandler], firmware_version: str) -> None: self._file_handler = file_handler or CuraApplication.getInstance().getMeshFileHandler() @@ -28,42 +29,50 @@ class MeshFormatHandler: def is_valid(self) -> bool: return bool(self._writer) - ## Chooses the preferred file format. - # \return A dict with the file format details, with the following keys: - # {id: str, extension: str, description: str, mime_type: str, mode: int, hide_in_file_dialog: bool} @property def preferred_format(self) -> Dict[str, Union[str, int, bool]]: + """Chooses the preferred file format. + + :return: A dict with the file format details, with the following keys: + {id: str, extension: str, description: str, mime_type: str, mode: int, hide_in_file_dialog: bool} + """ return self._preferred_format - ## Gets the file writer for the given file handler and mime type. - # \return A file writer. @property def writer(self) -> Optional[FileWriter]: + """Gets the file writer for the given file handler and mime type. + + :return: A file writer. + """ return self._writer @property def mime_type(self) -> str: return cast(str, self._preferred_format["mime_type"]) - ## Gets the file mode (FileWriter.OutputMode.TextMode or FileWriter.OutputMode.BinaryMode) @property def file_mode(self) -> int: + """Gets the file mode (FileWriter.OutputMode.TextMode or FileWriter.OutputMode.BinaryMode)""" + return cast(int, self._preferred_format["mode"]) - ## Gets the file extension @property def file_extension(self) -> str: + """Gets the file extension""" + return cast(str, self._preferred_format["extension"]) - ## Creates the right kind of stream based on the preferred format. def createStream(self) -> Union[io.BytesIO, io.StringIO]: + """Creates the right kind of stream based on the preferred format.""" + if self.file_mode == FileWriter.OutputMode.TextMode: return io.StringIO() else: return io.BytesIO() - ## Writes the mesh and returns its value. def getBytes(self, nodes: List[SceneNode]) -> bytes: + """Writes the mesh and returns its value.""" + if self.writer is None: raise ValueError("There is no writer for the mesh format handler.") stream = self.createStream() @@ -73,10 +82,12 @@ class MeshFormatHandler: value = value.encode() return value - ## Chooses the preferred file format for the given file handler. - # \param firmware_version: The version of the firmware. - # \return A dict with the file format details. def _getPreferredFormat(self, firmware_version: str) -> Dict[str, Union[str, int, bool]]: + """Chooses the preferred file format for the given file handler. + + :param firmware_version: The version of the firmware. + :return: A dict with the file format details. + """ # Formats supported by this application (file types that we can actually write). application = CuraApplication.getInstance() @@ -108,9 +119,11 @@ class MeshFormatHandler: ) return file_formats[0] - ## Gets the file writer for the given file handler and mime type. - # \param mime_type: The mine type. - # \return A file writer. def _getWriter(self, mime_type: str) -> Optional[FileWriter]: + """Gets the file writer for the given file handler and mime type. + + :param mime_type: The mine type. + :return: A file writer. + """ # Just take the first file format available. return self._file_handler.getWriterByMimeType(mime_type) diff --git a/plugins/UM3NetworkPrinting/src/Messages/LegacyDeviceNoLongerSupportedMessage.py b/plugins/UM3NetworkPrinting/src/Messages/LegacyDeviceNoLongerSupportedMessage.py index f4132dbcbc..146767467a 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/LegacyDeviceNoLongerSupportedMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/LegacyDeviceNoLongerSupportedMessage.py @@ -7,12 +7,12 @@ from UM.Message import Message I18N_CATALOG = i18nCatalog("cura") -## Message shown when trying to connect to a legacy printer device. class LegacyDeviceNoLongerSupportedMessage(Message): - - # Singleton used to prevent duplicate messages of this type at the same time. + """Message shown when trying to connect to a legacy printer device.""" + __is_visible = False - + """Singleton used to prevent duplicate messages of this type at the same time.""" + def __init__(self) -> None: super().__init__( text = I18N_CATALOG.i18nc("@info:status", "You are attempting to connect to a printer that is not " diff --git a/plugins/UM3NetworkPrinting/src/Messages/MaterialSyncMessage.py b/plugins/UM3NetworkPrinting/src/Messages/MaterialSyncMessage.py index e021b2ae99..6b481ff4a1 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/MaterialSyncMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/MaterialSyncMessage.py @@ -13,11 +13,11 @@ if TYPE_CHECKING: I18N_CATALOG = i18nCatalog("cura") -## Message shown when sending material files to cluster host. class MaterialSyncMessage(Message): + """Message shown when sending material files to cluster host.""" - # Singleton used to prevent duplicate messages of this type at the same time. __is_visible = False + """Singleton used to prevent duplicate messages of this type at the same time.""" def __init__(self, device: "UltimakerNetworkedPrinterOutputDevice") -> None: super().__init__( diff --git a/plugins/UM3NetworkPrinting/src/Messages/NotClusterHostMessage.py b/plugins/UM3NetworkPrinting/src/Messages/NotClusterHostMessage.py index 77d7995fc7..70bfa769ee 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/NotClusterHostMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/NotClusterHostMessage.py @@ -16,11 +16,11 @@ if TYPE_CHECKING: I18N_CATALOG = i18nCatalog("cura") -## Message shown when trying to connect to a printer that is not a host. class NotClusterHostMessage(Message): + """Message shown when trying to connect to a printer that is not a host.""" - # Singleton used to prevent duplicate messages of this type at the same time. __is_visible = False + """Singleton used to prevent duplicate messages of this type at the same time.""" def __init__(self, device: "UltimakerNetworkedPrinterOutputDevice") -> None: super().__init__( diff --git a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadBlockedMessage.py b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadBlockedMessage.py index be00292559..39dc985cb8 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadBlockedMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadBlockedMessage.py @@ -7,9 +7,9 @@ from UM.Message import Message I18N_CATALOG = i18nCatalog("cura") -## Message shown when uploading a print job to a cluster is blocked because another upload is already in progress. class PrintJobUploadBlockedMessage(Message): - + """Message shown when uploading a print job to a cluster is blocked because another upload is already in progress.""" + def __init__(self) -> None: super().__init__( text = I18N_CATALOG.i18nc("@info:status", "Please wait until the current job has been sent."), diff --git a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadErrorMessage.py b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadErrorMessage.py index bb26a84953..5145844ea7 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadErrorMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadErrorMessage.py @@ -7,9 +7,9 @@ from UM.Message import Message I18N_CATALOG = i18nCatalog("cura") -## Message shown when uploading a print job to a cluster failed. class PrintJobUploadErrorMessage(Message): - + """Message shown when uploading a print job to a cluster failed.""" + def __init__(self, message: str = None) -> None: super().__init__( text = message or I18N_CATALOG.i18nc("@info:text", "Could not upload the data to the printer."), diff --git a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadProgressMessage.py b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadProgressMessage.py index bdbab008e3..b7ddf7f550 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadProgressMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadProgressMessage.py @@ -7,8 +7,9 @@ from UM.Message import Message I18N_CATALOG = i18nCatalog("cura") -## Class responsible for showing a progress message while a mesh is being uploaded to the cloud. class PrintJobUploadProgressMessage(Message): + """Class responsible for showing a progress message while a mesh is being uploaded to the cloud.""" + def __init__(self): super().__init__( title = I18N_CATALOG.i18nc("@info:status", "Sending Print Job"), @@ -19,14 +20,17 @@ class PrintJobUploadProgressMessage(Message): use_inactivity_timer = False ) - ## Shows the progress message. def show(self): + """Shows the progress message.""" + self.setProgress(0) super().show() - ## Updates the percentage of the uploaded. - # \param percentage: The percentage amount (0-100). def update(self, percentage: int) -> None: + """Updates the percentage of the uploaded. + + :param percentage: The percentage amount (0-100). + """ if not self._visible: super().show() self.setProgress(percentage) diff --git a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadSuccessMessage.py b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadSuccessMessage.py index c9be28d57f..aa64f338dd 100644 --- a/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadSuccessMessage.py +++ b/plugins/UM3NetworkPrinting/src/Messages/PrintJobUploadSuccessMessage.py @@ -7,9 +7,9 @@ from UM.Message import Message I18N_CATALOG = i18nCatalog("cura") -## Message shown when uploading a print job to a cluster succeeded. class PrintJobUploadSuccessMessage(Message): - + """Message shown when uploading a print job to a cluster succeeded.""" + def __init__(self) -> None: super().__init__( text = I18N_CATALOG.i18nc("@info:status", "Print job was successfully sent to the printer."), diff --git a/plugins/UM3NetworkPrinting/src/Models/BaseModel.py b/plugins/UM3NetworkPrinting/src/Models/BaseModel.py index 3d38a4b116..f1385a0270 100644 --- a/plugins/UM3NetworkPrinting/src/Models/BaseModel.py +++ b/plugins/UM3NetworkPrinting/src/Models/BaseModel.py @@ -18,45 +18,56 @@ class BaseModel: def validate(self) -> None: pass - ## Checks whether the two models are equal. - # \param other: The other model. - # \return True if they are equal, False if they are different. def __eq__(self, other): + """Checks whether the two models are equal. + + :param other: The other model. + :return: True if they are equal, False if they are different. + """ return type(self) == type(other) and self.toDict() == other.toDict() - ## Checks whether the two models are different. - # \param other: The other model. - # \return True if they are different, False if they are the same. def __ne__(self, other) -> bool: + """Checks whether the two models are different. + + :param other: The other model. + :return: True if they are different, False if they are the same. + """ return type(self) != type(other) or self.toDict() != other.toDict() - ## Converts the model into a serializable dictionary def toDict(self) -> Dict[str, Any]: + """Converts the model into a serializable dictionary""" + return self.__dict__ - ## Parses a single model. - # \param model_class: The model class. - # \param values: The value of the model, which is usually a dictionary, but may also be already parsed. - # \return An instance of the model_class given. @staticmethod def parseModel(model_class: Type[T], values: Union[T, Dict[str, Any]]) -> T: + """Parses a single model. + + :param model_class: The model class. + :param values: The value of the model, which is usually a dictionary, but may also be already parsed. + :return: An instance of the model_class given. + """ if isinstance(values, dict): return model_class(**values) return values - ## Parses a list of models. - # \param model_class: The model class. - # \param values: The value of the list. Each value is usually a dictionary, but may also be already parsed. - # \return A list of instances of the model_class given. @classmethod def parseModels(cls, model_class: Type[T], values: List[Union[T, Dict[str, Any]]]) -> List[T]: + """Parses a list of models. + + :param model_class: The model class. + :param values: The value of the list. Each value is usually a dictionary, but may also be already parsed. + :return: A list of instances of the model_class given. + """ return [cls.parseModel(model_class, value) for value in values] - ## Parses the given date string. - # \param date: The date to parse. - # \return The parsed date. @staticmethod def parseDate(date: Union[str, datetime]) -> datetime: + """Parses the given date string. + + :param date: The date to parse. + :return: The parsed date. + """ if isinstance(date, datetime): return date return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterResponse.py b/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterResponse.py index 7ecfe8b0a3..a108b8dc87 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterResponse.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterResponse.py @@ -5,22 +5,26 @@ from typing import Optional from ..BaseModel import BaseModel -## Class representing a cloud connected cluster. class CloudClusterResponse(BaseModel): + """Class representing a cloud connected cluster.""" + - ## Creates a new cluster response object. - # \param cluster_id: The secret unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='. - # \param host_guid: The unique identifier of the print cluster host, e.g. 'e90ae0ac-1257-4403-91ee-a44c9b7e8050'. - # \param host_name: The name of the printer as configured during the Wi-Fi setup. Used as identifier for end users. - # \param is_online: Whether this cluster is currently connected to the cloud. - # \param status: The status of the cluster authentication (active or inactive). - # \param host_version: The firmware version of the cluster host. This is where the Stardust client is running on. - # \param host_internal_ip: The internal IP address of the host printer. - # \param friendly_name: The human readable name of the host printer. - # \param printer_type: The machine type of the host printer. def __init__(self, cluster_id: str, host_guid: str, host_name: str, is_online: bool, status: str, host_internal_ip: Optional[str] = None, host_version: Optional[str] = None, friendly_name: Optional[str] = None, printer_type: str = "ultimaker3", **kwargs) -> None: + """Creates a new cluster response object. + + :param cluster_id: The secret unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='. + :param host_guid: The unique identifier of the print cluster host, e.g. 'e90ae0ac-1257-4403-91ee-a44c9b7e8050'. + :param host_name: The name of the printer as configured during the Wi-Fi setup. Used as identifier for end users. + :param is_online: Whether this cluster is currently connected to the cloud. + :param status: The status of the cluster authentication (active or inactive). + :param host_version: The firmware version of the cluster host. This is where the Stardust client is running on. + :param host_internal_ip: The internal IP address of the host printer. + :param friendly_name: The human readable name of the host printer. + :param printer_type: The machine type of the host printer. + """ + self.cluster_id = cluster_id self.host_guid = host_guid self.host_name = host_name diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterStatus.py b/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterStatus.py index 330e61d343..10f7b0ce6b 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterStatus.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/CloudClusterStatus.py @@ -11,15 +11,17 @@ from .ClusterPrintJobStatus import ClusterPrintJobStatus # Model that represents the status of the cluster for the cloud class CloudClusterStatus(BaseModel): - ## Creates a new cluster status model object. - # \param printers: The latest status of each printer in the cluster. - # \param print_jobs: The latest status of each print job in the cluster. - # \param generated_time: The datetime when the object was generated on the server-side. - def __init__(self, - printers: List[Union[ClusterPrinterStatus, Dict[str, Any]]], + def __init__(self, printers: List[Union[ClusterPrinterStatus, Dict[str, Any]]], print_jobs: List[Union[ClusterPrintJobStatus, Dict[str, Any]]], generated_time: Union[str, datetime], **kwargs) -> None: + """Creates a new cluster status model object. + + :param printers: The latest status of each printer in the cluster. + :param print_jobs: The latest status of each print job in the cluster. + :param generated_time: The datetime when the object was generated on the server-side. + """ + self.generated_time = self.parseDate(generated_time) self.printers = self.parseModels(ClusterPrinterStatus, printers) self.print_jobs = self.parseModels(ClusterPrintJobStatus, print_jobs) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/CloudError.py b/plugins/UM3NetworkPrinting/src/Models/Http/CloudError.py index 9381e4b8cf..97e7862ff6 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/CloudError.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/CloudError.py @@ -5,20 +5,23 @@ from typing import Dict, Optional, Any from ..BaseModel import BaseModel -## Class representing errors generated by the cloud servers, according to the JSON-API standard. class CloudError(BaseModel): + """Class representing errors generated by the cloud servers, according to the JSON-API standard.""" - ## Creates a new error object. - # \param id: Unique identifier for this particular occurrence of the problem. - # \param title: A short, human-readable summary of the problem that SHOULD NOT change from occurrence to occurrence - # of the problem, except for purposes of localization. - # \param code: An application-specific error code, expressed as a string value. - # \param detail: A human-readable explanation specific to this occurrence of the problem. Like title, this field's - # value can be localized. - # \param http_status: The HTTP status code applicable to this problem, converted to string. - # \param meta: Non-standard meta-information about the error, depending on the error code. def __init__(self, id: str, code: str, title: str, http_status: str, detail: Optional[str] = None, meta: Optional[Dict[str, Any]] = None, **kwargs) -> None: + """Creates a new error object. + + :param id: Unique identifier for this particular occurrence of the problem. + :param title: A short, human-readable summary of the problem that SHOULD NOT change from occurrence to occurrence + of the problem, except for purposes of localization. + :param code: An application-specific error code, expressed as a string value. + :param detail: A human-readable explanation specific to this occurrence of the problem. Like title, this field's + value can be localized. + :param http_status: The HTTP status code applicable to this problem, converted to string. + :param meta: Non-standard meta-information about the error, depending on the error code. + """ + self.id = id self.code = code self.http_status = http_status diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobResponse.py b/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobResponse.py index a1880e8751..ccc9ffb2fc 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobResponse.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobResponse.py @@ -8,19 +8,22 @@ from ..BaseModel import BaseModel # Model that represents the response received from the cloud after requesting to upload a print job class CloudPrintJobResponse(BaseModel): - ## Creates a new print job response model. - # \param job_id: The job unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='. - # \param status: The status of the print job. - # \param status_description: Contains more details about the status, e.g. the cause of failures. - # \param download_url: A signed URL to download the resulting status. Only available when the job is finished. - # \param job_name: The name of the print job. - # \param slicing_details: Model for slice information. - # \param upload_url: The one-time use URL where the toolpath must be uploaded to (only if status is uploading). - # \param content_type: The content type of the print job (e.g. text/plain or application/gzip) - # \param generated_time: The datetime when the object was generated on the server-side. def __init__(self, job_id: str, status: str, download_url: Optional[str] = None, job_name: Optional[str] = None, upload_url: Optional[str] = None, content_type: Optional[str] = None, status_description: Optional[str] = None, slicing_details: Optional[dict] = None, **kwargs) -> None: + """Creates a new print job response model. + + :param job_id: The job unique ID, e.g. 'kBEeZWEifXbrXviO8mRYLx45P8k5lHVGs43XKvRniPg='. + :param status: The status of the print job. + :param status_description: Contains more details about the status, e.g. the cause of failures. + :param download_url: A signed URL to download the resulting status. Only available when the job is finished. + :param job_name: The name of the print job. + :param slicing_details: Model for slice information. + :param upload_url: The one-time use URL where the toolpath must be uploaded to (only if status is uploading). + :param content_type: The content type of the print job (e.g. text/plain or application/gzip) + :param generated_time: The datetime when the object was generated on the server-side. + """ + self.job_id = job_id self.status = status self.download_url = download_url diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobUploadRequest.py b/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobUploadRequest.py index ff705ae495..efa1efb7e4 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobUploadRequest.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintJobUploadRequest.py @@ -6,11 +6,14 @@ from ..BaseModel import BaseModel # Model that represents the request to upload a print job to the cloud class CloudPrintJobUploadRequest(BaseModel): - ## Creates a new print job upload request. - # \param job_name: The name of the print job. - # \param file_size: The size of the file in bytes. - # \param content_type: The content type of the print job (e.g. text/plain or application/gzip) def __init__(self, job_name: str, file_size: int, content_type: str, **kwargs) -> None: + """Creates a new print job upload request. + + :param job_name: The name of the print job. + :param file_size: The size of the file in bytes. + :param content_type: The content type of the print job (e.g. text/plain or application/gzip) + """ + self.job_name = job_name self.file_size = file_size self.content_type = content_type diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintResponse.py b/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintResponse.py index b108f40e27..ee2b8a307f 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintResponse.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/CloudPrintResponse.py @@ -9,13 +9,16 @@ from ..BaseModel import BaseModel # Model that represents the responses received from the cloud after requesting a job to be printed. class CloudPrintResponse(BaseModel): - ## Creates a new print response object. - # \param job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect. - # \param status: The status of the print request (queued or failed). - # \param generated_time: The datetime when the object was generated on the server-side. - # \param cluster_job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect. def __init__(self, job_id: str, status: str, generated_time: Union[str, datetime], cluster_job_id: Optional[str] = None, **kwargs) -> None: + """Creates a new print response object. + + :param job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect. + :param status: The status of the print request (queued or failed). + :param generated_time: The datetime when the object was generated on the server-side. + :param cluster_job_id: The unique ID of a print job inside of the cluster. This ID is generated by Cura Connect. + """ + self.job_id = job_id self.status = status self.cluster_job_id = cluster_job_id diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterBuildPlate.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterBuildPlate.py index a5a392488d..c81e0a372c 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterBuildPlate.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterBuildPlate.py @@ -3,11 +3,13 @@ from ..BaseModel import BaseModel -## Class representing a cluster printer class ClusterBuildPlate(BaseModel): + """Class representing a cluster printer""" - ## Create a new build plate - # \param type: The type of build plate glass or aluminium def __init__(self, type: str = "glass", **kwargs) -> None: + """Create a new build plate + + :param type: The type of build plate glass or aluminium + """ self.type = type super().__init__(**kwargs) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintCoreConfiguration.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintCoreConfiguration.py index e11d2be2d2..75ce234e23 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintCoreConfiguration.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintCoreConfiguration.py @@ -9,26 +9,33 @@ from .ClusterPrinterConfigurationMaterial import ClusterPrinterConfigurationMate from ..BaseModel import BaseModel -## Class representing a cloud cluster printer configuration -# Also used for representing slots in a Material Station (as from Cura's perspective these are the same). class ClusterPrintCoreConfiguration(BaseModel): + """Class representing a cloud cluster printer configuration + + Also used for representing slots in a Material Station (as from Cura's perspective these are the same). + """ + + def __init__(self, extruder_index: int, material: Union[None, Dict[str, Any], + ClusterPrinterConfigurationMaterial] = None, print_core_id: Optional[str] = None, **kwargs) -> None: + """Creates a new cloud cluster printer configuration object + + :param extruder_index: The position of the extruder on the machine as list index. Numbered from left to right. + :param material: The material of a configuration object in a cluster printer. May be in a dict or an object. + :param nozzle_diameter: The diameter of the print core at this position in millimeters, e.g. '0.4'. + :param print_core_id: The type of print core inserted at this position, e.g. 'AA 0.4'. + """ - ## Creates a new cloud cluster printer configuration object - # \param extruder_index: The position of the extruder on the machine as list index. Numbered from left to right. - # \param material: The material of a configuration object in a cluster printer. May be in a dict or an object. - # \param nozzle_diameter: The diameter of the print core at this position in millimeters, e.g. '0.4'. - # \param print_core_id: The type of print core inserted at this position, e.g. 'AA 0.4'. - def __init__(self, extruder_index: int, - material: Union[None, Dict[str, Any], ClusterPrinterConfigurationMaterial] = None, - print_core_id: Optional[str] = None, **kwargs) -> None: self.extruder_index = extruder_index self.material = self.parseModel(ClusterPrinterConfigurationMaterial, material) if material else None self.print_core_id = print_core_id super().__init__(**kwargs) - ## Updates the given output model. - # \param model - The output model to update. def updateOutputModel(self, model: ExtruderOutputModel) -> None: + """Updates the given output model. + + :param model: The output model to update. + """ + if self.print_core_id is not None: model.updateHotendID(self.print_core_id) @@ -40,14 +47,16 @@ class ClusterPrintCoreConfiguration(BaseModel): else: model.updateActiveMaterial(None) - ## Creates a configuration model def createConfigurationModel(self) -> ExtruderConfigurationModel: + """Creates a configuration model""" + model = ExtruderConfigurationModel(position = self.extruder_index) self.updateConfigurationModel(model) return model - ## Creates a configuration model def updateConfigurationModel(self, model: ExtruderConfigurationModel) -> ExtruderConfigurationModel: + """Creates a configuration model""" + model.setHotendID(self.print_core_id) if self.material: model.setMaterial(self.material.createOutputModel()) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConfigurationChange.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConfigurationChange.py index 88251bbf53..cdfa633170 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConfigurationChange.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConfigurationChange.py @@ -5,19 +5,22 @@ from typing import Optional from ..BaseModel import BaseModel -## Model for the types of changes that are needed before a print job can start class ClusterPrintJobConfigurationChange(BaseModel): + """Model for the types of changes that are needed before a print job can start""" + + + def __init__(self, type_of_change: str, target_id: str, origin_id: str, index: Optional[int] = None, + target_name: Optional[str] = None, origin_name: Optional[str] = None, **kwargs) -> None: + """Creates a new print job constraint. + + :param type_of_change: The type of configuration change, one of: "material", "print_core_change" + :param index: The hotend slot or extruder index to change + :param target_id: Target material guid or hotend id + :param origin_id: Original/current material guid or hotend id + :param target_name: Target material name or hotend id + :param origin_name: Original/current material name or hotend id + """ - ## Creates a new print job constraint. - # \param type_of_change: The type of configuration change, one of: "material", "print_core_change" - # \param index: The hotend slot or extruder index to change - # \param target_id: Target material guid or hotend id - # \param origin_id: Original/current material guid or hotend id - # \param target_name: Target material name or hotend id - # \param origin_name: Original/current material name or hotend id - def __init__(self, type_of_change: str, target_id: str, origin_id: str, - index: Optional[int] = None, target_name: Optional[str] = None, origin_name: Optional[str] = None, - **kwargs) -> None: self.type_of_change = type_of_change self.index = index self.target_id = target_id diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConstraint.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConstraint.py index 9239004b18..258d940a03 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConstraint.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobConstraint.py @@ -5,12 +5,14 @@ from typing import Optional from ..BaseModel import BaseModel -## Class representing a cloud cluster print job constraint class ClusterPrintJobConstraints(BaseModel): + """Class representing a cloud cluster print job constraint""" - ## Creates a new print job constraint. - # \param require_printer_name: Unique name of the printer that this job should be printed on. - # Should be one of the unique_name field values in the cluster, e.g. 'ultimakersystem-ccbdd30044ec' def __init__(self, require_printer_name: Optional[str] = None, **kwargs) -> None: + """Creates a new print job constraint. + + :param require_printer_name: Unique name of the printer that this job should be printed on. + Should be one of the unique_name field values in the cluster, e.g. 'ultimakersystem-ccbdd30044ec' + """ self.require_printer_name = require_printer_name super().__init__(**kwargs) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobImpediment.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobImpediment.py index 5a8f0aa46d..7beaf6f61f 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobImpediment.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobImpediment.py @@ -3,14 +3,17 @@ from ..BaseModel import BaseModel -## Class representing the reasons that prevent this job from being printed on the associated printer class ClusterPrintJobImpediment(BaseModel): + """Class representing the reasons that prevent this job from being printed on the associated printer""" - ## Creates a new print job constraint. - # \param translation_key: A string indicating a reason the print cannot be printed, - # such as 'does_not_fit_in_build_volume' - # \param severity: A number indicating the severity of the problem, with higher being more severe def __init__(self, translation_key: str, severity: int, **kwargs) -> None: + """Creates a new print job constraint. + + :param translation_key: A string indicating a reason the print cannot be printed, + such as 'does_not_fit_in_build_volume' + :param severity: A number indicating the severity of the problem, with higher being more severe + """ + self.translation_key = translation_key self.severity = severity super().__init__(**kwargs) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobStatus.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobStatus.py index 22fb9bb37a..9fb94ab12f 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobStatus.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrintJobStatus.py @@ -15,36 +15,9 @@ from ..BaseModel import BaseModel from ...ClusterOutputController import ClusterOutputController -## Model for the status of a single print job in a cluster. class ClusterPrintJobStatus(BaseModel): + """Model for the status of a single print job in a cluster.""" - ## Creates a new cloud print job status model. - # \param assigned_to: The name of the printer this job is assigned to while being queued. - # \param configuration: The required print core configurations of this print job. - # \param constraints: Print job constraints object. - # \param created_at: The timestamp when the job was created in Cura Connect. - # \param force: Allow this job to be printed despite of mismatching configurations. - # \param last_seen: The number of seconds since this job was checked. - # \param machine_variant: The machine type that this job should be printed on.Coincides with the machine_type field - # of the printer object. - # \param name: The name of the print job. Usually the name of the .gcode file. - # \param network_error_count: The number of errors encountered when requesting data for this print job. - # \param owner: The name of the user who added the print job to Cura Connect. - # \param printer_uuid: UUID of the printer that the job is currently printing on or assigned to. - # \param started: Whether the job has started printing or not. - # \param status: The status of the print job. - # \param time_elapsed: The remaining printing time in seconds. - # \param time_total: The total printing time in seconds. - # \param uuid: UUID of this print job. Should be used for identification purposes. - # \param deleted_at: The time when this print job was deleted. - # \param printed_on_uuid: UUID of the printer used to print this job. - # \param configuration_changes_required: List of configuration changes the printer this job is associated with - # needs to make in order to be able to print this job - # \param build_plate: The build plate (type) this job needs to be printed on. - # \param compatible_machine_families: Family names of machines suitable for this print job - # \param impediments_to_printing: A list of reasons that prevent this job from being printed on the associated - # printer - # \param preview_url: URL to the preview image (same as wou;d've been included in the ufp). def __init__(self, created_at: str, force: bool, machine_variant: str, name: str, started: bool, status: str, time_total: int, uuid: str, configuration: List[Union[Dict[str, Any], ClusterPrintCoreConfiguration]], @@ -60,6 +33,37 @@ class ClusterPrintJobStatus(BaseModel): impediments_to_printing: List[Union[Dict[str, Any], ClusterPrintJobImpediment]] = None, preview_url: Optional[str] = None, **kwargs) -> None: + + """Creates a new cloud print job status model. + + :param assigned_to: The name of the printer this job is assigned to while being queued. + :param configuration: The required print core configurations of this print job. + :param constraints: Print job constraints object. + :param created_at: The timestamp when the job was created in Cura Connect. + :param force: Allow this job to be printed despite of mismatching configurations. + :param last_seen: The number of seconds since this job was checked. + :param machine_variant: The machine type that this job should be printed on.Coincides with the machine_type field + of the printer object. + :param name: The name of the print job. Usually the name of the .gcode file. + :param network_error_count: The number of errors encountered when requesting data for this print job. + :param owner: The name of the user who added the print job to Cura Connect. + :param printer_uuid: UUID of the printer that the job is currently printing on or assigned to. + :param started: Whether the job has started printing or not. + :param status: The status of the print job. + :param time_elapsed: The remaining printing time in seconds. + :param time_total: The total printing time in seconds. + :param uuid: UUID of this print job. Should be used for identification purposes. + :param deleted_at: The time when this print job was deleted. + :param printed_on_uuid: UUID of the printer used to print this job. + :param configuration_changes_required: List of configuration changes the printer this job is associated with + needs to make in order to be able to print this job + :param build_plate: The build plate (type) this job needs to be printed on. + :param compatible_machine_families: Family names of machines suitable for this print job + :param impediments_to_printing: A list of reasons that prevent this job from being printed on the associated + printer + :param preview_url: URL to the preview image (same as wou;d've been included in the ufp). + """ + self.assigned_to = assigned_to self.configuration = self.parseModels(ClusterPrintCoreConfiguration, configuration) self.constraints = self.parseModels(ClusterPrintJobConstraints, constraints) @@ -90,24 +94,31 @@ class ClusterPrintJobStatus(BaseModel): super().__init__(**kwargs) - ## Creates an UM3 print job output model based on this cloud cluster print job. - # \param printer: The output model of the printer def createOutputModel(self, controller: ClusterOutputController) -> UM3PrintJobOutputModel: + """Creates an UM3 print job output model based on this cloud cluster print job. + + :param printer: The output model of the printer + """ + model = UM3PrintJobOutputModel(controller, self.uuid, self.name) self.updateOutputModel(model) return model - ## Creates a new configuration model def _createConfigurationModel(self) -> PrinterConfigurationModel: + """Creates a new configuration model""" + extruders = [extruder.createConfigurationModel() for extruder in self.configuration or ()] configuration = PrinterConfigurationModel() configuration.setExtruderConfigurations(extruders) configuration.setPrinterType(self.machine_variant) return configuration - ## Updates an UM3 print job output model based on this cloud cluster print job. - # \param model: The model to update. def updateOutputModel(self, model: UM3PrintJobOutputModel) -> None: + """Updates an UM3 print job output model based on this cloud cluster print job. + + :param model: The model to update. + """ + model.updateConfiguration(self._createConfigurationModel()) model.updateTimeTotal(self.time_total) model.updateTimeElapsed(self.time_elapsed) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterConfigurationMaterial.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterConfigurationMaterial.py index 8edb9fb808..62f99293d1 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterConfigurationMaterial.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterConfigurationMaterial.py @@ -9,29 +9,35 @@ from cura.PrinterOutput.Models.MaterialOutputModel import MaterialOutputModel from ..BaseModel import BaseModel -## Class representing a cloud cluster printer configuration class ClusterPrinterConfigurationMaterial(BaseModel): + """Class representing a cloud cluster printer configuration""" - ## Creates a new material configuration model. - # \param brand: The brand of material in this print core, e.g. 'Ultimaker'. - # \param color: The color of material in this print core, e.g. 'Blue'. - # \param guid: he GUID of the material in this print core, e.g. '506c9f0d-e3aa-4bd4-b2d2-23e2425b1aa9'. - # \param material: The type of material in this print core, e.g. 'PLA'. def __init__(self, brand: Optional[str] = None, color: Optional[str] = None, guid: Optional[str] = None, material: Optional[str] = None, **kwargs) -> None: + + """Creates a new material configuration model. + + :param brand: The brand of material in this print core, e.g. 'Ultimaker'. + :param color: The color of material in this print core, e.g. 'Blue'. + :param guid: he GUID of the material in this print core, e.g. '506c9f0d-e3aa-4bd4-b2d2-23e2425b1aa9'. + :param material: The type of material in this print core, e.g. 'PLA'. + """ + self.guid = guid self.brand = brand self.color = color self.material = material super().__init__(**kwargs) - ## Creates a material output model based on this cloud printer material. - # - # A material is chosen that matches the current GUID. If multiple such - # materials are available, read-only materials are preferred and the - # material with the earliest alphabetical name will be selected. - # \return A material output model that matches the current GUID. def createOutputModel(self) -> MaterialOutputModel: + """Creates a material output model based on this cloud printer material. + + A material is chosen that matches the current GUID. If multiple such + materials are available, read-only materials are preferred and the + material with the earliest alphabetical name will be selected. + :return: A material output model that matches the current GUID. + """ + container_registry = ContainerRegistry.getInstance() same_guid = container_registry.findInstanceContainersMetadata(GUID = self.guid) if same_guid: diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStation.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStation.py index c51e07bcfc..1929c2a388 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStation.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStation.py @@ -6,16 +6,19 @@ from ..BaseModel import BaseModel from .ClusterPrinterMaterialStationSlot import ClusterPrinterMaterialStationSlot -## Class representing the data of a Material Station in the cluster. class ClusterPrinterMaterialStation(BaseModel): + """Class representing the data of a Material Station in the cluster.""" - ## Creates a new Material Station status. - # \param status: The status of the material station. - # \param: supported: Whether the material station is supported on this machine or not. - # \param material_slots: The active slots configurations of this material station. def __init__(self, status: str, supported: bool = False, material_slots: List[Union[ClusterPrinterMaterialStationSlot, Dict[str, Any]]] = None, **kwargs) -> None: + """Creates a new Material Station status. + + :param status: The status of the material station. + :param: supported: Whether the material station is supported on this machine or not. + :param material_slots: The active slots configurations of this material station. + """ + self.status = status self.supported = supported self.material_slots = self.parseModels(ClusterPrinterMaterialStationSlot, material_slots)\ diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStationSlot.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStationSlot.py index b9c40592e5..d41d6c14fc 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStationSlot.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterMaterialStationSlot.py @@ -5,16 +5,19 @@ from typing import Optional from .ClusterPrintCoreConfiguration import ClusterPrintCoreConfiguration -## Class representing the data of a single slot in the material station. class ClusterPrinterMaterialStationSlot(ClusterPrintCoreConfiguration): - - ## Create a new material station slot object. - # \param slot_index: The index of the slot in the material station (ranging 0 to 5). - # \param compatible: Whether the configuration is compatible with the print core. - # \param material_remaining: How much material is remaining on the spool (between 0 and 1, or -1 for missing data). - # \param material_empty: Whether the material spool is too empty to be used. + """Class representing the data of a single slot in the material station.""" + def __init__(self, slot_index: int, compatible: bool, material_remaining: float, material_empty: Optional[bool] = False, **kwargs) -> None: + """Create a new material station slot object. + + :param slot_index: The index of the slot in the material station (ranging 0 to 5). + :param compatible: Whether the configuration is compatible with the print core. + :param material_remaining: How much material is remaining on the spool (between 0 and 1, or -1 for missing data). + :param material_empty: Whether the material spool is too empty to be used. + """ + self.slot_index = slot_index self.compatible = compatible self.material_remaining = material_remaining diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterStatus.py b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterStatus.py index 2e0912f057..3d342a519d 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterStatus.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/ClusterPrinterStatus.py @@ -17,26 +17,10 @@ from .ClusterPrinterConfigurationMaterial import ClusterPrinterConfigurationMate from ..BaseModel import BaseModel -## Class representing a cluster printer class ClusterPrinterStatus(BaseModel): + """Class representing a cluster printer""" + - ## Creates a new cluster printer status - # \param enabled: A printer can be disabled if it should not receive new jobs. By default every printer is enabled. - # \param firmware_version: Firmware version installed on the printer. Can differ for each printer in a cluster. - # \param friendly_name: Human readable name of the printer. Can be used for identification purposes. - # \param ip_address: The IP address of the printer in the local network. - # \param machine_variant: The type of printer. Can be 'Ultimaker 3' or 'Ultimaker 3ext'. - # \param status: The status of the printer. - # \param unique_name: The unique name of the printer in the network. - # \param uuid: The unique ID of the printer, also known as GUID. - # \param configuration: The active print core configurations of this printer. - # \param reserved_by: A printer can be claimed by a specific print job. - # \param maintenance_required: Indicates if maintenance is necessary. - # \param firmware_update_status: Whether the printer's firmware is up-to-date, value is one of: "up_to_date", - # "pending_update", "update_available", "update_in_progress", "update_failed", "update_impossible". - # \param latest_available_firmware: The version of the latest firmware that is available. - # \param build_plate: The build plate that is on the printer. - # \param material_station: The material station that is on the printer. def __init__(self, enabled: bool, firmware_version: str, friendly_name: str, ip_address: str, machine_variant: str, status: str, unique_name: str, uuid: str, configuration: List[Union[Dict[str, Any], ClusterPrintCoreConfiguration]], @@ -44,6 +28,25 @@ class ClusterPrinterStatus(BaseModel): firmware_update_status: Optional[str] = None, latest_available_firmware: Optional[str] = None, build_plate: Union[Dict[str, Any], ClusterBuildPlate] = None, material_station: Union[Dict[str, Any], ClusterPrinterMaterialStation] = None, **kwargs) -> None: + """Creates a new cluster printer status + + :param enabled: A printer can be disabled if it should not receive new jobs. By default every printer is enabled. + :param firmware_version: Firmware version installed on the printer. Can differ for each printer in a cluster. + :param friendly_name: Human readable name of the printer. Can be used for identification purposes. + :param ip_address: The IP address of the printer in the local network. + :param machine_variant: The type of printer. Can be 'Ultimaker 3' or 'Ultimaker 3ext'. + :param status: The status of the printer. + :param unique_name: The unique name of the printer in the network. + :param uuid: The unique ID of the printer, also known as GUID. + :param configuration: The active print core configurations of this printer. + :param reserved_by: A printer can be claimed by a specific print job. + :param maintenance_required: Indicates if maintenance is necessary. + :param firmware_update_status: Whether the printer's firmware is up-to-date, value is one of: "up_to_date", + "pending_update", "update_available", "update_in_progress", "update_failed", "update_impossible". + :param latest_available_firmware: The version of the latest firmware that is available. + :param build_plate: The build plate that is on the printer. + :param material_station: The material station that is on the printer. + """ self.configuration = self.parseModels(ClusterPrintCoreConfiguration, configuration) self.enabled = enabled @@ -63,9 +66,12 @@ class ClusterPrinterStatus(BaseModel): material_station) if material_station else None super().__init__(**kwargs) - ## Creates a new output model. - # \param controller - The controller of the model. def createOutputModel(self, controller: PrinterOutputController) -> PrinterOutputModel: + """Creates a new output model. + + :param controller: - The controller of the model. + """ + # FIXME # Note that we're using '2' here as extruder count. We have hardcoded this for now to prevent issues where the # amount of extruders coming back from the API is actually lower (which it can be if a printer was just added @@ -74,9 +80,12 @@ class ClusterPrinterStatus(BaseModel): self.updateOutputModel(model) return model - ## Updates the given output model. - # \param model - The output model to update. def updateOutputModel(self, model: PrinterOutputModel) -> None: + """Updates the given output model. + + :param model: - The output model to update. + """ + model.updateKey(self.uuid) model.updateName(self.friendly_name) model.updateUniqueName(self.unique_name) @@ -110,9 +119,12 @@ class ClusterPrinterStatus(BaseModel): ) for left_slot, right_slot in product(self._getSlotsForExtruder(0), self._getSlotsForExtruder(1))] model.setAvailableConfigurations(available_configurations) - ## Create a list of Material Station slots for the given extruder index. - # Returns a list with a single empty material slot if none are found to ensure we don't miss configurations. def _getSlotsForExtruder(self, extruder_index: int) -> List[ClusterPrinterMaterialStationSlot]: + """Create a list of Material Station slots for the given extruder index. + + Returns a list with a single empty material slot if none are found to ensure we don't miss configurations. + """ + if not self.material_station: # typing guard return [] slots = [slot for slot in self.material_station.material_slots if self._isSupportedConfiguration( @@ -121,15 +133,19 @@ class ClusterPrinterStatus(BaseModel): )] return slots or [self._createEmptyMaterialSlot(extruder_index)] - ## Check if a configuration is supported in order to make it selectable by the user. - # We filter out any slot that is not supported by the extruder index, print core type or if the material is empty. @staticmethod def _isSupportedConfiguration(slot: ClusterPrinterMaterialStationSlot, extruder_index: int) -> bool: + """Check if a configuration is supported in order to make it selectable by the user. + + We filter out any slot that is not supported by the extruder index, print core type or if the material is empty. + """ + return slot.extruder_index == extruder_index and slot.compatible and not slot.material_empty - ## Create an empty material slot with a fake empty material. @staticmethod def _createEmptyMaterialSlot(extruder_index: int) -> ClusterPrinterMaterialStationSlot: + """Create an empty material slot with a fake empty material.""" + empty_material = ClusterPrinterConfigurationMaterial(guid = "", material = "empty", brand = "", color = "") return ClusterPrinterMaterialStationSlot(slot_index = 0, extruder_index = extruder_index, compatible = True, material_remaining = 0, material = empty_material) diff --git a/plugins/UM3NetworkPrinting/src/Models/Http/PrinterSystemStatus.py b/plugins/UM3NetworkPrinting/src/Models/Http/PrinterSystemStatus.py index ad7b9c8698..58d6c94ee1 100644 --- a/plugins/UM3NetworkPrinting/src/Models/Http/PrinterSystemStatus.py +++ b/plugins/UM3NetworkPrinting/src/Models/Http/PrinterSystemStatus.py @@ -5,12 +5,11 @@ from typing import Dict, Any from ..BaseModel import BaseModel -## Class representing the system status of a printer. class PrinterSystemStatus(BaseModel): + """Class representing the system status of a printer.""" def __init__(self, guid: str, firmware: str, hostname: str, name: str, platform: str, variant: str, - hardware: Dict[str, Any], **kwargs - ) -> None: + hardware: Dict[str, Any], **kwargs) -> None: self.guid = guid self.firmware = firmware self.hostname = hostname diff --git a/plugins/UM3NetworkPrinting/src/Network/ClusterApiClient.py b/plugins/UM3NetworkPrinting/src/Network/ClusterApiClient.py index 6a8b9f625c..7d6b260b90 100644 --- a/plugins/UM3NetworkPrinting/src/Network/ClusterApiClient.py +++ b/plugins/UM3NetworkPrinting/src/Network/ClusterApiClient.py @@ -16,12 +16,13 @@ from ..Models.Http.PrinterSystemStatus import PrinterSystemStatus from ..Models.Http.ClusterMaterial import ClusterMaterial -## The generic type variable used to document the methods below. ClusterApiClientModel = TypeVar("ClusterApiClientModel", bound=BaseModel) +"""The generic type variable used to document the methods below.""" -## The ClusterApiClient is responsible for all network calls to local network clusters. class ClusterApiClient: + """The ClusterApiClient is responsible for all network calls to local network clusters.""" + PRINTER_API_PREFIX = "/api/v1" CLUSTER_API_PREFIX = "/cluster-api/v1" @@ -29,75 +30,92 @@ class ClusterApiClient: # In order to avoid garbage collection we keep the callbacks in this list. _anti_gc_callbacks = [] # type: List[Callable[[], None]] - ## Initializes a new cluster API client. - # \param address: The network address of the cluster to call. - # \param on_error: The callback to be called whenever we receive errors from the server. def __init__(self, address: str, on_error: Callable) -> None: + """Initializes a new cluster API client. + + :param address: The network address of the cluster to call. + :param on_error: The callback to be called whenever we receive errors from the server. + """ super().__init__() self._manager = QNetworkAccessManager() self._address = address self._on_error = on_error - ## Get printer system information. - # \param on_finished: The callback in case the response is successful. def getSystem(self, on_finished: Callable) -> None: + """Get printer system information. + + :param on_finished: The callback in case the response is successful. + """ url = "{}/system".format(self.PRINTER_API_PREFIX) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished, PrinterSystemStatus) - ## Get the installed materials on the printer. - # \param on_finished: The callback in case the response is successful. def getMaterials(self, on_finished: Callable[[List[ClusterMaterial]], Any]) -> None: + """Get the installed materials on the printer. + + :param on_finished: The callback in case the response is successful. + """ url = "{}/materials".format(self.CLUSTER_API_PREFIX) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished, ClusterMaterial) - ## Get the printers in the cluster. - # \param on_finished: The callback in case the response is successful. def getPrinters(self, on_finished: Callable[[List[ClusterPrinterStatus]], Any]) -> None: + """Get the printers in the cluster. + + :param on_finished: The callback in case the response is successful. + """ url = "{}/printers".format(self.CLUSTER_API_PREFIX) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished, ClusterPrinterStatus) - ## Get the print jobs in the cluster. - # \param on_finished: The callback in case the response is successful. def getPrintJobs(self, on_finished: Callable[[List[ClusterPrintJobStatus]], Any]) -> None: + """Get the print jobs in the cluster. + + :param on_finished: The callback in case the response is successful. + """ url = "{}/print_jobs".format(self.CLUSTER_API_PREFIX) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished, ClusterPrintJobStatus) - ## Move a print job to the top of the queue. def movePrintJobToTop(self, print_job_uuid: str) -> None: + """Move a print job to the top of the queue.""" + url = "{}/print_jobs/{}/action/move".format(self.CLUSTER_API_PREFIX, print_job_uuid) self._manager.post(self._createEmptyRequest(url), json.dumps({"to_position": 0, "list": "queued"}).encode()) - ## Override print job configuration and force it to be printed. def forcePrintJob(self, print_job_uuid: str) -> None: + """Override print job configuration and force it to be printed.""" + url = "{}/print_jobs/{}".format(self.CLUSTER_API_PREFIX, print_job_uuid) self._manager.put(self._createEmptyRequest(url), json.dumps({"force": True}).encode()) - ## Delete a print job from the queue. def deletePrintJob(self, print_job_uuid: str) -> None: + """Delete a print job from the queue.""" + url = "{}/print_jobs/{}".format(self.CLUSTER_API_PREFIX, print_job_uuid) self._manager.deleteResource(self._createEmptyRequest(url)) - ## Set the state of a print job. def setPrintJobState(self, print_job_uuid: str, state: str) -> None: + """Set the state of a print job.""" + url = "{}/print_jobs/{}/action".format(self.CLUSTER_API_PREFIX, print_job_uuid) # We rewrite 'resume' to 'print' here because we are using the old print job action endpoints. action = "print" if state == "resume" else state self._manager.put(self._createEmptyRequest(url), json.dumps({"action": action}).encode()) - ## Get the preview image data of a print job. def getPrintJobPreviewImage(self, print_job_uuid: str, on_finished: Callable) -> None: + """Get the preview image data of a print job.""" + url = "{}/print_jobs/{}/preview_image".format(self.CLUSTER_API_PREFIX, print_job_uuid) reply = self._manager.get(self._createEmptyRequest(url)) self._addCallback(reply, on_finished) - ## We override _createEmptyRequest in order to add the user credentials. - # \param url: The URL to request - # \param content_type: The type of the body contents. def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest: + """We override _createEmptyRequest in order to add the user credentials. + + :param url: The URL to request + :param content_type: The type of the body contents. + """ url = QUrl("http://" + self._address + path) request = QNetworkRequest(url) request.setAttribute(QNetworkRequest.FollowRedirectsAttribute, True) @@ -105,11 +123,13 @@ class ClusterApiClient: request.setHeader(QNetworkRequest.ContentTypeHeader, content_type) return request - ## Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well. - # \param reply: The reply from the server. - # \return A tuple with a status code and a dictionary. @staticmethod def _parseReply(reply: QNetworkReply) -> Tuple[int, Dict[str, Any]]: + """Parses the given JSON network reply into a status code and a dictionary, handling unexpected errors as well. + + :param reply: The reply from the server. + :return: A tuple with a status code and a dictionary. + """ status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) try: response = bytes(reply.readAll()).decode() @@ -118,14 +138,15 @@ class ClusterApiClient: Logger.logException("e", "Could not parse the cluster response: %s", err) return status_code, {"errors": [err]} - ## Parses the given models and calls the correct callback depending on the result. - # \param response: The response from the server, after being converted to a dict. - # \param on_finished: The callback in case the response is successful. - # \param model_class: The type of the model to convert the response to. It may either be a single record or a list. - def _parseModels(self, response: Dict[str, Any], - on_finished: Union[Callable[[ClusterApiClientModel], Any], - Callable[[List[ClusterApiClientModel]], Any]], - model_class: Type[ClusterApiClientModel]) -> None: + def _parseModels(self, response: Dict[str, Any], on_finished: Union[Callable[[ClusterApiClientModel], Any], + Callable[[List[ClusterApiClientModel]], Any]], model_class: Type[ClusterApiClientModel]) -> None: + """Parses the given models and calls the correct callback depending on the result. + + :param response: The response from the server, after being converted to a dict. + :param on_finished: The callback in case the response is successful. + :param model_class: The type of the model to convert the response to. It may either be a single record or a list. + """ + try: if isinstance(response, list): results = [model_class(**c) for c in response] # type: List[ClusterApiClientModel] @@ -138,16 +159,15 @@ class ClusterApiClient: except (JSONDecodeError, TypeError, ValueError): Logger.log("e", "Could not parse response from network: %s", str(response)) - ## Creates a callback function so that it includes the parsing of the response into the correct model. - # The callback is added to the 'finished' signal of the reply. - # \param reply: The reply that should be listened to. - # \param on_finished: The callback in case the response is successful. - def _addCallback(self, - reply: QNetworkReply, - on_finished: Union[Callable[[ClusterApiClientModel], Any], - Callable[[List[ClusterApiClientModel]], Any]], - model: Type[ClusterApiClientModel] = None, + def _addCallback(self, reply: QNetworkReply, on_finished: Union[Callable[[ClusterApiClientModel], Any], + Callable[[List[ClusterApiClientModel]], Any]], model: Type[ClusterApiClientModel] = None, ) -> None: + """Creates a callback function so that it includes the parsing of the response into the correct model. + + The callback is added to the 'finished' signal of the reply. + :param reply: The reply that should be listened to. + :param on_finished: The callback in case the response is successful. + """ def parse() -> None: self._anti_gc_callbacks.remove(parse) diff --git a/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDevice.py b/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDevice.py index 1266afcca8..ea120f8978 100644 --- a/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDevice.py +++ b/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDevice.py @@ -51,15 +51,17 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): self._setInterfaceElements() self._active_camera_url = QUrl() # type: QUrl - ## Set all the interface elements and texts for this output device. def _setInterfaceElements(self) -> None: + """Set all the interface elements and texts for this output device.""" + self.setPriority(3) # Make sure the output device gets selected above local file output self.setShortDescription(I18N_CATALOG.i18nc("@action:button Preceded by 'Ready to'.", "Print over network")) self.setDescription(I18N_CATALOG.i18nc("@properties:tooltip", "Print over network")) self.setConnectionText(I18N_CATALOG.i18nc("@info:status", "Connected over the network")) - ## Called when the connection to the cluster changes. def connect(self) -> None: + """Called when the connection to the cluster changes.""" + super().connect() self._update() self.sendMaterialProfiles() @@ -94,10 +96,13 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): def forceSendJob(self, print_job_uuid: str) -> None: self._getApiClient().forcePrintJob(print_job_uuid) - ## Set the remote print job state. - # \param print_job_uuid: The UUID of the print job to set the state for. - # \param action: The action to undertake ('pause', 'resume', 'abort'). def setJobState(self, print_job_uuid: str, action: str) -> None: + """Set the remote print job state. + + :param print_job_uuid: The UUID of the print job to set the state for. + :param action: The action to undertake ('pause', 'resume', 'abort'). + """ + self._getApiClient().setPrintJobState(print_job_uuid, action) def _update(self) -> None: @@ -106,19 +111,22 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): self._getApiClient().getPrintJobs(self._updatePrintJobs) self._updatePrintJobPreviewImages() - ## Get a list of materials that are installed on the cluster host. def getMaterials(self, on_finished: Callable[[List[ClusterMaterial]], Any]) -> None: + """Get a list of materials that are installed on the cluster host.""" + self._getApiClient().getMaterials(on_finished = on_finished) - ## Sync the material profiles in Cura with the printer. - # This gets called when connecting to a printer as well as when sending a print. def sendMaterialProfiles(self) -> None: + """Sync the material profiles in Cura with the printer. + + This gets called when connecting to a printer as well as when sending a print. + """ job = SendMaterialJob(device = self) job.run() - ## Send a print job to the cluster. def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, filter_by_machine: bool = False, **kwargs) -> None: + """Send a print job to the cluster.""" # Show an error message if we're already sending a job. if self._progress.visible: @@ -132,15 +140,20 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): job.finished.connect(self._onPrintJobCreated) job.start() - ## Allows the user to choose a printer to print with from the printer selection dialogue. - # \param unique_name: The unique name of the printer to target. @pyqtSlot(str, name="selectTargetPrinter") def selectTargetPrinter(self, unique_name: str = "") -> None: + """Allows the user to choose a printer to print with from the printer selection dialogue. + + :param unique_name: The unique name of the printer to target. + """ self._startPrintJobUpload(unique_name if unique_name != "" else None) - ## Handler for when the print job was created locally. - # It can now be sent over the network. def _onPrintJobCreated(self, job: ExportFileJob) -> None: + """Handler for when the print job was created locally. + + It can now be sent over the network. + """ + self._active_exported_job = job # TODO: add preference to enable/disable this feature? if self.clusterSize > 1: @@ -148,8 +161,9 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): return self._startPrintJobUpload() - ## Shows a dialog allowing the user to select which printer in a group to send a job to. def _showPrinterSelectionDialog(self) -> None: + """Shows a dialog allowing the user to select which printer in a group to send a job to.""" + if not self._printer_select_dialog: plugin_path = CuraApplication.getInstance().getPluginRegistry().getPluginPath("UM3NetworkPrinting") or "" path = os.path.join(plugin_path, "resources", "qml", "PrintWindow.qml") @@ -157,8 +171,9 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): if self._printer_select_dialog is not None: self._printer_select_dialog.show() - ## Upload the print job to the group. def _startPrintJobUpload(self, unique_name: str = None) -> None: + """Upload the print job to the group.""" + if not self._active_exported_job: Logger.log("e", "No active exported job to upload!") return @@ -177,33 +192,40 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice): on_progress=self._onPrintJobUploadProgress) self._active_exported_job = None - ## Handler for print job upload progress. def _onPrintJobUploadProgress(self, bytes_sent: int, bytes_total: int) -> None: + """Handler for print job upload progress.""" + percentage = (bytes_sent / bytes_total) if bytes_total else 0 self._progress.setProgress(percentage * 100) self.writeProgress.emit() - ## Handler for when the print job was fully uploaded to the cluster. def _onPrintUploadCompleted(self, _: QNetworkReply) -> None: + """Handler for when the print job was fully uploaded to the cluster.""" + self._progress.hide() PrintJobUploadSuccessMessage().show() self.writeFinished.emit() - ## Displays the given message if uploading the mesh has failed - # \param message: The message to display. def _onUploadError(self, message: str = None) -> None: + """Displays the given message if uploading the mesh has failed + + :param message: The message to display. + """ + self._progress.hide() PrintJobUploadErrorMessage(message).show() self.writeError.emit() - ## Download all the images from the cluster and load their data in the print job models. def _updatePrintJobPreviewImages(self): + """Download all the images from the cluster and load their data in the print job models.""" + for print_job in self._print_jobs: if print_job.getPreviewImage() is None: self._getApiClient().getPrintJobPreviewImage(print_job.key, print_job.updatePreviewImageData) - ## Get the API client instance. def _getApiClient(self) -> ClusterApiClient: + """Get the API client instance.""" + if not self._cluster_api: self._cluster_api = ClusterApiClient(self.address, on_error = lambda error: Logger.log("e", str(error))) return self._cluster_api diff --git a/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDeviceManager.py b/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDeviceManager.py index 273c64ef4d..68500d777e 100644 --- a/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDeviceManager.py +++ b/plugins/UM3NetworkPrinting/src/Network/LocalClusterOutputDeviceManager.py @@ -24,8 +24,9 @@ from ..Models.Http.PrinterSystemStatus import PrinterSystemStatus I18N_CATALOG = i18nCatalog("cura") -## The LocalClusterOutputDeviceManager is responsible for discovering and managing local networked clusters. class LocalClusterOutputDeviceManager: + """The LocalClusterOutputDeviceManager is responsible for discovering and managing local networked clusters.""" + META_NETWORK_KEY = "um_network_key" @@ -49,30 +50,35 @@ class LocalClusterOutputDeviceManager: self._zero_conf_client.addedNetworkCluster.connect(self._onDeviceDiscovered) self._zero_conf_client.removedNetworkCluster.connect(self._onDiscoveredDeviceRemoved) - ## Start the network discovery. def start(self) -> None: + """Start the network discovery.""" + self._zero_conf_client.start() for address in self._getStoredManualAddresses(): self.addManualDevice(address) - ## Stop network discovery and clean up discovered devices. def stop(self) -> None: + """Stop network discovery and clean up discovered devices.""" + self._zero_conf_client.stop() for instance_name in list(self._discovered_devices): self._onDiscoveredDeviceRemoved(instance_name) - ## Restart discovery on the local network. def startDiscovery(self): + """Restart discovery on the local network.""" + self.stop() self.start() - ## Add a networked printer manually by address. def addManualDevice(self, address: str, callback: Optional[Callable[[bool, str], None]] = None) -> None: + """Add a networked printer manually by address.""" + api_client = ClusterApiClient(address, lambda error: Logger.log("e", str(error))) api_client.getSystem(lambda status: self._onCheckManualDeviceResponse(address, status, callback)) - ## Remove a manually added networked printer. def removeManualDevice(self, device_id: str, address: Optional[str] = None) -> None: + """Remove a manually added networked printer.""" + if device_id not in self._discovered_devices and address is not None: device_id = "manual:{}".format(address) @@ -83,16 +89,19 @@ class LocalClusterOutputDeviceManager: if address in self._getStoredManualAddresses(): self._removeStoredManualAddress(address) - ## Force reset all network device connections. def refreshConnections(self) -> None: + """Force reset all network device connections.""" + self._connectToActiveMachine() - ## Get the discovered devices. def getDiscoveredDevices(self) -> Dict[str, LocalClusterOutputDevice]: + """Get the discovered devices.""" + return self._discovered_devices - ## Connect the active machine to a given device. def associateActiveMachineWithPrinterDevice(self, device: LocalClusterOutputDevice) -> None: + """Connect the active machine to a given device.""" + active_machine = CuraApplication.getInstance().getGlobalContainerStack() if not active_machine: return @@ -106,8 +115,9 @@ class LocalClusterOutputDeviceManager: return CuraApplication.getInstance().getMachineManager().switchPrinterType(definitions[0].getName()) - ## Callback for when the active machine was changed by the user or a new remote cluster was found. def _connectToActiveMachine(self) -> None: + """Callback for when the active machine was changed by the user or a new remote cluster was found.""" + active_machine = CuraApplication.getInstance().getGlobalContainerStack() if not active_machine: return @@ -122,9 +132,10 @@ class LocalClusterOutputDeviceManager: # Remove device if it is not meant for the active machine. output_device_manager.removeOutputDevice(device.key) - ## Callback for when a manual device check request was responded to. def _onCheckManualDeviceResponse(self, address: str, status: PrinterSystemStatus, callback: Optional[Callable[[bool, str], None]] = None) -> None: + """Callback for when a manual device check request was responded to.""" + self._onDeviceDiscovered("manual:{}".format(address), address, { b"name": status.name.encode("utf-8"), b"address": address.encode("utf-8"), @@ -137,10 +148,13 @@ class LocalClusterOutputDeviceManager: if callback is not None: CuraApplication.getInstance().callLater(callback, True, address) - ## Returns a dict of printer BOM numbers to machine types. - # These numbers are available in the machine definition already so we just search for them here. @staticmethod def _getPrinterTypeIdentifiers() -> Dict[str, str]: + """Returns a dict of printer BOM numbers to machine types. + + These numbers are available in the machine definition already so we just search for them here. + """ + container_registry = CuraApplication.getInstance().getContainerRegistry() ultimaker_machines = container_registry.findContainersMetadata(type="machine", manufacturer="Ultimaker B.V.") found_machine_type_identifiers = {} # type: Dict[str, str] @@ -154,8 +168,9 @@ class LocalClusterOutputDeviceManager: found_machine_type_identifiers[str(bom_number)] = machine_type return found_machine_type_identifiers - ## Add a new device. def _onDeviceDiscovered(self, key: str, address: str, properties: Dict[bytes, bytes]) -> None: + """Add a new device.""" + machine_identifier = properties.get(b"machine", b"").decode("utf-8") printer_type_identifiers = self._getPrinterTypeIdentifiers() @@ -189,8 +204,9 @@ class LocalClusterOutputDeviceManager: self.discoveredDevicesChanged.emit() self._connectToActiveMachine() - ## Remove a device. def _onDiscoveredDeviceRemoved(self, device_id: str) -> None: + """Remove a device.""" + device = self._discovered_devices.pop(device_id, None) # type: Optional[LocalClusterOutputDevice] if not device: return @@ -198,8 +214,9 @@ class LocalClusterOutputDeviceManager: CuraApplication.getInstance().getDiscoveredPrintersModel().removeDiscoveredPrinter(device.address) self.discoveredDevicesChanged.emit() - ## Create a machine instance based on the discovered network printer. def _createMachineFromDiscoveredDevice(self, device_id: str) -> None: + """Create a machine instance based on the discovered network printer.""" + device = self._discovered_devices.get(device_id) if device is None: return @@ -216,8 +233,9 @@ class LocalClusterOutputDeviceManager: self._connectToOutputDevice(device, new_machine) self._showCloudFlowMessage(device) - ## Add an address to the stored preferences. def _storeManualAddress(self, address: str) -> None: + """Add an address to the stored preferences.""" + stored_addresses = self._getStoredManualAddresses() if address in stored_addresses: return # Prevent duplicates. @@ -225,8 +243,9 @@ class LocalClusterOutputDeviceManager: new_value = ",".join(stored_addresses) CuraApplication.getInstance().getPreferences().setValue(self.MANUAL_DEVICES_PREFERENCE_KEY, new_value) - ## Remove an address from the stored preferences. def _removeStoredManualAddress(self, address: str) -> None: + """Remove an address from the stored preferences.""" + stored_addresses = self._getStoredManualAddresses() try: stored_addresses.remove(address) # Can throw a ValueError @@ -235,15 +254,16 @@ class LocalClusterOutputDeviceManager: except ValueError: Logger.log("w", "Could not remove address from stored_addresses, it was not there") - ## Load the user-configured manual devices from Cura preferences. def _getStoredManualAddresses(self) -> List[str]: + """Load the user-configured manual devices from Cura preferences.""" + preferences = CuraApplication.getInstance().getPreferences() preferences.addPreference(self.MANUAL_DEVICES_PREFERENCE_KEY, "") manual_instances = preferences.getValue(self.MANUAL_DEVICES_PREFERENCE_KEY).split(",") return manual_instances - ## Add a device to the current active machine. def _connectToOutputDevice(self, device: UltimakerNetworkedPrinterOutputDevice, machine: GlobalStack) -> None: + """Add a device to the current active machine.""" # Make sure users know that we no longer support legacy devices. if Version(device.firmwareVersion) < self.MIN_SUPPORTED_CLUSTER_VERSION: @@ -262,9 +282,10 @@ class LocalClusterOutputDeviceManager: if device.key not in output_device_manager.getOutputDeviceIds(): output_device_manager.addOutputDevice(device) - ## Nudge the user to start using Ultimaker Cloud. @staticmethod def _showCloudFlowMessage(device: LocalClusterOutputDevice) -> None: + """Nudge the user to start using Ultimaker Cloud.""" + if CuraApplication.getInstance().getMachineManager().activeMachineIsUsingCloudConnection: # This printer is already cloud connected, so we do not bother the user anymore. return diff --git a/plugins/UM3NetworkPrinting/src/Network/SendMaterialJob.py b/plugins/UM3NetworkPrinting/src/Network/SendMaterialJob.py index 49e088100d..90aa68eedb 100644 --- a/plugins/UM3NetworkPrinting/src/Network/SendMaterialJob.py +++ b/plugins/UM3NetworkPrinting/src/Network/SendMaterialJob.py @@ -16,27 +16,33 @@ if TYPE_CHECKING: from .LocalClusterOutputDevice import LocalClusterOutputDevice -## Asynchronous job to send material profiles to the printer. -# -# This way it won't freeze up the interface while sending those materials. class SendMaterialJob(Job): + """Asynchronous job to send material profiles to the printer. + + This way it won't freeze up the interface while sending those materials. + """ + def __init__(self, device: "LocalClusterOutputDevice") -> None: super().__init__() self.device = device # type: LocalClusterOutputDevice - ## Send the request to the printer and register a callback def run(self) -> None: + """Send the request to the printer and register a callback""" + self.device.getMaterials(on_finished = self._onGetMaterials) - ## Callback for when the remote materials were returned. def _onGetMaterials(self, materials: List[ClusterMaterial]) -> None: + """Callback for when the remote materials were returned.""" + remote_materials_by_guid = {material.guid: material for material in materials} self._sendMissingMaterials(remote_materials_by_guid) - ## Determine which materials should be updated and send them to the printer. - # \param remote_materials_by_guid The remote materials by GUID. def _sendMissingMaterials(self, remote_materials_by_guid: Dict[str, ClusterMaterial]) -> None: + """Determine which materials should be updated and send them to the printer. + + :param remote_materials_by_guid: The remote materials by GUID. + """ local_materials_by_guid = self._getLocalMaterials() if len(local_materials_by_guid) == 0: Logger.log("d", "There are no local materials to synchronize with the printer.") @@ -47,25 +53,31 @@ class SendMaterialJob(Job): return self._sendMaterials(material_ids_to_send) - ## From the local and remote materials, determine which ones should be synchronized. - # Makes a Set of id's containing only the id's of the materials that are not on the printer yet or the ones that - # are newer in Cura. - # \param local_materials The local materials by GUID. - # \param remote_materials The remote materials by GUID. @staticmethod def _determineMaterialsToSend(local_materials: Dict[str, LocalMaterial], remote_materials: Dict[str, ClusterMaterial]) -> Set[str]: + """From the local and remote materials, determine which ones should be synchronized. + + Makes a Set of id's containing only the id's of the materials that are not on the printer yet or the ones that + are newer in Cura. + :param local_materials: The local materials by GUID. + :param remote_materials: The remote materials by GUID. + """ + return { local_material.id for guid, local_material in local_materials.items() if guid not in remote_materials.keys() or local_material.version > remote_materials[guid].version } - ## Send the materials to the printer. - # The given materials will be loaded from disk en sent to to printer. - # The given id's will be matched with filenames of the locally stored materials. - # \param materials_to_send A set with id's of materials that must be sent. def _sendMaterials(self, materials_to_send: Set[str]) -> None: + """Send the materials to the printer. + + The given materials will be loaded from disk en sent to to printer. + The given id's will be matched with filenames of the locally stored materials. + :param materials_to_send: A set with id's of materials that must be sent. + """ + container_registry = CuraApplication.getInstance().getContainerRegistry() all_materials = container_registry.findInstanceContainersMetadata(type = "material") all_base_files = {material["base_file"] for material in all_materials if "base_file" in material} # Filters out uniques by making it a set. Don't include files without base file (i.e. empty material). @@ -83,12 +95,14 @@ class SendMaterialJob(Job): file_name = os.path.basename(file_path) self._sendMaterialFile(file_path, file_name, root_material_id) - ## Send a single material file to the printer. - # Also add the material signature file if that is available. - # \param file_path The path of the material file. - # \param file_name The name of the material file. - # \param material_id The ID of the material in the file. def _sendMaterialFile(self, file_path: str, file_name: str, material_id: str) -> None: + """Send a single material file to the printer. + + Also add the material signature file if that is available. + :param file_path: The path of the material file. + :param file_name: The name of the material file. + :param material_id: The ID of the material in the file. + """ parts = [] # Add the material file. @@ -112,8 +126,9 @@ class SendMaterialJob(Job): self.device.postFormWithParts(target = "/cluster-api/v1/materials/", parts = parts, on_finished = self._sendingFinished) - ## Check a reply from an upload to the printer and log an error when the call failed def _sendingFinished(self, reply: QNetworkReply) -> None: + """Check a reply from an upload to the printer and log an error when the call failed""" + if reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) != 200: Logger.log("w", "Error while syncing material: %s", reply.errorString()) return @@ -125,11 +140,14 @@ class SendMaterialJob(Job): # Because of the guards above it is not shown when syncing failed (which is not always an actual problem). MaterialSyncMessage(self.device).show() - ## Retrieves a list of local materials - # Only the new newest version of the local materials is returned - # \return a dictionary of LocalMaterial objects by GUID @staticmethod def _getLocalMaterials() -> Dict[str, LocalMaterial]: + """Retrieves a list of local materials + + Only the new newest version of the local materials is returned + :return: a dictionary of LocalMaterial objects by GUID + """ + result = {} # type: Dict[str, LocalMaterial] all_materials = CuraApplication.getInstance().getContainerRegistry().findInstanceContainersMetadata(type = "material") all_base_files = [material for material in all_materials if material["id"] == material.get("base_file")] # Don't send materials without base_file: The empty material doesn't need to be sent. diff --git a/plugins/UM3NetworkPrinting/src/Network/ZeroConfClient.py b/plugins/UM3NetworkPrinting/src/Network/ZeroConfClient.py index 466638d99e..b41cd7d151 100644 --- a/plugins/UM3NetworkPrinting/src/Network/ZeroConfClient.py +++ b/plugins/UM3NetworkPrinting/src/Network/ZeroConfClient.py @@ -12,9 +12,11 @@ from UM.Signal import Signal from cura.CuraApplication import CuraApplication -## The ZeroConfClient handles all network discovery logic. -# It emits signals when new network services were found or disappeared. class ZeroConfClient: + """The ZeroConfClient handles all network discovery logic. + + It emits signals when new network services were found or disappeared. + """ # The discovery protocol name for Ultimaker printers. ZERO_CONF_NAME = u"_ultimaker._tcp.local." @@ -30,10 +32,13 @@ class ZeroConfClient: self._service_changed_request_event = None # type: Optional[Event] self._service_changed_request_thread = None # type: Optional[Thread] - ## The ZeroConf service changed requests are handled in a separate thread so we don't block the UI. - # We can also re-schedule the requests when they fail to get detailed service info. - # Any new or re-reschedule requests will be appended to the request queue and the thread will process them. def start(self) -> None: + """The ZeroConf service changed requests are handled in a separate thread so we don't block the UI. + + We can also re-schedule the requests when they fail to get detailed service info. + Any new or re-reschedule requests will be appended to the request queue and the thread will process them. + """ + self._service_changed_request_queue = Queue() self._service_changed_request_event = Event() try: @@ -56,16 +61,18 @@ class ZeroConfClient: self._zero_conf_browser.cancel() self._zero_conf_browser = None - ## Handles a change is discovered network services. def _queueService(self, zeroconf: Zeroconf, service_type, name: str, state_change: ServiceStateChange) -> None: + """Handles a change is discovered network services.""" + item = (zeroconf, service_type, name, state_change) if not self._service_changed_request_queue or not self._service_changed_request_event: return self._service_changed_request_queue.put(item) self._service_changed_request_event.set() - ## Callback for when a ZeroConf service has changes. def _handleOnServiceChangedRequests(self) -> None: + """Callback for when a ZeroConf service has changes.""" + if not self._service_changed_request_queue or not self._service_changed_request_event: return @@ -98,19 +105,23 @@ class ZeroConfClient: for request in reschedule_requests: self._service_changed_request_queue.put(request) - ## Handler for zeroConf detection. - # Return True or False indicating if the process succeeded. - # Note that this function can take over 3 seconds to complete. Be careful calling it from the main thread. - def _onServiceChanged(self, zero_conf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange - ) -> bool: + def _onServiceChanged(self, zero_conf: Zeroconf, service_type: str, name: str, + state_change: ServiceStateChange) -> bool: + """Handler for zeroConf detection. + + Return True or False indicating if the process succeeded. + Note that this function can take over 3 seconds to complete. Be careful calling it from the main thread. + """ + if state_change == ServiceStateChange.Added: return self._onServiceAdded(zero_conf, service_type, name) elif state_change == ServiceStateChange.Removed: return self._onServiceRemoved(name) return True - ## Handler for when a ZeroConf service was added. def _onServiceAdded(self, zero_conf: Zeroconf, service_type: str, name: str) -> bool: + """Handler for when a ZeroConf service was added.""" + # First try getting info from zero-conf cache info = ServiceInfo(service_type, name, properties={}) for record in zero_conf.cache.entries_with_name(name.lower()): @@ -141,8 +152,9 @@ class ZeroConfClient: return True - ## Handler for when a ZeroConf service was removed. def _onServiceRemoved(self, name: str) -> bool: + """Handler for when a ZeroConf service was removed.""" + Logger.log("d", "ZeroConf service removed: %s" % name) self.removedNetworkCluster.emit(str(name)) return True diff --git a/plugins/UM3NetworkPrinting/src/UM3OutputDevicePlugin.py b/plugins/UM3NetworkPrinting/src/UM3OutputDevicePlugin.py index 3ab37297b5..72b2da33cc 100644 --- a/plugins/UM3NetworkPrinting/src/UM3OutputDevicePlugin.py +++ b/plugins/UM3NetworkPrinting/src/UM3OutputDevicePlugin.py @@ -13,11 +13,11 @@ from .Network.LocalClusterOutputDeviceManager import LocalClusterOutputDeviceMan from .Cloud.CloudOutputDeviceManager import CloudOutputDeviceManager -## This plugin handles the discovery and networking for Ultimaker 3D printers that support network and cloud printing. class UM3OutputDevicePlugin(OutputDevicePlugin): - - # Signal emitted when the list of discovered devices changed. Used by printer action in this plugin. + """This plugin handles the discovery and networking for Ultimaker 3D printers""" + discoveredDevicesChanged = Signal() + """Signal emitted when the list of discovered devices changed. Used by printer action in this plugin.""" def __init__(self) -> None: super().__init__() @@ -33,8 +33,9 @@ class UM3OutputDevicePlugin(OutputDevicePlugin): # This ensures no output devices are still connected that do not belong to the new active machine. CuraApplication.getInstance().globalContainerStackChanged.connect(self.refreshConnections) - ## Start looking for devices in the network and cloud. def start(self): + """Start looking for devices in the network and cloud.""" + self._network_output_device_manager.start() self._cloud_output_device_manager.start() @@ -43,31 +44,38 @@ class UM3OutputDevicePlugin(OutputDevicePlugin): self._network_output_device_manager.stop() self._cloud_output_device_manager.stop() - ## Restart network discovery. def startDiscovery(self) -> None: + """Restart network discovery.""" + self._network_output_device_manager.startDiscovery() - ## Force refreshing the network connections. def refreshConnections(self) -> None: + """Force refreshing the network connections.""" + self._network_output_device_manager.refreshConnections() self._cloud_output_device_manager.refreshConnections() - ## Indicate that this plugin supports adding networked printers manually. def canAddManualDevice(self, address: str = "") -> ManualDeviceAdditionAttempt: + """Indicate that this plugin supports adding networked printers manually.""" + return ManualDeviceAdditionAttempt.PRIORITY - ## Add a networked printer manually based on its network address. def addManualDevice(self, address: str, callback: Optional[Callable[[bool, str], None]] = None) -> None: + """Add a networked printer manually based on its network address.""" + self._network_output_device_manager.addManualDevice(address, callback) - ## Remove a manually connected networked printer. def removeManualDevice(self, key: str, address: Optional[str] = None) -> None: + """Remove a manually connected networked printer.""" + self._network_output_device_manager.removeManualDevice(key, address) - ## Get the discovered devices from the local network. def getDiscoveredDevices(self) -> Dict[str, LocalClusterOutputDevice]: + """Get the discovered devices from the local network.""" + return self._network_output_device_manager.getDiscoveredDevices() - ## Connect the active machine to a device. def associateActiveMachineWithPrinterDevice(self, device: LocalClusterOutputDevice) -> None: + """Connect the active machine to a device.""" + self._network_output_device_manager.associateActiveMachineWithPrinterDevice(device) diff --git a/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterAction.py b/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterAction.py index 8c5f5c12ea..dd3f3939ed 100644 --- a/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterAction.py +++ b/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterAction.py @@ -15,9 +15,11 @@ from .Network.LocalClusterOutputDevice import LocalClusterOutputDevice I18N_CATALOG = i18nCatalog("cura") -## Machine action that allows to connect the active machine to a networked devices. -# TODO: in the future this should be part of the new discovery workflow baked into Cura. class UltimakerNetworkedPrinterAction(MachineAction): + """Machine action that allows to connect the active machine to a networked devices. + + TODO: in the future this should be part of the new discovery workflow baked into Cura. + """ # Signal emitted when discovered devices have changed. discoveredDevicesChanged = pyqtSignal() @@ -27,59 +29,69 @@ class UltimakerNetworkedPrinterAction(MachineAction): self._qml_url = "resources/qml/DiscoverUM3Action.qml" self._network_plugin = None # type: Optional[UM3OutputDevicePlugin] - ## Override the default value. def needsUserInteraction(self) -> bool: + """Override the default value.""" + return False - ## Start listening to network discovery events via the plugin. @pyqtSlot(name = "startDiscovery") def startDiscovery(self) -> None: + """Start listening to network discovery events via the plugin.""" + self._networkPlugin.discoveredDevicesChanged.connect(self._onDeviceDiscoveryChanged) self.discoveredDevicesChanged.emit() # trigger at least once to populate the list - ## Reset the discovered devices. @pyqtSlot(name = "reset") def reset(self) -> None: + """Reset the discovered devices.""" + self.discoveredDevicesChanged.emit() # trigger to reset the list - ## Reset the discovered devices. @pyqtSlot(name = "restartDiscovery") def restartDiscovery(self) -> None: + """Reset the discovered devices.""" + self._networkPlugin.startDiscovery() self.discoveredDevicesChanged.emit() # trigger to reset the list - ## Remove a manually added device. @pyqtSlot(str, str, name = "removeManualDevice") def removeManualDevice(self, key: str, address: str) -> None: + """Remove a manually added device.""" + self._networkPlugin.removeManualDevice(key, address) - ## Add a new manual device. Can replace an existing one by key. @pyqtSlot(str, str, name = "setManualDevice") def setManualDevice(self, key: str, address: str) -> None: + """Add a new manual device. Can replace an existing one by key.""" + if key != "": self._networkPlugin.removeManualDevice(key) if address != "": self._networkPlugin.addManualDevice(address) - ## Get the devices discovered in the local network sorted by name. @pyqtProperty("QVariantList", notify = discoveredDevicesChanged) def foundDevices(self): + """Get the devices discovered in the local network sorted by name.""" + discovered_devices = list(self._networkPlugin.getDiscoveredDevices().values()) discovered_devices.sort(key = lambda d: d.name) return discovered_devices - ## Connect a device selected in the list with the active machine. @pyqtSlot(QObject, name = "associateActiveMachineWithPrinterDevice") def associateActiveMachineWithPrinterDevice(self, device: LocalClusterOutputDevice) -> None: + """Connect a device selected in the list with the active machine.""" + self._networkPlugin.associateActiveMachineWithPrinterDevice(device) - ## Callback for when the list of discovered devices in the plugin was changed. def _onDeviceDiscoveryChanged(self) -> None: + """Callback for when the list of discovered devices in the plugin was changed.""" + self.discoveredDevicesChanged.emit() - ## Get the network manager from the plugin. @property def _networkPlugin(self) -> UM3OutputDevicePlugin: + """Get the network manager from the plugin.""" + if not self._network_plugin: output_device_manager = CuraApplication.getInstance().getOutputDeviceManager() network_plugin = output_device_manager.getOutputDevicePlugin("UM3NetworkPrinting") diff --git a/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterOutputDevice.py b/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterOutputDevice.py index f50bab8a1f..8090177b83 100644 --- a/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterOutputDevice.py +++ b/plugins/UM3NetworkPrinting/src/UltimakerNetworkedPrinterOutputDevice.py @@ -22,10 +22,12 @@ from .Models.Http.ClusterPrinterStatus import ClusterPrinterStatus from .Models.Http.ClusterPrintJobStatus import ClusterPrintJobStatus -## Output device class that forms the basis of Ultimaker networked printer output devices. -# Currently used for local networking and cloud printing using Ultimaker Connect. -# This base class primarily contains all the Qt properties and slots needed for the monitor page to work. class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): + """Output device class that forms the basis of Ultimaker networked printer output devices. + + Currently used for local networking and cloud printing using Ultimaker Connect. + This base class primarily contains all the Qt properties and slots needed for the monitor page to work. + """ META_NETWORK_KEY = "um_network_key" META_CLUSTER_ID = "um_cloud_cluster_id" @@ -85,14 +87,16 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): # The job upload progress message modal. self._progress = PrintJobUploadProgressMessage() - ## The IP address of the printer. @pyqtProperty(str, constant=True) def address(self) -> str: + """The IP address of the printer.""" + return self._address - ## The display name of the printer. @pyqtProperty(str, constant=True) def printerTypeName(self) -> str: + """The display name of the printer.""" + return self._printer_type_name # Get all print jobs for this cluster. @@ -157,13 +161,15 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): self._active_printer = printer self.activePrinterChanged.emit() - ## Whether the printer that this output device represents supports print job actions via the local network. @pyqtProperty(bool, constant=True) def supportsPrintJobActions(self) -> bool: + """Whether the printer that this output device represents supports print job actions via the local network.""" + return True - ## Set the remote print job state. def setJobState(self, print_job_uuid: str, state: str) -> None: + """Set the remote print job state.""" + raise NotImplementedError("setJobState must be implemented") @pyqtSlot(str, name="sendJobToTop") @@ -210,11 +216,13 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): self._checkStillConnected() super()._update() - ## Check if we're still connected by comparing the last timestamps for network response and the current time. - # This implementation is similar to the base NetworkedPrinterOutputDevice, but is tweaked slightly. - # Re-connecting is handled automatically by the output device managers in this plugin. - # TODO: it would be nice to have this logic in the managers, but connecting those with signals causes crashes. def _checkStillConnected(self) -> None: + """Check if we're still connected by comparing the last timestamps for network response and the current time. + + This implementation is similar to the base NetworkedPrinterOutputDevice, but is tweaked slightly. + Re-connecting is handled automatically by the output device managers in this plugin. + TODO: it would be nice to have this logic in the managers, but connecting those with signals causes crashes. + """ time_since_last_response = time() - self._time_of_last_response if time_since_last_response > self.NETWORK_RESPONSE_CONSIDER_OFFLINE: self.setConnectionState(ConnectionState.Closed) @@ -223,9 +231,11 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): elif self.connectionState == ConnectionState.Closed: self._reconnectForActiveMachine() - ## Reconnect for the active output device. - # Does nothing if the device is not meant for the active machine. def _reconnectForActiveMachine(self) -> None: + """Reconnect for the active output device. + + Does nothing if the device is not meant for the active machine. + """ active_machine = CuraApplication.getInstance().getGlobalContainerStack() if not active_machine: return @@ -281,16 +291,19 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): self.printersChanged.emit() self._checkIfClusterHost() - ## Check is this device is a cluster host and takes the needed actions when it is not. def _checkIfClusterHost(self): + """Check is this device is a cluster host and takes the needed actions when it is not.""" + if len(self._printers) < 1 and self.isConnected(): NotClusterHostMessage(self).show() self.close() CuraApplication.getInstance().getOutputDeviceManager().removeOutputDevice(self.key) - ## Updates the local list of print jobs with the list received from the cluster. - # \param remote_jobs: The print jobs received from the cluster. def _updatePrintJobs(self, remote_jobs: List[ClusterPrintJobStatus]) -> None: + """Updates the local list of print jobs with the list received from the cluster. + + :param remote_jobs: The print jobs received from the cluster. + """ self._responseReceived() # Keep track of the new print jobs to show. @@ -321,9 +334,11 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): self._print_jobs = new_print_jobs self.printJobsChanged.emit() - ## Create a new print job model based on the remote status of the job. - # \param remote_job: The remote print job data. def _createPrintJobModel(self, remote_job: ClusterPrintJobStatus) -> UM3PrintJobOutputModel: + """Create a new print job model based on the remote status of the job. + + :param remote_job: The remote print job data. + """ model = remote_job.createOutputModel(ClusterOutputController(self)) if remote_job.printer_uuid: self._updateAssignedPrinter(model, remote_job.printer_uuid) @@ -333,16 +348,18 @@ class UltimakerNetworkedPrinterOutputDevice(NetworkedPrinterOutputDevice): model.loadPreviewImageFromUrl(remote_job.preview_url) return model - ## Updates the printer assignment for the given print job model. def _updateAssignedPrinter(self, model: UM3PrintJobOutputModel, printer_uuid: str) -> None: + """Updates the printer assignment for the given print job model.""" + printer = next((p for p in self._printers if printer_uuid == p.key), None) if not printer: return printer.updateActivePrintJob(model) model.updateAssignedPrinter(printer) - ## Load Monitor tab QML. def _loadMonitorTab(self) -> None: + """Load Monitor tab QML.""" + plugin_registry = CuraApplication.getInstance().getPluginRegistry() if not plugin_registry: Logger.log("e", "Could not get plugin registry")