SXXXXXXX_CppPythonDebug/cpp_python_debug/gui/action_editor_window.py
2025-06-09 10:52:43 +02:00

864 lines
34 KiB
Python

# File: cpp_python_debug/gui/action_editor_window.py
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import logging
import os
import threading
from typing import Dict, Any, Optional, List, Union, Tuple, Callable, TYPE_CHECKING
# Import assoluti
from cpp_python_debug.gui.dialogs import (
FunctionSelectorDialog,
SymbolListViewerDialog,
SymbolAnalysisProgressDialog,
)
from cpp_python_debug.core.gdb_controller import (
GDBSession,
) # Usato in _browse_functions per query live
# from cpp_python_debug.core.config_manager import AppSettings # Passato come istanza, non importato direttamente se non per TYPE_CHECKING
from cpp_python_debug.core.gdb_interactive_inspector import GDBInteractiveInspector
# from cpp_python_debug.core.file_utils import sanitize_filename_component # Non usato qui
if TYPE_CHECKING:
from cpp_python_debug.core.config_manager import AppSettings
# from .main_window import GDBGui # Parent è tk.Widget, non GDBGui qui
logger = logging.getLogger(__name__)
# Default per una nuova azione, per coerenza se action_data è None
DEFAULT_ACTION_CONFIG_EDITOR = {
"breakpoint_location": "main",
"variables_to_dump": ["my_variable"],
"output_format": "json",
"output_directory": "./debug_dumps",
"filename_pattern": "{profile_name}_{app_name}_{breakpoint}_{variable}_{timestamp}.{format}",
"continue_after_dump": True,
"dump_on_every_hit": True,
}
class ActionEditorWindow(tk.Toplevel):
def __init__(
self,
parent: tk.Widget, # Parent generico tk.Widget
action_data: Optional[Dict[str, Any]] = None,
is_new: bool = True,
target_executable_path: Optional[str] = None,
app_settings: Optional[
"AppSettings"
] = None, # Usa stringa per forward reference
symbol_analysis_data: Optional[Dict[str, Any]] = None,
program_parameters_for_scope: Optional[str] = "",
):
super().__init__(parent)
self.parent_window = parent
self.is_new_action = is_new
self.result: Optional[Dict[str, Any]] = None
self.target_executable_path: Optional[str] = target_executable_path
self.app_settings: Optional["AppSettings"] = app_settings
self.symbol_analysis_data: Optional[Dict[str, Any]] = symbol_analysis_data
self.program_parameters_for_scope: str = (
program_parameters_for_scope
if program_parameters_for_scope is not None
else ""
)
title = "Add New Action" if self.is_new_action else "Edit Action"
self.title(title)
window_width = 700
window_height = 680
self.parent_window.update_idletasks() # Assicura che la geometria del parent sia nota
parent_x = self.parent_window.winfo_x()
parent_y = self.parent_window.winfo_y()
parent_width = self.parent_window.winfo_width()
parent_height = self.parent_window.winfo_height()
position_x = max(0, parent_x + (parent_width // 2) - (window_width // 2))
position_y = max(0, parent_y + (parent_height // 2) - (window_height // 2))
self.geometry(f"{window_width}x{window_height}+{position_x}+{position_y}")
self.resizable(False, False)
self.transient(parent)
self.grab_set()
self.breakpoint_var = tk.StringVar()
self.output_format_var = tk.StringVar()
self.output_directory_var = tk.StringVar()
self.filename_pattern_var = tk.StringVar()
self.continue_after_dump_var = tk.BooleanVar()
self.dump_on_every_hit_var = tk.BooleanVar()
self._initial_action_data = (
action_data.copy() if action_data else None
) # Salva una copia se necessario
self.gdb_inspector_instance: Optional[GDBInteractiveInspector] = None
self.scope_inspection_dialog: Optional[SymbolAnalysisProgressDialog] = None
self._scope_variables_cache: Dict[
Tuple[str, str, str], Dict[str, List[str]]
] = {}
self._last_bp_for_scope_cache: str = ""
self._create_widgets()
self._load_action_data(action_data)
self._update_browse_button_states()
self.breakpoint_var.trace_add("write", self._on_breakpoint_var_change)
self.protocol("WM_DELETE_WINDOW", self._on_cancel)
# wait_window sarà chiamato dal ProfileManagerWindow
def _on_breakpoint_var_change(self, *args):
self._update_browse_button_states()
def _create_widgets(self) -> None:
main_frame = ttk.Frame(self, padding="15")
main_frame.pack(expand=True, fill=tk.BOTH)
main_frame.columnconfigure(1, weight=1) # Colonna degli entry si espande
row_idx = 0
# Breakpoint Location
ttk.Label(main_frame, text="Breakpoint Location:").grid(
row=row_idx, column=0, sticky=tk.W, padx=5, pady=5
)
bp_input_frame = ttk.Frame(main_frame)
bp_input_frame.grid(
row=row_idx, column=1, columnspan=2, sticky="ew", padx=5, pady=5
)
bp_input_frame.columnconfigure(0, weight=1) # Entry si espande
self.bp_entry = ttk.Entry(bp_input_frame, textvariable=self.breakpoint_var)
self.bp_entry.grid(row=0, column=0, sticky="ew", padx=(0, 5))
self.browse_funcs_button = ttk.Button(
bp_input_frame, text="Funcs...", command=self._browse_functions, width=8
)
self.browse_funcs_button.grid(row=0, column=1, sticky=tk.E, padx=(0, 2))
self.browse_files_button = ttk.Button(
bp_input_frame, text="Files...", command=self._browse_source_files, width=8
)
self.browse_files_button.grid(row=0, column=2, sticky=tk.E, padx=(0, 5))
row_idx += 1
ttk.Label(
main_frame,
text="(e.g., main, file.cpp:123, MyClass::foo)",
foreground="gray",
).grid(row=row_idx, column=1, columnspan=2, sticky=tk.W, padx=7, pady=(0, 10))
row_idx += 1
# Variables to Dump
vars_label_frame = ttk.LabelFrame(
main_frame, text="Variables to Dump", padding=(5, 2)
)
vars_label_frame.grid(
row=row_idx, column=0, sticky="new", padx=5, pady=5, rowspan=2
) # rowspan per allineare con text box
vars_label_frame.columnconfigure(0, weight=1)
self.browse_globals_button = ttk.Button(
vars_label_frame,
text="Globals...",
command=self._browse_global_variables,
width=12,
)
self.browse_globals_button.pack(side=tk.TOP, anchor=tk.W, pady=(0, 3), padx=2)
self.browse_scope_vars_button = ttk.Button(
vars_label_frame,
text="Scope Vars...",
command=self._browse_scope_variables,
width=12,
)
self.browse_scope_vars_button.pack(
side=tk.TOP, anchor=tk.W, pady=(0, 3), padx=2
)
self.test_action_button = ttk.Button(
vars_label_frame,
text="Test Action...",
command=self._test_action_placeholder,
width=12,
state=tk.DISABLED,
) # Funzionalità non implementata
self.test_action_button.pack(side=tk.TOP, anchor=tk.W, pady=(0, 0), padx=2)
self.variables_text = scrolledtext.ScrolledText(
main_frame, wrap=tk.WORD, height=5, width=58, font=("Consolas", 9)
)
self.variables_text.grid(
row=row_idx,
column=1,
columnspan=2,
sticky="nsew",
padx=5,
pady=5,
rowspan=2,
)
row_idx += 2
ttk.Label(
main_frame, text="(One variable/expression per line)", foreground="gray"
).grid(row=row_idx, column=1, columnspan=2, sticky=tk.W, padx=7, pady=(0, 10))
row_idx += 1
# Output Format
ttk.Label(main_frame, text="Output Format:").grid(
row=row_idx, column=0, sticky=tk.W, padx=5, pady=5
)
self.output_format_combo = ttk.Combobox(
main_frame,
textvariable=self.output_format_var,
values=["json", "csv"],
state="readonly",
width=10,
)
self.output_format_combo.grid(row=row_idx, column=1, sticky="w", padx=5, pady=5)
row_idx += 1
# Output Directory
ttk.Label(main_frame, text="Output Directory:").grid(
row=row_idx, column=0, sticky=tk.W, padx=5, pady=5
)
self.output_dir_entry = ttk.Entry(
main_frame, textvariable=self.output_directory_var
)
self.output_dir_entry.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=5)
ttk.Button(main_frame, text="Browse...", command=self._browse_output_dir).grid(
row=row_idx, column=2, padx=5, pady=5
)
row_idx += 1
# Filename Pattern
ttk.Label(main_frame, text="Filename Pattern:").grid(
row=row_idx, column=0, sticky=tk.W, padx=5, pady=5
)
self.filename_pattern_entry = ttk.Entry(
main_frame, textvariable=self.filename_pattern_var
)
self.filename_pattern_entry.grid(
row=row_idx, column=1, columnspan=2, sticky="ew", padx=5, pady=5
)
row_idx += 1
ttk.Label(
main_frame,
text="(Placeholders: {profile_name}, {app_name}, {breakpoint}, {variable}, {timestamp}, {format})",
foreground="gray",
).grid(row=row_idx, column=1, columnspan=2, sticky=tk.W, padx=7, pady=(0, 10))
row_idx += 1
# Action Behavior Options
behavior_frame = ttk.Frame(main_frame)
behavior_frame.grid(
row=row_idx, column=0, columnspan=3, sticky="ew", pady=(5, 0)
)
self.continue_check = ttk.Checkbutton(
behavior_frame,
text="Continue execution after this action completes",
variable=self.continue_after_dump_var,
)
self.continue_check.pack(side=tk.LEFT, anchor=tk.W, padx=5, pady=2)
self.dump_every_hit_check = ttk.Checkbutton(
behavior_frame,
text="Dump variables every time breakpoint is hit",
variable=self.dump_on_every_hit_var,
)
self.dump_every_hit_check.pack(side=tk.LEFT, anchor=tk.W, padx=(20, 5), pady=2)
row_idx += 1
# OK/Cancel Buttons
buttons_frame = ttk.Frame(main_frame)
buttons_frame.grid(
row=row_idx, column=0, columnspan=3, pady=(15, 5), sticky="e"
)
ttk.Button(buttons_frame, text="OK", command=self._on_ok, width=10).pack(
side=tk.RIGHT, padx=5
)
ttk.Button(
buttons_frame, text="Cancel", command=self._on_cancel, width=10
).pack(side=tk.RIGHT, padx=5)
def _update_browse_button_states(self) -> None:
analysis_available = bool(self.symbol_analysis_data)
target_exe_str = (
self.target_executable_path
if isinstance(self.target_executable_path, str)
else ""
)
target_is_valid_file = bool(target_exe_str and os.path.isfile(target_exe_str))
gdb_exe_is_configured = False
if self.app_settings:
gdb_exe_path_setting = self.app_settings.get_setting(
"general", "gdb_executable_path"
)
gdb_exe_is_configured = bool(
gdb_exe_path_setting and os.path.isfile(gdb_exe_path_setting)
)
can_do_live_gdb_query = target_is_valid_file and gdb_exe_is_configured
self.browse_funcs_button.config(
state=(
tk.NORMAL
if analysis_available or can_do_live_gdb_query
else tk.DISABLED
)
)
can_browse_files = (
analysis_available
and isinstance(self.symbol_analysis_data, dict)
and isinstance(self.symbol_analysis_data.get("symbols"), dict)
and bool(self.symbol_analysis_data["symbols"].get("source_files"))
)
self.browse_files_button.config(
state=tk.NORMAL if can_browse_files else tk.DISABLED
)
can_browse_globals = (
analysis_available
and isinstance(self.symbol_analysis_data, dict)
and isinstance(self.symbol_analysis_data.get("symbols"), dict)
and bool(self.symbol_analysis_data["symbols"].get("global_variables"))
)
self.browse_globals_button.config(
state=tk.NORMAL if can_browse_globals else tk.DISABLED
)
self.browse_scope_vars_button.config(
state=(
tk.NORMAL
if can_do_live_gdb_query and self.breakpoint_var.get().strip()
else tk.DISABLED
)
)
def _load_action_data(self, action_data: Optional[Dict[str, Any]]) -> None:
# Usa la costante definita a livello di modulo
defaults = DEFAULT_ACTION_CONFIG_EDITOR.copy()
if (
self.app_settings
): # Sovrascrivi il default per output_directory se AppSettings è disponibile
default_output_dir_from_settings = self.app_settings.get_setting(
"general",
"default_dump_output_dir",
# Non fornire un altro fallback qui, se non c'è in AppSettings, il default di DEFAULT_ACTION_CONFIG_EDITOR
# per output_directory verrà usato quando si accede a data_to_load.get(...)
)
if default_output_dir_from_settings: # Se AppSettings fornisce un valore
defaults["output_directory"] = default_output_dir_from_settings
data_to_load = action_data if action_data else defaults
self.breakpoint_var.set(
data_to_load.get("breakpoint_location", defaults["breakpoint_location"])
)
variables_list = data_to_load.get(
"variables_to_dump", defaults["variables_to_dump"]
)
if isinstance(variables_list, list):
self.variables_text.insert(tk.END, "\n".join(variables_list))
else:
self.variables_text.insert(tk.END, str(variables_list)) # Fallback
self.output_format_var.set(
data_to_load.get("output_format", defaults["output_format"])
)
self.output_directory_var.set(
data_to_load.get("output_directory", defaults["output_directory"])
)
self.filename_pattern_var.set(
data_to_load.get("filename_pattern", defaults["filename_pattern"])
)
self.continue_after_dump_var.set(
data_to_load.get("continue_after_dump", defaults["continue_after_dump"])
)
self.dump_on_every_hit_var.set(
data_to_load.get("dump_on_every_hit", defaults["dump_on_every_hit"])
)
self._last_bp_for_scope_cache = self.breakpoint_var.get()
def _browse_output_dir(self) -> None:
current_path = self.output_directory_var.get()
initial_dir = (
current_path
if current_path and os.path.isdir(current_path)
else os.getcwd()
) # Fallback a CWD
path = filedialog.askdirectory(
title="Select Output Directory for Dumps",
initialdir=initial_dir,
parent=self,
)
if path:
self.output_directory_var.set(path)
def _validate_data(self) -> bool:
if not self.breakpoint_var.get().strip():
messagebox.showerror(
"Validation Error", "Breakpoint Location cannot be empty.", parent=self
)
return False
variables_str = self.variables_text.get("1.0", tk.END).strip()
if not variables_str:
messagebox.showerror(
"Validation Error", "Variables to Dump cannot be empty.", parent=self
)
return False
if not self.output_directory_var.get().strip():
messagebox.showerror(
"Validation Error", "Output Directory cannot be empty.", parent=self
)
return False
if not self.filename_pattern_var.get().strip():
messagebox.showerror(
"Validation Error", "Filename Pattern cannot be empty.", parent=self
)
return False
pattern_str = self.filename_pattern_var.get()
if not any(
ph in pattern_str
for ph in ["{breakpoint}", "{variable}", "{timestamp}", "{app_name}"]
):
messagebox.showwarning(
"Pattern Warning",
"Filename Pattern misses common placeholders (e.g., {breakpoint}, {variable}, {timestamp}, {app_name}). This may cause overwritten files.",
parent=self,
)
if "{format}" not in pattern_str:
messagebox.showwarning(
"Pattern Warning",
"Filename Pattern is missing {format}. File extension might be incorrect.",
parent=self,
)
return True
def _on_ok(self) -> None:
if not self._validate_data():
return
variables_list = [
line.strip()
for line in self.variables_text.get("1.0", tk.END).strip().splitlines()
if line.strip()
]
if not variables_list:
messagebox.showerror(
"Validation Error",
"Variables to Dump list is empty after processing.",
parent=self,
)
return
self.result = {
"breakpoint_location": self.breakpoint_var.get().strip(),
"variables_to_dump": variables_list,
"output_format": self.output_format_var.get(),
"output_directory": self.output_directory_var.get().strip(),
"filename_pattern": self.filename_pattern_var.get().strip(),
"continue_after_dump": self.continue_after_dump_var.get(),
"dump_on_every_hit": self.dump_on_every_hit_var.get(),
}
self.destroy()
def _on_cancel(self) -> None:
self.result = None
self.destroy()
def get_result(self) -> Optional[Dict[str, Any]]:
return self.result
def _browse_functions(self) -> None:
target_exe_str = (
self.target_executable_path
if isinstance(self.target_executable_path, str)
else ""
)
functions_to_show_dicts: List[Dict[str, Any]] = []
functions_to_show_names: List[str] = []
source_of_functions = "live GDB query"
if self.symbol_analysis_data and isinstance(self.symbol_analysis_data, dict):
analyzed_exe = self.symbol_analysis_data.get("analyzed_executable_path")
if (
analyzed_exe
and target_exe_str
and os.path.normpath(analyzed_exe) == os.path.normpath(target_exe_str)
):
cached_functions_data = self.symbol_analysis_data.get(
"symbols", {}
).get("functions", [])
if isinstance(cached_functions_data, list) and cached_functions_data:
if all(isinstance(item, dict) for item in cached_functions_data):
functions_to_show_dicts = cached_functions_data
source_of_functions = "cached analysis (rich)"
elif all(isinstance(item, str) for item in cached_functions_data):
functions_to_show_names = cached_functions_data
source_of_functions = "cached analysis (names only)"
if not functions_to_show_dicts and not functions_to_show_names:
if not target_exe_str or not os.path.isfile(target_exe_str):
messagebox.showerror(
"Error", "Target executable not set or invalid.", parent=self
)
return
if not self.app_settings:
messagebox.showerror(
"Error", "Application settings unavailable.", parent=self
)
return
gdb_exe_path = self.app_settings.get_setting(
"general", "gdb_executable_path"
)
if not gdb_exe_path or not os.path.isfile(gdb_exe_path):
messagebox.showerror(
"GDB Error",
f"GDB not configured or found: {gdb_exe_path}",
parent=self,
)
return
self.config(cursor="watch")
self.update_idletasks()
temp_gdb_session: Optional[GDBSession] = None
live_query_error = False
try:
logger.info(f"Live GDB query for functions. Target: {target_exe_str}")
temp_gdb_session = GDBSession(gdb_exe_path, target_exe_str, None, {})
default_timeouts = self.app_settings._get_default_settings().get(
"timeouts", {}
)
startup_timeout = self.app_settings.get_setting(
"timeouts", "gdb_start", default_timeouts.get("gdb_start", 15)
) # Shorter for non-interactive
command_timeout = self.app_settings.get_setting(
"timeouts", "gdb_command", default_timeouts.get("gdb_command", 30)
)
temp_gdb_session.start(timeout=startup_timeout)
if not temp_gdb_session.symbols_found:
messagebox.showwarning(
"No Symbols",
"GDB reported no debug symbols. Function list may be incomplete.",
parent=self,
)
functions_to_show_names = temp_gdb_session.list_functions(
timeout=command_timeout
)
except Exception as e:
logger.error(f"Error live function query: {e}", exc_info=True)
messagebox.showerror(
"GDB Query Error", f"Could not retrieve functions: {e}", parent=self
)
live_query_error = True
finally:
if temp_gdb_session and temp_gdb_session.is_alive():
quit_timeout = self.app_settings.get_setting(
"timeouts", "gdb_quit", default_timeouts.get("gdb_quit", 5)
)
try:
temp_gdb_session.quit(timeout=quit_timeout)
except Exception:
pass
self.config(cursor="")
if live_query_error:
return
selected_function_name: Optional[str] = None
dialog_instance: Optional[
Union[SymbolListViewerDialog, FunctionSelectorDialog]
] = None
if functions_to_show_dicts:
dialog_instance = SymbolListViewerDialog(
self,
functions_to_show_dicts,
title=f"Select Function ({source_of_functions})",
return_full_dict=False,
)
elif functions_to_show_names:
dialog_instance = FunctionSelectorDialog(
self,
functions_to_show_names,
title=f"Select Function ({source_of_functions})",
)
else:
messagebox.showinfo(
"No Functions",
f"No functions found (source: {source_of_functions}).",
parent=self,
)
return
if dialog_instance:
self.wait_window(dialog_instance)
result_from_dialog = dialog_instance.result
if isinstance(result_from_dialog, str):
selected_function_name = result_from_dialog
if selected_function_name:
self.breakpoint_var.set(selected_function_name)
self._last_bp_for_scope_cache = selected_function_name
logger.info(f"BP location set to func: {selected_function_name}")
def _browse_source_files(self) -> None:
if (
not self.symbol_analysis_data
or not isinstance(self.symbol_analysis_data.get("symbols"), dict)
or not self.symbol_analysis_data["symbols"].get("source_files")
):
messagebox.showinfo(
"No Source Data",
"Symbol analysis data with source files is not available.",
parent=self,
)
return
source_files_list = self.symbol_analysis_data["symbols"]["source_files"]
if not source_files_list:
messagebox.showinfo(
"No Source Files", "No source files found in analysis.", parent=self
)
return
dialog = SymbolListViewerDialog(
self,
source_files_list,
title="Select Source File (from analysis)",
return_full_dict=False,
)
self.wait_window(dialog)
selected_file_path = dialog.result
if selected_file_path and isinstance(selected_file_path, str):
normalized_path = os.path.normpath(selected_file_path)
self.breakpoint_var.set(f"{normalized_path}:")
self.bp_entry.focus_set()
self.bp_entry.icursor(tk.END)
self._last_bp_for_scope_cache = self.breakpoint_var.get()
logger.info(f"BP location prefix set to file: {normalized_path}:")
def _browse_global_variables(self) -> None:
if (
not self.symbol_analysis_data
or not isinstance(self.symbol_analysis_data.get("symbols"), dict)
or not self.symbol_analysis_data["symbols"].get("global_variables")
):
messagebox.showinfo(
"No Global Variables Data",
"Symbol analysis data with global variables is not available.",
parent=self,
)
return
global_vars_list = self.symbol_analysis_data["symbols"]["global_variables"]
if not global_vars_list:
messagebox.showinfo(
"No Global Variables",
"No global variables found in analysis.",
parent=self,
)
return
dialog = SymbolListViewerDialog(
self,
global_vars_list,
title="Select Global Variables to Dump (from analysis)",
allow_multiple_selection=True,
return_full_dict=False,
)
self.wait_window(dialog)
selected_vars = dialog.result
if selected_vars and isinstance(selected_vars, list):
current_text = self.variables_text.get("1.0", tk.END).strip()
current_list = [
ln.strip() for ln in current_text.splitlines() if ln.strip()
]
count = 0
for var_item in selected_vars:
if isinstance(var_item, str) and var_item not in current_list:
prefix = (
"\n" if current_text and not current_text.endswith("\n") else ""
)
self.variables_text.insert(tk.END, f"{prefix}{var_item}\n")
current_text = self.variables_text.get("1.0", tk.END).strip()
current_list.append(var_item)
count += 1
if count > 0:
self.variables_text.see(tk.END)
logger.info(f"Added {count} global variables.")
def _browse_scope_variables(self) -> None:
bp_loc = self.breakpoint_var.get().strip()
if not bp_loc:
messagebox.showerror(
"Input Error", "Breakpoint Location must be specified.", parent=self
)
return
target_exe = (
self.target_executable_path
if isinstance(self.target_executable_path, str)
else ""
)
if not target_exe or not os.path.isfile(target_exe):
messagebox.showerror(
"Config Error", "Valid Target Executable required.", parent=self
)
return
if not self.app_settings:
messagebox.showerror(
"Config Error", "App settings unavailable.", parent=self
)
return
gdb_exe = self.app_settings.get_setting("general", "gdb_executable_path")
if not gdb_exe or not os.path.isfile(gdb_exe):
messagebox.showerror(
"Config Error", "GDB not configured/found.", parent=self
)
return
prog_args = self.program_parameters_for_scope
cache_key = (target_exe, bp_loc, prog_args)
if cache_key in self._scope_variables_cache:
logger.info(f"Using cached scope vars for: {cache_key}")
self._finalize_scope_variables_fetch(
self._scope_variables_cache[cache_key],
"Scope data from cache.",
False,
True,
)
return
logger.info(f"No cache for scope vars: {cache_key}. Fetching live.")
self._last_bp_for_scope_cache = bp_loc
self.gdb_inspector_instance = GDBInteractiveInspector(
gdb_exe, self.app_settings
)
self.scope_inspection_dialog = SymbolAnalysisProgressDialog(self)
self.scope_inspection_dialog.title("Inspecting Scope")
self.scope_inspection_dialog.set_status(f"Reaching BP '{bp_loc}'...")
self.scope_inspection_dialog.log_message(
"Starting GDB MI for scope inspection..."
)
self.attributes("-disabled", True)
threading.Thread(
target=self._fetch_scope_variables_thread,
args=(target_exe, bp_loc, prog_args, cache_key),
daemon=True,
).start()
def _fetch_scope_variables_thread(
self,
target_exe: str,
bp_loc: str,
prog_args: str,
cache_key: Tuple[str, str, str],
):
results: Dict[str, List[str]] = {"locals": [], "args": []}
error = False
final_msg = "Scope inspection complete."
fetched = False
def inspector_status_cb(msg: str):
if (
self.scope_inspection_dialog
and self.scope_inspection_dialog.winfo_exists()
):
self.after(0, self.scope_inspection_dialog.log_message, msg)
self.after(0, self.scope_inspection_dialog.set_status, msg)
if self.gdb_inspector_instance:
try:
results = self.gdb_inspector_instance.get_variables_in_scope(
target_exe, bp_loc, prog_args, inspector_status_cb
)
if results.get("locals") or results.get("args"):
fetched = True
final_msg = "Scope inspection successful."
elif (
self.scope_inspection_dialog
and self.scope_inspection_dialog.winfo_exists()
and not any(
ekw
in self.scope_inspection_dialog.status_label_var.get().lower()
for ekw in ["error", "fail", "timeout"]
)
):
final_msg = "No locals/args at BP, or not hit."
except Exception as e:
logger.error(
f"Exception in _fetch_scope_vars_thread: {e}", exc_info=True
)
final_msg = f"Error during scope inspection: {e}"
error = True
if self.winfo_exists():
self.after(
0,
self._finalize_scope_variables_fetch,
results,
final_msg,
error,
fetched,
)
def _finalize_scope_variables_fetch(
self,
scope_vars_data: Dict[str, List[str]],
status_msg: str,
error_occurred: bool,
data_was_fetched: bool,
):
if self.scope_inspection_dialog and self.scope_inspection_dialog.winfo_exists():
self.scope_inspection_dialog.analysis_complete_or_failed(
not error_occurred and data_was_fetched
)
self.scope_inspection_dialog.set_status(status_msg)
if self.winfo_exists():
self.attributes("-disabled", False)
self.focus_set()
if error_occurred:
messagebox.showerror("Scope Inspection Error", status_msg, parent=self)
return
all_scope_vars = scope_vars_data.get("args", []) + scope_vars_data.get(
"locals", []
)
if not all_scope_vars and data_was_fetched:
messagebox.showinfo(
"Scope Variables",
"No locals or args found at BP, or BP not hit.",
parent=self,
)
return
elif not data_was_fetched and not error_occurred:
if not status_msg.startswith("Scope data from cache."):
messagebox.showinfo("Scope Variables", status_msg, parent=self)
return
if all_scope_vars:
scope_var_names = sorted(list(set(all_scope_vars)))
dialog = SymbolListViewerDialog(
self,
scope_var_names,
title="Select Variables from Scope",
allow_multiple_selection=True,
return_full_dict=False,
)
self.wait_window(dialog)
selected_vars = dialog.result
if selected_vars and isinstance(selected_vars, list):
current_text = self.variables_text.get("1.0", tk.END).strip()
current_list = [
ln.strip() for ln in current_text.splitlines() if ln.strip()
]
count = 0
for var_name in selected_vars:
if isinstance(var_name, str) and var_name not in current_list:
prefix = (
"\n"
if current_text and not current_text.endswith("\n")
else ""
)
self.variables_text.insert(tk.END, f"{prefix}{var_name}\n")
current_text = self.variables_text.get("1.0", tk.END).strip()
current_list.append(var_name)
count += 1
if count > 0:
self.variables_text.see(tk.END)
logger.info(f"Added {count} scope variables.")
def _test_action_placeholder(self) -> None:
messagebox.showinfo(
"Not Implemented",
"The 'Test Action' feature is not yet implemented.",
parent=self,
)