73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
import threading
|
|
import time
|
|
import logging
|
|
from typing import Dict, Any
|
|
from pymsc.core.bus_1553_module import Bus1553Module
|
|
from pymsc.utils.logger import CustomLogger
|
|
from pymsc.utils.profiler import monitor_execution
|
|
|
|
class MissionLogic:
|
|
"""
|
|
Main orchestrator for PyMsc modules and background sync tasks.
|
|
"""
|
|
_instance = None
|
|
|
|
def __new__(cls):
|
|
if not cls._instance:
|
|
cls._instance = super(MissionLogic, cls).__new__(cls)
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
if hasattr(self, 'initialized'):
|
|
return
|
|
self.initialized = True
|
|
self.logger = CustomLogger().get_logger()
|
|
self.bus_1553 = Bus1553Module()
|
|
self.sync_thread = None
|
|
self.keep_syncing = False
|
|
|
|
def initialize_system(self, config_1553: Dict[str, Any]) -> bool:
|
|
"""Initializes the 1553 hardware module."""
|
|
return self.bus_1553.initialize(config_1553)
|
|
|
|
def start_mission(self):
|
|
"""Starts the physical session and the background sync thread."""
|
|
self.bus_1553.start_session()
|
|
self.keep_syncing = True
|
|
self.sync_thread = threading.Thread(target=self._background_sync_task)
|
|
self.sync_thread.daemon = True
|
|
self.sync_thread.start()
|
|
self.logger.info("Mission logic started.")
|
|
|
|
def stop_mission(self):
|
|
"""Clean shutdown of all threads."""
|
|
self.keep_syncing = False
|
|
self.bus_1553.stop_session()
|
|
if self.sync_thread:
|
|
self.sync_thread.join(timeout=1.0)
|
|
self.logger.info("Mission logic stopped.")
|
|
|
|
def _background_sync_task(self):
|
|
"""
|
|
Infinite loop for hardware synchronization.
|
|
Runs in a dedicated thread.
|
|
"""
|
|
while self.keep_syncing:
|
|
self._execute_sync_cycle()
|
|
time.sleep(0.02) # Frequency: 50Hz
|
|
|
|
@monitor_execution
|
|
def _execute_sync_cycle(self):
|
|
"""
|
|
Single synchronization step.
|
|
Decorated to measure performance of the 1553 sync.
|
|
"""
|
|
self.bus_1553.sync_all_messages()
|
|
|
|
def get_system_status(self) -> Dict[str, Any]:
|
|
"""Returns the status of all active modules."""
|
|
return {
|
|
"bus_1553": self.bus_1553.get_status(),
|
|
"sync_active": self.keep_syncing
|
|
} |