STAR-322: Making mypy happy

This commit is contained in:
Daniel Schiavini 2018-12-17 12:01:10 +01:00
parent 9f4b7bd703
commit 9eb743bcb8
5 changed files with 64 additions and 46 deletions

View file

@ -3,7 +3,7 @@
import json
from json import JSONDecodeError
from time import time
from typing import Callable, List, Type, TypeVar, Union, Optional, Tuple, Dict, Any
from typing import Callable, List, Type, TypeVar, Union, Optional, Tuple, Dict, Any, cast
from PyQt5.QtCore import QUrl
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply, QNetworkAccessManager
@ -40,7 +40,7 @@ class CloudApiClient:
self._on_error = on_error
self._upload = None # type: Optional[MeshUploader]
# in order to avoid garbage collection we keep the callbacks in this list.
self._anti_gc_callbacks = [] # type: List[Callable[[QNetworkReply], None]]
self._anti_gc_callbacks = [] # type: List[Callable[[], None]]
## Gets the account used for the API.
@property
@ -128,12 +128,16 @@ class CloudApiClient:
# \param on_finished: The callback in case the response is successful.
# \param model_class: The type of the model to convert the response to. It may either be a single record or a list.
def _parseModels(self, response: Dict[str, Any],
on_finished: Callable[[Union[Model, List[Model]]], Any],
on_finished: Union[Callable[[Model], Any], Callable[[List[Model]], Any]],
model_class: Type[Model]) -> None:
if "data" in response:
data = response["data"]
result = [model_class(**c) for c in data] if isinstance(data, list) else model_class(**data)
on_finished(result)
if isinstance(data, list):
results = [model_class(**c) for c in data] # type: List[CloudApiClient.Model]
cast(Callable[[List[CloudApiClient.Model]], Any], on_finished)(results)
else:
result = model_class(**data) # type: CloudApiClient.Model
cast(Callable[[CloudApiClient.Model], Any], on_finished)(result)
elif "errors" in response:
self._on_error([CloudErrorObject(**error) for error in response["errors"]])
else:
@ -145,7 +149,7 @@ class CloudApiClient:
# \return: A function that can be passed to the
def _addCallbacks(self,
reply: QNetworkReply,
on_finished: Callable[[Union[Model, List[Model]]], Any],
on_finished: Union[Callable[[Model], Any], Callable[[List[Model]], Any]],
model: Type[Model],
) -> None:
def parse() -> None:

View file

@ -3,7 +3,7 @@
import os
from time import time
from typing import Dict, List, Optional, Set
from typing import Dict, List, Optional, Set, cast
from PyQt5.QtCore import QObject, QUrl, pyqtProperty, pyqtSignal, pyqtSlot
@ -161,7 +161,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self.setConnectionText(T.CONNECTED_VIA_CLOUD)
## Called when Cura requests an output device to receive a (G-code) file.
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mime_types: bool = False,
def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False,
file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
# Show an error message if we're already sending a job.
@ -190,7 +190,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._mesh = mesh
request = CloudPrintJobUploadRequest(
job_name = file_name,
job_name = file_name or mesh_format.file_extension,
file_size = len(mesh),
content_type = mesh_format.mime_type,
)
@ -317,13 +317,14 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
def _onPrintJobCreated(self, job_response: CloudPrintJobResponse) -> None:
self._progress.show()
self._uploaded_print_job = job_response
self._api.uploadMesh(job_response, self._mesh, self._onPrintJobUploaded, self._progress.update,
self._onUploadError)
mesh = cast(bytes, self._mesh)
self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._progress.update, self._onUploadError)
## Requests the print to be sent to the printer when we finished uploading the mesh.
def _onPrintJobUploaded(self) -> None:
self._progress.update(100)
self._api.requestPrint(self.key, self._uploaded_print_job.job_id, self._onPrintRequested)
print_job = cast(CloudPrintJobResponse, self._uploaded_print_job)
self._api.requestPrint(self.key, print_job.job_id, self._onPrintRequested)
## Displays the given message if uploading the mesh has failed
# \param message: The message to display.

View file

@ -3,7 +3,7 @@
# -*- coding: utf-8 -*-
from PyQt5.QtCore import QUrl
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply, QNetworkAccessManager
from typing import Optional, Callable, Any, Tuple
from typing import Optional, Callable, Any, Tuple, cast
from UM.Logger import Logger
from src.Cloud.Models.CloudPrintJobResponse import CloudPrintJobResponse
@ -20,7 +20,8 @@ class MeshUploader:
# \param http_method: The HTTP method to be used, e.g. "POST" or "PUT".
# \param timeout: The timeout for each chunk upload. Important: If None, no timeout is applied at all.
def __init__(self, manager: QNetworkAccessManager, print_job: CloudPrintJobResponse, data: bytes,
on_finished: Callable[[], Any], on_progress: Callable[[int], Any], on_error: Callable[[], Any]):
on_finished: Callable[[], Any], on_progress: Callable[[int], Any], on_error: Callable[[], Any]
) -> None:
self._manager = manager
self._print_job = print_job
self._data = data
@ -85,20 +86,22 @@ class MeshUploader:
self._on_progress(int((self._sent_bytes + bytes_sent) / len(self._data) * 100))
def _errorCallback(self) -> None:
body = bytes(self._reply.readAll()).decode()
reply = cast(QNetworkReply, self._reply)
body = bytes(reply.readAll()).decode()
Logger.log("e", "Received error while uploading: %s", body)
self.stop()
self._on_error()
def _finishedCallback(self) -> None:
reply = cast(QNetworkReply, self._reply)
Logger.log("i", "Finished callback %s %s",
self._reply.attribute(QNetworkRequest.HttpStatusCodeAttribute), self._reply.url().toString())
reply.attribute(QNetworkRequest.HttpStatusCodeAttribute), reply.url().toString())
status_code = self._reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
if self._retries < self.MAX_RETRIES and status_code in self.RETRY_HTTP_CODES:
self._retries += 1
Logger.log("i", "Retrying %s/%s request %s", self._retries, self.MAX_RETRIES, self._reply.url().toString())
Logger.log("i", "Retrying %s/%s request %s", self._retries, self.MAX_RETRIES, reply.url().toString())
self._uploadChunk()
return
@ -106,9 +109,9 @@ class MeshUploader:
self._errorCallback()
return
body = bytes(self._reply.readAll()).decode()
body = bytes(reply.readAll()).decode()
Logger.log("w", "status_code: %s, Headers: %s, body: %s", status_code,
[bytes(header).decode() for header in self._reply.rawHeaderList()], body)
[bytes(header).decode() for header in reply.rawHeaderList()], body)
first_byte, last_byte = self._chunkRange()
self._sent_bytes += last_byte - first_byte