Detecting critical section deadlocks

This notebook checks for potential deadlocks that may occur when using critical sections as the synchronization primitive.

Prerequisites

  • This notebook should be run in a jupyter notebook server equipped with a Reven python kernel. Reven comes with a jupyter notebook server accessible with the Open Python button in the Analyze page of any scenario.
  • The following resources must be replayed for the analyzed scenario:
  • Trace
  • OSSI
  • Fast Search

Limits

  • Only support RtlEnterCriticalSection and RtlLeaveCriticalSection as the lock (resp. unlock) primitive.
  • Locks and unlocks must be not nested: a critical section can be locked and (or) unlocked then, but it must not be relocked multiple times, e.g.
  • lock A, lock B, unlock A, lock A => OK
  • lock A, lock B, lock A => not OK (since A is locked again)

Running

Fill out the parameters in the Parameters cell below, then run all the cells of this notebook.

Source

# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:percent
#     text_representation:
#       extension: .py
#       format_name: percent
#   kernelspec:
#     display_name: reven
#     language: python
#     name: reven-python3
# ---

# %% [markdown]
# # Detecting critical section deadlocks
# This notebook checks for potential deadlocks that may occur when using critical sections as the synchronization
# primitive.
#
# ## Prerequisites
#  - This notebook should be run in a jupyter notebook server equipped with a Reven python kernel.
# Reven comes with a jupyter notebook server accessible with the `Open Python` button
# in the `Analyze` page of any scenario.
#  - The following resources must be replayed for the analyzed scenario:
#     - Trace
#     - OSSI
#     - Fast Search
#
# ## Limits
#  - Only support `RtlEnterCriticalSection` and `RtlLeaveCriticalSection` as the lock (resp. unlock) primitive.
#  - Locks and unlocks must be not nested: a critical section can be locked and (or) unlocked then, but it must not be
# relocked multiple times, e.g.
#     - lock A, lock B, unlock A, lock A => OK
#     - lock A, lock B, lock A => not OK (since A is locked again)
#
# ## Running
# Fill out the parameters in the [Parameters cell](#Parameters) below, then run all the cells of this notebook.

# %%
# For Python's type annotation
from typing import Dict, Iterator, List, Optional, Set, Tuple

# Reven specific
import reven2
from reven2.address import LogicalAddress
from reven2.arch import x64
from reven2.trace import Context, Trace, Transition
from reven2.util import collate

# %% [markdown]
# # Parameters

# %%
# Host and port of the running the scenario
host = '127.0.0.1'
port = 35083


# The PID and (or) the name of the binary of interest: if the binary name is given (not None),
# only locks and unlocks called directly from the binary are counted.
pid = 2044
binary = None

# The begin and end transition numbers between them the deadlock detection processes.
begin_trans_id = None  # if None, start from the first transition of the trace
end_trans_id = None    # if None, stop at the last transition


# %%
# Helper class which wraps Reven's runtime objects and give methods helping get information about calls to
# RtlEnterCriticalSection and RtlLeaveCriticalSection
class RuntimeHelper:
    def __init__(self, host: str, port: int):
        try:
            server = reven2.RevenServer(host, port)
        except RuntimeError:
            raise RuntimeError(f'Cannot connect to the scenario at {host}:{port}')

        self.trace = server.trace
        self.ossi = server.ossi
        self.search_symbol = self.trace.search.symbol
        self.search_binary = self.trace.search.binary

        try:
            ntdll = next(self.ossi.executed_binaries('^c:/windows/system32/ntdll.dll$'))
        except StopIteration:
            raise RuntimeError('ntdll.dll not found')

        try:
            self.__rtl_enter_critical_section = next(ntdll.symbols("^RtlEnterCriticalSection$"))
            self.__rtl_leave_critical_section = next(ntdll.symbols("^RtlLeaveCriticalSection$"))
        except StopIteration:
            raise RuntimeError('Rtl(Enter|Leave)CriticalSection symbol not found')

    def get_critical_section_locks(self, from_context: Context, to_context: Context) -> Iterator[Context]:
        # For ease, if the from_context is the first context of the trace and also the beginning of a lock,
        # then omit it since the caller is unknown
        if from_context == self.trace.first_context:
            if from_context == self.trace.last_context:
                return
            else:
                from_context = from_context + 1

        for ctxt in self.search_symbol(self.__rtl_enter_critical_section, from_context, to_context):
            # Any context correspond to the entry of RtlEnterCriticalSection, since the first context
            # does not count, decrease by 1 is safe.
            yield ctxt - 1

    def get_critical_section_unlocks(self, from_context: Context, to_context: Context) -> Iterator[Context]:
        # For ease, if the from_context is the first context of the trace and also the beginning of an unlock,
        # then omit it since the caller is unknown
        if from_context == self.trace.first_context:
            if from_context == self.trace.last_context:
                return
            else:
                from_context = from_context + 1

        for ctxt in self.search_symbol(self.__rtl_leave_critical_section, from_context, to_context):
            # Any context correspond to the entry of RtlLeaveCriticalSection, since the first context
            # does not count, decrease by 1 is safe.
            yield ctxt - 1

    @staticmethod
    def get_critical_section_handle(ctxt: Context) -> int:
        return ctxt.read(x64.rcx)

    @staticmethod
    def thread_id(ctxt: Context) -> int:
        return ctxt.read(LogicalAddress(0x48, x64.gs), 4)

    @staticmethod
    def is_kernel_mode(ctxt: Context) -> bool:
        return ctxt.read(x64.cs) & 0x3 == 0


# %%
# Find the lower/upper bound of contexts on which the deadlock detection processes
def find_begin_end_context(sco: RuntimeHelper, pid: int, binary: Optional[str],
                           begin_id: Optional[int], end_id: Optional[int]) -> Tuple[Context, Context]:
    begin_ctxt = None
    if begin_id is not None:
        try:
            begin_trans = sco.trace.transition(begin_id)
            begin_ctxt = begin_trans.context_after()
        except IndexError:
            begin_ctxt = None

    if begin_ctxt is None:
        if binary is not None:
            for name in sco.ossi.executed_binaries(binary):
                for ctxt in sco.search_binary(name):
                    ctx_process = ctxt.ossi.process()
                    assert ctx_process is not None

                    if ctx_process.pid == pid:
                        begin_ctxt = ctxt
                        break
                if begin_ctxt is not None:
                    break

    if begin_ctxt is None:
        begin_ctxt = sco.trace.first_context

    end_ctxt = None
    if end_id is not None:
        try:
            end_trans = sco.trace.transition(end_id)
            end_ctxt = end_trans.context_before()
        except IndexError:
            end_ctxt = None

    if end_ctxt is None:
        end_ctxt = sco.trace.last_context

    if (end_ctxt <= begin_ctxt):
        raise RuntimeError("The begin transition must be smaller than the end.")

    return (begin_ctxt, end_ctxt)


# Get all execution contexts of a given process
def find_process_ranges(sco: RuntimeHelper, pid: int, first_ctxt: Context, last_context: Context) \
        -> Iterator[Tuple[Context, Context]]:
    ctxt_low = first_ctxt
    ctxt_high: Optional[Context] = None

    while True:
        current_process = ctxt_low.ossi.process()
        assert current_process is not None

        current_pid = current_process.pid
        ctxt_high = ctxt_low.find_register_change(x64.cr3, is_forward=True)

        if ctxt_high is None:
            if current_pid == pid:
                if ctxt_low < last_context - 1:
                    yield (ctxt_low, last_context - 1)
            break

        if ctxt_high >= last_context:
            break

        if current_pid == pid:
            yield (ctxt_low, ctxt_high)

        ctxt_low = ctxt_high + 1


# Start from a transition, return the first transition that is not a non-instruction, or None if there isn't one.
def ignore_non_instructions(trans: Transition, trace: Trace) -> Optional[Transition]:
    while trans.instruction is None:
        if trans == trace.last_transition:
            return None
        trans = trans + 1

    return trans


# Extract user mode only context ranges from a context range (which may include also kernel mode ranges)
def find_usermode_ranges(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context) \
        -> Iterator[Tuple[Context, Context]]:
    trans = ignore_non_instructions(ctxt_low.transition_after(), sco.trace)
    if trans is None:
        return
    ctxt_current = trans.context_before()

    while ctxt_current < ctxt_high:
        ctxt_next = ctxt_current.find_register_change(x64.cs, is_forward=True)

        if not RuntimeHelper.is_kernel_mode(ctxt_current):
            if ctxt_next is None or ctxt_next > ctxt_high:
                yield (ctxt_current, ctxt_high)
                break
            else:
                # It's safe to decrease ctxt_next by 1 because it was obtained from a forward find_register_change
                yield (ctxt_current, ctxt_next - 1)

        if ctxt_next is None:
            break

        ctxt_current = ctxt_next


# Get user mode only execution contexts of a given process
def find_process_usermode_ranges(trace: RuntimeHelper, pid: int, first_ctxt: Context, last_ctxt: Context) \
        -> Iterator[Tuple[Context, Context]]:
    for (ctxt_low, ctxt_high) in find_process_ranges(trace, pid, first_ctxt, last_ctxt):
        usermode_ranges = find_usermode_ranges(trace, ctxt_low, ctxt_high)
        for usermode_range in usermode_ranges:
            yield usermode_range


def context_is_in_binary(ctxt: Optional[Context], binary: Optional[str]) -> bool:
    if ctxt is None:
        return False

    if binary is None:
        return True

    ctxt_loc = ctxt.ossi.location()
    if ctxt_loc is None:
        return False

    ctxt_binary = ctxt_loc.binary
    if ctxt_binary is not None:
        return (binary in [ctxt_binary.name, ctxt_binary.filename, ctxt_binary.path])

    return False


# Get locks (i.e. RtlEnterCriticalSection) called by the binary in a range of context (defined by ctxt_low and
# ctxt_high).
# Note that for a process (of a binary), there are also locks called by libraries loaded by PE loader, such calls
# are considered "uninteresting" if the binary name is given.
def get_in_binary_locks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
        -> Iterator[Context]:
    for ctxt in sco.get_critical_section_locks(ctxt_low, ctxt_high):
        if context_is_in_binary(ctxt, binary):
            yield ctxt


# Get unlocks (i.e. RtlLeaveCriticalSection) called by the binary in a range of context (defined by ctxt_low and
# ctxt_high).
# Note that for a process (of a binary), there are also unlocks called by libraries loaded by PE loader, such calls
# are considered "uninteresting" if the binary name is given.
def get_in_binary_unlocks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
        -> Iterator[Context]:
    for ctxt in sco.get_critical_section_unlocks(ctxt_low, ctxt_high):
        if context_is_in_binary(ctxt, binary):
            yield ctxt


# Sort lock and unlock contexts (called in a range of contexts) in a correct order.
# Return a generator of pairs (bool, Context): True for lock, False for unlock.
def get_in_binary_locks_unlocks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
        -> Iterator[Tuple[bool, Context]]:
    def generate_locks():
        for ctxt in get_in_binary_locks(sco, ctxt_low, ctxt_high, binary):
            yield (True, ctxt)

    def generate_unlocks():
        for ctxt in get_in_binary_unlocks(sco, ctxt_low, ctxt_high, binary):
            yield (False, ctxt)

    return collate([generate_locks(), generate_unlocks()], key=lambda bool_context: bool_context[1])


# Generate all locks and unlocks called by the binary
def get_thread_usermode_in_binary_locks_unlocks(sco: RuntimeHelper, ranges: List[Tuple[Context, Context]],
                                                binary: Optional[str]) -> Iterator[Tuple[bool, Context]]:
    for (ctxt_low, ctxt_high) in ranges:
        for lock_unlock in get_in_binary_locks_unlocks(sco, ctxt_low, ctxt_high, binary):
            yield lock_unlock


# %%
# Check for RAII in locks/unlocks (i.e. critical sections are unlocked in reverse order of lock), warn if RAII is
# violated as it is good practice.
#
# In synchronization with critical sections, RAII is an idiomatic technique used to make the lock order consistent,
# then avoid deadlock in case of there is a total order of locks between threads. For example: the following threads
# where the lock/unlock of critical sections A and B follows RAII
#  - thread 0: lock A, lock B, unlock B, unlock A, lock A, lock B
#  - thread 1: lock A, lock B, unlock B, unlock A, lock A, lock B
# are deadlock free. But
#  - thread 0: lock A, lock B, unlock A, lock A, unlock B
#  - thread 1: lock A, lock B, unlock A, lock A, unlock B
# have deadlock. Let's consider the interleaving:
#  - lock A, lock B, unlock A (thread 0), lock A (thread 1)
# now thread 1 try to lock B (but it cannot since B is already locked by thread 0), thread 0 cannot unlock B neither
# since it needs to lock A first (but it cannot since A is already locked by thread 1).
#
# Note that RAII cannot guarantee the deadlock free synchronization if the condition about single total order of
# critical sections is not satisfied.
def check_intrathread_lock_unlock_matching(trace: RuntimeHelper, thread_ranges: List[Tuple[Context, Context]],
                                           binary: Optional[str]):
    locks_unlocks = get_thread_usermode_in_binary_locks_unlocks(trace, thread_ranges, binary)

    # lock/unlock should (though not obliged) follow RAII
    corresponding_lock_stack: List[Context] = []
    mismatch_lock_unlock_pcs: Set[Tuple[int, int]] = set()
    mismatch_unlocks_pcs: Set[int] = set()
    ok = True
    for (is_lock, ctxt) in locks_unlocks:
        if is_lock:
            # push lock context
            corresponding_lock_stack.append(ctxt)
        else:
            if corresponding_lock_stack:
                last_lock_ctxt = corresponding_lock_stack[-1]

                last_lock_handle = RuntimeHelper.get_critical_section_handle(last_lock_ctxt)
                current_unlock_handle = RuntimeHelper.get_critical_section_handle(ctxt)

                if last_lock_handle == current_unlock_handle:
                    # lock and unlock on the same critical section
                    corresponding_lock_stack.pop()
                else:
                    # It's safe to decrease by 1 since the first context of the trace is never counted as a lock
                    # nor unlock (c.f. RuntimeHelper::get_critical_section_(locks|unlocks)).
                    in_binary_lock_pc = last_lock_ctxt.read(x64.rip)
                    in_binary_unlock_pc = ctxt.read(x64.rip)

                    if (in_binary_lock_pc, in_binary_unlock_pc) in mismatch_lock_unlock_pcs:
                        continue

                    mismatch_lock_unlock_pcs.add((in_binary_lock_pc, in_binary_unlock_pc))

                    print(f'Warning:\n\t#{last_lock_ctxt.transition_after().id}: lock at 0x{in_binary_lock_pc:x} \
(on critical section handle 0x{last_lock_handle:x}) followed by\n\t#{ctxt.transition_after().id}: \
unlock at 0x{in_binary_unlock_pc:x} (on different critical section handle 0x{current_unlock_handle:x})')

                    ok = False
            else:
                in_binary_unlock_pc = ctxt.read(x64.rip)
                if in_binary_unlock_pc in mismatch_unlocks_pcs:
                    continue

                mismatch_unlocks_pcs.add(in_binary_unlock_pc)

                print(f'Warning:\n\t#{ctxt.transition_after().id}: unlock at  \
(on 0x{current_unlock_handle:x}) without any lock')
                ok = False
    if ok:
        print('OK')


# Build a dependency graph of locks: a lock A is followed by a lock B if A is still locked when B is locked.
# For example:
#  - lock A, lock B => A followed by B
#  - lock A, unlock A, lock B => A is not followed by B (since A is already unlocked when B is locked)
def build_locks_unlocks_order_graph_next(locks_unlocks: Iterator[Tuple[bool, Context]]) \
        -> Tuple[Dict[int, List[int]], Dict[Tuple[int, int], List[Tuple[Context, Context]]]]:

    order_graph: Dict[int, List[int]] = {}
    order_graph_label: Dict[Tuple[int, int], List[Tuple[Context, Context]]] = {}

    lock_stack: List[Context] = []
    for (is_lock, ctxt) in locks_unlocks:
        if not lock_stack:
            if is_lock:
                lock_stack.append(ctxt)
            continue

        current_lock_unlock_handle = RuntimeHelper.get_critical_section_handle(ctxt)
        current_lock_unlock_threadid = RuntimeHelper.thread_id(ctxt)
        if not is_lock:
            # looking for the last lock in the stack
            i = len(lock_stack) - 1
            while True:
                lock_handle_i = RuntimeHelper.get_critical_section_handle(lock_stack[i])
                lock_threadid_i = RuntimeHelper.thread_id(lock_stack[i])
                if lock_handle_i == current_lock_unlock_handle and lock_threadid_i == current_lock_unlock_threadid:
                    del lock_stack[i]
                    break
                if i == 0:
                    break
                i -= 1
            continue

        last_lock_ctxt = lock_stack[-1]
        # check of the last lock and the current lock are in the same thread
        if RuntimeHelper.thread_id(last_lock_ctxt) == RuntimeHelper.thread_id(ctxt):
            # create the edge: last_lock -> current_lock
            last_lock_handle = RuntimeHelper.get_critical_section_handle(last_lock_ctxt)
            if last_lock_handle not in order_graph:
                order_graph[last_lock_handle] = []
            order_graph[last_lock_handle].append(current_lock_unlock_handle)

            # create (or update) the label of the edge
            if (last_lock_handle, current_lock_unlock_handle) not in order_graph_label:
                order_graph_label[(last_lock_handle, current_lock_unlock_handle)] = []
            order_graph_label[(last_lock_handle, current_lock_unlock_handle)].append((last_lock_ctxt, ctxt))

        lock_stack.append(ctxt)

    return (order_graph, order_graph_label)


# Check if there are cycles in the lock dependency graph, such a cycle is considered a potential deadlock.
def check_order_graph_cycle(graph: Dict[int, List[int]], labels: Dict[Tuple[int, int], List[Tuple[Context, Context]]]):
    def dfs(path: List[Tuple[int, int, int]], starting_node: int, visited_nodes: Set[int]) \
            -> Optional[List[List[Tuple[int, int, int]]]]:
        if starting_node not in graph:
            return None

        next_nodes_to_visit = set(graph[starting_node]) - visited_nodes
        if not next_nodes_to_visit:
            return None

        nodes_on_path = set()
        tids_on_path = set()
        for (hd, tl, tid) in path:
            nodes_on_path.add(hd)
            nodes_on_path.add(tl)
            tids_on_path.add(tid)

        # check if we can build a cycle of locks by trying visiting a node
        back_nodes = next_nodes_to_visit & nodes_on_path
        for node in back_nodes:
            back_node = node
            tids_starting_back = set()
            for (ctxt_hd, _) in labels[(starting_node, back_node)]:
                tids_starting_back.add(RuntimeHelper.thread_id(ctxt_hd))

            # sub-path starting from the back-node to the starting-node
            sub_path = []
            sub_path_tids = set()
            for (hd, tl, tid) in path:
                if hd == node:
                    sub_path.append((hd, tl, tid))
                    sub_path_tids.add(tid)
                    node = tl

                if tl == starting_node:
                    diff_tids = tids_starting_back - sub_path_tids
                    # there is an edge whose TID is not on the sub-path yet
                    cycles = []
                    if diff_tids:
                        for tid in diff_tids:
                            for (ctxt_hd, _) in labels[(starting_node, back_node)]:
                                if RuntimeHelper.thread_id(ctxt_hd) == tid:
                                    sub_path.append((starting_node, back_node, tid))
                                    cycles.append(sub_path)
                                    break
                        return cycles
                    else:
                        return None

        for next_node in next_nodes_to_visit:
            tids = set()
            for (ctxt_hd, _,) in labels[(starting_node, next_node)]:
                tid = RuntimeHelper.thread_id(ctxt_hd)
                tids.add(tid)

            tids_to_visit = tids - tids_on_path
            if tids_to_visit:
                for tid_to_visit in tids_to_visit:
                    for (ctxt_hd, _) in labels[(starting_node, next_node)]:
                        if RuntimeHelper.thread_id(ctxt_hd) == tid_to_visit:
                            next_path = path
                            next_path.append((starting_node, next_node, tid))
                            visited_nodes.add(next_node)
                            some_cycles = dfs(next_path, next_node, visited_nodes)
                            if some_cycles is not None:
                                return some_cycles

        return None

    def compare_cycles(c0: List[Tuple[int, int, int]], c1: List[Tuple[int, int, int]]) -> bool:
        if len(c0) != len(c1):
            return False

        if len(c0) == 0:
            return True

        def circular_generator(c: List[Tuple[int, int, int]], e: Tuple[int, int, int]) \
                -> Optional[Iterator[Tuple[int, int, int]]]:
            clen = len(c)
            i = 0
            for elem in c:
                if elem == e:
                    while True:
                        yield c[i]
                        i = i + 1
                        if i == clen:
                            i = 0
                i = i + 1

            return None

        c0_gen = circular_generator(c0, c0[0])
        c1_gen = circular_generator(c1, c0[0])

        if c1_gen is None or c0_gen is None:
            return False

        i = 0
        while i < len(c0):
            e0 = next(c0_gen)
            e1 = next(c1_gen)
            if e0 != e1:
                return False
            i = i + 1

        return True

    ok = True
    distinct_cycles: List[List[Tuple[int, int, int]]] = []
    for node in graph:
        cycles = dfs([], node, set())
        if cycles is None or not cycles:
            continue

        for cycle in cycles:
            duplicated = False
            if not distinct_cycles:
                duplicated = False
            else:
                for ec in distinct_cycles:
                    if compare_cycles(ec, cycle):
                        duplicated = True
                        break
            if duplicated:
                continue

            distinct_cycles.append(cycle)

            print('Potential deadlock(s):')
            for (node, next_node, tid) in cycle:
                label = labels[(node, next_node)]
                distinct_labels: Set[Tuple[int, int]] = set()
                for (node_ctxt, next_node_ctxt) in label:
                    in_binary_lock_pc = (node_ctxt - 1).read(x64.rip)
                    in_binary_next_lock_pc = (next_node_ctxt - 1).read(x64.rip)

                    if (in_binary_lock_pc, in_binary_next_lock_pc) in distinct_labels:
                        continue

                    distinct_labels.add((in_binary_lock_pc, in_binary_next_lock_pc))

                    print(f'\t#{node_ctxt.transition_after().id}: lock at 0x{in_binary_lock_pc:x} \
(on thread {RuntimeHelper.thread_id(node_ctxt)}, critical section handle \
0x{RuntimeHelper.get_critical_section_handle(node_ctxt):x}) followed by\n\t\
#{next_node_ctxt.transition_after().id}: lock at 0x{in_binary_next_lock_pc:x} \
(on thread {RuntimeHelper.thread_id(next_node_ctxt)}, \
critical section handle 0x{RuntimeHelper.get_critical_section_handle(next_node_ctxt):x})')
                    ok = False
                print(
                    '\t=============================================================='
                )
    if ok:
        print('Not found.')
    return None


# Get user mode locks (and unlocks) of called by the binary, build the dependency graph, then check the cycles.
def check_lock_cycle(trace: RuntimeHelper, threads_ranges: List[Tuple[Context, Context]], binary: Optional[str]):
    locks_unlocks = get_thread_usermode_in_binary_locks_unlocks(trace, threads_ranges, binary)
    (order_graph, order_graph_labels) = build_locks_unlocks_order_graph_next(locks_unlocks)
    check_order_graph_cycle(order_graph, order_graph_labels)


# Combination of checking lock/unlock RAII and deadlock
def detect_deadlocks(trace: RuntimeHelper, pid: int, binary: Optional[str],
                     first_id: Optional[int], last_id: Optional[int]):
    (first_context, last_context) = find_begin_end_context(trace, pid, binary, first_id, last_id)
    process_ranges = list(find_process_usermode_ranges(trace, pid, first_context, last_context))
    thread_ranges: Dict[int, List[Tuple[Context, Context]]] = {}

    for (ctxt_low, ctxt_high) in process_ranges:
        tid = RuntimeHelper.thread_id(ctxt_low)
        if tid not in thread_ranges:
            thread_ranges[tid] = []
        thread_ranges[tid].append((ctxt_low, ctxt_high))

    for (tid, ranges) in thread_ranges.items():
        print('\n============ checking lock/unlock matching on thread {} ============'.format(tid))
        check_intrathread_lock_unlock_matching(trace, ranges, binary)

    print('\n\n============ checking potential deadlocks on process ============')
    check_lock_cycle(trace, process_ranges, binary)


# %%
trace = RuntimeHelper(host, port)
detect_deadlocks(trace, pid, binary, begin_trans_id, end_trans_id)