Convert doxygen to rst for UM3NetworkPrinting

This commit is contained in:
Nino van Hooff 2020-05-15 15:05:38 +02:00
parent de82406782
commit 5eb5ffd916
38 changed files with 797 additions and 487 deletions

View file

@ -35,11 +35,13 @@ from ..Models.Http.ClusterPrintJobStatus import ClusterPrintJobStatus
I18N_CATALOG = i18nCatalog("cura")
## The cloud output device is a network output device that works remotely but has limited functionality.
# Currently it only supports viewing the printer and print job status and adding a new job to the queue.
# As such, those methods have been implemented here.
# Note that this device represents a single remote cluster, not a list of multiple clusters.
class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
"""The cloud output device is a network output device that works remotely but has limited functionality.
Currently it only supports viewing the printer and print job status and adding a new job to the queue.
As such, those methods have been implemented here.
Note that this device represents a single remote cluster, not a list of multiple clusters.
"""
# The interval with which the remote cluster is checked.
# We can do this relatively often as this API call is quite fast.
@ -56,11 +58,13 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
# Therefore we create a private signal used to trigger the printersChanged signal.
_cloudClusterPrintersChanged = pyqtSignal()
## Creates a new cloud output device
# \param api_client: The client that will run the API calls
# \param cluster: The device response received from the cloud API.
# \param parent: The optional parent of this output device.
def __init__(self, api_client: CloudApiClient, cluster: CloudClusterResponse, parent: QObject = None) -> None:
"""Creates a new cloud output device
:param api_client: The client that will run the API calls
:param cluster: The device response received from the cloud API.
:param parent: The optional parent of this output device.
"""
# The following properties are expected on each networked output device.
# Because the cloud connection does not off all of these, we manually construct this version here.
@ -99,8 +103,9 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._tool_path = None # type: Optional[bytes]
self._uploaded_print_job = None # type: Optional[CloudPrintJobResponse]
## Connects this device.
def connect(self) -> None:
"""Connects this device."""
if self.isConnected():
return
super().connect()
@ -108,21 +113,24 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
CuraApplication.getInstance().getBackend().backendStateChange.connect(self._onBackendStateChange)
self._update()
## Disconnects the device
def disconnect(self) -> None:
"""Disconnects the device"""
if not self.isConnected():
return
super().disconnect()
Logger.log("i", "Disconnected from cluster %s", self.key)
CuraApplication.getInstance().getBackend().backendStateChange.disconnect(self._onBackendStateChange)
## Resets the print job that was uploaded to force a new upload, runs whenever the user re-slices.
def _onBackendStateChange(self, _: BackendState) -> None:
"""Resets the print job that was uploaded to force a new upload, runs whenever the user re-slices."""
self._tool_path = None
self._uploaded_print_job = None
## Checks whether the given network key is found in the cloud's host name
def matchesNetworkKey(self, network_key: str) -> bool:
"""Checks whether the given network key is found in the cloud's host name"""
# Typically, a network key looks like "ultimakersystem-aabbccdd0011._ultimaker._tcp.local."
# the host name should then be "ultimakersystem-aabbccdd0011"
if network_key.startswith(str(self.clusterData.host_name or "")):
@ -133,15 +141,17 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
return True
return False
## Set all the interface elements and texts for this output device.
def _setInterfaceElements(self) -> None:
"""Set all the interface elements and texts for this output device."""
self.setPriority(2) # Make sure we end up below the local networking and above 'save to file'.
self.setShortDescription(I18N_CATALOG.i18nc("@action:button", "Print via Cloud"))
self.setDescription(I18N_CATALOG.i18nc("@properties:tooltip", "Print via Cloud"))
self.setConnectionText(I18N_CATALOG.i18nc("@info:status", "Connected via Cloud"))
## Called when the network data should be updated.
def _update(self) -> None:
"""Called when the network data should be updated."""
super()._update()
if time() - self._time_of_last_request < self.CHECK_CLUSTER_INTERVAL:
return # avoid calling the cloud too often
@ -153,9 +163,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
else:
self.setAuthenticationState(AuthState.NotAuthenticated)
## Method called when HTTP request to status endpoint is finished.
# Contains both printers and print jobs statuses in a single response.
def _onStatusCallFinished(self, status: CloudClusterStatus) -> None:
"""Method called when HTTP request to status endpoint is finished.
Contains both printers and print jobs statuses in a single response.
"""
self._responseReceived()
if status.printers != self._received_printers:
self._received_printers = status.printers
@ -164,10 +176,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._received_print_jobs = status.print_jobs
self._updatePrintJobs(status.print_jobs)
## Called when Cura requests an output device to receive a (G-code) file.
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False,
file_handler: Optional[FileHandler] = None, filter_by_machine: bool = False, **kwargs) -> None:
"""Called when Cura requests an output device to receive a (G-code) file."""
# Show an error message if we're already sending a job.
if self._progress.visible:
PrintJobUploadBlockedMessage().show()
@ -187,9 +200,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
job.finished.connect(self._onPrintJobCreated)
job.start()
## Handler for when the print job was created locally.
# It can now be sent over the cloud.
def _onPrintJobCreated(self, job: ExportFileJob) -> None:
"""Handler for when the print job was created locally.
It can now be sent over the cloud.
"""
output = job.getOutput()
self._tool_path = output # store the tool path to prevent re-uploading when printing the same file again
file_name = job.getFileName()
@ -200,9 +215,11 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
)
self._api.requestUpload(request, self._uploadPrintJob)
## Uploads the mesh when the print job was registered with the cloud API.
# \param job_response: The response received from the cloud API.
def _uploadPrintJob(self, job_response: CloudPrintJobResponse) -> None:
"""Uploads the mesh when the print job was registered with the cloud API.
:param job_response: The response received from the cloud API.
"""
if not self._tool_path:
return self._onUploadError()
self._progress.show()
@ -210,38 +227,45 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
self._api.uploadToolPath(job_response, self._tool_path, self._onPrintJobUploaded, self._progress.update,
self._onUploadError)
## Requests the print to be sent to the printer when we finished uploading the mesh.
def _onPrintJobUploaded(self) -> None:
"""Requests the print to be sent to the printer when we finished uploading the mesh."""
self._progress.update(100)
print_job = cast(CloudPrintJobResponse, self._uploaded_print_job)
self._api.requestPrint(self.key, print_job.job_id, self._onPrintUploadCompleted)
## Shows a message when the upload has succeeded
# \param response: The response from the cloud API.
def _onPrintUploadCompleted(self, response: CloudPrintResponse) -> None:
"""Shows a message when the upload has succeeded
:param response: The response from the cloud API.
"""
self._progress.hide()
PrintJobUploadSuccessMessage().show()
self.writeFinished.emit()
## Displays the given message if uploading the mesh has failed
# \param message: The message to display.
def _onUploadError(self, message: str = None) -> None:
"""Displays the given message if uploading the mesh has failed
:param message: The message to display.
"""
self._progress.hide()
self._uploaded_print_job = None
PrintJobUploadErrorMessage(message).show()
self.writeError.emit()
## Whether the printer that this output device represents supports print job actions via the cloud.
@pyqtProperty(bool, notify=_cloudClusterPrintersChanged)
def supportsPrintJobActions(self) -> bool:
"""Whether the printer that this output device represents supports print job actions via the cloud."""
if not self._printers:
return False
version_number = self.printers[0].firmwareVersion.split(".")
firmware_version = Version([version_number[0], version_number[1], version_number[2]])
return firmware_version >= self.PRINT_JOB_ACTIONS_MIN_VERSION
## Set the remote print job state.
def setJobState(self, print_job_uuid: str, state: str) -> None:
"""Set the remote print job state."""
self._api.doPrintJobAction(self._cluster.cluster_id, print_job_uuid, state)
@pyqtSlot(str, name="sendJobToTop")
@ -265,18 +289,21 @@ class CloudOutputDevice(UltimakerNetworkedPrinterOutputDevice):
def openPrinterControlPanel(self) -> None:
QDesktopServices.openUrl(QUrl(self.clusterCloudUrl))
## Gets the cluster response from which this device was created.
@property
def clusterData(self) -> CloudClusterResponse:
"""Gets the cluster response from which this device was created."""
return self._cluster
## Updates the cluster data from the cloud.
@clusterData.setter
def clusterData(self, value: CloudClusterResponse) -> None:
"""Updates the cluster data from the cloud."""
self._cluster = value
## Gets the URL on which to monitor the cluster via the cloud.
@property
def clusterCloudUrl(self) -> str:
"""Gets the URL on which to monitor the cluster via the cloud."""
root_url_prefix = "-staging" if self._account.is_staging else ""
return "https://mycloud{}.ultimaker.com/app/jobs/{}".format(root_url_prefix, self.clusterData.cluster_id)