381 lines
16 KiB
Python
381 lines
16 KiB
Python
# target_simulator/utils/config_manager.py
|
|
|
|
"""
|
|
Manages loading and saving of application settings and scenarios to a single JSON file.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import logging
|
|
import time
|
|
from typing import Dict, Any, Optional, List
|
|
from enum import Enum
|
|
|
|
|
|
class EnumEncoder(json.JSONEncoder):
|
|
"""A custom JSON encoder to handle Enum objects by encoding them as their values."""
|
|
|
|
def default(self, o: Any) -> Any:
|
|
if isinstance(o, Enum):
|
|
return o.value
|
|
return super().default(o)
|
|
|
|
|
|
class ConfigManager:
|
|
def save_connection_settings(self, config: Dict[str, Any]):
|
|
"""
|
|
Save both target and lru connection configs in the settings file.
|
|
"""
|
|
if "general" not in self._settings:
|
|
self._settings["general"] = {}
|
|
self._settings["general"]["connection"] = config
|
|
self._save_settings()
|
|
|
|
def get_connection_settings(self) -> Dict[str, Any]:
|
|
"""
|
|
Returns the connection settings from the 'general' section.
|
|
"""
|
|
general = self._settings.get("general", {})
|
|
return general.get("connection", {})
|
|
|
|
"""Handles reading and writing application settings and scenarios from a JSON file."""
|
|
|
|
def __init__(
|
|
self,
|
|
filename: str = "settings.json",
|
|
scenarios_filename: str = "scenarios.json",
|
|
):
|
|
"""
|
|
Initializes the ConfigManager.
|
|
|
|
Args:
|
|
filename: The name of the settings file. It will be stored in
|
|
the project root directory.
|
|
"""
|
|
if getattr(sys, "frozen", False):
|
|
application_path = os.path.dirname(sys.executable)
|
|
else:
|
|
application_path = os.path.abspath(
|
|
os.path.join(os.path.dirname(__file__), "..", "..")
|
|
)
|
|
|
|
self.filepath = os.path.join(application_path, filename)
|
|
self.scenarios_filepath = os.path.join(application_path, scenarios_filename)
|
|
self._settings = self._load_or_initialize_settings()
|
|
# Load scenarios from separate file if present, otherwise keep any scenarios
|
|
# found inside settings.json (fallback).
|
|
self._scenarios = self._load_or_initialize_scenarios()
|
|
# Apply debug overrides (if present) into the global DEBUG_CONFIG so
|
|
# runtime helpers (e.g., csv_logger) pick up user-configured values.
|
|
try:
|
|
from target_simulator.config import DEBUG_CONFIG
|
|
|
|
debug_block = (
|
|
self._settings.get("debug", {})
|
|
if isinstance(self._settings, dict)
|
|
else {}
|
|
)
|
|
if isinstance(debug_block, dict):
|
|
for k, v in debug_block.items():
|
|
DEBUG_CONFIG[k] = v
|
|
except Exception:
|
|
# If anything goes wrong here, we don't want to fail initialization.
|
|
pass
|
|
|
|
def _load_or_initialize_settings(self) -> Dict[str, Any]:
|
|
"""Loads settings from the JSON file or initializes with a default structure."""
|
|
# Note: do not accidentally clear `_scenarios` here. Tests may call
|
|
# this method directly to reload settings, but clearing the in-memory
|
|
# scenarios dict can lead to accidental data loss if followed by a
|
|
# save operation. Any explicit reload of scenarios should be done via
|
|
# `_load_or_initialize_scenarios()`.
|
|
|
|
if not os.path.exists(self.filepath):
|
|
return {"general": {}, "scenarios": {}}
|
|
try:
|
|
with open(self.filepath, "r", encoding="utf-8") as f:
|
|
settings = json.load(f)
|
|
if not isinstance(settings, dict) or "general" not in settings:
|
|
# If file contained only general settings (old style), wrap it
|
|
return {"general": settings, "scenarios": {}}
|
|
# Keep scenarios key if present; actual source of truth for scenarios
|
|
# will be the separate scenarios file when available.
|
|
return settings
|
|
except (json.JSONDecodeError, IOError):
|
|
return {"general": {}, "scenarios": {}}
|
|
|
|
def _save_settings(self):
|
|
"""Saves the current settings to the JSON file."""
|
|
try:
|
|
with open(self.filepath, "w", encoding="utf-8") as f:
|
|
# Write all settings except 'scenarios' (which is persisted
|
|
# separately). This preserves custom top-level keys while
|
|
# ensuring scenarios are stored in their dedicated file.
|
|
to_write = dict(self._settings)
|
|
if "scenarios" in to_write:
|
|
# don't duplicate scenarios in settings.json
|
|
del to_write["scenarios"]
|
|
json.dump(to_write, f, indent=4, cls=EnumEncoder)
|
|
except IOError as e:
|
|
print(f"Error saving settings to {self.filepath}: {e}")
|
|
|
|
def _load_or_initialize_scenarios(self) -> Dict[str, Any]:
|
|
"""Loads scenarios from the separate scenarios file if present.
|
|
|
|
Falls back to scenarios inside settings.json when scenarios.json is absent.
|
|
"""
|
|
# If scenarios file exists, load from it.
|
|
if os.path.exists(self.scenarios_filepath):
|
|
try:
|
|
with open(self.scenarios_filepath, "r", encoding="utf-8") as f:
|
|
scenarios = json.load(f)
|
|
if isinstance(scenarios, dict):
|
|
return scenarios
|
|
except (json.JSONDecodeError, IOError):
|
|
return {}
|
|
|
|
# Fallback: try to read scenarios stored inside settings.json
|
|
try:
|
|
with open(self.filepath, "r", encoding="utf-8") as f:
|
|
settings = json.load(f)
|
|
if isinstance(settings, dict) and "scenarios" in settings:
|
|
return settings.get("scenarios", {})
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
def _save_scenarios(self):
|
|
"""Saves scenarios to the separate scenarios file."""
|
|
logger = logging.getLogger(__name__)
|
|
try:
|
|
# If there are no scenarios to save, avoid overwriting an existing
|
|
# scenarios file with an empty dict. This prevents loss of data when
|
|
# the application shuts down while _scenarios is temporarily empty.
|
|
if not self._scenarios:
|
|
if os.path.exists(self.scenarios_filepath):
|
|
logger.info(
|
|
"Skipping saving scenarios: in-memory scenarios empty and '%s' exists",
|
|
self.scenarios_filepath,
|
|
)
|
|
# Keep existing file untouched and return early.
|
|
return
|
|
# If file doesn't exist and no scenarios, nothing to do.
|
|
logger.info(
|
|
"No scenarios to save and '%s' does not exist; skipping write",
|
|
self.scenarios_filepath,
|
|
)
|
|
return
|
|
|
|
# If target exists, create a rotating backup set (keep last N versions)
|
|
backup_count = 5
|
|
try:
|
|
if os.path.exists(self.scenarios_filepath):
|
|
# rotate existing backups: bak{n-1} -> bak{n}
|
|
for i in range(backup_count - 1, 0, -1):
|
|
src = f"{self.scenarios_filepath}.bak{i}"
|
|
dst = f"{self.scenarios_filepath}.bak{i+1}"
|
|
if os.path.exists(src):
|
|
try:
|
|
shutil.move(src, dst)
|
|
except Exception:
|
|
pass
|
|
# copy current to bak1
|
|
try:
|
|
shutil.copy2(
|
|
self.scenarios_filepath, f"{self.scenarios_filepath}.bak1"
|
|
)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
# Non-fatal: proceed with write even if backup fails
|
|
pass
|
|
# Before writing, make an explicit timestamped backup of the current
|
|
# scenarios file to aid recovery in case of unexpected truncation.
|
|
try:
|
|
if os.path.exists(self.scenarios_filepath):
|
|
ts = int(time.time())
|
|
prebackup = f"{self.scenarios_filepath}.prewrite.{ts}.bak"
|
|
try:
|
|
shutil.copy2(self.scenarios_filepath, prebackup)
|
|
logger.debug("Created pre-write backup: %s", prebackup)
|
|
except Exception:
|
|
logger.debug("Pre-write backup failed, continuing", exc_info=True)
|
|
except Exception:
|
|
# Non-fatal
|
|
pass
|
|
|
|
# Write atomically: write to a temp file then validate and replace the target.
|
|
dirpath = os.path.dirname(self.scenarios_filepath) or "."
|
|
tmp_path = os.path.join(dirpath, f".{os.path.basename(self.scenarios_filepath)}.tmp")
|
|
with open(tmp_path, "w", encoding="utf-8") as f:
|
|
json.dump(self._scenarios, f, indent=4, cls=EnumEncoder)
|
|
|
|
# Validate the temporary file contains decodable JSON and expected type
|
|
try:
|
|
with open(tmp_path, "r", encoding="utf-8") as tf:
|
|
validated = json.load(tf)
|
|
if not isinstance(validated, dict):
|
|
logger.error("Temporary scenarios file did not contain a JSON object; aborting write")
|
|
try:
|
|
os.remove(tmp_path)
|
|
except Exception:
|
|
pass
|
|
return
|
|
except Exception as e:
|
|
logger.error("Failed to validate temporary scenarios file: %s", e, exc_info=True)
|
|
try:
|
|
os.remove(tmp_path)
|
|
except Exception:
|
|
pass
|
|
return
|
|
|
|
# Heuristic safeguard: if replacing an existing file would dramatically
|
|
# reduce the number of stored scenarios (e.g., existing had many and new
|
|
# has only one empty entry), preserve the existing file and write the
|
|
# candidate to a .suspect file for manual inspection instead of
|
|
# overwriting silently.
|
|
try:
|
|
if os.path.exists(self.scenarios_filepath):
|
|
try:
|
|
with open(self.scenarios_filepath, "r", encoding="utf-8") as ef:
|
|
existing = json.load(ef)
|
|
except Exception:
|
|
existing = None
|
|
|
|
try:
|
|
new_count = len(validated) if isinstance(validated, dict) else 0
|
|
existing_count = len(existing) if isinstance(existing, dict) else 0
|
|
if (
|
|
existing_count > 1
|
|
and new_count <= 1
|
|
and any(not v for v in validated.values())
|
|
):
|
|
suspect_path = f"{self.scenarios_filepath}.suspect.{int(time.time())}.json"
|
|
try:
|
|
shutil.copy2(tmp_path, suspect_path)
|
|
except Exception:
|
|
pass
|
|
logger.error(
|
|
"New scenarios content looks suspicious (existing had %d entries, new has %d); wrote suspect file %s and skipped overwrite",
|
|
existing_count,
|
|
new_count,
|
|
suspect_path,
|
|
)
|
|
try:
|
|
os.remove(tmp_path)
|
|
except Exception:
|
|
pass
|
|
return
|
|
except Exception:
|
|
# If anything goes wrong in this heuristic, continue with replace
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
# Atomic replace
|
|
try:
|
|
os.replace(tmp_path, self.scenarios_filepath)
|
|
logger.info("Saved scenarios to %s (atomic)", self.scenarios_filepath)
|
|
except Exception:
|
|
# Fallback: try to remove target and rename
|
|
try:
|
|
if os.path.exists(self.scenarios_filepath):
|
|
os.remove(self.scenarios_filepath)
|
|
os.rename(tmp_path, self.scenarios_filepath)
|
|
except Exception as e:
|
|
# If rename succeeded, log success; otherwise log error
|
|
if os.path.exists(self.scenarios_filepath):
|
|
logger.info(
|
|
"Saved scenarios to %s (rename fallback)",
|
|
self.scenarios_filepath,
|
|
)
|
|
else:
|
|
logger.error(
|
|
"Error finalizing scenarios write to %s: %s",
|
|
self.scenarios_filepath,
|
|
e,
|
|
)
|
|
except IOError as e:
|
|
logger.error("Error saving scenarios to %s: %s", self.scenarios_filepath, e)
|
|
|
|
def get_general_settings(self) -> Dict[str, Any]:
|
|
"""Returns the general settings."""
|
|
return self._settings.get("general", {})
|
|
|
|
def save_general_settings(self, data: Dict[str, Any]):
|
|
"""Saves the general settings."""
|
|
# Merge incoming general settings with existing ones to avoid
|
|
# overwriting unrelated keys (for example: logger_panel saved prefs).
|
|
existing = (
|
|
self._settings.get("general", {})
|
|
if isinstance(self._settings, dict)
|
|
else {}
|
|
)
|
|
try:
|
|
# Shallow merge: if a key contains a dict, update its contents
|
|
for k, v in (data or {}).items():
|
|
if (
|
|
isinstance(v, dict)
|
|
and k in existing
|
|
and isinstance(existing.get(k), dict)
|
|
):
|
|
existing[k].update(v)
|
|
else:
|
|
existing[k] = v
|
|
except Exception:
|
|
# Fallback: replace entirely if merge fails
|
|
existing = data
|
|
|
|
self._settings["general"] = existing
|
|
self._save_settings()
|
|
|
|
def get_scenario_names(self) -> List[str]:
|
|
"""Returns a list of all scenario names."""
|
|
return list(self._scenarios.keys())
|
|
|
|
def get_scenario(self, name: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Retrieves a specific scenario by name.
|
|
|
|
Args:
|
|
name: The name of the scenario to retrieve.
|
|
|
|
Returns:
|
|
A dictionary with the scenario data, or None if not found.
|
|
"""
|
|
return self._scenarios.get(name)
|
|
|
|
def save_scenario(self, name: str, data: Dict[str, Any]):
|
|
"""
|
|
Saves or updates a scenario.
|
|
|
|
Args:
|
|
name: The name of the scenario to save.
|
|
data: The dictionary of scenario data to save.
|
|
"""
|
|
logger = logging.getLogger(__name__)
|
|
try:
|
|
# Basic sanity check: expect a mapping for scenario data
|
|
if not isinstance(data, dict):
|
|
logger.error("Attempted to save scenario '%s' with non-dict data: %r", name, type(data))
|
|
return
|
|
self._scenarios[name] = data
|
|
logger.info("Saving scenario '%s' (total scenarios after save: %d)", name, len(self._scenarios))
|
|
logger.debug("Scenario keys: %s", list(self._scenarios.keys()))
|
|
self._save_scenarios()
|
|
except Exception:
|
|
logger.exception("Failed to save scenario '%s'", name)
|
|
|
|
def delete_scenario(self, name: str):
|
|
"""
|
|
Deletes a scenario by name.
|
|
|
|
Args:
|
|
name: The name of the scenario to delete.
|
|
"""
|
|
if name in self._scenarios:
|
|
del self._scenarios[name]
|
|
self._save_scenarios()
|