Find symbols that access a specific memory range

Purpose

This notebook and script are designed to find all symbols that access a specific memory range. This script searches a Reven trace for all symbols that accessed a specific memory range. The script can filter the results by processes, ring, included binaries, excluded binaries, excluded symbols, context range and memory access operation. The script can generate two kinds of results:

  • process, binary and symbol information for each memory access.
  • for each symbol, all the memory accesses that occurred in that symbol. Note that this option can take long time to start showing results, especially when there is many nested functions or many functions that don't end in the trace. Note that:
  • accesses will be reported as belonging to the innermost symbol that has not been excluded and whose binary has not been excluded in the configuration.
  • we consider that we are "in a symbol" when the corresponding context.location.symbol returns this symbol. Reven returns the closest symbol with an rva lower than ours. Note that we are not trying to determine the exact bounds of the function with that symbol for name. In particular, when there are missing symbols, this may report a symbol we saw a long time ago rather than

How to use

Results can be generated from this notebook or from the command line. The script can also be imported as a module for use from your own script or notebook.

From the notebook

  1. Upload the symbols_access_memory_range.ipynb file in Jupyter.
  2. Fill out the parameters cell of this notebook according to your scenario and desired output.
  3. Run the full notebook.

From the command line

  1. Make sure that you are in an environment that can run Reven scripts.
  2. Run python symbols_access_memory_range.py --help to get a tour of available arguments.
  3. Run python symbols_access_memory_range.py --host <your_host> --port <your_port> [<other_option>] with your arguments of choice.

Imported in your own script or notebook

  1. Make sure that you are in an environment that can run Reven scripts.
  2. Make sure that symbols_access_memory_range.py is in the same directory as your script or notebook.
  3. Add import symbols_access_memory_range to your script or notebook. You can access the various functions and classes exposed by the module from the symbols_access_memory_range namespace.
  4. Refer to the Argument parsing cell for an example of use in a script, and to the Parameters cell and below for an example of use in a notebook (you just need to preprend symbols_access_memory_range in front of the functions and classes from the script).

Known limitations

N/A.

Supported versions

Reven 2.10+

Supported perimeter

Any Reven scenario.

Dependencies

The script requires that the target Reven scenario have:

  • The OSSI feature replayed.
  • The memory history feature replayed.
  • pandas python module

Source

# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:percent
#     text_representation:
#       extension: .py
#       format_name: percent
#   kernelspec:
#     display_name: reven
#     language: python
#     name: reven-python3
# ---

# %% [markdown]
# # Find symbols that access a specific memory range
#
# ## Purpose
#
# This notebook and script are designed to find all symbols that access a specific memory range.
#
# This script searches a Reven trace for all symbols that accessed a specific memory range.
# The script can filter the results by processes, ring, included binaries, excluded binaries, excluded
# symbols, context range and memory access operation.
#
# The script can generate two kinds of results:
# - process, binary and symbol information for each memory access.
# - for each symbol, all the memory accesses that occurred in that symbol.
#   Note that this option can take long time to start showing results,
#   especially when there is many nested functions or many functions that don't end in the trace.
#
# Note that:
# - accesses will be reported as belonging to the innermost symbol that has not been excluded
#   and whose binary has not been excluded in the configuration.
# - we consider that we are "in a symbol" when the corresponding context.location.symbol returns this symbol.
#   Reven returns the closest symbol with an `rva` lower than ours. Note that we are not trying to determine the
#   exact bounds of the function with that symbol for name. In particular, when there are missing symbols,
#   this may report a symbol we saw a long time ago rather than <unknown>
#
#
#
# ## How to use
#
# Results can be generated from this notebook or from the command line.
# The script can also be imported as a module for use from your own script or notebook.
#
#
# ### From the notebook
#
# 1. Upload the `symbols_access_memory_range.ipynb` file in Jupyter.
# 2. Fill out the [parameters](#Parameters) cell of this notebook according to your scenario and desired output.
# 3. Run the full notebook.
#
#
# ### From the command line
#
# 1. Make sure that you are in an environment that can run Reven scripts.
# 2. Run `python symbols_access_memory_range.py --help` to get a tour of available arguments.
# 3. Run `python symbols_access_memory_range.py --host <your_host> --port <your_port> [<other_option>]` with your
# arguments of choice.
#
# ### Imported in your own script or notebook
#
# 1. Make sure that you are in an environment that can run Reven scripts.
# 2. Make sure that `symbols_access_memory_range.py` is in the same directory as your script or notebook.
# 3. Add `import symbols_access_memory_range` to your script or notebook. You can access the various functions and
#    classes exposed by the module from the `symbols_access_memory_range` namespace.
# 4. Refer to the [Argument parsing](#Argument-parsing) cell for an example of use in a script, and to the
#    [Parameters](#Parameters) cell and below for an example of use in a notebook (you just need to preprend
#    `symbols_access_memory_range` in front of the functions and classes from the script).
#
# ## Known limitations
#
# N/A.
#
# ## Supported versions
#
# Reven 2.10+
#
# ## Supported perimeter
#
# Any Reven scenario.
#
# ## Dependencies
#
# The script requires that the target Reven scenario have:
#
# * The OSSI feature replayed.
# * The memory history feature replayed.
# * pandas python module

# %% [markdown]
# ### Package imports

# %%
import argparse
from enum import Enum
from typing import Iterable as _Iterable, List
from typing import Optional as _Optional
from typing import cast as _cast

from IPython.core.display import display  # type: ignore

import pandas

import reven2.address as _address
import reven2.arch as _arch
from reven2.filter import RingPolicy
from reven2.memhist import MemoryAccess, MemoryAccessOperation
from reven2.memory_range import MemoryRange
from reven2.ossi import Binary, Process, Symbol
from reven2.ossi.thread import Thread
from reven2.prelude import RevenServer
from reven2.stack import Stack
from reven2.trace import Context, Trace
from reven2.util import collate as _collate


# %% [markdown]
# ### Utility functions


# %%
# Detect if we are currently running a Jupyter notebook.
#
# This is used e.g. to display rendered results inline in Jupyter when we are executing in the context of a Jupyter
# notebook, or to display raw results on the standard output when we are executing in the context of a script.
def in_notebook():
    try:
        from IPython import get_ipython  # type: ignore

        if get_ipython() is None or ("IPKernelApp" not in get_ipython().config):
            return False
    except ImportError:
        return False
    return True


# %% [markdown]
# ### Helper classes for results


# %%
class CallSymbol:
    r"""
    CallSymbol is a helper class used to represent a symbol with its start and end context
    """

    def __init__(self, symbol: _Optional[Symbol], start: Context, end: _Optional[Context] = None) -> None:
        self._symbol = symbol
        self._start = start
        self._end = end

    @property
    def symbol(self) -> _Optional[Symbol]:
        r"""
        B{Property:} The symbol of the call symbol. None if the symbol is unknown.
        """
        return self._symbol

    @property
    def start_context(self) -> Context:
        r"""
        B{Property:} The start context of the call symbol.
        """
        return self._start

    @property
    def end_context(self) -> _Optional[Context]:
        r"""
        B{Property:} The end excluded context of the call symbol. None if the end context isn't in the trace.
        """
        return self._end

    def __eq__(self, other: "CallSymbol") -> bool:  # type: ignore
        return self._symbol == other._symbol and self._start == other._start and self._end == other._end

    def __ne__(self, other: "CallSymbol") -> bool:  # type: ignore
        return not (self == other)


class MemoryRangeSymbolResult:
    r"""
    MemoryRangeSymbolResult is a helper class that represents one result of the search.
    """

    def __init__(
        self,
        call_symbol: CallSymbol,
        memory_access: _Optional[MemoryAccess],
        ring: int,
        process: _Optional[Process],
        thread: _Optional[Thread],
        binary: _Optional[Binary],
    ) -> None:
        self._call_symbol = call_symbol
        self._memory_accesses = [] if memory_access is None else [memory_access]
        self._ring = ring
        self._process = process
        self._thread = thread
        self._binary = binary

    def copy(self) -> "MemoryRangeSymbolResult":
        r"""
        return a copy of this object

        it makes a shallow copy of all attributes except for memory accesses where the list is deeply copied
        """
        new_obj = MemoryRangeSymbolResult(
            call_symbol=self._call_symbol,
            memory_access=None,
            ring=self._ring,
            process=self._process,
            thread=self._thread,
            binary=self._binary,
        )
        if self._memory_accesses is not None:
            new_obj._memory_accesses += self._memory_accesses
        return new_obj

    @property
    def call_symbol(self) -> CallSymbol:
        r"""
        B{Property:} The call symbol of the result.
        """
        return self._call_symbol

    @property
    def memory_accesses(self) -> List[MemoryAccess]:
        r"""
        B{Property:} The memory accesses of the result.
        """
        return self._memory_accesses

    @property
    def ring(self) -> int:
        r"""
        B{Property:} The ring of the result.
        """
        return self._ring

    @property
    def process(self) -> _Optional[Process]:
        r"""
        B{Property:} The process of the result.
        """
        return self._process

    @property
    def binary(self) -> _Optional[Binary]:
        r"""
        B{Property:} The binary of the result. None if the binary is unknown.
        """
        return self._binary

    @property
    def thread(self) -> _Optional[Thread]:
        r"""
        B{Property:} The thread of the result.
        """
        return self._thread

    def __eq__(self, other: "MemoryRangeSymbolResult") -> bool:  # type: ignore
        return (
            self._ring == other._ring
            and self._process is not None
            and other._process is not None
            and self._process.name == other._process.name
            and self._process.pid == other._process.pid
            and self._process.ppid == other._process.ppid
            and self._thread is not None
            and other._thread is not None
            and self._thread.id == other._thread.id
            and self._thread.owner_process_id == other._thread.owner_process_id
            and (
                (self._binary is None and other._binary is None)
                or (self._binary is not None and other._binary is not None and self._binary.path == other._binary.path)
            )
            and self._call_symbol == other._call_symbol
        )

    def __ne__(self, other: "MemoryRangeSymbolResult") -> bool:  # type: ignore
        return not (self == other)

    def __str__(self) -> str:
        memory_accesses = "\nmemory accesses:"
        for m in self._memory_accesses:
            memory_accesses += f"\n\t{m}, "
        memory_accesses += "\n"

        return (
            f"ring: {self._ring}, process: {self._process}, "
            f"thread: {self._thread}, binary: {self._binary}, "
            f"symbol: {self._call_symbol.symbol}[{self._call_symbol.start_context}, "
            f"{self._call_symbol.end_context}[ {memory_accesses}"
        )

    def format_as_html(self):
        r"""
        This method gets an html formatting string representation for this class instance.

        Information
        ===========
        @returns: C{String}
        """
        memory_accesses = "<p>memory accesses:</p><ol>"
        for m in self._memory_accesses:
            memory_accesses += f"<li>{m.format_as_html()}</li>"
        memory_accesses += "</ol>"

        return (
            f"ring: {self._ring}, process: {self._process if self._process is not None else 'unknown'}, "
            f" thread: {self._thread if self._thread is not None else 'unknown'}, binary: {self._binary}, "
            f"symbol: {self._call_symbol.symbol}[{self._call_symbol.start_context}, "
            f"{self._call_symbol.end_context}[ {memory_accesses}"
        )

    def _repr_html_(self):
        r"""
        Representation used by Jupyter Notebook when an instance of the this class is displayed in a cell.
        """
        return "<p>{}</p>".format(self.format_as_html())


# %% [markdown]
# ### MemoryRangeSymbolFinder
#
# This class represents the main logic of this script


# %%
class MemoryRangeSymbolFinder(object):
    r"""
        This class is a helper class to search for all symbols that access a specific memory range.
        Results can be filtered by processes, ring, binaries, excluded binaries, excluded symbols
        and a context range.

        The symbols that access this memory range are returned.

        Examples
        ========

        >>> # Search all symbols that access the memory range [ds:0xfffff8800115e180 ; 128]
        >>> # filtered by the process `svchost.exe` at the context #410545055.
        >>> processes = server.ossi.executed_processes('svchost.exe')
        >>> memory_range = MemoryRange::from_string("[ds:0xfffff8800115e180 ; 128]")
        >>> context = server.trace.context_before(410545055)
        >>> symbol_mem_finder = MemoryRangeSymbolFinder(
        ...     trace=server.trace, memory_range=memory_range,
        ...     context=context, processes=processes)
        >>> for r in symbol_mem_finder.query():
        ...     print(r)
        ring: 0, process: svchost.exe (1004), thread: 2256, binary: c:/windows/system32/drivers/cng.sys,
    symbol: cng!AesCbcDecrypt[Context before #25208343, Context before #25212457[
    memory accesses:
        [#25208488 xor r8d, dword ptr ds:[r11+rax*4+0x800]]Read access at
    @phy:0x36411e0 (virtual address: lin:0xfffff8800115e1e0) of size 4,
        ...

    """

    def __init__(
        self,
        trace: Trace,
        memory_range: MemoryRange,
        translation_context: _Optional[Context] = None,
        from_context: _Optional[Context] = None,
        to_context: _Optional[Context] = None,
        ring_policy: RingPolicy = RingPolicy.All,
        processes: _Optional[_Iterable[Process]] = None,
        included_binaries: _Optional[_Iterable[Binary]] = None,
        excluded_binaries: _Optional[_Iterable[Binary]] = None,
        excluded_symbols: _Optional[_Iterable[Symbol]] = None,
        operation: _Optional[MemoryAccessOperation] = None,
    ) -> None:
        r"""
        Initialize a C{MemoryRangeSymbolFinder}

        Information
        ===========

        @param trace: the trace where symbols will be looked for.
        @param memory_range: the memory range that are accessed by the returned symbols.
        @param translation_context: context used to translate the memory range when it is virtual.
        @param to_context: the context where the search will be ended.
        @param ring_policy: ring policy to search for.
        @param processes: processes to limit the search in it. If None, all processes will be filtered.
        @param included_binaries: binaries that must be included in the search.
                                  If None, all binaries will be included.
                                  When binary is not included, all its symbols are ignored with its memory accesses
        @param excluded_binaries: binaries that must be excluded from the search. If None nothing will be excluded.
                                  Accesses performed in this binary are reported, but using the first caller
                                  binary that is not excluded. Note that inclusion is applied before exclusion.
        @param excluded_symbols: symbols that must be excluded from the search. If None nothing will be excluded.
                                 Accesses performed in this symbol are reported, but using the first caller
                                 symbol that is not excluded.
        @param operation: limit results to accesses performing the specified operation.


        @raises TypeError: if trace is not a C{reven2.trace.Trace}.
        @raises ValueError: If provided memory range is virtual and the translation_context is None.
        """
        if not isinstance(trace, Trace):
            raise TypeError("You must provide a valid trace")
        self._trace = trace

        if isinstance(memory_range.address, _address.PhysicalAddress):
            self._physical_memory_ranges = [_cast(MemoryRange[_address.PhysicalAddress], memory_range)]
        elif translation_context is None:
            raise ValueError("You must provide a context for the translation if the memory range is virtual")
        else:
            self._physical_memory_ranges = [mem_range for mem_range in memory_range.translate(translation_context)]

        self._from_context = from_context
        self._to_context = to_context

        self._ring_policy = ring_policy
        self._processes = None if processes is None else [process for process in processes]
        self._included_binaries = None if included_binaries is None else {binary.name for binary in included_binaries}
        self._excluded_binaries = set() if excluded_binaries is None else {binary.name for binary in excluded_binaries}
        self._excluded_symbols = set() if excluded_symbols is None else {symbol.name for symbol in excluded_symbols}
        self._operation = operation

    def filter_by_processes(self, processes: _Iterable[Process]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of processes to limit the search in, and return the self object.

        Information
        ===========

        @param processes: processes to limit the search in.
        @returns : self object
        """
        if self._processes is None:
            self._processes = []
        self._processes += [process for process in processes]

        return self

    def filter_by_ring(self, ring_policy: RingPolicy) -> "MemoryRangeSymbolFinder":
        r"""
        Update the ring policy to search for and return the `self` object.

        Information
        ===========

        @param ring_policy: ring policy to search for.
        @returns : self object
        """
        self._ring_policy = ring_policy
        return self

    def from_context(self, context: Context) -> "MemoryRangeSymbolFinder":
        r"""
        Update the context where the search will be started and return the `self` object.

        Information
        ===========

        @param context: context where the search will be started.
        @returns : self object
        """
        self._from_context = context
        return self

    def to_context(self, context: Context) -> "MemoryRangeSymbolFinder":
        r"""
        Update the context where the search will be ended and return the `self` object.

        Information
        ===========

        @param context: context where the search will be ended.
        @returns : self object
        """
        self._to_context = context
        return self

    def include_bnaries(self, binaries: _Iterable[Binary]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of binaries that must be included in the search and return the `self` object.

        Information
        ===========

        @param binaries: binaries that must be included in the search.
        @returns : self object
        """
        if self._included_binaries is None:
            self._included_binaries = {binary.name for binary in binaries}
        else:
            self._included_binaries.update([binary.name for binary in binaries])
        return self

    def exclude_bnaries(self, binaries: _Iterable[Binary]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of binaries that must be excluded from the search and return the `self` object.

        Information
        ===========

        @param binaries: binaries that must be excluded from the search.
        @returns : self object
        """
        self._excluded_binaries.update([binary.name for binary in binaries])
        return self

    def exclude_symbols(self, symbols: _Iterable[Symbol]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of symbols that must be excluded from the search and return the `self` object.

        Information
        ===========

        @param symbols: symbols that must be excluded from the search.
        @returns : self object
        """
        self._excluded_symbols.update([symbol.name for symbol in symbols])
        return self

    def filter_by_memory_access_operation(
        self, operation: _Optional[MemoryAccessOperation] = None
    ) -> "MemoryRangeSymbolFinder":
        r"""
        Update the memory access operation to limit results to accesses performing this
        operation and return the `self` object.

        Information
        ===========

        @param operation: limit results to accesses performing the specified operation.
        @returns : self object
        """
        self._operation = operation
        return self

    def _is_the_same_stack(self, stack1: Stack, stack2: Stack) -> bool:
        # we assume that two stacks are the same if the first contexts of their first frames are the same

        frame1 = next(stack1.frames())
        frame2 = next(stack2.frames())

        return frame1.first_context == frame2.first_context

    def query(self) -> _Iterable[MemoryRangeSymbolResult]:
        r"""
        Iterate over all filtered contexts and yield symbols.

        Note: the same symbol can be yielded several times with different memory accesses.
        """

        # Make a copy of the variables that can modify the generated results
        operation = self._operation
        included_binaries = None if self._included_binaries is None else self._included_binaries.copy()
        excluded_binaries = self._excluded_binaries.copy()
        excluded_symbols = self._excluded_symbols.copy()

        # store last handled stack to use it if we are in the same stack
        last_stack: _Optional[Stack] = None
        # store last result to use it if we are in the same stack
        last_result: _Optional[MemoryRangeSymbolResult] = None

        # Iterate over all context range filtered by ring, processes, from_context and to_context
        for context_range in self._trace.filter(
            processes=self._processes,
            ring_policy=self._ring_policy,
            from_context=self._from_context,
            to_context=self._to_context,
        ):
            from_transition = (
                context_range.begin.transition_before()
                if context_range.begin == self._trace.last_context
                else context_range.begin.transition_after()
            )
            to_transition = (
                context_range.last.transition_before()
                if context_range.last == self._trace.last_context
                else context_range.last.transition_after()
            )

            # iterate over physical memory range
            iterators = [
                self._trace.memory_accesses(
                    address=memory_range.address,
                    size=memory_range.size,
                    from_transition=from_transition,
                    to_transition=to_transition,
                )
                for memory_range in self._physical_memory_ranges
            ]
            # iterate over all memory accesses in the this range
            for memory_access in _collate(iterators, key=lambda x: x.transition.id):
                # apply filter by operation here instead of in the query, because currently
                # operation-constrained queries are not optimized in the backend
                if operation is not None and operation != memory_access.operation:
                    continue
                # get the stack at this transition
                current_context: Context = memory_access.transition.context_before()

                stack = current_context.stack
                if last_result is not None and last_stack is not None and self._is_the_same_stack(last_stack, stack):
                    # update the memory access of the last result and yield it
                    last_result._memory_accesses = [memory_access]
                    yield last_result
                    continue

                last_stack = stack

                # exclude symbols and binary
                handled_binary = None
                handled_symbol = None
                handled_symbol_found = False
                frames = [frame for frame in stack.frames()]
                frames.reverse()
                for frame in frames:
                    loc = frame.first_context.ossi.location()
                    if loc is not None and (
                        loc.binary.name in excluded_binaries
                        or (loc.symbol is not None and loc.symbol.name in excluded_symbols)
                        or ("unknown" in excluded_symbols)
                    ):
                        break

                    if loc is not None:
                        if loc.binary is not None:
                            handled_binary = loc.binary
                        if loc.symbol is not None:
                            handled_symbol = loc.symbol
                    handled_symbol_found = True

                    first_context = frame.first_context
                    handled_process = frame.first_context.ossi.process()
                    handled_thread = frame.first_context.ossi.thread()

                # ignore symbol if it is in excluded symbols or if its binary in the excluded binaries
                if not handled_symbol_found:
                    continue

                # ignore symbol if its binary isn't in the included binaries
                if (
                    included_binaries is not None
                    and handled_binary is not None
                    and handled_binary.name not in included_binaries
                ):
                    continue

                # get the end of symbol
                end_transition = (
                    first_context.transition_after().step_out()
                    if first_context != self._trace.last_context
                    else first_context.transition_before().step_out()
                )
                end_context = None if end_transition is None else end_transition.context_before()

                # get the ring of the symbol
                handled_ring = first_context.read(_arch.x64.cs) & 0x3

                last_result = MemoryRangeSymbolResult(
                    call_symbol=CallSymbol(handled_symbol, first_context, end_context),
                    memory_access=memory_access,
                    ring=handled_ring,
                    process=handled_process,
                    thread=handled_thread,
                    binary=handled_binary,
                )
                yield last_result

    def group_by_symbol_query(self) -> _Iterable[MemoryRangeSymbolResult]:
        r"""
        Iterate over all filtered contexts and yield symbols.

        Note: each symbol will be yielded only once, with a group of all its memory accesses.
        """
        # Add symbols to a stack and pop it when it is finished
        result_stack = []  # type: List[MemoryRangeSymbolResult]
        for result in self.query():
            if len(result_stack) > 0:
                # firstly, we verify if we can pop the last item from the stack
                # Item will be yielded if its end context isn't None and the current result of
                # the query has a memory access such that the before context of its transition
                # >= the context of the last symbol in the stack
                if (
                    result.call_symbol.end_context is not None
                    # len(result.memory_accesses) > 0 because the results of `query`
                    # contain exactly one memory_access by construction.
                    and result.memory_accesses[0].transition.context_before() >= result.call_symbol.end_context
                ):
                    res = result_stack.pop(-1)
                    yield res

            # Here we observe symbols change,
            # if the symbol is changed (result_stack[-1] != result) we add the new symbol to the stack.
            # (len(result_stack) == 0 is only to handle the case of the first result)
            if len(result_stack) == 0 or result_stack[-1] != result:
                # store a deep copy of the result
                result_stack.append(result.copy())
                continue

            # the symbol didn't change, so we add the memory access of the current result
            # to the last item in the stack
            result_stack[-1]._memory_accesses += result.memory_accesses

        # yield all symbols with None end context
        for result in result_stack:
            yield result


# %% [markdown]
#
# ### OutputType


# %%
class OutputFormat(Enum):
    r"""
    Enum describing the various possible output formats of the results
     - RAW: The results will be output using its string representation.
     - TABLE: The results will be output using pandas table format.
     - CSV: The results will be output as csv.
     - HTML: The results will be output as html table.
    """
    RAW = 0
    TABLE = 1
    CSV = 2
    HTML = 3


# %% [markdown]
# ### Main function
#
# This function is called with parameters from the [Parameters](#Parameters) cell in the notebook context,
# or with parameters from the command line in the script context.


# %%
def symbols_access_memory_range(
    server: RevenServer,
    memory_range: MemoryRange,
    context: _Optional[int],
    from_context: _Optional[int] = None,
    to_context: _Optional[int] = None,
    ring_policy: RingPolicy = RingPolicy.All,
    processes: _Optional[_Iterable[str]] = None,
    included_binaries: _Optional[_Iterable[str]] = None,
    excluded_binaries: _Optional[_Iterable[str]] = None,
    excluded_symbols: _Optional[_Iterable[str]] = None,
    operation: _Optional[MemoryAccessOperation] = None,
    grouped_by_symbol: bool = False,
    output_format: OutputFormat = OutputFormat.RAW,
    output_file: _Optional[str] = None,
) -> None:
    # declare symbol finder.
    memory_range_symbols_finder = MemoryRangeSymbolFinder(
        trace=server.trace,
        memory_range=memory_range,
        translation_context=(None if context is None else server.trace.context_before(context)),
        from_context=(None if from_context is None else server.trace.context_before(from_context)),
        to_context=(None if to_context is None else server.trace.context_before(to_context)),
        ring_policy=ring_policy,
        operation=operation,
    )

    # filer by processes
    if processes is not None:
        for process in processes:
            memory_range_symbols_finder.filter_by_processes(server.ossi.executed_processes(process))

    # include binaries
    if included_binaries is not None:
        for binary in included_binaries:
            memory_range_symbols_finder.include_bnaries(server.ossi.executed_binaries(binary))

    # exclude binaries
    if excluded_binaries is not None:
        for binary in excluded_binaries:
            memory_range_symbols_finder.exclude_bnaries(server.ossi.executed_binaries(binary))

    # exclude symbols
    if excluded_symbols is not None:
        for symbol in excluded_symbols:
            memory_range_symbols_finder.exclude_symbols(server.ossi.symbols(symbol))

    query = (
        memory_range_symbols_finder.group_by_symbol_query()
        if grouped_by_symbol
        else memory_range_symbols_finder.query()
    )

    if output_format == OutputFormat.RAW:
        print_func = display if in_notebook() else print
        if output_file is not None:
            file = open(output_file, "w")

            def fprint_func(s: MemoryRangeSymbolResult) -> None:
                file.write(str(s))
                file.write("\n")

            print_func = fprint_func
        for result in query:
            print_func(result)

        if output_file is not None:
            file.close()
    else:
        results = {  # type: ignore
            "Ring": [],
            "Process": [],
            "Thread": [],
            "Binary": [],
            "Symbol": [],
            "Start context": [],
            "Access transition": [],
            "Access operation": [],
            "Access physical": [],
            "Access linear": [],
            "Access size": [],
        }
        for result in query:
            for mem_access in result.memory_accesses:
                results["Ring"].append(result.ring)
                results["Process"].append(str(result.process) if result.process is not None else "unknown")
                results["Thread"].append(str(result.thread) if result.thread is not None else "unknown")
                results["Binary"].append(result.binary.name if result.binary is not None else "unknown")
                results["Symbol"].append(
                    result.call_symbol.symbol.name if result.call_symbol.symbol is not None else "unknown"
                )
                results["Start context"].append(str(result.call_symbol.start_context))
                results["Access transition"].append(mem_access.transition.id)
                results["Access operation"].append(mem_access.operation.name)
                results["Access physical"].append(mem_access.physical_address)
                results["Access linear"].append(mem_access.virtual_address)
                results["Access size"].append(mem_access.size)

        # type stub is installed for pandas module but it is a WIP.
        # It doesn't know the `from_dict`` method of `DataFrame` class.
        # so we ignore the type here.
        df = pandas.DataFrame.from_dict(results)  # type: ignore
        if output_format == OutputFormat.TABLE:
            if output_file is not None:
                with open(output_file, "w") as file:
                    file.write(str(df))
            else:
                print(df)
        elif output_format == OutputFormat.CSV:
            print(df.to_csv()) if output_file is None else df.to_csv(output_file)
        elif output_format == OutputFormat.HTML:
            print(df.to_html()) if output_file is None else df.to_html(output_file)


# %% [markdown]
# ### Argument parsing
#
# Argument parsing function for use in the script context.


# %%
def get_memory_access_operation(operation: str) -> MemoryAccessOperation:
    if operation is None:
        return None
    if operation.lower() == "read":
        return MemoryAccessOperation.Read
    if operation.lower() == "write":
        return MemoryAccessOperation.Write
    raise ValueError(f"'operation' value should be 'read' or 'write'. Received '{operation}'.")


def get_ring_policy(ring: int) -> RingPolicy:
    if ring is None:
        return RingPolicy.All
    if ring == 0:
        return RingPolicy.R0Only
    if ring == 3:
        return RingPolicy.R3Only
    raise ValueError(f"'ring_policy' value should be '0' or '1'. Received '{ring_policy}'.")


def get_output_format(format: str) -> OutputFormat:
    if format.lower() == "raw":
        return OutputFormat.RAW
    if format.lower() == "table":
        return OutputFormat.TABLE
    if format.lower() == "html":
        return OutputFormat.HTML
    if format.lower() == "csv":
        return OutputFormat.CSV
    raise ValueError(f"'output format' value should be 'raw', 'table', 'html', or 'csv'. Received '{format}'.")


def script_main():
    parser = argparse.ArgumentParser(description="Find all symbols that access a memory range")
    parser.add_argument(
        "--host",
        type=str,
        default="localhost",
        required=False,
        help='Reven host, as a string (default: "localhost")',
    )
    parser.add_argument(
        "-p",
        "--port",
        type=int,
        default="13370",
        required=False,
        help="Reven port, as an int (default: 13370)",
    )
    parser.add_argument(
        "-m",
        "--memory-range",
        type=str,
        required=True,
        help="The memory range whose accesses to look for in symbols (e.g. [ds:0xfff5000; 2])",
    )
    parser.add_argument(
        "-C",
        "--context",
        type=int,
        required=False,
        help="The context used to translate the memory range if it is virtual",
    )
    parser.add_argument(
        "--from-context",
        type=int,
        required=False,
        help="The context from where the search starts",
    )
    parser.add_argument(
        "--to-context",
        type=int,
        required=False,
        help="The context(not included) at which the search stops",
    )
    parser.add_argument(
        "--ring",
        type=int,
        required=False,
        help="Show symbols in this ring only, can be (0=ring0, 3=ring3)",
    )
    parser.add_argument(
        "--processes",
        required=False,
        nargs="*",
        help="Show symbols in these processes only",
    )
    parser.add_argument(
        "--include-binaries",
        required=False,
        nargs="*",
        help="Show symbols in these binaries only",
    )
    parser.add_argument(
        "--exclude-binaries",
        required=False,
        nargs="*",
        help="Don't show symbols in these binaries, accesses that belong to these symbols will be reported with "
        "the innermost symbol such that it or its binary don't excluded",
    )
    parser.add_argument(
        "--exclude-symbols",
        required=False,
        nargs="*",
        help="Don't show these symbols, accesses that belong to these symbols will be reported with "
        "the innermost non excluded symbol",
    )
    parser.add_argument(
        "--memory-access-operation",
        choices=["read", "write"],
        required=False,
        help="Only show symbols that access the memory range using this operation",
    )
    parser.add_argument(
        "--grouped-by-symbol",
        action="store_true",
        required=False,
        default=False,
        help="Group results by symbol",
    )
    parser.add_argument(
        "-o",
        "--output-file",
        type=str,
        required=False,
        help="The target file of the results. If absent, the results will be printed on the standard output",
    )
    parser.add_argument(
        "--output-format",
        choices=["raw", "table", "csv", "html"],
        required=False,
        default="raw",
        help="Output format of the results",
    )

    args = parser.parse_args()

    try:
        server = RevenServer(args.host, args.port)
    except RuntimeError:
        raise RuntimeError(f"Could not connect to the server on {args.host}:{args.port}.")

    symbols_access_memory_range(
        server=server,
        memory_range=MemoryRange.from_string(args.memory_range),
        context=args.context,
        from_context=args.from_context,
        to_context=args.to_context,
        ring_policy=get_ring_policy(args.ring),
        processes=args.processes,
        included_binaries=args.include_binaries,
        excluded_binaries=args.exclude_binaries,
        excluded_symbols=args.exclude_symbols,
        operation=get_memory_access_operation(args.memory_access_operation),
        grouped_by_symbol=args.grouped_by_symbol,
        output_format=get_output_format(args.output_format),
        output_file=args.output_file,
    )


# %% [markdown]
# ## Parameters
#
# These parameters have to be filled out to use in the notebook context.

# %%
# Server connection
#
host = "localhost"
port = 37103

# Input data

memory_range = MemoryRange(address=_address.LogicalAddress(offset=0xFFFFF8800115E180), size=1)
# Or use the MemoryRange.from_string method
# memory_range = MemoryRange.from_string("[ds:0xFFFFF8800115E180; 1]")


context = 100
# context = None # can be None only when the memory range is defined by a physical address


# Output filter

from_context = None
# from_context = 10


to_context = None
# to_context = 10


ring_policy = RingPolicy.All
# ring_policy = RingPolicy.R0Only
# ring_policy = RingPolicy.R3Only

processes = None  # display result for all processes in the trace
# processes = ["xxx",]

included_binaries = None
# included_binaries = ["xxx",]
excluded_binaries = None
# excluded_binaries = ["xxx",]

excluded_symbols = None
# excluded_symbols = "xxx"

memory_access_operation = None
# memory_access_operation = MemoryAccessOperation.Write
# memory_access_operation = MemoryAccessOperation.Read

# Output target
#
output_file = None  # display results inline
# output_file = "res.csv"  # write results formatted as `csv` to a file named "res.csv" in the current directory


# Output control
#
# group results by symbol
grouped_by_symbol = False
# pandas output type
output_format: OutputFormat = OutputFormat.RAW


# %% [markdown]
# ### Pandas module
#
# This cell verify if pandas module is installed and install it if needed.


# %%
if in_notebook():
    try:
        import pandas  # noqa

        print("pandas already installed")
    except ImportError:
        print("Could not find pandas, attempting to install it from pip")
        import sys
        import subprocess

        command = [f"{sys.executable}", "-m", "pip", "install", "pandas"]
        p = subprocess.run(command)

        if int(p.returncode) != 0:
            raise RuntimeError("Error installing pandas")
        import pandas  # noqa

        print("Successfully installed pandas")
else:
    import pandas  # noqa


# %% [markdown]
# ### Execution cell
#
# This cell executes according to the [parameters](#Parameters) when in notebook context, or according to the
# [parsed arguments](#Argument-parsing) when in script context.
#
# When in notebook context, if the `output` parameter is `None`, then the report will be displayed in the last cell of
# the notebook.

# %%
if __name__ == "__main__":
    if in_notebook():
        try:
            server = RevenServer(host, port)
        except RuntimeError:
            raise RuntimeError(f"Could not connect to the server on {host}:{port}.")

        symbols_access_memory_range(
            server=server,
            memory_range=memory_range,
            context=context,
            from_context=from_context,
            to_context=to_context,
            ring_policy=ring_policy,
            processes=processes,
            included_binaries=included_binaries,
            excluded_binaries=excluded_binaries,
            excluded_symbols=excluded_symbols,
            operation=memory_access_operation,
            grouped_by_symbol=grouped_by_symbol,
            output_format=output_format,
            output_file=output_file,
        )
    else:
        script_main()
# %%