Thread id
Purpose
Detect the current thread at a point in the trace and find when the thread is created.
How to use
usage: thread_id.py [-h] [--host HOST] [-p PORT] TRANSITION_ID
positional arguments:
TRANSITION_ID Get thread id at transition (before)
optional arguments:
-h, --help show this help message and exit
--host HOST Reven host, as a string (default: "localhost")
-p PORT, --port PORT Reven port, as an int (default: 13370)
Known limitations
- Current thread is not detected if the given point is in ring0.
Supported versions
Reven 2.12+
Supported perimeter
Any Windows 10/11 on x86-64 scenario. Given point must be in a 64 bit process.
Dependencies
The script requires that the target Reven scenario have: * The Fast Search feature replayed. * The OSSI feature replayed. * An access to the binary 'ntdll.dll' and its PDB file.
Source
import argparse
import reven2
from reven2.address import LogicalAddress
from reven2.arch import x64
"""
# Thread id
## Purpose
Detect the current thread at a point in the trace
and find when the thread is created.
## How to use
```bash
usage: thread_id.py [-h] [--host HOST] [-p PORT] TRANSITION_ID
positional arguments:
TRANSITION_ID Get thread id at transition (before)
optional arguments:
-h, --help show this help message and exit
--host HOST Reven host, as a string (default: "localhost")
-p PORT, --port PORT Reven port, as an int (default: 13370)
```
## Known limitations
- Current thread is not detected if the given point is in ring0.
## Supported versions
Reven 2.12+
## Supported perimeter
Any Windows 10/11 on x86-64 scenario. Given point must be in a 64 bit process.
## Dependencies
The script requires that the target Reven scenario have:
* The Fast Search feature replayed.
* The OSSI feature replayed.
* An access to the binary 'ntdll.dll' and its PDB file.
"""
class ThreadInfo(object):
def __init__(self, ctxt):
self.cr3 = ctxt.read(x64.cr3)
self.pid = ctxt.read(LogicalAddress(0x40, x64.gs), 4)
self.tid = ctxt.read(LogicalAddress(0x48, x64.gs), 4)
def __eq__(self, other):
return (self.cr3, self.pid, self.tid) == (
other.cr3,
other.pid,
other.tid,
)
def __ne__(self, other):
return not self == other
def context_ring(ctxt):
return ctxt.read(x64.cs) & 0x3
def all_start_thread_calls(ossi, trace):
# look for RtlUserThreadStart
ntdll_dll = next(ossi.executed_binaries("c:/windows/system32/ntdll.dll"))
rtl_user_thread_start = next(ntdll_dll.symbols("RtlUserThreadStart"))
return trace.search.symbol(rtl_user_thread_start)
def thread_search_pc(trace, thread_info, pc, from_context=None, to_context=None):
matches = trace.search.pc(pc, from_context=from_context, to_context=to_context)
for ctxt in matches:
# ensure current match is in requested thread
if ThreadInfo(ctxt) == thread_info:
yield ctxt
def find_thread_starting_transition(rvn, thread_info):
for start_thread_ctxt in all_start_thread_calls(rvn.ossi, rvn.trace):
if ThreadInfo(start_thread_ctxt) == thread_info:
# the first argument is the start address of the thread
thread_start_address = start_thread_ctxt.read(x64.rcx)
matches = thread_search_pc(
rvn.trace,
thread_info,
pc=thread_start_address,
from_context=start_thread_ctxt,
)
for match in matches:
return match.transition_after()
return None
def print_thread_info(rvn, tr_id):
ctxt = rvn.trace.transition(tr_id).context_before()
if context_ring(ctxt) == 0:
print("(User) thread may not count in ring 0")
return
# pid, tid at the transition
thread = ThreadInfo(ctxt)
start_transition = find_thread_starting_transition(rvn, thread)
if start_transition is None:
print("TID: {thread.tid} (PID: {thread.pid}) starting transition not found".format(thread=thread))
return
print(
"TID: {thread.tid} (PID: {thread.pid}), starts at: {transition}".format(
thread=thread, transition=start_transition
)
)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--host",
type=str,
default="localhost",
help='Reven host, as a string (default: "localhost")',
)
parser.add_argument(
"-p",
"--port",
type=int,
default="13370",
help="Reven port, as an int (default: 13370)",
)
parser.add_argument(
"transition_id",
metavar="TRANSITION_ID",
type=int,
help="Get thread id at transition (before)",
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
rvn = reven2.RevenServer(args.host, args.port)
rvn.ossi.os().expect(
reven2.ossi.Os(
architecture=reven2.ossi.Architecture.X64,
family=reven2.ossi.OsFamily.Windows,
windows_version=reven2.ossi.WindowsVersion.Windows10,
),
reven2.ossi.Os(
architecture=reven2.ossi.Architecture.X64,
family=reven2.ossi.OsFamily.Windows,
windows_version=reven2.ossi.WindowsVersion.Windows11,
),
)
tr_id = args.transition_id
print_thread_info(rvn, tr_id)