484 lines
18 KiB
Python
484 lines
18 KiB
Python
"""SRIO Flash operations (erase/write/read) via TFTP transport.
|
|
|
|
This module implements the flash control protocol derived from dwl_fw.cpp,
|
|
using TFTP for SRIO register read/write operations.
|
|
|
|
Reference: _OLD/linux_app/dwl_fw.cpp and dwl_fw.h
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import struct
|
|
import time
|
|
from typing import Callable, Optional
|
|
|
|
from pydownloadfwviasrio.tftp_client import SRIOTFTPClient
|
|
|
|
|
|
# SRIO Register addresses (from dwl_fw.h)
|
|
MODE_REG = 0x4700002C
|
|
CMD_REG = 0x47000030
|
|
ADDR_REG = 0x47000034
|
|
NUM_BYTE_REG = 0x47000038
|
|
CTRL_REG = 0x47000060
|
|
TX_FIFO_REG = 0x47000400
|
|
STATUS_REG = 0x47000864
|
|
RX_FIFO_REG = 0x47000C00
|
|
|
|
|
|
class SRIOFlashController:
|
|
"""Low-level SRIO flash controller using TFTP transport.
|
|
|
|
Implements the register-level protocol from dwl_fw.cpp.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
client: SRIOTFTPClient,
|
|
slot_address: str,
|
|
flashport: int = 0, # 0 = primary, 8 = secondary (from dwl_fw.cpp)
|
|
max_verify_retries: int = 500,
|
|
) -> None:
|
|
"""Initialize SRIO flash controller.
|
|
|
|
Args:
|
|
client: TFTP client for SRIO communication.
|
|
slot_address: SRIO slot/endpoint (e.g., "0x13").
|
|
flashport: Flash port offset (0 or 8).
|
|
max_verify_retries: Default max retries for register write verification.
|
|
"""
|
|
self.client = client
|
|
self.slot_address = slot_address
|
|
self.flashport = flashport
|
|
self.default_max_verify_retries = max_verify_retries
|
|
self.abort_check: Optional[Callable[[], bool]] = None # Set by operations
|
|
|
|
def set_abort_check(self, abort_check: Optional[Callable[[], bool]]) -> None:
|
|
"""Set abort check callback for current operation.
|
|
|
|
Args:
|
|
abort_check: Callback that returns True if operation should abort.
|
|
"""
|
|
self.abort_check = abort_check
|
|
|
|
def _write_reg(
|
|
self,
|
|
address: int,
|
|
value: int,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
description: str = "",
|
|
max_verify_retries: Optional[int] = None,
|
|
verify_timeout_us: int = 1000,
|
|
) -> bool:
|
|
"""Write 32-bit value to SRIO register with read-back verification.
|
|
|
|
Implements the same retry logic as dwl_fw.cpp srio_write_reg() (line 109-177):
|
|
- Writes value to register
|
|
- Reads back and verifies (except for TX_FIFO which is write-only)
|
|
- Retries up to max_verify_retries times if value doesn't match
|
|
|
|
Args:
|
|
address: Register address.
|
|
value: 32-bit value to write.
|
|
log: Optional logging callback for TFTP commands.
|
|
description: Optional description of operation.
|
|
max_verify_retries: Max attempts to verify write (uses default if None).
|
|
verify_timeout_us: Microseconds delay between verify attempts.
|
|
|
|
Returns:
|
|
True on success (value verified or TX_FIFO write).
|
|
"""
|
|
if max_verify_retries is None:
|
|
max_verify_retries = self.default_max_verify_retries
|
|
|
|
# Pack value as 4 bytes (little-endian)
|
|
data = struct.pack('<I', value)
|
|
|
|
# Retry loop with read-back verification (dwl_fw.cpp line 134-172)
|
|
for attempt in range(max_verify_retries):
|
|
# Check abort before each attempt
|
|
if self.abort_check and self.abort_check():
|
|
return False
|
|
|
|
# Write value
|
|
if not self.client.srio_write(self.slot_address, address, data, log=log, description=description):
|
|
if attempt < max_verify_retries - 1:
|
|
if log:
|
|
log(f" ↻ Retry {attempt+1}/{max_verify_retries}: TFTP write failed, retrying...")
|
|
time.sleep(verify_timeout_us / 1_000_000.0)
|
|
continue
|
|
return False
|
|
|
|
# Exception: TX_FIFO_REG is write-only, no verification (dwl_fw.cpp line 156)
|
|
if address == TX_FIFO_REG:
|
|
return True
|
|
|
|
# Read-back verification (dwl_fw.cpp line 158-164)
|
|
read_value = self._read_reg_no_verify(address, log=log)
|
|
if read_value is not None and read_value == value:
|
|
if attempt > 0 and log: # Log solo se c'è stato almeno 1 retry
|
|
log(f" ✓ Verified after {attempt+1} attempt(s)")
|
|
return True # Verified successfully!
|
|
|
|
# Value mismatch, retry after delay (dwl_fw.cpp line 170)
|
|
if attempt < max_verify_retries - 1:
|
|
if log:
|
|
log(f" ↻ Retry {attempt+1}/{max_verify_retries}: Read-back mismatch (got 0x{read_value:08X}, expected 0x{value:08X})")
|
|
time.sleep(verify_timeout_us / 1_000_000.0)
|
|
|
|
# Max retries exhausted (dwl_fw.cpp line 173)
|
|
if log:
|
|
log(f"⚠️ Write verify FAILED after {max_verify_retries} attempts: addr=0x{address:08X}, expected=0x{value:08X}")
|
|
return False
|
|
|
|
def _read_reg_no_verify(
|
|
self,
|
|
address: int,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
) -> Optional[int]:
|
|
"""Read 32-bit value from SRIO register (internal, no description log).
|
|
|
|
Used for read-back verification in _write_reg.
|
|
|
|
Args:
|
|
address: Register address.
|
|
log: Optional logging callback for TFTP commands.
|
|
|
|
Returns:
|
|
32-bit value or None on failure.
|
|
"""
|
|
data = self.client.srio_read(self.slot_address, address, 4, log=log, description="")
|
|
if data and len(data) >= 4:
|
|
return struct.unpack('<I', data[:4])[0]
|
|
return None
|
|
|
|
def _read_reg(
|
|
self,
|
|
address: int,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
description: str = "",
|
|
) -> Optional[int]:
|
|
"""Read 32-bit value from SRIO register.
|
|
|
|
Args:
|
|
address: Register address.
|
|
log: Optional logging callback for TFTP commands.
|
|
description: Optional description of operation.
|
|
|
|
Returns:
|
|
32-bit value or None on failure.
|
|
"""
|
|
data = self.client.srio_read(self.slot_address, address, 4, log=log, description=description)
|
|
if data and len(data) >= 4:
|
|
return struct.unpack('<I', data[:4])[0]
|
|
return None
|
|
|
|
def _wait_status_bit(
|
|
self,
|
|
bit_pos: int,
|
|
expected_value: int,
|
|
max_retries: int = 500,
|
|
timeout_us: int = 1000,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
) -> bool:
|
|
"""Wait for specific bit in STATUS_REG to reach expected value.
|
|
|
|
Args:
|
|
bit_pos: Bit position to check (0-31).
|
|
expected_value: Expected bit value (0 or 1).
|
|
max_retries: Maximum retry attempts.
|
|
timeout_us: Microseconds between retries.
|
|
log: Optional logging callback.
|
|
|
|
Returns:
|
|
True if bit reached expected value, False on timeout.
|
|
"""
|
|
for i in range(max_retries):
|
|
# Check abort before each poll
|
|
if self.abort_check and self.abort_check():
|
|
return False
|
|
|
|
status = self._read_reg(STATUS_REG, log=log, description=f"STATUS_REG (poll bit{bit_pos}→{expected_value})")
|
|
if status is not None:
|
|
bit_value = (status >> bit_pos) & 0x01
|
|
if bit_value == expected_value:
|
|
return True
|
|
time.sleep(timeout_us / 1_000_000.0)
|
|
|
|
return False
|
|
|
|
def start_request(self, log: Optional[Callable[[str], None]] = None) -> bool:
|
|
"""START_REQUEST macro (from dwl_fw.cpp line 237)."""
|
|
if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01 (START)"):
|
|
return False
|
|
return self._wait_status_bit(11, 1, log=log)
|
|
|
|
def end_request(self, log: Optional[Callable[[str], None]] = None) -> bool:
|
|
"""END_REQUEST macro (from dwl_fw.cpp line 248)."""
|
|
# Check error bits
|
|
for bit in [1, 4, 5, 6, 10]:
|
|
if not self._wait_status_bit(bit, 0, log=log):
|
|
return False
|
|
|
|
if not self._write_reg(CTRL_REG, 0x00, log, "CTRL_REG = 0x00 (END)"):
|
|
return False
|
|
return self._wait_status_bit(11, 0, log=log)
|
|
|
|
def send_request(self, log: Optional[Callable[[str], None]] = None) -> bool:
|
|
"""SEND_REQUEST macro (from dwl_fw.cpp line 282)."""
|
|
if not self.start_request(log):
|
|
return False
|
|
if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (EXECUTE)"):
|
|
return False
|
|
if not self._wait_status_bit(12, 1, log=log):
|
|
return False
|
|
if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"):
|
|
return False
|
|
if not self._wait_status_bit(12, 0, log=log):
|
|
return False
|
|
return self.end_request(log)
|
|
|
|
def write_data(self, log: Optional[Callable[[str], None]] = None) -> bool:
|
|
"""WRITE_DATA macro (from dwl_fw.cpp line 260)."""
|
|
if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (WRITE_DATA)"):
|
|
return False
|
|
if not self._wait_status_bit(12, 1, log=log):
|
|
return False
|
|
if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"):
|
|
return False
|
|
if not self._wait_status_bit(12, 0, log=log):
|
|
return False
|
|
return self.end_request(log)
|
|
|
|
def wait_flash(
|
|
self,
|
|
max_retries: int = 1000,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
) -> bool:
|
|
"""Wait for flash operation to complete (from dwl_fw.cpp line 359)."""
|
|
for _ in range(max_retries):
|
|
# Check abort before each flash status check
|
|
if self.abort_check and self.abort_check():
|
|
return False
|
|
|
|
if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"):
|
|
continue
|
|
if not self._write_reg(CMD_REG, 0x05, log, "CMD_REG = 0x05 (ReadStatus)"):
|
|
continue
|
|
|
|
# read_data macro
|
|
if not self.start_request(log):
|
|
continue
|
|
if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (READ_DATA)"):
|
|
continue
|
|
if not self._wait_status_bit(12, 1, log=log):
|
|
continue
|
|
if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"):
|
|
continue
|
|
if not self._wait_status_bit(12, 0, log=log):
|
|
continue
|
|
|
|
# Check RX_FIFO_REG bit 0 == 0 (flash ready)
|
|
rx_fifo = self._read_reg(RX_FIFO_REG, log=log, description="RX_FIFO (flash status)")
|
|
if rx_fifo is not None and (rx_fifo & 0x01) == 0:
|
|
self.end_request(log)
|
|
return True
|
|
|
|
self.end_request(log)
|
|
time.sleep(0.001)
|
|
|
|
return False
|
|
|
|
def erase_section(
|
|
self,
|
|
address: int,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
) -> bool:
|
|
"""Erase 64KB flash sector (from dwl_fw.cpp line 421).
|
|
|
|
Args:
|
|
address: Flash address (should be 64KB-aligned).
|
|
log: Optional logging callback for TFTP commands.
|
|
|
|
Returns:
|
|
True on success.
|
|
"""
|
|
# MODE_REG = 0x02 + flashport
|
|
if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"):
|
|
return False
|
|
# CMD_REG = 0x06 (Write Enable)
|
|
if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06 (WriteEnable)"):
|
|
return False
|
|
if not self.send_request(log):
|
|
return False
|
|
|
|
# MODE_REG = 0x06 + flashport
|
|
if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"):
|
|
return False
|
|
# ADDR_REG = flash address
|
|
if not self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}"):
|
|
return False
|
|
# NUM_BYTE_REG = 0 (for erase)
|
|
if not self._write_reg(NUM_BYTE_REG, 0x00, log, "NUM_BYTE_REG = 0"):
|
|
return False
|
|
# CMD_REG = 0xD8 (Sector Erase)
|
|
if not self._write_reg(CMD_REG, 0xD8, log, "CMD_REG = 0xD8 (Erase64KB)"):
|
|
return False
|
|
if not self.send_request(log):
|
|
return False
|
|
|
|
return True
|
|
|
|
def write_flash(
|
|
self,
|
|
address: int,
|
|
data: bytes,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
) -> bool:
|
|
"""Write 256 bytes to flash (from dwl_fw.cpp line 392).
|
|
|
|
Args:
|
|
address: Flash address (should be 256-byte aligned).
|
|
data: Data to write (will be padded/truncated to 256 bytes).
|
|
log: Optional logging callback for TFTP commands.
|
|
|
|
Returns:
|
|
True on success.
|
|
"""
|
|
# Ensure 256 bytes
|
|
if len(data) < 256:
|
|
data = data + b'\x00' * (256 - len(data))
|
|
else:
|
|
data = data[:256]
|
|
|
|
# MODE_REG = 0x02 + flashport
|
|
if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"):
|
|
return False
|
|
# CMD_REG = 0x06 (Write Enable)
|
|
if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06 (WriteEnable)"):
|
|
return False
|
|
if not self.send_request(log):
|
|
return False
|
|
|
|
# MODE_REG = 0x06 + flashport
|
|
if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"):
|
|
return False
|
|
# ADDR_REG = flash address
|
|
if not self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}"):
|
|
return False
|
|
# NUM_BYTE_REG = 256
|
|
if not self._write_reg(NUM_BYTE_REG, 256, log, "NUM_BYTE_REG = 256"):
|
|
return False
|
|
# CMD_REG = 0x02 (Page Program)
|
|
if not self._write_reg(CMD_REG, 0x02, log, "CMD_REG = 0x02 (PageProgram)"):
|
|
return False
|
|
if not self.start_request(log):
|
|
return False
|
|
|
|
# Write data to TX_FIFO_REG
|
|
if not self.client.srio_write(self.slot_address, TX_FIFO_REG, data, log=log, description="TX_FIFO ← 256B data"):
|
|
return False
|
|
|
|
return self.write_data(log)
|
|
|
|
def op_init_qspi(self, log: Optional[Callable[[str], None]] = None) -> bool:
|
|
"""Initialize QSPI interface (from dwl_fw.cpp line 299).
|
|
|
|
This must be called before any flash operations.
|
|
The original code calls this TWICE at startup.
|
|
|
|
Args:
|
|
log: Optional logging callback for TFTP commands.
|
|
|
|
Returns:
|
|
True on success.
|
|
"""
|
|
# MODE_REG = 0x00 + flashport
|
|
if not self._write_reg(MODE_REG, 0x00 + self.flashport, log, f"MODE_REG = 0x{0x00 + self.flashport:02X}"):
|
|
return False
|
|
# CMD_REG = 0x06
|
|
if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06"):
|
|
return False
|
|
if not self.send_request(log):
|
|
return False
|
|
|
|
# MODE_REG = 0x00 + flashport
|
|
if not self._write_reg(MODE_REG, 0x00 + self.flashport, log, f"MODE_REG = 0x{0x00 + self.flashport:02X}"):
|
|
return False
|
|
# CMD_REG = 0x61
|
|
if not self._write_reg(CMD_REG, 0x61, log, "CMD_REG = 0x61 (EnterQPI)"):
|
|
return False
|
|
if not self.start_request(log):
|
|
return False
|
|
# TX_FIFO_REG = 0x6F
|
|
if not self._write_reg(TX_FIFO_REG, 0x6F, log, "TX_FIFO = 0x6F"):
|
|
return False
|
|
if not self.write_data(log):
|
|
return False
|
|
|
|
# MODE_REG = 0x02 + flashport
|
|
if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"):
|
|
return False
|
|
# CMD_REG = 0x06
|
|
if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06"):
|
|
return False
|
|
if not self.send_request(log):
|
|
return False
|
|
|
|
# MODE_REG = 0x02 + flashport
|
|
if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"):
|
|
return False
|
|
# CMD_REG = 0xB7
|
|
if not self._write_reg(CMD_REG, 0xB7, log, "CMD_REG = 0xB7 (EnterDeepPwrDn)"):
|
|
return False
|
|
if not self.send_request(log):
|
|
return False
|
|
|
|
return True
|
|
|
|
def read_flash(
|
|
self,
|
|
address: int,
|
|
length: int,
|
|
log: Optional[Callable[[str], None]] = None,
|
|
) -> Optional[bytes]:
|
|
"""Read data from flash (from dwl_fw.cpp line 344).
|
|
|
|
Args:
|
|
address: Flash address.
|
|
length: Number of bytes to read.
|
|
log: Optional logging callback for TFTP commands.
|
|
|
|
Returns:
|
|
Read data or None on failure.
|
|
"""
|
|
# MODE_REG = 0x06 + flashport
|
|
if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"):
|
|
return None
|
|
# ADDR_REG = flash address
|
|
if not self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}"):
|
|
return None
|
|
# NUM_BYTE_REG = length
|
|
if not self._write_reg(NUM_BYTE_REG, length, log, f"NUM_BYTE_REG = {length}"):
|
|
return None
|
|
# CMD_REG = 0x0B (Fast Read)
|
|
if not self._write_reg(CMD_REG, 0x0B, log, "CMD_REG = 0x0B (FastRead)"):
|
|
return None
|
|
|
|
# read_data macro
|
|
if not self.start_request(log):
|
|
return None
|
|
if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (READ_DATA)"):
|
|
return None
|
|
if not self._wait_status_bit(12, 1, log=log):
|
|
return None
|
|
if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"):
|
|
return None
|
|
if not self._wait_status_bit(12, 0, log=log):
|
|
return None
|
|
|
|
# Read from RX_FIFO_REG
|
|
data = self.client.srio_read(self.slot_address, RX_FIFO_REG, length, log=log, description=f"RX_FIFO → {length}B data")
|
|
self.end_request(log)
|
|
|
|
return data
|