Crash Detection
Purpose
Detect and report crashes and exceptions that occur during the trace. This script detects system crashes that occur inside of a trace, as well as exceptions thrown in user space.
How to use
Usage: crash_detection.py [-h] [--host host] [--port port] [--mode mode]
[--header]
Detect and report crashes and exceptions that appear during a Reven scenario.
optional arguments:
-h, --help show this help message and exit
--host host Reven host, as a string (default: "localhost")
--port port Reven port, as an int (default: 13370)
--mode mode Whether to look for "user" crash, "system" crash, or "all"
--header If present, display a header with the meaning of each column
Known limitations
Because user space processes can catch exceptions, a user exception reported by this script does not necessarily means that the involved user space process crashed after causing the exception.
Supported versions
Reven 2.12+
Supported perimeter
Any Windows 10/11 x64 Reven scenario.
Dependencies
The script requires that the target Reven scenario have:
- The Fast Search feature replayed.
- The OSSI feature replayed.
- The Backtrace feature replayed.
Source
#!/usr/bin/env python3
import argparse
import reven2
# %% [markdown]
# # Crash Detection
#
# ## Purpose
#
# Detect and report crashes and exceptions that occur during the trace.
#
# This script detects system crashes that occur inside of a trace, as well as exceptions thrown in user space.
#
# ## How to use
#
# ```bash
# Usage: crash_detection.py [-h] [--host host] [--port port] [--mode mode]
# [--header]
#
# Detect and report crashes and exceptions that appear during a Reven scenario.
#
# optional arguments:
# -h, --help show this help message and exit
# --host host Reven host, as a string (default: "localhost")
# --port port Reven port, as an int (default: 13370)
# --mode mode Whether to look for "user" crash, "system" crash, or "all"
# --header If present, display a header with the meaning of each column
# ```
#
# ## Known limitations
#
# Because user space processes can catch exceptions, a user exception reported by this script does not necessarily
# means that the involved user space process crashed after causing the exception.
#
# ## Supported versions
#
# Reven 2.12+
#
# ## Supported perimeter
#
# Any Windows 10/11 x64 Reven scenario.
#
# ## Dependencies
#
# The script requires that the target Reven scenario have:
# - The Fast Search feature replayed.
# - The OSSI feature replayed.
# - The Backtrace feature replayed.
HIGH_LEVEL_EXCEPTION_CODES = {
0x80000003: "breakpoint",
0x80000004: "single step debug",
0xC000001D: "illegal instruction",
0xC0000094: "integer division by zero",
0xC0000005: "access violation",
0xC0000409: "stack buffer overrun",
}
# Obtained by reversing the transformations performed on high level exception codes
# Some of the crashes find the low-level exception codes rather than the high-level ones
LOW_LEVEL_EXCEPTION_CODES = {
0x80000003: "breakpoint",
0x80000004: "single step debug",
0x10000002: "illegal instruction",
0x10000003: "integer division by zero",
0x10000004: "access violation",
}
class SystemCrash:
# Code values recovered here:
# https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/bug-check-code-reference2
PF_BUG_CHECK_CODES = [0x50, 0xCC, 0xCD, 0xD5, 0xD6]
EXCEPTION_BUG_CHECK_CODES = [0x1E, 0x7E, 0x8E, 0x8E, 0x135, 0x1000007E, 0x1000008E]
SYSTEM_SERVICE_EXCEPTION = 0x3B
KERNEL_SECURITY_CHECK_FAILURE = 0x139
def __init__(self, trace, dispatcher_ctx):
self._trace = trace
self._dispatch_ctx = dispatcher_ctx
self._bug_check_code = dispatcher_ctx.read(reven2.arch.x64.ecx)
self._error_code = None
self._page_fault_address = None
self._page_fault_operation = None
self._process = dispatcher_ctx.ossi.process()
if self._bug_check_code in SystemCrash.PF_BUG_CHECK_CODES:
# page fault address is the 2nd parameter of KeBugCheckEx call for PAGE_FAULT bug checks.
# operation is 3rd parameter of KeBugCheckEx. See for instance:
# https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/bug-check-0xcc--page-fault-in-freed-special-pool
self._page_fault_address = dispatcher_ctx.read(reven2.arch.x64.rdx)
self._page_fault_operation = dispatcher_ctx.read(reven2.arch.x64.r8)
elif self._bug_check_code in SystemCrash.EXCEPTION_BUG_CHECK_CODES:
# error code is the 2nd parameter of KeBugCheckEx call for EXCEPTION bug checks. See for instance:
# https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/bug-check-0x1e--kmode-exception-not-handled
self._error_code = dispatcher_ctx.read(reven2.arch.x64.edx)
elif self._bug_check_code == SystemCrash.KERNEL_SECURITY_CHECK_FAILURE:
# error code can be found as the first member of the exception structure that is 4th parameter of
# KeBugCheckEx call. See:
# https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/bug-check-0x139--kernel-security-check-failure
# https://docs.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-exception_record
self._error_code = dispatcher_ctx.deref(reven2.arch.x64.r9, reven2.types.Pointer(reven2.types.U32))
elif self._bug_check_code == SystemCrash.SYSTEM_SERVICE_EXCEPTION:
self._error_code = dispatcher_ctx.read(reven2.arch.x64.edx)
# Look for the exception transition in the backtrace, if any is found
self._exception_transition = None
for frame in dispatcher_ctx.transition_before().context_before().stack.frames():
if frame.creation_transition is not None and frame.creation_transition.exception:
self._exception_transition = frame.creation_transition
break
@property
def dispatch_ctx(self):
return self._dispatch_ctx
@property
def exception_transition(self):
return self._exception_transition
@property
def error_code(self):
return self._error_code
@property
def bug_check_code(self):
return self._bug_check_code
@property
def page_fault_address(self):
return self._page_fault_address
@property
def page_fault_operation(self):
return self._page_fault_operation
@property
def process(self):
return self._process
class UserCrash:
def __init__(self, trace, dispatcher_ctx):
self._trace = trace
self._dispatch_ctx = dispatcher_ctx
self._exception_transition = None
self._process = dispatcher_ctx.ossi.process()
try:
frame = next(dispatcher_ctx.transition_before().context_before().stack.frames())
self._exception_transition = frame.first_context.transition_before()
except StopIteration:
pass
self._error_code = None
# heuristic: go back some transitions to get good stack trace
ctx_before_rsp_changed = dispatcher_ctx - 8
frames = ctx_before_rsp_changed.stack.frames()
try:
ki_exception_dispatch_ctx = next(frames).first_context
self._error_code = ki_exception_dispatch_ctx.read(reven2.arch.x64.ecx)
except StopIteration:
pass
@property
def dispatch_ctx(self):
return self._dispatch_ctx
@property
def exception_transition(self):
return self._exception_transition
@property
def error_code(self):
return self._error_code
@property
def process(self):
return self._process
def detect_system_crashes(server):
try:
ntoskrnl = next(server.ossi.executed_binaries("ntoskrnl"))
except StopIteration:
raise RuntimeError("Could not find the ntoskrnl binary. " "Is this a Windows 10 trace with OSSI enabled?")
try:
ke_bug_check_ex = next(ntoskrnl.symbols("KeBugCheckEx"))
except StopIteration:
raise RuntimeError(
"Could not find the KeBugCheckEx symbol in ntoskrnl. " "Is this a Windows 10 trace with OSSI enabled?"
)
for call in server.trace.search.symbol(ke_bug_check_ex):
yield SystemCrash(server.trace, call)
def detect_user_crashes(server):
try:
ntdll = next(server.ossi.executed_binaries("ntdll"))
except StopIteration:
raise RuntimeError("Could not find the ntdll binary. " "Is this a Windows 10 trace with OSSI enabled?")
try:
ki_user_exception_dispatcher = next(ntdll.symbols("KiUserExceptionDispatch"))
except StopIteration:
raise RuntimeError(
"Could not find the KiUserExceptionDispatch symbol in ntdll. "
"Is this a Windows 10 trace with OSSI enabled?"
)
for call in server.trace.search.symbol(ki_user_exception_dispatcher):
yield UserCrash(server.trace, call)
def format_exception_code(error_code):
if error_code is None:
return None
if error_code in HIGH_LEVEL_EXCEPTION_CODES:
return "{} ({:#x})".format(HIGH_LEVEL_EXCEPTION_CODES[error_code], error_code)
elif error_code in LOW_LEVEL_EXCEPTION_CODES:
return "{} ({:#x})".format(LOW_LEVEL_EXCEPTION_CODES[error_code], error_code)
return "unknown or incorrect exception code: {:#x}".format(error_code)
def format_page_fault(page_fault_address, page_fault_operation):
if page_fault_address is None:
return None
# operations changed recently for bug check 0x50. It should work with any version though. See:
# https://docs.microsoft.com/en-us/windows-hardware/drivers/debugger/bug-check-0x50--page-fault-in-nonpaged-area#page_fault_in_nonpaged_area-parameters
if page_fault_operation == 0:
operation = "reading"
elif page_fault_operation == 1 or page_fault_operation == 2:
operation = "writing"
elif page_fault_operation == 10:
operation = "executing"
else:
return "page fault on address {:#x}".format(page_fault_address)
return "page fault while {} address {:#x}".format(operation, page_fault_address)
def format_cause(error_code=None, page_fault_address=None, page_fault_operation=None):
exception_fmt = format_exception_code(error_code)
if exception_fmt is not None:
return "{}".format(exception_fmt)
page_fault_fmt = format_page_fault(page_fault_address, page_fault_operation)
if page_fault_fmt is not None:
return "{}".format(page_fault_fmt)
return "Unknown"
def detect_crashes(server, has_system, has_user, has_header=False):
if has_header:
print("Mode | Process | Context | BugCheck | Cause | Exception transition")
print("-----|---------|---------|----------|-------|---------------------")
if has_system:
for system_crash in detect_system_crashes(server):
print(
"System | {} | {} | {:#x} | {} | {}".format(
system_crash.process,
system_crash.dispatch_ctx,
system_crash.bug_check_code,
format_cause(
system_crash.error_code, system_crash.page_fault_address, system_crash.page_fault_operation
),
system_crash.exception_transition,
)
)
if has_user:
for user_crash in detect_user_crashes(server):
print(
"User | {} | {} | N/A | {} | {}".format(
user_crash.process,
user_crash.dispatch_ctx,
format_exception_code(user_crash.error_code),
user_crash.exception_transition,
)
)
CRASH_MODE_DICT = {"all": (True, True), "user": (False, True), "system": (True, False)}
def parse_args():
parser = argparse.ArgumentParser(
description="Detect and report crashes and exceptions that appear during a " "Reven scenario.\n",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--host",
metavar="host",
dest="host",
help='Reven host, as a string (default: "localhost")',
default="localhost",
type=str,
)
parser.add_argument(
"--port", metavar="port", dest="port", help="Reven port, as an int (default: 13370)", type=int, default=13370
)
parser.add_argument(
"--mode",
metavar="mode",
dest="mode",
help='Whether to look for "user" crash, "system" crash, or "all"',
type=str,
default="all",
)
parser.add_argument(
"--header",
action="store_true",
dest="header",
help="If present, display a header with the meaning of each column",
)
args = parser.parse_args()
return args
if __name__ == "__main__":
args = parse_args()
if args.mode not in CRASH_MODE_DICT:
raise ValueError(
'Wrong "mode" value "{}". Mode must be "all",' ' "user" or "system" (defaults to "all").'.format(args.mode)
)
(has_system, has_user) = CRASH_MODE_DICT[args.mode]
# Get a server instance
reven_server = reven2.RevenServer(args.host, args.port)
reven_server.ossi.os().expect(
reven2.ossi.Os(
architecture=reven2.ossi.Architecture.X64,
family=reven2.ossi.OsFamily.Windows,
windows_version=reven2.ossi.WindowsVersion.Windows10,
),
reven2.ossi.Os(
architecture=reven2.ossi.Architecture.X64,
family=reven2.ossi.OsFamily.Windows,
windows_version=reven2.ossi.WindowsVersion.Windows11,
),
)
detect_crashes(reven_server, has_system, has_user, args.header)