S1005403_RisCC/target_simulator/gui/status_bar.py

265 lines
9.7 KiB
Python

"""
Status bar widget.
Cosa fa: fornisce indicatori di connessione, messaggi di stato e monitor risorse.
Principali: StatusBar
Ingressi/Uscite: crea widget UI; aggiorna status via metodi pubblici (side-effect).
"""
import os
import threading
import tkinter as tk
from tkinter import ttk
from typing import Optional
# Optional dependency: psutil provides portable CPU/memory info across OSes.
try:
import psutil # type: ignore
_HAS_PSUTIL = True
except Exception:
psutil = None # type: ignore
_HAS_PSUTIL = False
class StatusBar(ttk.Frame):
"""Status bar widget containing connection indicators, status text,
small rate indicator and optional resource monitor.
Public attributes used elsewhere in the app:
- target_status_canvas
- lru_status_canvas
- status_var (tk.StringVar)
- rate_status_var (tk.StringVar) or None
- resource_var (tk.StringVar) or None
The resource monitor runs in a daemon thread and updates the UI via
`after(0, ...)` so updates are executed on the Tk mainloop.
"""
def __init__(self, parent, resource_poll_s: float = 1.0, height: int = 24):
super().__init__(parent, relief=tk.SUNKEN)
# Keep the status bar a fixed small height
try:
self.configure(height=int(height))
self.pack_propagate(False)
except Exception:
pass
# Left area: connection indicators
ttk.Label(self, text="Target:").pack(side=tk.LEFT, padx=(6, 2))
self.target_status_canvas = tk.Canvas(
self, width=16, height=16, highlightthickness=0
)
self.target_status_canvas.pack(side=tk.LEFT, padx=(0, 8))
self._draw_status_indicator(self.target_status_canvas, "#e74c3c")
ttk.Label(self, text="LRU:").pack(side=tk.LEFT, padx=(6, 2))
self.lru_status_canvas = tk.Canvas(
self, width=16, height=16, highlightthickness=0
)
self.lru_status_canvas.pack(side=tk.LEFT, padx=(0, 8))
self._draw_status_indicator(self.lru_status_canvas, "#e74c3c")
# Center: main status message
self.status_var = tk.StringVar(value="Ready")
ttk.Label(self, textvariable=self.status_var, anchor=tk.W).pack(
side=tk.LEFT, fill=tk.X, expand=True, padx=6
)
# Right: rate, latency, and resource indicators
try:
# Resource usage (optional) - pack this first to appear on the far right
self.resource_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.resource_var, anchor=tk.E).pack(
side=tk.RIGHT, padx=(6, 8)
)
except Exception:
self.resource_var = None
# Separator before latency
ttk.Separator(self, orient=tk.VERTICAL).pack(
side=tk.RIGHT, fill=tk.Y, padx=5, pady=4
)
try:
# Latency indicator
self.latency_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.latency_status_var, anchor=tk.E).pack(
side=tk.RIGHT, padx=(0, 6)
)
except Exception:
self.latency_status_var = None
# Separator before rate
ttk.Separator(self, orient=tk.VERTICAL).pack(
side=tk.RIGHT, fill=tk.Y, padx=5, pady=4
)
try:
self.rate_status_var = tk.StringVar(value="")
ttk.Label(self, textvariable=self.rate_status_var, anchor=tk.E).pack(
side=tk.RIGHT, padx=(0, 6)
)
except Exception:
self.rate_status_var = None
# Internal state
self._status_after_id: Optional[str] = None
self._res_stop_event = threading.Event()
self._res_thread: Optional[threading.Thread] = None
self._resource_poll_s = float(resource_poll_s)
# Start background monitor if psutil is available
if _HAS_PSUTIL and self.resource_var is not None:
try:
self.start_resource_monitor(self._resource_poll_s)
except Exception:
pass
def _draw_status_indicator(self, canvas: tk.Canvas, color: str) -> None:
try:
canvas.delete("all")
canvas.create_oval(2, 2, 14, 14, fill=color, outline="black")
except Exception:
pass
def set_target_connected(self, is_connected: bool) -> None:
color = "#2ecc40" if is_connected else "#e74c3c"
try:
self._draw_status_indicator(self.target_status_canvas, color)
except Exception:
pass
def set_lru_connected(self, is_connected: bool) -> None:
color = "#2ecc40" if is_connected else "#e74c3c"
try:
self._draw_status_indicator(self.lru_status_canvas, color)
except Exception:
pass
def show_status_message(self, text: str, timeout_ms: int = 3000) -> None:
"""Show a transient status message in the main status bar.
If timeout_ms is 0 the message is sticky until changed again. This
method is thread-safe when called from the Tk mainloop; if you call it
from a worker thread, schedule it via `after(0, lambda: ... )`.
"""
try:
try:
if self._status_after_id is not None:
try:
self.after_cancel(self._status_after_id)
except Exception:
pass
except Exception:
pass
try:
self.status_var.set(text)
except Exception:
pass
# Schedule clear back to Ready
if timeout_ms and timeout_ms > 0:
def _clear():
try:
self.status_var.set("Ready")
except Exception:
pass
try:
self._status_after_id = self.after(timeout_ms, _clear)
except Exception:
self._status_after_id = None
except Exception:
# Swallow errors; status updates shouldn't crash the UI
pass
# ---- Resource monitor ----
def start_resource_monitor(self, poll_s: float = 1.0) -> None:
"""Start the background resource monitor (daemon thread)."""
if not _HAS_PSUTIL or self.resource_var is None:
return
# If already running, ignore
if self._res_thread and self._res_thread.is_alive():
return
self._res_stop_event.clear()
self._resource_poll_s = float(poll_s)
def _monitor_loop():
try:
proc = psutil.Process(os.getpid())
# Prime cpu_percent the first time
try:
proc.cpu_percent(None)
psutil.cpu_percent(None)
except Exception:
pass
while not self._res_stop_event.wait(self._resource_poll_s):
try:
# Measure CPU usage for the current process (proc.cpu_percent)
# psutil.Process.cpu_percent may return values >100 on
# multi-core systems because it reports percentage of CPU
# time across all cores. To present a value comparable to
# Task Manager's per-process percentage (0-100 scale),
# normalize by the number of logical CPUs.
cpu_proc = proc.cpu_percent(None)
ncpu = psutil.cpu_count(logical=True) or 1
cpu = cpu_proc / ncpu
# Prefer USS (unique set size) when available since it
# represents memory unique to the process (closer to
# what Task Manager reports as private working set).
try:
mem_full = proc.memory_full_info()
uss = getattr(mem_full, "uss", None)
except Exception:
uss = None
if uss is not None and uss > 0:
mem_bytes = uss
mem_tag = "USS"
else:
# Fallback to RSS (working set) if USS unavailable
mem_bytes = proc.memory_info().rss
mem_tag = "RSS"
# Convert to MiB for display (1024^2) — clear label
rss_mb = mem_bytes / (1024.0 * 1024.0)
mem_pct = proc.memory_percent()
nthreads = proc.num_threads()
s = (
f"CPU {cpu:.0f}% · MEM {rss_mb:.0f}MB ({mem_pct:.1f}%)"
f" [{mem_tag}] · Thr {nthreads}"
)
except Exception:
s = ""
try:
# Schedule UI update on main thread
self.after(0, lambda val=s: self.resource_var.set(val))
except Exception:
pass
except Exception:
# If psutil unexpectedly fails, stop quietly
pass
self._res_thread = threading.Thread(target=_monitor_loop, daemon=True)
self._res_thread.start()
def stop_resource_monitor(self) -> None:
"""Stop the background resource-monitoring thread.
Sets the internal stop event which causes the daemon monitor thread to
exit on its next poll. This method is safe to call multiple times.
"""
try:
self._res_stop_event.set()
except Exception:
pass