Cura/cura/OAuth2/AuthorizationHelpers.py
Ghostkeeper a9990eaa75
Make parseJWT asynchronous
This involves returning the user profile via a callback.
No longer use the Requests library, which doesn't properly use the SSL certificates locally on the computer like the QNetworkManager does.

Contributes to issue CURA-8539.
2021-11-18 17:11:27 +01:00

194 lines
8.6 KiB
Python

# Copyright (c) 2021 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from base64 import b64encode
from datetime import datetime
from hashlib import sha512
from PyQt5.QtNetwork import QNetworkReply
import secrets
from threading import Lock
from typing import Callable, Optional
import requests
import urllib.parse
from UM.i18n import i18nCatalog
from UM.Logger import Logger
from UM.TaskManagement.HttpRequestManager import HttpRequestManager # To download log-in tokens.
from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
catalog = i18nCatalog("cura")
TOKEN_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
class AuthorizationHelpers:
"""Class containing several helpers to deal with the authorization flow."""
def __init__(self, settings: "OAuth2Settings") -> None:
self._settings = settings
self._token_url = "{}/token".format(self._settings.OAUTH_SERVER_URL)
self._request_lock = Lock()
self._auth_response = None # type: Optional[AuthenticationResponse]
@property
def settings(self) -> "OAuth2Settings":
"""The OAuth2 settings object."""
return self._settings
def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str) -> "AuthenticationResponse":
"""Request the access token from the authorization server.
:param authorization_code: The authorization code from the 1st step.
:param verification_code: The verification code needed for the PKCE extension.
:return: An AuthenticationResponse object.
"""
data = {
"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
"redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
"grant_type": "authorization_code",
"code": authorization_code,
"code_verifier": verification_code,
"scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
}
headers = {"Content-type": "application/x-www-form-urlencoded"}
self._request_lock.acquire()
HttpRequestManager.getInstance().post(
self._token_url,
data = urllib.parse.urlencode(data).encode("UTF-8"),
headers_dict = headers,
callback = self.parseTokenResponse
)
self._request_lock.acquire(timeout = 60) # Block until the request is completed. 1 minute timeout.
response = self._auth_response
self._auth_response = None
self._request_lock.release()
return response
def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> "AuthenticationResponse":
"""Request the access token from the authorization server using a refresh token.
:param refresh_token:
:return: An AuthenticationResponse object.
"""
Logger.log("d", "Refreshing the access token for [%s]", self._settings.OAUTH_SERVER_URL)
data = {
"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
"redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
}
headers = {"Content-type": "application/x-www-form-urlencoded"}
self._request_lock.acquire()
HttpRequestManager.getInstance().post(
self._token_url,
data = urllib.parse.urlencode(data).encode("UTF-8"),
headers_dict = headers,
callback = self.parseTokenResponse
)
self._request_lock.acquire(timeout = 60) # Block until the request is completed. 1 minute timeout.
response = self._auth_response
self._auth_response = None
self._request_lock.release()
return response
def parseTokenResponse(self, token_response: QNetworkReply) -> None:
"""Parse the token response from the authorization server into an AuthenticationResponse object.
:param token_response: The JSON string data response from the authorization server.
:return: An AuthenticationResponse object.
"""
token_data = HttpRequestManager.readJSON(token_response)
if not token_data:
self._auth_response = AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response."))
self._request_lock.release()
return
if token_response.error() != QNetworkReply.NetworkError.NoError:
self._auth_response = AuthenticationResponse(success = False, err_message = token_data["error_description"])
self._request_lock.release()
return
self._auth_response = AuthenticationResponse(success = True,
token_type = token_data["token_type"],
access_token = token_data["access_token"],
refresh_token = token_data["refresh_token"],
expires_in = token_data["expires_in"],
scope = token_data["scope"],
received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT))
self._request_lock.release()
return
def checkToken(self, access_token: str, callback: Optional[Callable[[UserProfile], None]] = None) -> None:
"""Calls the authentication API endpoint to get the token data.
The API is called asynchronously. When a response is given, the callback is called with the user's profile.
:param access_token: The encoded JWT token.
:param callback: When a response is given, this function will be called with a user profile. If None, there will
not be a callback. If the token failed to give/parse a user profile, the callback will not be called either.
"""
self._user_profile = None
check_token_url = "{}/check-token".format(self._settings.OAUTH_SERVER_URL)
Logger.log("d", "Checking the access token for [%s]", check_token_url)
headers = {
"Authorization": f"Bearer {access_token}"
}
HttpRequestManager.getInstance().get(
check_token_url,
headers_dict = headers,
callback = lambda reply: self._parseUserProfile(reply, callback)
)
def _parseUserProfile(self, reply: QNetworkReply, callback: Optional[Callable[[UserProfile], None]]) -> None:
"""
Parses the user profile from a reply to /check-token.
If the response is valid, the callback will be called to return the user profile to the caller.
:param reply: A network reply to a request to the /check-token URL.
:param callback: A function to call once a user profile was successfully obtained.
"""
if reply.error() != QNetworkReply.NetworkError.NoError:
Logger.warning(f"Could not access account information. QNetworkError {reply.errorString()}")
return
profile_data = HttpRequestManager.getInstance().readJSON(reply)
if profile_data is None or "data" not in profile_data:
Logger.warning("Could not parse user data from token.")
return
profile_data = profile_data["data"]
required_fields = {"user_id", "username"}
if "user_id" not in profile_data or "username" not in profile_data:
Logger.warning(f"User data missing required field(s): {required_fields - set(profile_data.keys())}")
return
callback(UserProfile(
user_id = profile_data["user_id"],
username = profile_data["username"],
profile_image_url = profile_data.get("profile_image_url", ""),
organization_id = profile_data.get("organization", {}).get("organization_id"),
subscriptions = profile_data.get("subscriptions", [])
))
@staticmethod
def generateVerificationCode(code_length: int = 32) -> str:
"""Generate a verification code of arbitrary length.
:param code_length:: How long should the code be in bytes? This should never be lower than 16, but it's probably
better to leave it at 32
"""
return secrets.token_hex(code_length)
@staticmethod
def generateVerificationCodeChallenge(verification_code: str) -> str:
"""Generates a base64 encoded sha512 encrypted version of a given string.
:param verification_code:
:return: The encrypted code in base64 format.
"""
encoded = sha512(verification_code.encode()).digest()
return b64encode(encoded, altchars = b"_-").decode()