mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-19 12:47:49 -06:00
Added unit tests for authorization service
CURA-5744
This commit is contained in:
parent
081b2a28fe
commit
1e5177a44f
2 changed files with 93 additions and 3 deletions
|
@ -93,7 +93,7 @@ class AuthorizationService:
|
|||
Refresh the access token when it expired.
|
||||
"""
|
||||
if self._auth_data is None or self._auth_data.refresh_token is None:
|
||||
Logger.log("w", "Unable to refresh acces token, since there is no refresh token.")
|
||||
Logger.log("w", "Unable to refresh access token, since there is no refresh token.")
|
||||
return
|
||||
self._storeAuthData(self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token))
|
||||
self.onAuthStateChanged.emit(logged_in=True)
|
||||
|
@ -148,8 +148,8 @@ class AuthorizationService:
|
|||
if preferences_data:
|
||||
self._auth_data = AuthenticationResponse(**preferences_data)
|
||||
self.onAuthStateChanged.emit(logged_in=True)
|
||||
except ValueError as err:
|
||||
Logger.log("w", "Could not load auth data from preferences: %s", err)
|
||||
except ValueError:
|
||||
Logger.logException("w", "Could not load auth data from preferences")
|
||||
|
||||
def _storeAuthData(self, auth_data: Optional[AuthenticationResponse] = None) -> None:
|
||||
"""Store authentication data in preferences and locally."""
|
||||
|
|
90
tests/TestOAuth2.py
Normal file
90
tests/TestOAuth2.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from UM.Preferences import Preferences
|
||||
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
|
||||
from cura.OAuth2.AuthorizationService import AuthorizationService
|
||||
from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
|
||||
|
||||
CALLBACK_PORT = 32118
|
||||
OAUTH_ROOT = "https://account.ultimaker.com"
|
||||
CLOUD_API_ROOT = "https://api.ultimaker.com"
|
||||
|
||||
OAUTH_SETTINGS = OAuth2Settings(
|
||||
OAUTH_SERVER_URL= OAUTH_ROOT,
|
||||
CALLBACK_PORT=CALLBACK_PORT,
|
||||
CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
|
||||
CLIENT_ID="",
|
||||
CLIENT_SCOPES="",
|
||||
AUTH_DATA_PREFERENCE_KEY="test/auth_data",
|
||||
AUTH_SUCCESS_REDIRECT="{}/auth-success".format(CLOUD_API_ROOT),
|
||||
AUTH_FAILED_REDIRECT="{}/auth-error".format(CLOUD_API_ROOT)
|
||||
)
|
||||
|
||||
FAILED_AUTH_RESPONSE = AuthenticationResponse(success = False, err_message = "FAILURE!")
|
||||
|
||||
SUCCESFULL_AUTH_RESPONSE = AuthenticationResponse(access_token = "beep", refresh_token = "beep?")
|
||||
|
||||
MALFORMED_AUTH_RESPONSE = AuthenticationResponse()
|
||||
|
||||
|
||||
def test_cleanAuthService():
|
||||
# Ensure that when setting up an AuthorizationService, no data is set.
|
||||
authorization_service = AuthorizationService(Preferences(), OAUTH_SETTINGS)
|
||||
assert authorization_service.getUserProfile() is None
|
||||
assert authorization_service.getAccessToken() is None
|
||||
|
||||
|
||||
def test_failedLogin():
|
||||
authorization_service = AuthorizationService(Preferences(), OAUTH_SETTINGS)
|
||||
authorization_service.onAuthenticationError.emit = MagicMock()
|
||||
authorization_service.onAuthStateChanged.emit = MagicMock()
|
||||
|
||||
# Let the service think there was a failed response
|
||||
authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
|
||||
|
||||
# Check that the error signal was triggered
|
||||
assert authorization_service.onAuthenticationError.emit.call_count == 1
|
||||
|
||||
# Since nothing changed, this should still be 0.
|
||||
assert authorization_service.onAuthStateChanged.emit.call_count == 0
|
||||
|
||||
# Validate that there is no user profile or token
|
||||
assert authorization_service.getUserProfile() is None
|
||||
assert authorization_service.getAccessToken() is None
|
||||
|
||||
|
||||
def test_loginAndLogout():
|
||||
preferences = Preferences()
|
||||
authorization_service = AuthorizationService(preferences, OAUTH_SETTINGS)
|
||||
authorization_service.onAuthenticationError.emit = MagicMock()
|
||||
authorization_service.onAuthStateChanged.emit = MagicMock()
|
||||
|
||||
# Let the service think there was a succesfull response
|
||||
with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
|
||||
authorization_service._onAuthStateChanged(SUCCESFULL_AUTH_RESPONSE)
|
||||
|
||||
# Ensure that the error signal was not triggered
|
||||
assert authorization_service.onAuthenticationError.emit.call_count == 0
|
||||
|
||||
# Since we said that it went right this time, validate that we got a signal.
|
||||
assert authorization_service.onAuthStateChanged.emit.call_count == 1
|
||||
assert authorization_service.getUserProfile() is not None
|
||||
assert authorization_service.getAccessToken() == "beep"
|
||||
|
||||
# Check that we stored the authentication data, so next time the user won't have to log in again.
|
||||
assert preferences.getValue("test/auth_data") is not None
|
||||
|
||||
# We're logged in now, also check if logging out works
|
||||
authorization_service.deleteAuthData()
|
||||
assert authorization_service.onAuthStateChanged.emit.call_count == 2
|
||||
assert authorization_service.getUserProfile() is None
|
||||
|
||||
# Ensure the data is gone after we logged out.
|
||||
assert preferences.getValue("test/auth_data") == "{}"
|
||||
|
||||
|
||||
def test_wrongServerResponses():
|
||||
authorization_service = AuthorizationService(Preferences(), OAUTH_SETTINGS)
|
||||
with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
|
||||
authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
|
||||
assert authorization_service.getUserProfile() is None
|
Loading…
Add table
Add a link
Reference in a new issue