mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-09 07:56:22 -06:00
STAR-322: Creating a Cloud API client to handle the interaction
This commit is contained in:
parent
97535ffa24
commit
9046b39b43
7 changed files with 367 additions and 235 deletions
|
@ -1,7 +1,7 @@
|
|||
# Copyright (c) 2018 Ultimaker B.V.
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
from time import time
|
||||
from typing import Optional, Dict, Callable, List
|
||||
from typing import Optional, Dict, Callable, List, Union
|
||||
|
||||
from PyQt5.QtCore import QUrl
|
||||
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QHttpMultiPart, QNetworkRequest, QHttpPart, \
|
||||
|
@ -49,6 +49,8 @@ class NetworkClient:
|
|||
|
||||
## Create a new empty network request.
|
||||
# Automatically adds the required HTTP headers.
|
||||
# \param url: The URL to request
|
||||
# \param content_type: The type of the body contents.
|
||||
def _createEmptyRequest(self, url: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
|
||||
request = QNetworkRequest(QUrl(url))
|
||||
if content_type:
|
||||
|
@ -120,67 +122,82 @@ class NetworkClient:
|
|||
def createFormPart(self, content_header: str, data: bytes, content_type: Optional[str] = None) -> QHttpPart:
|
||||
return self._createFormPart(content_header, data, content_type)
|
||||
|
||||
## Does a PUT request to the given URL.
|
||||
def put(self, url: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
|
||||
## Sends a put request to the given path.
|
||||
# url: The path after the API prefix.
|
||||
# data: The data to be sent in the body
|
||||
# content_type: The content type of the body data.
|
||||
# on_finished: The function to call when the response is received.
|
||||
# on_progress: The function to call when the progress changes. Parameters are bytes_sent / bytes_total.
|
||||
def put(self, url: str, data: Union[str, bytes], content_type: Optional[str] = None,
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]] = None,
|
||||
on_progress: Optional[Callable[[int, int], None]] = None) -> None:
|
||||
self._validateManager()
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
Logger.log("e", "No network manager was created to execute the PUT call with.")
|
||||
return
|
||||
|
||||
reply = self._manager.put(request, data.encode())
|
||||
request = self._createEmptyRequest(url, content_type = content_type)
|
||||
self._last_request_time = time()
|
||||
|
||||
if not self._manager:
|
||||
return Logger.log("e", "No network manager was created to execute the PUT call with.")
|
||||
|
||||
body = data if isinstance(data, bytes) else data.encode() # type: bytes
|
||||
reply = self._manager.put(request, body)
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
## Does a DELETE request to the given URL.
|
||||
if on_progress is not None:
|
||||
reply.uploadProgress.connect(on_progress)
|
||||
|
||||
## Sends a delete request to the given path.
|
||||
# url: The path after the API prefix.
|
||||
# on_finished: The function to be call when the response is received.
|
||||
def delete(self, url: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
|
||||
self._validateManager()
|
||||
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
|
||||
if not self._manager:
|
||||
Logger.log("e", "No network manager was created to execute the DELETE call with.")
|
||||
return
|
||||
|
||||
return Logger.log("e", "No network manager was created to execute the DELETE call with.")
|
||||
|
||||
reply = self._manager.deleteResource(request)
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
## Does a GET request to the given URL.
|
||||
## Sends a get request to the given path.
|
||||
# \param url: The path after the API prefix.
|
||||
# \param on_finished: The function to be call when the response is received.
|
||||
def get(self, url: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
|
||||
self._validateManager()
|
||||
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
|
||||
if not self._manager:
|
||||
Logger.log("e", "No network manager was created to execute the GET call with.")
|
||||
return
|
||||
|
||||
return Logger.log("e", "No network manager was created to execute the GET call with.")
|
||||
|
||||
reply = self._manager.get(request)
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
## Does a POST request to the given URL.
|
||||
def post(self, url: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]],
|
||||
on_progress: Callable = None) -> None:
|
||||
## Sends a post request to the given path.
|
||||
# \param url: The path after the API prefix.
|
||||
# \param data: The data to be sent in the body
|
||||
# \param on_finished: The function to call when the response is received.
|
||||
# \param on_progress: The function to call when the progress changes. Parameters are bytes_sent / bytes_total.
|
||||
def post(self, url: str, data: Union[str, bytes],
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]],
|
||||
on_progress: Optional[Callable[[int, int], None]] = None) -> None:
|
||||
self._validateManager()
|
||||
|
||||
|
||||
request = self._createEmptyRequest(url)
|
||||
self._last_request_time = time()
|
||||
|
||||
|
||||
if not self._manager:
|
||||
Logger.log("e", "No network manager was created to execute the GET call with.")
|
||||
return
|
||||
|
||||
reply = self._manager.post(request, data.encode())
|
||||
|
||||
return Logger.log("e", "Could not find manager.")
|
||||
|
||||
body = data if isinstance(data, bytes) else data.encode() # type: bytes
|
||||
reply = self._manager.post(request, body)
|
||||
if on_progress is not None:
|
||||
reply.uploadProgress.connect(on_progress)
|
||||
|
||||
self._registerOnFinishedCallback(reply, on_finished)
|
||||
|
||||
|
||||
## Does a POST request with form data to the given URL.
|
||||
def postForm(self, url: str, header_data: str, body_data: bytes,
|
||||
on_finished: Optional[Callable[[QNetworkReply], None]],
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue