Merge pull request #20940 from Ultimaker/CURA-12624_clean_auth_for_compliance_RED3_3

[CURA-12624] Authentication for compliance with RED3.3
This commit is contained in:
HellAholic 2025-09-12 11:33:19 +02:00 committed by GitHub
commit 85c759fbb2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 132 additions and 25 deletions

View file

@ -288,9 +288,11 @@ class NetworkedPrinterOutputDevice(PrinterOutputDevice):
def postFormWithParts(self, target: str, parts: List[QHttpPart],
on_finished: Optional[Callable[[QNetworkReply], None]],
on_progress: Optional[Callable[[int, int], None]] = None) -> QNetworkReply:
on_progress: Optional[Callable[[int, int], None]] = None,
request: Optional[QNetworkRequest] = None) -> QNetworkReply:
self._validateManager()
request = self._createEmptyRequest(target, content_type=None)
if request is None:
request = self._createEmptyRequest(target, content_type=None)
multi_post_part = QHttpMultiPart(QHttpMultiPart.ContentType.FormDataType)
for part in parts:
multi_post_part.append(part)

View file

@ -1,6 +1,11 @@
# Copyright (c) 2019 Ultimaker B.V.
# Copyright (c) 2025 UltiMaker
# Cura is released under the terms of the LGPLv3 or higher.
import hashlib
import json
import platform
import re
import secrets
from enum import StrEnum
from json import JSONDecodeError
from typing import Callable, List, Optional, Dict, Union, Any, Type, cast, TypeVar, Tuple
@ -9,6 +14,8 @@ from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkRepl
from UM.Logger import Logger
from cura.CuraApplication import CuraApplication
from ..Models.BaseModel import BaseModel
from ..Models.Http.ClusterPrintJobStatus import ClusterPrintJobStatus
from ..Models.Http.ClusterPrinterStatus import ClusterPrinterStatus
@ -20,6 +27,18 @@ ClusterApiClientModel = TypeVar("ClusterApiClientModel", bound=BaseModel)
"""The generic type variable used to document the methods below."""
class HttpRequestMethod(StrEnum):
GET = "GET",
HEAD = "HEAD",
POST = "POST",
PUT = "PUT",
DELETE = "DELETE",
CONNECT = "CONNECT",
OPTIONS = "OPTIONS",
TRACE = "TRACE",
PATCH = "PATCH",
class ClusterApiClient:
"""The ClusterApiClient is responsible for all network calls to local network clusters."""
@ -27,6 +46,13 @@ class ClusterApiClient:
PRINTER_API_PREFIX = "/api/v1"
CLUSTER_API_PREFIX = "/cluster-api/v1"
AUTH_REALM = "Jedi-API"
AUTH_QOP = "auth"
AUTH_NONCE_LEN = 16
AUTH_CNONCE_LEN = 8
AUTH_MAX_TRIES = 5
# In order to avoid garbage collection we keep the callbacks in this list.
_anti_gc_callbacks = [] # type: List[Callable[[], None]]
@ -40,6 +66,12 @@ class ClusterApiClient:
self._manager = QNetworkAccessManager()
self._address = address
self._on_error = on_error
self._auth_id = None
self._auth_key = None
self._auth_tries = 0
self._nonce_count = 1
self._nonce = None
def getSystem(self, on_finished: Callable) -> None:
"""Get printer system information.
@ -47,7 +79,7 @@ class ClusterApiClient:
:param on_finished: The callback in case the response is successful.
"""
url = "{}/system".format(self.PRINTER_API_PREFIX)
reply = self._manager.get(self._createEmptyRequest(url))
reply = self._manager.get(self.createEmptyRequest(url))
self._addCallback(reply, on_finished, PrinterSystemStatus)
def getMaterials(self, on_finished: Callable[[List[ClusterMaterial]], Any]) -> None:
@ -56,7 +88,7 @@ class ClusterApiClient:
:param on_finished: The callback in case the response is successful.
"""
url = "{}/materials".format(self.CLUSTER_API_PREFIX)
reply = self._manager.get(self._createEmptyRequest(url))
reply = self._manager.get(self.createEmptyRequest(url))
self._addCallback(reply, on_finished, ClusterMaterial)
def getPrinters(self, on_finished: Callable[[List[ClusterPrinterStatus]], Any]) -> None:
@ -65,7 +97,7 @@ class ClusterApiClient:
:param on_finished: The callback in case the response is successful.
"""
url = "{}/printers".format(self.CLUSTER_API_PREFIX)
reply = self._manager.get(self._createEmptyRequest(url))
reply = self._manager.get(self.createEmptyRequest(url))
self._addCallback(reply, on_finished, ClusterPrinterStatus)
def getPrintJobs(self, on_finished: Callable[[List[ClusterPrintJobStatus]], Any]) -> None:
@ -74,26 +106,26 @@ class ClusterApiClient:
:param on_finished: The callback in case the response is successful.
"""
url = "{}/print_jobs".format(self.CLUSTER_API_PREFIX)
reply = self._manager.get(self._createEmptyRequest(url))
reply = self._manager.get(self.createEmptyRequest(url))
self._addCallback(reply, on_finished, ClusterPrintJobStatus)
def movePrintJobToTop(self, print_job_uuid: str) -> None:
"""Move a print job to the top of the queue."""
url = "{}/print_jobs/{}/action/move".format(self.CLUSTER_API_PREFIX, print_job_uuid)
self._manager.post(self._createEmptyRequest(url), json.dumps({"to_position": 0, "list": "queued"}).encode())
self._manager.post(self.createEmptyRequest(url, method=HttpRequestMethod.POST), json.dumps({"to_position": 0, "list": "queued"}).encode())
def forcePrintJob(self, print_job_uuid: str) -> None:
"""Override print job configuration and force it to be printed."""
url = "{}/print_jobs/{}".format(self.CLUSTER_API_PREFIX, print_job_uuid)
self._manager.put(self._createEmptyRequest(url), json.dumps({"force": True}).encode())
self._manager.put(self.createEmptyRequest(url, method=HttpRequestMethod.PUT), json.dumps({"force": True}).encode())
def deletePrintJob(self, print_job_uuid: str) -> None:
"""Delete a print job from the queue."""
url = "{}/print_jobs/{}".format(self.CLUSTER_API_PREFIX, print_job_uuid)
self._manager.deleteResource(self._createEmptyRequest(url))
self._manager.deleteResource(self.createEmptyRequest(url, method=HttpRequestMethod.DELETE))
def setPrintJobState(self, print_job_uuid: str, state: str) -> None:
"""Set the state of a print job."""
@ -101,25 +133,33 @@ class ClusterApiClient:
url = "{}/print_jobs/{}/action".format(self.CLUSTER_API_PREFIX, print_job_uuid)
# We rewrite 'resume' to 'print' here because we are using the old print job action endpoints.
action = "print" if state == "resume" else state
self._manager.put(self._createEmptyRequest(url), json.dumps({"action": action}).encode())
self._manager.put(self.createEmptyRequest(url, method=HttpRequestMethod.PUT), json.dumps({"action": action}).encode())
def getPrintJobPreviewImage(self, print_job_uuid: str, on_finished: Callable) -> None:
"""Get the preview image data of a print job."""
url = "{}/print_jobs/{}/preview_image".format(self.CLUSTER_API_PREFIX, print_job_uuid)
reply = self._manager.get(self._createEmptyRequest(url))
reply = self._manager.get(self.createEmptyRequest(url))
self._addCallback(reply, on_finished)
def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
def createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json", method: HttpRequestMethod = HttpRequestMethod.GET, skip_auth: bool = False) -> QNetworkRequest:
"""We override _createEmptyRequest in order to add the user credentials.
:param url: The URL to request
:param path: Part added to the base-endpoint forming the total request URL (the path from the endpoint to the requested resource).
:param content_type: The type of the body contents.
:param method: The HTTP method to use, such as GET, POST, PUT, etc.
:param skip_auth: Skips the authentication step if set; prevents a loop on request of authentication token.
"""
url = QUrl("http://" + self._address + path)
request = QNetworkRequest(url)
if content_type:
request.setHeader(QNetworkRequest.KnownHeaders.ContentTypeHeader, content_type)
if self._auth_id and self._auth_key:
digest_str = self._makeAuthDigestHeaderPart(path, method=method)
request.setRawHeader(b"Authorization", f"Digest {digest_str}".encode("utf-8"))
self._nonce_count += 1
elif not skip_auth:
self._setupAuth()
return request
@staticmethod
@ -158,6 +198,64 @@ class ClusterApiClient:
except (JSONDecodeError, TypeError, ValueError):
Logger.log("e", "Could not parse response from network: %s", str(response))
def _makeAuthDigestHeaderPart(self, url_part: str, method: HttpRequestMethod = HttpRequestMethod.GET) -> str:
""" Make the data-part for a Digest Authentication HTTP-header.
:param url_part: The part of the URL beyond the host name.
:param method: The HTTP method to use, such as GET, POST, PUT, etc.
:return: A string with the data, can be used as in `f"Digest {return_value}".encode()`.
"""
def sha256_utf8(x: str) -> str:
return hashlib.sha256(x.encode("utf-8")).hexdigest()
nonce = secrets.token_hex(ClusterApiClient.AUTH_NONCE_LEN) if self._nonce is None else self._nonce
cnonce = secrets.token_hex(ClusterApiClient.AUTH_CNONCE_LEN)
auth_nc = f"{self._nonce_count:08x}"
ha1 = sha256_utf8(f"{self._auth_id}:{ClusterApiClient.AUTH_REALM}:{self._auth_key}")
ha2 = sha256_utf8(f"{method}:{url_part}")
resp_digest = sha256_utf8(f"{ha1}:{nonce}:{auth_nc}:{cnonce}:{ClusterApiClient.AUTH_QOP}:{ha2}")
return ", ".join([
f'username="{self._auth_id}"',
f'realm="{ClusterApiClient.AUTH_REALM}"',
f'nonce="{nonce}"',
f'uri="{url_part}"',
f'nc={auth_nc}',
f'cnonce="{cnonce}"',
f'qop={ClusterApiClient.AUTH_QOP}',
f'response="{resp_digest}"',
f'algorithm="SHA-256"'
])
def _setupAuth(self) -> None:
""" Handles the setup process for authentication by making a temporary digest-token request to the printer API.
"""
if self._auth_tries >= ClusterApiClient.AUTH_MAX_TRIES:
Logger.warning("Maximum authorization temporary digest-token request tries exceeded. Is printer-firmware up to date?")
return
def on_finished(resp) -> None:
self._auth_tries += 1
try:
auth_info = json.loads(resp.data().decode())
self._auth_id = auth_info["id"]
self._auth_key = auth_info["key"]
except Exception as ex:
Logger.warning(f"Couldn't get temporary digest token: {str(ex)}")
return
self._auth_tries = 0
url = "{}/auth/request".format(self.PRINTER_API_PREFIX)
request_body = json.dumps({
"application": CuraApplication.getInstance().getApplicationDisplayName(),
"user": f"user@{platform.node()}",
}).encode("utf-8")
reply = self._manager.post(self.createEmptyRequest(url, method=HttpRequestMethod.POST, skip_auth=True), request_body)
self._addCallback(reply, on_finished)
def _addCallback(self, reply: QNetworkReply, on_finished: Union[Callable[[ClusterApiClientModel], Any],
Callable[[List[ClusterApiClientModel]], Any]], model: Type[ClusterApiClientModel] = None,
) -> None:
@ -179,6 +277,11 @@ class ClusterApiClient:
return
if reply.error() != QNetworkReply.NetworkError.NoError:
if reply.error() == QNetworkReply.NetworkError.AuthenticationRequiredError:
nonce_match = re.search(r'nonce="([^"]+)', str(reply.rawHeader(b"WWW-Authenticate")))
if nonce_match:
self._nonce = nonce_match.group(1)
self._nonce_count = 1
self._on_error(reply.errorString())
return

View file

@ -94,15 +94,15 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice):
@pyqtSlot(str, name="sendJobToTop")
def sendJobToTop(self, print_job_uuid: str) -> None:
self._getApiClient().movePrintJobToTop(print_job_uuid)
self.getApiClient().movePrintJobToTop(print_job_uuid)
@pyqtSlot(str, name="deleteJobFromQueue")
def deleteJobFromQueue(self, print_job_uuid: str) -> None:
self._getApiClient().deletePrintJob(print_job_uuid)
self.getApiClient().deletePrintJob(print_job_uuid)
@pyqtSlot(str, name="forceSendJob")
def forceSendJob(self, print_job_uuid: str) -> None:
self._getApiClient().forcePrintJob(print_job_uuid)
self.getApiClient().forcePrintJob(print_job_uuid)
def setJobState(self, print_job_uuid: str, action: str) -> None:
"""Set the remote print job state.
@ -111,20 +111,20 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice):
:param action: The action to undertake ('pause', 'resume', 'abort').
"""
self._getApiClient().setPrintJobState(print_job_uuid, action)
self.getApiClient().setPrintJobState(print_job_uuid, action)
def _update(self) -> None:
super()._update()
if time() - self._time_of_last_request < self.CHECK_CLUSTER_INTERVAL:
return # avoid calling the cluster too often
self._getApiClient().getPrinters(self._updatePrinters)
self._getApiClient().getPrintJobs(self._updatePrintJobs)
self.getApiClient().getPrinters(self._updatePrinters)
self.getApiClient().getPrintJobs(self._updatePrintJobs)
self._updatePrintJobPreviewImages()
def getMaterials(self, on_finished: Callable[[List[ClusterMaterial]], Any]) -> None:
"""Get a list of materials that are installed on the cluster host."""
self._getApiClient().getMaterials(on_finished = on_finished)
self.getApiClient().getMaterials(on_finished = on_finished)
def sendMaterialProfiles(self) -> None:
"""Sync the material profiles in Cura with the printer.
@ -204,7 +204,8 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice):
parts.append(self._createFormPart("name=require_printer_name", bytes(unique_name, "utf-8"), "text/plain"))
# FIXME: move form posting to API client
self.postFormWithParts("/cluster-api/v1/print_jobs/", parts, on_finished=self._onPrintUploadCompleted,
on_progress=self._onPrintJobUploadProgress)
on_progress=self._onPrintJobUploadProgress,
request=self.getApiClient().createEmptyRequest("/cluster-api/v1/print_jobs/", content_type=None, method="POST"))
self._active_exported_job = None
def _onPrintJobUploadProgress(self, bytes_sent: int, bytes_total: int) -> None:
@ -236,9 +237,9 @@ class LocalClusterOutputDevice(UltimakerNetworkedPrinterOutputDevice):
for print_job in self._print_jobs:
if print_job.getPreviewImage() is None:
self._getApiClient().getPrintJobPreviewImage(print_job.key, print_job.updatePreviewImageData)
self.getApiClient().getPrintJobPreviewImage(print_job.key, print_job.updatePreviewImageData)
def _getApiClient(self) -> ClusterApiClient:
def getApiClient(self) -> ClusterApiClient:
"""Get the API client instance."""
if not self._cluster_api:

View file

@ -147,7 +147,8 @@ class SendMaterialJob(Job):
# FIXME: move form posting to API client
self.device.postFormWithParts(target = "/cluster-api/v1/materials/", parts = parts,
on_finished = self._sendingFinished)
on_finished = self._sendingFinished,
request=self.device.getApiClient().createEmptyRequest("/cluster-api/v1/materials/", content_type=None, method="POST"))
def _sendingFinished(self, reply: QNetworkReply) -> None:
"""Check a reply from an upload to the printer and log an error when the call failed"""