SXXXXXXX_CodeBridge/codebridge/gui/main_window.py
2025-12-23 15:11:00 +01:00

573 lines
24 KiB
Python

import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import tkinter.font as tkfont
from tkinter.scrolledtext import ScrolledText
import os
import logging
import threading
# Externals (made importable by __main__.py adding their folders to sys.path).
# Both are optional: when an import fails for any reason the corresponding
# name is set to None and the GUI degrades gracefully (no log handler /
# no resource monitor).
try:
    from tkinter_logger import TkinterLogger
except Exception:
    TkinterLogger = None  # type: ignore

try:
    from resource_monitor import TkinterResourceMonitor, is_psutil_available
except Exception:
    TkinterResourceMonitor = None  # type: ignore

    def is_psutil_available() -> bool:
        """Fallback used when ``resource_monitor`` cannot be imported."""
        return False
from codebridge.core.comparer import CodeComparer
from codebridge.core.sync_manager import SyncManager
from codebridge.gui.diff_viewer import DiffViewer
from codebridge.gui.commit_dialog import CommitDialog
from codebridge.gui.profile_dialog import ProfileManagerDialog, ProfileManagerFrame
import json
# --- Import Version Info FOR THE WRAPPER ITSELF ---
try:
    # Use absolute import based on package name
    from codebridge import _version as wrapper_version

    WRAPPER_APP_VERSION_STRING = (
        f"{wrapper_version.__version__} "
        f"({wrapper_version.GIT_BRANCH}/{wrapper_version.GIT_COMMIT_HASH[:7]})"
    )
    WRAPPER_BUILD_INFO = f"Wrapper Built: {wrapper_version.BUILD_TIMESTAMP}"
except (ImportError, AttributeError, TypeError):
    # ImportError: running the wrapper directly from source without a
    # generated _version.py.
    # AttributeError / TypeError: _version.py exists but is missing a field or
    # carries a non-sliceable commit hash; fall back rather than break the
    # import of the whole GUI module.
    WRAPPER_APP_VERSION_STRING = "(Dev Wrapper)"
    WRAPPER_BUILD_INFO = "Wrapper build time unknown"
# --- End Import Version Info ---

# --- Constants for Version Generation ---
DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown"
# --- End Constants ---
class MainWindow:
    """
    Main GUI class for CodeBridge application.

    Builds the main window (profile/path selection, action buttons, status
    filters, result tree, log box and status bar) on the given Tk root and
    wires the widgets to the comparison, export and import operations.
    """

    def __init__(self, root: tk.Tk):
        """
        Build the main window on *root*.

        Args:
            root: The Tk root window that hosts the whole interface.
        """
        self.root = root
        self.root.title(f"CodeBridge - Codebase Synchronizer - {WRAPPER_APP_VERSION_STRING}")
        # Make the window taller by default so status/log area is visible
        self.root.geometry("1000x760")
        # Enforce a reasonable minimum size so status bar remains visible
        try:
            self.root.minsize(900, 600)
        except Exception:
            pass

        self.comparer = None
        self.sync_manager = SyncManager()

        # Initialize optional externals
        self.logger_sys = None
        if TkinterLogger is not None:
            try:
                self.logger_sys = TkinterLogger(self.root)
                # Console output is enabled right away; the handler for the
                # embedded log widget is attached below, once the UI exists.
                self.logger_sys.setup(enable_console=True, enable_tkinter=True)
                logging.getLogger(__name__).info("TkinterLogger initialized")
            except Exception:
                self.logger_sys = None

        # Resource monitor (separate var) and progress status var
        self._resource_monitor = None
        self.resource_var = tk.StringVar()
        self.progress_status_var = tk.StringVar()
        if TkinterResourceMonitor is not None and is_psutil_available():
            try:
                self._resource_monitor = TkinterResourceMonitor(
                    self.root, self.resource_var, poll_interval=1.0
                )
                self._resource_monitor.start()
            except Exception:
                self._resource_monitor = None
        else:
            # psutil not available -> show note on resource area
            self.resource_var.set("Resource monitor: psutil not installed")

        # Profiles: load/save path in program root (two levels above this file)
        try:
            repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
        except Exception:
            repo_root = os.getcwd()
        self._profiles_path = os.path.join(repo_root, "profiles.json")
        self.profiles = self._load_profiles()
        self.current_ignore_exts = []

        # Setup UI and close handler
        self._setup_ui()

        # Attach logger handler to persistent log widget if available
        if self.logger_sys and hasattr(self, "log_text"):
            try:
                self.logger_sys.add_tkinter_handler(self.log_text)
            except Exception:
                logging.getLogger(__name__).exception(
                    "Failed to attach tkinter log handler to main UI"
                )

        # Populate profiles combobox if any
        try:
            self._populate_profiles()
        except Exception:
            logging.getLogger(__name__).exception("Failed to populate profiles combobox")

        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)

    def _setup_ui(self) -> None:
        """
        Initializes the user interface layout.

        Creates the path-selection frame (profile combobox plus source and
        destination rows), the action buttons, the status filter buttons and
        finally the result tree / log / status bar via ``_setup_treeview``.
        """
        # Path selection frame
        path_frame = ttk.LabelFrame(self.root, text="Path Selection", padding=10)
        path_frame.pack(fill="x", padx=10, pady=5)

        # Profiles combobox + Manage button
        prof_frame = ttk.Frame(path_frame)
        prof_frame.grid(row=0, column=0, columnspan=3, sticky="ew", pady=(0, 6))
        ttk.Label(prof_frame, text="Profile:").pack(side="left")
        self.profile_var = tk.StringVar()
        self.profile_cb = ttk.Combobox(
            prof_frame, textvariable=self.profile_var, state="readonly", width=40
        )
        self.profile_cb.pack(side="left", padx=6)
        self.profile_cb.bind("<<ComboboxSelected>>", lambda e: self._on_profile_change())
        ttk.Button(
            prof_frame, text="⚙️ Manage Profiles...", command=self._open_manage_profiles
        ).pack(side="left")

        self.src_path_var = tk.StringVar()
        self.dst_path_var = tk.StringVar()
        self._create_path_row(path_frame, "Source (New):", self.src_path_var, 0)
        self._create_path_row(path_frame, "Destination (Old):", self.dst_path_var, 1)

        # Action buttons inside a labelled frame and filters to the right
        actions_row = ttk.Frame(self.root, padding=5)
        actions_row.pack(fill="x", padx=10)
        actions_frame = ttk.LabelFrame(actions_row, text="Actions", padding=6)
        actions_frame.pack(side="left", fill="x", expand=True)

        # create a prominent style for the main Compare button
        style = ttk.Style(self.root)
        try:
            big_font = tkfont.Font(self.root, family="TkDefaultFont", size=11, weight="bold")
            style.configure('Big.TButton', font=big_font, padding=(10, 6))
        except Exception:
            style.configure('Big.TButton', padding=(8, 4))

        self.compare_btn = ttk.Button(
            actions_frame, text="🔍 Compare Folders", command=self._run_comparison, style='Big.TButton'
        )
        self.compare_btn.pack(side="left", padx=8)
        self.export_btn = ttk.Button(
            actions_frame, text="📦 Export Changes (ZIP)", command=self._export_changes
        )
        self.export_btn.pack(side="left", padx=5)
        self.import_btn = ttk.Button(
            actions_frame, text="📥 Import Package (ZIP)", command=self._import_package
        )
        self.import_btn.pack(side="left", padx=5)

        # Filter panel on the right: show counts and allow toggling
        filter_frame = ttk.Frame(actions_row, padding=(6, 0))
        filter_frame.pack(side="right")
        self.filter_vars = {
            "added": tk.BooleanVar(value=False),
            "modified": tk.BooleanVar(value=False),
            "deleted": tk.BooleanVar(value=False),
        }
        # buttons that show counts; initial counts are 0
        self.filter_buttons = {}
        for status, caption in (("added", "Added"), ("modified", "Modified"), ("deleted", "Deleted")):
            # bind the loop variable as a default so each button toggles its own filter
            button = ttk.Button(
                filter_frame,
                text=f"{caption} (0)",
                command=lambda s=status: self._toggle_filter(s),
            )
            button.pack(side="left", padx=4)
            self.filter_buttons[status] = button

        # store last results for filtering
        self.last_added = []
        self.last_modified = []
        self.last_deleted = []

        # Results treeview
        self._setup_treeview()

    def _create_path_row(self, parent: ttk.Frame, label: str, var: tk.StringVar, row: int) -> None:
        """
        Creates a row with a label, entry and browse button for paths.

        Args:
            parent: Frame using the grid geometry manager; grid row 0 is
                occupied by the profile selector, so the widgets are placed
                on grid row ``row + 1``.
            label: Caption of the row. A caption starting with "source"
                (case-insensitive) registers the widgets as the source row,
                anything else as the destination row.
            var: Variable bound to the entry and filled by the browse button.
            row: Zero-based index of the path row.
        """
        grid_row = row + 1
        ttk.Label(parent, text=label).grid(row=grid_row, column=0, sticky="w", padx=5)
        entry = ttk.Entry(parent, textvariable=var, width=80)
        entry.grid(row=grid_row, column=1, padx=5, pady=2)
        btn = ttk.Button(parent, text="📁 Browse", command=lambda: self._browse_folder(var))
        btn.grid(row=grid_row, column=2, padx=5)
        # store references so they can be made readonly when a profile is selected
        if label.lower().startswith("source"):
            self.src_entry = entry
            self.src_browse = btn
        else:
            self.dst_entry = entry
            self.dst_browse = btn

    def _browse_folder(self, var: tk.StringVar) -> None:
        """
        Opens a directory selection dialog.

        Stores the chosen directory in *var*; a cancelled dialog leaves
        *var* untouched.
        """
        directory = filedialog.askdirectory()
        if directory:
            var.set(directory)

    def _setup_treeview(self) -> None:
        """
        Configures the treeview to display file differences.

        Also creates the progress bar above the tree, the persistent log box
        below it and the status bar at the bottom of the window.
        """
        # The status bar and the log box are packed first, against the bottom
        # edge: the packer hands out space in packing order, so they keep
        # their room when the window is shrunk and the expanding tree area
        # absorbs the loss instead. Visual order is unchanged
        # (tree, then log, then status bar).

        # Status bar at bottom: left = progress/status, right = resource monitor
        status_frame = ttk.Frame(self.root, padding=(5, 2))
        status_frame.pack(fill="x", side="bottom")
        ttk.Label(status_frame, textvariable=self.progress_status_var).pack(side="left")
        ttk.Label(status_frame, textvariable=self.resource_var).pack(side="right")

        # Persistent log box below the treeview
        log_frame = ttk.Frame(self.root, padding=(5, 2))
        log_frame.pack(fill="x", side="bottom")
        # Slightly reduce log height to keep status bar visible on smaller screens
        self.log_text = ScrolledText(log_frame, height=6, state=tk.DISABLED)
        self.log_text.pack(fill="both", expand=True)

        tree_frame = ttk.Frame(self.root, padding=10)
        tree_frame.pack(fill="both", expand=True)

        # Progress bar for long-running operations
        self.progress_var = tk.IntVar(value=0)
        self.progress = ttk.Progressbar(
            tree_frame, variable=self.progress_var, maximum=100, mode="determinate"
        )
        self.progress.pack(fill="x", pady=(0, 6))

        columns = ("status", "path")
        self.tree = ttk.Treeview(tree_frame, columns=columns, show="headings")
        self.tree.heading("status", text="Status")
        self.tree.heading("path", text="File Path")
        self.tree.column("status", width=100, stretch=False)
        self.tree.tag_configure("added", foreground="green")
        self.tree.tag_configure("modified", foreground="orange")
        self.tree.tag_configure("deleted", foreground="red")
        self.tree.bind("<Double-1>", self._on_item_double_click)

        scrollbar = ttk.Scrollbar(tree_frame, orient="vertical", command=self.tree.yview)
        self.tree.configure(yscrollcommand=scrollbar.set)
        self.tree.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

    def _log(self, message: str) -> None:
        """
        Record *message* at INFO level through the ``logging`` module.

        This method never writes to the log widget itself. ``__init__``
        attaches ``self.log_text`` to the TkinterLogger (when that optional
        component is available), which is presumably what makes records show
        up in the embedded log box.
        """
        try:
            logging.getLogger(__name__).info(message)
        except Exception:
            # logging must never take the GUI down
            pass

    def _update_progress(self, value: int, maximum: int, phase: str, count_display: str = None) -> None:
        """
        Update the progress bar and the status text.

        Args:
            value: Current position of the progress bar.
            maximum: Upper bound of the progress bar.
            phase: Label shown before the counter.
            count_display: Counter text shown after *phase*; when empty or
                None, ``value/maximum`` is shown instead.
        """
        try:
            self.progress.config(maximum=maximum)
            self.progress_var.set(value)
            shown = count_display if count_display else f"{value}/{maximum}"
            self.progress_status_var.set(f"{phase}: {shown}")
        except Exception:
            # best effort: the widgets may already be gone during shutdown
            pass

    def _toggle_filter(self, status: str) -> None:
        """Toggle a filter button and refresh the treeview."""
        if status not in self.filter_vars:
            return
        flag = self.filter_vars[status]
        flag.set(not flag.get())
        # update visual style of button to indicate active/inactive
        try:
            btn = self.filter_buttons.get(status)
            if btn:
                btn.state(["pressed"] if flag.get() else ["!pressed"])
        except Exception:
            pass
        self._refresh_tree()

    def _refresh_tree(self) -> None:
        """
        Repopulates the treeview according to current filter selections.

        With no filter active every result is shown; otherwise only the
        groups whose filter is active are listed.
        """
        try:
            for item in self.tree.get_children():
                self.tree.delete(item)
            # gather active filters; an empty set means "show all"
            active = {name for name, flag in self.filter_vars.items() if flag.get()}
            groups = (
                ("added", "Added", self.last_added),
                ("modified", "Modified", self.last_modified),
                ("deleted", "Deleted", self.last_deleted),
            )
            for tag, caption, files in groups:
                if active and tag not in active:
                    continue
                for rel_path in files:
                    self.tree.insert("", "end", values=(caption, rel_path), tags=(tag,))
        except Exception as e:
            self._log(f"Error refreshing tree: {e}")

    def _update_filter_counts(self) -> None:
        """Update the text of filter buttons to reflect current counts."""
        counts = (
            ("added", "Added", self.last_added),
            ("modified", "Modified", self.last_modified),
            ("deleted", "Deleted", self.last_deleted),
        )
        try:
            for key, caption, files in counts:
                self.filter_buttons[key].config(text=f"{caption} ({len(files)})")
        except Exception:
            pass

    # --- Profiles persistence ---
    def _load_profiles(self):
        """
        Load the profiles dictionary from ``self._profiles_path``.

        Returns:
            The decoded JSON object, or an empty dict when the file does not
            exist, cannot be read or decoded, or does not contain a JSON
            object.
        """
        log = logging.getLogger(__name__)
        try:
            if os.path.exists(self._profiles_path):
                with open(self._profiles_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    return data
                # callers use .keys()/.get(), so anything but a mapping is unusable
                log.warning("Ignoring profiles file %s: top-level value is not an object", self._profiles_path)
        except (OSError, ValueError):
            # ValueError covers both malformed JSON and undecodable bytes
            log.exception("Failed to load profiles from %s", self._profiles_path)
        return {}

    def _save_profiles(self):
        """Write ``self.profiles`` as indented JSON to ``self._profiles_path``.

        Failures are logged and otherwise ignored.
        """
        try:
            with open(self._profiles_path, "w", encoding="utf-8") as f:
                json.dump(self.profiles, f, indent=2)
        except (OSError, TypeError, ValueError):
            logging.getLogger(__name__).exception(
                "Failed to save profiles to %s", self._profiles_path
            )

    def _populate_profiles(self):
        """
        Fill the profile combobox and reset the selection to "(None)".

        Setting the combobox value from code does not fire
        ``<<ComboboxSelected>>``, so the dependent state (path entries,
        ignore list) is synchronised explicitly; otherwise the entries would
        stay read-only from a previously selected profile.
        """
        names = ["(None)"] + sorted(self.profiles.keys())
        self.profile_cb["values"] = names
        self.profile_cb.set(names[0])
        self._on_profile_change()

    def _open_manage_profiles(self):
        """Open the profile manager dialog, then reload and redisplay profiles."""
        self._log("Opening Manage Profiles dialog")
        dlg = ProfileManagerDialog(self.root, self._profiles_path, self.profiles)
        self.root.wait_window(dlg)
        # reload and refresh
        self.profiles = self._load_profiles()
        self._populate_profiles()
        self._log("Profiles reloaded")

    def _close_profile_panel(self):
        """Destroy the embedded profile panel (if any) and reload profiles."""
        try:
            panel = getattr(self, "profile_panel", None)
            if panel and panel.winfo_exists():
                panel.destroy()
        except Exception:
            pass
        # reload and refresh
        self.profiles = self._load_profiles()
        self._populate_profiles()

    def _on_profile_change(self):
        """
        Apply the profile currently selected in the combobox.

        "(None)" (or an empty selection) re-enables manual editing of both
        paths and clears the ignore list. A known profile fills both paths,
        loads its ``ignore_extensions`` (a list/tuple, or a comma-separated
        string whose items get a leading dot when missing) and locks the
        path widgets. An unknown profile name is ignored.
        """
        sel = self.profile_var.get()
        if not sel or sel == "(None)":
            # enable editing of entries
            try:
                self.src_entry.config(state="normal")
                self.dst_entry.config(state="normal")
                self.src_browse.config(state="normal")
                self.dst_browse.config(state="normal")
            except Exception:
                pass
            self.current_ignore_exts = []
            self._log("Profile selection cleared")
            return

        profile = self.profiles.get(sel)
        if not profile:
            return

        # set entries and make readonly
        self.src_path_var.set(profile.get("source", ""))
        self.dst_path_var.set(profile.get("destination", ""))

        # load ignore extensions for comparisons
        ign = profile.get("ignore_extensions", [])
        if isinstance(ign, (list, tuple)):
            self.current_ignore_exts = ign
        elif isinstance(ign, str) and ign:
            # comma-separated string
            stripped = (part.strip() for part in ign.split(','))
            self.current_ignore_exts = [
                ext if ext.startswith('.') else '.' + ext for ext in stripped if ext
            ]
        else:
            self.current_ignore_exts = []

        try:
            self.src_entry.config(state="readonly")
            self.dst_entry.config(state="readonly")
            self.src_browse.config(state="disabled")
            self.dst_browse.config(state="disabled")
        except Exception:
            pass
        self._log(f"Selected profile: {sel}")

    def _run_comparison(self) -> None:
        """
        Executes comparison and populates the treeview.

        The comparison itself runs on a daemon thread; every widget update
        is handed back to the Tk event loop through ``root.after``. The
        action buttons are disabled for the duration of the run.
        """
        src = self.src_path_var.get()
        dst = self.dst_path_var.get()
        if not src or not dst:
            messagebox.showwarning("Warning", "Please select both source and destination folders.")
            return

        self._log(f"Starting comparison: src={src} dst={dst}")
        # The worker keeps its own reference so it is unaffected if
        # self.comparer is rebound while it is still running.
        comparer = CodeComparer(src, dst, ignore_extensions=self.current_ignore_exts)
        self.comparer = comparer

        def _set_buttons(state):
            try:
                self.compare_btn.config(state=state)
                self.export_btn.config(state=state)
                self.import_btn.config(state=state)
            except Exception:
                pass

        def _disable_ui():
            _set_buttons("disabled")

        def _enable_ui():
            _set_buttons("normal")

        def _reset_after_error():
            # runs on the Tk thread: clear the half-finished progress display
            try:
                self.progress_status_var.set("")
                self.progress_var.set(0)
            except Exception:
                pass
            _enable_ui()

        def worker():
            try:
                # phase 1: quick count
                try:
                    total = comparer.count_files(src)
                except Exception:
                    total = 0
                if total <= 0:
                    total = 1
                # the bar covers two passes: scanning and hashing
                progress_max = total * 2
                # initialize progress
                self.root.after(0, lambda: self._update_progress(0, progress_max, "Counting", f"0/{total}"))

                scanned = 0
                hashed = 0

                def progress_handler(*cb_args):
                    nonlocal scanned, hashed
                    # get_relative_files calls (root_path, full_path, relative_path)
                    if len(cb_args) == 3:
                        root_path, full_path, relative_path = cb_args
                        if root_path == src:
                            scanned += 1
                            # update UI
                            self.root.after(
                                0,
                                lambda s=scanned: self._update_progress(
                                    s, progress_max, "Scanning", f"{s}/{total}"
                                ),
                            )
                    else:
                        # hashing phase: comparer sends a tuple like ("hash", relative_path)
                        arg = cb_args[0] if cb_args else None
                        if isinstance(arg, tuple) and arg and arg[0] == "hash":
                            hashed += 1
                            # Bind both counters as defaults: the callback runs
                            # later on the Tk thread, when the closure variables
                            # may already have advanced.
                            self.root.after(
                                0,
                                lambda v=scanned + hashed, h=hashed: self._update_progress(
                                    v, progress_max, "Hashing", f"{h}/{total}"
                                ),
                            )

                # perform compare with progress handler
                added, modified, deleted = comparer.compare(progress_callback=progress_handler)
                # ensure progress complete
                self.root.after(
                    0, lambda: self._update_progress(progress_max, progress_max, "Done", f"{total}/{total}")
                )

                # populate tree in main thread
                def finish_ui():
                    # store results and refresh according to filters
                    self.last_added = added
                    self.last_modified = modified
                    self.last_deleted = deleted
                    self._update_filter_counts()
                    self._refresh_tree()
                    self._log(
                        f"Comparison complete. Added={len(added)} Modified={len(modified)} Deleted={len(deleted)}"
                    )
                    self.progress_status_var.set("")
                    self.progress_var.set(0)
                    _enable_ui()

                self.root.after(0, finish_ui)
            except Exception as e:
                # ensure UI enabled on error
                try:
                    self.root.after(0, _reset_after_error)
                except Exception:
                    pass
                self._log(f"Error during comparison: {e}")

        # Disable synchronously (this method runs on the Tk thread) so a second
        # click cannot start another comparison before the worker gets going.
        _disable_ui()
        threading.Thread(target=worker, daemon=True).start()
        # Background comparison started; UI will be updated when finished.
        self._log("Background comparison started")

    def _on_item_double_click(self, event) -> None:
        """
        Opens the DiffViewer when a file is double-clicked.

        Rows with status "Deleted" only produce an informational message.
        """
        selection = self.tree.selection()
        if not selection:
            return
        item = self.tree.item(selection[0])
        status, relative_path = item["values"]
        # Treeview may hand back numeric-looking cell values as int;
        # os.path.join needs strings.
        status = str(status)
        relative_path = str(relative_path)
        if status == "Deleted":
            messagebox.showinfo("Info", "File was deleted. Nothing to compare.")
            return
        source_full_path = os.path.join(self.src_path_var.get(), relative_path)
        dest_full_path = os.path.join(self.dst_path_var.get(), relative_path)
        DiffViewer(self.root, source_full_path, dest_full_path, relative_path)

    def _export_changes(self) -> None:
        """
        Prompts for a commit message and exports changes to a ZIP.

        Requires a previous comparison. The export is cancelled when no
        commit message is given or no output file is chosen. A failure while
        building the package is logged and reported in an error dialog.
        """
        if not self.comparer:
            messagebox.showwarning("Warning", "Please run comparison first.")
            return
        dialog = CommitDialog(self.root)
        commit_msg = dialog.get_message()
        if not commit_msg:
            messagebox.showwarning("Cancelled", "Export cancelled: Commit message is required.")
            return
        output_zip = filedialog.asksaveasfilename(
            defaultextension=".zip", filetypes=[("ZIP files", "*.zip")]
        )
        if not output_zip:
            return
        changes = {
            "added": self.comparer.added,
            "modified": self.comparer.modified,
            "deleted": self.comparer.deleted
        }
        try:
            self.sync_manager.create_export_package(
                self.src_path_var.get(), changes, output_zip, commit_msg
            )
        except Exception as e:
            logging.getLogger(__name__).exception("Export failed")
            messagebox.showerror("Export Failed", f"Could not create package:\n{e}")
            return
        messagebox.showinfo("Success", f"Package created: {output_zip}")

    def _import_package(self) -> None:
        """
        Selects a ZIP package and applies it to the destination folder.

        Asks for confirmation first. After a successful import the
        comparison is run again. A failure while applying the package is
        logged and reported in an error dialog.
        """
        dst = self.dst_path_var.get()
        if not dst:
            messagebox.showwarning("Warning", "Please select a destination folder first.")
            return
        package_path = filedialog.askopenfilename(filetypes=[("ZIP files", "*.zip")])
        if not package_path:
            return
        confirm = messagebox.askyesno(
            "Confirm Import", "This will overwrite/delete files in the destination. Continue?"
        )
        if not confirm:
            return
        try:
            commit_msg = self.sync_manager.apply_package(package_path, dst)
        except Exception as e:
            logging.getLogger(__name__).exception("Import failed")
            messagebox.showerror("Import Failed", f"Could not apply package:\n{e}")
            return
        messagebox.showinfo(
            "Import Complete", f"Package applied successfully.\n\nCommit Message:\n{commit_msg}"
        )
        self._run_comparison()

    def _on_closing(self) -> None:
        """Clean shutdown: stop resource monitor and logging, then exit."""
        if self._resource_monitor:
            try:
                self._resource_monitor.stop()
            except Exception:
                pass
        if self.logger_sys:
            try:
                self.logger_sys.shutdown()
            except Exception:
                pass
        try:
            self.root.destroy()
        except Exception:
            # fallback: the window could not be destroyed, terminate the process
            os._exit(0)