I am trying to create a python program that continuously capture packets using ring buffer option and .pcap file is then analysed by tshark which output packets in pdml format. but after few seconds dumpcap hangs. I have not able to find the root cause, I hope someone can help me out.
import subprocess
import os
import logging
from typing import Generator
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ProcessError(Exception):
"""Custom exception for process-related errors"""
pass
class PacketCaptureLib:
def __init__(
self,
interface: str,
output_file: str,
filter_expr: str = "smb2",
num_files: int = 10,
filesize_kb: int = 1024,
duration_sec: int = 1
):
self.interface = interface
self.output_file = output_file
self.filter_expr = filter_expr
self.num_files = num_files
self.filesize_kb = filesize_kb
self.duration_sec = duration_sec
self.capture_process = None
self.tshark_process = None
output_dir = os.path.dirname(self.output_file)
if output_dir and not os.path.exists(output_dir):
try:
os.makedirs(output_dir)
logger.info(f"Created output directory: {output_dir}")
except OSError as e:
raise ProcessError(f"Failed to create output directory {output_dir}: {e}")
self._setup_paths()
def _setup_paths(self):
"""Setup Wireshark executable paths"""
default_path = "C:\\Program Files\\Wireshark"
self.dumpcap_path = os.path.join(default_path, "dumpcap.exe")
self.tshark_path = os.path.join(default_path, "tshark.exe")
if not os.path.exists(self.dumpcap_path):
raise FileNotFoundError(f"dumpcap.exe not found at {self.dumpcap_path}")
if not os.path.exists(self.tshark_path):
raise FileNotFoundError(f"tshark.exe not found at {self.tshark_path}")
def _start_capture(self) -> subprocess.Popen:
"""Start packet capture process"""
command = [
self.dumpcap_path,
"-i", self.interface,
"-b", f"files:{self.num_files}",
"-b", f"filesize:{self.filesize_kb}",
"-b", f"duration:{self.duration_sec}",
"-b", "printname:stdout",
"-w", self.output_file
]
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
creationflags=subprocess.CREATE_NO_WINDOW
)
if process.poll() is not None:
stderr = process.stderr.read()
raise ProcessError(f"Dumpcap failed to start: {stderr}")
return process
def _analyze_file(self, filepath: str) -> Generator[str, None, None]:
"""Analyze a PCAP file and yield PDML content"""
command = [
self.tshark_path,
"-r", filepath,
"-Y", self.filter_expr,
"-T", "pdml"
]
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
creationflags=subprocess.CREATE_NO_WINDOW
)
if process.poll() is not None:
stderr = process.stderr.read()
raise ProcessError(f"Tshark failed to start: {stderr}")
buffer = []
in_packet = False
while True:
line = process.stdout.readline()
if not line:
break
line = line.strip()
if line.startswith("<packet>"):
in_packet = True
buffer = [line]
elif line.endswith("</packet>"):
buffer.append(line)
yield "\n".join(buffer)
buffer = []
in_packet = False
elif in_packet:
buffer.append(line)
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
def cleanup(self):
"""Cleanup processes"""
for process in [self.capture_process, self.tshark_process]:
if process and process.poll() is None:
try:
process.kill()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
except Exception as e:
logger.error(f"Error stopping process: {e}")
def capture_packets(self) -> Generator[str, None, None]:
"""Capture and analyze packets, yielding packet"""
try:
self.capture_process = self._start_capture()
while True:
filepath = self.capture_process.stdout.readline().strip()
if filepath:
logger.debug(f"Processing capture file: {filepath}")
yield from self._analyze_file(filepath)
except Exception as e:
logger.error(f"Error in packet capture: {e}")
raise
finally:
self.cleanup()
def main():
output_file = str(Path(r"C:\Users\User1\AppData\Local\Temp\capture\file.pcap"))
packet_capture = PacketCaptureLib(
interface="Ethernet0",
output_file=output_file,
filter_expr="",
num_files=10,
filesize_kb=1024,
duration_sec=1)
a = 0
for packet in packet_capture.capture_packets():
a += 1
print(f"packet: {a}")
main()