# SXXXXXXX_PyDownloadFwViaSRIO/pydownloadfwviasrio/core/srio_flash.py
# 2026-01-22 17:10:05 +01:00
#
# 484 lines
# 18 KiB
# Python

"""SRIO Flash operations (erase/write/read) via TFTP transport.
This module implements the flash control protocol derived from dwl_fw.cpp,
using TFTP for SRIO register read/write operations.
Reference: _OLD/linux_app/dwl_fw.cpp and dwl_fw.h
"""
from __future__ import annotations
import struct
import time
from typing import Callable, Optional
from pydownloadfwviasrio.tftp_client import SRIOTFTPClient
# SRIO register map (addresses taken from dwl_fw.h).
# The per-register notes describe how SRIOFlashController uses each one.
MODE_REG = 0x4700_002C      # written as <mode> + flashport before a command
CMD_REG = 0x4700_0030       # flash command opcode (0x06, 0x05, 0xD8, 0x02, ...)
ADDR_REG = 0x4700_0034      # flash address for erase / program / read
NUM_BYTE_REG = 0x4700_0038  # byte count of the transfer (0 for erase)
CTRL_REG = 0x4700_0060      # request handshake: 0x01 start, 0x03 execute, 0x00 end
TX_FIFO_REG = 0x4700_0400   # outgoing data; written without read-back verify
STATUS_REG = 0x4700_0864    # polled bit by bit while a request is in flight
RX_FIFO_REG = 0x4700_0C00   # incoming data / flash status byte
class SRIOFlashController:
    """Low-level SRIO flash controller using TFTP transport.

    Implements the register-level protocol from dwl_fw.cpp. Every register
    access goes through ``client.srio_write`` / ``client.srio_read`` for the
    configured slot. All public operations report failure through their
    return value (``False`` / ``None``) rather than by raising.

    An optional abort callback (see :meth:`set_abort_check`) is consulted
    before every register write attempt and every status poll, so a long
    operation can be cancelled between bus transactions.
    """

    def __init__(
        self,
        client: SRIOTFTPClient,
        slot_address: str,
        flashport: int = 0,  # 0 = primary, 8 = secondary (from dwl_fw.cpp)
        max_verify_retries: int = 500,
    ) -> None:
        """Initialize SRIO flash controller.

        Args:
            client: TFTP client for SRIO communication.
            slot_address: SRIO slot/endpoint (e.g., "0x13").
            flashport: Flash port offset (0 or 8); added to every MODE_REG value.
            max_verify_retries: Default max retries for register write verification.
        """
        self.client = client
        self.slot_address = slot_address
        self.flashport = flashport
        self.default_max_verify_retries = max_verify_retries
        # Installed per operation by the caller; None means "never abort".
        self.abort_check: Optional[Callable[[], bool]] = None

    def set_abort_check(self, abort_check: Optional[Callable[[], bool]]) -> None:
        """Set abort check callback for current operation.

        Args:
            abort_check: Callback that returns True if operation should abort,
                or None to disable abort checking.
        """
        self.abort_check = abort_check

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _aborted(self) -> bool:
        """Return True when an abort callback is installed and requests an abort."""
        return bool(self.abort_check and self.abort_check())

    def _write_mode(
        self,
        mode: int,
        log: Optional[Callable[[str], None]] = None,
    ) -> bool:
        """Write ``mode + flashport`` to MODE_REG (verified write)."""
        mode_value = mode + self.flashport
        return self._write_reg(MODE_REG, mode_value, log, f"MODE_REG = 0x{mode_value:02X}")

    def _execute_cycle(
        self,
        log: Optional[Callable[[str], None]],
        execute_description: str,
    ) -> bool:
        """Pulse the execute bit: CTRL=0x03, wait bit12=1, CTRL=0x01, wait bit12=0.

        Args:
            log: Optional logging callback.
            execute_description: Log label for the CTRL_REG = 0x03 write.

        Returns:
            True when every write and both status waits succeeded.
        """
        return (
            self._write_reg(CTRL_REG, 0x03, log, execute_description)
            and self._wait_status_bit(12, 1, log=log)
            and self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01")
            and self._wait_status_bit(12, 0, log=log)
        )

    def _issue_write_enable(
        self,
        log: Optional[Callable[[str], None]],
        mode: int = 0x02,
        description: str = "CMD_REG = 0x06 (WriteEnable)",
    ) -> bool:
        """Send opcode 0x06 as a complete request: MODE_REG, CMD_REG, SEND_REQUEST.

        Args:
            log: Optional logging callback.
            mode: MODE_REG base value (flashport is added).
            description: Log label for the CMD_REG write.

        Returns:
            True on success.
        """
        return (
            self._write_mode(mode, log)
            and self._write_reg(CMD_REG, 0x06, log, description)
            and self.send_request(log)
        )

    # ------------------------------------------------------------------
    # Register access
    # ------------------------------------------------------------------
    def _write_reg(
        self,
        address: int,
        value: int,
        log: Optional[Callable[[str], None]] = None,
        description: str = "",
        max_verify_retries: Optional[int] = None,
        verify_timeout_us: int = 1000,
    ) -> bool:
        """Write 32-bit value to SRIO register with read-back verification.

        Implements the same retry logic as dwl_fw.cpp srio_write_reg() (line 109-177):
        - Writes value to register
        - Reads back and verifies (except for TX_FIFO which is write-only)
        - Retries up to max_verify_retries times if value doesn't match

        A failed read-back (no data returned) is treated like a mismatch and
        retried; it no longer crashes the retry log message.

        Args:
            address: Register address.
            value: 32-bit value to write (packed little-endian).
            log: Optional logging callback for TFTP commands.
            description: Optional description of operation.
            max_verify_retries: Max attempts to verify write (uses default if None).
            verify_timeout_us: Microseconds delay between verify attempts.

        Returns:
            True on success (value verified or TX_FIFO write); False when the
            operation was aborted or all attempts were exhausted.
        """
        if max_verify_retries is None:
            max_verify_retries = self.default_max_verify_retries

        payload = struct.pack('<I', value)  # 4 bytes, little-endian
        retry_delay_s = verify_timeout_us / 1_000_000.0

        # Retry loop with read-back verification (dwl_fw.cpp line 134-172)
        for attempt in range(max_verify_retries):
            if self._aborted():
                return False

            is_last_attempt = attempt >= max_verify_retries - 1

            if not self.client.srio_write(self.slot_address, address, payload, log=log, description=description):
                if is_last_attempt:
                    return False
                if log:
                    log(f" ↻ Retry {attempt+1}/{max_verify_retries}: TFTP write failed, retrying...")
                time.sleep(retry_delay_s)
                continue

            # Exception: TX_FIFO_REG is write-only, no verification (dwl_fw.cpp line 156)
            if address == TX_FIFO_REG:
                return True

            # Read-back verification (dwl_fw.cpp line 158-164)
            read_value = self._read_reg_no_verify(address, log=log)
            if read_value is not None and read_value == value:
                if attempt > 0 and log:  # Only log when at least one retry happened
                    log(f" ✓ Verified after {attempt+1} attempt(s)")
                return True

            # Mismatch or failed read-back: retry after delay (dwl_fw.cpp line 170)
            if not is_last_attempt:
                if log:
                    if read_value is None:
                        # The read itself failed, so there is no value to format as hex.
                        log(f" ↻ Retry {attempt+1}/{max_verify_retries}: Read-back failed (no data, expected 0x{value:08X})")
                    else:
                        log(f" ↻ Retry {attempt+1}/{max_verify_retries}: Read-back mismatch (got 0x{read_value:08X}, expected 0x{value:08X})")
                time.sleep(retry_delay_s)

        # Max retries exhausted (dwl_fw.cpp line 173)
        if log:
            log(f"⚠️ Write verify FAILED after {max_verify_retries} attempts: addr=0x{address:08X}, expected=0x{value:08X}")
        return False

    def _read_reg_no_verify(
        self,
        address: int,
        log: Optional[Callable[[str], None]] = None,
    ) -> Optional[int]:
        """Read 32-bit value from SRIO register (internal, no description log).

        Used for read-back verification in _write_reg.

        Args:
            address: Register address.
            log: Optional logging callback for TFTP commands.

        Returns:
            32-bit value or None on failure.
        """
        return self._read_reg(address, log=log, description="")

    def _read_reg(
        self,
        address: int,
        log: Optional[Callable[[str], None]] = None,
        description: str = "",
    ) -> Optional[int]:
        """Read 32-bit value from SRIO register.

        Args:
            address: Register address.
            log: Optional logging callback for TFTP commands.
            description: Optional description of operation.

        Returns:
            32-bit value (decoded little-endian from the first 4 bytes) or
            None when the client returned nothing or fewer than 4 bytes.
        """
        raw = self.client.srio_read(self.slot_address, address, 4, log=log, description=description)
        if raw and len(raw) >= 4:
            return struct.unpack('<I', raw[:4])[0]
        return None

    def _wait_status_bit(
        self,
        bit_pos: int,
        expected_value: int,
        max_retries: int = 500,
        timeout_us: int = 1000,
        log: Optional[Callable[[str], None]] = None,
    ) -> bool:
        """Wait for specific bit in STATUS_REG to reach expected value.

        Args:
            bit_pos: Bit position to check (0-31).
            expected_value: Expected bit value (0 or 1).
            max_retries: Maximum retry attempts.
            timeout_us: Microseconds between retries.
            log: Optional logging callback.

        Returns:
            True if bit reached expected value, False on timeout or abort.
        """
        # "bit11=1" rather than the ambiguous "bit111" the label used to render.
        poll_description = f"STATUS_REG (poll bit{bit_pos}={expected_value})"
        poll_delay_s = timeout_us / 1_000_000.0

        for _ in range(max_retries):
            if self._aborted():
                return False
            status = self._read_reg(STATUS_REG, log=log, description=poll_description)
            # A failed read (None) counts as "not yet" and is simply polled again.
            if status is not None and (status >> bit_pos) & 0x01 == expected_value:
                return True
            time.sleep(poll_delay_s)
        return False

    # ------------------------------------------------------------------
    # Request macros
    # ------------------------------------------------------------------
    def start_request(self, log: Optional[Callable[[str], None]] = None) -> bool:
        """START_REQUEST macro (from dwl_fw.cpp line 237).

        Writes CTRL_REG = 0x01 and waits for STATUS_REG bit 11 to become 1.
        """
        if not self._write_reg(CTRL_REG, 0x01, log, "CTRL_REG = 0x01 (START)"):
            return False
        return self._wait_status_bit(11, 1, log=log)

    def end_request(self, log: Optional[Callable[[str], None]] = None) -> bool:
        """END_REQUEST macro (from dwl_fw.cpp line 248).

        Waits for STATUS_REG bits 1, 4, 5, 6 and 10 (the error bits) to read 0,
        writes CTRL_REG = 0x00 and waits for bit 11 to drop back to 0.
        """
        for error_bit in (1, 4, 5, 6, 10):
            if not self._wait_status_bit(error_bit, 0, log=log):
                return False
        if not self._write_reg(CTRL_REG, 0x00, log, "CTRL_REG = 0x00 (END)"):
            return False
        return self._wait_status_bit(11, 0, log=log)

    def send_request(self, log: Optional[Callable[[str], None]] = None) -> bool:
        """SEND_REQUEST macro (from dwl_fw.cpp line 282).

        START_REQUEST, one execute pulse, then END_REQUEST.
        """
        return (
            self.start_request(log)
            and self._execute_cycle(log, "CTRL_REG = 0x03 (EXECUTE)")
            and self.end_request(log)
        )

    def write_data(self, log: Optional[Callable[[str], None]] = None) -> bool:
        """WRITE_DATA macro (from dwl_fw.cpp line 260).

        One execute pulse followed by END_REQUEST; the caller must already
        have issued START_REQUEST and filled TX_FIFO_REG.
        """
        return (
            self._execute_cycle(log, "CTRL_REG = 0x03 (WRITE_DATA)")
            and self.end_request(log)
        )

    # ------------------------------------------------------------------
    # Flash operations
    # ------------------------------------------------------------------
    def wait_flash(
        self,
        max_retries: int = 1000,
        log: Optional[Callable[[str], None]] = None,
    ) -> bool:
        """Wait for flash operation to complete (from dwl_fw.cpp line 359).

        Repeatedly issues command 0x05 (ReadStatus) and checks bit 0 of the
        word read from RX_FIFO_REG; the flash is considered ready when that
        bit is 0.

        Args:
            max_retries: Maximum number of status polls.
            log: Optional logging callback for TFTP commands.

        Returns:
            True when the flash reported ready, False on timeout or abort.
        """
        for _ in range(max_retries):
            if self._aborted():
                return False

            # read_data macro; any failing step just moves on to the next poll.
            status_requested = (
                self._write_mode(0x06, log)
                and self._write_reg(CMD_REG, 0x05, log, "CMD_REG = 0x05 (ReadStatus)")
                and self.start_request(log)
                and self._execute_cycle(log, "CTRL_REG = 0x03 (READ_DATA)")
            )
            if not status_requested:
                continue

            rx_fifo = self._read_reg(RX_FIFO_REG, log=log, description="RX_FIFO (flash status)")
            # Close the request whether or not the flash is ready; the result
            # of END_REQUEST is deliberately not checked here.
            self.end_request(log)
            if rx_fifo is not None and (rx_fifo & 0x01) == 0:
                return True
            time.sleep(0.001)
        return False

    def erase_section(
        self,
        address: int,
        log: Optional[Callable[[str], None]] = None,
    ) -> bool:
        """Erase 64KB flash sector (from dwl_fw.cpp line 421).

        Only issues the erase command; it does not wait for completion
        (see :meth:`wait_flash`).

        Args:
            address: Flash address (should be 64KB-aligned).
            log: Optional logging callback for TFTP commands.

        Returns:
            True on success.
        """
        return (
            self._issue_write_enable(log)
            and self._write_mode(0x06, log)
            and self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}")
            and self._write_reg(NUM_BYTE_REG, 0x00, log, "NUM_BYTE_REG = 0")
            and self._write_reg(CMD_REG, 0xD8, log, "CMD_REG = 0xD8 (Erase64KB)")
            and self.send_request(log)
        )

    def write_flash(
        self,
        address: int,
        data: bytes,
        log: Optional[Callable[[str], None]] = None,
    ) -> bool:
        """Write 256 bytes to flash (from dwl_fw.cpp line 392).

        Only issues the program command; it does not wait for completion
        (see :meth:`wait_flash`).

        Args:
            address: Flash address (should be 256-byte aligned).
            data: Data to write (zero-padded or truncated to exactly 256 bytes).
            log: Optional logging callback for TFTP commands.

        Returns:
            True on success.
        """
        page = bytes(data[:256]).ljust(256, b'\x00')

        program_armed = (
            self._issue_write_enable(log)
            and self._write_mode(0x06, log)
            and self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}")
            and self._write_reg(NUM_BYTE_REG, 256, log, "NUM_BYTE_REG = 256")
            and self._write_reg(CMD_REG, 0x02, log, "CMD_REG = 0x02 (PageProgram)")
            and self.start_request(log)
        )
        if not program_armed:
            return False

        # The whole page goes to TX_FIFO_REG in one transfer, without read-back.
        if not self.client.srio_write(self.slot_address, TX_FIFO_REG, page, log=log, description="TX_FIFO ← 256B data"):
            return False
        return self.write_data(log)

    def op_init_qspi(self, log: Optional[Callable[[str], None]] = None) -> bool:
        """Initialize QSPI interface (from dwl_fw.cpp line 299).

        This must be called before any flash operations.
        The original code calls this TWICE at startup.

        Sequence: opcode 0x06 in mode 0x00, opcode 0x61 with data word 0x6F,
        opcode 0x06 in mode 0x02, then opcode 0xB7.

        Args:
            log: Optional logging callback for TFTP commands.

        Returns:
            True on success.
        """
        # NOTE(review): the "EnterQPI" / "EnterDeepPwrDn" labels are kept from
        # the existing log strings. On several SPI NOR families 0x61 and 0xB7
        # have other meanings (e.g. write volatile configuration register /
        # enter 4-byte address mode) - confirm against dwl_fw.cpp and the
        # flash datasheet before relying on these names.
        return (
            self._issue_write_enable(log, mode=0x00, description="CMD_REG = 0x06")
            and self._write_mode(0x00, log)
            and self._write_reg(CMD_REG, 0x61, log, "CMD_REG = 0x61 (EnterQPI)")
            and self.start_request(log)
            and self._write_reg(TX_FIFO_REG, 0x6F, log, "TX_FIFO = 0x6F")
            and self.write_data(log)
            and self._issue_write_enable(log, mode=0x02, description="CMD_REG = 0x06")
            and self._write_mode(0x02, log)
            and self._write_reg(CMD_REG, 0xB7, log, "CMD_REG = 0xB7 (EnterDeepPwrDn)")
            and self.send_request(log)
        )

    def read_flash(
        self,
        address: int,
        length: int,
        log: Optional[Callable[[str], None]] = None,
    ) -> Optional[bytes]:
        """Read data from flash (from dwl_fw.cpp line 344).

        Args:
            address: Flash address.
            length: Number of bytes to read.
            log: Optional logging callback for TFTP commands.

        Returns:
            Whatever the client returned for the RX_FIFO_REG read, or None
            when any setup step failed. The length of the returned data is
            not validated here.
        """
        # Setup plus read_data macro.
        read_armed = (
            self._write_mode(0x06, log)
            and self._write_reg(ADDR_REG, address, log, f"ADDR_REG = 0x{address:08X}")
            and self._write_reg(NUM_BYTE_REG, length, log, f"NUM_BYTE_REG = {length}")
            and self._write_reg(CMD_REG, 0x0B, log, "CMD_REG = 0x0B (FastRead)")
            and self.start_request(log)
            and self._execute_cycle(log, "CTRL_REG = 0x03 (READ_DATA)")
        )
        if not read_armed:
            return None

        data = self.client.srio_read(self.slot_address, RX_FIFO_REG, length, log=log, description=f"RX_FIFO → {length}B data")
        # Close the request; its result does not affect the returned data.
        self.end_request(log)
        return data