Dump PCAP

Purpose

Generate a PCAP file containing all network packets that were sent/received in a trace.

The timestamp of each packet is replaced by the ID of the transition at which the packet was sent/received.

How to use

usage: dump_pcap.py [-h] [--host host] [--port port] [--filename file_name]
                    [--fix-checksum]

Dump a PCAP file from a Windows 10 x64 trace. To get the time as transition ID in wireshark, select:
View->Time display format->Seconds since 1970-01-01

optional arguments:
  -h, --help            show this help message and exit
  --host host           Reven host, as a string (default: "localhost")
  --port port           Reven port, as an int (default: 13370)
  --filename file_name  the output file name (default: "output.pcap"). Will be created if it doesn't exist
  --fix-checksum        If not specified, the packet checksum won't be fixed and you will have the buffer
                        that has been dumped from memory, and a lot of ugly packets in Wireshark,
                        that you can also ignore if needed.

Known limitations

N/A

Supported versions

Reven 2.12+

Supported perimeter

Any Windows 10/11 x64 scenario.

Dependencies

  • The script requires the scapy package
  • The network_packet_tools.py file distributed alongside this example must be provided (e.g. in the same directory).
  • The script requires that the target Reven scenario have:
    • The Fast Search feature replayed.
    • The OSSI feature replayed.
    • Access to the binary e1g6032e.sys and its PDB file.

Source

#!/usr/bin/env python3

import argparse
import itertools
import os
from typing import Iterator as _Iterator, List as _List, Optional as _Optional, Tuple as _Tuple

import network_packet_tools as nw_tools

import reven2

from scapy.all import Ether, TCP, wrpcap

"""
# Dump PCAP

## Purpose

Generate a PCAP file containing all network packets that were sent/received in a trace.

The timestamp of each packet is replaced by the ID of the transition at which the packet was sent/received.

## How to use

```bash
usage: dump_pcap.py [-h] [--host host] [--port port] [--filename file_name]
                    [--fix-checksum]

Dump a PCAP file from a Windows 10 x64 trace. To get the time as transition ID in wireshark, select:
View->Time display format->Seconds since 1970-01-01

optional arguments:
  -h, --help            show this help message and exit
  --host host           Reven host, as a string (default: "localhost")
  --port port           Reven port, as an int (default: 13370)
  --filename file_name  the output file name (default: "output.pcap"). Will be created if it doesn't exist
  --fix-checksum        If not specified, the packet checksum won't be fixed and you will have the buffer
                        that has been dumped from memory, and a lot of ugly packets in Wireshark,
                        that you can also ignore if needed.
```

## Known limitations

N/A

## Supported versions

Reven 2.12+

## Supported perimeter

Any Windows 10/11 x64 scenario.

## Dependencies

- The script requires the scapy package
- The `network_packet_tools.py` file distributed alongside this example must be provided (e.g. in the same directory).
- The script requires that the target Reven scenario have:
    - The Fast Search feature replayed.
    - The OSSI feature replayed.
    - Access to the binary `e1g6032e.sys` and its PDB file.
"""


def parse_args(argv: _Optional[_List[str]] = None) -> argparse.Namespace:
    """Build the command-line parser and parse the script's arguments.

    Args:
        argv: Argument strings to parse. When ``None`` (the default), argparse
            reads them from ``sys.argv[1:]``, which is what the script entry
            point relies on.

    Returns:
        The parsed namespace, with the attributes ``host``, ``port``,
        ``file_name`` and ``fix_checksum``.
    """
    parser = argparse.ArgumentParser(
        description="Dump a PCAP file from a Windows 10 x64 trace. "
        "To get the time as transition ID in "
        "wireshark, select:\nView->Time display format->Seconds since "
        "1970-01-01\n",
        # Help texts are printed verbatim, so line breaks must be explicit.
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--host",
        metavar="host",
        dest="host",
        help='Reven host, as a string (default: "localhost")',
        default="localhost",
        type=str,
    )
    parser.add_argument(
        "--port", metavar="port", dest="port", help="Reven port, as an int (default: 13370)", type=int, default=13370
    )
    parser.add_argument(
        "--filename",
        metavar="file_name",
        dest="file_name",
        help="the output " 'file name (default: "output.pcap"). Will be created if it doesn\'t exist',
        default="output.pcap",
    )
    parser.add_argument(
        "--fix-checksum",
        dest="fix_checksum",
        action="store_true",
        # Adjacent literals with explicit newlines: a backslash continuation
        # inside the string would embed the source indentation in the help text.
        help="If not specified, the packet checksum won't be fixed and you will have the buffer\n"
        "that has been dumped from memory, and a lot of ugly packets in Wireshark,\n"
        "that you can also ignore if needed.",
    )

    return parser.parse_args(argv)


def get_network_buffer_recv_RxPacketAssemble(
    ctx: reven2.trace.Context,
) -> _Tuple[_List[reven2.MemoryRange[reven2.address._AbstractAddress]], _Optional[bytearray]]:
    """Read the buffer of the packet received at the given context.

    The memory range is obtained from
    `nw_tools.get_memory_address_and_size_of_received_network_packet`.

    Returns:
        A `(sources, buffer)` pair. `sources` holds the single memory range
        that was read and `buffer` is its raw content. When the helper reports
        no range, the pair is `([], None)`.
    """
    received_range = nw_tools.get_memory_address_and_size_of_received_network_packet(ctx)
    if received_range is None:
        # Nothing to read at this context
        return [], None

    # Raw read of the whole range
    return [received_range], ctx.read(received_range, raw=True)


def get_network_buffer_send_NdisSendNetBufferLists(
    reven_server: reven2.RevenServer, ctx: reven2.trace.Context
) -> _Tuple[_List[reven2.MemoryRange[reven2.address._AbstractAddress]], _Optional[bytearray]]:
    """Read and join the buffers of the packet sent at the given context.

    The memory ranges are obtained from
    `nw_tools.get_memory_addresses_and_sizes_of_sent_network_packet`.

    Returns:
        A `(sources, buffer)` pair. `sources` lists the memory ranges in the
        order they were read and `buffer` is the concatenation of their raw
        contents. When there is no range, the pair is `([], None)`.
    """
    read_ranges = []
    joined: _Optional[bytearray] = None

    for current_range in nw_tools.get_memory_addresses_and_sizes_of_sent_network_packet(reven_server, ctx):
        read_ranges.append(current_range)
        chunk = ctx.read(current_range, raw=True)
        if joined is not None:
            # Append to what has been read so far
            joined += chunk
        else:
            # The first chunk starts the buffer
            joined = chunk

    return read_ranges, joined


def get_all_send_recv(reven_server: reven2.RevenServer) -> _Iterator[_Tuple[reven2.trace.Context, str]]:
    """Iterate over the contexts where a packet is sent or received.

    Returns:
        An iterator of `(context, kind)` pairs where `kind` is either
        `"send"` or `"recv"`. Send and receive results are merged with
        `reven2.util.collate`, using the context as the key.
    """
    print("[+] Get all sent/received packets...")

    send_queries, recv_queries = nw_tools.get_all_send_recv_packet_context(reven_server)

    def _tagged(queries, kind):
        # `reven2.util.collate` enables to iterate over multiple generators in a sorted way;
        # every context it yields is paired with the given kind.
        return zip(reven2.util.collate(queries), itertools.repeat(kind))

    tagged_sends = _tagged(send_queries, "send")
    tagged_recvs = _tagged(recv_queries, "recv")

    # Merge both tagged streams, keyed on the context (first element of each pair)
    return reven2.util.collate([tagged_sends, tagged_recvs], lambda tagged: tagged[0])


def dump_pcap(reven_server: reven2.RevenServer, output_file: str = "output.pcap", fix_checksum: bool = False) -> None:
    """Write the network packets sent/received in the trace to a PCAP file.

    The time of each packet is set to the id of the transition before the
    context where the packet was found, so that Wireshark can display it.

    Args:
        reven_server: The Reven server connected to the trace to inspect.
        output_file: Path of the PCAP file to write. It must not already exist.
        fix_checksum: When true, the TCP checksum dumped from memory is removed
            from TCP packets so that scapy computes it again when writing.

    Raises:
        RuntimeError: If `output_file` already exists.
    """
    if os.path.isfile(output_file):
        raise RuntimeError(
            '"{}" already exists. Choose an other output file or remove it before running the script.'.format(
                output_file
            )
        )

    print("[+] Creating pcap from trace...")

    # Get all send and recv from the trace
    results = list(get_all_send_recv(reven_server))
    if not results:
        print("[+] Finished: no network packets were sent/received in the trace")
        return

    # Get packets buffers and create the pcap file.
    print("[+] Convert packets to pcap format and write to file...")
    written_count = 0
    for ctx, ty in results:
        # Just detect if send or recv context
        if ty == "send":
            sources, buf = get_network_buffer_send_NdisSendNetBufferLists(reven_server, ctx)
        else:
            sources, buf = get_network_buffer_recv_RxPacketAssemble(ctx)

        if buf is None:
            # No buffer could be retrieved for this context: nothing to write
            continue

        packet = Ether(bytes(buf))

        # Here we check whether or not we have to fix the checksum.
        if fix_checksum and TCP in packet:
            del packet[TCP].chksum

        # Replace the time in the packet by the transition ID, so that we get
        # it in Wireshark in a nice way.
        transition = ctx.transition_before().id
        packet.time = transition

        # Write packet to pcap file
        wrpcap(output_file, packet, append=True)
        written_count += 1

        # Print packet information
        sources_str = ", ".join(
            "{size} bytes at {address}".format(size=memory_range.size, address=memory_range.address)
            for memory_range in sources
        )
        print("#{transition} [{type}] {sources}".format(transition=transition, type=ty, sources=sources_str))

    if written_count == 0:
        # The output file is only created by the first write, so do not
        # announce a file that does not exist.
        print("[+] Finished: no packet buffer could be retrieved, no PCAP file was written")
        return

    print("[+] Finished: PCAP file is '{}'.".format(output_file))


if __name__ == "__main__":
    args = parse_args()

    # Connect to the Reven server given on the command line
    reven_server = reven2.RevenServer(args.host, args.port)

    # The script supports Windows 10 and Windows 11 x64 traces only.
    # NOTE(review): `expect` presumably rejects any other OS; confirm against the Reven API.
    trace_os = reven_server.ossi.os()
    supported_oses = [
        reven2.ossi.Os(
            architecture=reven2.ossi.Architecture.X64,
            family=reven2.ossi.OsFamily.Windows,
            windows_version=windows_version,
        )
        for windows_version in (
            reven2.ossi.WindowsVersion.Windows10,
            reven2.ossi.WindowsVersion.Windows11,
        )
    ]
    trace_os.expect(*supported_oses)

    # Generate the PCAP file
    dump_pcap(reven_server, output_file=args.file_name, fix_checksum=args.fix_checksum)