Fix code style and documentation formatting

This commit is contained in:
Ghostkeeper 2019-02-08 11:10:43 +01:00
parent eb3129815d
commit 12b2154a96
No known key found for this signature in database
GPG key ID: 86BEF881AE2CF276
7 changed files with 80 additions and 77 deletions

View file

@ -1,5 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
import random
from hashlib import sha512
@ -12,22 +13,22 @@ from UM.Logger import Logger
from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
# Class containing several helpers to deal with the authorization flow.
## Class containing several helpers to deal with the authorization flow.
class AuthorizationHelpers:
def __init__(self, settings: "OAuth2Settings") -> None:
self._settings = settings
self._token_url = "{}/token".format(self._settings.OAUTH_SERVER_URL)
@property
# The OAuth2 settings object.
## The OAuth2 settings object.
def settings(self) -> "OAuth2Settings":
return self._settings
# Request the access token from the authorization server.
## Request the access token from the authorization server.
# \param authorization_code: The authorization code from the 1st step.
# \param verification_code: The verification code needed for the PKCE extension.
# \return: An AuthenticationResponse object.
# \param verification_code: The verification code needed for the PKCE
# extension.
# \return An AuthenticationResponse object.
def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str) -> "AuthenticationResponse":
data = {
"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
@ -39,9 +40,9 @@ class AuthorizationHelpers:
}
return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
# Request the access token from the authorization server using a refresh token.
## Request the access token from the authorization server using a refresh token.
# \param refresh_token:
# \return: An AuthenticationResponse object.
# \return An AuthenticationResponse object.
def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> "AuthenticationResponse":
data = {
"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
@ -53,9 +54,9 @@ class AuthorizationHelpers:
return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
@staticmethod
# Parse the token response from the authorization server into an AuthenticationResponse object.
## Parse the token response from the authorization server into an AuthenticationResponse object.
# \param token_response: The JSON string data response from the authorization server.
# \return: An AuthenticationResponse object.
# \return An AuthenticationResponse object.
def parseTokenResponse(token_response: requests.models.Response) -> "AuthenticationResponse":
token_data = None
@ -65,21 +66,21 @@ class AuthorizationHelpers:
Logger.log("w", "Could not parse token response data: %s", token_response.text)
if not token_data:
return AuthenticationResponse(success=False, err_message="Could not read response.")
return AuthenticationResponse(success = False, err_message = "Could not read response.")
if token_response.status_code not in (200, 201):
return AuthenticationResponse(success=False, err_message=token_data["error_description"])
return AuthenticationResponse(success = False, err_message = token_data["error_description"])
return AuthenticationResponse(success=True,
token_type=token_data["token_type"],
access_token=token_data["access_token"],
refresh_token=token_data["refresh_token"],
expires_in=token_data["expires_in"],
scope=token_data["scope"])
return AuthenticationResponse(success = True,
token_type = token_data["token_type"],
access_token = token_data["access_token"],
refresh_token = token_data["refresh_token"],
expires_in = token_data["expires_in"],
scope = token_data["scope"])
# Calls the authentication API endpoint to get the token data.
## Calls the authentication API endpoint to get the token data.
# \param access_token: The encoded JWT token.
# \return: Dict containing some profile data.
# \return Dict containing some profile data.
def parseJWT(self, access_token: str) -> Optional["UserProfile"]:
try:
token_request = requests.get("{}/check-token".format(self._settings.OAUTH_SERVER_URL), headers = {
@ -103,15 +104,15 @@ class AuthorizationHelpers:
)
@staticmethod
# Generate a 16-character verification code.
## Generate a 16-character verification code.
# \param code_length: How long should the code be?
def generateVerificationCode(code_length: int = 16) -> str:
return "".join(random.choice("0123456789ABCDEF") for i in range(code_length))
@staticmethod
# Generates a base64 encoded sha512 encrypted version of a given string.
## Generates a base64 encoded sha512 encrypted version of a given string.
# \param verification_code:
# \return: The encrypted code in base64 format.
# \return The encrypted code in base64 format.
def generateVerificationCodeChallenge(verification_code: str) -> str:
encoded = sha512(verification_code.encode()).digest()
return b64encode(encoded, altchars = b"_-").decode()

View file

@ -13,7 +13,7 @@ if TYPE_CHECKING:
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
# This handler handles all HTTP requests on the local web server.
## This handler handles all HTTP requests on the local web server.
# It also requests the access token for the 2nd stage of the OAuth flow.
class AuthorizationRequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server) -> None:
@ -48,9 +48,9 @@ class AuthorizationRequestHandler(BaseHTTPRequestHandler):
# This will cause the server to shut down, so we do it at the very end of the request handling.
self.authorization_callback(token_response)
# Handler for the callback URL redirect.
# \param query: Dict containing the HTTP query parameters.
# \return: HTTP ResponseData containing a success page to show to the user.
## Handler for the callback URL redirect.
# \param query Dict containing the HTTP query parameters.
# \return HTTP ResponseData containing a success page to show to the user.
def _handleCallback(self, query: Dict[Any, List]) -> Tuple[ResponseData, Optional[AuthenticationResponse]]:
code = self._queryGet(query, "code")
if code and self.authorization_helpers is not None and self.verification_code is not None:
@ -81,8 +81,8 @@ class AuthorizationRequestHandler(BaseHTTPRequestHandler):
self.authorization_helpers.settings.AUTH_FAILED_REDIRECT
), token_response
## Handle all other non-existing server calls.
@staticmethod
# Handle all other non-existing server calls.
def _handleNotFound() -> ResponseData:
return ResponseData(status = HTTP_STATUS["NOT_FOUND"], content_type = "text/html", data_stream = b"Not found.")
@ -96,7 +96,7 @@ class AuthorizationRequestHandler(BaseHTTPRequestHandler):
def _sendData(self, data: bytes) -> None:
self.wfile.write(data)
## Convenience helper for getting values from a pre-parsed query string
@staticmethod
# Convenience Helper for getting values from a pre-parsed query string
def _queryGet(query_data: Dict[Any, List], key: str, default: Optional[str] = None) -> Optional[str]:
return query_data.get(key, [default])[0]

View file

@ -1,5 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from http.server import HTTPServer
from typing import Callable, Any, TYPE_CHECKING
@ -8,19 +9,19 @@ if TYPE_CHECKING:
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
# The authorization request callback handler server.
## The authorization request callback handler server.
# This subclass is needed to be able to pass some data to the request handler.
# This cannot be done on the request handler directly as the HTTPServer creates an instance of the handler after
# init.
# This cannot be done on the request handler directly as the HTTPServer
# creates an instance of the handler after init.
class AuthorizationRequestServer(HTTPServer):
# Set the authorization helpers instance on the request handler.
## Set the authorization helpers instance on the request handler.
def setAuthorizationHelpers(self, authorization_helpers: "AuthorizationHelpers") -> None:
self.RequestHandlerClass.authorization_helpers = authorization_helpers # type: ignore
# Set the authorization callback on the request handler.
## Set the authorization callback on the request handler.
def setAuthorizationCallback(self, authorization_callback: Callable[["AuthenticationResponse"], Any]) -> None:
self.RequestHandlerClass.authorization_callback = authorization_callback # type: ignore
# Set the verification code on the request handler.
## Set the verification code on the request handler.
def setVerificationCode(self, verification_code: str) -> None:
self.RequestHandlerClass.verification_code = verification_code # type: ignore

View file

@ -1,5 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import json
import webbrowser
from typing import Optional, TYPE_CHECKING
@ -17,12 +18,9 @@ if TYPE_CHECKING:
from UM.Preferences import Preferences
## The authorization service is responsible for handling the login flow,
# storing user credentials and providing account information.
class AuthorizationService:
"""
The authorization service is responsible for handling the login flow,
storing user credentials and providing account information.
"""
# Emit signal when authentication is completed.
onAuthStateChanged = Signal()
@ -44,7 +42,7 @@ class AuthorizationService:
if self._preferences:
self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
# Get the user profile as obtained from the JWT (JSON Web Token).
## Get the user profile as obtained from the JWT (JSON Web Token).
# If the JWT is not yet parsed, calling this will take care of that.
# \return UserProfile if a user is logged in, None otherwise.
# \sa _parseJWT
@ -61,7 +59,7 @@ class AuthorizationService:
return self._user_profile
# Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
## Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
# \return UserProfile if it was able to parse, None otherwise.
def _parseJWT(self) -> Optional["UserProfile"]:
if not self._auth_data or self._auth_data.access_token is None:
@ -81,7 +79,7 @@ class AuthorizationService:
return self._auth_helpers.parseJWT(self._auth_data.access_token)
# Get the access token as provided by the repsonse data.
## Get the access token as provided by the repsonse data.
def getAccessToken(self) -> Optional[str]:
if not self.getUserProfile():
# We check if we can get the user profile.
@ -95,21 +93,21 @@ class AuthorizationService:
return self._auth_data.access_token
# Try to refresh the access token. This should be used when it has expired.
## Try to refresh the access token. This should be used when it has expired.
def refreshAccessToken(self) -> None:
if self._auth_data is None or self._auth_data.refresh_token is None:
Logger.log("w", "Unable to refresh access token, since there is no refresh token.")
return
self._storeAuthData(self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token))
self.onAuthStateChanged.emit(logged_in=True)
self.onAuthStateChanged.emit(logged_in = True)
# Delete the authentication data that we have stored locally (eg; logout)
## Delete the authentication data that we have stored locally (eg; logout)
def deleteAuthData(self) -> None:
if self._auth_data is not None:
self._storeAuthData()
self.onAuthStateChanged.emit(logged_in=False)
self.onAuthStateChanged.emit(logged_in = False)
# Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login.
## Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login.
def startAuthorizationFlow(self) -> None:
Logger.log("d", "Starting new OAuth2 flow...")
@ -136,16 +134,16 @@ class AuthorizationService:
# Start a local web server to receive the callback URL on.
self._server.start(verification_code)
# Callback method for the authentication flow.
## Callback method for the authentication flow.
def _onAuthStateChanged(self, auth_response: AuthenticationResponse) -> None:
if auth_response.success:
self._storeAuthData(auth_response)
self.onAuthStateChanged.emit(logged_in=True)
self.onAuthStateChanged.emit(logged_in = True)
else:
self.onAuthenticationError.emit(logged_in=False, error_message=auth_response.err_message)
self.onAuthenticationError.emit(logged_in = False, error_message = auth_response.err_message)
self._server.stop() # Stop the web server at all times.
# Load authentication data from preferences.
## Load authentication data from preferences.
def loadAuthDataFromPreferences(self) -> None:
if self._preferences is None:
Logger.log("e", "Unable to load authentication data, since no preference has been set!")
@ -154,11 +152,11 @@ class AuthorizationService:
preferences_data = json.loads(self._preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
if preferences_data:
self._auth_data = AuthenticationResponse(**preferences_data)
self.onAuthStateChanged.emit(logged_in=True)
self.onAuthStateChanged.emit(logged_in = True)
except ValueError:
Logger.logException("w", "Could not load auth data from preferences")
# Store authentication data in preferences.
## Store authentication data in preferences.
def _storeAuthData(self, auth_data: Optional[AuthenticationResponse] = None) -> None:
if self._preferences is None:
Logger.log("e", "Unable to save authentication data, since no preference has been set!")

View file

@ -1,5 +1,6 @@
# Copyright (c) 2018 Ultimaker B.V.
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import threading
from typing import Optional, Callable, Any, TYPE_CHECKING
@ -14,12 +15,15 @@ if TYPE_CHECKING:
class LocalAuthorizationServer:
# The local LocalAuthorizationServer takes care of the oauth2 callbacks.
# Once the flow is completed, this server should be closed down again by calling stop()
# \param auth_helpers: An instance of the authorization helpers class.
# \param auth_state_changed_callback: A callback function to be called when the authorization state changes.
# \param daemon: Whether the server thread should be run in daemon mode. Note: Daemon threads are abruptly stopped
# at shutdown. Their resources (e.g. open files) may never be released.
## The local LocalAuthorizationServer takes care of the oauth2 callbacks.
# Once the flow is completed, this server should be closed down again by
# calling stop()
# \param auth_helpers An instance of the authorization helpers class.
# \param auth_state_changed_callback A callback function to be called when
# the authorization state changes.
# \param daemon Whether the server thread should be run in daemon mode.
# Note: Daemon threads are abruptly stopped at shutdown. Their resources
# (e.g. open files) may never be released.
def __init__(self, auth_helpers: "AuthorizationHelpers",
auth_state_changed_callback: Callable[["AuthenticationResponse"], Any],
daemon: bool) -> None:
@ -30,8 +34,8 @@ class LocalAuthorizationServer:
self._auth_state_changed_callback = auth_state_changed_callback
self._daemon = daemon
# Starts the local web server to handle the authorization callback.
# \param verification_code: The verification code part of the OAuth2 client identification.
## Starts the local web server to handle the authorization callback.
# \param verification_code The verification code part of the OAuth2 client identification.
def start(self, verification_code: str) -> None:
if self._web_server:
# If the server is already running (because of a previously aborted auth flow), we don't have to start it.
@ -54,7 +58,7 @@ class LocalAuthorizationServer:
self._web_server_thread = threading.Thread(None, self._web_server.serve_forever, daemon = self._daemon)
self._web_server_thread.start()
# Stops the web server if it was running. It also does some cleanup.
## Stops the web server if it was running. It also does some cleanup.
def stop(self) -> None:
Logger.log("d", "Stopping local oauth2 web server...")

View file

@ -9,7 +9,7 @@ class BaseModel:
self.__dict__.update(kwargs)
# OAuth OAuth2Settings data template.
## OAuth OAuth2Settings data template.
class OAuth2Settings(BaseModel):
CALLBACK_PORT = None # type: Optional[int]
OAUTH_SERVER_URL = None # type: Optional[str]
@ -21,14 +21,14 @@ class OAuth2Settings(BaseModel):
AUTH_FAILED_REDIRECT = "https://ultimaker.com" # type: str
# User profile data template.
## User profile data template.
class UserProfile(BaseModel):
user_id = None # type: Optional[str]
username = None # type: Optional[str]
profile_image_url = None # type: Optional[str]
# Authentication data template.
## Authentication data template.
class AuthenticationResponse(BaseModel):
"""Data comes from the token response with success flag and error message added."""
success = True # type: bool
@ -40,21 +40,20 @@ class AuthenticationResponse(BaseModel):
err_message = None # type: Optional[str]
# Response status template.
## Response status template.
class ResponseStatus(BaseModel):
code = 200 # type: int
message = "" # type str
# Response data template.
## Response data template.
class ResponseData(BaseModel):
status = None # type: ResponseStatus
data_stream = None # type: Optional[bytes]
redirect_uri = None # type: Optional[str]
content_type = "text/html" # type: str
# Possible HTTP responses.
## Possible HTTP responses.
HTTP_STATUS = {
"OK": ResponseStatus(code = 200, message = "OK"),
"NOT_FOUND": ResponseStatus(code = 404, message = "NOT FOUND"),

View file

@ -1,2 +1,2 @@
# Copyright (c) 2018 Ultimaker B.V.
# Copyright (c) 2019 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.