Thread synchronization

Purpose

Trace calls to Windows Synchronization APIs

  • Critical sections: 'InitializeCriticalSection', 'InitializeCriticalSectionEx', 'EnterCriticalSection', 'LeaveCriticalSection', 'SleepConditionVariableCS', 'SleepConditionVariableCSRW'

  • Slim reader/writer locks: 'RtlInitializeSRWLock', 'RtlAcquireSRWLockShared', 'RtlAcquireSRWLockExclusive', 'RtlReleaseSRWLockShared', 'RtlReleaseSRWLockExclusive'

  • Mutex: 'CreateMutexW', 'OpenMutexW', 'WaitForSingleObject', ReleaseMutex

  • Condition variables: 'InitializeConditionVariable', 'WakeConditionVariable', 'WakeAllConditionVariable', 'RtlWakeConditionVariable', 'RtlWakeAllConditionVariable'

How to use

usage: threadsync.py [-h] [--host HOST] [-p PORT] --pid PID
                     [--from_id FROM_ID] [--to_id TO_ID] [--binary BINARY]
                     [--sync PRIMITIVE]

optional arguments:
  -h, --help            show this help message and exit
  --host HOST           Reven host, as a string (default: "localhost")
  -p PORT, --port PORT  Reven port, as an int (default: 13370)
  --pid PID             Process id
  --from_id FROM_ID     Transition to start searching from
  --to_id TO_ID         Transition to start searching to
  --binary BINARY       Process binary
  --sync PRIMITIVE      Synchronization primitives to check.
                        Each repeated use of `--sync <primitive>` adds the corresponding primitive to check.
                        If no `--sync` option is passed, then all supported primitives are checked.
                        Supported primitives:
                         - cs:  critical section
                         - cv:  condition variable
                         - mx:  mutex
                         - srw: slim read/write lock

Known limitations

N/A

Supported versions

Reven 2.12+

Supported perimeter

Any Windows 10/11 on x86-64 scenario.

Dependencies

  • OSSI (with symbols ntdll.dll, kernelbase.dll, ntoskrnl.exe resolved) feature replayed.
  • Fast Search feature replayed.

Source

import argparse

import reven2
from reven2.address import LogicalAddress
from reven2.arch import x64
from reven2.util import collate

"""
# Thread synchronization

## Purpose

Trace calls to Windows Synchronization APIs
  - Critical sections:        'InitializeCriticalSection', 'InitializeCriticalSectionEx',
                              'EnterCriticalSection', 'LeaveCriticalSection',
                              'SleepConditionVariableCS',  'SleepConditionVariableCSRW'

  - Slim reader/writer locks: 'RtlInitializeSRWLock',
                              'RtlAcquireSRWLockShared', 'RtlAcquireSRWLockExclusive',
                              'RtlReleaseSRWLockShared', 'RtlReleaseSRWLockExclusive'

  - Mutex:                    'CreateMutexW', 'OpenMutexW', 'WaitForSingleObject', `ReleaseMutex`

  - Condition variables:      'InitializeConditionVariable',
                              'WakeConditionVariable', 'WakeAllConditionVariable',
                              'RtlWakeConditionVariable', 'RtlWakeAllConditionVariable'

## How to use

```bash
usage: threadsync.py [-h] [--host HOST] [-p PORT] --pid PID
                     [--from_id FROM_ID] [--to_id TO_ID] [--binary BINARY]
                     [--sync PRIMITIVE]

optional arguments:
  -h, --help            show this help message and exit
  --host HOST           Reven host, as a string (default: "localhost")
  -p PORT, --port PORT  Reven port, as an int (default: 13370)
  --pid PID             Process id
  --from_id FROM_ID     Transition to start searching from
  --to_id TO_ID         Transition to start searching to
  --binary BINARY       Process binary
  --sync PRIMITIVE      Synchronization primitives to check.
                        Each repeated use of `--sync <primitive>` adds the corresponding primitive to check.
                        If no `--sync` option is passed, then all supported primitives are checked.
                        Supported primitives:
                         - cs:  critical section
                         - cv:  condition variable
                         - mx:  mutex
                         - srw: slim read/write lock
```

## Known limitations

N/A

## Supported versions

Reven 2.12+

## Supported perimeter

Any Windows 10/11 on x86-64 scenario.

## Dependencies

- OSSI (with symbols `ntdll.dll`, `kernelbase.dll`, `ntoskrnl.exe` resolved) feature replayed.
- Fast Search feature replayed.
"""


class SyncOSSI(object):
    def __init__(self, ossi, trace):
        # basic OSSI
        bin_symbol_names = {
            "c:/windows/system32/ntdll.dll": {
                # critical section
                "RtlInitializeCriticalSection",
                "RtlInitializeCriticalSectionEx",
                "RtlEnterCriticalSection",
                "RtlLeaveCriticalSection",
                # slim read/write lock
                "RtlInitializeSRWLock",
                "RtlAcquireSRWLockShared",
                "RtlAcquireSRWLockExclusive",
                "RtlReleaseSRWLockShared",
                "RtlReleaseSRWLockExclusive",
                # condition variable (general)
                "RtlWakeConditionVariable",
                "RtlWakeAllConditionVariable",
            },
            "c:/windows/system32/kernelbase.dll": {
                # mutex
                "CreateMutexW",
                "OpenMutexW",
                "ReleaseMutex",
                "WaitForSingleObject",
                # condition variable (general)
                "InitializeConditionVariable",
                "WakeConditionVariable",
                "WakeAllConditionVariable",
                # condition variable on critical section
                "SleepConditionVariableCS",
                # condition variable on slim read/write lock
                "SleepConditionVariableSRW",
            },
        }

        self.symbols = {}

        for bin, symbol_names in bin_symbol_names.items():
            try:
                exec_bin = next(ossi.executed_binaries(f"^{bin}$"))
            except StopIteration:
                raise RuntimeError(f"{bin} not found")

            for name in symbol_names:
                try:
                    sym = next(exec_bin.symbols(f"^{name}$"))
                    self.symbols[name] = sym
                except StopIteration:
                    print(f"Warning: {name} not found in {bin}")

        self.has_debugger = True
        try:
            trace.first_transition.step_over()
        except RuntimeError:
            print(
                "Warning: the debugger interface is not available, so the script cannot determine \
function return values.\nMake sure the stack events and PC range resources are replayed for this scenario."
            )
            self.has_debugger = False

        self.search_symbol = trace.search.symbol
        self.trace = trace


# tool functions
def context_cr3(ctxt):
    return ctxt.read(x64.cr3)


def is_kernel_mode(ctxt):
    return ctxt.read(x64.cs) & 0x3 == 0


def find_process_ranges(rvn, pid, from_id, to_id):
    """
    Traversing over the trace, looking for ranges of the interested process

    Parameters:
     - rvn: RevenServer
     - pid: int (process id)

    Output: yielding ranges
    """
    try:
        ntoskrnl = next(rvn.ossi.executed_binaries("^c:/windows/system32/ntoskrnl.exe$"))
    except StopIteration:
        raise RuntimeError("ntoskrnl.exe not found")

    try:
        ki_swap_context = next(ntoskrnl.symbols("^KiSwapContext$"))
    except StopIteration:
        raise RuntimeError("KiSwapContext not found")

    if from_id is None:
        ctxt_low = rvn.trace.first_context
    else:
        try:
            ctxt_low = rvn.trace.transition(from_id).context_before()
        except IndexError:
            raise RuntimeError(f"Transition of id {from_id} not found")

    if to_id is None:
        ctxt_hi = None
    else:
        try:
            ctxt_hi = rvn.trace.transition(to_id).context_before()
        except IndexError:
            raise RuntimeError(f"Transition of id {to_id} not found")

    if ctxt_hi is not None and ctxt_low >= ctxt_hi:
        return

    for ctxt in rvn.trace.search.symbol(
        ki_swap_context,
        from_context=ctxt_low,
        to_context=None if ctxt_hi is None or ctxt_hi == rvn.trace.last_context else ctxt_hi,
    ):
        if ctxt_low.ossi.process().pid == pid:
            yield (ctxt_low, ctxt)
        ctxt_low = ctxt

    if ctxt_low.ossi.process().pid == pid:
        if ctxt_hi is not None and ctxt_low < ctxt_hi:
            yield (ctxt_low, ctxt_hi)
        else:
            yield (ctxt_low, None)


def find_usermode_ranges(ctxt_low, ctxt_high):
    if ctxt_high is None:
        if not is_kernel_mode(ctxt_low):
            yield (ctxt_low, None)
        return

    ctxt_current = ctxt_low
    while ctxt_current < ctxt_high:
        ctxt_next = ctxt_current.find_register_change(x64.cs, is_forward=True)

        if not is_kernel_mode(ctxt_current):
            if ctxt_next is None or ctxt_next > ctxt_high:
                yield (ctxt_current, ctxt_high)
                break
            else:
                yield (ctxt_current, ctxt_next - 1)

        if ctxt_next is None:
            break

        ctxt_current = ctxt_next


def get_tid(ctxt):
    return ctxt.read(LogicalAddress(0x48, x64.gs), 4)


def caller_context_in_binary(ctxt, binary):
    if binary is None:
        return True

    caller_ctxt = ctxt - 1
    if caller_ctxt is None:
        return False

    caller_binary = caller_ctxt.ossi.location().binary
    if caller_binary is None:
        return False

    return binary in [caller_binary.name, caller_binary.filename, caller_binary.path]


def build_ordered_api_calls(syncOSSI, ctxt_low, ctxt_high, apis):
    def gen(api):
        return (
            (api, ctxt)
            for ctxt in syncOSSI.search_symbol(syncOSSI.symbols[api], from_context=ctxt_low, to_context=ctxt_high)
        )

    api_contexts = (
        # Using directly a generator expression appears to give wrong results,
        # while using a function works as expected.
        gen(api)
        for api in apis
        if api in syncOSSI.symbols
    )

    return collate(api_contexts, key=lambda name_ctxt: name_ctxt[1])


def get_return_value(syncOSSI, ctxt):
    if not syncOSSI.has_debugger:
        return False

    try:
        trans_after = ctxt.transition_after()
    except IndexError:
        return None

    if trans_after is None:
        return None

    trans_ret = trans_after.step_out()
    if trans_ret is None:
        return None

    ctxt_ret = trans_ret.context_after()
    return ctxt_ret.read(x64.rax)


# Values of primitive_handle and return_value fall into one of the following:
#  - None
#  - a numeric value
#  - a string literal
# where None is used usually in cases the script cannot reliably get the actual
# value of the object.
def print_csv(csv_file, ctxt, sync_primitive, primitive_handle, return_value):
    try:
        transition_id = ctxt.transition_before().id + 1
    except IndexError:
        transition_id = 0

    if primitive_handle is None:
        formatted_primitive_handle = "unknown"
    else:
        formatted_primitive_handle = (
            primitive_handle if isinstance(primitive_handle, str) else f"{primitive_handle:#x}"
        )

    if return_value is None:
        formatted_ret_value = "unknown"
    else:
        formatted_ret_value = return_value if isinstance(return_value, str) else f"{return_value:#x}"

    output = f"{transition_id}, {sync_primitive}, {formatted_primitive_handle}, {formatted_ret_value}\n"

    try:
        csv_file.write(output)
    except OSError:
        print(f"Failed to write to {csv_file}")


def check_critical_section(syncOSSI, ctxt_low, ctxt_high, binary, csv_file):
    critical_section_apis = {
        "RtlInitializeCriticalSection",
        "RtlInitializeCriticalSectionEx",
        "RtlEnterCriticalSection",
        "RtlLeaveCriticalSection",
        "SleepConditionVariableCS",
    }

    found = False

    ordered_api_contexts = build_ordered_api_calls(syncOSSI, ctxt_low, ctxt_high, critical_section_apis)
    for api, ctxt in ordered_api_contexts:
        if not caller_context_in_binary(ctxt, binary):
            continue

        found = True

        if api in {
            "RtlInitializeCriticalSection",
            "RtlInitializeCriticalSectionEx",
            "RtlEnterCriticalSection",
            "RtlLeaveCriticalSection",
        }:
            # the critical section handle is the first argument
            cs_handle = ctxt.read(x64.rcx)

            if csv_file is None:
                print(f"{ctxt}: {api}, critical section 0x{cs_handle:x}")
            else:
                # any API of this group returns void, then "unused" is passed as the return value
                print_csv(csv_file, ctxt, "cs", cs_handle, "unused")

        elif api in {"SleepConditionVariableCS"}:
            # the condition variable is the first argument
            cv_handle = ctxt.read(x64.rcx)

            # the critical section is the second argument
            cs_handle = ctxt.read(x64.rcx)

            if csv_file is None:
                print(f"{ctxt}: {api}, critical section 0x{cs_handle:x}, condition variable 0x{cv_handle:x}")
            else:
                # go to the return point (if possible) to get the return value
                ret_val = get_return_value(syncOSSI, ctxt)

                print_csv(csv_file, ctxt, "cs", cs_handle, ret_val)
                print_csv(csv_file, ctxt, "cv", cv_handle, ret_val)

    return found


def check_srw_lock(syncOSSI, ctxt_low, ctxt_high, binary, csv_file):
    srw_apis = [
        "RtlInitializeSRWLock",
        "RtlAcquireSRWLockShared",
        "RtlAcquireSRWLockExclusive",
        "RtlReleaseSRWLockShared",
        "RtlReleaseSRWLockExclusive",
    ]

    found = False

    ordered_api_contexts = build_ordered_api_calls(syncOSSI, ctxt_low, ctxt_high, srw_apis)
    for api, ctxt in ordered_api_contexts:
        if not caller_context_in_binary(ctxt, binary):
            continue

        found = True

        # the srw lock handle is the first argument
        srw_handle = ctxt.read(x64.rcx)

        if csv_file is None:
            print(f"{ctxt}: {api}, lock 0x{srw_handle:x}")
        else:
            # any API of this group returns void, then "unused" is passed as the return value
            print_csv(csv_file, ctxt, "srw", srw_handle, "unused")

    return found


def check_mutex(syncOSSI, ctxt_low, ctxt_high, binary, used_handles, csv_file):
    mutex_apis = {"CreateMutexW", "OpenMutexW", "WaitForSingleObject", "ReleaseMutex"}

    found = False

    ordered_api_contexts = build_ordered_api_calls(syncOSSI, ctxt_low, ctxt_high, mutex_apis)
    for api, ctxt in ordered_api_contexts:
        if not caller_context_in_binary(ctxt, binary):
            continue

        found = True

        if api in {"CreateMutexW", "OpenMutexW"}:
            # go to the return point (if possible) to get the mutex handle
            mx_handle = get_return_value(syncOSSI, ctxt)
            if mx_handle is not None and mx_handle != 0:
                used_handles.add(mx_handle)

            if csv_file is None:
                if mx_handle is None:
                    print(f"{ctxt}: {api}, mutex handle unknown")
                else:
                    if mx_handle == 0:
                        print(f"{ctxt}: {api}, failed")
                    else:
                        print(f"{ctxt}: {api}, mutex handle 0x{mx_handle:x}")
            else:
                print_csv(csv_file, ctxt, "mx", mx_handle, mx_handle)

        elif api in {"ReleaseMutex"}:
            mx_handle = ctxt.read(x64.rcx)
            used_handles.add(mx_handle)

            if csv_file is None:
                print(f"{ctxt}: {api}, mutex handle 0x{mx_handle:x}")
            else:
                # go to the return point (if possible) to get the return value
                ret_val = get_return_value(syncOSSI, ctxt)
                print_csv(csv_file, ctxt, "mx", mx_handle, ret_val)

        elif api in {"WaitForSingleObject"}:
            handle = ctxt.read(x64.rcx)
            if handle in used_handles:
                if csv_file is None:
                    print(f"{ctxt}: {api}, mutex handle 0x{handle:x}")
                else:
                    ret_val = get_return_value(syncOSSI, ctxt)
                    print_csv(csv_file, ctxt, "mx", handle, ret_val)

    return found


def check_condition_variable(syncOSSI, ctxt_low, ctxt_high, binary, csv_file):
    cond_var_apis = {
        "InitializeConditionVariable",
        "WakeConditionVariable",
        "WakeAllConditionVariable",
        "RtlWakeConditionVariable",
        "RtlWakeAllConditionVariable",
    }

    found = False

    ordered_api_contexts = build_ordered_api_calls(syncOSSI, ctxt_low, ctxt_high, cond_var_apis)
    for api, ctxt in ordered_api_contexts:
        if not caller_context_in_binary(ctxt, binary):
            continue

        found = True

        # the condition variable is the first argument
        cv_handle = ctxt.read(x64.rcx)

        if csv_file is None:
            print(f"{ctxt}: {api}, condition variable 0x{cv_handle:x}")
        else:
            # any API of this group returns void, then "unused" is passed as the return value
            print_csv(csv_file, ctxt, "cv", cv_handle, "unused")

    return found


def check_lock_unlock(syncOSSI, ctxt_low, ctxt_high, binary, sync_primitives, used_mutex_handles, csv_file):
    transition_low = ctxt_low.transition_after().id
    transition_high = ctxt_high.transition_after().id if ctxt_high is not None else syncOSSI.trace.last_transition.id

    tid = get_tid(ctxt_low)

    if csv_file is None:
        print(
            "\n==== checking the transition range [#{}, #{}] (thread id: {}) ====".format(
                transition_low, transition_high, tid
            )
        )

    found = False

    if "cs" in sync_primitives:
        cs_found = check_critical_section(syncOSSI, ctxt_low, ctxt_high, binary, csv_file)
        found = found or cs_found

    if "srw" in sync_primitives:
        srw_found = check_srw_lock(syncOSSI, ctxt_low, ctxt_high, binary, csv_file)
        found = found or srw_found

    if "cv" in sync_primitives:
        cv_found = check_condition_variable(syncOSSI, ctxt_low, ctxt_high, binary, csv_file)
        found = found or cv_found

    if "mx" in sync_primitives:
        mx_found = check_mutex(syncOSSI, ctxt_low, ctxt_high, binary, used_mutex_handles, csv_file)
        found = found or mx_found

    if not found and csv_file is None:
        print("\tnothing found")


def run(rvn, proc_id, proc_bin, from_id, to_id, sync_primitives, csv_file):
    syncOSSI = SyncOSSI(rvn.ossi, rvn.trace)
    used_mutex_handles = set()
    process_ranges = find_process_ranges(rvn, proc_id, from_id, to_id)
    for low, high in process_ranges:
        user_mode_ranges = find_usermode_ranges(low, high)
        for low_usermode, high_usermode in user_mode_ranges:
            check_lock_unlock(
                syncOSSI, low_usermode, high_usermode, proc_bin, sync_primitives, used_mutex_handles, csv_file
            )


def parse_args():
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument(
        "--host",
        type=str,
        default="localhost",
        help='Reven host, as a string (default: "localhost")',
    )
    parser.add_argument(
        "-p",
        "--port",
        type=int,
        default="13370",
        help="Reven port, as an int (default: 13370)",
    )
    parser.add_argument("--pid", type=int, required=True, help="Process id")
    parser.add_argument("--from_id", type=int, default=None, help="Transition to start searching from")
    parser.add_argument("--to_id", type=int, default=None, help="Transition to start searching to")
    parser.add_argument(
        "--binary",
        type=str,
        default=None,
        help="Process binary",
    )
    parser.add_argument(
        "--sync",
        type=str,
        metavar="PRIMITIVE",
        action="append",
        choices=["cs", "cv", "mx", "srw"],
        default=None,
        help="Synchronization primitives to check.\n"
        + "Each repeated use of `--sync <primitive>` adds the corresponding primitive to check.\n"
        + "If no `--sync` option is passed, then all supported primitives are checked.\n"
        + "Supported primitives:\n"
        + " - cs:  critical section\n"
        + " - cv:  condition variable\n"
        + " - mx:  mutex\n"
        + " - srw: slim read/write lock",
    )
    parser.add_argument("--raw", type=str, default=None, help="CSV file as output")
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    if args.sync is None or not args.sync:
        args.sync = ["cs", "cv", "mx", "srw"]

    if args.raw is None:
        csv_file = None
    else:
        try:
            csv_file = open(args.raw, "w")
        except OSError:
            raise RuntimeError(f"Failed to open {args.raw}")

        try:
            csv_file.write("transition id, primitive, handle, return value\n")
        except OSError:
            raise RuntimeError(f"Failed to write to {csv_file}")

    rvn = reven2.RevenServer(args.host, args.port)
    rvn.ossi.os().expect(
        reven2.ossi.Os(
            architecture=reven2.ossi.Architecture.X64,
            family=reven2.ossi.OsFamily.Windows,
            windows_version=reven2.ossi.WindowsVersion.Windows10,
        ),
        reven2.ossi.Os(
            architecture=reven2.ossi.Architecture.X64,
            family=reven2.ossi.OsFamily.Windows,
            windows_version=reven2.ossi.WindowsVersion.Windows11,
        ),
    )

    run(rvn, args.pid, args.binary, args.from_id, args.to_id, args.sync, csv_file)