Transfer the generation of the auth link into its own function

The authentication link should be prepended with a logoff link from
mycloud, if it is requested. In order to make this process testable
this commit separates the generation of the authentication link,
based on whether it requests for a browser logoff first, into its
own function.

This commit also adds tests for this function.

CURA-7427
This commit is contained in:
Kostas Karmas 2020-05-12 13:07:39 +02:00
parent 96387ef2aa
commit d3fb002d9b
2 changed files with 41 additions and 17 deletions

View file

@ -3,7 +3,7 @@
import json import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional, TYPE_CHECKING from typing import Optional, TYPE_CHECKING, Dict
from urllib.parse import urlencode, quote_plus from urllib.parse import urlencode, quote_plus
import requests.exceptions import requests.exceptions
@ -24,6 +24,7 @@ if TYPE_CHECKING:
from cura.OAuth2.Models import UserProfile, OAuth2Settings from cura.OAuth2.Models import UserProfile, OAuth2Settings
from UM.Preferences import Preferences from UM.Preferences import Preferences
MYCLOUD_LOGOFF = "https://mycloud.ultimaker.com/logoff"
## The authorization service is responsible for handling the login flow, ## The authorization service is responsible for handling the login flow,
# storing user credentials and providing account information. # storing user credentials and providing account information.
@ -142,7 +143,7 @@ class AuthorizationService:
self.onAuthStateChanged.emit(logged_in = False) self.onAuthStateChanged.emit(logged_in = False)
## Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login. ## Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login.
def startAuthorizationFlow(self, force_logout_from_mycloud = False) -> None: def startAuthorizationFlow(self, force_browser_logout: bool = False) -> None:
Logger.log("d", "Starting new OAuth2 flow...") Logger.log("d", "Starting new OAuth2 flow...")
# Create the tokens needed for the code challenge (PKCE) extension for OAuth2. # Create the tokens needed for the code challenge (PKCE) extension for OAuth2.
@ -153,8 +154,8 @@ class AuthorizationService:
state = AuthorizationHelpers.generateVerificationCode() state = AuthorizationHelpers.generateVerificationCode()
# Create the query string needed for the OAuth2 flow. # Create the query dict needed for the OAuth2 flow.
query_string = urlencode({ query_parameters_dict = {
"client_id": self._settings.CLIENT_ID, "client_id": self._settings.CLIENT_ID,
"redirect_uri": self._settings.CALLBACK_URL, "redirect_uri": self._settings.CALLBACK_URL,
"scope": self._settings.CLIENT_SCOPES, "scope": self._settings.CLIENT_SCOPES,
@ -162,7 +163,7 @@ class AuthorizationService:
"state": state, # Forever in our Hearts, RIP "(.Y.)" (2018-2020) "state": state, # Forever in our Hearts, RIP "(.Y.)" (2018-2020)
"code_challenge": challenge_code, "code_challenge": challenge_code,
"code_challenge_method": "S512" "code_challenge_method": "S512"
}) }
# Start a local web server to receive the callback URL on. # Start a local web server to receive the callback URL on.
try: try:
@ -173,18 +174,25 @@ class AuthorizationService:
title=i18n_catalog.i18nc("@info:title", "Warning")).show() title=i18n_catalog.i18nc("@info:title", "Warning")).show()
return return
# Open the authorization page in a new browser window. If a force logout is requested during the authorization auth_url = self._generate_auth_url(query_parameters_dict, force_browser_logout)
# flow, the "mycloud logoff" link will be prepended to the authorization url to make sure that the user will be # Open the authorization page in a new browser window.
# logged off from the browser before being redirected to login again. This case is used to sync the accounts QDesktopServices.openUrl(QUrl(auth_url))
# between Cura and the browser.
auth_url = "{}?{}".format(self._auth_url, query_string)
if force_logout_from_mycloud:
mycloud_logoff_link = "https://mycloud.ultimaker.com/logoff"
logoff_auth_url = "{}?next={}".format(mycloud_logoff_link, quote_plus(auth_url))
QDesktopServices.openUrl(QUrl(logoff_auth_url))
else:
QDesktopServices.openUrl(QUrl(auth_url))
def _generate_auth_url(self, query_parameters_dict: Dict[str, str], force_browser_logout: bool) -> str:
"""
Generates the authentications url based on the original auth_url and the query_parameters_dict to be included.
If there is a request to force logging out of mycloud in the browser, the link to logoff from mycloud is
prepended in order to force the browser to logoff from mycloud and then redirect to the authentication url to
login again. This case is used to sync the accounts between Cura and the browser.
:param query_parameters_dict:
:param force_browser_logout:
:return:
"""
auth_url = "{}?{}".format(self._auth_url, urlencode(query_parameters_dict))
if force_browser_logout:
# The url after '?next=' should be urlencoded
auth_url = "{}?next={}".format(MYCLOUD_LOGOFF, quote_plus(auth_url))
return auth_url
## Callback method for the authentication flow. ## Callback method for the authentication flow.
def _onAuthStateChanged(self, auth_response: AuthenticationResponse) -> None: def _onAuthStateChanged(self, auth_response: AuthenticationResponse) -> None:

View file

@ -7,7 +7,7 @@ from PyQt5.QtGui import QDesktopServices
from UM.Preferences import Preferences from UM.Preferences import Preferences
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
from cura.OAuth2.AuthorizationService import AuthorizationService from cura.OAuth2.AuthorizationService import AuthorizationService, MYCLOUD_LOGOFF
from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
@ -226,3 +226,19 @@ def test_wrongServerResponses() -> None:
with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()): with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE) authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
assert authorization_service.getUserProfile() is None assert authorization_service.getUserProfile() is None
def test__generate_auth_url() -> None:
preferences = Preferences()
authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
query_parameters_dict = {
"client_id": "",
"redirect_uri": OAUTH_SETTINGS.CALLBACK_URL,
"scope": OAUTH_SETTINGS.CLIENT_SCOPES,
"response_type": "code"
}
auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = False)
assert MYCLOUD_LOGOFF + "?next=" not in auth_url
auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = True)
assert MYCLOUD_LOGOFF + "?next=" in auth_url