mirror of
https://github.com/Ultimaker/Cura.git
synced 2025-07-06 14:37:29 -06:00
Add 'plugins/USBPrinting/' from commit 'b28ca0881a
'
git-subtree-dir: plugins/USBPrinting git-subtree-mainline:3823afd8cc
git-subtree-split:b28ca0881a
This commit is contained in:
commit
63e8cf72a3
10 changed files with 1204 additions and 0 deletions
55
plugins/USBPrinting/ControlWindow.qml
Normal file
55
plugins/USBPrinting/ControlWindow.qml
Normal file
|
@ -0,0 +1,55 @@
|
|||
import QtQuick 2.1
|
||||
import QtQuick.Controls 1.1
|
||||
import QtQuick.Layouts 1.1
|
||||
Rectangle
|
||||
{
|
||||
width: 300; height: 100
|
||||
ColumnLayout
|
||||
{
|
||||
RowLayout
|
||||
{
|
||||
Text
|
||||
{
|
||||
text: "extruder temperature " + manager.extruderTemperature
|
||||
}
|
||||
Text
|
||||
{
|
||||
text: "bed temperature " + manager.bedTemperature
|
||||
}
|
||||
Text
|
||||
{
|
||||
text: "" + manager.error
|
||||
}
|
||||
|
||||
}
|
||||
RowLayout
|
||||
{
|
||||
Button
|
||||
{
|
||||
text: "Print"
|
||||
onClicked: { manager.startPrint() }
|
||||
enabled: manager.progress == 0 ? true : false
|
||||
}
|
||||
Button
|
||||
{
|
||||
text: "Cancel"
|
||||
onClicked: { manager.cancelPrint() }
|
||||
enabled: manager.progress == 0 ? false: true
|
||||
}
|
||||
}
|
||||
ProgressBar
|
||||
{
|
||||
id: prog;
|
||||
value: manager.progress
|
||||
minimumValue: 0;
|
||||
maximumValue: 100;
|
||||
Layout.maximumWidth:parent.width
|
||||
Layout.preferredWidth:230
|
||||
Layout.preferredHeight:25
|
||||
Layout.minimumWidth:230
|
||||
Layout.minimumHeight:25
|
||||
width: 230
|
||||
height: 25
|
||||
}
|
||||
}
|
||||
}
|
29
plugins/USBPrinting/FirmwareUpdateWindow.qml
Normal file
29
plugins/USBPrinting/FirmwareUpdateWindow.qml
Normal file
|
@ -0,0 +1,29 @@
|
|||
import QtQuick 2.1
|
||||
import QtQuick.Controls 1.1
|
||||
import QtQuick.Layouts 1.1
|
||||
Rectangle
|
||||
{
|
||||
width: 300; height: 100
|
||||
ColumnLayout
|
||||
{
|
||||
|
||||
Text
|
||||
{
|
||||
text: manager.progress == 0 ? "Starting firmware update, may take a while.": manager.progress > 99 ? "Firmware update completed.": "Updating firmware."
|
||||
}
|
||||
ProgressBar
|
||||
{
|
||||
id: prog;
|
||||
value: manager.progress
|
||||
minimumValue: 0;
|
||||
maximumValue: 100;
|
||||
Layout.maximumWidth:parent.width
|
||||
Layout.preferredWidth:230
|
||||
Layout.preferredHeight:25
|
||||
Layout.minimumWidth:230
|
||||
Layout.minimumHeight:25
|
||||
width: 230
|
||||
height: 25
|
||||
}
|
||||
}
|
||||
}
|
470
plugins/USBPrinting/PrinterConnection.py
Normal file
470
plugins/USBPrinting/PrinterConnection.py
Normal file
|
@ -0,0 +1,470 @@
|
|||
from .avr_isp import stk500v2, ispBase, intelHex
|
||||
import serial
|
||||
import threading
|
||||
import time
|
||||
import queue
|
||||
import re
|
||||
import functools
|
||||
|
||||
from UM.Application import Application
|
||||
from UM.Signal import Signal, SignalEmitter
|
||||
from UM.Resources import Resources
|
||||
from UM.Logger import Logger
|
||||
|
||||
class PrinterConnection(SignalEmitter):
|
||||
def __init__(self, serial_port):
|
||||
super().__init__()
|
||||
|
||||
self._serial = None
|
||||
self._serial_port = serial_port
|
||||
self._error_state = None
|
||||
|
||||
self._connect_thread = threading.Thread(target = self._connect)
|
||||
self._connect_thread.daemon = True
|
||||
|
||||
# Printer is connected
|
||||
self._is_connected = False
|
||||
|
||||
# Printer is in the process of connecting
|
||||
self._is_connecting = False
|
||||
|
||||
# The baud checking is done by sending a number of m105 commands to the printer and waiting for a readable
|
||||
# response. If the baudrate is correct, this should make sense, else we get giberish.
|
||||
self._required_responses_auto_baud = 10
|
||||
|
||||
self._progress = 0
|
||||
|
||||
self._listen_thread = threading.Thread(target=self._listen)
|
||||
self._listen_thread.daemon = True
|
||||
|
||||
self._update_firmware_thread = threading.Thread(target= self._updateFirmware)
|
||||
self._update_firmware_thread.demon = True
|
||||
|
||||
self._heatup_wait_start_time = time.time()
|
||||
|
||||
## Queue for commands that need to be send. Used when command is sent when a print is active.
|
||||
self._command_queue = queue.Queue()
|
||||
|
||||
self._is_printing = False
|
||||
|
||||
## Set when print is started in order to check running time.
|
||||
self._print_start_time = None
|
||||
self._print_start_time_100 = None
|
||||
|
||||
## Keep track where in the provided g-code the print is
|
||||
self._gcode_position = 0
|
||||
|
||||
# List of gcode lines to be printed
|
||||
self._gcode = None
|
||||
|
||||
# Number of extruders
|
||||
self._extruder_count = 1
|
||||
|
||||
# Temperatures of all extruders
|
||||
self._extruder_temperatures = [0] * self._extruder_count
|
||||
|
||||
# Target temperatures of all extruders
|
||||
self._target_extruder_temperatures = [0] * self._extruder_count
|
||||
|
||||
#Target temperature of the bed
|
||||
self._target_bed_temperature = 0
|
||||
|
||||
# Temperature of the bed
|
||||
self._bed_temperature = 0
|
||||
|
||||
# Current Z stage location
|
||||
self._current_z = 0
|
||||
|
||||
# In order to keep the connection alive we request the temperature every so often from a different extruder.
|
||||
# This index is the extruder we requested data from the last time.
|
||||
self._temperature_requested_extruder_index = 0
|
||||
|
||||
self._updating_firmware = False
|
||||
|
||||
self._firmware_file_name = None
|
||||
|
||||
# TODO: Might need to add check that extruders can not be changed when it started printing or loading these settings from settings object
|
||||
def setNumExtuders(self, num):
|
||||
self._extruder_count = num
|
||||
self._extruder_temperatures = [0] * self._extruder_count
|
||||
self._target_extruder_temperatures = [0] * self._extruder_count
|
||||
|
||||
## Is the printer actively printing
|
||||
def isPrinting(self):
|
||||
if not self._is_connected or self._serial is None:
|
||||
return False
|
||||
return self._is_printing
|
||||
|
||||
## Start a print based on a g-code.
|
||||
# \param gcode_list List with gcode (strings).
|
||||
def printGCode(self, gcode_list):
|
||||
if self.isPrinting() or not self._is_connected:
|
||||
return
|
||||
self._gcode = gcode_list
|
||||
|
||||
#Reset line number. If this is not done, first line is sometimes ignored
|
||||
self._gcode.insert(0, "M110")
|
||||
self._gcode_position = 0
|
||||
self._print_start_time_100 = None
|
||||
self._is_printing = True
|
||||
self._print_start_time = time.time()
|
||||
|
||||
for i in range(0, 4): #Push first 4 entries before accepting other inputs
|
||||
self._sendNextGcodeLine()
|
||||
|
||||
## Get the serial port string of this connection.
|
||||
# \return serial port
|
||||
def getSerialPort(self):
|
||||
return self._serial_port
|
||||
|
||||
## Try to connect the serial. This simply starts the thread, which runs _connect.
|
||||
def connect(self):
|
||||
if not self._updating_firmware and not self._connect_thread.isAlive():
|
||||
self._connect_thread.start()
|
||||
|
||||
## Private fuction (threaded) that actually uploads the firmware.
|
||||
def _updateFirmware(self):
|
||||
if self._is_connecting or self._is_connected:
|
||||
self.close()
|
||||
hex_file = intelHex.readHex(self._firmware_file_name)
|
||||
|
||||
if len(hex_file) == 0:
|
||||
Logger.log('e', "Unable to read provided hex file. Could not update firmware")
|
||||
return
|
||||
|
||||
programmer = stk500v2.Stk500v2()
|
||||
programmer.progressCallback = self.setProgress
|
||||
programmer.connect(self._serial_port)
|
||||
|
||||
time.sleep(1) # Give programmer some time to connect. Might need more in some cases, but this worked in all tested cases.
|
||||
|
||||
if not programmer.isConnected():
|
||||
Logger.log('e', "Unable to connect with serial. Could not update firmware")
|
||||
return
|
||||
|
||||
self._updating_firmware = True
|
||||
|
||||
try:
|
||||
programmer.programChip(hex_file)
|
||||
self._updating_firmware = False
|
||||
except Exception as e:
|
||||
Logger.log("e", "Exception while trying to update firmware %s" %e)
|
||||
self._updating_firmware = False
|
||||
return
|
||||
programmer.close()
|
||||
|
||||
## Upload new firmware to machine
|
||||
# \param filename full path of firmware file to be uploaded
|
||||
def updateFirmware(self, file_name):
|
||||
self._firmware_file_name = file_name
|
||||
self._update_firmware_thread.start()
|
||||
|
||||
## Private connect function run by thread. Can be started by calling connect.
|
||||
def _connect(self):
|
||||
self._is_connecting = True
|
||||
programmer = stk500v2.Stk500v2()
|
||||
try:
|
||||
programmer.connect(self._serial_port) # Connect with the serial, if this succeeds, it's an arduino based usb device.
|
||||
self._serial = programmer.leaveISP()
|
||||
except ispBase.IspError as e:
|
||||
Logger.log('i', "Could not establish connection on %s: %s. Device is not arduino based." %(self._serial_port,str(e)))
|
||||
except Exception as e:
|
||||
Logger.log('i', "Could not establish connection on %s, unknown reasons. Device is not arduino based." % self._serial_port)
|
||||
|
||||
# If the programmer connected, we know its an atmega based version. Not all that usefull, but it does give some debugging information.
|
||||
for baud_rate in self._getBaudrateList(): # Cycle all baud rates (auto detect)
|
||||
|
||||
if self._serial is None:
|
||||
try:
|
||||
self._serial = serial.Serial(str(self._serial_port), baud_rate, timeout=3, writeTimeout=10000)
|
||||
except serial.SerialException:
|
||||
Logger.log('i', "Could not open port %s" % self._serial_port)
|
||||
return
|
||||
else:
|
||||
if not self.setBaudRate(baud_rate):
|
||||
continue # Could not set the baud rate, go to the next
|
||||
time.sleep(1.5) # Ensure that we are not talking to the bootloader. 1.5 sec seems to be the magic number
|
||||
sucesfull_responses = 0
|
||||
timeout_time = time.time() + 5
|
||||
self._serial.write(b"\n")
|
||||
self._sendCommand("M105") # Request temperature, as this should (if baudrate is correct) result in a command with 'T:' in it
|
||||
while timeout_time > time.time():
|
||||
line = self._readline()
|
||||
if line is None:
|
||||
self.setIsConnected(False) # Something went wrong with reading, could be that close was called.
|
||||
return
|
||||
|
||||
if b"T:" in line:
|
||||
self._serial.timeout = 0.5
|
||||
self._serial.write(b"\n")
|
||||
self._sendCommand("M105")
|
||||
sucesfull_responses += 1
|
||||
if sucesfull_responses >= self._required_responses_auto_baud:
|
||||
self._serial.timeout = 2 #Reset serial timeout
|
||||
self.setIsConnected(True)
|
||||
Logger.log('i', "Established printer connection on port %s" % self._serial_port)
|
||||
return
|
||||
self.close() # Unable to connect, wrap up.
|
||||
self.setIsConnected(False)
|
||||
|
||||
## Set the baud rate of the serial. This can cause exceptions, but we simply want to ignore those.
|
||||
def setBaudRate(self, baud_rate):
|
||||
try:
|
||||
self._serial.baudrate = baud_rate
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
def setIsConnected(self, state):
|
||||
self._is_connecting = False
|
||||
if self._is_connected != state:
|
||||
self._is_connected = state
|
||||
self.connectionStateChanged.emit(self._serial_port)
|
||||
if self._is_connected:
|
||||
self._listen_thread.start() #Start listening
|
||||
'''Application.getInstance().addOutputDevice(self._serial_port, {
|
||||
'id': self._serial_port,
|
||||
'function': self.printGCode,
|
||||
'description': 'Print with USB {0}'.format(self._serial_port),
|
||||
'icon': 'print_usb',
|
||||
'priority': 1
|
||||
})'''
|
||||
|
||||
else:
|
||||
Logger.log('w', "Printer connection state was not changed")
|
||||
|
||||
connectionStateChanged = Signal()
|
||||
|
||||
## Close the printer connection
|
||||
def close(self):
|
||||
if self._connect_thread.isAlive():
|
||||
self._connect_thread.join()
|
||||
if self._serial is not None:
|
||||
self.setIsConnected(False)
|
||||
self._listen_thread.join()
|
||||
self._serial.close()
|
||||
|
||||
self._serial = None
|
||||
|
||||
def isConnected(self):
|
||||
return self._is_connected
|
||||
|
||||
## Directly send the command, withouth checking connection state (eg; printing).
|
||||
# \param cmd string with g-code
|
||||
def _sendCommand(self, cmd):
|
||||
if self._serial is None:
|
||||
return
|
||||
if 'M109' in cmd or 'M190' in cmd:
|
||||
self._heatup_wait_start_time = time.time()
|
||||
if 'M104' in cmd or 'M109' in cmd:
|
||||
try:
|
||||
t = 0
|
||||
if 'T' in cmd:
|
||||
t = int(re.search('T([0-9]+)', cmd).group(1))
|
||||
self._target_extruder_temperatures[t] = float(re.search('S([0-9]+)', cmd).group(1))
|
||||
except:
|
||||
pass
|
||||
if 'M140' in cmd or 'M190' in cmd:
|
||||
try:
|
||||
self._target_bed_temperature = float(re.search('S([0-9]+)', cmd).group(1))
|
||||
except:
|
||||
pass
|
||||
#Logger.log('i','Sending: %s' % (cmd))
|
||||
try:
|
||||
command = (cmd + '\n').encode()
|
||||
#self._serial.write(b'\n')
|
||||
self._serial.write(command)
|
||||
except serial.SerialTimeoutException:
|
||||
Logger.log("w","Serial timeout while writing to serial port, trying again.")
|
||||
try:
|
||||
time.sleep(0.5)
|
||||
self._serial.write((cmd + '\n').encode())
|
||||
except Exception as e:
|
||||
Logger.log("e","Unexpected error while writing serial port %s " % e)
|
||||
self._setErrorState("Unexpected error while writing serial port %s " % e)
|
||||
self.close()
|
||||
except Exception as e:
|
||||
Logger.log('e',"Unexpected error while writing serial port %s" % e)
|
||||
self._setErrorState("Unexpected error while writing serial port %s " % e)
|
||||
self.close()
|
||||
|
||||
## Ensure that close gets called when object is destroyed
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
## Send a command to printer.
|
||||
# \param cmd string with g-code
|
||||
def sendCommand(self, cmd):
|
||||
if self.isPrinting():
|
||||
self._command_queue.put(cmd)
|
||||
elif self.isConnected():
|
||||
self._sendCommand(cmd)
|
||||
|
||||
## Set the error state with a message.
|
||||
# \param error String with the error message.
|
||||
def _setErrorState(self, error):
|
||||
self._error_state = error
|
||||
self.onError.emit(error)
|
||||
|
||||
onError = Signal()
|
||||
|
||||
## Private function to set the temperature of an extruder
|
||||
# \param index index of the extruder
|
||||
# \param temperature recieved temperature
|
||||
def _setExtruderTemperature(self, index, temperature):
|
||||
try:
|
||||
self._extruder_temperatures[index] = temperature
|
||||
self.onExtruderTemperatureChange.emit(self._serial_port, index, temperature)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
onExtruderTemperatureChange = Signal()
|
||||
|
||||
## Private function to set the temperature of the bed.
|
||||
# As all printers (as of time of writing) only support a single heated bed,
|
||||
# these are not indexed as with extruders.
|
||||
def _setBedTemperature(self, temperature):
|
||||
self._bed_temperature = temperature
|
||||
self.onBedTemperatureChange.emit(self._serial_port,temperature)
|
||||
|
||||
onBedTemperatureChange = Signal()
|
||||
|
||||
|
||||
## Listen thread function.
|
||||
def _listen(self):
|
||||
Logger.log('i', "Printer connection listen thread started for %s" % self._serial_port)
|
||||
temperature_request_timeout = time.time()
|
||||
ok_timeout = time.time()
|
||||
while self._is_connected:
|
||||
line = self._readline()
|
||||
|
||||
if line is None:
|
||||
break # None is only returned when something went wrong. Stop listening
|
||||
|
||||
if line.startswith(b'Error:'):
|
||||
# Oh YEAH, consistency.
|
||||
# Marlin reports an MIN/MAX temp error as "Error:x\n: Extruder switched off. MAXTEMP triggered !\n"
|
||||
# But a bed temp error is reported as "Error: Temperature heated bed switched off. MAXTEMP triggered !!"
|
||||
# So we can have an extra newline in the most common case. Awesome work people.
|
||||
if re.match(b'Error:[0-9]\n', line):
|
||||
line = line.rstrip() + self._readline()
|
||||
|
||||
# Skip the communication errors, as those get corrected.
|
||||
if b'Extruder switched off' in line or b'Temperature heated bed switched off' in line or b'Something is wrong, please turn off the printer.' in line:
|
||||
if not self.hasError():
|
||||
self._setErrorState(line[6:])
|
||||
elif b' T:' in line or line.startswith(b'T:'): #Temperature message
|
||||
try:
|
||||
self._setExtruderTemperature(self._temperature_requested_extruder_index,float(re.search(b"T: *([0-9\.]*)", line).group(1)))
|
||||
except:
|
||||
pass
|
||||
if b'B:' in line: # Check if it's a bed temperature
|
||||
try:
|
||||
self._setBedTemperature(float(re.search(b"B: *([0-9\.]*)", line).group(1)))
|
||||
except Exception as e:
|
||||
pass
|
||||
#TODO: temperature changed callback
|
||||
|
||||
if self._is_printing:
|
||||
if time.time() > temperature_request_timeout: # When printing, request temperature every 5 seconds.
|
||||
if self._extruder_count > 0:
|
||||
self._temperature_requested_extruder_index = (self._temperature_requested_extruder_index + 1) % self._extruder_count
|
||||
self.sendCommand("M105 T%d" % (self._temperature_requested_extruder_index))
|
||||
else:
|
||||
self.sendCommand("M105")
|
||||
temperature_request_timeout = time.time() + 5
|
||||
|
||||
if line == b'' and time.time() > ok_timeout:
|
||||
line = b'ok' # Force a timeout (basicly, send next command)
|
||||
|
||||
if b'ok' in line:
|
||||
ok_timeout = time.time() + 5
|
||||
if not self._command_queue.empty():
|
||||
self._sendCommand(self._command_queue.get())
|
||||
else:
|
||||
self._sendNextGcodeLine()
|
||||
elif b"resend" in line.lower() or b"rs" in line: # Because a resend can be asked with 'resend' and 'rs'
|
||||
try:
|
||||
self._gcode_position = int(line.replace(b"N:",b" ").replace(b"N",b" ").replace(b":",b" ").split()[-1])
|
||||
except:
|
||||
if b"rs" in line:
|
||||
self._gcode_position = int(line.split()[1])
|
||||
|
||||
else: # Request the temperature on comm timeout (every 2 seconds) when we are not printing.)
|
||||
if line == b'':
|
||||
if self._extruder_count > 0:
|
||||
self._temperature_requested_extruder_index = (self._temperature_requested_extruder_index + 1) % self._extruder_count
|
||||
self.sendCommand("M105 T%d" % self._temperature_requested_extruder_index)
|
||||
else:
|
||||
self.sendCommand("M105")
|
||||
Logger.log('i', "Printer connection listen thread stopped for %s" % self._serial_port)
|
||||
|
||||
## Send next Gcode in the gcode list
|
||||
def _sendNextGcodeLine(self):
|
||||
if self._gcode_position >= len(self._gcode):
|
||||
return
|
||||
if self._gcode_position == 100:
|
||||
self._print_start_time_100 = time.time()
|
||||
line = self._gcode[self._gcode_position]
|
||||
|
||||
if ';' in line:
|
||||
line = line[:line.find(';')]
|
||||
line = line.strip()
|
||||
try:
|
||||
if line == 'M0' or line == 'M1':
|
||||
line = 'M105' #Don't send the M0 or M1 to the machine, as M0 and M1 are handled as an LCD menu pause.
|
||||
if ('G0' in line or 'G1' in line) and 'Z' in line:
|
||||
z = float(re.search('Z([0-9\.]*)', line).group(1))
|
||||
if self._current_z != z:
|
||||
self._current_z = z
|
||||
except Exception as e:
|
||||
Logger.log('e', "Unexpected error with printer connection: %s" % e)
|
||||
self._setErrorState("Unexpected error: %s" %e)
|
||||
checksum = functools.reduce(lambda x,y: x^y, map(ord, 'N%d%s' % (self._gcode_position, line)))
|
||||
|
||||
self._sendCommand("N%d%s*%d" % (self._gcode_position, line, checksum))
|
||||
self._gcode_position += 1
|
||||
self.setProgress(( self._gcode_position / len(self._gcode)) * 100)
|
||||
self.progressChanged.emit(self._progress, self._serial_port)
|
||||
|
||||
progressChanged = Signal()
|
||||
|
||||
## Set the progress of the print.
|
||||
# It will be normalized (based on max_progress) to range 0 - 100
|
||||
def setProgress(self, progress, max_progress = 100):
|
||||
self._progress = progress / max_progress * 100 #Convert to scale of 0-100
|
||||
self.progressChanged.emit(self._progress, self._serial_port)
|
||||
|
||||
## Cancel the current print. Printer connection wil continue to listen.
|
||||
def cancelPrint(self):
|
||||
self._gcode_position = 0
|
||||
self.setProgress(0)
|
||||
self._gcode = []
|
||||
|
||||
# Turn of temperatures
|
||||
self._sendCommand("M140 S0")
|
||||
self._sendCommand("M109 S0")
|
||||
self._is_printing = False
|
||||
|
||||
## Check if the process did not encounter an error yet.
|
||||
def hasError(self):
|
||||
return self._error_state != None
|
||||
|
||||
## private read line used by printer connection to listen for data on serial port.
|
||||
def _readline(self):
|
||||
if self._serial is None:
|
||||
return None
|
||||
try:
|
||||
ret = self._serial.readline()
|
||||
except Exception as e:
|
||||
Logger.log('e',"Unexpected error while reading serial port. %s" %e)
|
||||
self._setErrorState("Printer has been disconnected")
|
||||
self.close()
|
||||
return None
|
||||
return ret
|
||||
|
||||
## Create a list of baud rates at which we can communicate.
|
||||
# \return list of int
|
||||
def _getBaudrateList(self):
|
||||
ret = [250000, 230400, 115200, 57600, 38400, 19200, 9600]
|
||||
return ret
|
274
plugins/USBPrinting/USBPrinterManager.py
Normal file
274
plugins/USBPrinting/USBPrinterManager.py
Normal file
|
@ -0,0 +1,274 @@
|
|||
from UM.Signal import Signal, SignalEmitter
|
||||
from . import PrinterConnection
|
||||
from UM.Application import Application
|
||||
from UM.Scene.Iterator.DepthFirstIterator import DepthFirstIterator
|
||||
from UM.Scene.SceneNode import SceneNode
|
||||
from UM.Resources import Resources
|
||||
import threading
|
||||
import platform
|
||||
import glob
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
from UM.Extension import Extension
|
||||
|
||||
from PyQt5.QtQuick import QQuickView
|
||||
from PyQt5.QtCore import QUrl, QObject, pyqtSlot, pyqtProperty, pyqtSignal
|
||||
|
||||
from UM.i18n import i18nCatalog
|
||||
|
||||
i18n_catalog = i18nCatalog('plugins')
|
||||
|
||||
|
||||
class USBPrinterManager(QObject, SignalEmitter, Extension):
|
||||
def __init__(self, parent = None):
|
||||
super().__init__(parent)
|
||||
self._serial_port_list = []
|
||||
self._printer_connections = []
|
||||
self._check_ports_thread = threading.Thread(target = self._updateConnectionList)
|
||||
self._check_ports_thread.daemon = True
|
||||
self._check_ports_thread.start()
|
||||
|
||||
self._progress = 0
|
||||
|
||||
self._control_view = None
|
||||
self._firmware_view = None
|
||||
self._extruder_temp = 0
|
||||
self._bed_temp = 0
|
||||
self._error_message = ""
|
||||
|
||||
## Add menu item to top menu of the application.
|
||||
self.addMenuItem(i18n_catalog.i18n("Update firmware"), self.updateAllFirmware)
|
||||
|
||||
pyqtError = pyqtSignal(str, arguments = ['amount'])
|
||||
processingProgress = pyqtSignal(float, arguments = ['amount'])
|
||||
pyqtExtruderTemperature = pyqtSignal(float, arguments = ['amount'])
|
||||
pyqtBedTemperature = pyqtSignal(float, arguments = ['amount'])
|
||||
|
||||
## Show firmware interface.
|
||||
# This will create the view if its not already created.
|
||||
def spawnFirmwareInterface(self, serial_port):
|
||||
if self._firmware_view is None:
|
||||
self._firmware_view = QQuickView()
|
||||
self._firmware_view.engine().rootContext().setContextProperty('manager',self)
|
||||
self._firmware_view.setSource(QUrl("plugins/USBPrinting/FirmwareUpdateWindow.qml"))
|
||||
self._firmware_view.show()
|
||||
|
||||
## Show control interface.
|
||||
# This will create the view if its not already created.
|
||||
def spawnControlInterface(self,serial_port):
|
||||
if self._control_view is None:
|
||||
self._control_view = QQuickView()
|
||||
self._control_view.engine().rootContext().setContextProperty('manager',self)
|
||||
self._control_view.setSource(QUrl("plugins/USBPrinting/ControlWindow.qml"))
|
||||
self._control_view.show()
|
||||
|
||||
@pyqtProperty(float,notify = processingProgress)
|
||||
def progress(self):
|
||||
return self._progress
|
||||
|
||||
@pyqtProperty(float,notify = pyqtExtruderTemperature)
|
||||
def extruderTemperature(self):
|
||||
return self._extruder_temp
|
||||
|
||||
@pyqtProperty(float,notify = pyqtBedTemperature)
|
||||
def bedTemperature(self):
|
||||
return self._bed_temp
|
||||
|
||||
@pyqtProperty(str,notify = pyqtError)
|
||||
def error(self):
|
||||
return self._error_message
|
||||
|
||||
## Check all serial ports and create a PrinterConnection object for them.
|
||||
# Note that this does not validate if the serial ports are actually usable!
|
||||
# This (the validation) is only done when the connect function is called.
|
||||
def _updateConnectionList(self):
|
||||
while True:
|
||||
temp_serial_port_list = self.getSerialPortList(only_list_usb = True)
|
||||
if temp_serial_port_list != self._serial_port_list: # Something changed about the list since we last changed something.
|
||||
disconnected_ports = [port for port in self._serial_port_list if port not in temp_serial_port_list ]
|
||||
self._serial_port_list = temp_serial_port_list
|
||||
for serial_port in self._serial_port_list:
|
||||
if self.getConnectionByPort(serial_port) is None: # If it doesn't already exist, add it
|
||||
if not os.path.islink(serial_port): # Only add the connection if it's a non symbolic link
|
||||
connection = PrinterConnection.PrinterConnection(serial_port)
|
||||
connection.connect()
|
||||
connection.connectionStateChanged.connect(self.serialConectionStateCallback)
|
||||
connection.progressChanged.connect(self.onProgress)
|
||||
connection.onExtruderTemperatureChange.connect(self.onExtruderTemperature)
|
||||
connection.onBedTemperatureChange.connect(self.onBedTemperature)
|
||||
connection.onError.connect(self.onError)
|
||||
self._printer_connections.append(connection)
|
||||
|
||||
for serial_port in disconnected_ports: # Close connections and remove them from list.
|
||||
connection = self.getConnectionByPort(serial_port)
|
||||
if connection != None:
|
||||
self._printer_connections.remove(connection)
|
||||
connection.close()
|
||||
time.sleep(5) # Throttle, as we don't need this information to be updated every single second.
|
||||
|
||||
def updateAllFirmware(self):
|
||||
self.spawnFirmwareInterface("")
|
||||
for printer_connection in self._printer_connections:
|
||||
printer_connection.updateFirmware(Resources.getPath(Resources.FirmwareLocation, self._getDefaultFirmwareName()))
|
||||
|
||||
def updateFirmwareBySerial(self, serial_port):
|
||||
printer_connection = self.getConnectionByPort(serial_port)
|
||||
if printer_connection is not None:
|
||||
self.spawnFirmwareInterface(printer_connection.getSerialPort())
|
||||
printer_connection.updateFirmware(Resources.getPath(Resources.FirmwareLocation, self._getDefaultFirmwareName()))
|
||||
|
||||
def _getDefaultFirmwareName(self):
|
||||
machine_type = Application.getInstance().getActiveMachine().getTypeID()
|
||||
firmware_name = ""
|
||||
baudrate = 250000
|
||||
if sys.platform.startswith('linux'):
|
||||
baudrate = 115200
|
||||
if machine_type == "ultimaker_original":
|
||||
firmware_name = 'MarlinUltimaker'
|
||||
firmware_name += '-%d' % (baudrate)
|
||||
elif machine_type == "ultimaker_original_plus":
|
||||
firmware_name = 'MarlinUltimaker-UMOP-%d' % (baudrate)
|
||||
elif machine_type == "Witbox":
|
||||
return "MarlinWitbox.hex"
|
||||
elif machine_type == "ultimaker2go":
|
||||
return "MarlinUltimaker2go.hex"
|
||||
elif machine_type == "ultimaker2extended":
|
||||
return "MarlinUltimaker2extended.hex"
|
||||
elif machine_type == "ultimaker2":
|
||||
return "MarlinUltimaker2.hex"
|
||||
|
||||
##TODO: Add check for multiple extruders
|
||||
|
||||
if firmware_name != "":
|
||||
firmware_name += ".hex"
|
||||
return firmware_name
|
||||
|
||||
## Callback for extruder temperature change
|
||||
def onExtruderTemperature(self, serial_port, index, temperature):
|
||||
self._extruder_temp = temperature
|
||||
self.pyqtExtruderTemperature.emit(temperature)
|
||||
|
||||
## Callback for bed temperature change
|
||||
def onBedTemperature(self, serial_port,temperature):
|
||||
self._bed_temperature = temperature
|
||||
self.pyqtBedTemperature.emit(temperature)
|
||||
|
||||
## Callback for error
|
||||
def onError(self, error):
|
||||
self._error_message = error
|
||||
self.pyqtError.emit(error)
|
||||
|
||||
## Callback for progress change
|
||||
def onProgress(self, progress, serial_port):
|
||||
self._progress = progress
|
||||
self.processingProgress.emit(progress)
|
||||
|
||||
## Attempt to connect with all possible connections.
|
||||
def connectAllConnections(self):
|
||||
for connection in self._printer_connections:
|
||||
connection.connect()
|
||||
|
||||
## Send gcode to printer and start printing
|
||||
def sendGCodeByPort(self, serial_port, gcode_list):
|
||||
printer_connection = self.getConnectionByPort(serial_port)
|
||||
if printer_connection is not None:
|
||||
printer_connection.printGCode(gcode_list)
|
||||
return True
|
||||
return False
|
||||
|
||||
@pyqtSlot()
|
||||
def cancelPrint(self):
|
||||
for printer_connection in self.getActiveConnections():
|
||||
printer_connection.cancelPrint()
|
||||
|
||||
## Send gcode to all active printers.
|
||||
# \return True if there was at least one active connection.
|
||||
def sendGCodeToAllActive(self, gcode_list):
|
||||
for printer_connection in self.getActiveConnections():
|
||||
printer_connection.printGCode(gcode_list)
|
||||
if len(self.getActiveConnections()):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
## Send a command to a printer indentified by port
|
||||
# \param serial port String indentifieing the port
|
||||
# \param command String with the g-code command to send.
|
||||
# \return True if connection was found, false otherwise
|
||||
def sendCommandByPort(self, serial_port, command):
|
||||
printer_connection = self.getConnectionByPort(serial_port)
|
||||
if printer_connection is not None:
|
||||
printer_connection.sendCommand(command)
|
||||
return True
|
||||
return False
|
||||
|
||||
## Send a command to all active (eg; connected) printers
|
||||
# \param command String with the g-code command to send.
|
||||
# \return True if at least one connection was found, false otherwise
|
||||
def sendCommandToAllActive(self, command):
|
||||
for printer_connection in self.getActiveConnections():
|
||||
printer_connection.sendCommand(command)
|
||||
if len(self.getActiveConnections()):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
## Callback if the connection state of a connection is changed.
|
||||
# This adds or removes the connection as a possible output device.
|
||||
def serialConectionStateCallback(self, serial_port):
|
||||
connection = self.getConnectionByPort(serial_port)
|
||||
if connection.isConnected():
|
||||
Application.getInstance().addOutputDevice(serial_port, {
|
||||
'id': serial_port,
|
||||
'function': self.spawnControlInterface,
|
||||
'description': 'Write to USB {0}'.format(serial_port),
|
||||
'icon': 'print_usb',
|
||||
'priority': 1
|
||||
})
|
||||
else:
|
||||
Application.getInstance().removeOutputDevice(serial_port)
|
||||
|
||||
@pyqtSlot()
|
||||
def startPrint(self):
|
||||
gcode_list = getattr(Application.getInstance().getController().getScene(), 'gcode_list', None)
|
||||
if gcode_list:
|
||||
final_list = []
|
||||
for gcode in gcode_list:
|
||||
final_list += gcode.split('\n')
|
||||
self.sendGCodeToAllActive(gcode_list)
|
||||
|
||||
## Get a list of printer connection objects that are connected.
|
||||
def getActiveConnections(self):
|
||||
return [connection for connection in self._printer_connections if connection.isConnected()]
|
||||
|
||||
## Get a printer connection object by serial port
|
||||
def getConnectionByPort(self, serial_port):
|
||||
for printer_connection in self._printer_connections:
|
||||
if serial_port == printer_connection.getSerialPort():
|
||||
return printer_connection
|
||||
return None
|
||||
|
||||
## Create a list of serial ports on the system.
|
||||
# \param only_list_usb If true, only usb ports are listed
|
||||
def getSerialPortList(self,only_list_usb=False):
|
||||
base_list = []
|
||||
if platform.system() == "Windows":
|
||||
import winreg
|
||||
try:
|
||||
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,"HARDWARE\\DEVICEMAP\\SERIALCOMM")
|
||||
i = 0
|
||||
while True:
|
||||
values = winreg.EnumValue(key, i)
|
||||
if not base_list or 'USBSER' in values[0]:
|
||||
base_list += [values[1]]
|
||||
i += 1
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
if base_list:
|
||||
base_list = base_list + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob("/dev/cu.usb*")
|
||||
base_list = filter(lambda s: 'Bluetooth' not in s, base_list) # Filter because mac sometimes puts them in the list
|
||||
else:
|
||||
base_list = base_list + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob("/dev/cu.*") + glob.glob("/dev/tty.usb*") + glob.glob("/dev/rfcomm*") + glob.glob('/dev/serial/by-id/*')
|
||||
return base_list
|
19
plugins/USBPrinting/__init__.py
Normal file
19
plugins/USBPrinting/__init__.py
Normal file
|
@ -0,0 +1,19 @@
|
|||
from . import USBPrinterManager
|
||||
|
||||
from UM.i18n import i18nCatalog
|
||||
|
||||
i18n_catalog = i18nCatalog('plugins')
|
||||
|
||||
def getMetaData():
|
||||
return {
|
||||
'type': 'extension',
|
||||
'plugin': {
|
||||
'name': 'USB printing',
|
||||
'author': 'Jaime van Kessel',
|
||||
'version': '1.0',
|
||||
'description': i18n_catalog.i18nc('usb printing description','Accepts G-Code and sends them to a printer. Plugin can also update firmware')
|
||||
}
|
||||
}
|
||||
|
||||
def register(app):
|
||||
return {"extension":USBPrinterManager.USBPrinterManager()}
|
0
plugins/USBPrinting/avr_isp/__init__.py
Normal file
0
plugins/USBPrinting/avr_isp/__init__.py
Normal file
25
plugins/USBPrinting/avr_isp/chipDB.py
Normal file
25
plugins/USBPrinting/avr_isp/chipDB.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
"""
|
||||
Database of AVR chips for avr_isp programming. Contains signatures and flash sizes from the AVR datasheets.
|
||||
To support more chips add the relevant data to the avrChipDB list.
|
||||
This is a python 3 conversion of the code created by David Braam for the Cura project.
|
||||
"""
|
||||
|
||||
avrChipDB = {
|
||||
'ATMega1280': {
|
||||
'signature': [0x1E, 0x97, 0x03],
|
||||
'pageSize': 128,
|
||||
'pageCount': 512,
|
||||
},
|
||||
'ATMega2560': {
|
||||
'signature': [0x1E, 0x98, 0x01],
|
||||
'pageSize': 128,
|
||||
'pageCount': 1024,
|
||||
},
|
||||
}
|
||||
|
||||
def getChipFromDB(sig):
|
||||
for chip in avrChipDB.values():
|
||||
if chip['signature'] == sig:
|
||||
return chip
|
||||
return False
|
||||
|
46
plugins/USBPrinting/avr_isp/intelHex.py
Normal file
46
plugins/USBPrinting/avr_isp/intelHex.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
"""
|
||||
Module to read intel hex files into binary data blobs.
|
||||
IntelHex files are commonly used to distribute firmware
|
||||
See: http://en.wikipedia.org/wiki/Intel_HEX
|
||||
This is a python 3 conversion of the code created by David Braam for the Cura project.
|
||||
"""
|
||||
import io
|
||||
|
||||
def readHex(filename):
|
||||
"""
|
||||
Read an verify an intel hex file. Return the data as an list of bytes.
|
||||
"""
|
||||
data = []
|
||||
extraAddr = 0
|
||||
f = io.open(filename, "r")
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if len(line) < 1:
|
||||
continue
|
||||
if line[0] != ':':
|
||||
raise Exception("Hex file has a line not starting with ':'")
|
||||
recLen = int(line[1:3], 16)
|
||||
addr = int(line[3:7], 16) + extraAddr
|
||||
recType = int(line[7:9], 16)
|
||||
if len(line) != recLen * 2 + 11:
|
||||
raise Exception("Error in hex file: " + line)
|
||||
checkSum = 0
|
||||
for i in range(0, recLen + 5):
|
||||
checkSum += int(line[i*2+1:i*2+3], 16)
|
||||
checkSum &= 0xFF
|
||||
if checkSum != 0:
|
||||
raise Exception("Checksum error in hex file: " + line)
|
||||
|
||||
if recType == 0:#Data record
|
||||
while len(data) < addr + recLen:
|
||||
data.append(0)
|
||||
for i in range(0, recLen):
|
||||
data[addr + i] = int(line[i*2+9:i*2+11], 16)
|
||||
elif recType == 1: #End Of File record
|
||||
pass
|
||||
elif recType == 2: #Extended Segment Address Record
|
||||
extraAddr = int(line[9:13], 16) * 16
|
||||
else:
|
||||
print(recType, recLen, addr, checkSum, line)
|
||||
f.close()
|
||||
return data
|
66
plugins/USBPrinting/avr_isp/ispBase.py
Normal file
66
plugins/USBPrinting/avr_isp/ispBase.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
"""
|
||||
General interface for Isp based AVR programmers.
|
||||
The ISP AVR programmer can load firmware into AVR chips. Which are commonly used on 3D printers.
|
||||
|
||||
Needs to be subclassed to support different programmers.
|
||||
Currently only the stk500v2 subclass exists.
|
||||
This is a python 3 conversion of the code created by David Braam for the Cura project.
|
||||
"""
|
||||
|
||||
from . import chipDB
|
||||
|
||||
class IspBase():
|
||||
"""
|
||||
Base class for ISP based AVR programmers.
|
||||
Functions in this class raise an IspError when something goes wrong.
|
||||
"""
|
||||
def programChip(self, flashData):
|
||||
""" Program a chip with the given flash data. """
|
||||
self.curExtAddr = -1
|
||||
self.chip = chipDB.getChipFromDB(self.getSignature())
|
||||
if not self.chip:
|
||||
raise IspError("Chip with signature: " + str(self.getSignature()) + "not found")
|
||||
self.chipErase()
|
||||
|
||||
print("Flashing %i bytes" % len(flashData))
|
||||
self.writeFlash(flashData)
|
||||
print("Verifying %i bytes" % len(flashData))
|
||||
self.verifyFlash(flashData)
|
||||
print("Completed")
|
||||
|
||||
def getSignature(self):
|
||||
"""
|
||||
Get the AVR signature from the chip. This is a 3 byte array which describes which chip we are connected to.
|
||||
This is important to verify that we are programming the correct type of chip and that we use proper flash block sizes.
|
||||
"""
|
||||
sig = []
|
||||
sig.append(self.sendISP([0x30, 0x00, 0x00, 0x00])[3])
|
||||
sig.append(self.sendISP([0x30, 0x00, 0x01, 0x00])[3])
|
||||
sig.append(self.sendISP([0x30, 0x00, 0x02, 0x00])[3])
|
||||
return sig
|
||||
|
||||
def chipErase(self):
|
||||
"""
|
||||
Do a full chip erase, clears all data, and lockbits.
|
||||
"""
|
||||
self.sendISP([0xAC, 0x80, 0x00, 0x00])
|
||||
|
||||
def writeFlash(self, flashData):
|
||||
"""
|
||||
Write the flash data, needs to be implemented in a subclass.
|
||||
"""
|
||||
raise IspError("Called undefined writeFlash")
|
||||
|
||||
def verifyFlash(self, flashData):
|
||||
"""
|
||||
Verify the flash data, needs to be implemented in a subclass.
|
||||
"""
|
||||
raise IspError("Called undefined verifyFlash")
|
||||
|
||||
|
||||
class IspError(BaseException):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return repr(self.value)
|
220
plugins/USBPrinting/avr_isp/stk500v2.py
Normal file
220
plugins/USBPrinting/avr_isp/stk500v2.py
Normal file
|
@ -0,0 +1,220 @@
|
|||
"""
|
||||
STK500v2 protocol implementation for programming AVR chips.
|
||||
The STK500v2 protocol is used by the ArduinoMega2560 and a few other Arduino platforms to load firmware.
|
||||
This is a python 3 conversion of the code created by David Braam for the Cura project.
|
||||
"""
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
|
||||
from serial import Serial
|
||||
from serial import SerialException
|
||||
from serial import SerialTimeoutException
|
||||
|
||||
from . import ispBase, intelHex
|
||||
|
||||
|
||||
class Stk500v2(ispBase.IspBase):
|
||||
def __init__(self):
|
||||
self.serial = None
|
||||
self.seq = 1
|
||||
self.lastAddr = -1
|
||||
self.progressCallback = None
|
||||
|
||||
def connect(self, port = 'COM22', speed = 115200):
|
||||
if self.serial is not None:
|
||||
self.close()
|
||||
try:
|
||||
self.serial = Serial(str(port), speed, timeout=1, writeTimeout=10000)
|
||||
except SerialException as e:
|
||||
raise ispBase.IspError("Failed to open serial port")
|
||||
except:
|
||||
raise ispBase.IspError("Unexpected error while connecting to serial port:" + port + ":" + str(sys.exc_info()[0]))
|
||||
self.seq = 1
|
||||
|
||||
#Reset the controller
|
||||
for n in range(0, 2):
|
||||
self.serial.setDTR(True)
|
||||
time.sleep(0.1)
|
||||
self.serial.setDTR(False)
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.2)
|
||||
|
||||
self.serial.flushInput()
|
||||
self.serial.flushOutput()
|
||||
if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]:
|
||||
self.close()
|
||||
raise ispBase.IspError("Failed to enter programming mode")
|
||||
|
||||
self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00])
|
||||
if self.sendMessage([0xEE])[1] == 0x00:
|
||||
self._has_checksum = True
|
||||
else:
|
||||
self._has_checksum = False
|
||||
self.serial.timeout = 5
|
||||
|
||||
def close(self):
|
||||
if self.serial is not None:
|
||||
self.serial.close()
|
||||
self.serial = None
|
||||
|
||||
#Leave ISP does not reset the serial port, only resets the device, and returns the serial port after disconnecting it from the programming interface.
|
||||
# This allows you to use the serial port without opening it again.
|
||||
def leaveISP(self):
|
||||
if self.serial is not None:
|
||||
if self.sendMessage([0x11]) != [0x11, 0x00]:
|
||||
raise ispBase.IspError("Failed to leave programming mode")
|
||||
ret = self.serial
|
||||
self.serial = None
|
||||
return ret
|
||||
return None
|
||||
|
||||
def isConnected(self):
|
||||
return self.serial is not None
|
||||
|
||||
def hasChecksumFunction(self):
|
||||
return self._has_checksum
|
||||
|
||||
def sendISP(self, data):
|
||||
recv = self.sendMessage([0x1D, 4, 4, 0, data[0], data[1], data[2], data[3]])
|
||||
return recv[2:6]
|
||||
|
||||
def writeFlash(self, flash_data):
|
||||
#Set load addr to 0, in case we have more then 64k flash we need to enable the address extension
|
||||
page_size = self.chip['pageSize'] * 2
|
||||
flashSize = page_size * self.chip['pageCount']
|
||||
print("Writing flash")
|
||||
if flashSize > 0xFFFF:
|
||||
self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00])
|
||||
else:
|
||||
self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00])
|
||||
load_count = (len(flash_data) + page_size - 1) / page_size
|
||||
for i in range(0, int(load_count)):
|
||||
recv = self.sendMessage([0x13, page_size >> 8, page_size & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flash_data[(i * page_size):(i * page_size + page_size)])
|
||||
if self.progressCallback is not None:
|
||||
if self._has_checksum:
|
||||
self.progressCallback(i + 1, load_count)
|
||||
else:
|
||||
self.progressCallback(i + 1, load_count*2)
|
||||
|
||||
def verifyFlash(self, flashData):
|
||||
if self._has_checksum:
|
||||
self.sendMessage([0x06, 0x00, (len(flashData) >> 17) & 0xFF, (len(flashData) >> 9) & 0xFF, (len(flashData) >> 1) & 0xFF])
|
||||
res = self.sendMessage([0xEE])
|
||||
checksum_recv = res[2] | (res[3] << 8)
|
||||
checksum = 0
|
||||
for d in flashData:
|
||||
checksum += d
|
||||
checksum &= 0xFFFF
|
||||
if hex(checksum) != hex(checksum_recv):
|
||||
raise ispBase.IspError('Verify checksum mismatch: 0x%x != 0x%x' % (checksum & 0xFFFF, checksum_recv))
|
||||
else:
|
||||
#Set load addr to 0, in case we have more then 64k flash we need to enable the address extension
|
||||
flashSize = self.chip['pageSize'] * 2 * self.chip['pageCount']
|
||||
if flashSize > 0xFFFF:
|
||||
self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00])
|
||||
else:
|
||||
self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00])
|
||||
|
||||
loadCount = (len(flashData) + 0xFF) / 0x100
|
||||
for i in range(0, int(loadCount)):
|
||||
recv = self.sendMessage([0x14, 0x01, 0x00, 0x20])[2:0x102]
|
||||
if self.progressCallback is not None:
|
||||
self.progressCallback(loadCount + i + 1, loadCount*2)
|
||||
for j in range(0, 0x100):
|
||||
if i * 0x100 + j < len(flashData) and flashData[i * 0x100 + j] != recv[j]:
|
||||
raise ispBase.IspError('Verify error at: 0x%x' % (i * 0x100 + j))
|
||||
|
||||
def sendMessage(self, data):
|
||||
message = struct.pack(">BBHB", 0x1B, self.seq, len(data), 0x0E)
|
||||
for c in data:
|
||||
message += struct.pack(">B", c)
|
||||
checksum = 0
|
||||
for c in message:
|
||||
checksum ^= c
|
||||
message += struct.pack(">B", checksum)
|
||||
try:
|
||||
self.serial.write(message)
|
||||
self.serial.flush()
|
||||
except SerialTimeoutException:
|
||||
raise ispBase.IspError('Serial send timeout')
|
||||
self.seq = (self.seq + 1) & 0xFF
|
||||
return self.recvMessage()
|
||||
|
||||
def recvMessage(self):
|
||||
state = 'Start'
|
||||
checksum = 0
|
||||
while True:
|
||||
s = self.serial.read()
|
||||
if len(s) < 1:
|
||||
raise ispBase.IspError("Timeout")
|
||||
b = struct.unpack(">B", s)[0]
|
||||
checksum ^= b
|
||||
#print(hex(b))
|
||||
if state == 'Start':
|
||||
if b == 0x1B:
|
||||
state = 'GetSeq'
|
||||
checksum = 0x1B
|
||||
elif state == 'GetSeq':
|
||||
state = 'MsgSize1'
|
||||
elif state == 'MsgSize1':
|
||||
msgSize = b << 8
|
||||
state = 'MsgSize2'
|
||||
elif state == 'MsgSize2':
|
||||
msgSize |= b
|
||||
state = 'Token'
|
||||
elif state == 'Token':
|
||||
if b != 0x0E:
|
||||
state = 'Start'
|
||||
else:
|
||||
state = 'Data'
|
||||
data = []
|
||||
elif state == 'Data':
|
||||
data.append(b)
|
||||
if len(data) == msgSize:
|
||||
state = 'Checksum'
|
||||
elif state == 'Checksum':
|
||||
if checksum != 0:
|
||||
state = 'Start'
|
||||
else:
|
||||
return data
|
||||
|
||||
def portList():
|
||||
ret = []
|
||||
import _winreg
|
||||
key=_winreg.OpenKey(_winreg.HKEY_LOCAL_MACHINE,"HARDWARE\\DEVICEMAP\\SERIALCOMM")
|
||||
i=0
|
||||
while True:
|
||||
try:
|
||||
values = _winreg.EnumValue(key, i)
|
||||
except:
|
||||
return ret
|
||||
if 'USBSER' in values[0]:
|
||||
ret.append(values[1])
|
||||
i+=1
|
||||
return ret
|
||||
|
||||
def runProgrammer(port, filename):
|
||||
""" Run an STK500v2 program on serial port 'port' and write 'filename' into flash. """
|
||||
programmer = Stk500v2()
|
||||
programmer.connect(port = port)
|
||||
programmer.programChip(intelHex.readHex(filename))
|
||||
programmer.close()
|
||||
|
||||
def main():
|
||||
""" Entry point to call the stk500v2 programmer from the commandline. """
|
||||
import threading
|
||||
if sys.argv[1] == 'AUTO':
|
||||
print(portList())
|
||||
for port in portList():
|
||||
threading.Thread(target=runProgrammer, args=(port,sys.argv[2])).start()
|
||||
time.sleep(5)
|
||||
else:
|
||||
programmer = Stk500v2()
|
||||
programmer.connect(port = sys.argv[1])
|
||||
programmer.programChip(intelHex.readHex(sys.argv[2]))
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Add table
Add a link
Reference in a new issue