From 3fa02d3710f7cc7e82752d8b394b5242ee16c742 Mon Sep 17 00:00:00 2001 From: Jaime van Kessel Date: Tue, 31 Mar 2015 10:40:21 +0200 Subject: [PATCH] formatting --- PrinterConnection.py | 3 +- USBPrinterManager.py | 2 +- avr_isp/chipDB.py | 30 ++-- avr_isp/intelHex.py | 76 ++++----- avr_isp/ispBase.py | 94 +++++------ avr_isp/stk500v2.py | 370 ++++++++++++++++++++++--------------------- 6 files changed, 289 insertions(+), 286 deletions(-) diff --git a/PrinterConnection.py b/PrinterConnection.py index fa421c5b02..e8343be959 100644 --- a/PrinterConnection.py +++ b/PrinterConnection.py @@ -29,7 +29,8 @@ class PrinterConnection(): def _connect(self): self._is_connecting = True - programmer.connect(serial_port) #Connect with the serial, if this succeeds, it's an arduino based usb device. + programmer = stk500v2.Stk500v2() + programmer.connect(self._serial_port) #Connect with the serial, if this succeeds, it's an arduino based usb device. try: self._serial = programmer.leaveISP() # Create new printer connection diff --git a/USBPrinterManager.py b/USBPrinterManager.py index 940a4c5dae..1e5645439b 100644 --- a/USBPrinterManager.py +++ b/USBPrinterManager.py @@ -17,7 +17,7 @@ class USBPrinterManager(SignalEmitter,PluginObject): self._check_ports_thread = threading.Thread(target=self._updateConnectionList) self._check_ports_thread.daemon = True self._check_ports_thread.start() - time.sleep(6) + time.sleep(2) self.connectAllConnections() ## Check all serial ports and create a PrinterConnection object for them. diff --git a/avr_isp/chipDB.py b/avr_isp/chipDB.py index b28906aea6..ba9cf434d5 100644 --- a/avr_isp/chipDB.py +++ b/avr_isp/chipDB.py @@ -1,25 +1,25 @@ """ Database of AVR chips for avr_isp programming. Contains signatures and flash sizes from the AVR datasheets. To support more chips add the relevant data to the avrChipDB list. +This is a python 3 conversion of the code created by David Braam for the Cura project. """ -__copyright__ = "Copyright (C) 2013 David Braam - Released under terms of the AGPLv3 License" avrChipDB = { - 'ATMega1280': { - 'signature': [0x1E, 0x97, 0x03], - 'pageSize': 128, - 'pageCount': 512, - }, - 'ATMega2560': { - 'signature': [0x1E, 0x98, 0x01], - 'pageSize': 128, - 'pageCount': 1024, - }, + 'ATMega1280': { + 'signature': [0x1E, 0x97, 0x03], + 'pageSize': 128, + 'pageCount': 512, + }, + 'ATMega2560': { + 'signature': [0x1E, 0x98, 0x01], + 'pageSize': 128, + 'pageCount': 1024, + }, } def getChipFromDB(sig): - for chip in avrChipDB.values(): - if chip['signature'] == sig: - return chip - return False + for chip in avrChipDB.values(): + if chip['signature'] == sig: + return chip + return False diff --git a/avr_isp/intelHex.py b/avr_isp/intelHex.py index 21fb46394c..75c7d3a5b2 100644 --- a/avr_isp/intelHex.py +++ b/avr_isp/intelHex.py @@ -2,45 +2,45 @@ Module to read intel hex files into binary data blobs. IntelHex files are commonly used to distribute firmware See: http://en.wikipedia.org/wiki/Intel_HEX +This is a python 3 conversion of the code created by David Braam for the Cura project. """ -__copyright__ = "Copyright (C) 2013 David Braam - Released under terms of the AGPLv3 License" import io def readHex(filename): - """ - Read an verify an intel hex file. Return the data as an list of bytes. - """ - data = [] - extraAddr = 0 - f = io.open(filename, "r") - for line in f: - line = line.strip() - if len(line) < 1: - continue - if line[0] != ':': - raise Exception("Hex file has a line not starting with ':'") - recLen = int(line[1:3], 16) - addr = int(line[3:7], 16) + extraAddr - recType = int(line[7:9], 16) - if len(line) != recLen * 2 + 11: - raise Exception("Error in hex file: " + line) - checkSum = 0 - for i in xrange(0, recLen + 5): - checkSum += int(line[i*2+1:i*2+3], 16) - checkSum &= 0xFF - if checkSum != 0: - raise Exception("Checksum error in hex file: " + line) - - if recType == 0:#Data record - while len(data) < addr + recLen: - data.append(0) - for i in xrange(0, recLen): - data[addr + i] = int(line[i*2+9:i*2+11], 16) - elif recType == 1: #End Of File record - pass - elif recType == 2: #Extended Segment Address Record - extraAddr = int(line[9:13], 16) * 16 - else: - print(recType, recLen, addr, checkSum, line) - f.close() - return data + """ + Read an verify an intel hex file. Return the data as an list of bytes. + """ + data = [] + extraAddr = 0 + f = io.open(filename, "r") + for line in f: + line = line.strip() + if len(line) < 1: + continue + if line[0] != ':': + raise Exception("Hex file has a line not starting with ':'") + recLen = int(line[1:3], 16) + addr = int(line[3:7], 16) + extraAddr + recType = int(line[7:9], 16) + if len(line) != recLen * 2 + 11: + raise Exception("Error in hex file: " + line) + checkSum = 0 + for i in range(0, recLen + 5): + checkSum += int(line[i*2+1:i*2+3], 16) + checkSum &= 0xFF + if checkSum != 0: + raise Exception("Checksum error in hex file: " + line) + + if recType == 0:#Data record + while len(data) < addr + recLen: + data.append(0) + for i in xrange(0, recLen): + data[addr + i] = int(line[i*2+9:i*2+11], 16) + elif recType == 1: #End Of File record + pass + elif recType == 2: #Extended Segment Address Record + extraAddr = int(line[9:13], 16) * 16 + else: + print(recType, recLen, addr, checkSum, line) + f.close() + return data diff --git a/avr_isp/ispBase.py b/avr_isp/ispBase.py index b404030500..16eeec8e67 100644 --- a/avr_isp/ispBase.py +++ b/avr_isp/ispBase.py @@ -4,60 +4,60 @@ The ISP AVR programmer can load firmware into AVR chips. Which are commonly used Needs to be subclassed to support different programmers. Currently only the stk500v2 subclass exists. + This is a python 3 conversion of the code created by David Braam for the Cura project. """ -__copyright__ = "Copyright (C) 2013 David Braam - Released under terms of the AGPLv3 License" from . import chipDB class IspBase(): - """ - Base class for ISP based AVR programmers. - Functions in this class raise an IspError when something goes wrong. - """ - def programChip(self, flashData): - """ Program a chip with the given flash data. """ - self.curExtAddr = -1 - self.chip = chipDB.getChipFromDB(self.getSignature()) - if not self.chip: - raise IspError("Chip with signature: " + str(self.getSignature()) + "not found") - self.chipErase() - - print("Flashing %i bytes" % len(flashData)) - self.writeFlash(flashData) - print("Verifying %i bytes" % len(flashData)) - self.verifyFlash(flashData) + """ + Base class for ISP based AVR programmers. + Functions in this class raise an IspError when something goes wrong. + """ + def programChip(self, flashData): + """ Program a chip with the given flash data. """ + self.curExtAddr = -1 + self.chip = chipDB.getChipFromDB(self.getSignature()) + if not self.chip: + raise IspError("Chip with signature: " + str(self.getSignature()) + "not found") + self.chipErase() + + print("Flashing %i bytes" % len(flashData)) + self.writeFlash(flashData) + print("Verifying %i bytes" % len(flashData)) + self.verifyFlash(flashData) - def getSignature(self): - """ - Get the AVR signature from the chip. This is a 3 byte array which describes which chip we are connected to. - This is important to verify that we are programming the correct type of chip and that we use proper flash block sizes. - """ - sig = [] - sig.append(self.sendISP([0x30, 0x00, 0x00, 0x00])[3]) - sig.append(self.sendISP([0x30, 0x00, 0x01, 0x00])[3]) - sig.append(self.sendISP([0x30, 0x00, 0x02, 0x00])[3]) - return sig - - def chipErase(self): - """ - Do a full chip erase, clears all data, and lockbits. - """ - self.sendISP([0xAC, 0x80, 0x00, 0x00]) + def getSignature(self): + """ + Get the AVR signature from the chip. This is a 3 byte array which describes which chip we are connected to. + This is important to verify that we are programming the correct type of chip and that we use proper flash block sizes. + """ + sig = [] + sig.append(self.sendISP([0x30, 0x00, 0x00, 0x00])[3]) + sig.append(self.sendISP([0x30, 0x00, 0x01, 0x00])[3]) + sig.append(self.sendISP([0x30, 0x00, 0x02, 0x00])[3]) + return sig - def writeFlash(self, flashData): - """ - Write the flash data, needs to be implemented in a subclass. - """ - raise IspError("Called undefined writeFlash") + def chipErase(self): + """ + Do a full chip erase, clears all data, and lockbits. + """ + self.sendISP([0xAC, 0x80, 0x00, 0x00]) - def verifyFlash(self, flashData): - """ - Verify the flash data, needs to be implemented in a subclass. - """ - raise IspError("Called undefined verifyFlash") + def writeFlash(self, flashData): + """ + Write the flash data, needs to be implemented in a subclass. + """ + raise IspError("Called undefined writeFlash") + + def verifyFlash(self, flashData): + """ + Verify the flash data, needs to be implemented in a subclass. + """ + raise IspError("Called undefined verifyFlash") class IspError(): - def __init__(self, value): - self.value = value - def __str__(self): - return repr(self.value) + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) diff --git a/avr_isp/stk500v2.py b/avr_isp/stk500v2.py index d69f05cdd2..a3855109f1 100644 --- a/avr_isp/stk500v2.py +++ b/avr_isp/stk500v2.py @@ -1,8 +1,8 @@ """ STK500v2 protocol implementation for programming AVR chips. The STK500v2 protocol is used by the ArduinoMega2560 and a few other Arduino platforms to load firmware. +This is a python 3 conversion of the code created by David Braam for the Cura project. """ -__copyright__ = "Copyright (C) 2013 David Braam - Released under terms of the AGPLv3 License" import os, struct, sys, time from serial import Serial @@ -12,205 +12,207 @@ from serial import SerialTimeoutException from . import ispBase, intelHex class Stk500v2(ispBase.IspBase): - def __init__(self): - self.serial = None - self.seq = 1 - self.lastAddr = -1 - self.progressCallback = None - - def connect(self, port = 'COM22', speed = 115200): - if self.serial is not None: - self.close() - try: - self.serial = Serial(str(port), speed, timeout=1, writeTimeout=10000) - except SerialException as e: - raise ispBase.IspError("Failed to open serial port") - except: - raise ispBase.IspError("Unexpected error while connecting to serial port:" + port + ":" + str(sys.exc_info()[0])) - self.seq = 1 + def __init__(self): + self.serial = None + self.seq = 1 + self.lastAddr = -1 + self.progressCallback = None + + def connect(self, port = 'COM22', speed = 115200): + if self.serial is not None: + self.close() + try: + self.serial = Serial(str(port), speed, timeout=1, writeTimeout=10000) + except SerialException as e: + raise ispBase.IspError("Failed to open serial port") + except: + raise ispBase.IspError("Unexpected error while connecting to serial port:" + port + ":" + str(sys.exc_info()[0])) + self.seq = 1 - #Reset the controller - for n in xrange(0, 2): - self.serial.setDTR(True) - time.sleep(0.1) - self.serial.setDTR(False) - time.sleep(0.1) - time.sleep(0.2) + #Reset the controller + for n in range(0, 2): + self.serial.setDTR(True) + time.sleep(0.1) + self.serial.setDTR(False) + time.sleep(0.1) + time.sleep(0.2) - self.serial.flushInput() - self.serial.flushOutput() - if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]: - self.close() - raise ispBase.IspError("Failed to enter programming mode") + self.serial.flushInput() + self.serial.flushOutput() + if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]: + self.close() + raise ispBase.IspError("Failed to enter programming mode") - self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) - if self.sendMessage([0xEE])[1] == 0x00: - self._has_checksum = True - else: - self._has_checksum = False - self.serial.timeout = 5 + self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) + if self.sendMessage([0xEE])[1] == 0x00: + self._has_checksum = True + else: + self._has_checksum = False + self.serial.timeout = 5 - def close(self): - if self.serial is not None: - self.serial.close() - self.serial = None + def close(self): + if self.serial is not None: + self.serial.close() + self.serial = None #Leave ISP does not reset the serial port, only resets the device, and returns the serial port after disconnecting it from the programming interface. # This allows you to use the serial port without opening it again. - def leaveISP(self): - if self.serial is not None: - if self.sendMessage([0x11]) != [0x11, 0x00]: - raise ispBase.IspError("Failed to leave programming mode") - ret = self.serial - self.serial = None - return ret - return None - - def isConnected(self): - return self.serial is not None + def leaveISP(self): + if self.serial is not None: + if self.sendMessage([0x11]) != [0x11, 0x00]: + raise ispBase.IspError("Failed to leave programming mode") + ret = self.serial + self.serial = None + return ret + return None + + def isConnected(self): + return self.serial is not None - def hasChecksumFunction(self): - return self._has_checksum + def hasChecksumFunction(self): + return self._has_checksum - def sendISP(self, data): - recv = self.sendMessage([0x1D, 4, 4, 0, data[0], data[1], data[2], data[3]]) - return recv[2:6] - - def writeFlash(self, flashData): - #Set load addr to 0, in case we have more then 64k flash we need to enable the address extension - pageSize = self.chip['pageSize'] * 2 - flashSize = pageSize * self.chip['pageCount'] - if flashSize > 0xFFFF: - self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) - else: - self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) - - loadCount = (len(flashData) + pageSize - 1) / pageSize - for i in xrange(0, loadCount): - recv = self.sendMessage([0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[(i * pageSize):(i * pageSize + pageSize)]) - if self.progressCallback is not None: - if self._has_checksum: - self.progressCallback(i + 1, loadCount) - else: - self.progressCallback(i + 1, loadCount*2) - - def verifyFlash(self, flashData): - if self._has_checksum: - self.sendMessage([0x06, 0x00, (len(flashData) >> 17) & 0xFF, (len(flashData) >> 9) & 0xFF, (len(flashData) >> 1) & 0xFF]) - res = self.sendMessage([0xEE]) - checksum_recv = res[2] | (res[3] << 8) - checksum = 0 - for d in flashData: - checksum += d - checksum &= 0xFFFF - if hex(checksum) != hex(checksum_recv): - raise ispBase.IspError('Verify checksum mismatch: 0x%x != 0x%x' % (checksum & 0xFFFF, checksum_recv)) - else: - #Set load addr to 0, in case we have more then 64k flash we need to enable the address extension - flashSize = self.chip['pageSize'] * 2 * self.chip['pageCount'] - if flashSize > 0xFFFF: - self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) - else: - self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) + def sendISP(self, data): + recv = self.sendMessage([0x1D, 4, 4, 0, data[0], data[1], data[2], data[3]]) + return recv[2:6] + + def writeFlash(self, flashData): + #Set load addr to 0, in case we have more then 64k flash we need to enable the address extension + pageSize = self.chip['pageSize'] * 2 + flashSize = pageSize * self.chip['pageCount'] + if flashSize > 0xFFFF: + self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) + else: + self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) + + loadCount = (len(flashData) + pageSize - 1) / pageSize + for i in xrange(0, loadCount): + recv = self.sendMessage([0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[(i * pageSize):(i * pageSize + pageSize)]) + if self.progressCallback is not None: + if self._has_checksum: + self.progressCallback(i + 1, loadCount) + else: + self.progressCallback(i + 1, loadCount*2) + + def verifyFlash(self, flashData): + if self._has_checksum: + self.sendMessage([0x06, 0x00, (len(flashData) >> 17) & 0xFF, (len(flashData) >> 9) & 0xFF, (len(flashData) >> 1) & 0xFF]) + res = self.sendMessage([0xEE]) + checksum_recv = res[2] | (res[3] << 8) + checksum = 0 + for d in flashData: + checksum += d + checksum &= 0xFFFF + if hex(checksum) != hex(checksum_recv): + raise ispBase.IspError('Verify checksum mismatch: 0x%x != 0x%x' % (checksum & 0xFFFF, checksum_recv)) + else: + #Set load addr to 0, in case we have more then 64k flash we need to enable the address extension + flashSize = self.chip['pageSize'] * 2 * self.chip['pageCount'] + if flashSize > 0xFFFF: + self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) + else: + self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) - loadCount = (len(flashData) + 0xFF) / 0x100 - for i in xrange(0, loadCount): - recv = self.sendMessage([0x14, 0x01, 0x00, 0x20])[2:0x102] - if self.progressCallback is not None: - self.progressCallback(loadCount + i + 1, loadCount*2) - for j in xrange(0, 0x100): - if i * 0x100 + j < len(flashData) and flashData[i * 0x100 + j] != recv[j]: - raise ispBase.IspError('Verify error at: 0x%x' % (i * 0x100 + j)) + loadCount = (len(flashData) + 0xFF) / 0x100 + for i in xrange(0, loadCount): + recv = self.sendMessage([0x14, 0x01, 0x00, 0x20])[2:0x102] + if self.progressCallback is not None: + self.progressCallback(loadCount + i + 1, loadCount*2) + for j in xrange(0, 0x100): + if i * 0x100 + j < len(flashData) and flashData[i * 0x100 + j] != recv[j]: + raise ispBase.IspError('Verify error at: 0x%x' % (i * 0x100 + j)) - def sendMessage(self, data): - message = struct.pack(">BBHB", 0x1B, self.seq, len(data), 0x0E) - for c in data: - message += struct.pack(">B", c) - checksum = 0 - for c in message: - checksum ^= ord(c) - message += struct.pack(">B", checksum) - try: - self.serial.write(message) - self.serial.flush() - except SerialTimeoutException: - raise ispBase.IspError('Serial send timeout') - self.seq = (self.seq + 1) & 0xFF - return self.recvMessage() - - def recvMessage(self): - state = 'Start' - checksum = 0 - while True: - s = self.serial.read() - if len(s) < 1: - raise ispBase.IspError("Timeout") - b = struct.unpack(">B", s)[0] - checksum ^= b - #print(hex(b)) - if state == 'Start': - if b == 0x1B: - state = 'GetSeq' - checksum = 0x1B - elif state == 'GetSeq': - state = 'MsgSize1' - elif state == 'MsgSize1': - msgSize = b << 8 - state = 'MsgSize2' - elif state == 'MsgSize2': - msgSize |= b - state = 'Token' - elif state == 'Token': - if b != 0x0E: - state = 'Start' - else: - state = 'Data' - data = [] - elif state == 'Data': - data.append(b) - if len(data) == msgSize: - state = 'Checksum' - elif state == 'Checksum': - if checksum != 0: - state = 'Start' - else: - return data + def sendMessage(self, data): + message = struct.pack(">BBHB", 0x1B, self.seq, len(data), 0x0E) + for c in data: + message += struct.pack(">B", c) + checksum = 0 + print("messsage " , message) + for c in message: + print(c) + checksum ^= ord(c) + message += struct.pack(">B", checksum) + try: + self.serial.write(message) + self.serial.flush() + except SerialTimeoutException: + raise ispBase.IspError('Serial send timeout') + self.seq = (self.seq + 1) & 0xFF + return self.recvMessage() + + def recvMessage(self): + state = 'Start' + checksum = 0 + while True: + s = self.serial.read() + if len(s) < 1: + raise ispBase.IspError("Timeout") + b = struct.unpack(">B", s)[0] + checksum ^= b + #print(hex(b)) + if state == 'Start': + if b == 0x1B: + state = 'GetSeq' + checksum = 0x1B + elif state == 'GetSeq': + state = 'MsgSize1' + elif state == 'MsgSize1': + msgSize = b << 8 + state = 'MsgSize2' + elif state == 'MsgSize2': + msgSize |= b + state = 'Token' + elif state == 'Token': + if b != 0x0E: + state = 'Start' + else: + state = 'Data' + data = [] + elif state == 'Data': + data.append(b) + if len(data) == msgSize: + state = 'Checksum' + elif state == 'Checksum': + if checksum != 0: + state = 'Start' + else: + return data def portList(): - ret = [] - import _winreg - key=_winreg.OpenKey(_winreg.HKEY_LOCAL_MACHINE,"HARDWARE\\DEVICEMAP\\SERIALCOMM") - i=0 - while True: - try: - values = _winreg.EnumValue(key, i) - except: - return ret - if 'USBSER' in values[0]: - ret.append(values[1]) - i+=1 - return ret + ret = [] + import _winreg + key=_winreg.OpenKey(_winreg.HKEY_LOCAL_MACHINE,"HARDWARE\\DEVICEMAP\\SERIALCOMM") + i=0 + while True: + try: + values = _winreg.EnumValue(key, i) + except: + return ret + if 'USBSER' in values[0]: + ret.append(values[1]) + i+=1 + return ret def runProgrammer(port, filename): - """ Run an STK500v2 program on serial port 'port' and write 'filename' into flash. """ - programmer = Stk500v2() - programmer.connect(port = port) - programmer.programChip(intelHex.readHex(filename)) - programmer.close() + """ Run an STK500v2 program on serial port 'port' and write 'filename' into flash. """ + programmer = Stk500v2() + programmer.connect(port = port) + programmer.programChip(intelHex.readHex(filename)) + programmer.close() def main(): - """ Entry point to call the stk500v2 programmer from the commandline. """ - import threading - if sys.argv[1] == 'AUTO': - print(portList()) - for port in portList(): - threading.Thread(target=runProgrammer, args=(port,sys.argv[2])).start() - time.sleep(5) - else: - programmer = Stk500v2() - programmer.connect(port = sys.argv[1]) - programmer.programChip(intelHex.readHex(sys.argv[2])) - sys.exit(1) + """ Entry point to call the stk500v2 programmer from the commandline. """ + import threading + if sys.argv[1] == 'AUTO': + print(portList()) + for port in portList(): + threading.Thread(target=runProgrammer, args=(port,sys.argv[2])).start() + time.sleep(5) + else: + programmer = Stk500v2() + programmer.connect(port = sys.argv[1]) + programmer.programChip(intelHex.readHex(sys.argv[2])) + sys.exit(1) if __name__ == '__main__': - main() + main()