S1005403_RisCC/target_simulator/utils/config_manager.py

457 lines
19 KiB
Python

# target_simulator/utils/config_manager.py
"""
Manages loading and saving of application settings and scenarios to a single JSON file.
"""
import json
import os
import sys
import shutil
import logging
import time
from typing import Dict, Any, Optional, List
from enum import Enum
class EnumEncoder(json.JSONEncoder):
"""A custom JSON encoder to handle Enum objects by encoding them as their values."""
def default(self, o: Any) -> Any:
if isinstance(o, Enum):
return o.value
return super().default(o)
class ConfigManager:
"""Manage application settings and scenario persistence.
ConfigManager centralizes reading and writing of the application's
`settings.json` and a separate `scenarios.json`. It performs atomic writes,
rotating backups, and contains helpers for callers to get/save general
settings and scenarios.
"""
def save_connection_settings(self, config: Dict[str, Any]):
"""
Save both target and lru connection configs in the settings file.
"""
if "general" not in self._settings:
self._settings["general"] = {}
self._settings["general"]["connection"] = config
self._save_settings()
def get_connection_settings(self) -> Dict[str, Any]:
"""
Returns the connection settings from the 'general' section.
"""
general = self._settings.get("general", {})
return general.get("connection", {})
def __init__(
self,
filename: str = "settings.json",
scenarios_filename: str = "scenarios.json",
):
"""
Initializes the ConfigManager.
Args:
filename: The name of the settings file. It will be stored in
the project root directory.
"""
if getattr(sys, "frozen", False):
application_path = os.path.dirname(sys.executable)
else:
application_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..")
)
self.filepath = os.path.join(application_path, filename)
# Persist scenarios next to the settings file so tests that override
# `filepath` do not accidentally read/write the application's
# scenarios.json in the repo root.
self.scenarios_filepath = os.path.join(
os.path.dirname(self.filepath), scenarios_filename
)
self._settings = self._load_or_initialize_settings()
# Load scenarios from separate file if present, otherwise keep any scenarios
# found inside settings.json (fallback).
self._scenarios = self._load_or_initialize_scenarios()
# Apply debug overrides (if present) into the global DEBUG_CONFIG so
# runtime helpers (e.g., csv_logger) pick up user-configured values.
try:
from target_simulator.config import DEBUG_CONFIG
debug_block = (
self._settings.get("debug", {})
if isinstance(self._settings, dict)
else {}
)
if isinstance(debug_block, dict):
for k, v in debug_block.items():
DEBUG_CONFIG[k] = v
except Exception:
# If anything goes wrong here, we don't want to fail initialization.
pass
def _load_or_initialize_settings(self) -> Dict[str, Any]:
"""Loads settings from the JSON file or initializes with a default structure."""
# Note: do not accidentally clear `_scenarios` here. Tests may call
# this method directly to reload settings, but clearing the in-memory
# scenarios dict can lead to accidental data loss if followed by a
# save operation. Any explicit reload of scenarios should be done via
# `_load_or_initialize_scenarios()`.
if not os.path.exists(self.filepath):
# Ensure in-memory scenarios reflect the (missing) settings location
try:
self._scenarios = self._load_or_initialize_scenarios()
except Exception:
self._scenarios = {}
return {"general": {}, "scenarios": {}}
try:
with open(self.filepath, "r", encoding="utf-8") as f:
settings = json.load(f)
if not isinstance(settings, dict) or "general" not in settings:
# If file contained only general settings (old style), wrap it
return {"general": settings, "scenarios": {}}
# Keep scenarios key if present; actual source of truth for scenarios
# will be the separate scenarios file when available.
# Refresh in-memory scenarios based on the possibly-updated
# settings filepath so callers that override `filepath` and
# re-run this loader get a consistent view.
try:
self._scenarios = self._load_or_initialize_scenarios()
except Exception:
self._scenarios = {}
return settings
except (json.JSONDecodeError, IOError):
try:
self._scenarios = self._load_or_initialize_scenarios()
except Exception:
self._scenarios = {}
return {"general": {}, "scenarios": {}}
def _save_settings(self):
"""Saves the current settings to the JSON file."""
try:
with open(self.filepath, "w", encoding="utf-8") as f:
# Write all settings except 'scenarios' (which is persisted
# separately). This preserves custom top-level keys while
# ensuring scenarios are stored in their dedicated file.
to_write = dict(self._settings)
if "scenarios" in to_write:
# don't duplicate scenarios in settings.json
del to_write["scenarios"]
json.dump(to_write, f, indent=4, cls=EnumEncoder)
except IOError as e:
print(f"Error saving settings to {self.filepath}: {e}")
def _load_or_initialize_scenarios(self) -> Dict[str, Any]:
"""Loads scenarios from the separate scenarios file if present.
Falls back to scenarios inside settings.json when scenarios.json is absent.
"""
# Prefer a scenarios file colocated with the settings file (useful
# for tests which set `filepath` to a temp dir). Construct the local
# scenarios path by combining the settings directory with the
# scenarios filename.
try:
settings_dir = os.path.dirname(self.filepath) or "."
local_scenarios_path = os.path.join(
settings_dir, os.path.basename(self.scenarios_filepath)
)
if os.path.exists(local_scenarios_path):
with open(local_scenarios_path, "r", encoding="utf-8") as f:
scenarios = json.load(f)
if isinstance(scenarios, dict):
return scenarios
except Exception:
# If local scenarios file is unreadable, fall through to other
# fallback mechanisms rather than raising.
pass
# Only load the application-level scenarios file when the settings
# filepath resides inside the application directory. Tests often
# override `filepath` to a temporary location; in that case we must
# avoid pulling in global scenarios from the repo.
try:
app_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..")
)
settings_abs = os.path.abspath(self.filepath)
if os.path.commonpath([app_root, settings_abs]) == app_root:
if os.path.exists(self.scenarios_filepath):
try:
with open(self.scenarios_filepath, "r", encoding="utf-8") as f:
scenarios = json.load(f)
if isinstance(scenarios, dict):
return scenarios
except (json.JSONDecodeError, IOError):
return {}
except Exception:
# If path computations fail, fall back silently to other options.
pass
# Fallback: try to read scenarios stored inside settings.json
try:
with open(self.filepath, "r", encoding="utf-8") as f:
settings = json.load(f)
if isinstance(settings, dict) and "scenarios" in settings:
return settings.get("scenarios", {})
except Exception:
pass
return {}
def _save_scenarios(self):
"""Saves scenarios to the separate scenarios file."""
logger = logging.getLogger(__name__)
try:
# If there are no scenarios to save, avoid overwriting an existing
# scenarios file with an empty dict. This prevents loss of data when
# the application shuts down while _scenarios is temporarily empty.
if not self._scenarios:
if os.path.exists(self.scenarios_filepath):
logger.info(
"Skipping saving scenarios: in-memory scenarios empty and '%s' exists",
self.scenarios_filepath,
)
# Keep existing file untouched and return early.
return
# If file doesn't exist and no scenarios, nothing to do.
logger.info(
"No scenarios to save and '%s' does not exist; skipping write",
self.scenarios_filepath,
)
return
# If target exists, create a rotating backup set (keep last N versions)
backup_count = 5
try:
if os.path.exists(self.scenarios_filepath):
# rotate existing backups: bak{n-1} -> bak{n}
for i in range(backup_count - 1, 0, -1):
src = f"{self.scenarios_filepath}.bak{i}"
dst = f"{self.scenarios_filepath}.bak{i+1}"
if os.path.exists(src):
try:
shutil.move(src, dst)
except Exception:
pass
# copy current to bak1
try:
shutil.copy2(
self.scenarios_filepath, f"{self.scenarios_filepath}.bak1"
)
except Exception:
pass
except Exception:
# Non-fatal: proceed with write even if backup fails
pass
# Before writing, make an explicit timestamped backup of the current
# scenarios file to aid recovery in case of unexpected truncation.
try:
if os.path.exists(self.scenarios_filepath):
ts = int(time.time())
prebackup = f"{self.scenarios_filepath}.prewrite.{ts}.bak"
try:
shutil.copy2(self.scenarios_filepath, prebackup)
logger.debug("Created pre-write backup: %s", prebackup)
except Exception:
logger.debug(
"Pre-write backup failed, continuing", exc_info=True
)
except Exception:
# Non-fatal
pass
# Write atomically: write to a temp file then validate and replace the target.
dirpath = os.path.dirname(self.scenarios_filepath) or "."
tmp_path = os.path.join(
dirpath, f".{os.path.basename(self.scenarios_filepath)}.tmp"
)
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(self._scenarios, f, indent=4, cls=EnumEncoder)
# Validate the temporary file contains decodable JSON and expected type
try:
with open(tmp_path, "r", encoding="utf-8") as tf:
validated = json.load(tf)
if not isinstance(validated, dict):
logger.error(
"Temporary scenarios file did not contain a JSON object; aborting write"
)
try:
os.remove(tmp_path)
except Exception:
pass
return
except Exception as e:
logger.error(
"Failed to validate temporary scenarios file: %s", e, exc_info=True
)
try:
os.remove(tmp_path)
except Exception:
pass
return
# Heuristic safeguard: if replacing an existing file would dramatically
# reduce the number of stored scenarios (e.g., existing had many and new
# has only one empty entry), preserve the existing file and write the
# candidate to a .suspect file for manual inspection instead of
# overwriting silently.
try:
if os.path.exists(self.scenarios_filepath):
try:
with open(self.scenarios_filepath, "r", encoding="utf-8") as ef:
existing = json.load(ef)
except Exception:
existing = None
try:
new_count = len(validated) if isinstance(validated, dict) else 0
existing_count = (
len(existing) if isinstance(existing, dict) else 0
)
if (
existing_count > 1
and new_count <= 1
and any(not v for v in validated.values())
):
suspect_path = f"{self.scenarios_filepath}.suspect.{int(time.time())}.json"
try:
shutil.copy2(tmp_path, suspect_path)
except Exception:
pass
logger.error(
"New scenarios content looks suspicious (existing had %d entries, new has %d); wrote suspect file %s and skipped overwrite",
existing_count,
new_count,
suspect_path,
)
try:
os.remove(tmp_path)
except Exception:
pass
return
except Exception:
# If anything goes wrong in this heuristic, continue with replace
pass
except Exception:
pass
# Atomic replace
try:
os.replace(tmp_path, self.scenarios_filepath)
logger.info("Saved scenarios to %s (atomic)", self.scenarios_filepath)
except Exception:
# Fallback: try to remove target and rename
try:
if os.path.exists(self.scenarios_filepath):
os.remove(self.scenarios_filepath)
os.rename(tmp_path, self.scenarios_filepath)
except Exception as e:
# If rename succeeded, log success; otherwise log error
if os.path.exists(self.scenarios_filepath):
logger.info(
"Saved scenarios to %s (rename fallback)",
self.scenarios_filepath,
)
else:
logger.error(
"Error finalizing scenarios write to %s: %s",
self.scenarios_filepath,
e,
)
except IOError as e:
logger.error("Error saving scenarios to %s: %s", self.scenarios_filepath, e)
def get_general_settings(self) -> Dict[str, Any]:
"""Returns the general settings."""
return self._settings.get("general", {})
def save_general_settings(self, data: Dict[str, Any]):
"""Saves the general settings."""
# Merge incoming general settings with existing ones to avoid
# overwriting unrelated keys (for example: logger_panel saved prefs).
existing = (
self._settings.get("general", {})
if isinstance(self._settings, dict)
else {}
)
try:
# Shallow merge: if a key contains a dict, update its contents
for k, v in (data or {}).items():
if (
isinstance(v, dict)
and k in existing
and isinstance(existing.get(k), dict)
):
existing[k].update(v)
else:
existing[k] = v
except Exception:
# Fallback: replace entirely if merge fails
existing = data
self._settings["general"] = existing
self._save_settings()
def get_scenario_names(self) -> List[str]:
"""Returns a list of all scenario names."""
return list(self._scenarios.keys())
def get_scenario(self, name: str) -> Optional[Dict[str, Any]]:
"""
Retrieves a specific scenario by name.
Args:
name: The name of the scenario to retrieve.
Returns:
A dictionary with the scenario data, or None if not found.
"""
return self._scenarios.get(name)
def save_scenario(self, name: str, data: Dict[str, Any]):
"""
Saves or updates a scenario.
Args:
name: The name of the scenario to save.
data: The dictionary of scenario data to save.
"""
logger = logging.getLogger(__name__)
try:
# Basic sanity check: expect a mapping for scenario data
if not isinstance(data, dict):
logger.error(
"Attempted to save scenario '%s' with non-dict data: %r",
name,
type(data),
)
return
self._scenarios[name] = data
logger.info(
"Saving scenario '%s' (total scenarios after save: %d)",
name,
len(self._scenarios),
)
logger.debug("Scenario keys: %s", list(self._scenarios.keys()))
self._save_scenarios()
except Exception:
logger.exception("Failed to save scenario '%s'", name)
def delete_scenario(self, name: str):
"""
Deletes a scenario by name.
Args:
name: The name of the scenario to delete.
"""
if name in self._scenarios:
del self._scenarios[name]
self._save_scenarios()