mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-08 15:37:27 -06:00
Use a descriptor to optionally store to Keyring
CURA-7180 keyring storage
This commit is contained in:
parent
6372fbed54
commit
d06a25595a
3 changed files with 23 additions and 86 deletions
|
@ -17,7 +17,6 @@ from UM.i18n import i18nCatalog
|
||||||
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
|
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
|
||||||
from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
|
from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
|
||||||
from cura.OAuth2.Models import AuthenticationResponse
|
from cura.OAuth2.Models import AuthenticationResponse
|
||||||
from cura.OAuth2.SecretStorage import SecretStorage
|
|
||||||
|
|
||||||
i18n_catalog = i18nCatalog("cura")
|
i18n_catalog = i18nCatalog("cura")
|
||||||
|
|
||||||
|
@ -53,7 +52,6 @@ class AuthorizationService:
|
||||||
|
|
||||||
self.onAuthStateChanged.connect(self._authChanged)
|
self.onAuthStateChanged.connect(self._authChanged)
|
||||||
|
|
||||||
self._secret_storage = None # type: Optional[SecretStorage]
|
|
||||||
|
|
||||||
def _authChanged(self, logged_in):
|
def _authChanged(self, logged_in):
|
||||||
if logged_in and self._unable_to_get_data_message is not None:
|
if logged_in and self._unable_to_get_data_message is not None:
|
||||||
|
@ -62,7 +60,6 @@ class AuthorizationService:
|
||||||
def initialize(self, preferences: Optional["Preferences"] = None) -> None:
|
def initialize(self, preferences: Optional["Preferences"] = None) -> None:
|
||||||
if preferences is not None:
|
if preferences is not None:
|
||||||
self._preferences = preferences
|
self._preferences = preferences
|
||||||
self._secret_storage = SecretStorage(preferences)
|
|
||||||
if self._preferences:
|
if self._preferences:
|
||||||
self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
|
self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
|
||||||
|
|
||||||
|
@ -234,11 +231,6 @@ class AuthorizationService:
|
||||||
try:
|
try:
|
||||||
preferences_data = json.loads(self._preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
|
preferences_data = json.loads(self._preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
|
||||||
|
|
||||||
# Since we stored all the sensitive stuff in the keyring, restore that now.
|
|
||||||
# Don't store the access_token, as it's very long and that (or tried workarounds) causes issues on Windows.
|
|
||||||
preferences_data["refresh_token"] = self._secret_storage["refresh_token"]
|
|
||||||
preferences_data["access_token"] = self._secret_storage["access_token"]
|
|
||||||
|
|
||||||
if preferences_data:
|
if preferences_data:
|
||||||
self._auth_data = AuthenticationResponse(**preferences_data)
|
self._auth_data = AuthenticationResponse(**preferences_data)
|
||||||
# Also check if we can actually get the user profile information.
|
# Also check if we can actually get the user profile information.
|
||||||
|
@ -265,20 +257,7 @@ class AuthorizationService:
|
||||||
self._auth_data = auth_data
|
self._auth_data = auth_data
|
||||||
if auth_data:
|
if auth_data:
|
||||||
self._user_profile = self.getUserProfile()
|
self._user_profile = self.getUserProfile()
|
||||||
|
self._preferences.setValue(self._settings.AUTH_DATA_PREFERENCE_KEY, json.dumps(auth_data.dump()))
|
||||||
# Store all the sensitive stuff in the keyring
|
|
||||||
self._secret_storage["refresh_token"] = auth_data.refresh_token
|
|
||||||
|
|
||||||
# The access_token will still be stored in the preference file on windows, due to a 255 length limitation
|
|
||||||
self._secret_storage["access_token"] = auth_data.access_token
|
|
||||||
|
|
||||||
# Store the data in the preference, setting both tokens to None so they won't be written
|
|
||||||
auth_data.refresh_token = None
|
|
||||||
auth_data.access_token = None
|
|
||||||
self._preferences.setValue(self._settings.AUTH_DATA_PREFERENCE_KEY, json.dumps(vars(auth_data)))
|
|
||||||
|
|
||||||
# restore access token so that syncing for the current session doesn't fail
|
|
||||||
auth_data.access_token = self._secret_storage["access_token"]
|
|
||||||
else:
|
else:
|
||||||
self._user_profile = None
|
self._user_profile = None
|
||||||
self._preferences.resetPreference(self._settings.AUTH_DATA_PREFERENCE_KEY)
|
self._preferences.resetPreference(self._settings.AUTH_DATA_PREFERENCE_KEY)
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
# Copyright (c) 2020 Ultimaker B.V.
|
# Copyright (c) 2020 Ultimaker B.V.
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
# Cura is released under the terms of the LGPLv3 or higher.
|
||||||
from typing import Optional, Dict, Any, List
|
from typing import Optional, Dict, Any, List, Union
|
||||||
|
from copy import deepcopy
|
||||||
|
from cura.OAuth2.KeyringAttribute import KeyringAttribute
|
||||||
|
|
||||||
|
|
||||||
class BaseModel:
|
class BaseModel:
|
||||||
|
@ -37,12 +39,29 @@ class AuthenticationResponse(BaseModel):
|
||||||
# Data comes from the token response with success flag and error message added.
|
# Data comes from the token response with success flag and error message added.
|
||||||
success = True # type: bool
|
success = True # type: bool
|
||||||
token_type = None # type: Optional[str]
|
token_type = None # type: Optional[str]
|
||||||
access_token = None # type: Optional[str]
|
|
||||||
refresh_token = None # type: Optional[str]
|
|
||||||
expires_in = None # type: Optional[str]
|
expires_in = None # type: Optional[str]
|
||||||
scope = None # type: Optional[str]
|
scope = None # type: Optional[str]
|
||||||
err_message = None # type: Optional[str]
|
err_message = None # type: Optional[str]
|
||||||
received_at = None # type: Optional[str]
|
received_at = None # type: Optional[str]
|
||||||
|
access_token = KeyringAttribute()
|
||||||
|
refresh_token = KeyringAttribute()
|
||||||
|
|
||||||
|
def __init__(self, **kwargs: Any) -> None:
|
||||||
|
self.access_token = kwargs.pop("access_token", None)
|
||||||
|
self.refresh_token = kwargs.pop("refresh_token", None)
|
||||||
|
super(AuthenticationResponse, self).__init__(**kwargs)
|
||||||
|
|
||||||
|
def dump(self) -> dict[Union[bool, Optional[str]]]:
|
||||||
|
"""
|
||||||
|
Dumps the dictionary of Authentication attributes. KeyringAttributes are transformed to public attributes
|
||||||
|
If the keyring was used, these will have a None value, otherwise they will have the secret value
|
||||||
|
|
||||||
|
:return: Dictionary of Authentication attributes
|
||||||
|
"""
|
||||||
|
dumped = deepcopy(vars(self))
|
||||||
|
dumped["access_token"] = dumped.pop("_access_token")
|
||||||
|
dumped["refresh_token"] = dumped.pop("_refresh_token")
|
||||||
|
return dumped
|
||||||
|
|
||||||
|
|
||||||
class ResponseStatus(BaseModel):
|
class ResponseStatus(BaseModel):
|
||||||
|
|
|
@ -1,61 +0,0 @@
|
||||||
# Copyright (c) 2021 Ultimaker B.V.
|
|
||||||
# Cura is released under the terms of the LGPLv3 or higher.
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import keyring # TODO: Add to about as dependency
|
|
||||||
|
|
||||||
from UM.Logger import Logger
|
|
||||||
|
|
||||||
|
|
||||||
class SecretStorage:
|
|
||||||
"""
|
|
||||||
Secret storage vault. It will by default store a secret in the system keyring. If that fails, is not available or
|
|
||||||
not allowed it will store in the Cura preferences. This is the unsafe "old" behaviour
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, preferences: Optional["Preferences"] = None):
|
|
||||||
self._stored_secrets = set()
|
|
||||||
if preferences:
|
|
||||||
self._preferences = preferences
|
|
||||||
keys = self._preferences.getValue("general/keyring")
|
|
||||||
if keys is not None and keys != '':
|
|
||||||
self._stored_secrets = set(keys.split(";"))
|
|
||||||
else:
|
|
||||||
self._preferences.addPreference("general/keyring", "{}")
|
|
||||||
|
|
||||||
def __delitem__(self, key: str):
|
|
||||||
if key in self._stored_secrets:
|
|
||||||
self._stored_secrets.remove(key)
|
|
||||||
self._preferences.setValue("general/keyring", ";".join(self._stored_secrets))
|
|
||||||
keyring.delete_password("cura", key)
|
|
||||||
else:
|
|
||||||
# TODO: handle removal of secret from preferences
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __setitem__(self, key: str, value: str):
|
|
||||||
try:
|
|
||||||
keyring.set_password("cura", key, value)
|
|
||||||
self._stored_secrets.add(key)
|
|
||||||
self._preferences.setValue("general/{key}".format(key = key), None)
|
|
||||||
except:
|
|
||||||
Logger.logException("w", "Could not store {key} in keyring.".format(key = key))
|
|
||||||
if key in self._stored_secrets:
|
|
||||||
self._stored_secrets.remove(key)
|
|
||||||
self._preferences.addPreference("general/{key}".format(key = key), "{}")
|
|
||||||
self._preferences.setValue("general/{key}".format(key = key), value)
|
|
||||||
self._preferences.setValue("general/keyring", ";".join(self._stored_secrets))
|
|
||||||
|
|
||||||
def __getitem__(self, key: str) -> Optional[str]:
|
|
||||||
secret = None
|
|
||||||
if key in self._stored_secrets:
|
|
||||||
try:
|
|
||||||
secret = keyring.get_password("cura", key)
|
|
||||||
except:
|
|
||||||
secret = self._preferences.getValue("general/{key}".format(key = key))
|
|
||||||
Logger.logException("w", "{key} obtained from preferences, consider giving Cura access to the keyring".format(key = key))
|
|
||||||
else:
|
|
||||||
secret = self._preferences.getValue(f"general/{key}")
|
|
||||||
Logger.logException("w", "{key} obtained from preferences, consider giving Cura access to the keyring".format(key = key))
|
|
||||||
if secret is None or secret == '':
|
|
||||||
Logger.logException("w", "Could not load {key}".format(key = key))
|
|
||||||
return secret
|
|
Loading…
Add table
Add a link
Reference in a new issue