"""SRIO Flash operations (erase/write/read) via TFTP transport. This module implements the flash control protocol derived from dwl_fw.cpp, using TFTP for SRIO register read/write operations. Reference: _OLD/linux_app/dwl_fw.cpp and dwl_fw.h """ from __future__ import annotations import struct import time from typing import Callable, Optional from pydownloadfwviasrio.tftp_client import SRIOTFTPClient # SRIO Register addresses (from dwl_fw.h) MODE_REG = 0x4700002C CMD_REG = 0x47000030 ADDR_REG = 0x47000034 NUM_BYTE_REG = 0x47000038 CTRL_REG = 0x47000060 TX_FIFO_REG = 0x47000400 STATUS_REG = 0x47000864 RX_FIFO_REG = 0x47000C00 class SRIOFlashController: """Low-level SRIO flash controller using TFTP transport. Implements the register-level protocol from dwl_fw.cpp. """ def __init__( self, client: SRIOTFTPClient, slot_address: str, flashport: int = 0, # 0 = primary, 8 = secondary (from dwl_fw.cpp) max_verify_retries: int = 500, ) -> None: """Initialize SRIO flash controller. Args: client: TFTP client for SRIO communication. slot_address: SRIO slot/endpoint (e.g., "0x13"). flashport: Flash port offset (0 or 8). max_verify_retries: Default max retries for register write verification. """ self.client = client self.slot_address = slot_address self.flashport = flashport self.default_max_verify_retries = max_verify_retries self.abort_check: Optional[Callable[[], bool]] = None # Set by operations def set_abort_check(self, abort_check: Optional[Callable[[], bool]]) -> None: """Set abort check callback for current operation. Args: abort_check: Callback that returns True if operation should abort. """ self.abort_check = abort_check def _write_reg( self, address: int, value: int, log: Optional[Callable[[str], None]] = None, description: str = "", max_verify_retries: Optional[int] = None, verify_timeout_us: int = 1000, ) -> bool: """Write 32-bit value to SRIO register with read-back verification. Implements the same retry logic as dwl_fw.cpp srio_write_reg() (line 109-177): - Writes value to register - Reads back and verifies (except for TX_FIFO which is write-only) - Retries up to max_verify_retries times if value doesn't match Args: address: Register address. value: 32-bit value to write. log: Optional logging callback for TFTP commands. description: Optional description of operation. max_verify_retries: Max attempts to verify write (uses default if None). verify_timeout_us: Microseconds delay between verify attempts. Returns: True on success (value verified or TX_FIFO write). """ if max_verify_retries is None: max_verify_retries = self.default_max_verify_retries # Pack value as 4 bytes (little-endian) data = struct.pack(' 0 and log: # Log solo se c'è stato almeno 1 retry log(f" ✓ Verified after {attempt+1} attempt(s)") return True # Verified successfully! # Value mismatch, retry after delay (dwl_fw.cpp line 170) if attempt < max_verify_retries - 1: if log: log(f" ↻ Retry {attempt+1}/{max_verify_retries}: Read-back mismatch (got 0x{read_value:08X}, expected 0x{value:08X})") time.sleep(verify_timeout_us / 1_000_000.0) # Max retries exhausted (dwl_fw.cpp line 173) if log: log(f"⚠️ Write verify FAILED after {max_verify_retries} attempts: addr=0x{address:08X}, expected=0x{value:08X}") return False def _read_reg_no_verify( self, address: int, log: Optional[Callable[[str], None]] = None, ) -> Optional[int]: """Read 32-bit value from SRIO register (internal, no description log). Used for read-back verification in _write_reg. Args: address: Register address. log: Optional logging callback for TFTP commands. Returns: 32-bit value or None on failure. """ data = self.client.srio_read(self.slot_address, address, 4, log=log, description="") if data and len(data) >= 4: return struct.unpack(' Optional[int]: """Read 32-bit value from SRIO register. Args: address: Register address. log: Optional logging callback for TFTP commands. description: Optional description of operation. Returns: 32-bit value or None on failure. """ data = self.client.srio_read(self.slot_address, address, 4, log=log, description=description) if data and len(data) >= 4: return struct.unpack(' bool: """Wait for specific bit in STATUS_REG to reach expected value. Args: bit_pos: Bit position to check (0-31). expected_value: Expected bit value (0 or 1). max_retries: Maximum retry attempts. timeout_us: Microseconds between retries. log: Optional logging callback. Returns: True if bit reached expected value, False on timeout. """ for i in range(max_retries): # Check abort before each poll if self.abort_check and self.abort_check(): return False status = self._read_reg(STATUS_REG, log=log, description=f"STATUS_REG (poll bit{bit_pos}→{expected_value})") if status is not None: bit_value = (status >> bit_pos) & 0x01 if bit_value == expected_value: return True time.sleep(timeout_us / 1_000_000.0) return False def start_request(self, log: Optional[Callable[[str], None]] = None) -> bool: """START_REQUEST macro (from dwl_fw.cpp line 237).""" if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01 (START)"): return False return self._wait_status_bit(11, 1, log=log) def end_request(self, log: Optional[Callable[[str], None]] = None) -> bool: """END_REQUEST macro (from dwl_fw.cpp line 248).""" # Check error bits for bit in [1, 4, 5, 6, 10]: if not self._wait_status_bit(bit, 0, log=log): return False if not self._write_reg(CTRL_REG, 0x00, log, "CTRL_REG = 0x00 (END)"): return False return self._wait_status_bit(11, 0, log=log) def send_request(self, log: Optional[Callable[[str], None]] = None) -> bool: """SEND_REQUEST macro (from dwl_fw.cpp line 282).""" if not self.start_request(log): return False if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (EXECUTE)"): return False if not self._wait_status_bit(12, 1, log=log): return False if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"): return False if not self._wait_status_bit(12, 0, log=log): return False return self.end_request(log) def write_data(self, log: Optional[Callable[[str], None]] = None) -> bool: """WRITE_DATA macro (from dwl_fw.cpp line 260).""" if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (WRITE_DATA)"): return False if not self._wait_status_bit(12, 1, log=log): return False if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"): return False if not self._wait_status_bit(12, 0, log=log): return False return self.end_request(log) def wait_flash( self, max_retries: int = 1000, log: Optional[Callable[[str], None]] = None, ) -> bool: """Wait for flash operation to complete (from dwl_fw.cpp line 359).""" for _ in range(max_retries): # Check abort before each flash status check if self.abort_check and self.abort_check(): return False if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"): continue if not self._write_reg(CMD_REG, 0x05, log, "CMD_REG = 0x05 (ReadStatus)"): continue # read_data macro if not self.start_request(log): continue if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (READ_DATA)"): continue if not self._wait_status_bit(12, 1, log=log): continue if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"): continue if not self._wait_status_bit(12, 0, log=log): continue # Check RX_FIFO_REG bit 0 == 0 (flash ready) rx_fifo = self._read_reg(RX_FIFO_REG, log=log, description="RX_FIFO (flash status)") if rx_fifo is not None and (rx_fifo & 0x01) == 0: self.end_request(log) return True self.end_request(log) time.sleep(0.001) return False def erase_section( self, address: int, log: Optional[Callable[[str], None]] = None, ) -> bool: """Erase 64KB flash sector (from dwl_fw.cpp line 421). Args: address: Flash address (should be 64KB-aligned). log: Optional logging callback for TFTP commands. Returns: True on success. """ # MODE_REG = 0x02 + flashport if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"): return False # CMD_REG = 0x06 (Write Enable) if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06 (WriteEnable)"): return False if not self.send_request(log): return False # MODE_REG = 0x06 + flashport if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"): return False # ADDR_REG = flash address if not self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}"): return False # NUM_BYTE_REG = 0 (for erase) if not self._write_reg(NUM_BYTE_REG, 0x00, log, "NUM_BYTE_REG = 0"): return False # CMD_REG = 0xD8 (Sector Erase) if not self._write_reg(CMD_REG, 0xD8, log, "CMD_REG = 0xD8 (Erase64KB)"): return False if not self.send_request(log): return False return True def write_flash( self, address: int, data: bytes, log: Optional[Callable[[str], None]] = None, ) -> bool: """Write 256 bytes to flash (from dwl_fw.cpp line 392). Args: address: Flash address (should be 256-byte aligned). data: Data to write (will be padded/truncated to 256 bytes). log: Optional logging callback for TFTP commands. Returns: True on success. """ # Ensure 256 bytes if len(data) < 256: data = data + b'\x00' * (256 - len(data)) else: data = data[:256] # MODE_REG = 0x02 + flashport if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"): return False # CMD_REG = 0x06 (Write Enable) if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06 (WriteEnable)"): return False if not self.send_request(log): return False # MODE_REG = 0x06 + flashport if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"): return False # ADDR_REG = flash address if not self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}"): return False # NUM_BYTE_REG = 256 if not self._write_reg(NUM_BYTE_REG, 256, log, "NUM_BYTE_REG = 256"): return False # CMD_REG = 0x02 (Page Program) if not self._write_reg(CMD_REG, 0x02, log, "CMD_REG = 0x02 (PageProgram)"): return False if not self.start_request(log): return False # Write data to TX_FIFO_REG if not self.client.srio_write(self.slot_address, TX_FIFO_REG, data, log=log, description="TX_FIFO ← 256B data"): return False return self.write_data(log) def op_init_qspi(self, log: Optional[Callable[[str], None]] = None) -> bool: """Initialize QSPI interface (from dwl_fw.cpp line 299). This must be called before any flash operations. The original code calls this TWICE at startup. Args: log: Optional logging callback for TFTP commands. Returns: True on success. """ # MODE_REG = 0x00 + flashport if not self._write_reg(MODE_REG, 0x00 + self.flashport, log, f"MODE_REG = 0x{0x00 + self.flashport:02X}"): return False # CMD_REG = 0x06 if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06"): return False if not self.send_request(log): return False # MODE_REG = 0x00 + flashport if not self._write_reg(MODE_REG, 0x00 + self.flashport, log, f"MODE_REG = 0x{0x00 + self.flashport:02X}"): return False # CMD_REG = 0x61 if not self._write_reg(CMD_REG, 0x61, log, "CMD_REG = 0x61 (EnterQPI)"): return False if not self.start_request(log): return False # TX_FIFO_REG = 0x6F if not self._write_reg(TX_FIFO_REG, 0x6F, log, "TX_FIFO = 0x6F"): return False if not self.write_data(log): return False # MODE_REG = 0x02 + flashport if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"): return False # CMD_REG = 0x06 if not self._write_reg(CMD_REG, 0x06, log, "CMD_REG = 0x06"): return False if not self.send_request(log): return False # MODE_REG = 0x02 + flashport if not self._write_reg(MODE_REG, 0x02 + self.flashport, log, f"MODE_REG = 0x{0x02 + self.flashport:02X}"): return False # CMD_REG = 0xB7 if not self._write_reg(CMD_REG, 0xB7, log, "CMD_REG = 0xB7 (EnterDeepPwrDn)"): return False if not self.send_request(log): return False return True def read_flash( self, address: int, length: int, log: Optional[Callable[[str], None]] = None, ) -> Optional[bytes]: """Read data from flash (from dwl_fw.cpp line 344). Args: address: Flash address. length: Number of bytes to read. log: Optional logging callback for TFTP commands. Returns: Read data or None on failure. """ # MODE_REG = 0x06 + flashport if not self._write_reg(MODE_REG, 0x06 + self.flashport, log, f"MODE_REG = 0x{0x06 + self.flashport:02X}"): return None # ADDR_REG = flash address if not self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}"): return None # NUM_BYTE_REG = length if not self._write_reg(NUM_BYTE_REG, length, log, f"NUM_BYTE_REG = {length}"): return None # CMD_REG = 0x0B (Fast Read) if not self._write_reg(CMD_REG, 0x0B, log, "CMD_REG = 0x0B (FastRead)"): return None # read_data macro if not self.start_request(log): return None if not self._write_reg(CTRL_REG, 0x03, log, "CTRL_REG = 0x03 (READ_DATA)"): return None if not self._wait_status_bit(12, 1, log=log): return None if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01"): return None if not self._wait_status_bit(12, 0, log=log): return None # Read from RX_FIFO_REG data = self.client.srio_read(self.slot_address, RX_FIFO_REG, length, log=log, description=f"RX_FIFO → {length}B data") self.end_request(log) return data