Add improved authentication flow
Some checks failed
conan-package / conan-package (push) Has been cancelled
unit-test / Run unit tests (push) Has been cancelled

- Only make auth tokens active once `request/check` endpoint returns `authorized`
- Only invalidate tokens once authentication has failed 5 times and `request/check` endpoint returns `unauthorized`
- Don't store short-lived nonce counter

CURA-12862
This commit is contained in:
c.lamboo 2025-12-10 17:31:02 +01:00
parent 3d9485d834
commit f0e899e7ce
2 changed files with 108 additions and 28 deletions

View file

@ -36,7 +36,7 @@ class Backup:
IGNORED_FOLDERS = [] # type: List[str]
"""These folders should be ignored when making a backup."""
SECRETS_SETTINGS = ["general/ultimaker_auth_data", "cluster_api/auth_ids", "cluster_api/auth_keys", "cluster_api/nonce_counts", "cluster_api/nonces"]
SECRETS_SETTINGS = ["general/ultimaker_auth_data", "cluster_api/auth_ids", "cluster_api/auth_keys"]
"""Secret preferences that need to obfuscated when making a backup of Cura"""
catalog = i18nCatalog("cura")

View file

@ -2,16 +2,16 @@
# Cura is released under the terms of the LGPLv3 or higher.
import hashlib
import json
import platform
import re
import secrets
from enum import StrEnum
from json import JSONDecodeError
from typing import Callable, List, Optional, Dict, Union, Any, Type, cast, TypeVar, Tuple
from PyQt6.QtCore import QUrl
from PyQt6.QtCore import QUrl, QTimer
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
from UM import i18nCatalog
from UM.Logger import Logger
from cura.CuraApplication import CuraApplication
@ -23,6 +23,7 @@ from ..Models.Http.ClusterPrinterStatus import ClusterPrinterStatus
from ..Models.Http.PrinterSystemStatus import PrinterSystemStatus
from ..Models.Http.ClusterMaterial import ClusterMaterial
catalog = i18nCatalog("cura")
ClusterApiClientModel = TypeVar("ClusterApiClientModel", bound=BaseModel)
"""The generic type variable used to document the methods below."""
@ -53,9 +54,10 @@ class ClusterApiClient:
AUTH_CNONCE_LEN = 8
AUTH_MAX_TRIES = 5
REQUEST_MAX_TRIES = 5
# In order to avoid garbage collection we keep the callbacks in this list.
_anti_gc_callbacks = [] # type: List[Callable[[], None]]
_anti_gc_callbacks: List[Callable[[], None]] = []
def __init__(self, address: str, on_error: Callable, on_auth_required: Callable) -> None:
"""Initializes a new cluster API client.
@ -69,24 +71,25 @@ class ClusterApiClient:
self._on_error = on_error
self._auth_tries = 0
self._request_tries = 0
self._on_auth_required = on_auth_required
self._pending_auth = False
self._preliminary_auth_id: Optional[str] = None
self._preliminary_auth_key: Optional[str] = None
prefs = CuraApplication.getInstance().getPreferences()
prefs.addPreference("cluster_api/auth_ids", "{}")
prefs.addPreference("cluster_api/auth_keys", "{}")
prefs.addPreference("cluster_api/nonce_counts", "{}")
prefs.addPreference("cluster_api/nonces", "{}")
try:
self._auth_id = json.loads(prefs.getValue("cluster_api/auth_ids")).get(self._address, None)
self._auth_key = json.loads(prefs.getValue("cluster_api/auth_keys")).get(self._address, None)
self._nonce_count = int(json.loads(prefs.getValue("cluster_api/nonce_counts")).get(self._address, 1))
self._nonce = json.loads(prefs.getValue("cluster_api/nonces")).get(self._address, None)
except (JSONDecodeError, TypeError, KeyError) as ex:
Logger.info(f"Get new cluster-API auth info ('{str(ex)}').")
self._auth_id = None
self._auth_key = None
self._nonce_count = 1
self._nonce = None
self._nonce_count = 1
self._nonce = None
def _setLocalValueToPrefDict(self, name: str, value: Any) -> None:
prefs = CuraApplication.getInstance().getPreferences()
@ -179,8 +182,6 @@ class ClusterApiClient:
digest_str = self._makeAuthDigestHeaderPart(path, method=method)
request.setRawHeader(b"Authorization", f"Digest {digest_str}".encode("utf-8"))
self._nonce_count += 1
self._setLocalValueToPrefDict("cluster_api/nonce_counts", self._nonce_count)
CuraApplication.getInstance().savePreferences()
elif not skip_auth:
self._setupAuth()
return request
@ -251,9 +252,90 @@ class ClusterApiClient:
f'algorithm="SHA-256"'
])
def _checkAuth(self) -> None:
"""
Polling function to check whether the user has authorized the application yet.
"""
Logger.info("Checking Cluster API authorization status...")
def on_finished(resp) -> None:
try:
auth_info = json.loads(resp.data().decode())
match (auth_info["message"]):
case "authorized":
Logger.info("Cluster API authorization successful.")
AuthorizationRequiredMessage.hide()
self._auth_id = self._preliminary_auth_id
self._auth_key = self._preliminary_auth_key
self._setLocalValueToPrefDict("cluster_api/auth_ids", self._auth_id)
self._setLocalValueToPrefDict("cluster_api/auth_keys", self._auth_key)
CuraApplication.getInstance().savePreferences()
self._preliminary_auth_id = None
self._preliminary_auth_key = None
self._pending_auth = False
return
case "unauthorized":
Logger.warning("Cluster API was denied.")
self._pending_auth = False
return
except Exception as ex:
Logger.warning(f"Couldn't check authorization status: {str(ex)}")
return
self._auth_polling_timer = QTimer()
self._auth_polling_timer.setInterval(1000)
self._auth_polling_timer.setSingleShot(True)
self._auth_polling_timer.timeout.connect(self._checkAuth)
self._auth_polling_timer.start()
url = f"{self.PRINTER_API_PREFIX}/auth/check/{self._preliminary_auth_id}"
reply = self._manager.get(self.createEmptyRequest(url, method=HttpRequestMethod.GET, skip_auth=True))
self._addCallback(reply, on_finished)
def _validateAuth(self) -> None:
"""
Validates whether the current authentication credentials are still valid.
"""
Logger.info("Validating Cluster API authorization status...")
def on_finished(resp) -> None:
try:
auth_info = json.loads(resp.data().decode())
match (auth_info["message"]):
case "authorized":
pass
case _:
# Invalid credentials, clear them
self._auth_id = None
self._auth_key = None
self._setLocalValueToPrefDict("cluster_api/auth_ids", None)
self._setLocalValueToPrefDict("cluster_api/auth_keys", None)
except Exception as ex:
Logger.warning(f"Couldn't check authorization status: {str(ex)}")
return
url = f"{self.PRINTER_API_PREFIX}/auth/check/{self._auth_id}"
reply = self._manager.get(self.createEmptyRequest(url, method=HttpRequestMethod.GET, skip_auth=True))
self._addCallback(reply, on_finished)
def _setupAuth(self) -> None:
""" Handles the setup process for authentication by making a temporary digest-token request to the printer API.
"""
if self._pending_auth:
return
Logger.info("Requesting Cluster API authorization...")
self._pending_auth = True
self._on_auth_required(
catalog.i18nc("@info:status", "Please authorize Cura by approving the request on the printer's display."))
if self._auth_tries >= ClusterApiClient.AUTH_MAX_TRIES:
Logger.warning("Maximum authorization temporary digest-token request tries exceeded. Is printer-firmware up to date?")
@ -263,20 +345,19 @@ class ClusterApiClient:
self._auth_tries += 1
try:
auth_info = json.loads(resp.data().decode())
self._auth_id = auth_info["id"]
self._auth_key = auth_info["key"]
self._setLocalValueToPrefDict("cluster_api/auth_ids", self._auth_id)
self._setLocalValueToPrefDict("cluster_api/auth_keys", self._auth_key)
CuraApplication.getInstance().savePreferences()
self._preliminary_auth_id = auth_info["id"]
self._preliminary_auth_key = auth_info["key"]
self._checkAuth()
except Exception as ex:
Logger.warning(f"Couldn't get temporary digest token: {str(ex)}")
return
self._auth_tries = 0
url = "{}/auth/request".format(self.PRINTER_API_PREFIX)
application_name = CuraApplication.getInstance().getApplicationDisplayName()
request_body = json.dumps({
"application": CuraApplication.getInstance().getApplicationDisplayName(),
"user": f"user@{platform.node()}",
"application": application_name,
"user": f"{application_name} user",
}).encode("utf-8")
reply = self._manager.post(self.createEmptyRequest(url, method=HttpRequestMethod.POST, skip_auth=True), request_body)
@ -303,24 +384,23 @@ class ClusterApiClient:
return
if reply.error() != QNetworkReply.NetworkError.NoError:
Logger.warning("Cluster API request error: %s", reply.errorString())
if reply.error() == QNetworkReply.NetworkError.AuthenticationRequiredError:
self._auth_id = None
self._auth_key = None
self._on_auth_required(reply.errorString())
nonce_match = re.search(r'nonce="([^"]+)', str(reply.rawHeader(b"WWW-Authenticate")))
if nonce_match:
self._nonce = nonce_match.group(1)
self._nonce_count = 1
self._setLocalValueToPrefDict("cluster_api/nonce_counts", self._nonce_count)
self._setLocalValueToPrefDict("cluster_api/nonces", self._nonce)
CuraApplication.getInstance().savePreferences()
self._request_tries += 1
if self._request_tries >= ClusterApiClient.REQUEST_MAX_TRIES:
self._validateAuth()
return
else:
self._on_error(reply.errorString())
return
if self._auth_id and self._auth_key and self._nonce_count > 1:
AuthorizationRequiredMessage.hide()
else:
self._request_tries = 0
# If no parse model is given, simply return the raw data in the callback.
if not model: