Ask Your Question

Revision history [back]

Dumpcap hangs while running ring capture

I am trying to create a python program that continuously capture packets using ring buffer option and .pcap file is then analysed by tshark which output packets in pdml format. but after few seconds dumpcap hangs. I have not able to find the root cause, I hope someone can help me out.

import subprocess
import os
import logging
from typing import Generator
from pathlib import Path


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class ProcessError(Exception):
    """Custom exception for process-related errors"""
    pass

class PacketCaptureLib:
    def __init__(
        self,
        interface: str,
        output_file: str,
        filter_expr: str = "smb2",
        num_files: int = 10,
        filesize_kb: int = 1024,
        duration_sec: int = 1
    ):
        self.interface = interface
        self.output_file = output_file
        self.filter_expr = filter_expr
        self.num_files = num_files
        self.filesize_kb = filesize_kb
        self.duration_sec = duration_sec
        self.capture_process = None
        self.tshark_process = None

        output_dir = os.path.dirname(self.output_file)
        if output_dir and not os.path.exists(output_dir):
            try:
                os.makedirs(output_dir)
                logger.info(f"Created output directory: {output_dir}")
            except OSError as e:
                raise ProcessError(f"Failed to create output directory {output_dir}: {e}")
        self._setup_paths()

    def _setup_paths(self):
        """Setup Wireshark executable paths"""
        default_path = "C:\\Program Files\\Wireshark"
        self.dumpcap_path = os.path.join(default_path, "dumpcap.exe")
        self.tshark_path = os.path.join(default_path, "tshark.exe")

        if not os.path.exists(self.dumpcap_path):
            raise FileNotFoundError(f"dumpcap.exe not found at {self.dumpcap_path}")
        if not os.path.exists(self.tshark_path):
            raise FileNotFoundError(f"tshark.exe not found at {self.tshark_path}")

    def _start_capture(self) -> subprocess.Popen:
        """Start packet capture process"""
        command = [
            self.dumpcap_path,
            "-i", self.interface,
            "-b", f"files:{self.num_files}",
            "-b", f"filesize:{self.filesize_kb}",
            "-b", f"duration:{self.duration_sec}",
            "-b", "printname:stdout",
            "-w", self.output_file
        ]

        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            creationflags=subprocess.CREATE_NO_WINDOW
        )

        if process.poll() is not None:
            stderr = process.stderr.read()
            raise ProcessError(f"Dumpcap failed to start: {stderr}")

        return process

    def _analyze_file(self, filepath: str) -> Generator[str, None, None]:
        """Analyze a PCAP file and yield PDML content"""
        command = [
            self.tshark_path,
            "-r", filepath,
            "-Y", self.filter_expr,
            "-T", "pdml"
        ]

        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            creationflags=subprocess.CREATE_NO_WINDOW
        )

        if process.poll() is not None:
            stderr = process.stderr.read()
            raise ProcessError(f"Tshark failed to start: {stderr}")

        buffer = []
        in_packet = False

        while True:
            line = process.stdout.readline()
            if not line:
                break

            line = line.strip()
            if line.startswith("<packet>"):
                in_packet = True
                buffer = [line]
            elif line.endswith("</packet>"):
                buffer.append(line)
                yield "\n".join(buffer)
                buffer = []
                in_packet = False
            elif in_packet:
                buffer.append(line)

        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()

    def cleanup(self):
        """Cleanup processes"""
        for process in [self.capture_process, self.tshark_process]:
            if process and process.poll() is None:
                try:
                    process.kill()
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    process.kill()
                    process.wait()
                except Exception as e:
                    logger.error(f"Error stopping process: {e}")

    def capture_packets(self) -> Generator[str, None, None]:
        """Capture and analyze packets, yielding packet"""
        try:
            self.capture_process = self._start_capture()

            while True:
                filepath = self.capture_process.stdout.readline().strip()
                if filepath:
                    logger.debug(f"Processing capture file: {filepath}")
                    yield from self._analyze_file(filepath)

        except Exception as e:
            logger.error(f"Error in packet capture: {e}")
            raise
        finally:
            self.cleanup()

def main():
    output_file = str(Path(r"C:\Users\User1\AppData\Local\Temp\capture\file.pcap"))
    packet_capture = PacketCaptureLib(
        interface="Ethernet0",
        output_file=output_file,
        filter_expr="",
        num_files=10,
        filesize_kb=1024,
        duration_sec=1)
    a = 0
    for packet in packet_capture.capture_packets():
        a += 1
        print(f"packet: {a}")
main()