from time import time
from typing import Callable

from PyQt5.QtCore import QUrl
from PyQt5.QtNetwork import QHttpMultiPart, QHttpPart, QNetworkRequest, QNetworkAccessManager, QNetworkReply

from UM.Application import Application
from cura.PrinterOutputDevice import PrinterOutputDevice


class NetworkedPrinterOutputDevice(PrinterOutputDevice):
    """Printer output device that talks to a printer over HTTP.

    Requests are sent through a QNetworkAccessManager. Each request may have
    an ``onFinished`` callback, which is invoked with the QNetworkReply once
    the manager reports the request as finished.
    """

    def __init__(self, device_id, parent = None):
        """Create the device and its network manager.

        :param device_id: Identifier forwarded to PrinterOutputDevice.
        :param parent: Optional parent forwarded to PrinterOutputDevice.
        """
        super().__init__(device_id = device_id, parent = parent)
        self._manager = None
        self._createNetworkManager()
        # Timestamp (time.time()) of the most recent finished reply.
        self._last_response_time = time()
        # Not updated by any code in this class; stays None unless set elsewhere.
        self._last_request_time = None
        # URL pieces: requests go to "http://" + _address + _api_prefix + target.
        self._api_prefix = ""
        self._address = ""
        self._user_agent = "%s/%s " % (Application.getInstance().getApplicationName(), Application.getInstance().getVersion())

        # Pending completion callbacks, keyed by the string from _callbackKey().
        self._onFinishedCallbacks = {}

    def _update(self) -> None:
        """Placeholder update hook; currently does nothing."""
        if not self._manager.networkAccessible():
            pass  # TODO: no internet connection.

        pass

    def _createEmptyRequest(self, target):
        """Build a request for ``target`` on this device.

        :param target: Path appended after the address and API prefix.
        :return: QNetworkRequest with a JSON content type and the
            application's user agent header set.
        """
        url = QUrl("http://" + self._address + self._api_prefix + target)
        request = QNetworkRequest(url)
        request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
        request.setHeader(QNetworkRequest.UserAgentHeader, self._user_agent)
        return request

    @staticmethod
    def _callbackKey(reply: QNetworkReply) -> str:
        """Return the dictionary key used to find the callback for ``reply``.

        The key is a plain string (request URL followed by the operation
        code). QNetworkReply.request() returns the request by value, so the
        request object itself cannot serve as a stable dictionary key.
        """
        return reply.request().url().toString() + str(reply.operation())

    def _registerOnFinishedCallback(self, reply: QNetworkReply, onFinished: Callable) -> None:
        """Remember ``onFinished`` so it is called when ``reply`` finishes.

        Two in-flight requests with the same URL and operation share a key;
        the callback registered last replaces the earlier one.
        """
        self._onFinishedCallbacks[self._callbackKey(reply)] = onFinished

    def _put(self, target: str, data: str, onFinished: Callable) -> None:
        """Send a PUT request.

        :param target: Path appended after the address and API prefix.
        :param data: Request body; encoded with str.encode() before sending.
        :param onFinished: Called with the QNetworkReply when the request finishes.
        """
        request = self._createEmptyRequest(target)
        reply = self._manager.put(request, data.encode())
        # NOTE(review): relies on `finished` being delivered through the event
        # loop, i.e. after this method has returned - confirm.
        self._registerOnFinishedCallback(reply, onFinished)

    def _get(self, target: str, onFinished: Callable) -> None:
        """Send a GET request.

        :param target: Path appended after the address and API prefix.
        :param onFinished: Called with the QNetworkReply when the request finishes.
        """
        request = self._createEmptyRequest(target)
        reply = self._manager.get(request)
        # NOTE(review): relies on `finished` being delivered through the event
        # loop, i.e. after this method has returned - confirm.
        self._registerOnFinishedCallback(reply, onFinished)

    def _delete(self, target: str, onFinished: Callable) -> None:
        """Not implemented yet; currently a no-op."""
        pass

    def _post(self, target: str, data: str, onFinished: Callable, onProgress: Callable) -> None:
        """Not implemented yet; currently a no-op."""
        pass

    def _createNetworkManager(self) -> None:
        """(Re)create the QNetworkAccessManager and hook up its finished signal.

        If a manager already exists, its finished signal is disconnected
        before it is replaced.
        """
        if self._manager is not None:
            self._manager.finished.disconnect(self.__handleOnFinished)
            # TODO: not hooked up yet.
            #self._manager.networkAccessibleChanged.disconnect(self._onNetworkAccesibleChanged)
            #self._manager.authenticationRequired.disconnect(self._onAuthenticationRequired)

        self._manager = QNetworkAccessManager()
        self._manager.finished.connect(self.__handleOnFinished)
        # TODO: not hooked up yet.
        #self._manager.authenticationRequired.connect(self._onAuthenticationRequired)
        #self._manager.networkAccessibleChanged.connect(self._onNetworkAccesibleChanged)  # for debug purposes

    def __handleOnFinished(self, reply: QNetworkReply) -> None:
        """Record the response time and dispatch the callback for ``reply``.

        The callback is removed before it is invoked, so it is dropped even
        when it raises. Failures (including a missing callback) are reported
        but not propagated, so nothing is raised back into the signal emitter.
        """
        self._last_response_time = time()
        try:
            callback = self._onFinishedCallbacks.pop(self._callbackKey(reply))  # Remove the callback.
            callback(reply)
        except Exception as e:
            print("Something went wrong with callback", e)