fix table with report automatic

This commit is contained in:
VALLONGOL 2025-05-23 09:38:49 +02:00
parent ec89134d2c
commit a668497221
2 changed files with 554 additions and 198 deletions

View File

@ -3,15 +3,34 @@
import logging import logging
import os import os
import time # For timestamping if we log data directly for now import time
from typing import Dict, Any, Optional, Callable import json # For saving the summary report
import re # For sanitizing filenames
from datetime import datetime # For timestamping
from typing import Dict, Any, Optional, Callable, List
from .gdb_controller import GDBSession from .gdb_controller import GDBSession
from .config_manager import AppSettings from .config_manager import AppSettings
# from .output_formatter import save_to_json, save_to_csv # For later from .output_formatter import save_to_json as save_data_to_json_file # Alias to avoid confusion
from .output_formatter import save_to_csv as save_data_to_csv_file # Alias
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Data structure for the execution log entries to be displayed in GUI and saved in summary
# TypedDict could be used here if preferred for more formal structure
ExecutionLogEntry = Dict[str, str] # E.g., {"breakpoint": "main", "variable": "argc", "file_produced": "dump.json", "status": "Success"}
def sanitize_filename_component(component: str) -> str:
"""Sanitizes a string component to be safe for filenames."""
if not component:
return "unknown"
# Remove or replace characters invalid for filenames on most OS
# This is a basic sanitization, might need to be more robust
component = re.sub(r'[\\/*?:"<>|]', "_", component)
component = component.replace(" ", "_")
return component[:50] # Limit length of each component
class ProfileExecutor: class ProfileExecutor:
""" """
Orchestrates the execution of a debug profile, interacting with GDBSession. Orchestrates the execution of a debug profile, interacting with GDBSession.
@ -22,221 +41,426 @@ class ProfileExecutor:
app_settings: AppSettings, app_settings: AppSettings,
status_update_callback: Optional[Callable[[str], None]] = None, status_update_callback: Optional[Callable[[str], None]] = None,
gdb_output_callback: Optional[Callable[[str], None]] = None, gdb_output_callback: Optional[Callable[[str], None]] = None,
json_output_callback: Optional[Callable[[Any], None]] = None json_output_callback: Optional[Callable[[Any], None]] = None,
# NEW callback for structured log entries
execution_log_callback: Optional[Callable[[ExecutionLogEntry], None]] = None
): ):
"""
Initializes the ProfileExecutor.
Args:
profile_data: The dictionary containing the profile configuration.
app_settings: The application settings instance.
status_update_callback: Callback to update status in GUI.
gdb_output_callback: Callback to send GDB raw output to GUI.
json_output_callback: Callback to send parsed JSON data to GUI.
"""
self.profile = profile_data self.profile = profile_data
self.app_settings = app_settings self.app_settings = app_settings
self.gdb_session: Optional[GDBSession] = None self.gdb_session: Optional[GDBSession] = None
self.is_running: bool = False self.is_running: bool = False
self._stop_requested: bool = False self._stop_requested: bool = False
self.status_updater = status_update_callback if status_update_callback else lambda msg: logger.info(f"Status: {msg}") self.profile_execution_summary: Dict[str, Any] = {} # To store all summary data
self.gdb_output_writer = gdb_output_callback if gdb_output_callback else lambda msg: logger.debug(f"GDB Output: {msg}") self.produced_files_log: List[ExecutionLogEntry] = [] # Log of files produced
self.json_data_handler = json_output_callback if json_output_callback else lambda data: logger.debug(f"JSON Data: {str(data)[:200]}") self.execution_event_log: List[str] = [] # General text log of execution steps
self.status_updater = status_update_callback if status_update_callback else self._default_status_update
self.gdb_output_writer = gdb_output_callback if gdb_output_callback else self._default_gdb_output
self.json_data_handler = json_output_callback if json_output_callback else self._default_json_data
self.execution_log_adder = execution_log_callback if execution_log_callback else self._default_execution_log
logger.info(f"ProfileExecutor initialized for profile: '{self.profile.get('profile_name', 'Unnamed Profile')}'") logger.info(f"ProfileExecutor initialized for profile: '{self.profile.get('profile_name', 'Unnamed Profile')}'")
# Default callbacks if none are provided (e.g., for non-GUI execution in future)
def _default_status_update(self, msg: str): logger.info(f"Status: {msg}")
def _default_gdb_output(self, msg: str): logger.debug(f"GDB Output: {msg}")
def _default_json_data(self, data: Any): logger.debug(f"JSON Data: {str(data)[:200]}")
def _default_execution_log(self, entry: ExecutionLogEntry): logger.info(f"Execution Log: {entry}")
def _log_event(self, message: str, is_status_update: bool = True) -> None:
"""Logs an event to the internal event log and optionally updates status."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"[{timestamp}] {message}"
self.execution_event_log.append(log_message)
if is_status_update:
self.status_updater(message) # Update GUI status with the core message
logger.info(message) # Also log to main application logger
def _add_produced_file_entry(self, breakpoint_loc: str, variable_name: str, file_path: str, status: str, details: str = "") -> None:
entry: ExecutionLogEntry = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"breakpoint": breakpoint_loc,
"variable": variable_name,
"file_produced": os.path.basename(file_path) if file_path else "N/A",
"full_path": file_path if file_path else "N/A", # Store full path for reference
"status": status, # e.g., "Success", "Failed", "Skipped"
"details": details
}
self.produced_files_log.append(entry)
self.execution_log_adder(entry) # Send to GUI for display
def _get_setting(self, category: str, key: str, default: Optional[Any] = None) -> Any: def _get_setting(self, category: str, key: str, default: Optional[Any] = None) -> Any:
"""Helper to get settings via app_settings."""
return self.app_settings.get_setting(category, key, default) return self.app_settings.get_setting(category, key, default)
def _get_dumper_options(self) -> Dict[str, Any]: def _get_dumper_options(self) -> Dict[str, Any]:
"""Helper to get dumper options."""
return self.app_settings.get_category_settings("dumper_options", {}) return self.app_settings.get_category_settings("dumper_options", {})
def _generate_output_filename(self, pattern: str, profile_name: str, bp_loc: str, var_name: str, file_format: str) -> str:
"""Generates a filename based on the pattern and context."""
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] # Milliseconds
placeholders = {
"{profile_name}": sanitize_filename_component(profile_name),
"{app_name}": sanitize_filename_component(os.path.basename(self.profile.get("target_executable", "app"))),
"{breakpoint}": sanitize_filename_component(bp_loc),
"{variable}": sanitize_filename_component(var_name),
"{timestamp}": timestamp_str,
"{format}": file_format.lower()
}
filename = pattern
for ph, val in placeholders.items():
filename = filename.replace(ph, val)
# Ensure it ends with the correct extension if not already handled by pattern's {format}
if not filename.lower().endswith(f".{file_format.lower()}"):
filename += f".{file_format.lower()}"
return filename
def _prepare_output_directory(self, base_output_dir_from_action: str, profile_name: str) -> Optional[str]:
"""Creates the output directory structure: base_dir/profile_name_timestamp/"""
if not base_output_dir_from_action:
self._log_event(f"Error: Output directory for action is not specified in profile '{profile_name}'.", True)
return None
# Sanitize profile name for directory use
sane_profile_name = sanitize_filename_component(profile_name)
timestamp_dir = datetime.now().strftime("%Y%m%d_%H%M%S")
# Specific execution directory: base_output_dir / profile_name / profile_name_timestamp
# This makes it easy to group all files from one specific run.
specific_run_dirname = f"{sane_profile_name}_{timestamp_dir}"
# The `base_output_dir_from_action` is the root where the profile's folder will be created.
# For example, if action says "./dumps", and profile is "MyTest",
# we create: ./dumps/MyTest/MyTest_20230101_120000/
# This seems a bit nested. Let's simplify:
# Action `output_directory` becomes the PARENT of the `specific_run_dirname`
profile_execution_path = os.path.join(base_output_dir_from_action, specific_run_dirname)
try:
os.makedirs(profile_execution_path, exist_ok=True)
self._log_event(f"Output directory prepared: {profile_execution_path}", False)
return profile_execution_path
except OSError as e:
self._log_event(f"Error creating output directory '{profile_execution_path}': {e}", True)
return None
def run(self) -> None: def run(self) -> None:
"""
Starts the automated execution of the debug profile.
This method will run synchronously for now for simplicity in this first step.
Later, it might need to run in a separate thread to keep the GUI responsive.
"""
profile_name = self.profile.get("profile_name", "Unnamed Profile") profile_name = self.profile.get("profile_name", "Unnamed Profile")
self.status_updater(f"Starting profile: '{profile_name}'...") self._log_event(f"Starting profile: '{profile_name}'...", True)
self.is_running = True self.is_running = True
self._stop_requested = False self._stop_requested = False
self.produced_files_log.clear()
self.execution_event_log.clear()
self.profile_execution_summary = {
"profile_name": profile_name,
"target_executable": self.profile.get("target_executable"),
"program_parameters": self.profile.get("program_parameters"),
"start_time": datetime.now().isoformat(),
"end_time": None,
"status": "Initialized",
"actions_summary": [], # Will be populated
"execution_log": [], # Will be self.execution_event_log
"files_produced_detailed": [] # Will be self.produced_files_log
}
gdb_exe = self._get_setting("general", "gdb_executable_path") gdb_exe = self._get_setting("general", "gdb_executable_path")
target_exe = self.profile.get("target_executable") target_exe = self.profile.get("target_executable")
gdb_script_path = self._get_setting("general", "gdb_dumper_script_path") gdb_script_path = self._get_setting("general", "gdb_dumper_script_path")
if not target_exe or not os.path.exists(target_exe): if not target_exe or not os.path.exists(target_exe):
self.status_updater(f"Error: Target executable '{target_exe}' not found for profile '{profile_name}'.") msg = f"Error: Target executable '{target_exe}' not found for profile '{profile_name}'."
logger.error(f"Target executable '{target_exe}' not found for profile '{profile_name}'.") self._log_event(msg, True)
self.profile_execution_summary["status"] = "Error: Target not found"
self.is_running = False self.is_running = False
self._finalize_summary_report(None) # Try to save what we have
return
# The main output directory for this entire profile *run*
# We use the first action's output_directory as the base for creating the unique run folder.
# All actions in this profile run will save into this unique folder.
actions = self.profile.get("actions", [])
if not actions:
self._log_event(f"Profile '{profile_name}' has no actions defined. Stopping.", True)
self.profile_execution_summary["status"] = "Error: No actions"
self._finalize_summary_report(None)
return
# Use the output_directory from the first action to determine the parent for the run-specific folder
# A better approach might be a global output_directory for the profile itself.
# For now, assume all actions in a profile intend to save to subfolders of a common base.
# We'll use the first action's directory as this common base.
first_action_output_dir_base = actions[0].get("output_directory", ".")
self.current_run_output_path = self._prepare_output_directory(first_action_output_dir_base, profile_name)
if not self.current_run_output_path:
# Error already logged by _prepare_output_directory
self.profile_execution_summary["status"] = "Error: Cannot create output directory"
self.is_running = False
self._finalize_summary_report(None)
return return
try: try:
self.gdb_session = GDBSession( self.gdb_session = GDBSession(
gdb_path=gdb_exe, gdb_path=gdb_exe, executable_path=target_exe,
executable_path=target_exe, gdb_script_full_path=gdb_script_path, dumper_options=self._get_dumper_options()
gdb_script_full_path=gdb_script_path,
dumper_options=self._get_dumper_options()
) )
startup_timeout = self._get_setting("timeouts", "gdb_start", 30) startup_timeout = self._get_setting("timeouts", "gdb_start", 30)
self.status_updater(f"Profile '{profile_name}': Spawning GDB for '{os.path.basename(target_exe)}'...") self._log_event(f"Spawning GDB for '{os.path.basename(target_exe)}'...", True)
self.gdb_session.start(timeout=startup_timeout) self.gdb_session.start(timeout=startup_timeout)
self.gdb_output_writer(f"GDB session started for profile '{profile_name}'.\n") self.gdb_output_writer(f"GDB session started for profile '{profile_name}'.\n")
self._log_event("GDB session started.", False)
if gdb_script_path and self.gdb_session.gdb_script_sourced_successfully: if gdb_script_path and self.gdb_session.gdb_script_sourced_successfully:
self.gdb_output_writer(f"GDB dumper script '{os.path.basename(gdb_script_path)}' sourced successfully.\n") self.gdb_output_writer(f"GDB dumper script '{os.path.basename(gdb_script_path)}' sourced successfully.\n")
self._log_event("GDB dumper script sourced successfully.", False)
elif gdb_script_path: elif gdb_script_path:
self.gdb_output_writer(f"Warning: GDB dumper script '{os.path.basename(gdb_script_path)}' failed to load.\n") self.gdb_output_writer(f"Warning: GDB dumper script '{os.path.basename(gdb_script_path)}' failed to load.\n")
self._log_event("Warning: GDB dumper script failed to load.", False)
# --- Simplified Execution Logic for the first action and first variable --- # --- Main execution loop for actions ---
actions = self.profile.get("actions", []) program_exited_prematurely = False
if not actions: for action_index, action in enumerate(actions):
self.status_updater(f"Profile '{profile_name}' has no actions defined. Stopping.") if self._stop_requested:
logger.warning(f"Profile '{profile_name}' has no actions.") self._log_event("Execution stopped by user request.", True)
self._cleanup_session() break
return
# For now, just process the first action action_summary = {
first_action = actions[0] "action_index": action_index,
bp_location = first_action.get("breakpoint_location") "breakpoint": action.get("breakpoint_location", "N/A"),
vars_to_dump = first_action.get("variables_to_dump", []) "variables_dumped_count": 0,
continue_after = first_action.get("continue_after_dump", True) "status": "Pending"
}
self.profile_execution_summary["actions_summary"].append(action_summary)
bp_location = action.get("breakpoint_location")
vars_to_dump = action.get("variables_to_dump", [])
continue_after = action.get("continue_after_dump", True)
output_format = action.get("output_format", "json").lower()
filename_pattern = action.get("filename_pattern", "{breakpoint}_{variable}_{timestamp}.{format}")
# Output directory is now self.current_run_output_path for all files in this run.
if not bp_location: if not bp_location:
self.status_updater(f"Profile '{profile_name}': First action has no breakpoint. Stopping.") self._log_event(f"Action {action_index + 1}: No breakpoint location. Skipping action.", True)
logger.error(f"Profile '{profile_name}': First action missing breakpoint.") action_summary["status"] = "Skipped: No breakpoint"
self._cleanup_session() continue
return
if not vars_to_dump: self._log_event(f"Action {action_index + 1}: Setting breakpoint at '{bp_location}'...", True)
self.status_updater(f"Profile '{profile_name}': First action at '{bp_location}' has no variables to dump. Setting BP and running.")
# We might still want to run to the breakpoint even if no vars are dumped.
# For now, let's proceed if there's a BP.
# Set breakpoint
self.status_updater(f"Profile '{profile_name}': Setting breakpoint at '{bp_location}'...")
cmd_timeout = self._get_setting("timeouts", "gdb_command", 30) cmd_timeout = self._get_setting("timeouts", "gdb_command", 30)
bp_output = self.gdb_session.set_breakpoint(bp_location, timeout=cmd_timeout) bp_output = self.gdb_session.set_breakpoint(bp_location, timeout=cmd_timeout)
self.gdb_output_writer(bp_output) self.gdb_output_writer(bp_output)
if "Breakpoint" not in bp_output and "pending" not in bp_output.lower(): if "Breakpoint" not in bp_output and "pending" not in bp_output.lower():
self.status_updater(f"Error: Failed to set breakpoint '{bp_location}'. Check GDB output.") self._log_event(f"Error: Action {action_index + 1}: Failed to set breakpoint '{bp_location}'. Skipping action.", True)
logger.error(f"Failed to set breakpoint '{bp_location}' for profile '{profile_name}'. Output: {bp_output}") action_summary["status"] = f"Error: Failed to set BP"
self._cleanup_session() self._add_produced_file_entry(bp_location, "N/A", "", "Error", f"Failed to set breakpoint: {bp_output[:100]}")
return continue # Move to next action or stop if critical? For now, continue.
# Run program # Run or Continue program
program_params = self.profile.get("program_parameters", "") # Only run on the very first action that requires it, then continue.
self.status_updater(f"Profile '{profile_name}': Running program '{os.path.basename(target_exe)} {program_params}'...") # This needs to be smarter if breakpoints are out of order or program restarts.
# For now, simple model: run once, then continue.
gdb_run_cmd_output = ""
run_timeout = self._get_setting("timeouts", "program_run_continue", 120) run_timeout = self._get_setting("timeouts", "program_run_continue", 120)
run_output = self.gdb_session.run_program(program_params, timeout=run_timeout)
self.gdb_output_writer(run_output)
if self._stop_requested: if action_index == 0: # TODO: This logic needs to be more robust for multiple runs or complex scenarios
self.status_updater(f"Profile '{profile_name}' execution stopped by user request.") program_params = self.profile.get("program_parameters", "")
self._cleanup_session() self._log_event(f"Running program '{os.path.basename(target_exe)} {program_params}'...", True)
return gdb_run_cmd_output = self.gdb_session.run_program(program_params, timeout=run_timeout)
else: # For subsequent breakpoints, we 'continue'
self._log_event(f"Continuing execution for action {action_index + 1} (BP: {bp_location})...", True)
gdb_run_cmd_output = self.gdb_session.continue_execution(timeout=run_timeout)
# Check if breakpoint was hit self.gdb_output_writer(gdb_run_cmd_output)
if "Breakpoint" in run_output or \ if self._stop_requested: break
(hasattr(self.gdb_session.child, 'before') and "Breakpoint" in self.gdb_session.child.before): # Check 'before' as well
self.status_updater(f"Profile '{profile_name}': Hit breakpoint at '{bp_location}'.")
if vars_to_dump: if "Program exited normally" in gdb_run_cmd_output or "exited with code" in gdb_run_cmd_output:
var_to_dump = vars_to_dump[0] # Just the first one for now self._log_event(f"Program exited before or during action {action_index + 1} (BP: {bp_location}).", True)
self.status_updater(f"Profile '{profile_name}': Dumping variable '{var_to_dump}'...") program_exited_prematurely = True
action_summary["status"] = "Skipped: Program exited"
break # Stop processing further actions if program ended
is_breakpoint_hit = "Breakpoint" in gdb_run_cmd_output or \
(hasattr(self.gdb_session.child, 'before') and "Breakpoint" in self.gdb_session.child.before)
if not is_breakpoint_hit:
self._log_event(f"Action {action_index + 1}: Breakpoint '{bp_location}' not hit as expected. Output: {gdb_run_cmd_output[:100]}...", True)
action_summary["status"] = "Skipped: BP not hit"
self._add_produced_file_entry(bp_location, "N/A", "", "Not Hit", f"BP not hit. GDB: {gdb_run_cmd_output[:100]}")
# If continue_after is false, we'd be stuck. If true, GDB might be running or exited.
# For now, if BP not hit and we were supposed to dump, this action fails.
# If continue_after_dump is true, we might have already continued past other potential BPs.
# This area needs careful thought for complex execution flows.
if not continue_after:
self._log_event(f"Action {action_index + 1}: Halting profile as breakpoint was not hit and continue_after_dump is false.", True)
program_exited_prematurely = True # Treat as if it ended for subsequent actions
break
continue # To next action, assuming program is still running or will hit another BP.
self._log_event(f"Action {action_index + 1}: Hit breakpoint at '{bp_location}'.", True)
action_summary["status"] = "Processing..." # Intermediate status
if not vars_to_dump:
self._log_event(f"Action {action_index + 1}: No variables specified to dump at '{bp_location}'.", False)
self._add_produced_file_entry(bp_location, "N/A", "", "Skipped", "No variables to dump")
dump_success_count_for_action = 0
for var_to_dump in vars_to_dump:
if self._stop_requested: break
self._log_event(f"Dumping variable '{var_to_dump}'...", True)
dump_timeout = self._get_setting("timeouts", "dump_variable", 60) dump_timeout = self._get_setting("timeouts", "dump_variable", 60)
if not self.gdb_session.gdb_script_sourced_successfully: dumped_data = None
msg = f"Profile '{profile_name}': GDB Dumper script not available/loaded. Cannot dump '{var_to_dump}'." file_save_path = ""
self.status_updater(msg) dump_status = "Failed"
logger.warning(msg) dump_details = ""
self.json_data_handler({"_profile_executor_error": msg})
else:
dumped_json = self.gdb_session.dump_variable_to_json(var_to_dump, timeout=dump_timeout)
self.json_data_handler(dumped_json) # Send to GUI/logger
self.gdb_output_writer(f"Dumped '{var_to_dump}': {str(dumped_json)[:200]}...\n") # Also to GDB raw output for now
if isinstance(dumped_json, dict) and "_gdb_tool_error" in dumped_json: if not self.gdb_session.gdb_script_sourced_successfully and output_format == "json":
self.status_updater(f"Error dumping '{var_to_dump}': {dumped_json.get('details', '')}") msg = f"GDB Dumper script not available/loaded. Cannot dump '{var_to_dump}' as JSON."
self._log_event(msg, True)
dump_details = msg
self.json_data_handler({"_profile_executor_error": msg, "variable": var_to_dump})
else: else:
self.status_updater(f"Successfully dumped '{var_to_dump}'.") dumped_data = self.gdb_session.dump_variable_to_json(var_to_dump, timeout=dump_timeout) # Assuming JSON for now
# Placeholder for saving to file later self.json_data_handler(dumped_data) # Send to GUI/logger
logger.info(f"Profile '{profile_name}' - Dumped data for '{var_to_dump}': {str(dumped_json)[:100]}...")
if isinstance(dumped_data, dict) and "_gdb_tool_error" in dumped_data:
err_detail = dumped_data.get("details", dumped_data["_gdb_tool_error"])
self._log_event(f"Error dumping '{var_to_dump}': {err_detail}", True)
dump_details = f"GDB Tool Error: {err_detail}"
elif dumped_data is not None:
self._log_event(f"Successfully dumped '{var_to_dump}'. Preparing to save.", False)
# Generate filename and save
output_filename = self._generate_output_filename(filename_pattern, profile_name, bp_location, var_to_dump, output_format)
file_save_path = os.path.join(self.current_run_output_path, output_filename)
try:
if output_format == "json":
save_data_to_json_file(dumped_data, file_save_path)
elif output_format == "csv":
# Adapt data for CSV if necessary (as in main_window)
data_for_csv = dumped_data
if isinstance(data_for_csv, dict) and not isinstance(data_for_csv, list): data_for_csv = [data_for_csv]
elif not isinstance(data_for_csv, list): data_for_csv = [{"value": data_for_csv}]
elif isinstance(data_for_csv, list) and data_for_csv and not all(isinstance(item, dict) for item in data_for_csv):
data_for_csv = [{"value": item} for item in data_for_csv]
save_data_to_csv_file(data_for_csv, file_save_path)
else:
raise ValueError(f"Unsupported output format: {output_format}")
self._log_event(f"Saved '{var_to_dump}' to '{output_filename}'.", True)
dump_status = "Success"
dump_success_count_for_action +=1
except Exception as save_e:
self._log_event(f"Error saving dump of '{var_to_dump}' to '{file_save_path}': {save_e}", True)
dump_details = f"Save Error: {save_e}"
else: # Dumped data is None, but no _gdb_tool_error
self._log_event(f"Dump of '{var_to_dump}' returned no data.", True)
dump_details = "Dump returned no data"
self._add_produced_file_entry(bp_location, var_to_dump, file_save_path, dump_status, dump_details)
action_summary["variables_dumped_count"] = dump_success_count_for_action
if dump_success_count_for_action == len(vars_to_dump) and vars_to_dump: # All vars in action dumped
action_summary["status"] = "Completed"
elif not vars_to_dump: # No vars to dump for this action, BP was hit
action_summary["status"] = "Completed (No Vars)"
else: # Some vars might have failed
action_summary["status"] = "Completed with Errors"
if continue_after: if not continue_after:
if self._stop_requested: self._log_event(f"Action {action_index + 1}: Execution paused at '{bp_location}' as per profile. Profile will now terminate.", True)
self.status_updater(f"Profile '{profile_name}' execution stopped by user request before continue.") break # End profile execution here
self._cleanup_session()
return
self.status_updater(f"Profile '{profile_name}': Continuing execution...")
continue_output = self.gdb_session.continue_execution(timeout=run_timeout)
self.gdb_output_writer(continue_output)
# Rudimentary: check for program exit after continue
if "Program exited normally" in continue_output or "exited with code" in continue_output:
self.status_updater(f"Profile '{profile_name}': Program exited after continue.")
else:
self.status_updater(f"Profile '{profile_name}': Program continued. Further automatic steps not yet implemented.")
else:
self.status_updater(f"Profile '{profile_name}': Execution paused at '{bp_location}' as per profile action (continue_after_dump is false).")
elif "Program exited normally" in run_output or "exited with code" in run_output: if self._stop_requested: break # Check again before explicit continue
self.status_updater(f"Profile '{profile_name}': Program exited before hitting breakpoint '{bp_location}'.")
# After loop of actions or if break
if program_exited_prematurely:
self.profile_execution_summary["status"] = "Completed (Program Exited Prematurely)"
elif self._stop_requested:
self.profile_execution_summary["status"] = "Completed (User Stopped)"
else: else:
self.status_updater(f"Profile '{profile_name}': Program did not hit breakpoint '{bp_location}' as expected. Output: {run_output[:100]}...") self.profile_execution_summary["status"] = "Completed"
except FileNotFoundError as fnf_e: except FileNotFoundError as fnf_e:
err_msg = f"Error running profile '{profile_name}': File not found - {fnf_e}" msg = f"Error running profile '{profile_name}': File not found - {fnf_e}"
self.status_updater(err_msg) self._log_event(msg, True)
logger.error(err_msg, exc_info=True) self.profile_execution_summary["status"] = f"Error: {fnf_e}"
except (ConnectionError, TimeoutError) as session_e: except (ConnectionError, TimeoutError) as session_e:
err_msg = f"Session error running profile '{profile_name}': {type(session_e).__name__} - {session_e}" msg = f"Session error running profile '{profile_name}': {type(session_e).__name__} - {session_e}"
self.status_updater(err_msg) self._log_event(msg, True)
logger.error(err_msg, exc_info=True) self.profile_execution_summary["status"] = f"Error: {session_e}"
except Exception as e: except Exception as e:
err_msg = f"Unexpected error running profile '{profile_name}': {type(e).__name__} - {e}" msg = f"Unexpected error running profile '{profile_name}': {type(e).__name__} - {e}"
self.status_updater(err_msg) self._log_event(msg, True)
logger.critical(err_msg, exc_info=True) logger.critical(msg, exc_info=True) # Log full traceback for unexpected
self.profile_execution_summary["status"] = f"Critical Error: {e}"
finally: finally:
self._cleanup_session() self._cleanup_session()
self.status_updater(f"Profile '{profile_name}' execution finished.") self.profile_execution_summary["end_time"] = datetime.now().isoformat()
self.profile_execution_summary["execution_log"] = self.execution_event_log
self.profile_execution_summary["files_produced_detailed"] = self.produced_files_log
self._finalize_summary_report(self.current_run_output_path if hasattr(self, 'current_run_output_path') else None)
self._log_event(f"Profile '{profile_name}' execution finished. Summary report generated.", True)
self.is_running = False self.is_running = False
def _finalize_summary_report(self, run_output_path: Optional[str]) -> None:
if not run_output_path:
logger.warning("No run output path available, cannot save summary report to specific location.")
# Could save to a default location or just log it. For now, just log if no path.
logger.info(f"Execution Summary for '{self.profile.get('profile_name')}':\n{json.dumps(self.profile_execution_summary, indent=2)}")
return
sane_profile_name = sanitize_filename_component(self.profile.get("profile_name", "profile_run"))
summary_filename = f"_{sane_profile_name}_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
summary_filepath = os.path.join(run_output_path, summary_filename)
try:
with open(summary_filepath, 'w', encoding='utf-8') as f_summary:
json.dump(self.profile_execution_summary, f_summary, indent=2, ensure_ascii=False)
logger.info(f"Execution summary report saved to: {summary_filepath}")
self._add_produced_file_entry("Summary", "Execution Report", summary_filepath, "Success")
except Exception as e:
logger.error(f"Failed to save execution summary report to '{summary_filepath}': {e}")
self._add_produced_file_entry("Summary", "Execution Report", "", "Failed", str(e))
def request_stop(self) -> None: def request_stop(self) -> None:
"""Requests the profile execution to stop gracefully.""" self._log_event("Stop requested for current profile execution...", True)
self.status_updater("Stop requested for current profile execution...")
self._stop_requested = True self._stop_requested = True
# If GDB session is in a blocking expect, this won't have immediate effect if self.gdb_session:
# until GDB returns control. For true async stop, gdb_session would need interrupt. # This is a soft stop. GDB might be busy.
# A more forceful stop might involve interrupting gdb_session.child if possible.
pass
def _cleanup_session(self) -> None: def _cleanup_session(self) -> None:
"""Cleans up the GDB session."""
if self.gdb_session and self.gdb_session.is_alive(): if self.gdb_session and self.gdb_session.is_alive():
self.status_updater("Cleaning up GDB session...") self._log_event("Cleaning up GDB session...", False)
quit_timeout = self._get_setting("timeouts", "gdb_quit", 10) quit_timeout = self._get_setting("timeouts", "gdb_quit", 10)
# If program might be running, try to kill it first, then quit
if self.gdb_session.child and self.gdb_session.child.isalive(): # Check if child process exists
# A more robust check would be to see if GDB thinks a program is loaded/running
# For now, assume we might need to kill if a breakpoint was hit or run was issued.
try: try:
kill_timeout = self._get_setting("timeouts", "kill_program", 20) # Check if the inferior process might still be running
# Only kill if it's likely the inferior is running. # This can be complex; for now, we assume quit will handle it or timeout
# This is tricky. GDB might not have an inferior if it exited. # if self.gdb_session.child.isalive(): # Simplified check
# For simplicity now, just attempt quit. A more robust kill would be needed # kill_timeout = self._get_setting("timeouts", "kill_program", 20)
# if the program is left running and blocks quit. # self.gdb_output_writer(self.gdb_session.kill_program(timeout=kill_timeout))
# kill_output = self.gdb_session.kill_program(timeout=kill_timeout) pass
# self.gdb_output_writer(f"Kill attempt during cleanup: {kill_output}\n")
pass # Skip kill for now to avoid complexity if program already exited
except Exception as e_kill: except Exception as e_kill:
logger.warning(f"Exception during kill in cleanup: {e_kill}") logger.warning(f"Exception during potential kill in cleanup: {e_kill}")
finally:
self.gdb_session.quit(timeout=quit_timeout) self.gdb_session.quit(timeout=quit_timeout)
self.gdb_output_writer("GDB session quit during cleanup.\n") self.gdb_output_writer("GDB session quit during cleanup.\n")
self.gdb_session = None self.gdb_session = None

View File

@ -7,6 +7,8 @@ import logging
import os import os
import json # For pretty-printing JSON in the GUI import json # For pretty-printing JSON in the GUI
import re import re
import subprocess # For opening folder cross-platform
import sys # To check platform
import threading # For running profile executor in a separate thread import threading # For running profile executor in a separate thread
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
@ -15,7 +17,7 @@ from typing import Optional, Dict, Any
from ..core.gdb_controller import GDBSession from ..core.gdb_controller import GDBSession
from ..core.output_formatter import save_to_json, save_to_csv from ..core.output_formatter import save_to_json, save_to_csv
from ..core.config_manager import AppSettings from ..core.config_manager import AppSettings
from ..core.profile_executor import ProfileExecutor # NEW IMPORT from ..core.profile_executor import ProfileExecutor, ExecutionLogEntry
from .config_window import ConfigWindow from .config_window import ConfigWindow
from .profile_manager_window import ProfileManagerWindow from .profile_manager_window import ProfileManagerWindow
@ -56,6 +58,8 @@ class GDBGui(tk.Tk):
self.profile_executor_instance: Optional[ProfileExecutor] = None self.profile_executor_instance: Optional[ProfileExecutor] = None
self.available_profiles_map: Dict[str, Dict[str, Any]] = {} # Maps display name to profile data self.available_profiles_map: Dict[str, Dict[str, Any]] = {} # Maps display name to profile data
self.profile_exec_status_var = tk.StringVar(value="Select a profile to run.") # Status for auto execution self.profile_exec_status_var = tk.StringVar(value="Select a profile to run.") # Status for auto execution
self.produced_files_tree: Optional[ttk.Treeview] = None
self.last_run_output_path: Optional[str] = None
self._create_menus() self._create_menus()
self._create_widgets() self._create_widgets()
@ -235,41 +239,99 @@ class GDBGui(tk.Tk):
self.save_csv_button.pack(side=tk.LEFT, padx=5, pady=5) self.save_csv_button.pack(side=tk.LEFT, padx=5, pady=5)
def _populate_automated_execution_tab(self, parent_tab_frame: ttk.Frame) -> None: def _populate_automated_execution_tab(self, parent_tab_frame: ttk.Frame) -> None:
"""Populates the tab for automated profile execution.""" parent_tab_frame.columnconfigure(0, weight=1)
parent_tab_frame.columnconfigure(0, weight=1) # Consenti al frame principale del tab di espandersi parent_tab_frame.rowconfigure(1, weight=1) # Treeview expansion
parent_tab_frame.rowconfigure(2, weight=0) # Button row
# Frame per Profile Selection e Control, ora con una griglia più complessa # Frame for Profile Selection and Control (row 0)
auto_control_frame = ttk.LabelFrame(parent_tab_frame, text="Profile Execution Control", padding="10") auto_control_frame = ttk.LabelFrame(parent_tab_frame, text="Profile Execution Control", padding="10")
auto_control_frame.grid(row=0, column=0, sticky="ew", pady=5) auto_control_frame.grid(row=0, column=0, sticky="ew", pady=5)
# ... (contenuto di auto_control_frame come prima) ...
auto_control_frame.columnconfigure(1, weight=1)
# Configura le colonne del auto_control_frame per il layout desiderato:
# Colonna 0: Label "Select Profile:"
# Colonna 1: Combobox (espandibile)
# Colonna 2: Bottone Run
# Colonna 3: Bottone Stop
auto_control_frame.columnconfigure(0, weight=0) # Label fissa
auto_control_frame.columnconfigure(1, weight=1) # Combobox espandibile
auto_control_frame.columnconfigure(2, weight=0) # Bottone Run fisso
auto_control_frame.columnconfigure(3, weight=0) # Bottone Stop fisso
# Riga 0: Label, Combobox, Run Button, Stop Button
ttk.Label(auto_control_frame, text="Select Profile:").grid(row=0, column=0, padx=(5,2), pady=5, sticky="w") ttk.Label(auto_control_frame, text="Select Profile:").grid(row=0, column=0, padx=(5,2), pady=5, sticky="w")
self.profile_selection_combo = ttk.Combobox(auto_control_frame, state="readonly", width=35, textvariable=tk.StringVar())
self.profile_selection_combo = ttk.Combobox(auto_control_frame, state="readonly", width=35, # Larghezza leggermente ridotta
textvariable=tk.StringVar())
self.profile_selection_combo.grid(row=0, column=1, padx=(0,5), pady=5, sticky="ew") self.profile_selection_combo.grid(row=0, column=1, padx=(0,5), pady=5, sticky="ew")
self.run_profile_button = ttk.Button(auto_control_frame, text="Run Profile", command=self._run_selected_profile_action, state=tk.DISABLED) self.run_profile_button = ttk.Button(auto_control_frame, text="Run Profile", command=self._run_selected_profile_action, state=tk.DISABLED)
self.run_profile_button.grid(row=0, column=2, padx=(0,2), pady=5, sticky="ew") self.run_profile_button.grid(row=0, column=2, padx=(0,2), pady=5, sticky="ew")
self.stop_profile_button = ttk.Button(auto_control_frame, text="Stop Profile", command=self._stop_current_profile_action, state=tk.DISABLED) self.stop_profile_button = ttk.Button(auto_control_frame, text="Stop Profile", command=self._stop_current_profile_action, state=tk.DISABLED)
self.stop_profile_button.grid(row=0, column=3, padx=(0,5), pady=5, sticky="ew") self.stop_profile_button.grid(row=0, column=3, padx=(0,5), pady=5, sticky="ew")
# Riga 1: Status label (sotto i controlli)
ttk.Label(auto_control_frame, textvariable=self.profile_exec_status_var, relief=tk.SUNKEN, anchor=tk.W, padding=3).grid( ttk.Label(auto_control_frame, textvariable=self.profile_exec_status_var, relief=tk.SUNKEN, anchor=tk.W, padding=3).grid(
row=1, column=0, columnspan=4, sticky="ew", padx=5, pady=(10,5), ipady=2 # columnspan=4 per coprire tutte le colonne row=1, column=0, columnspan=4, sticky="ew", padx=5, pady=(10,5), ipady=2
) )
# Frame and Treeview for Produced Files Log (row 1)
produced_files_frame = ttk.LabelFrame(parent_tab_frame, text="Produced Files Log", padding="10")
produced_files_frame.grid(row=1, column=0, sticky="nsew", pady=(5,0))
produced_files_frame.columnconfigure(0, weight=1)
produced_files_frame.rowconfigure(0, weight=1)
self.produced_files_tree = ttk.Treeview(
produced_files_frame,
columns=("timestamp", "breakpoint", "variable", "file", "status", "details"),
show="headings",
selectmode="browse"
)
self.produced_files_tree.grid(row=0, column=0, sticky="nsew")
# ... (definizione headings e columns come prima) ...
self.produced_files_tree.heading("timestamp", text="Time", anchor=tk.W)
self.produced_files_tree.heading("breakpoint", text="Breakpoint", anchor=tk.W)
self.produced_files_tree.heading("variable", text="Variable", anchor=tk.W)
self.produced_files_tree.heading("file", text="File Produced", anchor=tk.W)
self.produced_files_tree.heading("status", text="Status", anchor=tk.W)
self.produced_files_tree.heading("details", text="Details", anchor=tk.W)
self.produced_files_tree.column("timestamp", width=130, minwidth=120, stretch=False)
self.produced_files_tree.column("breakpoint", width=150, minwidth=100, stretch=True)
self.produced_files_tree.column("variable", width=150, minwidth=100, stretch=True)
self.produced_files_tree.column("file", width=200, minwidth=150, stretch=True)
self.produced_files_tree.column("status", width=80, minwidth=60, stretch=False)
self.produced_files_tree.column("details", width=200, minwidth=150, stretch=True)
tree_scrollbar_y = ttk.Scrollbar(produced_files_frame, orient=tk.VERTICAL, command=self.produced_files_tree.yview)
tree_scrollbar_y.grid(row=0, column=1, sticky="ns")
tree_scrollbar_x = ttk.Scrollbar(produced_files_frame, orient=tk.HORIZONTAL, command=self.produced_files_tree.xview)
tree_scrollbar_x.grid(row=1, column=0, sticky="ew") # Sotto il treeview
self.produced_files_tree.configure(yscrollcommand=tree_scrollbar_y.set, xscrollcommand=tree_scrollbar_x.set)
# NEW: Button to open output folder (row 2)
folder_button_frame = ttk.Frame(parent_tab_frame) # No LabelFrame needed
folder_button_frame.grid(row=2, column=0, sticky="ew", pady=(5,0))
self.open_output_folder_button = ttk.Button(
folder_button_frame,
text="Open Output Folder",
command=self._open_last_run_output_folder,
state=tk.DISABLED
)
# Pack it to the right or center as desired
self.open_output_folder_button.pack(side=tk.RIGHT, padx=5, pady=5) # Allineato a destra nel suo frame
def _clear_produced_files_tree(self) -> None:
if self.produced_files_tree:
for item in self.produced_files_tree.get_children():
self.produced_files_tree.delete(item)
def _gui_add_execution_log_entry(self, entry: ExecutionLogEntry) -> None:
"""Callback for ProfileExecutor to add an entry to the produced files treeview."""
if self.produced_files_tree and self.winfo_exists():
try:
# Ensure all keys exist in the entry, providing defaults if not
values = (
entry.get("timestamp", ""),
entry.get("breakpoint", "N/A"),
entry.get("variable", "N/A"),
entry.get("file_produced", "N/A"),
entry.get("status", "N/A"),
entry.get("details", "")
)
item_id = self.produced_files_tree.insert("", tk.END, values=values)
self.produced_files_tree.see(item_id) # Scroll to the new item
except Exception as e:
logger.error(f"Failed to add entry to produced_files_tree: {e}. Entry: {entry}")
def _load_and_populate_profiles_for_automation_tab(self) -> None: def _load_and_populate_profiles_for_automation_tab(self) -> None:
self.available_profiles_map.clear() self.available_profiles_map.clear()
profiles = self.app_settings.get_profiles() profiles = self.app_settings.get_profiles()
@ -709,6 +771,10 @@ class GDBGui(tk.Tk):
self._update_parsed_json_output(data) self._update_parsed_json_output(data)
def _run_selected_profile_action(self) -> None: def _run_selected_profile_action(self) -> None:
self.last_run_output_path = None
self.open_output_folder_button.config(state=tk.DISABLED)
selected_profile_name = self.profile_selection_combo.get() selected_profile_name = self.profile_selection_combo.get()
if not selected_profile_name: if not selected_profile_name:
messagebox.showwarning("No Profile Selected", "Please select a profile to run.", parent=self) messagebox.showwarning("No Profile Selected", "Please select a profile to run.", parent=self)
@ -746,7 +812,8 @@ class GDBGui(tk.Tk):
self.app_settings, self.app_settings,
status_update_callback=self._gui_status_update, status_update_callback=self._gui_status_update,
gdb_output_callback=self._gui_gdb_output_update, gdb_output_callback=self._gui_gdb_output_update,
json_output_callback=self._gui_json_data_update json_output_callback=self._gui_json_data_update,
execution_log_callback=self._gui_add_execution_log_entry
) )
self.profile_exec_status_var.set(f"Starting profile '{selected_profile_name}' in background...") self.profile_exec_status_var.set(f"Starting profile '{selected_profile_name}' in background...")
@ -766,33 +833,98 @@ class GDBGui(tk.Tk):
self.after(0, self._on_profile_execution_finished) self.after(0, self._on_profile_execution_finished)
def _on_profile_execution_finished(self): def _on_profile_execution_finished(self):
if not self.winfo_exists(): return # Window might have been destroyed """Called (in main thread) after profile executor thread finishes."""
if not self.winfo_exists(): # Check if main window still exists
logger.warning("_on_profile_execution_finished called but window no longer exists.")
return
final_status_message = "Profile execution finished." # Default final message
# Attempt to get the output path from the executor instance and enable button
# Also, try to get a more specific final status from the executor if available
if self.profile_executor_instance:
if hasattr(self.profile_executor_instance, 'current_run_output_path'):
self.last_run_output_path = self.profile_executor_instance.current_run_output_path
if self.last_run_output_path and os.path.isdir(self.last_run_output_path):
self.open_output_folder_button.config(state=tk.NORMAL)
logger.info(f"Output folder for last run: {self.last_run_output_path}")
else:
self.open_output_folder_button.config(state=tk.DISABLED)
logger.warning(f"Output folder path from executor not valid or not found: {self.last_run_output_path}")
else: # If executor doesn't have the attribute (should not happen with current ProfileExecutor)
self.open_output_folder_button.config(state=tk.DISABLED)
logger.warning("profile_executor_instance does not have 'current_run_output_path' attribute.")
# Use the status already set by the executor's status_updater callback if it's meaningful
current_gui_status = self.profile_exec_status_var.get()
if current_gui_status and not current_gui_status.startswith("Starting profile") and not current_gui_status.startswith("Requesting profile stop"):
# If the status var has something other than the initial "Starting..." or "Stopping..." messages
if "Error:" in current_gui_status or "failed" in current_gui_status.lower() or "issues" in current_gui_status.lower():
final_status_message = f"Profile finished with issues: {current_gui_status}"
else:
final_status_message = f"Profile run completed. Last status: {current_gui_status}"
# If current_gui_status is still "Starting..." or "Stopping...", the default "Profile execution finished." is fine
# or we could check profile_executor_instance.profile_execution_summary["status"]
elif hasattr(self.profile_executor_instance, 'profile_execution_summary'):
executor_final_status = self.profile_executor_instance.profile_execution_summary.get("status", "Unknown")
if "Error" in executor_final_status or "failed" in executor_final_status.lower():
final_status_message = f"Profile finished with issues (from summary): {executor_final_status}"
elif executor_final_status != "Initialized" and executor_final_status != "Pending":
final_status_message = f"Profile run completed. Final state: {executor_final_status}"
# Get final status from executor instance if it's still around and set it.
# The instance might have already updated profile_exec_status_var directly.
final_status_message = "Profile execution finished."
if self.profile_executor_instance and hasattr(self.profile_executor_instance, 'status_updater'):
# This is a bit indirect; ideally, status_updater already set the final message
# or executor.run() returns a final status. For now, we take the current var value.
current_status = self.profile_exec_status_var.get()
if "Error:" in current_status or "failed" in current_status.lower():
final_status_message = f"Profile finished with issues: {current_status}"
elif current_status and not current_status.startswith("Starting profile"): # If not default starting msg
final_status_message = f"Profile run completed. Last status: {current_status}"
self.profile_exec_status_var.set(final_status_message) self.profile_exec_status_var.set(final_status_message)
# Re-enable UI components # Re-enable UI components
self.run_profile_button.config(state=tk.NORMAL if self.profile_selection_combo.get() else tk.DISABLED) # Enable run button only if a profile is actually selected in the combobox
if self.profile_selection_combo.get():
self.run_profile_button.config(state=tk.NORMAL)
else:
self.run_profile_button.config(state=tk.DISABLED)
self.stop_profile_button.config(state=tk.DISABLED) self.stop_profile_button.config(state=tk.DISABLED)
self.profile_selection_combo.config(state="readonly") self.profile_selection_combo.config(state="readonly") # Re-enable combobox selection
try:
try: # Safely re-enable menu items
if self.menubar.winfo_exists(): # Check if menubar itself exists
self.menubar.entryconfig("Profiles", state=tk.NORMAL) self.menubar.entryconfig("Profiles", state=tk.NORMAL)
self.menubar.entryconfig("Options", state=tk.NORMAL) self.menubar.entryconfig("Options", state=tk.NORMAL)
except tk.TclError: pass except tk.TclError as e:
logger.warning(f"TclError re-enabling menubar items: {e}")
self._check_critical_configs_and_update_gui() # This will re-evaluate manual GDB buttons state # This will re-evaluate the state of manual GDB buttons correctly
# and also ensure GDB path status is current.
self._check_critical_configs_and_update_gui()
# Clear the executor instance as it has finished its job
self.profile_executor_instance = None self.profile_executor_instance = None
logger.info("Profile execution GUI updates completed, executor instance cleared.")
def _open_last_run_output_folder(self) -> None:
"""Opens the output folder of the last successfully completed profile run."""
if not self.last_run_output_path or not os.path.isdir(self.last_run_output_path):
messagebox.showwarning("No Output Folder",
"The output folder for the last run is not available or does not exist.",
parent=self)
self.open_output_folder_button.config(state=tk.DISABLED) # Re-disable if path became invalid
return
try:
logger.info(f"Opening output folder: {self.last_run_output_path}")
if sys.platform == "win32":
os.startfile(self.last_run_output_path)
elif sys.platform == "darwin": # macOS
subprocess.run(["open", self.last_run_output_path], check=True)
else: # Linux and other UNIX-like
subprocess.run(["xdg-open", self.last_run_output_path], check=True)
except FileNotFoundError: # For xdg-open or open if not found
messagebox.showerror("Error",
f"Could not find the file manager command ('xdg-open' or 'open'). Please open the folder manually:\n{self.last_run_output_path}",
parent=self)
except Exception as e:
logger.error(f"Failed to open output folder '{self.last_run_output_path}': {e}")
messagebox.showerror("Error Opening Folder",
f"Could not open the output folder:\n{self.last_run_output_path}\n\nError: {e}",
parent=self)
def _stop_current_profile_action(self) -> None: def _stop_current_profile_action(self) -> None: