1939 lines
68 KiB
Python
1939 lines
68 KiB
Python
|
# NOTE: This script is test under Python 3.x
|
||
|
|
||
|
__copyright__ = "Copyright (C) 2020~2021 Nuvoton Technology Corp. All rights reserved"
|
||
|
__version__ = "v0.37"
|
||
|
|
||
|
import os
|
||
|
import sys
|
||
|
import argparse
|
||
|
import json
|
||
|
import crcmod
|
||
|
from Crypto.Cipher import AES
|
||
|
import hashlib
|
||
|
import ecdsa
|
||
|
import binascii
|
||
|
from datetime import datetime
|
||
|
import random
|
||
|
import shutil
|
||
|
from tqdm import tqdm
|
||
|
from xusbcom import XUsbComList
|
||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
from UnpackImage import UnpackImage
|
||
|
from collections import namedtuple
|
||
|
from struct import unpack
|
||
|
import time
|
||
|
import platform
|
||
|
# for debug
|
||
|
import usb.core
|
||
|
import usb.util
|
||
|
|
||
|
ACK = 0x55AA55AA
|
||
|
TRANSFER_SIZE = 4096
|
||
|
MAX_HEADER_IMG = 4
|
||
|
# SPI NOR align for erase/program starting address
|
||
|
SPINOR_ALIGN = 4096
|
||
|
|
||
|
# Storage device type
|
||
|
DEV_DDR_SRAM = 0
|
||
|
DEV_NAND = 1
|
||
|
DEV_SD_EMMC = 2
|
||
|
DEV_SPINOR = 3
|
||
|
DEV_SPINAND = 4
|
||
|
DEV_OTP = 6
|
||
|
DEV_USBH = 7
|
||
|
DEV_UNKNOWN = 0xFF
|
||
|
|
||
|
# For OTP programming
|
||
|
ACT_LOAD = 1
|
||
|
ACT_WRITE = 2
|
||
|
ACT_ERASE = 3
|
||
|
ACT_READ = 4
|
||
|
ACT_MSC = 5
|
||
|
|
||
|
# Command options
|
||
|
OPT_NONE = 0
|
||
|
OPT_SCRUB = 1 # For erase, use with care
|
||
|
OPT_WITHBAD = 1 # For read
|
||
|
OPT_EXECUTE = 2 # For write
|
||
|
OPT_VERIFY = 3 # For write
|
||
|
OPT_UNPACK = 4 # For pack
|
||
|
OPT_RAW = 5 # For write
|
||
|
OPT_EJECT = 6 # For msc
|
||
|
OPT_STUFF = 7 # For stuff pack, output could be used by dd command
|
||
|
OPT_SETINFO = 8 # For set storage info for attach
|
||
|
OPT_CONCAT = 9 # For convert, concatenate at the end of encrypted data file
|
||
|
OPT_SHOWHDR = 10 # For convert. Instead of convert, show header content instead
|
||
|
OPT_UNKNOWN = 0xFF # Error
|
||
|
|
||
|
|
||
|
# OPT block definitions
|
||
|
OPT_OTPBLK1 = 0x100
|
||
|
OPT_OTPBLK2 = 0x200
|
||
|
OPT_OTPBLK3 = 0x400
|
||
|
OPT_OTPBLK4 = 0x800
|
||
|
OPT_OTPBLK5 = 0x1000
|
||
|
OPT_OTPBLK6 = 0x2000
|
||
|
OPT_OTPBLK7 = 0x4000
|
||
|
OPT_OTPKEY = 0x8000
|
||
|
|
||
|
# for key lock
|
||
|
OPT_OTPKEY0 = 0x10000
|
||
|
OPT_OTPKEY1 = 0x20000
|
||
|
OPT_OTPKEY2 = 0x40000
|
||
|
OPT_OTPKEY3 = 0x80000
|
||
|
OPT_OTPKEY4 = 0x100000
|
||
|
OPT_OTPKEY5 = 0x200000
|
||
|
|
||
|
|
||
|
# Image type definitions
|
||
|
IMG_DATA = 0
|
||
|
IMG_TFA = 1
|
||
|
IMG_UBOOT = 2
|
||
|
IMG_LINUX = 3
|
||
|
IMG_DDR = 4
|
||
|
IMG_TEE = 5
|
||
|
IMG_DTB = 6
|
||
|
|
||
|
# If attach is a must. maybe better for real chip.
|
||
|
# devices = []
|
||
|
mp_mode = False
|
||
|
|
||
|
WINDOWS_PATH = "C:\\Program Files (x86)\\Nuvoton Tools\\NuWriter\\"
|
||
|
LINUX_PATH = "/usr/share/nuwriter/"
|
||
|
|
||
|
|
||
|
def conv_env(env_file_name, blk_size) -> bytearray:
|
||
|
|
||
|
try:
|
||
|
with open(env_file_name, "r") as env_file:
|
||
|
env_data = env_file.read().splitlines()
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {env_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
out = bytearray(4) # Reserved for CRC
|
||
|
for lines in env_data:
|
||
|
out += bytes(lines, 'ascii')
|
||
|
out += b'\x00'
|
||
|
out += b'\x00'
|
||
|
out += b'\xFF' * (blk_size - len(out))
|
||
|
|
||
|
crc32_func = crcmod.predefined.mkCrcFun('crc-32')
|
||
|
checksum = crc32_func(out[4:])
|
||
|
out[0:4] = checksum.to_bytes(4, byteorder="little")
|
||
|
|
||
|
return out
|
||
|
|
||
|
|
||
|
def get_dpm(dpm) -> int:
|
||
|
return {
|
||
|
'a35sdsdis': 0x00000001,
|
||
|
'a35sdslock': 0x00000002,
|
||
|
'a35sndsdis': 0x00000004,
|
||
|
'a35sndslock': 0x00000008,
|
||
|
'a35nsdsdis': 0x00000010,
|
||
|
'a35nsdslock': 0x00000020,
|
||
|
'a35nsndsdis': 0x00000040,
|
||
|
'a35nsndslock': 0x00000080,
|
||
|
'm4dsdis': 0x00000100,
|
||
|
'm4dslock': 0x00000200,
|
||
|
'm4ndsdis': 0x00000400,
|
||
|
'm4ndslock': 0x00000800,
|
||
|
'extdis': 0x00001000,
|
||
|
'extlock': 0x00002000,
|
||
|
'exttdis': 0x00004000,
|
||
|
'exttlock': 0x00008000,
|
||
|
'giccfgsdis': 0x00010000,
|
||
|
'giccfgslock': 0x00020000
|
||
|
}.get(dpm, 0)
|
||
|
|
||
|
|
||
|
def get_plm(plm) -> int:
|
||
|
return {
|
||
|
'oem': 0x1,
|
||
|
'deploy': 0x3,
|
||
|
'rma': 0x7,
|
||
|
'prma': 0xF
|
||
|
}.get(plm, 0)
|
||
|
|
||
|
|
||
|
def conv_otp(opt_file_name) -> (bytearray, int):
|
||
|
try:
|
||
|
with open(opt_file_name, "r") as json_file:
|
||
|
try:
|
||
|
d = json.load(json_file)
|
||
|
except json.decoder.JSONDecodeError as err:
|
||
|
print(f"{opt_file_name} parsing error")
|
||
|
sys.exit(err)
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {opt_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
# Bootcfg, DPM, PLM, and PWD 4 bytes each, MAC addr 8 bytes each, sec/nsec 88 bytes each
|
||
|
data = bytearray(208)
|
||
|
|
||
|
option = 0
|
||
|
for key in d.keys():
|
||
|
if key == 'boot_cfg':
|
||
|
cfg_val = 0
|
||
|
for sub_key in d['boot_cfg'].keys():
|
||
|
if sub_key == 'posotp':
|
||
|
if d['boot_cfg']['posotp'] == 'enable':
|
||
|
cfg_val |= 1
|
||
|
if sub_key == 'qspiclk':
|
||
|
if d['boot_cfg']['qspiclk'] == '50mhz':
|
||
|
cfg_val |= 2
|
||
|
if sub_key == 'wdt1en':
|
||
|
if d['boot_cfg']['wdt1en'] == 'enable':
|
||
|
cfg_val |= 4
|
||
|
if sub_key == 'uart0en':
|
||
|
if d['boot_cfg']['uart0en'] == 'disable':
|
||
|
cfg_val |= 0x10
|
||
|
if sub_key == 'sd0bken':
|
||
|
if d['boot_cfg']['sd0bken'] == 'enable':
|
||
|
cfg_val |= 0x20
|
||
|
if sub_key == 'tsiimg':
|
||
|
if d['boot_cfg']['tsiimg'] == 'enable':
|
||
|
cfg_val |= 0x40
|
||
|
if sub_key == 'tsidbg':
|
||
|
if d['boot_cfg']['tsidbg'] == 'disable':
|
||
|
cfg_val |= 0x80
|
||
|
if sub_key == 'bootsrc':
|
||
|
if d['boot_cfg']['bootsrc'] == 'sd' or d['boot_cfg']['bootsrc'] == 'emmc':
|
||
|
cfg_val |= 0x400
|
||
|
elif d['boot_cfg']['bootsrc'] == 'nand':
|
||
|
cfg_val |= 0x800
|
||
|
elif d['boot_cfg']['bootsrc'] == 'usb':
|
||
|
cfg_val |= 0xC00
|
||
|
if sub_key == 'page':
|
||
|
if d['boot_cfg']['page'] == '2k':
|
||
|
cfg_val |= 0x1000
|
||
|
elif d['boot_cfg']['page'] == '4k':
|
||
|
cfg_val |= 0x2000
|
||
|
elif d['boot_cfg']['page'] == '8k':
|
||
|
cfg_val |= 0x3000
|
||
|
if sub_key == 'option':
|
||
|
if d['boot_cfg']['option'] == 'sd1' or d['boot_cfg']['option'] == 'emmc1' or \
|
||
|
d['boot_cfg']['option'] == 't12' or d['boot_cfg']['option'] == 'spinand4':
|
||
|
cfg_val |= 0x4000
|
||
|
elif d['boot_cfg']['option'] == 't24' or d['boot_cfg']['option'] == 'spinor1':
|
||
|
cfg_val |= 0x8000
|
||
|
elif d['boot_cfg']['option'] == 'noecc' or d['boot_cfg']['option'] == 'spinor4':
|
||
|
cfg_val |= 0xC000
|
||
|
if sub_key == 'secboot':
|
||
|
if d['boot_cfg']['secboot'] == 'disable':
|
||
|
cfg_val |= 0x5A000000
|
||
|
data[0:4] = cfg_val.to_bytes(4, byteorder='little')
|
||
|
option |= OPT_OTPBLK1
|
||
|
elif key == 'dpm_plm':
|
||
|
for sub_key in d['dpm_plm'].keys():
|
||
|
if sub_key == 'dpm':
|
||
|
dpm_val = 0
|
||
|
for dpm_key in d['dpm_plm']['dpm'].keys():
|
||
|
dpm_val |= get_dpm(dpm_key)
|
||
|
if dpm_val != 0:
|
||
|
data[4:8] = dpm_val.to_bytes(4, byteorder='little')
|
||
|
elif sub_key == 'plm':
|
||
|
plm_val = get_plm(d['dpm_plm']['plm'])
|
||
|
if plm_val != 0:
|
||
|
data[8:12] = plm_val.to_bytes(4, byteorder='little')
|
||
|
option |= OPT_OTPBLK2
|
||
|
elif key == 'mac0':
|
||
|
data[12:18] = bytes.fromhex(d['mac0'])
|
||
|
option |= OPT_OTPBLK3
|
||
|
elif key == 'mac1':
|
||
|
data[20:26] = bytes.fromhex(d['mac1'])
|
||
|
option |= OPT_OTPBLK4
|
||
|
elif key == 'dplypwd':
|
||
|
data[28:32] = bytes.fromhex(d['dplypwd'])
|
||
|
option |= OPT_OTPBLK5
|
||
|
elif key == 'sec':
|
||
|
newkey = bytes.fromhex(d['sec'])
|
||
|
newkey += b'\x00' * (88 - len(newkey))
|
||
|
data[32:120] = newkey
|
||
|
option |= OPT_OTPBLK6
|
||
|
elif key == 'nonsec':
|
||
|
newkey = bytes.fromhex(d['nonsec'])
|
||
|
newkey += b'\x00' * (88 - len(newkey))
|
||
|
data[120:208] = newkey
|
||
|
option |= OPT_OTPBLK7
|
||
|
elif key == 'huk0':
|
||
|
newkey = bytes.fromhex(d['huk0']['key'])
|
||
|
if len(newkey) != 16:
|
||
|
print("HUK0 is 128-bit")
|
||
|
sys.exit(2)
|
||
|
newkey += b'\x00' * (32 - len(newkey))
|
||
|
# size - 128-bit
|
||
|
newkey += b'\x08\x00\x00\x00'
|
||
|
# key number - 0
|
||
|
newkey += b'\x00\x00\x00\x00'
|
||
|
# meta - owner: cpu, cpu readable
|
||
|
newkey += b'\x04\x00\x05\x80'
|
||
|
data += newkey
|
||
|
|
||
|
elif key == 'huk1':
|
||
|
newkey = bytes.fromhex(d['huk1']['key'])
|
||
|
if len(newkey) != 16:
|
||
|
print("HUK1 is 128-bit")
|
||
|
sys.exit(2)
|
||
|
newkey += b'\x00' * (32 - len(newkey))
|
||
|
# size - 128-bit
|
||
|
newkey += b'\x08\x00\x00\x00'
|
||
|
# key number - 1
|
||
|
newkey += b'\x01\x00\x00\x00'
|
||
|
# meta - owner: cpu, cpu readable
|
||
|
newkey += b'\x04\x00\x05\x80'
|
||
|
data += newkey
|
||
|
|
||
|
elif key == 'huk2':
|
||
|
newkey = bytes.fromhex(d['huk2']['key'])
|
||
|
if len(newkey) != 16:
|
||
|
print("HUK0 is 128-bit")
|
||
|
sys.exit(2)
|
||
|
newkey += b'\x00' * (32 - len(newkey))
|
||
|
# size - 128-bit
|
||
|
newkey += b'\x08\x00\x00\x00'
|
||
|
# key number - 2
|
||
|
newkey += b'\x02\x00\x00\x00'
|
||
|
# meta - owner: cpu, cpu readable
|
||
|
newkey += b'\x04\x00\x05\x80'
|
||
|
data += newkey
|
||
|
|
||
|
elif key == 'key3':
|
||
|
newkey = bytes.fromhex(d['key3']['key'])
|
||
|
if len(newkey) != 32:
|
||
|
print("key3 is 256-bit")
|
||
|
sys.exit(2)
|
||
|
newkey += b'\x00' * (32 - len(newkey))
|
||
|
# size - 256-bit
|
||
|
newkey += b'\x00\x01\x00\x00'
|
||
|
# key number - 3
|
||
|
newkey += b'\x03\x00\x00\x00'
|
||
|
|
||
|
if d['key3']['meta'] == 'aes256-unreadable':
|
||
|
newkey += b'\x00\x06\x00\x80'
|
||
|
elif d['key3']['meta'] == 'aes256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x00\x80'
|
||
|
elif d['key3']['meta'] == 'sha256-unreadable':
|
||
|
newkey += b'\x00\x06\x01\x80'
|
||
|
elif d['key3']['meta'] == 'sha256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x01\x80'
|
||
|
elif d['key3']['meta'] == 'eccp256-unreadable':
|
||
|
newkey += b'\x00\x06\x04\x80'
|
||
|
elif d['key3']['meta'] == 'eccp256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x04\x80'
|
||
|
|
||
|
data += newkey
|
||
|
elif key == 'key4':
|
||
|
newkey = bytes.fromhex(d['key4']['key'])
|
||
|
if len(newkey) != 32:
|
||
|
print("key4 is 256-bit")
|
||
|
sys.exit(2)
|
||
|
newkey += b'\x00' * (32 - len(newkey))
|
||
|
# size - 256-bit
|
||
|
newkey += b'\x00\x01\x00\x00'
|
||
|
# key number - 4
|
||
|
newkey += b'\x04\x00\x00\x00'
|
||
|
|
||
|
if d['key4']['meta'] == 'aes256-unreadable':
|
||
|
newkey += b'\x00\x06\x00\x80'
|
||
|
elif d['key4']['meta'] == 'aes256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x00\x80'
|
||
|
elif d['key4']['meta'] == 'sha256-unreadable':
|
||
|
newkey += b'\x00\x06\x01\x80'
|
||
|
elif d['key4']['meta'] == 'sha256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x01\x80'
|
||
|
elif d['key4']['meta'] == 'eccp256-unreadable':
|
||
|
newkey += b'\x00\x06\x04\x80'
|
||
|
elif d['key4']['meta'] == 'eccp256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x04\x80'
|
||
|
|
||
|
data += newkey
|
||
|
elif key == 'key5':
|
||
|
newkey = bytes.fromhex(d['key5']['key'])
|
||
|
if len(newkey) != 32:
|
||
|
print("key5 is 256-bit")
|
||
|
sys.exit(2)
|
||
|
newkey += b'\x00' * (32 - len(newkey))
|
||
|
# size - 256-bit
|
||
|
newkey += b'\x00\x01\x00\x00'
|
||
|
# key number - 5
|
||
|
newkey += b'\x05\x00\x00\x00'
|
||
|
|
||
|
if d['key5']['meta'] == 'aes256-unreadable':
|
||
|
newkey += b'\x00\x06\x00\x80'
|
||
|
elif d['key5']['meta'] == 'aes256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x00\x80'
|
||
|
elif d['key5']['meta'] == 'sha256-unreadable':
|
||
|
newkey += b'\x00\x06\x01\x80'
|
||
|
elif d['key5']['meta'] == 'sha256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x01\x80'
|
||
|
elif d['key5']['meta'] == 'eccp256-unreadable':
|
||
|
newkey += b'\x00\x06\x04\x80'
|
||
|
elif d['key5']['meta'] == 'eccp256-cpu-readable':
|
||
|
newkey += b'\x04\x06\x04\x80'
|
||
|
|
||
|
data += newkey
|
||
|
elif key == 'publicx':
|
||
|
newkey = bytes.fromhex(d['publicx'])
|
||
|
if len(newkey) != 32:
|
||
|
print("IBR publicx is 256-bit")
|
||
|
sys.exit(2)
|
||
|
data += bytes.fromhex(d['publicx'])
|
||
|
data += b'\x00\x01\x00\x00' # 256 bits
|
||
|
data += b'\x06\x00\x00\x00'
|
||
|
data += b'\x01\x06\x04\x80'
|
||
|
elif key == 'publicy':
|
||
|
newkey = bytes.fromhex(d['publicy'])
|
||
|
if len(newkey) != 32:
|
||
|
print("IBR publicy is 256-bit")
|
||
|
sys.exit(2)
|
||
|
data += bytes.fromhex(d['publicy'])
|
||
|
data += b'\x00\x01\x00\x00' # 256 bits
|
||
|
data += b'\x07\x00\x00\x00'
|
||
|
data += b'\x01\x06\x04\x80'
|
||
|
elif key == 'aeskey':
|
||
|
newkey = bytes.fromhex(d['aeskey'])
|
||
|
if len(newkey) != 32:
|
||
|
print("IBR aeskey is 256-bit")
|
||
|
sys.exit(2)
|
||
|
data += bytes.fromhex(d['aeskey'])
|
||
|
data += b'\x00\x01\x00\x00' # 256 bits
|
||
|
data += b'\x08\x00\x00\x00'
|
||
|
data += b'\x01\x06\x00\x80'
|
||
|
|
||
|
try:
|
||
|
with open("otp_data.bin", "wb") as out_file:
|
||
|
out_file.write(data[0:len(data)])
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open otp_data.bin failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
if len(data) > 208:
|
||
|
option |= OPT_OTPKEY
|
||
|
return data, option
|
||
|
|
||
|
|
||
|
def __img_erase(dev, media, start, length, option) -> int:
|
||
|
|
||
|
nand_align, spinand_align = dev.get_align()
|
||
|
|
||
|
if (media == DEV_NAND and nand_align == 0) or \
|
||
|
(media == DEV_SPINAND and spinand_align == 0):
|
||
|
print("Unable to get block size")
|
||
|
return -1
|
||
|
|
||
|
if (media == DEV_NAND and start % nand_align != 0) or\
|
||
|
(media == DEV_SPINAND and start % spinand_align != 0) or \
|
||
|
(media == DEV_SPINOR and start % SPINOR_ALIGN != 0):
|
||
|
print("Starting address must be block aligned")
|
||
|
return -1
|
||
|
|
||
|
cmd = start.to_bytes(8, byteorder='little')
|
||
|
cmd += length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_ERASE.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.set_media(media)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
bar = tqdm(total=100, position=dev.get_id(), ascii=True)
|
||
|
previous_progress = 0
|
||
|
while True:
|
||
|
# xusb ack with total erase progress.
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") <= 100:
|
||
|
bar.update(int.from_bytes(ack, byteorder="little") - previous_progress)
|
||
|
previous_progress = int.from_bytes(ack, byteorder="little")
|
||
|
if int.from_bytes(ack, byteorder="little") == 100:
|
||
|
break
|
||
|
bar.close()
|
||
|
return 0
|
||
|
|
||
|
|
||
|
# default erase all (count=0)
|
||
|
def do_img_erase(media, start, length=0, option=OPT_NONE) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__img_erase, dev, media, start, length, option) for dev in devices]
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print(f"Successfully erased {success} device(s)")
|
||
|
if failed > 0:
|
||
|
print(f"Failed to erase {failed} device(s)")
|
||
|
|
||
|
|
||
|
def do_otp_erase(option) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
start = 0
|
||
|
length = 0
|
||
|
dev = devices[0]
|
||
|
load_otp_writer(dev)
|
||
|
cmd = start.to_bytes(8, byteorder='little')
|
||
|
cmd += length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_ERASE.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.set_media(DEV_OTP)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
print(f"Failed to erase device(s)")
|
||
|
|
||
|
# There's no way to tell the progress...
|
||
|
ack = dev.read(4)
|
||
|
data = int.from_bytes(ack, byteorder="little")
|
||
|
if option == 0x100:
|
||
|
data >>= 2
|
||
|
data &= 0x3
|
||
|
if option == 0x400:
|
||
|
data >>= 6
|
||
|
data &= 0x3
|
||
|
if option == 0x800:
|
||
|
data >>= 8
|
||
|
data &= 0x3
|
||
|
if option & 0x8000:
|
||
|
data >>= 16
|
||
|
#print(f"Erase count state {hex(data)}")
|
||
|
|
||
|
print(f"Successfully erased device(s)")
|
||
|
|
||
|
def load_otp_writer(dev) -> int:
|
||
|
try:
|
||
|
with open("otp_writer.bin", "rb") as writer_file:
|
||
|
otp_writer = writer_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {opt_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
option = 0
|
||
|
img_length = len(otp_writer)
|
||
|
cmd = b'\x00\x00\xf0\x86\x00\x00\x00\x00'
|
||
|
cmd += img_length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_LOAD.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
dev.set_media(DEV_OTP)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
|
||
|
for offset in range(0, img_length, TRANSFER_SIZE):
|
||
|
xfer_size = TRANSFER_SIZE if offset + TRANSFER_SIZE < img_length else img_length - offset
|
||
|
dev.write(otp_writer[offset: offset + xfer_size])
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != xfer_size:
|
||
|
print("Acked size error")
|
||
|
return -1
|
||
|
|
||
|
while True:
|
||
|
# wait TSI update firmware
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") == ACK:
|
||
|
break
|
||
|
|
||
|
return 0
|
||
|
|
||
|
def __otp_program(dev, otp_data, option) -> int:
|
||
|
|
||
|
img_length = len(otp_data)
|
||
|
|
||
|
cmd = b'\x00\x00\xf0\x86\x00\x00\x00\x00'
|
||
|
cmd += img_length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_WRITE.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.set_media(DEV_OTP)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
# There's no way to tell the progress...
|
||
|
dev.write(otp_data)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != img_length:
|
||
|
print("Acked size error")
|
||
|
return -1
|
||
|
|
||
|
# There's no way to tell the progress...
|
||
|
ack = dev.read(4)
|
||
|
data = int.from_bytes(ack, byteorder="little")
|
||
|
#print(f"Can program count {hex(data)}")
|
||
|
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def do_otp_program(opt_file_name) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
load_otp_writer(devices[0])
|
||
|
otp_data, option = conv_otp(opt_file_name)
|
||
|
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__otp_program, dev, otp_data, option) for dev in devices]
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print(f"Successfully programmed {success} device(s)")
|
||
|
if failed > 0:
|
||
|
print(f"Failed to program {failed} device(s)")
|
||
|
|
||
|
|
||
|
def do_otp_read(media, start, out_file_name, length=0x1, option=OPT_NONE) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
# Only support one device in read function
|
||
|
dev = devices[0]
|
||
|
load_otp_writer(dev)
|
||
|
|
||
|
cmd = start.to_bytes(8, byteorder='little')
|
||
|
cmd += length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_READ.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.set_media(media)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return
|
||
|
# FIXME: Don't know real length for "read all"
|
||
|
bar = tqdm(total=length, ascii=True)
|
||
|
data = b''
|
||
|
remain = length
|
||
|
|
||
|
while remain > 0:
|
||
|
ack = dev.read(4)
|
||
|
# Get the transfer length of next read
|
||
|
xfer_size = int.from_bytes(ack, byteorder="little")
|
||
|
|
||
|
data += dev.read(xfer_size)
|
||
|
dev.write(xfer_size.to_bytes(4, byteorder='little')) # ack
|
||
|
remain -= xfer_size
|
||
|
bar.update(xfer_size)
|
||
|
try:
|
||
|
with open(out_file_name, "wb") as out_file:
|
||
|
out_file.write(data[0:length])
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {out_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
bar.close()
|
||
|
|
||
|
|
||
|
def __pack_program(dev, media, pack_image, option) -> int:
|
||
|
|
||
|
nand_align, spinand_align = dev.get_align()
|
||
|
image_cnt = pack_image.img_count()
|
||
|
|
||
|
if (media == DEV_NAND and nand_align == 0) or \
|
||
|
(media == DEV_SPINAND and spinand_align == 0):
|
||
|
print("Unable to get block size")
|
||
|
return -1
|
||
|
|
||
|
for i in range(image_cnt):
|
||
|
img_length, img_start, img_type = pack_image.img_attr(i)
|
||
|
if (media == DEV_NAND and img_start % nand_align != 0) or \
|
||
|
(media == DEV_SPINAND and img_start % spinand_align != 0) or \
|
||
|
(media == DEV_SPINOR and img_start % SPINOR_ALIGN != 0):
|
||
|
print("Starting address must be block aligned")
|
||
|
return -1
|
||
|
time.sleep(1)
|
||
|
dev.set_media(media)
|
||
|
cmd = img_start.to_bytes(8, byteorder='little')
|
||
|
cmd += img_length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_WRITE.to_bytes(4, byteorder='little')
|
||
|
cmd += img_type.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
text = f"Programming {i}/{image_cnt}"
|
||
|
bar = tqdm(total=img_length, position=dev.get_id(), ascii=True, desc=text)
|
||
|
for offset in range(0, img_length, TRANSFER_SIZE):
|
||
|
xfer_size = TRANSFER_SIZE if offset + TRANSFER_SIZE < img_length else img_length - offset
|
||
|
dev.write(pack_image.img_content(i, offset, xfer_size))
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != xfer_size:
|
||
|
print("Ack size error")
|
||
|
return -1
|
||
|
bar.update(xfer_size)
|
||
|
bar.close()
|
||
|
dev.read(4)
|
||
|
|
||
|
# FIXME: Added time.sleep(1) to make SPI NAND Pack Program + Verify PASS
|
||
|
time.sleep(1)
|
||
|
|
||
|
if option == OPT_VERIFY:
|
||
|
dev.set_media(media)
|
||
|
cmd = img_start.to_bytes(8, byteorder='little')
|
||
|
cmd += img_length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_READ.to_bytes(4, byteorder='little')
|
||
|
cmd += b'\x00' * 4
|
||
|
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
remain = img_length
|
||
|
text = f"Verifying {i}/{image_cnt}"
|
||
|
bar = tqdm(total=img_length, position=dev.get_id(), ascii=True, desc=text)
|
||
|
while remain > 0:
|
||
|
ack = dev.read(4)
|
||
|
# Get the transfer length of next read
|
||
|
xfer_size = int.from_bytes(ack, byteorder="little")
|
||
|
|
||
|
data = dev.read(xfer_size)
|
||
|
dev.write(xfer_size.to_bytes(4, byteorder='little'))
|
||
|
offset = img_length - remain
|
||
|
|
||
|
# For SD/eMMC
|
||
|
if xfer_size > remain:
|
||
|
xfer_size = remain
|
||
|
data = data[0: remain]
|
||
|
|
||
|
if data != bytearray(pack_image.img_content(i, offset, xfer_size)):
|
||
|
print("Verify failed")
|
||
|
return -1
|
||
|
remain -= xfer_size
|
||
|
bar.update(xfer_size)
|
||
|
bar.close()
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def do_pack_program(media, pack_file_name, option=OPT_NONE) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
pack_image = UnpackImage(pack_file_name)
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__pack_program, dev, media, pack_image, option) for dev in devices]
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print(f"Successfully programmed {success} device(s)")
|
||
|
if failed > 0:
|
||
|
print(f"Failed to program {failed} device(s)")
|
||
|
|
||
|
|
||
|
def __img_program(dev, media, start, img_data, option) -> int:
|
||
|
|
||
|
nand_align, spinand_align = dev.get_align()
|
||
|
|
||
|
if (media == DEV_NAND and nand_align == 0) or \
|
||
|
(media == DEV_SPINAND and spinand_align == 0):
|
||
|
print("Unable to get block size")
|
||
|
return -1
|
||
|
|
||
|
if (media == DEV_NAND and start % nand_align != 0) or\
|
||
|
(media == DEV_SPINAND and start % spinand_align != 0) or \
|
||
|
(media == DEV_SPINOR and start % SPINOR_ALIGN != 0):
|
||
|
print("Starting address must be block aligned")
|
||
|
return -1
|
||
|
|
||
|
img_length = len(img_data)
|
||
|
print(f"image length is {img_length}")
|
||
|
cmd = start.to_bytes(8, byteorder='little')
|
||
|
cmd += img_length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_WRITE.to_bytes(4, byteorder='little')
|
||
|
if option == OPT_EXECUTE:
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
else:
|
||
|
cmd += b'\x00' * 4
|
||
|
|
||
|
dev.set_media(media)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
|
||
|
# Set ascii=True is for Windows cmd terminal, position > 0 doesn't work as expected in cmd though...
|
||
|
bar = tqdm(total=img_length, position=dev.get_id(), ascii=True, desc="Programming")
|
||
|
for offset in range(0, img_length, TRANSFER_SIZE):
|
||
|
xfer_size = TRANSFER_SIZE if offset + TRANSFER_SIZE < img_length else img_length - offset
|
||
|
dev.write(img_data[offset: offset + xfer_size])
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != xfer_size:
|
||
|
print("Ack size error")
|
||
|
return -1
|
||
|
bar.update(xfer_size)
|
||
|
dev.read(4)
|
||
|
bar.close()
|
||
|
if option == OPT_VERIFY:
|
||
|
dev.set_media(media)
|
||
|
cmd = start.to_bytes(8, byteorder='little')
|
||
|
cmd += img_length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_READ.to_bytes(4, byteorder='little')
|
||
|
cmd += b'\x00' * 4
|
||
|
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
|
||
|
remain = img_length
|
||
|
bar = tqdm(total=img_length, position=dev.get_id(), ascii=True, desc="Verifying")
|
||
|
while remain > 0:
|
||
|
ack = dev.read(4)
|
||
|
# Get the transfer length of next read
|
||
|
xfer_size = int.from_bytes(ack, byteorder="little")
|
||
|
|
||
|
data = dev.read(xfer_size)
|
||
|
dev.write(xfer_size.to_bytes(4, byteorder='little')) # ack
|
||
|
offset = img_length - remain
|
||
|
|
||
|
# For SD/eMMC
|
||
|
if xfer_size > remain:
|
||
|
xfer_size = remain
|
||
|
data = data[0: remain]
|
||
|
|
||
|
if data != bytearray(img_data[offset: offset + xfer_size]):
|
||
|
print("Verify failed")
|
||
|
return -1
|
||
|
remain -= xfer_size
|
||
|
bar.update(xfer_size)
|
||
|
print("Verify pass")
|
||
|
bar.close()
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def do_img_program(media, start, image_file_name, option=OPT_NONE) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
try:
|
||
|
with open(image_file_name, "rb") as image_file:
|
||
|
img_data = image_file.read()
|
||
|
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {image_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__img_program, dev, media, start, img_data, option) for dev in devices]
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print(f"Successfully programmed {success} device(s)")
|
||
|
if failed > 0:
|
||
|
print(f"Failed to program {failed} device(s)")
|
||
|
|
||
|
|
||
|
def do_img_read(media, start, out_file_name, length=0x1, option=OPT_NONE) -> None:
|
||
|
# only support read from 1 device
|
||
|
# devices = XUsbComList(attach_all=False).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=False)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
# Only support one device in read function
|
||
|
dev = devices[0]
|
||
|
|
||
|
cmd = start.to_bytes(8, byteorder='little')
|
||
|
cmd += length.to_bytes(8, byteorder='little')
|
||
|
cmd += ACT_READ.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.set_media(media)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return
|
||
|
# FIXME: Don't know real length for "read all"
|
||
|
bar = tqdm(total=length, ascii=True)
|
||
|
data = b''
|
||
|
remain = length
|
||
|
|
||
|
while remain > 0:
|
||
|
ack = dev.read(4)
|
||
|
# Get the transfer length of next read
|
||
|
xfer_size = int.from_bytes(ack, byteorder="little")
|
||
|
|
||
|
data += dev.read(xfer_size)
|
||
|
dev.write(xfer_size.to_bytes(4, byteorder='little')) # ack
|
||
|
remain -= xfer_size
|
||
|
bar.update(xfer_size)
|
||
|
try:
|
||
|
with open(out_file_name, "wb") as out_file:
|
||
|
out_file.write(data[0:length])
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {out_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
bar.close()
|
||
|
|
||
|
|
||
|
def __attach(dev, ini_data, xusb_data) -> int:
|
||
|
ini_len = len(ini_data)
|
||
|
out = int(ini_len).to_bytes(4, byteorder="little")
|
||
|
out += b'\x00\x00\x03\x28' # Execute address is 0x28030000
|
||
|
dev.write(out)
|
||
|
dev.write(ini_data)
|
||
|
|
||
|
in_buf = dev.read(4)
|
||
|
if int.from_bytes(in_buf, byteorder="little") != ini_len:
|
||
|
print("Length error")
|
||
|
return -1
|
||
|
|
||
|
in_buf = dev.read(4)
|
||
|
if int.from_bytes(in_buf, byteorder="little") != ACK:
|
||
|
val = int.from_bytes(in_buf, byteorder="little")
|
||
|
print(f"Ack error {val}")
|
||
|
return -1
|
||
|
|
||
|
xusb_len = len(xusb_data)
|
||
|
out = int(xusb_len).to_bytes(4, byteorder="little")
|
||
|
out += b'\x00\x00\x00\x87' # Execute address is 0x87000000
|
||
|
dev.write(out)
|
||
|
for offset in range(0, xusb_len, TRANSFER_SIZE):
|
||
|
xfer_size = TRANSFER_SIZE if offset + TRANSFER_SIZE < xusb_len else xusb_len - offset
|
||
|
|
||
|
dev.write(xusb_data[offset: offset + xfer_size])
|
||
|
if offset + xfer_size != xusb_len: # Ignore the ack of last packet
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != xfer_size:
|
||
|
_ack = int.from_bytes(ack, byteorder="little")
|
||
|
print(f"Ack size error {_ack} {xfer_size}")
|
||
|
return -1
|
||
|
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def __get_info(dev, data) -> int:
|
||
|
try:
|
||
|
info = dev.get_info(data)
|
||
|
except usb.core.USBError as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
_info_struct = namedtuple('_info_struct',
|
||
|
'page_per_blk page_size blk_cnt bad_clk_cnt oob_size usr_cfg0 spi_id usr_cfg1 quad_cmd \
|
||
|
read_sts_cmd write_sts_cmd sts_val dummy_byte blk rsv use_cfg2 snand_id snand_page_size \
|
||
|
snand_oob snand_quad_cmd snand_read_sts_cmd snand_write_sts_cmd snand_sts_val \
|
||
|
snand_dummy_byte snand_blk_cnt snand_page_per_blk')
|
||
|
info_struct = _info_struct._make(unpack('<IIIIIIIIBBBBIIIIIHHBBBBIII', info))
|
||
|
print("==== NAND ====")
|
||
|
print("Page per block: " + str(info_struct.page_per_blk))
|
||
|
print("Page size: " + str(info_struct.page_size))
|
||
|
print("Block per flash: " + str(info_struct.blk_cnt))
|
||
|
print("Bad block count: " + str(info_struct.bad_clk_cnt))
|
||
|
print("Spare size: " + str(info_struct.oob_size))
|
||
|
print("Is uer config: " + str(info_struct.usr_cfg0))
|
||
|
|
||
|
print("==== SPI NOR ====")
|
||
|
print("ID: " + str(info_struct.spi_id))
|
||
|
print("Is uer config: " + str(info_struct.usr_cfg1))
|
||
|
print("Quad read cmd: " + str(info_struct.quad_cmd))
|
||
|
print("Read sts cmd: " + str(info_struct.read_sts_cmd))
|
||
|
print("Write sts cmd: " + str(info_struct.write_sts_cmd))
|
||
|
print("Sts value: " + str(info_struct.sts_val))
|
||
|
print("Dummy byte: " + str(info_struct.dummy_byte))
|
||
|
|
||
|
print("==== eMMC ====")
|
||
|
print("Block: " + str(info_struct.blk))
|
||
|
print("Reserved: " + str(info_struct.rsv))
|
||
|
|
||
|
print("==== SPI NAND ====")
|
||
|
print("Is uer config: " + str(info_struct.use_cfg2))
|
||
|
print("ID: " + str(info_struct.snand_id))
|
||
|
print("Page size: " + str(info_struct.snand_page_size))
|
||
|
print("Spare size: " + str(info_struct.snand_oob))
|
||
|
print("Quad read cmd: " + str(info_struct.snand_quad_cmd))
|
||
|
print("Read sts cmd: " + str(info_struct.snand_read_sts_cmd))
|
||
|
print("Write sts cmd: " + str(info_struct.snand_write_sts_cmd))
|
||
|
print("Sts value: " + str(info_struct.snand_sts_val))
|
||
|
print("Dummy byte: " + str(info_struct.snand_dummy_byte))
|
||
|
print("Block per flash: " + str(info_struct.snand_blk_cnt))
|
||
|
print("Page per block: " + str(info_struct.snand_page_per_blk))
|
||
|
|
||
|
dev.set_align(info_struct.page_size * info_struct.page_per_blk,
|
||
|
info_struct.snand_page_size * info_struct.snand_page_per_blk)
|
||
|
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def do_attach(ini_file_name, option=OPT_NONE) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
init_location = "missing"
|
||
|
if os.path.exists(ini_file_name): # default use the init file in current directory
|
||
|
init_location = ini_file_name
|
||
|
else:
|
||
|
if platform.system() == 'Windows':
|
||
|
if os.path.exists(WINDOWS_PATH + "ddrimg\\" + ini_file_name):
|
||
|
init_location = WINDOWS_PATH + "ddrimg\\" + ini_file_name
|
||
|
elif platform.system() == 'Linux':
|
||
|
if os.path.exists(LINUX_PATH + "ddrimg/" + ini_file_name):
|
||
|
init_location = LINUX_PATH + "ddrimg/" + ini_file_name
|
||
|
|
||
|
if init_location == "missing":
|
||
|
print(f"Cannot find {ini_file_name}")
|
||
|
sys.exit(3)
|
||
|
try:
|
||
|
with open(init_location, "rb") as ini_file:
|
||
|
ini_data = ini_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {ini_file_name} failed")
|
||
|
sys.exit(err)
|
||
|
xusb_location = "missing"
|
||
|
if os.path.exists("xusb.bin"): # default use the xusb.bin in current directory
|
||
|
xusb_location = "xusb.bin"
|
||
|
else:
|
||
|
if platform.system() == 'Windows':
|
||
|
if os.path.exists(WINDOWS_PATH + "xusb.bin"):
|
||
|
xusb_location = WINDOWS_PATH + "xusb.bin"
|
||
|
elif platform.system() == 'Linux':
|
||
|
if os.path.exists(LINUX_PATH + "xusb.bin"):
|
||
|
xusb_location = LINUX_PATH + "xusb.bin"
|
||
|
if xusb_location == "missing":
|
||
|
print("Cannot find xusb.bin")
|
||
|
sys.exit(3)
|
||
|
|
||
|
try:
|
||
|
with open(xusb_location, "rb") as xusb_file:
|
||
|
xusb_data = xusb_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Open xusb.bin failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__attach, dev, ini_data, xusb_data) for dev in devices]
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures, timeout=2):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print(f"Successfully attached {success} device(s)")
|
||
|
if failed > 0:
|
||
|
print(f"Failed to attach {failed} device(s)")
|
||
|
if success == 0:
|
||
|
return
|
||
|
|
||
|
time.sleep(1)
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComListNew = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComListNew.get_dev()
|
||
|
|
||
|
data = bytearray(76)
|
||
|
# assign option file to set media info
|
||
|
if option == OPT_SETINFO:
|
||
|
try:
|
||
|
with open("info.json", "r") as json_file:
|
||
|
try:
|
||
|
d = json.load(json_file)
|
||
|
except json.decoder.JSONDecodeError as err:
|
||
|
print(f"{json_file} parsing error")
|
||
|
sys.exit(err)
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Open info.json failed")
|
||
|
sys.exit(err)
|
||
|
# now generate info from info.json
|
||
|
for key in d.keys():
|
||
|
if key == 'spinand':
|
||
|
data[48] = 1
|
||
|
for sub_key in d['spinand'].keys():
|
||
|
if sub_key == 'pagesize':
|
||
|
data[56:58] = int(d['spinand']['pagesize'], 0).to_bytes(2, byteorder="little")
|
||
|
elif sub_key == 'sparearea':
|
||
|
data[58:60] = int(d['spinand']['sparearea'], 0).to_bytes(2, byteorder="little")
|
||
|
elif sub_key == 'quadread':
|
||
|
data[60:61] = int(d['spinand']['quadread'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'readsts':
|
||
|
data[61:62] = int(d['spinand']['readsts'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'writests':
|
||
|
data[62:63] = int(d['spinand']['writests'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'stsvalue':
|
||
|
data[63:64] = int(d['spinand']['stsvalue'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'dummy':
|
||
|
data[64:68] = int(d['spinand']['dummy'], 0).to_bytes(4, byteorder="little")
|
||
|
elif sub_key == 'blkcnt':
|
||
|
data[68:72] = int(d['spinand']['blkcnt'], 0).to_bytes(4, byteorder="little")
|
||
|
elif sub_key == 'pageperblk':
|
||
|
data[72:76] = int(d['spinand']['pageperblk'], 0).to_bytes(4, byteorder="little")
|
||
|
elif key == 'spinor':
|
||
|
data[28] = 1
|
||
|
for sub_key in d['spinor'].keys():
|
||
|
if sub_key == 'quadread':
|
||
|
data[32:33] = int(d['spinor']['quadread'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'readsts':
|
||
|
data[33:34] = int(d['spinor']['readsts'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'writests':
|
||
|
data[34:35] = int(d['spinor']['writests'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'stsvalue':
|
||
|
data[35:36] = int(d['spinor']['stsvalue'], 0).to_bytes(1, byteorder="little")
|
||
|
elif sub_key == 'dummy':
|
||
|
data[36:40] = int(d['spinor']['dummy'], 0).to_bytes(4, byteorder="little")
|
||
|
elif key == 'nand':
|
||
|
data[20] = 1
|
||
|
for sub_key in d['nand'].keys():
|
||
|
if sub_key == 'blkcnt':
|
||
|
data[8:12] = int(d['nand']['blkcnt'], 0).to_bytes(4, byteorder="little")
|
||
|
elif sub_key == 'pageperblk':
|
||
|
data[0:4] = int(d['nand']['pageperblk'], 0).to_bytes(4, byteorder="little")
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__get_info, dev, data) for dev in devices]
|
||
|
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures, timeout=2):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print(f"Successfully get info from {success} device(s)")
|
||
|
|
||
|
|
||
|
def do_unpack(pack_file_name) -> None:
|
||
|
|
||
|
now = datetime.now()
|
||
|
pack_image = UnpackImage(pack_file_name)
|
||
|
image_cnt = pack_image.img_count()
|
||
|
|
||
|
try:
|
||
|
os.mkdir(now.strftime("%m%d-%H%M%S%f"))
|
||
|
except (IOError, OSError) as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
for i in range(image_cnt):
|
||
|
img_length, _, _ = pack_image.img_attr(i)
|
||
|
try:
|
||
|
with open(now.strftime("%m%d-%H%M%S%f") + "/img" + str(i) + ".bin", "wb") as img_file:
|
||
|
img_file.write(pack_image.img_content(i, 0, img_length))
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create output image file failed")
|
||
|
sys.exit(err)
|
||
|
try:
|
||
|
os.unlink("unpack")
|
||
|
except (IOError, OSError):
|
||
|
pass
|
||
|
try:
|
||
|
os.symlink(now.strftime("%m%d-%H%M%S%f"), "unpack")
|
||
|
except (IOError, OSError):
|
||
|
print("Create symbolic folder unpack failed")
|
||
|
print("Unpack images to directory {} complete".format(now.strftime("%m%d-%H%M%S%f")))
|
||
|
|
||
|
|
||
|
def do_stuff(cfg_file) -> None:
|
||
|
now = datetime.now()
|
||
|
|
||
|
try:
|
||
|
with open(cfg_file, "r") as json_file:
|
||
|
try:
|
||
|
d = json.load(json_file)
|
||
|
except json.decoder.JSONDecodeError as err:
|
||
|
print(f"{cfg_file} parsing error")
|
||
|
sys.exit(err)
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {cfg_file} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
try:
|
||
|
os.mkdir(now.strftime("%m%d-%H%M%S%f"))
|
||
|
pack_file = open(now.strftime("%m%d-%H%M%S%f") + "/pack.bin", "wb")
|
||
|
except (IOError, OSError) as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
offset = 0
|
||
|
out = bytearray()
|
||
|
|
||
|
# Start stuffing image
|
||
|
for img in d["image"]:
|
||
|
try:
|
||
|
with open(img["file"], "rb") as img_file:
|
||
|
data = img_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {img_file} failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
if int(img["offset"], 0) < offset:
|
||
|
print(f"Please place the files in {cfg_file} based on the ascending offset")
|
||
|
sys.exit(4)
|
||
|
elif int(img["offset"], 0) > offset:
|
||
|
out += b'\xFF' * (int(img["offset"], 0) - offset)
|
||
|
offset = int(img["offset"], 0)
|
||
|
out += data
|
||
|
offset += len(data)
|
||
|
pack_file.write(out)
|
||
|
pack_file.close()
|
||
|
try:
|
||
|
os.unlink("pack")
|
||
|
except (IOError, OSError):
|
||
|
pass
|
||
|
try:
|
||
|
os.symlink(now.strftime("%m%d-%H%M%S%f"), "pack")
|
||
|
except (IOError, OSError):
|
||
|
print("Create symbolic folder pack failed")
|
||
|
print("Generate pack file in directory {} complete".format(now.strftime("%m%d-%H%M%S%f")))
|
||
|
|
||
|
|
||
|
def do_pack(cfg_file) -> None:
|
||
|
now = datetime.now()
|
||
|
|
||
|
try:
|
||
|
with open(cfg_file, "r") as json_file:
|
||
|
try:
|
||
|
d = json.load(json_file)
|
||
|
except json.decoder.JSONDecodeError as err:
|
||
|
print(f"{cfg_file} parsing error")
|
||
|
sys.exit(err)
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {cfg_file} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
try:
|
||
|
os.mkdir(now.strftime("%m%d-%H%M%S%f"))
|
||
|
pack_file = open(now.strftime("%m%d-%H%M%S%f") + "/pack.bin", "wb")
|
||
|
except (IOError, OSError) as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
out = bytearray(b'\x20\x54\x56\x4e' + b'\xFF' * 12) # NVT + CRC32 + image count + 4 reserved bytes
|
||
|
|
||
|
# Start packing image
|
||
|
img_cnt = 0
|
||
|
for img in d["image"]:
|
||
|
try:
|
||
|
with open(img["file"], "rb") as img_file:
|
||
|
data = img_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {img_file} failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
img_cnt = img_cnt + 1
|
||
|
|
||
|
img_len = len(data)
|
||
|
out += img_len.to_bytes(8, byteorder="little")
|
||
|
try:
|
||
|
out += int(img["offset"], 0).to_bytes(8, byteorder="little")
|
||
|
except ValueError as err:
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
out += img["type"].to_bytes(4, byteorder="little")
|
||
|
out += b'\xFF' * 4
|
||
|
out += data
|
||
|
# Always put image start @ 16 byte boundary
|
||
|
pad = 16 - (img_len + 8) & 0xF
|
||
|
if pad != 16:
|
||
|
out += b'\xFF' * pad
|
||
|
|
||
|
# Fill image count
|
||
|
out[8:12] = img_cnt.to_bytes(4, byteorder="little")
|
||
|
|
||
|
# Fill CRC field
|
||
|
crc32_func = crcmod.predefined.mkCrcFun('crc-32')
|
||
|
checksum = crc32_func(out[8:])
|
||
|
out[4:8] = checksum.to_bytes(4, byteorder="little")
|
||
|
|
||
|
pack_file.write(out)
|
||
|
pack_file.close()
|
||
|
try:
|
||
|
os.unlink("pack")
|
||
|
except (IOError, OSError):
|
||
|
pass
|
||
|
try:
|
||
|
os.symlink(now.strftime("%m%d-%H%M%S%f"), "pack")
|
||
|
except (IOError, OSError):
|
||
|
print("Create symbolic folder pack failed")
|
||
|
print("Generate pack file in directory {} complete".format(now.strftime("%m%d-%H%M%S%f")))
|
||
|
|
||
|
|
||
|
def do_showhdr(cfg_file) -> None:
|
||
|
try:
|
||
|
header_file = open(cfg_file, "br")
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {cfg_file} failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
if unpack('<I', header_file.read(4))[0] != 0x4E565420:
|
||
|
print("Header checker error, not a valid header image")
|
||
|
header_file.close()
|
||
|
return
|
||
|
|
||
|
checksum0 = unpack('<I', header_file.read(4))[0]
|
||
|
buf = header_file.read()
|
||
|
crc32_func = crcmod.predefined.mkCrcFun('crc-32')
|
||
|
checksum1 = crc32_func(buf)
|
||
|
if checksum1 != checksum0:
|
||
|
print("Checksum is incorrect")
|
||
|
print(f"Expect {checksum1}, get {checksum0}")
|
||
|
header_file.close()
|
||
|
return
|
||
|
header_file.seek(8)
|
||
|
print("Length: " + str(unpack('<I', header_file.read(4))[0]))
|
||
|
print("Version: " + hex(unpack('<I', header_file.read(4))[0]))
|
||
|
print("==== SPI INFO ====")
|
||
|
print("Page size: " + str(unpack('<H', header_file.read(2))[0]))
|
||
|
print("Spare area size: " + str(unpack('<H', header_file.read(2))[0]))
|
||
|
print("Page per block: " + str(unpack('<H', header_file.read(2))[0]))
|
||
|
print("Quad read cmd: " + hex(unpack('<B', header_file.read(1))[0]))
|
||
|
print("Read status cmd: " + hex(unpack('<B', header_file.read(1))[0]))
|
||
|
print("Write status cmd: " + hex(unpack('<B', header_file.read(1))[0]))
|
||
|
print("Status Value: " + hex(unpack('<B', header_file.read(1))[0]))
|
||
|
print("Dummy byte 1: " + str(unpack('<B', header_file.read(1))[0]))
|
||
|
print("Dummy byte 2: " + str(unpack('<B', header_file.read(1))[0]))
|
||
|
print("Suspend interval: " + str(unpack('<B', header_file.read(1))[0]))
|
||
|
header_file.read(3) # Skip three dummy bytes
|
||
|
print("==== SPI INFO ====")
|
||
|
print("Entry point: " + hex(unpack('<I', header_file.read(4))[0]))
|
||
|
count = unpack('<I', header_file.read(4))[0]
|
||
|
print(f"Image count: {count}")
|
||
|
for i in range(0, count):
|
||
|
print(f"==== Image {i} ====")
|
||
|
print("Offset: " + hex(unpack('<I', header_file.read(4))[0]))
|
||
|
print("Load addr: " + hex(unpack('<I', header_file.read(4))[0]))
|
||
|
print("Size: " + hex(unpack('<I', header_file.read(4))[0]))
|
||
|
print("Type: " + str(unpack('<I', header_file.read(4))[0]))
|
||
|
print("R: ", end='')
|
||
|
for _ in range(0, 32):
|
||
|
print(format(unpack('<B', header_file.read(1))[0], 'x'), end='')
|
||
|
print("")
|
||
|
print("S: ", end='')
|
||
|
for _ in range(0, 32):
|
||
|
print(format(unpack('<B', header_file.read(1))[0], 'x'), end='')
|
||
|
print("")
|
||
|
header_file.close()
|
||
|
|
||
|
|
||
|
def do_convert(cfg_file, option=OPT_NONE) -> None:
|
||
|
now = datetime.now()
|
||
|
|
||
|
try:
|
||
|
with open(cfg_file, "r") as json_file:
|
||
|
try:
|
||
|
d = json.load(json_file)
|
||
|
except json.decoder.JSONDecodeError as err:
|
||
|
print(f"{cfg_file} parsing error")
|
||
|
sys.exit(err)
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {cfg_file} failed")
|
||
|
sys.exit(err)
|
||
|
try:
|
||
|
os.mkdir(now.strftime("%m%d-%H%M%S%f"))
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create output directory failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
if "header" in d:
|
||
|
out = bytearray(b'\x20\x54\x56\x4e' + b'\x00' * 8) # NVT + CRC + LEN
|
||
|
try:
|
||
|
out += int(d["header"]["version"], 0).to_bytes(4, byteorder="little")
|
||
|
# Fill SPI flash info
|
||
|
out += int(d["header"]["spiinfo"]["pagesize"], 0).to_bytes(2, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["sparearea"], 0).to_bytes(2, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["pageperblk"], 0).to_bytes(2, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["quadread"], 0).to_bytes(1, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["readsts"], 0).to_bytes(1, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["writests"], 0).to_bytes(1, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["stsvalue"], 0).to_bytes(1, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["dummy1"], 0).to_bytes(1, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["dummy2"], 0).to_bytes(1, byteorder="little")
|
||
|
out += int(d["header"]["spiinfo"]["suspintvl"], 0).to_bytes(1, byteorder="little")
|
||
|
out += b'\xFF' * 3 # 3 reserved bytes
|
||
|
out += int(d["header"]["entrypoint"], 0).to_bytes(4, byteorder="little")
|
||
|
except ValueError as err:
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
out += b'\xFF' * 4 # Reserve 4 bytes for image count
|
||
|
|
||
|
# Generate key file iff secure boot is enabled
|
||
|
if d["header"]["secureboot"] == 'yes':
|
||
|
|
||
|
try:
|
||
|
key_file = open(now.strftime("%m%d-%H%M%S%f") + "/header_key.txt", "w+")
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create key file failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
if "aeskey" in d["header"]:
|
||
|
try:
|
||
|
aeskey = bytes.fromhex(d["header"]["aeskey"])
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
else:
|
||
|
aeskey = ''.join(['%x' % random.randrange(16) for _ in range(0, 64)])
|
||
|
aeskey = binascii.unhexlify(bytes(aeskey, 'utf-8'))
|
||
|
key_file.write("AES key:\n" + str.upper(aeskey.hex()))
|
||
|
|
||
|
if "ecdsakey" in d["header"]:
|
||
|
try:
|
||
|
sk = ecdsa.SigningKey.from_string(bytes.fromhex(d["header"]["ecdsakey"]),
|
||
|
curve=ecdsa.NIST256p,
|
||
|
hashfunc=hashlib.sha256)
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
else:
|
||
|
sk = ecdsa.SigningKey.generate(curve=ecdsa.NIST256p, hashfunc=hashlib.sha256)
|
||
|
key_file.write("\nECDSA private key:\n" + str.upper(sk.to_string().hex()))
|
||
|
|
||
|
vk = sk.verifying_key
|
||
|
key_file.write("\nECDSA public key:\n" + format(vk.pubkey.point.x(), 'X') +
|
||
|
"\n" + format(vk.pubkey.point.y(), 'X') + "\n")
|
||
|
|
||
|
key_file.close()
|
||
|
|
||
|
img_cnt = len(d["header"]["image"])
|
||
|
if img_cnt > MAX_HEADER_IMG:
|
||
|
print("Can process 4 images in header max")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(2)
|
||
|
|
||
|
# Fill image information
|
||
|
for i in range(img_cnt):
|
||
|
img = d["header"]["image"][i]
|
||
|
try:
|
||
|
with open(img["file"], "rb") as img_file:
|
||
|
data = img_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Open image file failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
try:
|
||
|
out += int(img["offset"], 0).to_bytes(4, byteorder="little")
|
||
|
out += int(img["loadaddr"], 0).to_bytes(4, byteorder="little")
|
||
|
out += os.path.getsize(img["file"]).to_bytes(4, byteorder="little")
|
||
|
out += int(img["type"]).to_bytes(4, byteorder="little")
|
||
|
except ValueError as err:
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
if d["header"]["secureboot"] == 'yes':
|
||
|
|
||
|
# Use CFB and each image is process independently, so call new() for every image
|
||
|
aes_enc = AES.new(aeskey, AES.MODE_CFB, b'\x00' * 16, segment_size=128)
|
||
|
data_out = aes_enc.encrypt(data)
|
||
|
# R & S
|
||
|
out += sk.sign(data_out)
|
||
|
|
||
|
# Write encrypt image
|
||
|
try:
|
||
|
with open(now.strftime("%m%d-%H%M%S%f") + '/enc_' +
|
||
|
os.path.basename(img["file"]), "wb") as enc_file:
|
||
|
enc_file.write(data_out)
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create encrypt file failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
else:
|
||
|
out += b'\xFF' * 64 # Just pack 0xFF if secure boot is disabled
|
||
|
|
||
|
# Fill header length
|
||
|
out[8:12] = int(len(out) - 8).to_bytes(4, byteorder="little")
|
||
|
# Fill image count
|
||
|
out[36:40] = img_cnt.to_bytes(4, byteorder="little")
|
||
|
# Fill header checksum
|
||
|
crc32_func = crcmod.predefined.mkCrcFun('crc-32')
|
||
|
out[4:8] = crc32_func(out[8:]).to_bytes(4, byteorder="little")
|
||
|
|
||
|
try:
|
||
|
with open(now.strftime("%m%d-%H%M%S%f") + "/header.bin", "wb") as header_file:
|
||
|
header_file.write(out)
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create header file failed")
|
||
|
sys.exit(err)
|
||
|
|
||
|
if "env" in d:
|
||
|
try:
|
||
|
with open(now.strftime("%m%d-%H%M%S%f") + "/uboot-env.bin", "wb") as out_file:
|
||
|
out_file.write(conv_env(d["env"]["file"], int(d["env"]["blksize"], 0)))
|
||
|
except (IOError, OSError, ValueError) as err:
|
||
|
print("Create header file failed")
|
||
|
sys.exit(err)
|
||
|
# Misc images
|
||
|
if "data" in d:
|
||
|
try:
|
||
|
key_file = open(now.strftime("%m%d-%H%M%S%f") + "/data_key.txt", "w+")
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create key file failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
if "aeskey" in d["data"]:
|
||
|
try:
|
||
|
aeskey = bytes.fromhex(d["data"]["aeskey"])
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
else:
|
||
|
aeskey = ''.join(['%x' % random.randrange(16) for _ in range(0, 64)])
|
||
|
aeskey = binascii.unhexlify(bytes(aeskey, 'utf-8'))
|
||
|
key_file.write("AES key:\n" + str.upper(aeskey.hex()))
|
||
|
|
||
|
if "ecdsakey" in d["data"]:
|
||
|
try:
|
||
|
sk = ecdsa.SigningKey.from_string(bytes.fromhex(d["data"]["ecdsakey"]),
|
||
|
curve=ecdsa.NIST256p,
|
||
|
hashfunc=hashlib.sha256)
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
else:
|
||
|
sk = ecdsa.SigningKey.generate(curve=ecdsa.NIST256p, hashfunc=hashlib.sha256)
|
||
|
key_file.write("\nECDSA private key:\n" + str.upper(sk.to_string().hex()))
|
||
|
|
||
|
vk = sk.verifying_key
|
||
|
key_file.write("\nECDSA public key:\n" + format(vk.pubkey.point.x(), 'X') +
|
||
|
"\n" + format(vk.pubkey.point.y(), 'X') + "\n")
|
||
|
key_file.close()
|
||
|
|
||
|
for img in d["data"]["image"]:
|
||
|
try:
|
||
|
with open(img["file"], "rb") as img_file:
|
||
|
data = img_file.read()
|
||
|
except (IOError, OSError) as err:
|
||
|
print(f"Open {img_file} failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
|
||
|
aes_enc = AES.new(aeskey, AES.MODE_CFB, b'\x00' * 16, segment_size=128)
|
||
|
data_out = aes_enc.encrypt(data)
|
||
|
signature = sk.sign(data_out)
|
||
|
try:
|
||
|
if option is OPT_CONCAT:
|
||
|
# Append the R & S at the end of signed image instead of writing to a separate file.
|
||
|
# The signed image could be used in deployed mode
|
||
|
data_out += signature
|
||
|
else:
|
||
|
with open(now.strftime("%m%d-%H%M%S%f") + '/sig_' + img["file"], "wb") as sig_file:
|
||
|
sig_file.write(signature) # R & S
|
||
|
|
||
|
with open(now.strftime("%m%d-%H%M%S%f") + '/enc_' + img["file"], "wb") as enc_file:
|
||
|
enc_file.write(data_out)
|
||
|
|
||
|
except (IOError, OSError) as err:
|
||
|
print("Create encrypt/signature file failed")
|
||
|
shutil.rmtree(now.strftime("%m%d-%H%M%S%f"))
|
||
|
sys.exit(err)
|
||
|
try:
|
||
|
os.unlink("conv")
|
||
|
except (IOError, OSError):
|
||
|
pass
|
||
|
try:
|
||
|
os.symlink(now.strftime("%m%d-%H%M%S%f"), "conv")
|
||
|
except (IOError, OSError):
|
||
|
print("Create symbolic folder conv failed")
|
||
|
print("Generate output image(s) in directory {} complete".format(now.strftime("%m%d-%H%M%S%f")))
|
||
|
|
||
|
|
||
|
def __msc(dev, media, reserve, option) -> int:
|
||
|
|
||
|
cmd = reserve.to_bytes(8, byteorder='little')
|
||
|
cmd += b'\x00' * 8
|
||
|
cmd += ACT_MSC.to_bytes(4, byteorder='little')
|
||
|
cmd += option.to_bytes(4, byteorder='little')
|
||
|
|
||
|
dev.set_media(media)
|
||
|
dev.write(cmd)
|
||
|
ack = dev.read(4)
|
||
|
if int.from_bytes(ack, byteorder="little") != ACK:
|
||
|
print("Receive ACK error")
|
||
|
return -1
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def do_msc(media, reserve, option=OPT_NONE) -> None:
|
||
|
global mp_mode
|
||
|
|
||
|
# devices = XUsbComList(attach_all=mp_mode).get_dev()
|
||
|
_XUsbComList = XUsbComList(attach_all=mp_mode)
|
||
|
devices = _XUsbComList.get_dev()
|
||
|
|
||
|
if len(devices) == 0:
|
||
|
print("Device not found")
|
||
|
sys.exit(2)
|
||
|
|
||
|
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
|
futures = [executor.submit(__msc, dev, media, reserve, option) for dev in devices]
|
||
|
success = 0
|
||
|
failed = 0
|
||
|
for future in as_completed(futures):
|
||
|
if future.result() == 0:
|
||
|
success += 1
|
||
|
else:
|
||
|
failed += 1
|
||
|
|
||
|
print("Successfully {} {} MSC device(s)".format("set" if option == "OPT_NONE" else "eject", success))
|
||
|
if failed > 0:
|
||
|
print("Failed to {} {} MSC device(s)".format("set" if option == "OPT_NONE" else "eject", failed))
|
||
|
|
||
|
|
||
|
def get_media(media) -> int:
|
||
|
media = str.upper(media)
|
||
|
return {
|
||
|
'DDR': DEV_DDR_SRAM,
|
||
|
'SRAM': DEV_DDR_SRAM,
|
||
|
'SD': DEV_SD_EMMC,
|
||
|
'EMMC': DEV_SD_EMMC,
|
||
|
'NAND': DEV_NAND,
|
||
|
'SPINAND': DEV_SPINAND,
|
||
|
'SPINOR': DEV_SPINOR,
|
||
|
'OTP': DEV_OTP,
|
||
|
'USBH': DEV_USBH
|
||
|
}.get(media, DEV_UNKNOWN)
|
||
|
|
||
|
|
||
|
def get_option(option) -> int:
|
||
|
option = str.upper(option)
|
||
|
return {
|
||
|
'SCRUB': OPT_SCRUB,
|
||
|
'WITHBAD': OPT_WITHBAD,
|
||
|
'VERIFY': OPT_VERIFY,
|
||
|
'EXECUTE': OPT_EXECUTE,
|
||
|
'UNPACK': OPT_UNPACK,
|
||
|
'RAW': OPT_RAW,
|
||
|
'EJECT': OPT_EJECT,
|
||
|
'STUFF': OPT_STUFF,
|
||
|
'SETINFO': OPT_SETINFO,
|
||
|
'CONCAT': OPT_CONCAT,
|
||
|
'SHOWHDR': OPT_SHOWHDR
|
||
|
}.get(option, OPT_UNKNOWN)
|
||
|
|
||
|
|
||
|
def get_type(img_type) -> int:
|
||
|
img_type = str.upper(img_type)
|
||
|
return {
|
||
|
'TFA': IMG_TFA,
|
||
|
'UBOOT': IMG_UBOOT,
|
||
|
'LINUX': IMG_LINUX,
|
||
|
'DDR': IMG_DDR,
|
||
|
'TEE': IMG_TEE
|
||
|
}.get(img_type, IMG_DATA)
|
||
|
|
||
|
def get_otpblock(num) -> int:
|
||
|
num = str.upper(num)
|
||
|
return {
|
||
|
'OTP1': OPT_OTPBLK1,
|
||
|
'OTP2': OPT_OTPBLK2,
|
||
|
'OTP3': OPT_OTPBLK3,
|
||
|
'OTP4': OPT_OTPBLK4,
|
||
|
'OTP5': OPT_OTPBLK5,
|
||
|
'OTP6': OPT_OTPBLK6,
|
||
|
'OTP7': OPT_OTPBLK7,
|
||
|
'KEY0': OPT_OTPKEY0+OPT_OTPKEY,
|
||
|
'KEY1': OPT_OTPKEY1+OPT_OTPKEY,
|
||
|
'KEY2': OPT_OTPKEY2+OPT_OTPKEY,
|
||
|
'KEY3': OPT_OTPKEY3+OPT_OTPKEY,
|
||
|
'KEY4': OPT_OTPKEY4+OPT_OTPKEY,
|
||
|
'KEY5': OPT_OTPKEY5+OPT_OTPKEY
|
||
|
}.get(num, OPT_UNKNOWN)
|
||
|
|
||
|
|
||
|
def main():
|
||
|
parser = argparse.ArgumentParser()
|
||
|
|
||
|
parser.add_argument("CONFIG", nargs='?', help="Config file", type=str, default='')
|
||
|
parser.add_argument("-a", "--attach", action='store_true', help="Attach to MA35D1")
|
||
|
parser.add_argument("-o", "--option", nargs='+', help="Option flag")
|
||
|
parser.add_argument("-t", "--type", nargs='+', help="Type flag")
|
||
|
group = parser.add_mutually_exclusive_group()
|
||
|
group.add_argument("-c", "--convert", action='store_true', help="Convert images")
|
||
|
group.add_argument("-p", "--pack", action='store_true', help="Generate pack file")
|
||
|
group.add_argument("-v", "--version", action='store_true', help="Show version number")
|
||
|
group.add_argument("-r", "--read", nargs='+', help="Read flash")
|
||
|
group.add_argument("-w", "--write", nargs='+', help="Write flash")
|
||
|
group.add_argument("-e", "--erase", nargs='+', help="Erase flash")
|
||
|
group.add_argument("-s", "--storage", nargs='+', help="Export eMMC/SD as Mass Storage Class")
|
||
|
|
||
|
if len(sys.argv) == 1:
|
||
|
parser.print_help()
|
||
|
sys.exit(0)
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
if args.option:
|
||
|
option = get_option(args.option[0])
|
||
|
else:
|
||
|
option = OPT_NONE
|
||
|
|
||
|
if option is OPT_UNKNOWN:
|
||
|
print("Unknown option: " + args.option[0])
|
||
|
sys.exit(0)
|
||
|
# if args.type:
|
||
|
# img_type = get_type(args.type[0])
|
||
|
# else:
|
||
|
# img_type = IMG_DATA
|
||
|
|
||
|
cfg_file = args.CONFIG
|
||
|
|
||
|
if args.attach:
|
||
|
if not cfg_file:
|
||
|
print("Please assign a DDR ini file")
|
||
|
sys.exit(0)
|
||
|
do_attach(cfg_file, option)
|
||
|
|
||
|
if args.convert:
|
||
|
if cfg_file == '':
|
||
|
print("No config file assigned")
|
||
|
sys.exit(0)
|
||
|
else:
|
||
|
if option == OPT_SHOWHDR:
|
||
|
do_showhdr(cfg_file)
|
||
|
else:
|
||
|
do_convert(cfg_file, option)
|
||
|
elif args.pack:
|
||
|
if cfg_file == '':
|
||
|
print("No config file assigned")
|
||
|
sys.exit(0)
|
||
|
else:
|
||
|
if option == OPT_UNPACK:
|
||
|
do_unpack(cfg_file)
|
||
|
elif option == OPT_STUFF:
|
||
|
do_stuff(cfg_file)
|
||
|
else:
|
||
|
do_pack(cfg_file)
|
||
|
elif args.read:
|
||
|
# -r spinor all out.bin
|
||
|
# -r nand 0x1000 0x100 out.bin
|
||
|
# -r otp all out.bin (block1 ~ block7)
|
||
|
# -r otp blockno 0x4 out.bin
|
||
|
arg_count = len(args.read)
|
||
|
if arg_count < 3:
|
||
|
print("At lease take 3 arguments")
|
||
|
sys.exit(0)
|
||
|
media = get_media(args.read[0])
|
||
|
|
||
|
try:
|
||
|
if media == DEV_UNKNOWN:
|
||
|
raise ValueError(f"Cannot support read {str.upper(args.read[0])}")
|
||
|
if arg_count == 3 and str.upper(args.read[1]) != 'ALL':
|
||
|
raise ValueError("Unknown arguments")
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
if str.upper(args.read[1]) == 'ALL':
|
||
|
if media == DEV_OTP:
|
||
|
option |= 0x3fff00
|
||
|
do_otp_read(media, 0, args.read[2], 352, option)
|
||
|
else:
|
||
|
do_img_read(media, 0, args.read[2], 0, option)
|
||
|
else:
|
||
|
try:
|
||
|
if media == DEV_OTP:
|
||
|
option = get_otpblock(args.read[1]) | option
|
||
|
else:
|
||
|
start = int(args.read[1], 0)
|
||
|
length = int(args.read[2], 0)
|
||
|
except ValueError as err:
|
||
|
print("Wrong start/length value")
|
||
|
sys.exit(err)
|
||
|
|
||
|
if media == DEV_OTP:
|
||
|
do_otp_read(media, 0, args.read[3], length, option)
|
||
|
else:
|
||
|
do_img_read(media, start, args.read[3], length, option)
|
||
|
|
||
|
elif args.write:
|
||
|
# -w spinor 0x1000 image.bin
|
||
|
# -w otp otp.json
|
||
|
# -w nand pack.img
|
||
|
arg_count = len(args.write)
|
||
|
if arg_count < 2:
|
||
|
print("At lease take 2 arguments")
|
||
|
sys.exit(0)
|
||
|
media = get_media(args.write[0])
|
||
|
|
||
|
try:
|
||
|
if media == DEV_UNKNOWN:
|
||
|
raise ValueError(f"Unknown storage media {str.upper(args.write[0])}")
|
||
|
if option == OPT_VERIFY and media == DEV_OTP:
|
||
|
raise ValueError(f"Do not support verify option on {str.upper(args.write[0])}")
|
||
|
if option == OPT_EXECUTE and media != DEV_DDR_SRAM:
|
||
|
raise ValueError(f"Do not support execution on {str.upper(args.write[0])}")
|
||
|
if option == OPT_RAW and media != DEV_NAND:
|
||
|
raise ValueError(f"Do not support raw write on {str.upper(args.write[0])}")
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
if arg_count == 2:
|
||
|
if media == DEV_OTP:
|
||
|
do_otp_program(args.write[1])
|
||
|
else:
|
||
|
do_pack_program(media, args.write[1], option)
|
||
|
else:
|
||
|
try:
|
||
|
start = int(args.write[1], 0)
|
||
|
except ValueError as err:
|
||
|
print("Wrong start value")
|
||
|
sys.exit(err)
|
||
|
do_img_program(media, start, args.write[2], option)
|
||
|
|
||
|
elif args.erase:
|
||
|
# -e spinor all
|
||
|
# -e nand 0x100000 0x10000 -o withbad
|
||
|
# -e otp blockno
|
||
|
arg_count = len(args.erase)
|
||
|
if arg_count < 2:
|
||
|
print("At lease take 2 arguments")
|
||
|
sys.exit(0)
|
||
|
media = get_media(args.erase[0])
|
||
|
|
||
|
try:
|
||
|
if media in [DEV_DDR_SRAM, DEV_SD_EMMC, DEV_UNKNOWN]:
|
||
|
raise ValueError(f"{str.upper(args.erase[0])} does not support erase")
|
||
|
if arg_count == 2 and str.upper(args.erase[1]) != 'ALL' and media != DEV_OTP:
|
||
|
raise ValueError("Unknown arguments")
|
||
|
if str.upper(args.erase[1]) == 'ALL' and media == DEV_OTP:
|
||
|
raise ValueError("Wrong arguments")
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
if media == DEV_OTP:
|
||
|
option = get_otpblock(args.erase[1])
|
||
|
try:
|
||
|
# only can erase block 1, 3, 4
|
||
|
if option == OPT_OTPBLK2:
|
||
|
raise ValueError(f"Error block 2, only erase block 1, 3, 4")
|
||
|
if option == OPT_OTPBLK5:
|
||
|
raise ValueError(f"Error block 5, only erase block 1, 3, 4")
|
||
|
if option == OPT_OTPBLK6:
|
||
|
raise ValueError(f"Error block 6, only erase block 1, 3, 4")
|
||
|
if option == OPT_OTPBLK7:
|
||
|
raise ValueError(f"Error block 7, only erase block 1, 3, 4")
|
||
|
if option == OPT_UNKNOWN:
|
||
|
raise ValueError(f"Error key number")
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
do_otp_erase(option)
|
||
|
else:
|
||
|
if str.upper(args.erase[1]) == 'ALL':
|
||
|
do_img_erase(media, 0, 0, option)
|
||
|
else:
|
||
|
try:
|
||
|
start = int(args.erase[1], 0)
|
||
|
length = int(args.erase[2], 0)
|
||
|
except ValueError as err:
|
||
|
print("Wrong start/length value")
|
||
|
sys.exit(err)
|
||
|
do_img_erase(media, start, length, option)
|
||
|
|
||
|
elif args.storage:
|
||
|
# -s emmc 0x800000
|
||
|
# -s emmc -o remove
|
||
|
arg_count = len(args.erase)
|
||
|
if arg_count != 2 and option != OPT_EJECT:
|
||
|
print("Takes 2 arguments. Storage device and reserved size")
|
||
|
sys.exit(0)
|
||
|
media = get_media(args.storage[0])
|
||
|
try:
|
||
|
if media not in [DEV_SD_EMMC]:
|
||
|
raise ValueError("Only support eMMC/SD")
|
||
|
if option != OPT_NONE and option != OPT_EJECT:
|
||
|
raise ValueError("Unsupported option")
|
||
|
except ValueError as err:
|
||
|
sys.exit(err)
|
||
|
|
||
|
if option == OPT_EJECT:
|
||
|
do_msc(media, 0, OPT_EJECT)
|
||
|
else:
|
||
|
try:
|
||
|
reserve = int(args.storage[1], 0)
|
||
|
except ValueError as err:
|
||
|
print("Wrong reserve size")
|
||
|
sys.exit(err)
|
||
|
do_msc(media, reserve)
|
||
|
elif args.version:
|
||
|
print('NuWriter ' + __version__)
|
||
|
print(__copyright__)
|
||
|
|
||
|
|
||
|
# Here goes the main function
|
||
|
if __name__ == "__main__":
|
||
|
#os.system("start cmd.exe") #Call do it. Will open cmd window in each process
|
||
|
main()
|