# File: cpp_python_debug/gui/action_editor_window.py
"""Modal Tk dialog used to create or edit a single debugger "action".

An action, as edited here, is a dictionary holding a breakpoint location, the
list of variables/expressions to dump, the output format, directory and
filename pattern, and two behavior flags (continue after dump, dump on every
hit).
"""
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import logging
import os
import threading
from typing import Dict, Any, Optional, List, Union, Tuple

from .dialogs import FunctionSelectorDialog, SymbolListViewerDialog, SymbolAnalysisProgressDialog
from ..core.gdb_controller import GDBSession
from ..core.config_manager import AppSettings
from ..core.gdb_interactive_inspector import GDBInteractiveInspector

logger = logging.getLogger(__name__)


class ActionEditorWindow(tk.Toplevel):
    """Modal editor for one action dictionary.

    The constructor blocks (via ``wait_window``) until the user confirms or
    cancels. Afterwards ``result`` / ``get_result()`` holds the edited action
    dictionary, or ``None`` if the dialog was cancelled.
    """

    # Values used for any key missing from the supplied action data, and for
    # the whole form when no action data is supplied at all.
    _DEFAULT_ACTION: Dict[str, Any] = {
        "breakpoint_location": "main",
        "variables_to_dump": ["my_variable"],
        "output_format": "json",
        "output_directory": "./debug_dumps",
        "filename_pattern": "{profile_name}_{breakpoint}_{variable}_{timestamp}.{format}",
        "continue_after_dump": True,
        "dump_on_every_hit": True,
    }

    def __init__(self, parent: tk.Widget,
                 action_data: Optional[Dict[str, Any]] = None,
                 is_new: bool = True,
                 target_executable_path: Optional[str] = None,
                 app_settings: Optional[AppSettings] = None,
                 symbol_analysis_data: Optional[Dict[str, Any]] = None,
                 program_parameters_for_scope: Optional[str] = ""):
        """Build the dialog, populate it and block until it is closed.

        Args:
            parent: Widget the dialog is centered on and made transient for.
            action_data: Existing action to edit; ``None``/empty loads defaults.
            is_new: Selects the window title ("Add New Action" vs "Edit Action").
            target_executable_path: Executable used for live GDB queries.
            app_settings: Settings object queried for the GDB path and timeouts.
            symbol_analysis_data: Optional pre-computed symbol analysis used to
                populate the function / source file / global variable browsers.
            program_parameters_for_scope: Program arguments passed to the scope
                inspector; ``None`` is treated as an empty string.
        """
        super().__init__(parent)
        self.parent_window = parent
        self.is_new_action = is_new
        self.result: Optional[Dict[str, Any]] = None
        self.target_executable_path = target_executable_path
        self.app_settings = app_settings
        self.symbol_analysis_data = symbol_analysis_data
        self.program_parameters_for_scope = (
            program_parameters_for_scope if program_parameters_for_scope is not None else ""
        )

        title = "Add New Action" if self.is_new_action else "Edit Action"
        self.title(title)

        # Center the fixed-size dialog over the parent window.
        window_width = 700
        window_height = 680  # Adjusted height for the new checkbox
        parent_x = self.parent_window.winfo_x()
        parent_y = self.parent_window.winfo_y()
        parent_width = self.parent_window.winfo_width()
        parent_height = self.parent_window.winfo_height()
        position_x = parent_x + (parent_width // 2) - (window_width // 2)
        position_y = parent_y + (parent_height // 2) - (window_height // 2)
        self.geometry(f'{window_width}x{window_height}+{position_x}+{position_y}')
        self.resizable(False, False)
        self.transient(parent)
        self.grab_set()

        # Tk variables backing the form fields.
        self.breakpoint_var = tk.StringVar()
        self.output_format_var = tk.StringVar()
        self.output_directory_var = tk.StringVar()
        self.filename_pattern_var = tk.StringVar()
        self.continue_after_dump_var = tk.BooleanVar()
        self.dump_on_every_hit_var = tk.BooleanVar()

        self._initial_action_data = action_data.copy() if action_data else None
        self.gdb_inspector_instance: Optional[GDBInteractiveInspector] = None
        self.scope_inspection_dialog: Optional[SymbolAnalysisProgressDialog] = None
        # Keyed by (target executable, breakpoint location, program arguments).
        self._scope_variables_cache: Dict[Tuple[str, str, str], Dict[str, List[str]]] = {}
        self._last_bp_for_scope_cache: str = ""

        self._create_widgets()
        self._load_action_data(action_data)
        self._update_browse_button_states()

        # Registered after loading so the initial set() does not fire the trace.
        self.breakpoint_var.trace_add("write", self._on_breakpoint_var_change)
        self.protocol("WM_DELETE_WINDOW", self._on_cancel)
        self.wait_window()

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    def _on_breakpoint_var_change(self, *args):
        """Trace callback: refresh button states when the breakpoint text changes."""
        self._update_browse_button_states()

    def _get_analysis_symbols(self, category: str) -> Any:
        """Return ``symbol_analysis_data["symbols"][category]`` or ``[]``.

        Tolerates missing analysis data and a missing or non-dict ``"symbols"``
        entry, so callers can test the result for truthiness directly.
        """
        data = self.symbol_analysis_data
        if not isinstance(data, dict):
            return []
        symbols = data.get("symbols")
        if not isinstance(symbols, dict):
            return []
        return symbols.get(category) or []

    def _set_window_disabled(self, disabled: bool) -> None:
        """Toggle the ``-disabled`` window attribute where the platform has it.

        ``-disabled`` is a Windows-specific ``wm attributes`` option; on other
        window systems Tk raises ``TclError`` for it, which is ignored here so
        the calling feature keeps working.
        """
        try:
            self.attributes('-disabled', disabled)
        except tk.TclError:
            logger.debug("Window attribute '-disabled' is not supported on this platform.")

    def _append_variables_to_dump_list(self, candidates: List[Any], kind_label: str) -> int:
        """Append new variable names to the "Variables to Dump" text box.

        Non-string items and names already present (compared against the
        stripped, non-empty lines of the text box) are skipped. Names are
        inserted one per line without introducing blank lines.

        Args:
            candidates: Items returned by a selection dialog.
            kind_label: Word used in the log message (e.g. "global", "scope").

        Returns:
            The number of names actually added.
        """
        # "end-1c" excludes the newline the Text widget always keeps at the end.
        existing_text = self.variables_text.get("1.0", "end-1c")
        seen = {line.strip() for line in existing_text.splitlines() if line.strip()}

        to_add: List[str] = []
        for item in candidates:
            if isinstance(item, str) and item not in seen:
                seen.add(item)
                to_add.append(item)
        if not to_add:
            return 0

        # Start on a fresh line only when the current content does not already end one.
        separator = "" if not existing_text or existing_text.endswith("\n") else "\n"
        self.variables_text.insert(tk.END, separator + "\n".join(to_add))
        self.variables_text.see(tk.END)
        logger.info("Added %d %s variables to dump list.", len(to_add), kind_label)
        return len(to_add)

    # ------------------------------------------------------------------
    # Widget construction and state
    # ------------------------------------------------------------------
    def _create_widgets(self) -> None:
        """Create and lay out every widget of the dialog."""
        main_frame = ttk.Frame(self, padding="15")
        main_frame.pack(expand=True, fill=tk.BOTH)
        main_frame.columnconfigure(1, weight=1)
        row_idx = 0

        # Breakpoint location entry with "Funcs..." / "Files..." browse buttons.
        ttk.Label(main_frame, text="Breakpoint Location:").grid(
            row=row_idx, column=0, sticky=tk.W, padx=5, pady=5)
        bp_input_frame = ttk.Frame(main_frame)
        bp_input_frame.grid(row=row_idx, column=1, columnspan=2, sticky="ew", padx=5, pady=5)
        bp_input_frame.columnconfigure(0, weight=1)
        bp_input_frame.columnconfigure(1, weight=0)
        bp_input_frame.columnconfigure(2, weight=0)
        self.bp_entry = ttk.Entry(bp_input_frame, textvariable=self.breakpoint_var)
        self.bp_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5))
        self.browse_funcs_button = ttk.Button(
            bp_input_frame, text="Funcs...", command=self._browse_functions, width=8)
        self.browse_funcs_button.grid(row=0, column=1, sticky=tk.E, padx=(0, 2))
        self.browse_files_button = ttk.Button(
            bp_input_frame, text="Files...", command=self._browse_source_files, width=8)
        self.browse_files_button.grid(row=0, column=2, sticky=tk.E, padx=(0, 5))
        row_idx += 1

        bp_help_label = ttk.Label(
            main_frame, text="(e.g., main, file.cpp:123, MyClass::foo)", foreground="gray")
        bp_help_label.grid(row=row_idx, column=1, columnspan=2, sticky=tk.W, padx=7, pady=(0, 10))
        row_idx += 1

        # Variables to dump: helper buttons on the left, text box on the right.
        vars_label_frame = ttk.LabelFrame(main_frame, text="Variables to Dump", padding=(5, 2))
        vars_label_frame.grid(row=row_idx, column=0, sticky="new", padx=5, pady=5, rowspan=2)
        vars_label_frame.columnconfigure(0, weight=1)
        self.browse_globals_button = ttk.Button(
            vars_label_frame, text="Globals...", command=self._browse_global_variables, width=12)
        self.browse_globals_button.pack(side=tk.TOP, anchor=tk.W, pady=(0, 3), padx=2)
        self.browse_scope_vars_button = ttk.Button(
            vars_label_frame, text="Scope Vars...", command=self._browse_scope_variables, width=12)
        self.browse_scope_vars_button.pack(side=tk.TOP, anchor=tk.W, pady=(0, 3), padx=2)
        self.test_action_button = ttk.Button(
            vars_label_frame, text="Test Action...", command=self._test_action_placeholder,
            width=12, state=tk.DISABLED)
        self.test_action_button.pack(side=tk.TOP, anchor=tk.W, pady=(0, 0), padx=2)
        self.variables_text = scrolledtext.ScrolledText(
            main_frame, wrap=tk.WORD, height=5, width=58, font=("Consolas", 9))
        self.variables_text.grid(
            row=row_idx, column=1, columnspan=2, sticky="nsew", padx=5, pady=5, rowspan=2)
        row_idx += 2

        vars_help_label = ttk.Label(
            main_frame, text="(One variable/expression per line)", foreground="gray")
        vars_help_label.grid(row=row_idx, column=1, columnspan=2, sticky=tk.W, padx=7, pady=(0, 10))
        row_idx += 1

        # Output format.
        ttk.Label(main_frame, text="Output Format:").grid(
            row=row_idx, column=0, sticky=tk.W, padx=5, pady=5)
        self.output_format_combo = ttk.Combobox(
            main_frame, textvariable=self.output_format_var, values=["json", "csv"],
            state="readonly", width=10)
        self.output_format_combo.grid(row=row_idx, column=1, sticky="w", padx=5, pady=5)
        row_idx += 1

        # Output directory.
        ttk.Label(main_frame, text="Output Directory:").grid(
            row=row_idx, column=0, sticky=tk.W, padx=5, pady=5)
        self.output_dir_entry = ttk.Entry(main_frame, textvariable=self.output_directory_var)
        self.output_dir_entry.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=5)
        ttk.Button(main_frame, text="Browse...", command=self._browse_output_dir).grid(
            row=row_idx, column=2, padx=5, pady=5)
        row_idx += 1

        # Filename pattern.
        ttk.Label(main_frame, text="Filename Pattern:").grid(
            row=row_idx, column=0, sticky=tk.W, padx=5, pady=5)
        self.filename_pattern_entry = ttk.Entry(main_frame, textvariable=self.filename_pattern_var)
        self.filename_pattern_entry.grid(
            row=row_idx, column=1, columnspan=2, sticky="ew", padx=5, pady=5)
        row_idx += 1

        pattern_help_label = ttk.Label(
            main_frame,
            text="(Placeholders: {profile_name}, {breakpoint}, {variable}, {timestamp}, {format})",
            foreground="gray")
        pattern_help_label.grid(
            row=row_idx, column=1, columnspan=2, sticky=tk.W, padx=7, pady=(0, 10))
        row_idx += 1

        # Action Behavior Options Frame
        behavior_frame = ttk.Frame(main_frame)
        behavior_frame.grid(row=row_idx, column=0, columnspan=3, sticky="ew", pady=(5, 0))
        self.continue_check = ttk.Checkbutton(
            behavior_frame, text="Continue execution after this action completes",
            variable=self.continue_after_dump_var)
        self.continue_check.pack(side=tk.LEFT, anchor=tk.W, padx=5, pady=2)
        self.dump_every_hit_check = ttk.Checkbutton(
            behavior_frame, text="Dump variables every time breakpoint is hit",
            variable=self.dump_on_every_hit_var)
        self.dump_every_hit_check.pack(side=tk.LEFT, anchor=tk.W, padx=(20, 5), pady=2)
        row_idx += 1

        # OK / Cancel. Packed to the right, so "OK" ends up right-most.
        buttons_frame = ttk.Frame(main_frame)
        buttons_frame.grid(row=row_idx, column=0, columnspan=3, pady=(15, 5), sticky="e")
        ttk.Button(buttons_frame, text="OK", command=self._on_ok, width=10).pack(
            side=tk.RIGHT, padx=5)
        ttk.Button(buttons_frame, text="Cancel", command=self._on_cancel, width=10).pack(
            side=tk.RIGHT, padx=5)

    def _update_browse_button_states(self) -> None:
        """Enable or disable the browse buttons based on the available data.

        * "Funcs..." needs analysis data or the ability to run a live GDB query.
        * "Files..." / "Globals..." need the matching list in the analysis data.
        * "Scope Vars..." needs a live GDB query and a non-blank breakpoint.
        * "Test Action..." is always disabled (feature not implemented).
        """
        analysis_available = bool(self.symbol_analysis_data)
        target_is_valid_file = bool(
            self.target_executable_path and os.path.isfile(str(self.target_executable_path)))
        gdb_exe_is_configured = bool(
            self.app_settings and self.app_settings.get_setting("general", "gdb_executable_path"))
        can_do_live_gdb_query = target_is_valid_file and gdb_exe_is_configured

        def state_for(enabled: Any) -> str:
            return tk.NORMAL if enabled else tk.DISABLED

        self.browse_funcs_button.config(
            state=state_for(analysis_available or can_do_live_gdb_query))
        self.browse_files_button.config(
            state=state_for(analysis_available and self._get_analysis_symbols("source_files")))
        self.browse_globals_button.config(
            state=state_for(analysis_available and self._get_analysis_symbols("global_variables")))
        self.browse_scope_vars_button.config(
            state=state_for(can_do_live_gdb_query and self.breakpoint_var.get().strip()))
        self.test_action_button.config(state=tk.DISABLED)

    def _load_action_data(self, action_data: Optional[Dict[str, Any]]) -> None:
        """Populate the form from ``action_data``, using defaults for missing keys.

        A ``None`` or empty ``action_data`` loads the full default action.
        """
        defaults = self._DEFAULT_ACTION
        data_to_load = action_data if action_data else defaults.copy()

        self.breakpoint_var.set(
            data_to_load.get("breakpoint_location", defaults["breakpoint_location"]))

        variables_list = data_to_load.get("variables_to_dump", defaults["variables_to_dump"])
        if isinstance(variables_list, list):
            # str() keeps the join from failing on a non-string entry.
            self.variables_text.insert(tk.END, "\n".join(str(item) for item in variables_list))
        else:
            self.variables_text.insert(tk.END, str(variables_list))

        self.output_format_var.set(data_to_load.get("output_format", defaults["output_format"]))
        self.output_directory_var.set(
            data_to_load.get("output_directory", defaults["output_directory"]))
        self.filename_pattern_var.set(
            data_to_load.get("filename_pattern", defaults["filename_pattern"]))
        self.continue_after_dump_var.set(
            data_to_load.get("continue_after_dump", defaults["continue_after_dump"]))
        self.dump_on_every_hit_var.set(
            data_to_load.get("dump_on_every_hit", defaults["dump_on_every_hit"]))
        self._last_bp_for_scope_cache = self.breakpoint_var.get()

    def _browse_output_dir(self) -> None:
        """Let the user pick the output directory with a directory chooser."""
        current_path = self.output_directory_var.get()
        initial_dir = current_path if current_path and os.path.isdir(current_path) else None
        path = filedialog.askdirectory(
            title="Select Output Directory for Dumps",
            initialdir=initial_dir,
            parent=self
        )
        if path:
            self.output_directory_var.set(path)

    # ------------------------------------------------------------------
    # Validation and dialog result
    # ------------------------------------------------------------------
    def _validate_data(self) -> bool:
        """Check the required fields, showing an error box for the first empty one.

        A filename pattern without any of ``{breakpoint}``, ``{variable}`` or
        ``{timestamp}`` only triggers a warning and still validates.

        Returns:
            ``True`` when the form can be accepted, ``False`` otherwise.
        """
        if not self.breakpoint_var.get().strip():
            messagebox.showerror(
                "Validation Error", "Breakpoint Location cannot be empty.", parent=self)
            return False

        variables_str = self.variables_text.get("1.0", tk.END).strip()
        if not variables_str:
            messagebox.showerror(
                "Validation Error", "Variables to Dump cannot be empty.", parent=self)
            return False

        if not self.output_directory_var.get().strip():
            messagebox.showerror(
                "Validation Error", "Output Directory cannot be empty.", parent=self)
            return False

        if not self.filename_pattern_var.get().strip():
            messagebox.showerror(
                "Validation Error", "Filename Pattern cannot be empty.", parent=self)
            return False

        pattern = self.filename_pattern_var.get()
        if not any(p in pattern for p in ["{breakpoint}", "{variable}", "{timestamp}"]):
            messagebox.showwarning(
                "Validation Warning",
                "Filename Pattern seems to be missing common placeholders like {breakpoint}, "
                "{variable}, or {timestamp}. This might lead to overwritten files.",
                parent=self)
        return True

    def _on_ok(self) -> None:
        """Validate the form, store the action in ``self.result`` and close."""
        if not self._validate_data():
            return

        raw_text = self.variables_text.get("1.0", tk.END).strip()
        variables_list = [line.strip() for line in raw_text.splitlines() if line.strip()]
        if not variables_list:
            messagebox.showerror(
                "Validation Error", "Variables to Dump cannot be empty after processing.",
                parent=self)
            return

        self.result = {
            "breakpoint_location": self.breakpoint_var.get().strip(),
            "variables_to_dump": variables_list,
            "output_format": self.output_format_var.get(),
            "output_directory": self.output_directory_var.get().strip(),
            "filename_pattern": self.filename_pattern_var.get().strip(),
            "continue_after_dump": self.continue_after_dump_var.get(),
            "dump_on_every_hit": self.dump_on_every_hit_var.get(),
        }
        self.destroy()

    def _on_cancel(self) -> None:
        """Discard any edits and close the dialog."""
        self.result = None
        self.destroy()

    def get_result(self) -> Optional[Dict[str, Any]]:
        """Return the accepted action dictionary, or ``None`` if cancelled."""
        return self.result

    # ------------------------------------------------------------------
    # Function / file / global browsers
    # ------------------------------------------------------------------
    def _load_cached_functions(self) -> Tuple[List[Dict[str, Any]], List[str], str]:
        """Read the function list from the symbol analysis, if it matches the target.

        Returns:
            ``(dict_entries, name_entries, source_label)``. At most one of the
            two lists is non-empty; both are empty when the analysis is absent,
            was made for another executable, or has no usable function list.
        """
        dict_entries: List[Dict[str, Any]] = []
        name_entries: List[str] = []
        source_label = "live GDB query"

        if not (self.symbol_analysis_data and isinstance(self.symbol_analysis_data, dict)):
            return dict_entries, name_entries, source_label

        analyzed_exe = self.symbol_analysis_data.get("analyzed_executable_path")
        same_target = bool(analyzed_exe) and (
            os.path.normpath(analyzed_exe) == os.path.normpath(str(self.target_executable_path)))
        if not same_target:
            return dict_entries, name_entries, source_label

        cached = self._get_analysis_symbols("functions")
        if isinstance(cached, list) and cached:
            if all(isinstance(item, dict) for item in cached):
                dict_entries = cached
                source_label = "cached analysis (rich)"
            elif all(isinstance(item, str) for item in cached):
                name_entries = cached
                source_label = "cached analysis (names only)"
        return dict_entries, name_entries, source_label

    def _query_functions_live(self) -> Optional[List[str]]:
        """Start a temporary GDB session and list the target's functions.

        Shows an error box and returns ``None`` when a precondition is not met
        or the query fails; otherwise returns the (possibly empty) name list.
        """
        if not self.target_executable_path or not os.path.isfile(str(self.target_executable_path)):
            messagebox.showerror(
                "Error",
                "Target executable for the profile is not set or invalid. Cannot browse functions.",
                parent=self)
            return None
        if not self.app_settings:
            messagebox.showerror(
                "Error", "Application settings not available. Cannot determine GDB path.",
                parent=self)
            return None
        gdb_exe_path = self.app_settings.get_setting("general", "gdb_executable_path")
        if not gdb_exe_path or not os.path.isfile(gdb_exe_path):
            messagebox.showerror(
                "GDB Error", f"GDB executable not found or not configured: {gdb_exe_path}",
                parent=self)
            return None

        self.config(cursor="watch")
        self.update_idletasks()
        temp_gdb_session: Optional[GDBSession] = None
        try:
            logger.info(
                "Performing live GDB query for functions. Target: %s", self.target_executable_path)
            temp_gdb_session = GDBSession(gdb_exe_path, str(self.target_executable_path), None, {})
            # This interactive query uses half of the configured timeouts.
            startup_timeout = self.app_settings.get_setting("timeouts", "gdb_start", 30) // 2
            command_timeout = self.app_settings.get_setting("timeouts", "gdb_command", 30) // 2
            temp_gdb_session.start(timeout=startup_timeout)
            if not temp_gdb_session.symbols_found:
                messagebox.showwarning(
                    "No Debug Symbols",
                    "GDB reported no debugging symbols in the executable. "
                    "The function list (live query) might be empty or incomplete.",
                    parent=self)
            function_names = temp_gdb_session.list_functions(timeout=command_timeout)
            return function_names or []
        except Exception as e:
            logger.error("Error during live function query: %s", e, exc_info=True)
            messagebox.showerror(
                "GDB Query Error", f"Could not retrieve functions from GDB: {e}", parent=self)
            return None
        finally:
            if temp_gdb_session and temp_gdb_session.is_alive():
                quit_timeout = self.app_settings.get_setting("timeouts", "gdb_quit", 10) // 2
                try:
                    temp_gdb_session.quit(timeout=quit_timeout)
                except Exception as quit_error:
                    # Best effort: a failing quit must not mask the query outcome.
                    logger.debug("Ignoring error while quitting temporary GDB session: %s",
                                 quit_error)
            self.config(cursor="")

    def _browse_functions(self) -> None:
        """Let the user pick a function and use it as the breakpoint location.

        The cached symbol analysis is preferred when it belongs to the current
        target executable; otherwise a live GDB query is performed.
        """
        functions_to_show_dicts, functions_to_show_names, source_of_functions = (
            self._load_cached_functions())

        if not functions_to_show_dicts and not functions_to_show_names:
            # Fallback to live query
            live_names = self._query_functions_live()
            if live_names is None:
                return
            functions_to_show_names = live_names

        selected_function_name: Optional[str] = None
        if functions_to_show_dicts:
            dialog = SymbolListViewerDialog(
                self, functions_to_show_dicts,
                title=f"Select Function ({source_of_functions})",
                return_full_dict=False)  # Get only 'name'
            result_from_dialog = dialog.result
            if isinstance(result_from_dialog, str):
                selected_function_name = result_from_dialog
        elif functions_to_show_names:
            # Reached when the live query was used or the cache held plain names.
            dialog = FunctionSelectorDialog(
                self, functions_to_show_names,
                title=f"Select Function ({source_of_functions})")
            selected_function_name = dialog.result
        else:
            messagebox.showinfo(
                "No Functions",
                f"No functions found (source: {source_of_functions}). "
                "Ensure target is compiled with debug symbols.",
                parent=self)
            return

        if selected_function_name:
            self.breakpoint_var.set(selected_function_name)
            self._last_bp_for_scope_cache = selected_function_name

    def _browse_source_files(self) -> None:
        """Pick a source file from the analysis and start a ``file:`` breakpoint.

        The breakpoint entry is set to ``<normalized path>:`` and focused with
        the cursor at the end so the user can type the line number.
        """
        source_files_list_of_dicts = self._get_analysis_symbols("source_files")
        if not source_files_list_of_dicts:
            messagebox.showinfo(
                "No Source Data", "Symbol analysis data with source files is not available.",
                parent=self)
            return

        dialog = SymbolListViewerDialog(
            self, source_files_list_of_dicts,
            title="Select Source File (from analysis)",
            return_full_dict=False)  # Get only 'path'
        selected_file_path = dialog.result
        if selected_file_path and isinstance(selected_file_path, str):
            normalized_path = os.path.normpath(selected_file_path)
            self.breakpoint_var.set(f"{normalized_path}:")
            self.bp_entry.focus_set()
            self.bp_entry.icursor(tk.END)
            self._last_bp_for_scope_cache = self.breakpoint_var.get()

    def _browse_global_variables(self) -> None:
        """Pick global variables from the analysis and add them to the dump list."""
        global_vars_list_of_dicts = self._get_analysis_symbols("global_variables")
        if not global_vars_list_of_dicts:
            messagebox.showinfo(
                "No Global Variables Data",
                "Symbol analysis data with global variables is not available.",
                parent=self)
            return

        dialog = SymbolListViewerDialog(
            self, global_vars_list_of_dicts,
            title="Select Global Variables to Dump (from analysis)",
            allow_multiple_selection=True,
            return_full_dict=False)  # Get only 'name'
        selected_vars = dialog.result
        if selected_vars and isinstance(selected_vars, list):
            self._append_variables_to_dump_list(selected_vars, "global")

    # ------------------------------------------------------------------
    # Scope variable inspection (live GDB, background thread)
    # ------------------------------------------------------------------
    def _browse_scope_variables(self) -> None:
        """List the variables in scope at the breakpoint and let the user pick some.

        Results are cached per (target executable, breakpoint, program args).
        On a cache miss the inspection runs in a daemon thread while a progress
        dialog is shown; the outcome is handled by
        ``_finalize_scope_variables_fetch``.
        """
        bp_loc = self.breakpoint_var.get().strip()
        if not bp_loc:
            messagebox.showerror(
                "Input Error",
                "Breakpoint Location must be specified to browse scope variables.",
                parent=self)
            return

        # Avoid turning a missing path into the literal string "None".
        target_exe = str(self.target_executable_path) if self.target_executable_path else ""
        if not target_exe or not os.path.isfile(target_exe):
            messagebox.showerror(
                "Configuration Error", "Valid Target Executable path is required.", parent=self)
            return
        if not self.app_settings:
            messagebox.showerror(
                "Configuration Error", "Application settings are not available.", parent=self)
            return
        gdb_exe = self.app_settings.get_setting("general", "gdb_executable_path")
        if not gdb_exe or not os.path.isfile(gdb_exe):
            messagebox.showerror(
                "Configuration Error", "GDB executable not configured or not found.", parent=self)
            return

        current_program_args = self.program_parameters_for_scope
        cache_key = (target_exe, bp_loc, current_program_args)
        if cache_key in self._scope_variables_cache:
            logger.info("Using cached scope variables for: %s", cache_key)
            cached_data = self._scope_variables_cache[cache_key]
            self._finalize_scope_variables_fetch(
                cached_data, "Scope data loaded from cache.", False, True)
            return

        logger.info("No cache hit for scope variables: %s. Fetching live.", cache_key)
        self._last_bp_for_scope_cache = bp_loc
        self.gdb_inspector_instance = GDBInteractiveInspector(gdb_exe, self.app_settings)
        self.scope_inspection_dialog = SymbolAnalysisProgressDialog(self)
        self.scope_inspection_dialog.title("Inspecting Scope")
        self.scope_inspection_dialog.set_status(
            f"Attempting to reach breakpoint '{bp_loc}' and list variables...")
        self.scope_inspection_dialog.log_message("Starting GDB for scope inspection...")
        self._set_window_disabled(True)

        thread = threading.Thread(
            target=self._fetch_scope_variables_thread,
            args=(target_exe, bp_loc, current_program_args, cache_key),
            daemon=True)
        thread.start()

    def _fetch_scope_variables_thread(self, target_exe: str, bp_loc: str, prog_args: str,
                                      cache_key_for_storage: Tuple[str, str, str]) -> None:
        """Worker-thread body: run the scope inspection and hand the result back.

        The outcome is delivered to ``_finalize_scope_variables_fetch`` through
        ``self.after``. Non-empty results are stored in the scope cache.

        NOTE(review): this method queries Tk state (``winfo_exists``, the
        dialog's ``status_label_var``) from the worker thread; confirm this is
        safe with the Tcl/Tk build in use.
        """
        results: Dict[str, List[str]] = {"locals": [], "args": []}
        error_occurred = False
        final_status_msg = "Scope inspection completed."
        fetched_data_successfully = False

        def inspector_status_update(msg: str):
            # Forward inspector progress to the progress dialog on the Tk loop.
            if self.scope_inspection_dialog and self.scope_inspection_dialog.winfo_exists():
                self.after(0, self.scope_inspection_dialog.log_message, msg)
                self.after(0, self.scope_inspection_dialog.set_status, msg)

        if self.gdb_inspector_instance:
            try:
                results = self.gdb_inspector_instance.get_variables_in_scope(
                    target_executable=target_exe,
                    breakpoint_location=bp_loc,
                    program_args=prog_args,
                    status_callback=inspector_status_update
                )
                if results.get("locals") or results.get("args"):
                    fetched_data_successfully = True
                    self._scope_variables_cache[cache_key_for_storage] = results
                    final_status_msg = "Scope inspection successful."
                elif self.scope_inspection_dialog and self.scope_inspection_dialog.winfo_exists() and \
                        "Error:" not in self.scope_inspection_dialog.status_label_var.get():
                    final_status_msg = (
                        "No local variables or arguments found at breakpoint, or it was not hit.")
            except Exception as e:
                logger.error("Exception in _fetch_scope_variables_thread: %s", e, exc_info=True)
                final_status_msg = f"Error during scope inspection: {e}"
                error_occurred = True

        self.after(0, self._finalize_scope_variables_fetch, results, final_status_msg,
                   error_occurred, fetched_data_successfully)

    def _finalize_scope_variables_fetch(self, scope_vars_data: Dict[str, List[str]],
                                        status_msg: str, error_occurred: bool,
                                        data_was_fetched: bool):
        """Finish a scope inspection: update dialogs and offer the variables.

        Args:
            scope_vars_data: Mapping with ``"args"`` and ``"locals"`` name lists.
            status_msg: Final status text for the progress dialog / message box.
            error_occurred: ``True`` when the inspection raised an exception.
            data_was_fetched: ``True`` when variables were obtained (live or cached).
        """
        if self.scope_inspection_dialog and self.scope_inspection_dialog.winfo_exists():
            self.scope_inspection_dialog.analysis_complete_or_failed(
                not error_occurred and data_was_fetched)
            self.scope_inspection_dialog.set_status(status_msg)

        if self.winfo_exists():
            self._set_window_disabled(False)
            self.focus_set()

        if error_occurred:
            messagebox.showerror("Scope Inspection Error", status_msg, parent=self)
            return

        all_scope_vars = (list(scope_vars_data.get("args") or [])
                          + list(scope_vars_data.get("locals") or []))

        if not all_scope_vars and data_was_fetched:
            messagebox.showinfo(
                "Scope Variables",
                "No local variables or arguments found at the specified breakpoint, "
                "or it was not hit.",
                parent=self)
            return
        if not data_was_fetched:
            if not status_msg.startswith("Scope data loaded from cache."):
                # Avoid redundant message for cache
                messagebox.showinfo("Scope Variables", status_msg, parent=self)
            return

        if all_scope_vars:
            dialog = SymbolListViewerDialog(
                self, sorted(set(all_scope_vars)),  # Use unique sorted list
                title="Select Variables from Scope",
                allow_multiple_selection=True,
                return_full_dict=False)  # Get only names
            selected_vars = dialog.result
            if selected_vars and isinstance(selected_vars, list):
                self._append_variables_to_dump_list(selected_vars, "scope")

    def _test_action_placeholder(self) -> None:
        """Placeholder for the not-yet-implemented "Test Action" feature."""
        messagebox.showinfo(
            "Not Implemented", "The 'Test Action' feature is not yet implemented.", parent=self)