fix breakpoint order

VALLONGOL 2025-05-23 10:09:14 +02:00
parent a668497221
commit 4648efef2a


@@ -1,48 +1,37 @@
 # File: cpp_python_debug/core/profile_executor.py
-# Manages the automated execution of a debug profile.
+# Manages the automated execution of a debug profile using breakpoint addresses.
 import logging
 import os
 import time
-import json # For saving the summary report
-import re # For sanitizing filenames
-from datetime import datetime # For timestamping
-from typing import Dict, Any, Optional, Callable, List
+import json
+import re
+from datetime import datetime
+from typing import Dict, Any, Optional, Callable, List, Tuple
 from .gdb_controller import GDBSession
 from .config_manager import AppSettings
-from .output_formatter import save_to_json as save_data_to_json_file # Alias to avoid confusion
-from .output_formatter import save_to_csv as save_data_to_csv_file # Alias
+from .output_formatter import save_to_json as save_data_to_json_file
+from .output_formatter import save_to_csv as save_data_to_csv_file

 logger = logging.getLogger(__name__)

-# Data structure for the execution log entries to be displayed in GUI and saved in summary
-# TypedDict could be used here if preferred for more formal structure
-ExecutionLogEntry = Dict[str, str] # E.g., {"breakpoint": "main", "variable": "argc", "file_produced": "dump.json", "status": "Success"}
+ExecutionLogEntry = Dict[str, str] # type alias

 def sanitize_filename_component(component: str) -> str:
-    """Sanitizes a string component to be safe for filenames."""
     if not component:
         return "unknown"
-    # Remove or replace characters invalid for filenames on most OS
-    # This is a basic sanitization, might need to be more robust
     component = re.sub(r'[\\/*?:"<>|]', "_", component)
     component = component.replace(" ", "_")
-    return component[:50] # Limit length of each component
+    return component[:50] # Limit length for safety

 class ProfileExecutor:
-    """
-    Orchestrates the execution of a debug profile, interacting with GDBSession.
-    """
     def __init__(self,
                  profile_data: Dict[str, Any],
                  app_settings: AppSettings,
                  status_update_callback: Optional[Callable[[str], None]] = None,
                  gdb_output_callback: Optional[Callable[[str], None]] = None,
                  json_output_callback: Optional[Callable[[Any], None]] = None,
-                 # NEW callback for structured log entries
                  execution_log_callback: Optional[Callable[[ExecutionLogEntry], None]] = None
                  ):
         self.profile = profile_data
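
Note on sanitize_filename_component: a hedged sketch of its behavior on a few assumed inputs (not taken from the project's tests):

    # Illustrative inputs/outputs (assumed, not from the project's test suite):
    #   sanitize_filename_component('src/main.c:42')  -> 'src_main.c_42'
    #   sanitize_filename_component('my file:v2?')    -> 'my_file_v2_'
    #   sanitize_filename_component('')               -> 'unknown'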
@@ -51,9 +40,17 @@ class ProfileExecutor:
         self.is_running: bool = False
         self._stop_requested: bool = False
-        self.profile_execution_summary: Dict[str, Any] = {} # To store all summary data
-        self.produced_files_log: List[ExecutionLogEntry] = [] # Log of files produced
-        self.execution_event_log: List[str] = [] # General text log of execution steps
+        self.profile_execution_summary: Dict[str, Any] = {}
+        self.produced_files_log: List[ExecutionLogEntry] = []
+        self.execution_event_log: List[str] = []
+
+        # NEW MAPPING STRUCTURES:
+        # Maps GDB breakpoint number to its address and original action index
+        self.gdb_bp_num_to_details_map: Dict[int, Dict[str, Any]] = {}
+        # Maps memory address (str) to a list of action indices that are set at this address
+        self.address_to_action_indices_map: Dict[str, List[int]] = {}
+        self.current_run_output_path: Optional[str] = None

         self.status_updater = status_update_callback if status_update_callback else self._default_status_update
         self.gdb_output_writer = gdb_output_callback if gdb_output_callback else self._default_gdb_output
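
To illustrate the two new mapping structures, here is a hypothetical state after breakpoint setup for a two-action profile whose specs resolve to the same address (values invented for illustration; the address reuses the docstring example below):

    # Hypothetical contents after Phase 1, for a profile whose two actions
    # ('main.c:42' and 'my_func') happen to resolve to the same address:
    #
    #   gdb_bp_num_to_details_map = {
    #       1: {"address": "0x401570", "action_index": 0, "bp_spec": "main.c:42"},
    #       2: {"address": "0x401570", "action_index": 1, "bp_spec": "my_func"},
    #   }
    #   address_to_action_indices_map = {
    #       "0x401570": [0, 1],  # one stop at this address triggers both actions
    #   }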
@@ -62,33 +59,36 @@ class ProfileExecutor:
         logger.info(f"ProfileExecutor initialized for profile: '{self.profile.get('profile_name', 'Unnamed Profile')}'")

-    # Default callbacks if none are provided (e.g., for non-GUI execution in future)
     def _default_status_update(self, msg: str): logger.info(f"Status: {msg}")
     def _default_gdb_output(self, msg: str): logger.debug(f"GDB Output: {msg}")
     def _default_json_data(self, data: Any): logger.debug(f"JSON Data: {str(data)[:200]}")
     def _default_execution_log(self, entry: ExecutionLogEntry): logger.info(f"Execution Log: {entry}")

     def _log_event(self, message: str, is_status_update: bool = True) -> None:
-        """Logs an event to the internal event log and optionally updates status."""
         timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         log_message = f"[{timestamp}] {message}"
         self.execution_event_log.append(log_message)
         if is_status_update:
-            self.status_updater(message) # Update GUI status with the core message
-        logger.info(message) # Also log to main application logger
+            self.status_updater(message)
+        logger.info(message)

-    def _add_produced_file_entry(self, breakpoint_loc: str, variable_name: str, file_path: str, status: str, details: str = "") -> None:
+    def _add_produced_file_entry(self, breakpoint_loc_spec: str, # The user-specified location string
+                                 variable_name: str, file_path: str, status: str,
+                                 gdb_bp_num: Optional[int] = None, address: Optional[str] = None,
+                                 details: str = "") -> None:
         entry: ExecutionLogEntry = {
             "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
-            "breakpoint": breakpoint_loc,
+            "breakpoint_spec": breakpoint_loc_spec, # User's original string
+            "gdb_bp_num": str(gdb_bp_num) if gdb_bp_num is not None else "N/A",
+            "address": address if address else "N/A",
             "variable": variable_name,
             "file_produced": os.path.basename(file_path) if file_path else "N/A",
-            "full_path": file_path if file_path else "N/A", # Store full path for reference
-            "status": status, # e.g., "Success", "Failed", "Skipped"
+            "full_path": file_path if file_path else "N/A",
+            "status": status,
             "details": details
         }
         self.produced_files_log.append(entry)
-        self.execution_log_adder(entry) # Send to GUI for display
+        self.execution_log_adder(entry) # Call GUI callback

     def _get_setting(self, category: str, key: str, default: Optional[Any] = None) -> Any:
         return self.app_settings.get_setting(category, key, default)
@@ -96,14 +96,13 @@ class ProfileExecutor:
     def _get_dumper_options(self) -> Dict[str, Any]:
         return self.app_settings.get_category_settings("dumper_options", {})

-    def _generate_output_filename(self, pattern: str, profile_name: str, bp_loc: str, var_name: str, file_format: str) -> str:
-        """Generates a filename based on the pattern and context."""
-        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] # Milliseconds
+    def _generate_output_filename(self, pattern: str, profile_name: str, bp_loc_spec: str, var_name: str, file_format: str) -> str:
+        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
         placeholders = {
             "{profile_name}": sanitize_filename_component(profile_name),
             "{app_name}": sanitize_filename_component(os.path.basename(self.profile.get("target_executable", "app"))),
-            "{breakpoint}": sanitize_filename_component(bp_loc),
+            "{breakpoint}": sanitize_filename_component(bp_loc_spec), # Use original spec for filename clarity
             "{variable}": sanitize_filename_component(var_name),
             "{timestamp}": timestamp_str,
             "{format}": file_format.lower()
@@ -113,34 +112,18 @@ class ProfileExecutor:
         for ph, val in placeholders.items():
             filename = filename.replace(ph, val)
-        # Ensure it ends with the correct extension if not already handled by pattern's {format}
         if not filename.lower().endswith(f".{file_format.lower()}"):
             filename += f".{file_format.lower()}"
         return filename

     def _prepare_output_directory(self, base_output_dir_from_action: str, profile_name: str) -> Optional[str]:
-        """Creates the output directory structure: base_dir/profile_name_timestamp/"""
         if not base_output_dir_from_action:
             self._log_event(f"Error: Output directory for action is not specified in profile '{profile_name}'.", True)
             return None
-        # Sanitize profile name for directory use
         sane_profile_name = sanitize_filename_component(profile_name)
         timestamp_dir = datetime.now().strftime("%Y%m%d_%H%M%S")
-        # Specific execution directory: base_output_dir / profile_name / profile_name_timestamp
-        # This makes it easy to group all files from one specific run.
         specific_run_dirname = f"{sane_profile_name}_{timestamp_dir}"
-        # The `base_output_dir_from_action` is the root where the profile's folder will be created.
-        # For example, if action says "./dumps", and profile is "MyTest",
-        # we create: ./dumps/MyTest/MyTest_20230101_120000/
-        # This seems a bit nested. Let's simplify:
-        # Action `output_directory` becomes the PARENT of the `specific_run_dirname`
         profile_execution_path = os.path.join(base_output_dir_from_action, specific_run_dirname)
         try:
             os.makedirs(profile_execution_path, exist_ok=True)
             self._log_event(f"Output directory prepared: {profile_execution_path}", False)
@@ -149,6 +132,52 @@ class ProfileExecutor:
             self._log_event(f"Error creating output directory '{profile_execution_path}': {e}", True)
             return None

+    def _parse_gdb_set_breakpoint_output(self, gdb_output: str) -> Optional[Tuple[int, str]]:
+        """
+        Parses the output of GDB's 'break' command to extract breakpoint number and address.
+        Example: "Breakpoint 1 at 0x401570: file main.c, line 5."
+        Example (pending): "Breakpoint 2 pending." (Address might not be available yet)
+        Returns: Tuple (gdb_bp_number, address_string) or None if not parsed.
+        Address can be "pending" if GDB indicates so.
+        """
+        if not gdb_output: return None
+        # Regex to capture number and address (hex)
+        match = re.search(r"Breakpoint\s+(\d+)\s+at\s+(0x[0-9a-fA-F]+)", gdb_output, re.IGNORECASE)
+        if match:
+            bp_num = int(match.group(1))
+            address = match.group(2).lower() # Normalize address to lowercase
+            return bp_num, address
+        # Regex for pending breakpoints (might not have an address yet)
+        match_pending = re.search(r"Breakpoint\s+(\d+)\s+pending", gdb_output, re.IGNORECASE)
+        if match_pending:
+            bp_num = int(match_pending.group(1))
+            return bp_num, "pending" # Special address string for pending
+        logger.warning(f"Could not parse GDB breakpoint number and address from set_breakpoint output: '{gdb_output[:200]}'")
+        return None
+
+    def _parse_breakpoint_hit_output(self, gdb_output: str) -> Optional[int]:
+        """
+        Parses GDB output (from run/continue) to find the number of the GDB breakpoint that was hit.
+        Returns the GDB breakpoint number (int) or None.
+        """
+        if not gdb_output: return None
+        # Order of regexes can matter if output is ambiguous.
+        # "Thread X hit Breakpoint Y" is often more specific if present.
+        match = re.search(r"Thread\s+\S+\s+hit\s+Breakpoint\s+(\d+)", gdb_output, re.IGNORECASE)
+        if match:
+            return int(match.group(1))
+        # Simpler "Breakpoint Y, ..." or "Breakpoint Y at ..."
+        match = re.search(r"Breakpoint\s+(\d+)[,\s]", gdb_output) # Ensure a separator after number
+        if match:
+            return int(match.group(1))
+        logger.debug(f"Could not parse GDB breakpoint number from hit output: '{gdb_output[:200]}...'")
+        return None
+
     def run(self) -> None:
         profile_name = self.profile.get("profile_name", "Unnamed Profile")
         self._log_event(f"Starting profile: '{profile_name}'...", True)
@@ -156,6 +185,8 @@ class ProfileExecutor:
         self._stop_requested = False
         self.produced_files_log.clear()
         self.execution_event_log.clear()
+        self.gdb_bp_num_to_details_map.clear()
+        self.address_to_action_indices_map.clear()

         self.profile_execution_summary = {
             "profile_name": profile_name,
@@ -164,12 +195,19 @@ class ProfileExecutor:
             "start_time": datetime.now().isoformat(),
             "end_time": None,
             "status": "Initialized",
-            "actions_summary": [], # Will be populated
-            "execution_log": [], # Will be self.execution_event_log
-            "files_produced_detailed": [] # Will be self.produced_files_log
+            "actions_summary": [
+                {"action_index": i,
+                 "breakpoint_spec": action.get("breakpoint_location", "N/A"),
+                 "gdb_bp_num_assigned": None, # Will be filled
+                 "address_resolved": None, # Will be filled
+                 "variables_dumped_count": 0,
+                 "status": "Pending"}
+                for i, action in enumerate(self.profile.get("actions", []))
+            ],
+            "execution_log": [],
+            "files_produced_detailed": []
         }

         gdb_exe = self._get_setting("general", "gdb_executable_path")
         target_exe = self.profile.get("target_executable")
         gdb_script_path = self._get_setting("general", "gdb_dumper_script_path")
@@ -179,12 +217,9 @@ class ProfileExecutor:
             self._log_event(msg, True)
             self.profile_execution_summary["status"] = "Error: Target not found"
             self.is_running = False
-            self._finalize_summary_report(None) # Try to save what we have
+            self._finalize_summary_report(None)
             return

-        # The main output directory for this entire profile *run*
-        # We use the first action's output_directory as the base for creating the unique run folder.
-        # All actions in this profile run will save into this unique folder.
         actions = self.profile.get("actions", [])
         if not actions:
             self._log_event(f"Profile '{profile_name}' has no actions defined. Stopping.", True)
@@ -192,15 +227,10 @@ class ProfileExecutor:
             self._finalize_summary_report(None)
             return

-        # Use the output_directory from the first action to determine the parent for the run-specific folder
-        # A better approach might be a global output_directory for the profile itself.
-        # For now, assume all actions in a profile intend to save to subfolders of a common base.
-        # We'll use the first action's directory as this common base.
         first_action_output_dir_base = actions[0].get("output_directory", ".")
         self.current_run_output_path = self._prepare_output_directory(first_action_output_dir_base, profile_name)
         if not self.current_run_output_path:
-            # Error already logged by _prepare_output_directory
            self.profile_execution_summary["status"] = "Error: Cannot create output directory"
             self.is_running = False
             self._finalize_summary_report(None)
@@ -211,221 +241,263 @@ class ProfileExecutor:
                 gdb_path=gdb_exe, executable_path=target_exe,
                 gdb_script_full_path=gdb_script_path, dumper_options=self._get_dumper_options()
             )
             startup_timeout = self._get_setting("timeouts", "gdb_start", 30)
             self._log_event(f"Spawning GDB for '{os.path.basename(target_exe)}'...", True)
             self.gdb_session.start(timeout=startup_timeout)
             self.gdb_output_writer(f"GDB session started for profile '{profile_name}'.\n")
             self._log_event("GDB session started.", False)

             if gdb_script_path and self.gdb_session.gdb_script_sourced_successfully:
+                self.gdb_output_writer(f"GDB dumper script '{os.path.basename(gdb_script_path)}' sourced successfully.\n")
                 self._log_event("GDB dumper script sourced successfully.", False)
             elif gdb_script_path:
+                self.gdb_output_writer(f"Warning: GDB dumper script '{os.path.basename(gdb_script_path)}' failed to load.\n")
                 self._log_event("Warning: GDB dumper script failed to load.", False)

-            # --- Main execution loop for actions ---
-            program_exited_prematurely = False
-            for action_index, action in enumerate(actions):
-                if self._stop_requested:
-                    self._log_event("Execution stopped by user request.", True)
-                    break
-
-                action_summary = {
-                    "action_index": action_index,
-                    "breakpoint": action.get("breakpoint_location", "N/A"),
-                    "variables_dumped_count": 0,
-                    "status": "Pending"
-                }
-                self.profile_execution_summary["actions_summary"].append(action_summary)
-
-                bp_location = action.get("breakpoint_location")
-                vars_to_dump = action.get("variables_to_dump", [])
-                continue_after = action.get("continue_after_dump", True)
-                output_format = action.get("output_format", "json").lower()
-                filename_pattern = action.get("filename_pattern", "{breakpoint}_{variable}_{timestamp}.{format}")
-                # Output directory is now self.current_run_output_path for all files in this run.
-
-                if not bp_location:
-                    self._log_event(f"Action {action_index + 1}: No breakpoint location. Skipping action.", True)
-                    action_summary["status"] = "Skipped: No breakpoint"
-                    continue
-
-                self._log_event(f"Action {action_index + 1}: Setting breakpoint at '{bp_location}'...", True)
-                cmd_timeout = self._get_setting("timeouts", "gdb_command", 30)
-                bp_output = self.gdb_session.set_breakpoint(bp_location, timeout=cmd_timeout)
-                self.gdb_output_writer(bp_output)
-                if "Breakpoint" not in bp_output and "pending" not in bp_output.lower():
-                    self._log_event(f"Error: Action {action_index + 1}: Failed to set breakpoint '{bp_location}'. Skipping action.", True)
-                    action_summary["status"] = f"Error: Failed to set BP"
-                    self._add_produced_file_entry(bp_location, "N/A", "", "Error", f"Failed to set breakpoint: {bp_output[:100]}")
-                    continue # Move to next action or stop if critical? For now, continue.
-
-                # Run or Continue program
-                # Only run on the very first action that requires it, then continue.
-                # This needs to be smarter if breakpoints are out of order or program restarts.
-                # For now, simple model: run once, then continue.
-                gdb_run_cmd_output = ""
-                run_timeout = self._get_setting("timeouts", "program_run_continue", 120)
-                if action_index == 0: # TODO: This logic needs to be more robust for multiple runs or complex scenarios
-                    program_params = self.profile.get("program_parameters", "")
-                    self._log_event(f"Running program '{os.path.basename(target_exe)} {program_params}'...", True)
-                    gdb_run_cmd_output = self.gdb_session.run_program(program_params, timeout=run_timeout)
-                else: # For subsequent breakpoints, we 'continue'
-                    self._log_event(f"Continuing execution for action {action_index + 1} (BP: {bp_location})...", True)
-                    gdb_run_cmd_output = self.gdb_session.continue_execution(timeout=run_timeout)
-                self.gdb_output_writer(gdb_run_cmd_output)
-
-                if self._stop_requested: break
-
-                if "Program exited normally" in gdb_run_cmd_output or "exited with code" in gdb_run_cmd_output:
-                    self._log_event(f"Program exited before or during action {action_index + 1} (BP: {bp_location}).", True)
-                    program_exited_prematurely = True
-                    action_summary["status"] = "Skipped: Program exited"
-                    break # Stop processing further actions if program ended
-
-                is_breakpoint_hit = "Breakpoint" in gdb_run_cmd_output or \
-                                    (hasattr(self.gdb_session.child, 'before') and "Breakpoint" in self.gdb_session.child.before)
-
-                if not is_breakpoint_hit:
-                    self._log_event(f"Action {action_index + 1}: Breakpoint '{bp_location}' not hit as expected. Output: {gdb_run_cmd_output[:100]}...", True)
-                    action_summary["status"] = "Skipped: BP not hit"
-                    self._add_produced_file_entry(bp_location, "N/A", "", "Not Hit", f"BP not hit. GDB: {gdb_run_cmd_output[:100]}")
-                    # If continue_after is false, we'd be stuck. If true, GDB might be running or exited.
-                    # For now, if BP not hit and we were supposed to dump, this action fails.
-                    # If continue_after_dump is true, we might have already continued past other potential BPs.
-                    # This area needs careful thought for complex execution flows.
-                    if not continue_after:
-                        self._log_event(f"Action {action_index + 1}: Halting profile as breakpoint was not hit and continue_after_dump is false.", True)
-                        program_exited_prematurely = True # Treat as if it ended for subsequent actions
-                        break
-                    continue # To next action, assuming program is still running or will hit another BP.
-
-                self._log_event(f"Action {action_index + 1}: Hit breakpoint at '{bp_location}'.", True)
-                action_summary["status"] = "Processing..." # Intermediate status
-
-                if not vars_to_dump:
-                    self._log_event(f"Action {action_index + 1}: No variables specified to dump at '{bp_location}'.", False)
-                    self._add_produced_file_entry(bp_location, "N/A", "", "Skipped", "No variables to dump")
-
-                dump_success_count_for_action = 0
-                for var_to_dump in vars_to_dump:
-                    if self._stop_requested: break
-                    self._log_event(f"Dumping variable '{var_to_dump}'...", True)
-                    dump_timeout = self._get_setting("timeouts", "dump_variable", 60)
-
-                    dumped_data = None
-                    file_save_path = ""
-                    dump_status = "Failed"
-                    dump_details = ""
-
-                    if not self.gdb_session.gdb_script_sourced_successfully and output_format == "json":
-                        msg = f"GDB Dumper script not available/loaded. Cannot dump '{var_to_dump}' as JSON."
-                        self._log_event(msg, True)
-                        dump_details = msg
-                        self.json_data_handler({"_profile_executor_error": msg, "variable": var_to_dump})
-                    else:
-                        dumped_data = self.gdb_session.dump_variable_to_json(var_to_dump, timeout=dump_timeout) # Assuming JSON for now
-                        self.json_data_handler(dumped_data) # Send to GUI/logger
-
-                    if isinstance(dumped_data, dict) and "_gdb_tool_error" in dumped_data:
-                        err_detail = dumped_data.get("details", dumped_data["_gdb_tool_error"])
-                        self._log_event(f"Error dumping '{var_to_dump}': {err_detail}", True)
-                        dump_details = f"GDB Tool Error: {err_detail}"
-                    elif dumped_data is not None:
-                        self._log_event(f"Successfully dumped '{var_to_dump}'. Preparing to save.", False)
-                        # Generate filename and save
-                        output_filename = self._generate_output_filename(filename_pattern, profile_name, bp_location, var_to_dump, output_format)
-                        file_save_path = os.path.join(self.current_run_output_path, output_filename)
-                        try:
-                            if output_format == "json":
-                                save_data_to_json_file(dumped_data, file_save_path)
-                            elif output_format == "csv":
-                                # Adapt data for CSV if necessary (as in main_window)
-                                data_for_csv = dumped_data
-                                if isinstance(data_for_csv, dict) and not isinstance(data_for_csv, list): data_for_csv = [data_for_csv]
-                                elif not isinstance(data_for_csv, list): data_for_csv = [{"value": data_for_csv}]
-                                elif isinstance(data_for_csv, list) and data_for_csv and not all(isinstance(item, dict) for item in data_for_csv):
-                                    data_for_csv = [{"value": item} for item in data_for_csv]
-                                save_data_to_csv_file(data_for_csv, file_save_path)
-                            else:
-                                raise ValueError(f"Unsupported output format: {output_format}")
-                            self._log_event(f"Saved '{var_to_dump}' to '{output_filename}'.", True)
-                            dump_status = "Success"
-                            dump_success_count_for_action += 1
-                        except Exception as save_e:
-                            self._log_event(f"Error saving dump of '{var_to_dump}' to '{file_save_path}': {save_e}", True)
-                            dump_details = f"Save Error: {save_e}"
-                    else: # Dumped data is None, but no _gdb_tool_error
-                        self._log_event(f"Dump of '{var_to_dump}' returned no data.", True)
-                        dump_details = "Dump returned no data"
-
-                    self._add_produced_file_entry(bp_location, var_to_dump, file_save_path, dump_status, dump_details)
-
-                action_summary["variables_dumped_count"] = dump_success_count_for_action
-                if dump_success_count_for_action == len(vars_to_dump) and vars_to_dump: # All vars in action dumped
-                    action_summary["status"] = "Completed"
-                elif not vars_to_dump: # No vars to dump for this action, BP was hit
-                    action_summary["status"] = "Completed (No Vars)"
-                else: # Some vars might have failed
-                    action_summary["status"] = "Completed with Errors"
-
-                if not continue_after:
-                    self._log_event(f"Action {action_index + 1}: Execution paused at '{bp_location}' as per profile. Profile will now terminate.", True)
-                    break # End profile execution here
-
-                if self._stop_requested: break # Check again before explicit continue
-
-            # After loop of actions or if break
-            if program_exited_prematurely:
-                self.profile_execution_summary["status"] = "Completed (Program Exited Prematurely)"
-            elif self._stop_requested:
-                self.profile_execution_summary["status"] = "Completed (User Stopped)"
-            else:
-                self.profile_execution_summary["status"] = "Completed"
+            # --- PHASE 1: Set all breakpoints and build maps ---
+            cmd_timeout = self._get_setting("timeouts", "gdb_command", 30)
+            num_successfully_mapped_breakpoints = 0
+            for action_idx, action_config in enumerate(actions):
+                if self._stop_requested: break
+                bp_spec = action_config.get("breakpoint_location")
+                action_summary = self.profile_execution_summary["actions_summary"][action_idx]
+
+                if not bp_spec:
+                    self._log_event(f"Action {action_idx + 1}: No breakpoint location. Skipping.", False)
+                    action_summary["status"] = "Skipped (No BP Spec)"
+                    continue
+
+                self._log_event(f"Setting BP for Action {action_idx + 1} ('{bp_spec}')...", False)
+                bp_set_output = self.gdb_session.set_breakpoint(bp_spec, timeout=cmd_timeout)
+                self.gdb_output_writer(bp_set_output)
+
+                parsed_bp_info = self._parse_gdb_set_breakpoint_output(bp_set_output)
+                if parsed_bp_info:
+                    gdb_bp_num, address_str = parsed_bp_info
+                    action_summary["gdb_bp_num_assigned"] = gdb_bp_num
+                    action_summary["address_resolved"] = address_str
+                    self.gdb_bp_num_to_details_map[gdb_bp_num] = {
+                        "address": address_str,
+                        "action_index": action_idx,
+                        "bp_spec": bp_spec
+                    }
+                    if address_str != "pending": # Only map non-pending to address map for execution
+                        if address_str not in self.address_to_action_indices_map:
+                            self.address_to_action_indices_map[address_str] = []
+                        if action_idx not in self.address_to_action_indices_map[address_str]: # Avoid duplicates if GDB maps same BP spec multiple times (unlikely)
+                            self.address_to_action_indices_map[address_str].append(action_idx)
+                        self._log_event(f"Action {action_idx+1} ('{bp_spec}'): GDB BP {gdb_bp_num} at {address_str}.", False)
+                        num_successfully_mapped_breakpoints += 1
+                    else:
+                        self._log_event(f"Action {action_idx+1} ('{bp_spec}'): GDB BP {gdb_bp_num} is PENDING. Will not trigger until resolved.", False)
+                        action_summary["status"] = "Pending in GDB"
+                else:
+                    self._log_event(f"Error: Action {action_idx + 1}: Failed to parse GDB BP info for '{bp_spec}'. Output: {bp_set_output[:100]}", True)
+                    action_summary["status"] = "Error (BP Set/Parse)"
+
+            if self._stop_requested: raise InterruptedError("User requested stop during BP setup.")
+
+            if num_successfully_mapped_breakpoints == 0:
+                self._log_event("No non-pending breakpoints successfully mapped. Aborting profile.", True)
+                self.profile_execution_summary["status"] = "Error: No BPs Mapped"
+                self._cleanup_session() # Ensure GDB is closed
+                self._finalize_summary_report(self.current_run_output_path)
+                return
+
+            # --- PHASE 2: Run program and handle breakpoint hits ---
+            program_params = self.profile.get("program_parameters", "")
+            self._log_event(f"Running program '{os.path.basename(target_exe)} {program_params}'...", True)
+            run_timeout = self._get_setting("timeouts", "program_run_continue", 120)
+            gdb_output = self.gdb_session.run_program(program_params, timeout=run_timeout)
+            self.gdb_output_writer(gdb_output)
+
+            program_has_exited = "Program exited normally" in gdb_output or "exited with code" in gdb_output
+            if program_has_exited:
+                self._log_event(f"Program exited on initial run. Output: {gdb_output[:100]}", True)
+
+            # Main event loop
+            while self.gdb_session.is_alive() and not program_has_exited and not self._stop_requested:
+                hit_gdb_bp_num = self._parse_breakpoint_hit_output(gdb_output)
+
+                # NEW: Try to get current PC if no direct BP number is parsed, or to confirm address
+                current_pc_address: Optional[str] = None
+                if self.gdb_session and self.gdb_session.is_alive() and not program_has_exited:
+                    try:
+                        # This is an extra command to GDB, use with caution if performance is critical
+                        # For now, it helps resolve the actual stopping address.
+                        pc_out = self.gdb_session.send_cmd("p/x $pc", expect_prompt=True, timeout=cmd_timeout)
+                        self.gdb_output_writer(f"$pc query: {pc_out}\n")
+                        pc_match = re.search(r"=\s*(0x[0-9a-fA-F]+)", pc_out)
+                        if pc_match:
+                            current_pc_address = pc_match.group(1).lower()
+                            self._log_event(f"Current PC: {current_pc_address}", False)
+                    except Exception as e_pc:
+                        self._log_event(f"Could not get current PC: {e_pc}", False)
+
+                actions_to_process_at_this_stop: List[int] = []
+                hit_bp_details_for_log = "N/A"
+
+                if hit_gdb_bp_num is not None and hit_gdb_bp_num in self.gdb_bp_num_to_details_map:
+                    # GDB reported a direct BP number hit
+                    bp_details = self.gdb_bp_num_to_details_map[hit_gdb_bp_num]
+                    address_of_hit = bp_details["address"]
+                    hit_bp_details_for_log = f"GDB BP {hit_gdb_bp_num} ('{bp_details['bp_spec']}') at {address_of_hit}"
+                    if address_of_hit != "pending" and address_of_hit in self.address_to_action_indices_map:
+                        actions_to_process_at_this_stop.extend(self.address_to_action_indices_map[address_of_hit])
+                elif current_pc_address and current_pc_address in self.address_to_action_indices_map:
+                    # Stopped at a known address, even if GDB didn't report a specific BP number we parsed
+                    actions_to_process_at_this_stop.extend(self.address_to_action_indices_map[current_pc_address])
+                    hit_bp_details_for_log = f"PC {current_pc_address} (mapped to actions)"
+
+                if actions_to_process_at_this_stop:
+                    self._log_event(f"Processing stop at {hit_bp_details_for_log}.", True)
+                    # Process all actions mapped to this address/hit
+                    # Deduplicate action indices in case of multiple GDB BPs mapping to same address and action
+                    unique_action_indices_to_process = sorted(list(set(actions_to_process_at_this_stop)))
+                    should_continue_after_all_these_actions = True # Default
+
+                    for action_idx in unique_action_indices_to_process:
+                        if self._stop_requested: break
+                        current_action_config = actions[action_idx]
+                        action_summary = self.profile_execution_summary["actions_summary"][action_idx]
+
+                        # Check if this action was already completed (e.g. if multiple GDB BPs mapped to it)
+                        if action_summary["status"].startswith("Completed"):
+                            self._log_event(f"Action {action_idx + 1} ('{current_action_config.get('breakpoint_location')}') already completed. Skipping.", False)
+                            if not current_action_config.get("continue_after_dump", True):
+                                should_continue_after_all_these_actions = False # If one says stop, we stop
+                            continue
+
+                        self._log_event(f"Executing Action {action_idx + 1} ('{current_action_config.get('breakpoint_location')}')...", False)
+                        action_summary["status"] = "Processing Dumps"
+                        # ... (dumping logic for variables in current_action_config - same as before)
+                        vars_to_dump_for_action = current_action_config.get("variables_to_dump", [])
+                        filename_pattern = current_action_config.get("filename_pattern", "{breakpoint}_{variable}_{timestamp}.{format}")
+                        output_format_for_action = current_action_config.get("output_format", "json").lower()
+                        bp_spec_for_file = current_action_config.get("breakpoint_location", "unknown_bp")
+
+                        dump_success_count = 0
+                        for var_name in vars_to_dump_for_action:
+                            # ... (dumping and saving logic for each var)
+                            # Make sure to use bp_spec_for_file in _add_produced_file_entry and _generate_output_filename
+                            dump_timeout = self._get_setting("timeouts", "dump_variable", 60)
+                            dumped_data = None; file_save_path = ""; dump_status_msg = "Failed"; dump_details_msg = ""
+
+                            if not self.gdb_session.gdb_script_sourced_successfully and output_format_for_action == "json":
+                                msg = f"Dumper script unavailable for '{var_name}' (JSON)."
+                                self._log_event(msg, False); dump_details_msg = msg
+                                self.json_data_handler({"_profile_executor_error": msg, "variable": var_name})
+                            else:
+                                dumped_data = self.gdb_session.dump_variable_to_json(var_name, timeout=dump_timeout)
+                                self.json_data_handler(dumped_data)
+
+                            if isinstance(dumped_data, dict) and "_gdb_tool_error" in dumped_data:
+                                err_detail = dumped_data.get("details", dumped_data["_gdb_tool_error"])
+                                self._log_event(f"Error dumping '{var_name}': {err_detail}", False); dump_details_msg = f"GDB Tool Error: {err_detail}"
+                            elif dumped_data is not None:
+                                output_filename = self._generate_output_filename(filename_pattern, profile_name, bp_spec_for_file, var_name, output_format_for_action)
+                                file_save_path = os.path.join(self.current_run_output_path, output_filename)
+                                try:
+                                    if output_format_for_action == "json": save_data_to_json_file(dumped_data, file_save_path)
+                                    elif output_format_for_action == "csv":
+                                        data_for_csv = dumped_data # Adapt as before
+                                        if isinstance(data_for_csv, dict) and not isinstance(data_for_csv, list): data_for_csv = [data_for_csv] # etc.
+                                        save_data_to_csv_file(data_for_csv, file_save_path)
+                                    else: raise ValueError(f"Unsupported format: {output_format_for_action}")
+                                    self._log_event(f"Saved '{var_name}' to '{output_filename}'.", False); dump_status_msg = "Success"; dump_success_count += 1
+                                except Exception as save_e:
+                                    self._log_event(f"Error saving dump of '{var_name}': {save_e}", False); dump_details_msg = f"Save Error: {save_e}"
+                            else:
+                                self._log_event(f"Dump of '{var_name}' returned no data.", False); dump_details_msg = "Dump returned no data"
+
+                            self._add_produced_file_entry(bp_spec_for_file, var_name, file_save_path, dump_status_msg,
+                                                          gdb_bp_num=hit_gdb_bp_num, address=current_pc_address, details=dump_details_msg)
+
+                        action_summary["variables_dumped_count"] = dump_success_count
+                        if dump_success_count == len(vars_to_dump_for_action) and vars_to_dump_for_action: action_summary["status"] = "Completed"
+                        elif not vars_to_dump_for_action: action_summary["status"] = "Completed (No Vars)"
+                        else: action_summary["status"] = "Completed with Errors"
+
+                        if not current_action_config.get("continue_after_dump", True):
+                            should_continue_after_all_these_actions = False
+
+                    if self._stop_requested: break
+
+                    if should_continue_after_all_these_actions:
+                        self._log_event(f"Continuing after processing actions at {hit_bp_details_for_log}...", True)
+                        gdb_output = self.gdb_session.continue_execution(timeout=run_timeout)
+                        self.gdb_output_writer(gdb_output)
+                        if "Program exited normally" in gdb_output or "exited with code" in gdb_output:
+                            program_has_exited = True
+                            self._log_event(f"Program exited after continue. Output: {gdb_output[:100]}", True)
+                    else:
+                        self._log_event(f"Execution halted after processing actions at {hit_bp_details_for_log} as per profile.", True)
+                        program_has_exited = True # Treat as if program ended for the profile
+
+                elif "Program exited normally" in gdb_output or "exited with code" in gdb_output:
+                    program_has_exited = True; self._log_event(f"Program exited. Output: {gdb_output[:100]}", True)
+                elif "received signal" in gdb_output.lower():
+                    program_has_exited = True; self._log_event(f"Program received signal. Output: {gdb_output[:100]}", True)
+                    self.profile_execution_summary["status"] = "Completed (Program Signalled/Crashed)"
+                else:
+                    self._log_event(f"GDB unresponsive or no recognized output after previous step. Output: {gdb_output[:200]}", True)
+                    program_has_exited = True # Assume cannot proceed
+
+                if program_has_exited: break # Exit while loop
+
+            # After loop summary status update
+            final_status = "Completed"
+            if program_has_exited and not self._stop_requested:
+                # Check if any actions are still pending (implies program exited before all BPs were hit)
+                if any(s["status"] == "Pending" or s["status"] == "Pending in GDB" for s in self.profile_execution_summary["actions_summary"]):
+                    final_status = "Completed (Program Exited Prematurely)"
+                # Preserve crash status if set
+                if self.profile_execution_summary["status"] not in ["Initialized", "Error: No BPs Mapped"]: # if not already an error
+                    if "Crashed" in self.profile_execution_summary["status"] or "Signalled" in self.profile_execution_summary["status"]:
+                        pass # Keep the more specific crash status
+                    else:
+                        self.profile_execution_summary["status"] = final_status
+            elif self._stop_requested:
+                self.profile_execution_summary["status"] = "Completed (User Stopped)"
+            elif not (self.gdb_session and self.gdb_session.is_alive()) and not program_has_exited:
+                self.profile_execution_summary["status"] = "Error: GDB Died Unexpectedly"
+                self._log_event("Error: GDB session died unexpectedly during execution.", True)
+            else: # Loop finished, all actions processed or halted by continue=false
+                self.profile_execution_summary["status"] = "Completed (All Actions Processed or Halted by Profile)"

+        except InterruptedError as ie: # Custom for user stop
+            self.profile_execution_summary["status"] = "Interrupted (User Stop)"
+            self._log_event(str(ie), True)
-        except FileNotFoundError as fnf_e:
+        except FileNotFoundError as fnf_e: # ... (standard error handling)
             msg = f"Error running profile '{profile_name}': File not found - {fnf_e}"
-            self._log_event(msg, True)
-            self.profile_execution_summary["status"] = f"Error: {fnf_e}"
+            self._log_event(msg, True); self.profile_execution_summary["status"] = f"Error: {fnf_e}"
         except (ConnectionError, TimeoutError) as session_e:
             msg = f"Session error running profile '{profile_name}': {type(session_e).__name__} - {session_e}"
-            self._log_event(msg, True)
-            self.profile_execution_summary["status"] = f"Error: {session_e}"
+            self._log_event(msg, True); self.profile_execution_summary["status"] = f"Error: {session_e}"
         except Exception as e:
             msg = f"Unexpected error running profile '{profile_name}': {type(e).__name__} - {e}"
-            self._log_event(msg, True)
-            logger.critical(msg, exc_info=True) # Log full traceback for unexpected
+            self._log_event(msg, True); logger.critical(msg, exc_info=True)
             self.profile_execution_summary["status"] = f"Critical Error: {e}"
         finally:
             self._cleanup_session()
             self.profile_execution_summary["end_time"] = datetime.now().isoformat()
             self.profile_execution_summary["execution_log"] = self.execution_event_log
             self.profile_execution_summary["files_produced_detailed"] = self.produced_files_log
-            self._finalize_summary_report(self.current_run_output_path if hasattr(self, 'current_run_output_path') else None)
-            self._log_event(f"Profile '{profile_name}' execution finished. Summary report generated.", True)
+            summary_file_path = self._finalize_summary_report(self.current_run_output_path)
+            # Add summary file to produced_files_log AFTER it's written, if successful.
+            # This needs to be handled carefully to avoid adding it if _finalize_summary_report fails.
+            # The _finalize_summary_report could internally call _add_produced_file_entry,
+            # or we add it here based on its return value.
+            # For now, _finalize_summary_report does not call it to prevent recursion on error.
+            self._log_event(f"Profile '{profile_name}' execution cycle finished. Summary report generation attempt at: {summary_file_path if summary_file_path else 'N/A'}.", True)
             self.is_running = False
-    def _finalize_summary_report(self, run_output_path: Optional[str]) -> None:
+    def _finalize_summary_report(self, run_output_path: Optional[str]) -> Optional[str]:
         if not run_output_path:
             logger.warning("No run output path available, cannot save summary report to specific location.")
-            # Could save to a default location or just log it. For now, just log if no path.
             logger.info(f"Execution Summary for '{self.profile.get('profile_name')}':\n{json.dumps(self.profile_execution_summary, indent=2)}")
-            return
+            return None
         sane_profile_name = sanitize_filename_component(self.profile.get("profile_name", "profile_run"))
-        # Use a consistent timestamp for the summary that matches the folder, if possible, or a new one.
-        # For simplicity, use current time for summary filename.
         summary_filename = f"_{sane_profile_name}_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
         summary_filepath = os.path.join(run_output_path, summary_filename)
@@ -433,35 +505,27 @@ class ProfileExecutor:
             with open(summary_filepath, 'w', encoding='utf-8') as f_summary:
                 json.dump(self.profile_execution_summary, f_summary, indent=2, ensure_ascii=False)
             logger.info(f"Execution summary report saved to: {summary_filepath}")
-            self._add_produced_file_entry("Summary", "Execution Report", summary_filepath, "Success")
+            return summary_filepath
         except Exception as e:
             logger.error(f"Failed to save execution summary report to '{summary_filepath}': {e}")
-            self._add_produced_file_entry("Summary", "Execution Report", "", "Failed", str(e))
+            return None

     def request_stop(self) -> None:
         self._log_event("Stop requested for current profile execution...", True)
         self._stop_requested = True
-        if self.gdb_session:
-            # This is a soft stop. GDB might be busy.
-            # A more forceful stop might involve interrupting gdb_session.child if possible.
-            pass

     def _cleanup_session(self) -> None:
         if self.gdb_session and self.gdb_session.is_alive():
             self._log_event("Cleaning up GDB session...", False)
             quit_timeout = self._get_setting("timeouts", "gdb_quit", 10)
             try:
-                # Check if the inferior process might still be running
-                # This can be complex; for now, we assume quit will handle it or timeout
-                # if self.gdb_session.child.isalive(): # Simplified check
-                #     kill_timeout = self._get_setting("timeouts", "kill_program", 20)
-                #     self.gdb_output_writer(self.gdb_session.kill_program(timeout=kill_timeout))
-                pass
-            except Exception as e_kill:
-                logger.warning(f"Exception during potential kill in cleanup: {e_kill}")
-            finally:
                 self.gdb_session.quit(timeout=quit_timeout)
                 self.gdb_output_writer("GDB session quit during cleanup.\n")
+            except Exception as e_quit:
+                logger.error(f"Exception during GDB quit in cleanup: {e_quit}")
+                # Ensure gdb_session is None even if quit fails, to prevent reuse.
+            finally:
                 self.gdb_session = None
-        logger.info("ProfileExecutor GDB session cleaned up.")
+        elif self.gdb_session: # Session object exists but not alive
+            self.gdb_session = None # Clear it
+        logger.info("ProfileExecutor GDB session resources attempted cleanup.")