"""TFTP client for SRIO-over-TFTP communication.

This module implements the sender side: it constructs special TFTP filenames
that encode SRIO read/write commands and uses a simple TFTP implementation to
communicate with the target (Windows-compatible).

Protocol format (compatible with Vecchia_app):
- Write: PUT "$SRIO:<slot>/<address>+<length>" with binary payload
- Read:  GET "$SRIO:<slot>/<address>+<length>" returns binary data

Reference: fgpaprogrammer.cpp rtgWrite/rtgRead functions.

Note: This is a minimal TFTP client implementation for Windows compatibility.
For production, consider using a full-featured TFTP library.
"""

from __future__ import annotations

import socket
import struct
import time
from typing import Any, Callable, Iterator, Optional


class TFTPError(Exception):
    """Raised when a TFTP transfer fails.

    Covers ERROR packets sent by the server, unexpected or malformed packets,
    and receive timeouts. It derives from ``Exception`` so callers that catch
    ``Exception`` keep working unchanged.
    """


class SimpleTFTPClient:
    """Minimal TFTP client (RFC 1350) for Windows compatibility.

    Implements only write (WRQ) and read (RRQ) operations needed for SRIO
    protocol. All transfers use "octet" mode. There is no retransmission:
    a receive timeout aborts the transfer with ``TFTPError``.
    """

    # TFTP opcodes
    OP_RRQ = 1  # Read request
    OP_WRQ = 2  # Write request
    OP_DATA = 3  # Data
    OP_ACK = 4  # Acknowledgment
    OP_ERROR = 5  # Error

    # Size passed to recvfrom(); larger than a 4-byte header plus one block.
    _RECV_SIZE = 4096
    # Block numbers are packed as unsigned 16-bit values ("!H").
    _BLOCK_MASK = 0xFFFF

    def __init__(self, server_ip: str, server_port: int = 69, timeout: float = 5.0) -> None:
        """Store connection parameters; no socket is opened here.

        Args:
            server_ip: TFTP server IP address.
            server_port: TFTP server port (default 69).
            timeout: Per-receive socket timeout in seconds.
        """
        self.server_ip = server_ip
        self.server_port = server_port
        self.timeout = timeout
        self.block_size = 512  # TFTP default

    def _send_request(self, opcode: int, filename: str, sock: socket.socket) -> None:
        """Send an RRQ/WRQ packet: opcode, filename, NUL, "octet", NUL."""
        packet = struct.pack("!H", opcode) + filename.encode() + b"\x00" + b"octet" + b"\x00"
        sock.sendto(packet, (self.server_ip, self.server_port))

    def _send_rrq(self, filename: str, sock: socket.socket) -> None:
        """Send Read Request."""
        self._send_request(self.OP_RRQ, filename, sock)

    def _send_wrq(self, filename: str, sock: socket.socket) -> None:
        """Send Write Request."""
        self._send_request(self.OP_WRQ, filename, sock)

    def _send_ack(self, block_num: int, sock: socket.socket, addr: tuple) -> None:
        """Send ACK."""
        sock.sendto(struct.pack("!HH", self.OP_ACK, block_num), addr)

    def _send_data(self, block_num: int, data: bytes, sock: socket.socket, addr: tuple) -> None:
        """Send DATA."""
        sock.sendto(struct.pack("!HH", self.OP_DATA, block_num) + data, addr)

    @staticmethod
    def _parse_packet(packet: bytes) -> tuple[int, int, bytes]:
        """Split a packet into (opcode, block-or-error number, payload).

        Raises:
            TFTPError: If the packet is shorter than the 4-byte header.
        """
        if len(packet) < 4:
            raise TFTPError(f"Malformed TFTP packet ({len(packet)} bytes)")
        opcode, number = struct.unpack_from("!HH", packet)
        return opcode, number, packet[4:]

    @staticmethod
    def _iter_blocks(data: bytes, block_size: int) -> Iterator[bytes]:
        """Yield the DATA payloads for an upload, in order.

        RFC 1350 ends a transfer with a block shorter than ``block_size``, so
        an empty payload, or one whose length is an exact multiple of
        ``block_size``, is followed by a zero-length block.
        """
        for offset in range(0, len(data), block_size):
            yield data[offset : offset + block_size]
        if len(data) % block_size == 0:
            yield b""

    def _receive(self, sock: socket.socket) -> tuple[int, int, bytes, tuple]:
        """Receive one packet and return (opcode, number, payload, sender).

        Raises:
            TFTPError: If the packet is malformed or is a TFTP ERROR packet.
            socket.timeout: If nothing arrives within the socket timeout.
        """
        packet, sender = sock.recvfrom(self._RECV_SIZE)
        opcode, number, payload = self._parse_packet(packet)
        if opcode == self.OP_ERROR:
            error_msg = payload.decode('ascii', errors='ignore').rstrip('\x00')
            raise TFTPError(f"TFTP Error {number}: {error_msg}")
        return opcode, number, payload, sender

    def download(self, filename: str) -> Optional[bytes]:
        """Download file via TFTP GET (RRQ).

        Args:
            filename: Remote filename sent in the read request.

        Returns:
            The concatenated payload of all in-sequence DATA blocks.

        Raises:
            TFTPError: On a server ERROR packet, malformed packet, or timeout.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(self.timeout)
            received = bytearray()
            expected_block = 1
            try:
                self._send_rrq(filename, sock)
                while True:
                    opcode, block_num, payload, sender = self._receive(sock)
                    # Anything other than the next in-sequence DATA block is skipped.
                    if opcode != self.OP_DATA or block_num != expected_block:
                        continue
                    received.extend(payload)
                    self._send_ack(block_num, sock, sender)
                    # Last packet (less than 512 bytes)
                    if len(payload) < self.block_size:
                        return bytes(received)
                    # Wrap so transfers longer than 65535 blocks keep matching.
                    expected_block = (expected_block + 1) & self._BLOCK_MASK
            except socket.timeout as err:
                raise TFTPError("TFTP timeout") from err

    def upload(self, filename: str, data: bytes) -> bool:
        """Upload file via TFTP PUT (WRQ).

        Args:
            filename: Remote filename sent in the write request.
            data: Payload to send; may be empty.

        Returns:
            True once every block, including the terminating short block,
            has been acknowledged.

        Raises:
            TFTPError: On a server ERROR packet, unexpected or mismatched ACK,
                malformed packet, or timeout.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(self.timeout)
            try:
                self._send_wrq(filename, sock)

                # Wait for initial ACK (block 0); its sender is where DATA goes.
                opcode, _, _, server_addr = self._receive(sock)
                if opcode != self.OP_ACK:
                    raise TFTPError("Expected ACK after WRQ")

                blocks = self._iter_blocks(data, self.block_size)
                for block_num, chunk in enumerate(blocks, start=1):
                    # The wire field is 16 bits; wrap instead of overflowing pack().
                    wire_block = block_num & self._BLOCK_MASK
                    self._send_data(wire_block, chunk, sock, server_addr)

                    opcode, ack_block, _, _ = self._receive(sock)
                    if opcode != self.OP_ACK:
                        raise TFTPError(f"Expected ACK for block {block_num}")
                    if ack_block != wire_block:
                        raise TFTPError(f"ACK mismatch: expected {wire_block}, got {ack_block}")
            except socket.timeout as err:
                raise TFTPError("TFTP timeout") from err
        return True


class SRIOTFTPClient:
    """TFTP client that sends SRIO commands via filename encoding.

    Includes simplified retry logic with exponential backoff.
    Reference: fgpaprogrammer.cpp NUM_REPEAT_WRITE/NUM_REPEAT_READ.
    """

    def __init__(self, server_ip: str, server_port: int = 69, max_retries: int = 3) -> None:
        """Initialize SRIO TFTP client.

        Args:
            server_ip: TFTP server IP address.
            server_port: TFTP server port (default 69).
            max_retries: Maximum retry attempts for TFTP operations (configurable).
        """
        self.server_ip = server_ip
        self.server_port = server_port
        self.max_retries = max_retries
        self.client = SimpleTFTPClient(server_ip, server_port)

    def _run_with_retries(
        self,
        operation: Callable[[], Any],
        remote_file: str,
        verb: str,
        label: str,
        log: Optional[Callable[[str], None]],
        description: str,
        abort_check: Optional[Callable[[], bool]],
    ) -> tuple[bool, Any]:
        """Run ``operation`` up to ``max_retries`` times with backoff.

        Args:
            operation: Zero-argument callable performing the transfer.
            remote_file: Encoded filename, used only in log messages.
            verb: "PUT" or "GET", used in the first-attempt log line.
            label: "Write" or "Read", used in error log lines.
            log: Optional logging callback.
            description: Optional description appended to the first log line.
            abort_check: Optional callback that returns True to abort.

        Returns:
            ``(True, result)`` on success, ``(False, None)`` when aborted or
            when every attempt failed.
        """

        def emit(message: str) -> None:
            if log:
                log(message)

        def aborted() -> bool:
            if abort_check and abort_check():
                emit(f"TFTP operation aborted: {remote_file}")
                return True
            return False

        last_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            # Check for abort before each retry attempt
            if aborted():
                return False, None
            try:
                if attempt == 0:
                    desc_str = f" → {description}" if description else ""
                    emit(f"TFTP {verb} {remote_file}{desc_str}")
                else:
                    emit(f"Retry {attempt}/{last_attempt}: {remote_file}")
                return True, operation()
            except Exception as e:
                # Check abort even after errors - don't waste time retrying if user aborted
                if aborted():
                    return False, None
                if attempt >= last_attempt:
                    emit(f"{label} FAILED after {self.max_retries} attempts: {remote_file} - {e}")
                    return False, None
                # Exponential backoff: 0.1s, 0.2s, 0.4s
                delay = 0.1 * (2 ** attempt)
                emit(f"{label} error: {e}, retrying in {delay}s...")
                time.sleep(delay)
        return False, None

    def srio_write(
        self,
        slot: str,
        address: int,
        data: bytes,
        log: Optional[Callable[[str], None]] = None,
        description: str = "",
        abort_check: Optional[Callable[[], bool]] = None,
    ) -> bool:
        """Write data to SRIO address via TFTP PUT with retry.

        Args:
            slot: SRIO endpoint (e.g., "0x10").
            address: Target memory address.
            data: Binary payload to write.
            log: Optional logging callback.
            description: Optional description of operation.
            abort_check: Optional callback that returns True if operation should abort.

        Returns:
            True on success, False on abort or on failure after retries.
        """
        remote_file = format_srio_filename(slot, address, len(data))
        succeeded, _ = self._run_with_retries(
            lambda: self.client.upload(remote_file, data),
            remote_file,
            "PUT",
            "Write",
            log,
            description,
            abort_check,
        )
        return succeeded

    def srio_read(
        self,
        slot: str,
        address: int,
        length: int,
        log: Optional[Callable[[str], None]] = None,
        description: str = "",
        abort_check: Optional[Callable[[], bool]] = None,
    ) -> Optional[bytes]:
        """Read data from SRIO address via TFTP GET with retry.

        Args:
            slot: SRIO endpoint (e.g., "0x10").
            address: Target memory address.
            length: Number of bytes to read.
            log: Optional logging callback.
            description: Optional description of operation.
            abort_check: Optional callback that returns True if operation should abort.

        Returns:
            Binary data on success, None on abort or on failure after retries.
        """
        remote_file = format_srio_filename(slot, address, length)
        _, data = self._run_with_retries(
            lambda: self.client.download(remote_file),
            remote_file,
            "GET",
            "Read",
            log,
            description,
            abort_check,
        )
        return data


def format_srio_filename(slot: str, address: int, length: int) -> str:
    """Format SRIO command as TFTP filename (Vecchia_app compatible).

    The address is rendered as upper-case hexadecimal with a ``0x`` prefix;
    the slot string and the decimal length are inserted as given.

    Examples:
        >>> format_srio_filename("0x10", 0x1000, 256)
        '$SRIO:0x10/0x1000+256'
    """
    return f"$SRIO:{slot}/0x{address:X}+{length}"