SXXXXXXX_NetAnalyzer/netanalyzer/core/analyzer.py
2025-11-18 16:04:23 +01:00

70 lines
2.8 KiB
Python

# NetPulseAnalyzer/core/analyzer.py
import scapy.all as scapy
import pandas as pd
from typing import List, Dict, Any, Optional
class PacketAnalyzer:
    """
    Analyze network packets read from a capture file.

    Scapy reads the capture and Pandas computes the descriptive statistics.
    Every duration returned by this class is expressed in milliseconds.
    """

    # Percentiles requested from ``Series.describe`` by every statistics method.
    _PERCENTILES = (.25, .5, .75, .95, .99)

    def __init__(self):
        # Packets of the most recently loaded capture; None until load_pcap().
        self.packets: Optional[scapy.PacketList] = None
        # Path of the capture that ``self.packets`` was read from.
        self.filepath: Optional[str] = None

    def load_pcap(self, filepath: str) -> int:
        """
        Load all packets from a .pcapng or .pcap file.

        WARNING: the whole capture is read into memory, so this can be slow
        for very large files.

        Args:
            filepath: Path of the capture file to read.

        Returns:
            The total number of packets loaded.

        Raises:
            Whatever ``scapy.rdpcap`` raises for a missing or unreadable file;
            in that case the previously loaded capture is left untouched.
        """
        print(f"Loading packets from {filepath}...")
        loaded = scapy.rdpcap(filepath)
        # Commit state only after a successful read so ``filepath`` never
        # names a file that is not the one held in ``packets``.
        self.packets = loaded
        self.filepath = filepath
        print(f"Loaded {len(loaded)} packets.")
        return len(loaded)

    def calculate_ab_processing_stats(self, port_a: int, port_b: int) -> Dict[str, Any]:
        """
        Calculate the processing time between packets on port A and port B.

        UDP packets are selected by *destination* port. The i-th packet sent
        to ``port_a`` is paired with the i-th packet sent to ``port_b`` and
        the surplus of the longer list is ignored, so the result is only
        meaningful when every A packet has exactly one B packet, in order.

        Args:
            port_a: UDP destination port marking the start of processing.
            port_b: UDP destination port marking the end of processing.

        Returns:
            The ``describe()`` statistics of the B-minus-A delays in
            milliseconds (count, mean, std, min, max and the 25/50/75/95/99th
            percentiles) plus ``'data_points'`` with the individual delays,
            or ``{"error": ...}`` when either port has no packets.

        Raises:
            ValueError: If no capture has been loaded.
        """
        if not self.packets:
            raise ValueError("No packets loaded. Call load_pcap() first.")

        udp = scapy.UDP
        times_a = []
        times_b = []
        # Single pass over the capture; two independent ``if``s so that
        # port_a == port_b still fills both lists like the two-pass version.
        for pkt in self.packets:
            if not pkt.haslayer(udp):
                continue
            dport = pkt[udp].dport
            if dport == port_a:
                times_a.append(pkt.time)
            if dport == port_b:
                times_b.append(pkt.time)

        if not times_a or not times_b:
            return {"error": "Packets for one or both ports (A/B) were not found."}

        processing_times_ms = self._paired_deltas_ms(times_a, times_b)
        stats = self._describe_ms(processing_times_ms)
        stats['data_points'] = processing_times_ms
        return stats

    def calculate_inter_packet_gap_stats(self, port: int) -> Dict[str, Any]:
        """
        Calculate the interval (and jitter) between consecutive packets on a port.

        A UDP packet belongs to the port when either its source or its
        destination port equals ``port``. Gaps are taken between neighbours
        in capture order.

        Args:
            port: UDP port to analyze.

        Returns:
            The ``describe()`` statistics of the gaps in milliseconds,
            ``'jitter'`` (the standard deviation of the gaps; NaN when there
            is only a single gap) and ``'data_points'`` with the individual
            gaps, or ``{"error": ...}`` when fewer than two packets match.

        Raises:
            ValueError: If no capture has been loaded.
        """
        if not self.packets:
            raise ValueError("No packets loaded.")

        udp = scapy.UDP
        times = [
            pkt.time
            for pkt in self.packets
            if pkt.haslayer(udp) and (pkt[udp].sport == port or pkt[udp].dport == port)
        ]
        if len(times) < 2:
            return {"error": f"Fewer than 2 packets found on port {port}. Cannot calculate interval."}

        gaps_ms = self._consecutive_gaps_ms(times)
        stats = self._describe_ms(gaps_ms)
        stats['jitter'] = stats.get('std', 0.0)
        stats['data_points'] = gaps_ms
        return stats

    @staticmethod
    def _paired_deltas_ms(times_a, times_b) -> List[float]:
        """Return ``b - a`` in milliseconds for index-aligned timestamp pairs."""
        # Subtract in the timestamps' own type (Scapy uses a Decimal subclass)
        # and only then convert: epoch-sized floats would lose sub-microsecond
        # precision, and Decimal objects would make Pandas fall back to
        # ``object`` dtype, for which describe() yields no numeric statistics.
        return [float(end - start) * 1000.0 for start, end in zip(times_a, times_b)]

    @staticmethod
    def _consecutive_gaps_ms(times) -> List[float]:
        """Return the gaps in milliseconds between neighbouring timestamps."""
        # Same subtract-then-convert rule as in _paired_deltas_ms.
        return [float(later - earlier) * 1000.0 for earlier, later in zip(times, times[1:])]

    @classmethod
    def _describe_ms(cls, values_ms) -> Dict[str, Any]:
        """Return the numeric ``describe()`` summary of millisecond values as a dict."""
        # Force a float Series so describe() always takes its numeric path.
        series = pd.Series(values_ms, dtype=float)
        return series.describe(percentiles=list(cls._PERCENTILES)).to_dict()