SXXXXXXX_NetAnalyzer/netanalyzer/core/analyzer.py
2025-11-18 15:53:15 +01:00

80 lines
3.3 KiB
Python

import scapy.all as scapy
import pandas as pd
from typing import List, Dict, Any, Optional
class PacketAnalyzer:
    """
    Analyze network packets loaded from a capture file.

    Scapy reads the capture; pandas computes the summary statistics.
    Every timing value returned by this class is expressed in milliseconds.
    """

    # Percentiles included in every statistics dictionary.
    _PERCENTILES = (0.25, 0.5, 0.75, 0.95, 0.99)

    def __init__(self):
        # Packets from the most recent successful load_pcap() call, or None.
        self.packets: Optional[scapy.PacketList] = None
        # Path of the capture the packets were read from, or None.
        self.filepath: Optional[str] = None

    def load_pcap(self, filepath: str) -> int:
        """
        Load every packet from a .pcapng or .pcap file.

        WARNING: the whole capture is read into memory, so very large files
        can be slow to load.

        Any exception raised by scapy.rdpcap() propagates to the caller; in
        that case the previously loaded packets and path are left untouched.

        Returns the total number of packets loaded.
        """
        print(f"Loading packets from {filepath}...")
        packets = scapy.rdpcap(filepath)
        # Commit state only after a successful read so that `filepath` never
        # names a capture whose packets are not the ones held in `packets`.
        self.packets = packets
        self.filepath = filepath
        print(f"Loaded {len(packets)} packets.")
        return len(packets)

    @classmethod
    def _summarize_ms(cls, deltas_ms: List[float]) -> Dict[str, Any]:
        """
        Build the statistics dictionary for a list of millisecond values.

        The result holds the pandas describe() entries (count, mean, std,
        min, the configured percentiles, max) plus 'data_points', the raw
        values as a list of floats for plotting.
        """
        # Force a numeric dtype: on an object-dtype Series describe() reports
        # count/unique/top/freq instead of numeric statistics.
        series = pd.Series(deltas_ms, dtype="float64")
        stats = series.describe(percentiles=list(cls._PERCENTILES)).to_dict()
        stats['data_points'] = series.tolist()
        return stats

    def calculate_ab_processing_stats(self, port_a: int, port_b: int) -> Dict[str, Any]:
        """
        Calculate the time between UDP packets sent to port A and to port B.

        Packets are selected by UDP destination port and paired by position:
        the i-th packet for port A is matched with the i-th packet for port B,
        and surplus packets on the longer side are ignored.
        NOTE(review): positional pairing assumes no packet loss or reordering
        between A and B - confirm this holds for the captures being analyzed.

        Returns a statistics dictionary in milliseconds (see _summarize_ms),
        or {"error": ...} when either port has no packets.

        Raises ValueError when no packets are loaded.
        """
        if not self.packets:
            raise ValueError("No packets loaded. Call load_pcap() first.")

        # Collect both timestamp lists in a single pass over the capture.
        times_a = []
        times_b = []
        for packet in self.packets:
            if not packet.haslayer(scapy.UDP):
                continue
            dport = packet[scapy.UDP].dport
            if dport == port_a:
                times_a.append(packet.time)
            # Deliberately not `elif`: when port_a == port_b the packet
            # belongs to both lists.
            if dport == port_b:
                times_b.append(packet.time)

        if not times_a or not times_b:
            return {"error": "Packets for one or both ports (A/B) were not found."}

        # Subtract on the original timestamp type first (recent Scapy versions
        # report Decimal-like timestamps), then convert the small delta to
        # float; this keeps sub-microsecond precision and yields numeric data.
        # zip() stops at the shorter list, which drops unpaired packets.
        deltas_ms = [float((t_b - t_a) * 1000) for t_a, t_b in zip(times_a, times_b)]
        return self._summarize_ms(deltas_ms)

    def calculate_inter_packet_gap_stats(self, port: int) -> Dict[str, Any]:
        """
        Calculate the interval (and jitter) between consecutive packets on a port.

        A UDP packet is counted when either its source or its destination
        port equals `port`. Gaps are taken between packets in capture order.

        Returns a statistics dictionary in milliseconds (see _summarize_ms)
        with an extra 'jitter' entry, or {"error": ...} when fewer than two
        matching packets exist.

        Raises ValueError when no packets are loaded.
        """
        if not self.packets:
            raise ValueError("No packets loaded.")

        times = []
        for packet in self.packets:
            if not packet.haslayer(scapy.UDP):
                continue
            udp = packet[scapy.UDP]
            if udp.sport == port or udp.dport == port:
                times.append(packet.time)

        if len(times) < 2:
            return {"error": f"Fewer than 2 packets found on port {port}. Cannot calculate interval."}

        # Difference between each timestamp and the one before it, computed
        # before the float conversion for the same precision reason as above.
        gaps_ms = [float((later - earlier) * 1000) for earlier, later in zip(times, times[1:])]
        stats = self._summarize_ms(gaps_ms)
        # Jitter is reported as the standard deviation of the gaps; it is NaN
        # when only a single gap exists (sample std of one value).
        stats['jitter'] = stats.get('std', 0.0)
        return stats