Compare commits

..

2 Commits

Author SHA1 Message Date
VALLONGOL
7398ac7617 sistemato import per creazione eseguibile 2025-11-19 14:15:56 +01:00
VALLONGOL
d4e6f3487d ggiunto confronto jitter tra più porte 2025-11-18 16:04:23 +01:00
6 changed files with 214 additions and 94 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 0 B

After

Width:  |  Height:  |  Size: 82 KiB

View File

@ -1,39 +1,5 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['netanalyzer/__main__.py'],
pathex=['.'],
binaries=[],
# Usa project_icon_filename nella sezione datas
datas=[('NetAnalyzer.ico', '.')],
hiddenimports=[],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False
)
a = Analysis(pathex=['netanalyzer', '.'], binaries=[], datas=[('NetAnalyzer.ico', '.')], hiddenimports=[], hookspath=[], runtime_hooks=[], excludes=[], win_no_prefer_redirects=False, win_private_assemblies=False, cipher=block_cipher, noarchive=False, scripts=['netanalyzer\\__main__.py'])
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='NetAnalyzer',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
# Usa project_icon_filename per l'opzione icon
icon='NetAnalyzer.ico'
)
exe = EXE(pyz, a.scripts, a.binaries, a.zipfiles, a.datas, [], name='NetAnalyzer', debug=False, bootloader_ignore_signals=False, strip=False, upx=True, upx_exclude=[], runtime_tmpdir=None, console=True, icon='NetAnalyzer.ico', exclude_binaries=True)
coll = COLLECT(exe, a.binaries, a.zipfiles, a.datas, strip=False, upx=True, upx_exclude=[], name='NetAnalyzer')

View File

@ -1,4 +1,4 @@
from .gui.main_view import MainView
from gui.main_view import MainView
def main():
"""

74
netanalyzer/_version.py Normal file
View File

@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
# File generated by PyInstaller GUI Wrapper. DO NOT EDIT MANUALLY.
# Contains build-time information scraped from Git (if available)
# and a helper function to format version strings.
import re
# --- Version Data (Generated) ---
__version__ = "v.0.0.0.2-0-gd4e6f34-dirty"
GIT_COMMIT_HASH = "d4e6f3487d230e5b2b1df98cf35f2e853bc2f38e"
GIT_BRANCH = "master"
BUILD_TIMESTAMP = "2025-11-19T13:05:49.512611+00:00"
IS_GIT_REPO = True
# --- Default Values (for comparison or fallback) ---
DEFAULT_VERSION = "0.0.0+unknown"
DEFAULT_COMMIT = "Unknown"
DEFAULT_BRANCH = "Unknown"
# --- Helper Function ---
def get_version_string(format_string=None):
"""
Returns a formatted string based on the build version information.
Args:
format_string (str, optional): A format string using placeholders.
Defaults to "{version} ({branch}/{commit_short})" if None.
Placeholders:
{{version}}: Full version string (e.g., 'v1.0.0-5-gabcdef-dirty')
{{tag}}: Clean tag part if exists (e.g., 'v1.0.0'), else DEFAULT_VERSION.
{{commit}}: Full Git commit hash.
{{commit_short}}: Short Git commit hash (7 chars).
{{branch}}: Git branch name.
{{dirty}}: '-dirty' if the repo was dirty, empty otherwise.
{{timestamp}}: Full build timestamp (ISO 8601 UTC).
{{timestamp_short}}: Build date only (YYYY-MM-DD).
{{is_git}}: 'Git' if IS_GIT_REPO is True, 'Unknown' otherwise.
Returns:
str: The formatted version string, or an error message if formatting fails.
"""
if format_string is None:
format_string = "{version} ({branch}/{commit_short})" # Default format
replacements = {}
try:
replacements['version'] = __version__ if __version__ else DEFAULT_VERSION
replacements['commit'] = GIT_COMMIT_HASH if GIT_COMMIT_HASH else DEFAULT_COMMIT
replacements['commit_short'] = GIT_COMMIT_HASH[:7] if GIT_COMMIT_HASH and len(GIT_COMMIT_HASH) >= 7 else DEFAULT_COMMIT
replacements['branch'] = GIT_BRANCH if GIT_BRANCH else DEFAULT_BRANCH
replacements['timestamp'] = BUILD_TIMESTAMP if BUILD_TIMESTAMP else "Unknown"
replacements['timestamp_short'] = BUILD_TIMESTAMP.split('T')[0] if BUILD_TIMESTAMP and 'T' in BUILD_TIMESTAMP else "Unknown"
replacements['is_git'] = "Git" if IS_GIT_REPO else "Unknown"
replacements['dirty'] = "-dirty" if __version__ and __version__.endswith('-dirty') else ""
tag = DEFAULT_VERSION
if __version__ and IS_GIT_REPO:
match = re.match(r'^(v?([0-9]+(?:\.[0-9]+)*))', __version__)
if match:
tag = match.group(1)
replacements['tag'] = tag
output_string = format_string
for placeholder, value in replacements.items():
pattern = re.compile(r'{{\s*' + re.escape(placeholder) + r'\s*}}')
output_string = pattern.sub(str(value), output_string)
if re.search(r'{\s*\w+\s*}', output_string):
pass # Or log a warning: print(f"Warning: Unreplaced placeholders found: {output_string}")
return output_string
except Exception as e:
return f"[Formatting Error: {e}]"

View File

@ -1,3 +1,5 @@
# NetPulseAnalyzer/core/analyzer.py
import scapy.all as scapy
import pandas as pd
from typing import List, Dict, Any, Optional
@ -31,7 +33,6 @@ class PacketAnalyzer:
if not self.packets:
raise ValueError("No packets loaded. Call load_pcap() first.")
# Extract timestamps for packets on port A and port B (destination ports)
times_a = [p.time for p in self.packets if p.haslayer(scapy.UDP) and p[scapy.UDP].dport == port_a]
times_b = [p.time for p in self.packets if p.haslayer(scapy.UDP) and p[scapy.UDP].dport == port_b]
@ -42,13 +43,8 @@ class PacketAnalyzer:
if min_len == 0:
return {"error": "No matching A/B packet pairs found."}
# Calculate the time deltas in milliseconds
processing_times_ms = (pd.Series(times_b[:min_len]) - pd.Series(times_a[:min_len])) * 1000
# Use pandas to get a comprehensive set of statistics
stats = processing_times_ms.describe(percentiles=[.25, .5, .75, .95, .99]).to_dict()
# Add raw data points for plotting
stats['data_points'] = processing_times_ms.tolist()
return stats
@ -60,21 +56,15 @@ class PacketAnalyzer:
if not self.packets:
raise ValueError("No packets loaded.")
# Filter packets for the specified port (either source or destination)
times = [p.time for p in self.packets if p.haslayer(scapy.UDP) and (p[scapy.UDP].sport == port or p[scapy.UDP].dport == port)]
if len(times) < 2:
return {"error": f"Fewer than 2 packets found on port {port}. Cannot calculate interval."}
# Calculate the difference between each timestamp and the previous one
gaps_ms = pd.Series(times).diff().dropna() * 1000
# Calculate statistics
stats = gaps_ms.describe(percentiles=[.25, .5, .75, .95, .99]).to_dict()
# Jitter is often defined as the standard deviation of the packet delay variation
stats['jitter'] = stats.get('std', 0.0)
stats['data_points'] = gaps_ms.tolist()
return stats

View File

@ -1,14 +1,11 @@
# NetPulseAnalyzer/gui/main_view.py
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import threading
import os
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2Tk # NUOVO IMPORT
import pandas as pd
from typing import Dict, Any
from typing import List, Dict, Any
from ..core.analyzer import PacketAnalyzer
@ -29,7 +26,6 @@ class MainView(tk.Tk):
main_frame = ttk.Frame(self, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# --- Top Controls Section ---
controls_frame = ttk.LabelFrame(main_frame, text="File Controls", padding=10)
controls_frame.pack(fill=tk.X, pady=(0, 10))
@ -38,18 +34,15 @@ class MainView(tk.Tk):
self.load_button = ttk.Button(controls_frame, text="Open .pcapng File...", command=self._open_file)
self.load_button.pack(side=tk.RIGHT, padx=5)
# --- Notebook for different analysis types ---
self.results_notebook = ttk.Notebook(main_frame)
self.results_notebook.pack(fill=tk.BOTH, expand=True)
# Create the two tabs
self.tab_processing_time = ttk.Frame(self.results_notebook)
self.tab_jitter = ttk.Frame(self.results_notebook)
self.results_notebook.add(self.tab_processing_time, text=" Processing Time (A -> B) ")
self.results_notebook.add(self.tab_jitter, text=" Interval / Jitter Analysis ")
# Populate each tab with its specific widgets
self._create_processing_time_tab(self.tab_processing_time)
self._create_jitter_tab(self.tab_jitter)
@ -86,29 +79,55 @@ class MainView(tk.Tk):
self.proc_fig = plt.Figure(figsize=(5, 4), dpi=100)
self.proc_ax = self.proc_fig.add_subplot(111)
self.proc_canvas = FigureCanvasTkAgg(self.proc_fig, master=plot_frame)
# --- NUOVA PARTE: Toolbar per il grafico ---
toolbar = NavigationToolbar2Tk(self.proc_canvas, plot_frame)
toolbar.update()
# --- FINE NUOVA PARTE ---
self.proc_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def _create_jitter_tab(self, parent_tab):
controls = ttk.LabelFrame(parent_tab, text="Analysis Parameters", padding=10)
controls.pack(fill=tk.X, padx=10, pady=10)
ttk.Label(controls, text="Port to Analyze:").grid(row=0, column=0, padx=5, sticky='w')
self.jitter_port_var = tk.StringVar(value="60012") # Default to server command port
ttk.Entry(controls, textvariable=self.jitter_port_var, width=10).grid(row=0, column=1)
ports_frame = ttk.Frame(controls)
ports_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
ttk.Label(ports_frame, text="Ports to Analyze (use Ctrl/Shift to multi-select):").pack(anchor='w')
self.jitter_ports_listbox = tk.Listbox(ports_frame, selectmode=tk.EXTENDED, height=4)
for port in ["60012", "55001", "55002"]:
self.jitter_ports_listbox.insert(tk.END, port)
self.jitter_ports_listbox.pack(side=tk.LEFT, fill=tk.X, expand=True, pady=5)
self.jitter_ports_listbox.selection_set(0)
self.jitter_ports_listbox.selection_set(2)
self.jitter_ports_listbox.selection_set(3)
add_port_frame = ttk.Frame(ports_frame)
add_port_frame.pack(side=tk.LEFT, padx=10, fill='y')
self.jitter_custom_port_var = tk.StringVar()
custom_port_entry = ttk.Entry(add_port_frame, textvariable=self.jitter_custom_port_var, width=10)
custom_port_entry.pack()
ttk.Button(add_port_frame, text="Add Port", command=self._add_jitter_port).pack(pady=5)
self.jitter_analyze_button = ttk.Button(controls, text="Analyze Jitter", command=self._run_jitter_analysis, state=tk.DISABLED)
self.jitter_analyze_button.grid(row=0, column=2, padx=20)
self.jitter_analyze_button.pack(side=tk.LEFT, padx=20, anchor='center')
pane = ttk.PanedWindow(parent_tab, orient=tk.HORIZONTAL)
pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))
stats_frame = ttk.LabelFrame(pane, text="Interval Statistics (ms)", padding=10)
pane.add(stats_frame, weight=1)
pane.add(stats_frame, weight=2)
self.jitter_stats_tree = ttk.Treeview(stats_frame, columns=("Metric", "Value"), show="headings")
self.jitter_stats_tree = ttk.Treeview(stats_frame, columns=("Port", "Metric", "Value"), show="headings")
self.jitter_stats_tree.heading("Port", text="Port")
self.jitter_stats_tree.heading("Metric", text="Metric")
self.jitter_stats_tree.heading("Value", text="Value (ms)")
self.jitter_stats_tree.column("Port", width=80, anchor='center')
self.jitter_stats_tree.column("Metric", width=120)
self.jitter_stats_tree.column("Value", width=100, anchor='e')
self.jitter_stats_tree.pack(fill=tk.BOTH, expand=True)
@ -119,8 +138,31 @@ class MainView(tk.Tk):
self.jitter_fig = plt.Figure(figsize=(5, 4), dpi=100)
self.jitter_ax = self.jitter_fig.add_subplot(111)
self.jitter_canvas = FigureCanvasTkAgg(self.jitter_fig, master=plot_frame)
# --- NUOVA PARTE: Toolbar per il grafico jitter ---
toolbar = NavigationToolbar2Tk(self.jitter_canvas, plot_frame)
toolbar.update()
# --- FINE NUOVA PARTE ---
self.jitter_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def _add_jitter_port(self):
port_str = self.jitter_custom_port_var.get().strip()
if not port_str:
return
try:
port_num = int(port_str)
if not (0 < port_num < 65536):
raise ValueError("Port out of range")
if port_str not in self.jitter_ports_listbox.get(0, tk.END):
self.jitter_ports_listbox.insert(tk.END, port_str)
self.jitter_custom_port_var.set("")
except ValueError:
messagebox.showerror("Invalid Port", "Please enter a valid port number (1-65535).")
# --- Le funzioni di caricamento file rimangono invariate ---
def _open_file(self):
filepath = filedialog.askopenfilename(
title="Select a capture file",
@ -142,7 +184,6 @@ class MainView(tk.Tk):
).start()
def _create_progress_dialog(self, filepath: str) -> tk.Toplevel:
"""Creates a modal dialog with an indeterminate progress bar."""
dialog = tk.Toplevel(self)
dialog.title("Loading...")
dialog.geometry("400x150")
@ -159,7 +200,6 @@ class MainView(tk.Tk):
ttk.Label(dialog, text="Loading capture file:", padding=10).pack()
ttk.Label(dialog, text=f"{filename} {size_str}", font=('Helvetica', 10, 'bold')).pack()
ttk.Label(dialog, text="This may take a moment...", padding=5).pack()
progress_bar = ttk.Progressbar(dialog, mode='indeterminate', length=350)
@ -174,9 +214,6 @@ class MainView(tk.Tk):
return dialog
def _load_file_thread(self, filepath: str, progress_dialog: tk.Toplevel):
"""
Background thread function to load the file. Closes the dialog when done.
"""
try:
total_packets = self.analyzer.load_pcap(filepath)
self.after(0, self._on_load_complete, filepath, total_packets)
@ -188,7 +225,6 @@ class MainView(tk.Tk):
self.after(0, self.load_button.config, {"state": tk.NORMAL})
def _on_load_complete(self, filepath, total_packets):
"""GUI thread callback for when file loading is complete."""
self.filepath_label.config(text=f"File: {os.path.basename(filepath)} ({total_packets} packets)")
self.proc_analyze_button.config(state=tk.NORMAL)
self.jitter_analyze_button.config(state=tk.NORMAL)
@ -201,35 +237,45 @@ class MainView(tk.Tk):
if "error" in stats:
messagebox.showerror("Analysis Error", stats["error"])
return
self._update_stats_table(self.proc_stats_tree, stats)
self._update_plot(self.proc_ax, self.proc_fig, self.proc_canvas, stats['data_points'], "Processing Time", "Time (ms)")
self._update_single_port_stats_table(self.proc_stats_tree, stats)
self._update_single_series_plot(self.proc_ax, self.proc_fig, self.proc_canvas, stats['data_points'], "Processing Time", "Time (ms)")
except Exception as e:
messagebox.showerror("Error", f"Analysis failed: {e}")
def _run_jitter_analysis(self):
try:
port = int(self.jitter_port_var.get())
stats = self.analyzer.calculate_inter_packet_gap_stats(port)
if "error" in stats:
messagebox.showerror("Analysis Error", stats["error"])
return
selected_indices = self.jitter_ports_listbox.curselection()
if not selected_indices:
messagebox.showwarning("No Selection", "Please select one or more ports to analyze.")
return
# CORREZIONE: Usiamo la chiave 'std' per 'Jitter'
stats['Jitter'] = stats.get('std', 0.0)
selected_ports = [self.jitter_ports_listbox.get(i) for i in selected_indices]
self._update_stats_table(self.jitter_stats_tree, stats)
self._update_plot(self.jitter_ax, self.jitter_fig, self.jitter_canvas, stats['data_points'], f"Packet Interval (Port {port})", "Interval (ms)")
except Exception as e:
messagebox.showerror("Error", f"Analysis failed: {e}")
all_stats: Dict[int, Dict[str, Any]] = {}
all_data_points: Dict[int, List[float]] = {}
def _update_stats_table(self, tree: ttk.Treeview, stats: Dict[str, Any]):
for port_str in selected_ports:
try:
port = int(port_str)
stats = self.analyzer.calculate_inter_packet_gap_stats(port)
if "error" not in stats:
all_stats[port] = stats
all_data_points[port] = stats['data_points']
else:
messagebox.showwarning("Analysis Warning", f"Could not analyze port {port}:\n{stats['error']}")
except Exception as e:
messagebox.showerror("Error", f"Analysis for port {port} failed: {e}")
if all_stats:
self._update_multi_port_stats_table(all_stats)
self._update_multi_series_plot(all_data_points)
def _update_single_port_stats_table(self, tree: ttk.Treeview, stats: Dict[str, Any]):
for item in tree.get_children():
tree.delete(item)
# CORREZIONE: Usiamo 'std' ma lo etichettiamo come Jitter per chiarezza
metric_map = {
'count': 'Sample Count', 'mean': 'Mean', 'std': 'Std Dev (Jitter)',
'min': 'Min', '25%': '25th Percentile', '50%': 'Median (50%)',
'count': 'Sample Count', 'mean': 'Mean', 'std': 'Std Dev',
'min': 'Min', '25%': '25th Percentile', '50%': 'Median',
'75%': '75th Percentile', '95%': '95th Percentile', '99%': '99th Percentile',
'max': 'Max'
}
@ -242,14 +288,35 @@ class MainView(tk.Tk):
else:
tree.insert('', 'end', values=(name, f"{value:.4f}"))
def _update_plot(self, ax, fig, canvas, data_points, title, ylabel):
# --- NUOVA FUNZIONE per la tabella multi-porta ---
def _update_multi_port_stats_table(self, all_stats: Dict[int, Dict[str, Any]]):
tree = self.jitter_stats_tree
for item in tree.get_children():
tree.delete(item)
metric_map = {'count': 'Samples', 'mean': 'Mean', 'std': 'Std Dev (Jitter)', 'min': 'Min', 'max': 'Max', '50%': 'Median'}
for port, stats in sorted(all_stats.items()):
# Add a header row for the port
tree.insert('', 'end', values=(port, '', ''), tags=('header',))
for key, name in metric_map.items():
if key in stats:
value = stats[key]
if key == 'count':
tree.insert('', 'end', values=('', name, f"{int(value)}"))
else:
tree.insert('', 'end', values=('', name, f"{value:.4f}"))
tree.tag_configure('header', font=('Helvetica', 10, 'bold'), background='#eee')
def _update_single_series_plot(self, ax, fig, canvas, data_points, title, ylabel):
ax.clear()
ax.plot(data_points, marker='.', linestyle='-', markersize=4)
if data_points:
s = pd.Series(data_points)
mean_val = s.mean()
p99_val = s.quantile(0.99)
mean_val, p99_val = s.mean(), s.quantile(0.99)
ax.axhline(y=mean_val, color='g', linestyle='--', label=f'Mean ({mean_val:.2f} ms)')
ax.axhline(y=p99_val, color='r', linestyle='--', label=f'99th Percentile ({p99_val:.2f} ms)')
@ -260,3 +327,26 @@ class MainView(tk.Tk):
ax.legend()
fig.tight_layout()
canvas.draw()
# --- NUOVA FUNZIONE per il grafico multi-serie ---
def _update_multi_series_plot(self, all_data_points: Dict[int, List[float]]):
ax, fig, canvas = self.jitter_ax, self.jitter_fig, self.jitter_canvas
ax.clear()
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
for i, (port, data_points) in enumerate(sorted(all_data_points.items())):
if data_points:
color = colors[i % len(colors)]
ax.plot(data_points, marker='.', linestyle='-', markersize=3, label=f'Port {port}', color=color, alpha=0.8)
ax.set_title("Packet Interval Comparison")
ax.set_xlabel("Packet Sequence")
ax.set_ylabel("Interval (ms)")
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
if all_data_points:
ax.legend()
fig.tight_layout()
canvas.draw()