SXXXXXXX_ControlPanel/VideoReceiverSFP/core/sfp_transport.py
2026-01-13 14:21:21 +01:00

201 lines
7.6 KiB
Python

"""SFP transport for VideoReceiverSFP.
Lightweight UDP receiver with fragment reassembly and ACK support.
This file is self-contained inside the module and intentionally keeps
logging to a module-specific logger so it can be tuned by the test harness.
"""
import socket
import threading
import time
import logging
from typing import Dict, Callable, Optional
from .sfp_structures import SFPHeader
from controlpanel import config
PayloadHandler = Callable[[bytearray], None]
logger = logging.getLogger("VideoReceiverSFP.sfp_transport")
def create_udp_socket(host: str, port: int, timeout: float = 1.0) -> Optional[socket.socket]:
    """Create a UDP socket, enlarge its receive buffer and bind it.

    Args:
        host: Local address to bind to.
        port: Local UDP port to bind to.
        timeout: Socket timeout in seconds applied after binding.

    Returns:
        The bound socket, or ``None`` if creation, configuration or binding
        failed. Failures are logged on the module logger, never raised.
    """
    s: Optional[socket.socket] = None
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        # Report the OS default receive buffer size (diagnostic only).
        try:
            default_buf = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
            logger.info("default SO_RCVBUF=%d", default_buf)
        except Exception:
            logger.debug("could not read default SO_RCVBUF", exc_info=True)

        # Request a large receive buffer like ControlPanel. This is best
        # effort: a failure here is logged but does not abort socket creation.
        try:
            desired = 8 * 1024 * 1024
            logger.debug("requesting SO_RCVBUF=%d", desired)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, desired)
            actual = s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
            logger.info("actual SO_RCVBUF=%d", actual)
            if actual < desired:
                logger.warning("OS capped SO_RCVBUF below requested value")
        except OSError:
            logger.exception("error while setting SO_RCVBUF")
        except Exception:
            logger.exception("unexpected error while tuning SO_RCVBUF")

        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.settimeout(timeout)
        logger.info("bound UDP socket on %s:%s", host, port)
        return s
    except Exception:
        logger.exception("Failed to create UDP socket")
        # Release the descriptor if the socket was created but could not be
        # configured or bound; otherwise it would leak until garbage collected.
        if s is not None:
            try:
                s.close()
            except Exception:
                logger.debug("error closing socket", exc_info=True)
        return None
def close_udp_socket(sock: socket.socket) -> None:
    """Close *sock* on a best-effort basis.

    Any exception raised by ``close()`` is logged at debug level on the
    module logger and suppressed, so callers can use this during teardown
    without guarding it themselves.
    """
    try:
        sock.close()
    except Exception:
        # Teardown must not fail; record the problem and carry on.
        logger.debug("error closing socket", exc_info=True)
    return None
class SfpTransport:
    """Manage SFP reassembly and dispatch to payload handlers.

    A daemon thread reads datagrams from a UDP socket, reassembles fragments
    per ``(flow, tid)`` transaction and, once the number of distinct fragments
    stored equals the total announced in the header, passes the completed
    buffer to the handler registered for that flow.
    """

    def __init__(self, host: str, port: int, payload_handlers: Dict[int, PayloadHandler]):
        """Store the bind address and the flow -> handler mapping.

        No socket is opened and no thread is started until :meth:`start`.
        """
        self._host = host
        self._port = port
        self._payload_handlers = payload_handlers
        self._socket: Optional[socket.socket] = None
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # internal reassembly state, both keyed by (flow, tid):
        #   _fragments[key] maps fragment index -> total fragment count
        #   _buffers[key] is the destination buffer for the transaction
        self._fragments: Dict[tuple, Dict[int, int]] = {}
        self._buffers: Dict[tuple, bytearray] = {}

    def start(self) -> bool:
        """Open the socket and start the receiver thread.

        Returns:
            ``True`` if the receiver is running (including when it was already
            running), ``False`` if the socket could not be created.
        """
        if self._thread and self._thread.is_alive():
            return True

        # A previous receiver thread may have exited on its own and left its
        # socket behind; close it so the descriptor is not leaked when it is
        # replaced below.
        if self._socket:
            close_udp_socket(self._socket)
            self._socket = None

        self._socket = create_udp_socket(self._host, self._port)
        if not self._socket:
            return False

        self._stop_event.clear()
        self._thread = threading.Thread(target=self._receive_loop, daemon=True)
        self._thread.start()
        logger.info("receiver thread started for %s:%s", self._host, self._port)
        return True

    def shutdown(self) -> None:
        """Stop the receiver: signal the thread, close the socket and join.

        The join is bounded to one second, so the thread may still be alive
        when this method returns.
        """
        self._stop_event.set()
        if self._socket:
            close_udp_socket(self._socket)
            self._socket = None
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=1.0)

    def _receive_loop(self):
        """Thread body: read datagrams and process them until stopped.

        The loop exits when the stop event is set, when the socket reference
        has been cleared, or when ``recvfrom`` raises ``OSError``.
        """
        while not self._stop_event.is_set():
            if not self._socket:
                break
            try:
                data, addr = self._socket.recvfrom(65535)
                logger.debug("raw packet recv %d bytes from %s", len(data), addr)
            except socket.timeout:
                # socket.timeout must be handled before OSError (it is a
                # subclass); a timeout just means "poll the stop event again".
                continue
            except OSError:
                break
            except Exception:
                time.sleep(0.01)
                continue

            # A single malformed packet must not take the receiver thread
            # down, so processing errors are logged and the loop carries on.
            try:
                self._process_packet(data, addr)
            except Exception:
                logger.exception("unhandled error while processing packet from %s", addr)

    def _process_packet(self, data: bytes, addr: tuple):
        """Parse one datagram, store its payload and dispatch when complete.

        Args:
            data: Raw datagram (SFP header followed by payload bytes).
            addr: Sender address as returned by ``recvfrom``; used for ACKs.
        """
        header_size = SFPHeader.size()
        if len(data) < header_size:
            return

        try:
            header = SFPHeader.from_buffer_copy(data)
        except Exception:
            logger.debug("failed to parse SFPHeader", exc_info=True)
            return

        try:
            flow, tid = header.SFP_FLOW, header.SFP_TID
            frag, total_frags = header.SFP_FRAG, header.SFP_TOTFRGAS
            pl_size, pl_offset = header.SFP_PLSIZE, header.SFP_PLOFFSET
            total_size = header.SFP_TOTSIZE
            logger.debug(
                "pkt hdr flow=%s tid=%s frag=%s/%s pl_size=%s pl_off=%s tot_size=%s addr=%s",
                (chr(flow) if 32 <= flow <= 126 else flow), tid, frag, total_frags, pl_size, pl_offset, total_size, addr,
            )
        except Exception:
            # With every field None the key below is (None, None), which is
            # never present in _buffers, so the packet is dropped after the
            # ACK attempt.
            flow = tid = frag = total_frags = pl_size = pl_offset = total_size = None

        key = (flow, tid)

        if frag == 0:
            # Fragment 0 (re)starts the transaction for this key.
            # NOTE(review): total_size comes straight from the network and is
            # used as an allocation size without an upper bound; confirm
            # whether a sanity limit is wanted.
            try:
                new_buffer = bytearray(total_size)
            except Exception:
                # Drop BOTH maps: leaving a stale buffer from an earlier,
                # incomplete transaction would let later fragments pass the
                # "key in _buffers" check and fail on the missing fragment map.
                self._fragments.pop(key, None)
                self._buffers.pop(key, None)
                return
            self._fragments[key] = {}
            self._buffers[key] = new_buffer
            logger.debug("started transaction key=%s total_size=%s", key, total_size)

        # If sender requests ACKs, send one back to allow windowing
        try:
            if header.SFP_FLAGS & 0x01:
                self._send_ack(addr, data[:header_size])
        except Exception:
            logger.debug("failed to send ACK (ignored)", exc_info=True)

        # Fragments of a transaction whose fragment 0 was never seen are ignored.
        if key not in self._buffers:
            return

        payload = data[header_size:]
        bytes_to_copy = min(pl_size, len(payload))
        # Reject fragments that would write past the end of the buffer.
        if (pl_offset + bytes_to_copy) > len(self._buffers[key]):
            return

        self._buffers[key][pl_offset : pl_offset + bytes_to_copy] = payload[:bytes_to_copy]
        # Keyed by fragment index, so duplicates do not inflate the count.
        self._fragments[key][frag] = total_frags
        logger.debug("stored frag=%s for key=%s bytes=%s", frag, key, bytes_to_copy)

        if len(self._fragments[key]) == total_frags:
            completed = self._buffers.pop(key)
            self._fragments.pop(key, None)
            handler = self._payload_handlers.get(flow)
            logger.info("completed payload flow=%s tid=%s size=%d", chr(flow) if 32 <= flow <= 126 else flow, tid, len(completed))
            if handler:
                try:
                    handler(completed)
                except Exception:
                    logger.exception("Error in payload handler")

    def _send_ack(self, dest_addr: tuple, original_header_bytes: bytes):
        """Send an SFP ACK packet back to the sender, matching ControlPanel behavior.

        The ACK is the received header with the direction byte, the window
        size and the flags rewritten. Failures are logged, never raised.

        Args:
            dest_addr: Address to send the ACK to.
            original_header_bytes: Header bytes of the packet being acknowledged.
        """
        if not self._socket:
            return
        try:
            ack_header = bytearray(original_header_bytes)
            flow = ack_header[SFPHeader.get_field_offset("SFP_FLOW")]

            # Determine window size based on flow; unknown flows get 0.
            window_size = 0
            if flow == ord("M"):
                window_size = config.ACK_WINDOW_SIZE_MFD
            elif flow == ord("S"):
                window_size = config.ACK_WINDOW_SIZE_SAR

            # Modify header for ACK response
            ack_header[SFPHeader.get_field_offset("SFP_DIRECTION")] = 0x3C  # ASCII '<'
            ack_header[SFPHeader.get_field_offset("SFP_WIN")] = window_size
            flags_offset = SFPHeader.get_field_offset("SFP_FLAGS")
            original_flags = ack_header[flags_offset]
            # Set bit 0x01 and clear bit 0x02.
            ack_header[flags_offset] = (original_flags | 0x01) & ~0x02

            self._socket.sendto(ack_header, dest_addr)
            logger.debug("ACK sent to %s for flow %s", dest_addr, chr(flow) if 32 <= flow <= 126 else flow)
        except Exception:
            logger.exception("Failed to send ACK to %s", dest_addr)