Separate firmware updater from USBPrinterOutputDevice

This commit is contained in:
fieldOfView 2018-08-01 15:51:10 +02:00
parent 171220205c
commit 3ac5342dfc
2 changed files with 130 additions and 110 deletions

View file

@ -0,0 +1,122 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from PyQt5.QtCore import QObject, QUrl, pyqtSignal, pyqtProperty
from cura.PrinterOutputDevice import PrinterOutputDevice
from .avr_isp import stk500v2, intelHex
from enum import IntEnum
@signalemitter
class AvrFirmwareUpdater(QObject):
firmwareProgressChanged = pyqtSignal()
firmwareUpdateStateChanged = pyqtSignal()
def __init__(self, output_device: PrinterOutputDevice) -> None:
self._output_device = output_device
self._update_firmware_thread = Thread(target=self._updateFirmware, daemon = True)
self._firmware_view = None
self._firmware_location = None
self._firmware_progress = 0
self._firmware_update_state = FirmwareUpdateState.idle
def updateFirmware(self, file):
# the file path could be url-encoded.
if file.startswith("file://"):
self._firmware_location = QUrl(file).toLocalFile()
else:
self._firmware_location = file
self.showFirmwareInterface()
self.setFirmwareUpdateState(FirmwareUpdateState.updating)
self._update_firmware_thread.start()
def _updateFirmware(self):
# Ensure that other connections are closed.
if self._connection_state != ConnectionState.closed:
self.close()
try:
hex_file = intelHex.readHex(self._firmware_location)
assert len(hex_file) > 0
except (FileNotFoundError, AssertionError):
Logger.log("e", "Unable to read provided hex file. Could not update firmware.")
self.setFirmwareUpdateState(FirmwareUpdateState.firmware_not_found_error)
return
programmer = stk500v2.Stk500v2()
programmer.progress_callback = self._onFirmwareProgress
try:
programmer.connect(self._serial_port)
except:
programmer.close()
Logger.logException("e", "Failed to update firmware")
self.setFirmwareUpdateState(FirmwareUpdateState.communication_error)
return
# Give programmer some time to connect. Might need more in some cases, but this worked in all tested cases.
sleep(1)
if not programmer.isConnected():
Logger.log("e", "Unable to connect with serial. Could not update firmware")
self.setFirmwareUpdateState(FirmwareUpdateState.communication_error)
try:
programmer.programChip(hex_file)
except SerialException:
self.setFirmwareUpdateState(FirmwareUpdateState.io_error)
return
except:
self.setFirmwareUpdateState(FirmwareUpdateState.unknown_error)
return
programmer.close()
# Clean up for next attempt.
self._update_firmware_thread = Thread(target=self._updateFirmware, daemon=True)
self._firmware_location = ""
self._onFirmwareProgress(100)
self.setFirmwareUpdateState(FirmwareUpdateState.completed)
# Try to re-connect with the machine again, which must be done on the Qt thread, so we use call later.
CuraApplication.getInstance().callLater(self.connect)
## Show firmware interface.
# This will create the view if its not already created.
def showFirmwareInterface(self):
if self._firmware_view is None:
path = os.path.join(PluginRegistry.getInstance().getPluginPath("USBPrinting"), "FirmwareUpdateWindow.qml")
self._firmware_view = CuraApplication.getInstance().createQmlComponent(path, {"manager": self})
self._firmware_view.show()
@pyqtProperty(float, notify = firmwareProgressChanged)
def firmwareProgress(self):
return self._firmware_progress
@pyqtProperty(int, notify=firmwareUpdateStateChanged)
def firmwareUpdateState(self):
return self._firmware_update_state
def setFirmwareUpdateState(self, state):
if self._firmware_update_state != state:
self._firmware_update_state = state
self.firmwareUpdateStateChanged.emit()
# Callback function for firmware update progress.
def _onFirmwareProgress(self, progress, max_progress = 100):
self._firmware_progress = (progress / max_progress) * 100 # Convert to scale of 0-100
self.firmwareProgressChanged.emit()
class FirmwareUpdateState(IntEnum):
idle = 0
updating = 1
completed = 2
unknown_error = 3
communication_error = 4
io_error = 5
firmware_not_found_error = 6

View file

@ -13,15 +13,14 @@ from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel
from cura.PrinterOutput.GenericOutputController import GenericOutputController from cura.PrinterOutput.GenericOutputController import GenericOutputController
from .AutoDetectBaudJob import AutoDetectBaudJob from .AutoDetectBaudJob import AutoDetectBaudJob
from .avr_isp import stk500v2, intelHex from .AvrFirmwareUpdater import AvrFirmwareUpdater
from PyQt5.QtCore import pyqtSlot, pyqtSignal, pyqtProperty, QUrl from PyQt5.QtCore import pyqtSlot, pyqtSignal, pyqtProperty
from serial import Serial, SerialException, SerialTimeoutException from serial import Serial, SerialException, SerialTimeoutException
from threading import Thread, Event from threading import Thread, Event
from time import time, sleep from time import time, sleep
from queue import Queue from queue import Queue
from enum import IntEnum
from typing import Union, Optional, List, cast from typing import Union, Optional, List, cast
import re import re
@ -32,9 +31,6 @@ catalog = i18nCatalog("cura")
class USBPrinterOutputDevice(PrinterOutputDevice): class USBPrinterOutputDevice(PrinterOutputDevice):
firmwareProgressChanged = pyqtSignal()
firmwareUpdateStateChanged = pyqtSignal()
def __init__(self, serial_port: str, baud_rate: Optional[int] = None) -> None: def __init__(self, serial_port: str, baud_rate: Optional[int] = None) -> None:
super().__init__(serial_port) super().__init__(serial_port)
self.setName(catalog.i18nc("@item:inmenu", "USB printing")) self.setName(catalog.i18nc("@item:inmenu", "USB printing"))
@ -61,8 +57,6 @@ class USBPrinterOutputDevice(PrinterOutputDevice):
# Instead of using a timer, we really need the update to be as a thread, as reading from serial can block. # Instead of using a timer, we really need the update to be as a thread, as reading from serial can block.
self._update_thread = Thread(target=self._update, daemon = True) self._update_thread = Thread(target=self._update, daemon = True)
self._update_firmware_thread = Thread(target=self._updateFirmware, daemon = True)
self._last_temperature_request = None # type: Optional[int] self._last_temperature_request = None # type: Optional[int]
self._is_printing = False # A print is being sent. self._is_printing = False # A print is being sent.
@ -75,11 +69,6 @@ class USBPrinterOutputDevice(PrinterOutputDevice):
self._paused = False self._paused = False
self._firmware_view = None
self._firmware_location = None
self._firmware_progress = 0
self._firmware_update_state = FirmwareUpdateState.idle
self.setConnectionText(catalog.i18nc("@info:status", "Connected via USB")) self.setConnectionText(catalog.i18nc("@info:status", "Connected via USB"))
# Queue for commands that need to be sent. # Queue for commands that need to be sent.
@ -88,6 +77,8 @@ class USBPrinterOutputDevice(PrinterOutputDevice):
self._command_received = Event() self._command_received = Event()
self._command_received.set() self._command_received.set()
self._firmware_updater = AvrFirmwareUpdater(self)
CuraApplication.getInstance().getOnExitCallbackManager().addCallback(self._checkActivePrintingUponAppExit) CuraApplication.getInstance().getOnExitCallbackManager().addCallback(self._checkActivePrintingUponAppExit)
# This is a callback function that checks if there is any printing in progress via USB when the application tries # This is a callback function that checks if there is any printing in progress via USB when the application tries
@ -107,6 +98,10 @@ class USBPrinterOutputDevice(PrinterOutputDevice):
application = CuraApplication.getInstance() application = CuraApplication.getInstance()
application.triggerNextExitCheck() application.triggerNextExitCheck()
@pyqtSlot(str)
def updateFirmware(self, file):
self._firmware_updater.updateFirmware(file)
## Reset USB device settings ## Reset USB device settings
# #
def resetDeviceSettings(self): def resetDeviceSettings(self):
@ -135,93 +130,6 @@ class USBPrinterOutputDevice(PrinterOutputDevice):
self._printGCode(gcode_list) self._printGCode(gcode_list)
## Show firmware interface.
# This will create the view if its not already created.
def showFirmwareInterface(self):
if self._firmware_view is None:
path = os.path.join(PluginRegistry.getInstance().getPluginPath("USBPrinting"), "FirmwareUpdateWindow.qml")
self._firmware_view = CuraApplication.getInstance().createQmlComponent(path, {"manager": self})
self._firmware_view.show()
@pyqtSlot(str)
def updateFirmware(self, file):
# the file path could be url-encoded.
if file.startswith("file://"):
self._firmware_location = QUrl(file).toLocalFile()
else:
self._firmware_location = file
self.showFirmwareInterface()
self.setFirmwareUpdateState(FirmwareUpdateState.updating)
self._update_firmware_thread.start()
def _updateFirmware(self):
# Ensure that other connections are closed.
if self._connection_state != ConnectionState.closed:
self.close()
try:
hex_file = intelHex.readHex(self._firmware_location)
assert len(hex_file) > 0
except (FileNotFoundError, AssertionError):
Logger.log("e", "Unable to read provided hex file. Could not update firmware.")
self.setFirmwareUpdateState(FirmwareUpdateState.firmware_not_found_error)
return
programmer = stk500v2.Stk500v2()
programmer.progress_callback = self._onFirmwareProgress
try:
programmer.connect(self._serial_port)
except:
programmer.close()
Logger.logException("e", "Failed to update firmware")
self.setFirmwareUpdateState(FirmwareUpdateState.communication_error)
return
# Give programmer some time to connect. Might need more in some cases, but this worked in all tested cases.
sleep(1)
if not programmer.isConnected():
Logger.log("e", "Unable to connect with serial. Could not update firmware")
self.setFirmwareUpdateState(FirmwareUpdateState.communication_error)
try:
programmer.programChip(hex_file)
except SerialException:
self.setFirmwareUpdateState(FirmwareUpdateState.io_error)
return
except:
self.setFirmwareUpdateState(FirmwareUpdateState.unknown_error)
return
programmer.close()
# Clean up for next attempt.
self._update_firmware_thread = Thread(target=self._updateFirmware, daemon=True)
self._firmware_location = ""
self._onFirmwareProgress(100)
self.setFirmwareUpdateState(FirmwareUpdateState.completed)
# Try to re-connect with the machine again, which must be done on the Qt thread, so we use call later.
CuraApplication.getInstance().callLater(self.connect)
@pyqtProperty(float, notify = firmwareProgressChanged)
def firmwareProgress(self):
return self._firmware_progress
@pyqtProperty(int, notify=firmwareUpdateStateChanged)
def firmwareUpdateState(self):
return self._firmware_update_state
def setFirmwareUpdateState(self, state):
if self._firmware_update_state != state:
self._firmware_update_state = state
self.firmwareUpdateStateChanged.emit()
# Callback function for firmware update progress.
def _onFirmwareProgress(self, progress, max_progress = 100):
self._firmware_progress = (progress / max_progress) * 100 # Convert to scale of 0-100
self.firmwareProgressChanged.emit()
## Start a print based on a g-code. ## Start a print based on a g-code.
# \param gcode_list List with gcode (strings). # \param gcode_list List with gcode (strings).
def _printGCode(self, gcode_list: List[str]): def _printGCode(self, gcode_list: List[str]):
@ -456,13 +364,3 @@ class USBPrinterOutputDevice(PrinterOutputDevice):
print_job.updateTimeTotal(estimated_time) print_job.updateTimeTotal(estimated_time)
self._gcode_position += 1 self._gcode_position += 1
class FirmwareUpdateState(IntEnum):
idle = 0
updating = 1
completed = 2
unknown_error = 3
communication_error = 4
io_error = 5
firmware_not_found_error = 6