From 3ac5342dfc0d37e7462ed77e9d9b7321ec769ef3 Mon Sep 17 00:00:00 2001 From: fieldOfView Date: Wed, 1 Aug 2018 15:51:10 +0200 Subject: [PATCH] Separate firmware updater from USBPrinterOutputDevice --- plugins/USBPrinting/AvrFirmwareUpdater.py | 122 ++++++++++++++++++ plugins/USBPrinting/USBPrinterOutputDevice.py | 118 ++--------------- 2 files changed, 130 insertions(+), 110 deletions(-) create mode 100644 plugins/USBPrinting/AvrFirmwareUpdater.py diff --git a/plugins/USBPrinting/AvrFirmwareUpdater.py b/plugins/USBPrinting/AvrFirmwareUpdater.py new file mode 100644 index 0000000000..5c2b8dc19e --- /dev/null +++ b/plugins/USBPrinting/AvrFirmwareUpdater.py @@ -0,0 +1,122 @@ +# Copyright (c) 2018 Ultimaker B.V. +# Cura is released under the terms of the LGPLv3 or higher. + +from PyQt5.QtCore import QObject, QUrl, pyqtSignal, pyqtProperty + +from cura.PrinterOutputDevice import PrinterOutputDevice + +from .avr_isp import stk500v2, intelHex + +from enum import IntEnum + +@signalemitter +class AvrFirmwareUpdater(QObject): + firmwareProgressChanged = pyqtSignal() + firmwareUpdateStateChanged = pyqtSignal() + + def __init__(self, output_device: PrinterOutputDevice) -> None: + self._output_device = output_device + + self._update_firmware_thread = Thread(target=self._updateFirmware, daemon = True) + + self._firmware_view = None + self._firmware_location = None + self._firmware_progress = 0 + self._firmware_update_state = FirmwareUpdateState.idle + + def updateFirmware(self, file): + # the file path could be url-encoded. + if file.startswith("file://"): + self._firmware_location = QUrl(file).toLocalFile() + else: + self._firmware_location = file + self.showFirmwareInterface() + self.setFirmwareUpdateState(FirmwareUpdateState.updating) + self._update_firmware_thread.start() + + def _updateFirmware(self): + # Ensure that other connections are closed. + if self._connection_state != ConnectionState.closed: + self.close() + + try: + hex_file = intelHex.readHex(self._firmware_location) + assert len(hex_file) > 0 + except (FileNotFoundError, AssertionError): + Logger.log("e", "Unable to read provided hex file. Could not update firmware.") + self.setFirmwareUpdateState(FirmwareUpdateState.firmware_not_found_error) + return + + programmer = stk500v2.Stk500v2() + programmer.progress_callback = self._onFirmwareProgress + + try: + programmer.connect(self._serial_port) + except: + programmer.close() + Logger.logException("e", "Failed to update firmware") + self.setFirmwareUpdateState(FirmwareUpdateState.communication_error) + return + + # Give programmer some time to connect. Might need more in some cases, but this worked in all tested cases. + sleep(1) + if not programmer.isConnected(): + Logger.log("e", "Unable to connect with serial. Could not update firmware") + self.setFirmwareUpdateState(FirmwareUpdateState.communication_error) + try: + programmer.programChip(hex_file) + except SerialException: + self.setFirmwareUpdateState(FirmwareUpdateState.io_error) + return + except: + self.setFirmwareUpdateState(FirmwareUpdateState.unknown_error) + return + + programmer.close() + + # Clean up for next attempt. + self._update_firmware_thread = Thread(target=self._updateFirmware, daemon=True) + self._firmware_location = "" + self._onFirmwareProgress(100) + self.setFirmwareUpdateState(FirmwareUpdateState.completed) + + # Try to re-connect with the machine again, which must be done on the Qt thread, so we use call later. + CuraApplication.getInstance().callLater(self.connect) + + ## Show firmware interface. + # This will create the view if its not already created. + def showFirmwareInterface(self): + if self._firmware_view is None: + path = os.path.join(PluginRegistry.getInstance().getPluginPath("USBPrinting"), "FirmwareUpdateWindow.qml") + self._firmware_view = CuraApplication.getInstance().createQmlComponent(path, {"manager": self}) + + self._firmware_view.show() + + @pyqtProperty(float, notify = firmwareProgressChanged) + def firmwareProgress(self): + return self._firmware_progress + + @pyqtProperty(int, notify=firmwareUpdateStateChanged) + def firmwareUpdateState(self): + return self._firmware_update_state + + def setFirmwareUpdateState(self, state): + if self._firmware_update_state != state: + self._firmware_update_state = state + self.firmwareUpdateStateChanged.emit() + + # Callback function for firmware update progress. + def _onFirmwareProgress(self, progress, max_progress = 100): + self._firmware_progress = (progress / max_progress) * 100 # Convert to scale of 0-100 + self.firmwareProgressChanged.emit() + + +class FirmwareUpdateState(IntEnum): + idle = 0 + updating = 1 + completed = 2 + unknown_error = 3 + communication_error = 4 + io_error = 5 + firmware_not_found_error = 6 + diff --git a/plugins/USBPrinting/USBPrinterOutputDevice.py b/plugins/USBPrinting/USBPrinterOutputDevice.py index 45b566fcab..bc2350e50f 100644 --- a/plugins/USBPrinting/USBPrinterOutputDevice.py +++ b/plugins/USBPrinting/USBPrinterOutputDevice.py @@ -13,15 +13,14 @@ from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel from cura.PrinterOutput.GenericOutputController import GenericOutputController from .AutoDetectBaudJob import AutoDetectBaudJob -from .avr_isp import stk500v2, intelHex +from .AvrFirmwareUpdater import AvrFirmwareUpdater -from PyQt5.QtCore import pyqtSlot, pyqtSignal, pyqtProperty, QUrl +from PyQt5.QtCore import pyqtSlot, pyqtSignal, pyqtProperty from serial import Serial, SerialException, SerialTimeoutException from threading import Thread, Event from time import time, sleep from queue import Queue -from enum import IntEnum from typing import Union, Optional, List, cast import re @@ -32,9 +31,6 @@ catalog = i18nCatalog("cura") class USBPrinterOutputDevice(PrinterOutputDevice): - firmwareProgressChanged = pyqtSignal() - firmwareUpdateStateChanged = pyqtSignal() - def __init__(self, serial_port: str, baud_rate: Optional[int] = None) -> None: super().__init__(serial_port) self.setName(catalog.i18nc("@item:inmenu", "USB printing")) @@ -61,8 +57,6 @@ class USBPrinterOutputDevice(PrinterOutputDevice): # Instead of using a timer, we really need the update to be as a thread, as reading from serial can block. self._update_thread = Thread(target=self._update, daemon = True) - self._update_firmware_thread = Thread(target=self._updateFirmware, daemon = True) - self._last_temperature_request = None # type: Optional[int] self._is_printing = False # A print is being sent. @@ -75,11 +69,6 @@ class USBPrinterOutputDevice(PrinterOutputDevice): self._paused = False - self._firmware_view = None - self._firmware_location = None - self._firmware_progress = 0 - self._firmware_update_state = FirmwareUpdateState.idle - self.setConnectionText(catalog.i18nc("@info:status", "Connected via USB")) # Queue for commands that need to be sent. @@ -88,6 +77,8 @@ class USBPrinterOutputDevice(PrinterOutputDevice): self._command_received = Event() self._command_received.set() + self._firmware_updater = AvrFirmwareUpdater(self) + CuraApplication.getInstance().getOnExitCallbackManager().addCallback(self._checkActivePrintingUponAppExit) # This is a callback function that checks if there is any printing in progress via USB when the application tries @@ -107,6 +98,10 @@ class USBPrinterOutputDevice(PrinterOutputDevice): application = CuraApplication.getInstance() application.triggerNextExitCheck() + @pyqtSlot(str) + def updateFirmware(self, file): + self._firmware_updater.updateFirmware(file) + ## Reset USB device settings # def resetDeviceSettings(self): @@ -135,93 +130,6 @@ class USBPrinterOutputDevice(PrinterOutputDevice): self._printGCode(gcode_list) - ## Show firmware interface. - # This will create the view if its not already created. - def showFirmwareInterface(self): - if self._firmware_view is None: - path = os.path.join(PluginRegistry.getInstance().getPluginPath("USBPrinting"), "FirmwareUpdateWindow.qml") - self._firmware_view = CuraApplication.getInstance().createQmlComponent(path, {"manager": self}) - - self._firmware_view.show() - - @pyqtSlot(str) - def updateFirmware(self, file): - # the file path could be url-encoded. - if file.startswith("file://"): - self._firmware_location = QUrl(file).toLocalFile() - else: - self._firmware_location = file - self.showFirmwareInterface() - self.setFirmwareUpdateState(FirmwareUpdateState.updating) - self._update_firmware_thread.start() - - def _updateFirmware(self): - # Ensure that other connections are closed. - if self._connection_state != ConnectionState.closed: - self.close() - - try: - hex_file = intelHex.readHex(self._firmware_location) - assert len(hex_file) > 0 - except (FileNotFoundError, AssertionError): - Logger.log("e", "Unable to read provided hex file. Could not update firmware.") - self.setFirmwareUpdateState(FirmwareUpdateState.firmware_not_found_error) - return - - programmer = stk500v2.Stk500v2() - programmer.progress_callback = self._onFirmwareProgress - - try: - programmer.connect(self._serial_port) - except: - programmer.close() - Logger.logException("e", "Failed to update firmware") - self.setFirmwareUpdateState(FirmwareUpdateState.communication_error) - return - - # Give programmer some time to connect. Might need more in some cases, but this worked in all tested cases. - sleep(1) - if not programmer.isConnected(): - Logger.log("e", "Unable to connect with serial. Could not update firmware") - self.setFirmwareUpdateState(FirmwareUpdateState.communication_error) - try: - programmer.programChip(hex_file) - except SerialException: - self.setFirmwareUpdateState(FirmwareUpdateState.io_error) - return - except: - self.setFirmwareUpdateState(FirmwareUpdateState.unknown_error) - return - - programmer.close() - - # Clean up for next attempt. - self._update_firmware_thread = Thread(target=self._updateFirmware, daemon=True) - self._firmware_location = "" - self._onFirmwareProgress(100) - self.setFirmwareUpdateState(FirmwareUpdateState.completed) - - # Try to re-connect with the machine again, which must be done on the Qt thread, so we use call later. - CuraApplication.getInstance().callLater(self.connect) - - @pyqtProperty(float, notify = firmwareProgressChanged) - def firmwareProgress(self): - return self._firmware_progress - - @pyqtProperty(int, notify=firmwareUpdateStateChanged) - def firmwareUpdateState(self): - return self._firmware_update_state - - def setFirmwareUpdateState(self, state): - if self._firmware_update_state != state: - self._firmware_update_state = state - self.firmwareUpdateStateChanged.emit() - - # Callback function for firmware update progress. - def _onFirmwareProgress(self, progress, max_progress = 100): - self._firmware_progress = (progress / max_progress) * 100 # Convert to scale of 0-100 - self.firmwareProgressChanged.emit() - ## Start a print based on a g-code. # \param gcode_list List with gcode (strings). def _printGCode(self, gcode_list: List[str]): @@ -456,13 +364,3 @@ class USBPrinterOutputDevice(PrinterOutputDevice): print_job.updateTimeTotal(estimated_time) self._gcode_position += 1 - - -class FirmwareUpdateState(IntEnum): - idle = 0 - updating = 1 - completed = 2 - unknown_error = 3 - communication_error = 4 - io_error = 5 - firmware_not_found_error = 6