diff --git a/cura/OAuth2/AuthorizationService.py b/cura/OAuth2/AuthorizationService.py index 868dbe8034..0f57621a47 100644 --- a/cura/OAuth2/AuthorizationService.py +++ b/cura/OAuth2/AuthorizationService.py @@ -93,7 +93,7 @@ class AuthorizationService: Refresh the access token when it expired. """ if self._auth_data is None or self._auth_data.refresh_token is None: - Logger.log("w", "Unable to refresh acces token, since there is no refresh token.") + Logger.log("w", "Unable to refresh access token, since there is no refresh token.") return self._storeAuthData(self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)) self.onAuthStateChanged.emit(logged_in=True) @@ -148,8 +148,8 @@ class AuthorizationService: if preferences_data: self._auth_data = AuthenticationResponse(**preferences_data) self.onAuthStateChanged.emit(logged_in=True) - except ValueError as err: - Logger.log("w", "Could not load auth data from preferences: %s", err) + except ValueError: + Logger.logException("w", "Could not load auth data from preferences") def _storeAuthData(self, auth_data: Optional[AuthenticationResponse] = None) -> None: """Store authentication data in preferences and locally.""" diff --git a/tests/TestOAuth2.py b/tests/TestOAuth2.py new file mode 100644 index 0000000000..10578eaeb0 --- /dev/null +++ b/tests/TestOAuth2.py @@ -0,0 +1,90 @@ +from unittest.mock import MagicMock, patch + +from UM.Preferences import Preferences +from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers +from cura.OAuth2.AuthorizationService import AuthorizationService +from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile + +CALLBACK_PORT = 32118 +OAUTH_ROOT = "https://account.ultimaker.com" +CLOUD_API_ROOT = "https://api.ultimaker.com" + +OAUTH_SETTINGS = OAuth2Settings( + OAUTH_SERVER_URL= OAUTH_ROOT, + CALLBACK_PORT=CALLBACK_PORT, + CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT), + CLIENT_ID="", + CLIENT_SCOPES="", + AUTH_DATA_PREFERENCE_KEY="test/auth_data", + AUTH_SUCCESS_REDIRECT="{}/auth-success".format(CLOUD_API_ROOT), + AUTH_FAILED_REDIRECT="{}/auth-error".format(CLOUD_API_ROOT) + ) + +FAILED_AUTH_RESPONSE = AuthenticationResponse(success = False, err_message = "FAILURE!") + +SUCCESFULL_AUTH_RESPONSE = AuthenticationResponse(access_token = "beep", refresh_token = "beep?") + +MALFORMED_AUTH_RESPONSE = AuthenticationResponse() + + +def test_cleanAuthService(): + # Ensure that when setting up an AuthorizationService, no data is set. + authorization_service = AuthorizationService(Preferences(), OAUTH_SETTINGS) + assert authorization_service.getUserProfile() is None + assert authorization_service.getAccessToken() is None + + +def test_failedLogin(): + authorization_service = AuthorizationService(Preferences(), OAUTH_SETTINGS) + authorization_service.onAuthenticationError.emit = MagicMock() + authorization_service.onAuthStateChanged.emit = MagicMock() + + # Let the service think there was a failed response + authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE) + + # Check that the error signal was triggered + assert authorization_service.onAuthenticationError.emit.call_count == 1 + + # Since nothing changed, this should still be 0. + assert authorization_service.onAuthStateChanged.emit.call_count == 0 + + # Validate that there is no user profile or token + assert authorization_service.getUserProfile() is None + assert authorization_service.getAccessToken() is None + + +def test_loginAndLogout(): + preferences = Preferences() + authorization_service = AuthorizationService(preferences, OAUTH_SETTINGS) + authorization_service.onAuthenticationError.emit = MagicMock() + authorization_service.onAuthStateChanged.emit = MagicMock() + + # Let the service think there was a succesfull response + with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()): + authorization_service._onAuthStateChanged(SUCCESFULL_AUTH_RESPONSE) + + # Ensure that the error signal was not triggered + assert authorization_service.onAuthenticationError.emit.call_count == 0 + + # Since we said that it went right this time, validate that we got a signal. + assert authorization_service.onAuthStateChanged.emit.call_count == 1 + assert authorization_service.getUserProfile() is not None + assert authorization_service.getAccessToken() == "beep" + + # Check that we stored the authentication data, so next time the user won't have to log in again. + assert preferences.getValue("test/auth_data") is not None + + # We're logged in now, also check if logging out works + authorization_service.deleteAuthData() + assert authorization_service.onAuthStateChanged.emit.call_count == 2 + assert authorization_service.getUserProfile() is None + + # Ensure the data is gone after we logged out. + assert preferences.getValue("test/auth_data") == "{}" + + +def test_wrongServerResponses(): + authorization_service = AuthorizationService(Preferences(), OAUTH_SETTINGS) + with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()): + authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE) + assert authorization_service.getUserProfile() is None \ No newline at end of file