Find symbols that access a specific memory range
Purpose
This notebook and script search a Reven trace for all symbols that accessed a specific memory range. The results can be filtered by process, ring, included binaries, excluded binaries, excluded symbols, context range and memory access operation. The script can generate two kinds of results:
- process, binary and symbol information for each memory access.
- for each symbol, all the memory accesses that occurred in that symbol. Note that this option can take a long time to start showing results, especially when there are many nested functions or many functions that don't end in the trace.
Note that:
- accesses will be reported as belonging to the innermost symbol that has not been excluded and whose binary has not been excluded in the configuration.
- we consider that we are "in a symbol" when the corresponding context.location.symbol returns this symbol. Reven returns the closest symbol with an `rva` lower than ours. Note that we are not trying to determine the exact bounds of the function named by that symbol. In particular, when there are missing symbols, this may report a symbol we saw a long time ago rather than <unknown>.
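The two rules above can be illustrated with a small sketch that does not depend on Reven. The symbol table, the function names `closest_symbol` and `attribute_access`, and the sample values are all hypothetical. They only model the behavior described here (nearest symbol at or below an rva, and attribution while walking the call stack from the outermost caller inward until an excluded symbol is met), not Reven's actual implementation.

```python
from bisect import bisect_right
from typing import List, Optional, Sequence, Set, Tuple

# Hypothetical symbol table for one binary: (rva, name) pairs sorted by rva.
SYMBOLS: List[Tuple[int, str]] = [(0x1000, "init"), (0x1400, "decrypt"), (0x2000, "cleanup")]


def closest_symbol(rva: int, symbols: Sequence[Tuple[int, str]] = SYMBOLS) -> Optional[str]:
    """Return the name of the last symbol whose rva is <= the given rva, if any."""
    rvas = [symbol_rva for symbol_rva, _ in symbols]
    index = bisect_right(rvas, rva)
    return symbols[index - 1][1] if index > 0 else None


def attribute_access(call_stack: Sequence[str], excluded: Set[str]) -> Optional[str]:
    """Pick the symbol an access is reported under.

    call_stack is assumed to be ordered from the outermost caller to the
    innermost callee. The walk stops at the first excluded symbol, so the
    access is attributed to the innermost symbol reached before it.
    """
    attributed = None
    for name in call_stack:
        if name in excluded:
            break
        attributed = name
    return attributed


print(closest_symbol(0x1404))  # decrypt
print(closest_symbol(0x1F00))  # decrypt as well: function bounds are not checked
print(closest_symbol(0x0800))  # None: no symbol at or below this rva
print(attribute_access(["main", "decrypt", "memcpy"], {"memcpy"}))  # decrypt
```

The second call shows why a stale symbol can be reported: any address past `decrypt` and before `cleanup` resolves to `decrypt`, whether or not it really belongs to that function.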
How to use
Results can be generated from this notebook or from the command line. The script can also be imported as a module for use from your own script or notebook.
From the notebook
- Upload the `symbols_access_memory_range.ipynb` file in Jupyter.
- Fill out the parameters cell of this notebook according to your scenario and desired output.
- Run the full notebook.
From the command line
- Make sure that you are in an environment that can run Reven scripts.
- Run `python symbols_access_memory_range.py --help` to get a tour of available arguments.
- Run `python symbols_access_memory_range.py --host <your_host> --port <your_port> [<other_option>]` with your arguments of choice.
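As an illustration of a fuller invocation, the sketch below assembles a command line from flags defined in the script's argument parser. The values (memory range, context number, output file name) are placeholders, not recommendations. The memory range contains spaces, brackets and a semicolon, so it has to be quoted when typed in a POSIX shell; `shlex.join` (Python 3.8+) is used here only to show the quoting.

```python
import shlex

# Flags come from the script's argument parser; all values are placeholders.
command = [
    "python", "symbols_access_memory_range.py",
    "--host", "localhost",
    "--port", "13370",
    "--memory-range", "[ds:0xfffff8800115e180 ; 128]",
    "--context", "410545055",  # needed to translate a virtual memory range
    "--ring", "0",
    "--memory-access-operation", "read",
    "--output-format", "csv",
    "--output-file", "accesses.csv",
]

# shlex.join quotes the memory range so that the shell passes it as one argument.
print(shlex.join(command))
```

The same list can be passed directly to `subprocess.run(command)` from Python, in which case no shell quoting is involved.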
Imported in your own script or notebook
- Make sure that you are in an environment that can run Reven scripts.
- Make sure that `symbols_access_memory_range.py` is in the same directory as your script or notebook.
- Add `import symbols_access_memory_range` to your script or notebook. You can access the various functions and classes exposed by the module from the `symbols_access_memory_range` namespace.
- Refer to the Argument parsing cell for an example of use in a script, and to the Parameters cell and below for an example of use in a notebook (you just need to prepend `symbols_access_memory_range.` to the functions and classes from the script).
Known limitations
N/A.
Supported versions
Reven 2.10+
Supported perimeter
Any Reven scenario.
Dependencies
The script requires that the target Reven scenario have:
- The OSSI feature replayed.
- The memory history feature replayed.
The environment running the script must also provide the pandas Python module.
Source
# ---
# jupyter:
# jupytext:
# formats: ipynb,py:percent
# text_representation:
# extension: .py
# format_name: percent
# kernelspec:
# display_name: reven
# language: python
# name: reven-python3
# ---
# %% [markdown]
# # Find symbols that access a specific memory range
#
# ## Purpose
#
# This notebook and script search a Reven trace for all symbols that accessed a specific memory range.
# The results can be filtered by process, ring, included binaries, excluded binaries, excluded
# symbols, context range and memory access operation.
#
# The script can generate two kinds of results:
# - process, binary and symbol information for each memory access.
# - for each symbol, all the memory accesses that occurred in that symbol.
#   Note that this option can take a long time to start showing results,
#   especially when there are many nested functions or many functions that don't end in the trace.
#
# Note that:
# - accesses will be reported as belonging to the innermost symbol that has not been excluded
# and whose binary has not been excluded in the configuration.
# - we consider that we are "in a symbol" when the corresponding context.location.symbol returns this symbol.
#   Reven returns the closest symbol with an `rva` lower than ours. Note that we are not trying to determine the
#   exact bounds of the function named by that symbol. In particular, when there are missing symbols,
#   this may report a symbol we saw a long time ago rather than `<unknown>`.
#
#
#
# ## How to use
#
# Results can be generated from this notebook or from the command line.
# The script can also be imported as a module for use from your own script or notebook.
#
#
# ### From the notebook
#
# 1. Upload the `symbols_access_memory_range.ipynb` file in Jupyter.
# 2. Fill out the [parameters](#Parameters) cell of this notebook according to your scenario and desired output.
# 3. Run the full notebook.
#
#
# ### From the command line
#
# 1. Make sure that you are in an environment that can run Reven scripts.
# 2. Run `python symbols_access_memory_range.py --help` to get a tour of available arguments.
# 3. Run `python symbols_access_memory_range.py --host <your_host> --port <your_port> [<other_option>]` with your
# arguments of choice.
#
# ### Imported in your own script or notebook
#
# 1. Make sure that you are in an environment that can run Reven scripts.
# 2. Make sure that `symbols_access_memory_range.py` is in the same directory as your script or notebook.
# 3. Add `import symbols_access_memory_range` to your script or notebook. You can access the various functions and
# classes exposed by the module from the `symbols_access_memory_range` namespace.
# 4. Refer to the [Argument parsing](#Argument-parsing) cell for an example of use in a script, and to the
#    [Parameters](#Parameters) cell and below for an example of use in a notebook (you just need to prepend
#    `symbols_access_memory_range.` to the functions and classes from the script).
#
# ## Known limitations
#
# N/A.
#
# ## Supported versions
#
# Reven 2.10+
#
# ## Supported perimeter
#
# Any Reven scenario.
#
# ## Dependencies
#
# The script requires that the target Reven scenario have:
#
# * The OSSI feature replayed.
# * The memory history feature replayed.
#
# The environment running the script must also provide the `pandas` Python module.
# %% [markdown]
# ### Package imports
# %%
import argparse
from enum import Enum
from typing import Iterable as _Iterable, List
from typing import Optional as _Optional
from typing import cast as _cast
from IPython.core.display import display # type: ignore
import pandas
import reven2.address as _address
import reven2.arch as _arch
from reven2.filter import RingPolicy
from reven2.memhist import MemoryAccess, MemoryAccessOperation
from reven2.memory_range import MemoryRange
from reven2.ossi import Binary, Process, Symbol
from reven2.ossi.thread import Thread
from reven2.prelude import RevenServer
from reven2.stack import Stack
from reven2.trace import Context, Trace
from reven2.util import collate as _collate
# %% [markdown]
# ### Utility functions
# %%
# Detect if we are currently running a Jupyter notebook.
#
# This is used e.g. to display rendered results inline in Jupyter when we are executing in the context of a Jupyter
# notebook, or to display raw results on the standard output when we are executing in the context of a script.
def in_notebook():
    try:
        from IPython import get_ipython  # type: ignore

        if get_ipython() is None or ("IPKernelApp" not in get_ipython().config):
            return False
    except ImportError:
        return False
    return True
# %% [markdown]
# ### Helper classes for results
# %%
class CallSymbol:
    r"""
    CallSymbol is a helper class used to represent a symbol with its start and end context
    """

    def __init__(self, symbol: _Optional[Symbol], start: Context, end: _Optional[Context] = None) -> None:
        self._symbol = symbol
        self._start = start
        self._end = end

    @property
    def symbol(self) -> _Optional[Symbol]:
        r"""
        B{Property:} The symbol of the call symbol. None if the symbol is unknown.
        """
        return self._symbol

    @property
    def start_context(self) -> Context:
        r"""
        B{Property:} The start context of the call symbol.
        """
        return self._start

    @property
    def end_context(self) -> _Optional[Context]:
        r"""
        B{Property:} The end context (excluded) of the call symbol. None if the end context isn't in the trace.
        """
        return self._end

    def __eq__(self, other: "CallSymbol") -> bool:  # type: ignore
        return self._symbol == other._symbol and self._start == other._start and self._end == other._end

    def __ne__(self, other: "CallSymbol") -> bool:  # type: ignore
        return not (self == other)
class MemoryRangeSymbolResult:
    r"""
    MemoryRangeSymbolResult is a helper class that represents one result of the search.
    """

    def __init__(
        self,
        call_symbol: CallSymbol,
        memory_access: _Optional[MemoryAccess],
        ring: int,
        process: _Optional[Process],
        thread: _Optional[Thread],
        binary: _Optional[Binary],
    ) -> None:
        self._call_symbol = call_symbol
        self._memory_accesses = [] if memory_access is None else [memory_access]
        self._ring = ring
        self._process = process
        self._thread = thread
        self._binary = binary

    def copy(self) -> "MemoryRangeSymbolResult":
        r"""
        Return a copy of this object.
        All attributes are copied by reference, except for the list of memory accesses,
        which is copied into a new list so that the copy can be extended independently.
        """
        new_obj = MemoryRangeSymbolResult(
            call_symbol=self._call_symbol,
            memory_access=None,
            ring=self._ring,
            process=self._process,
            thread=self._thread,
            binary=self._binary,
        )
        if self._memory_accesses is not None:
            new_obj._memory_accesses += self._memory_accesses
        return new_obj

    @property
    def call_symbol(self) -> CallSymbol:
        r"""
        B{Property:} The call symbol of the result.
        """
        return self._call_symbol

    @property
    def memory_accesses(self) -> List[MemoryAccess]:
        r"""
        B{Property:} The memory accesses of the result.
        """
        return self._memory_accesses

    @property
    def ring(self) -> int:
        r"""
        B{Property:} The ring of the result.
        """
        return self._ring

    @property
    def process(self) -> _Optional[Process]:
        r"""
        B{Property:} The process of the result.
        """
        return self._process

    @property
    def binary(self) -> _Optional[Binary]:
        r"""
        B{Property:} The binary of the result. None if the binary is unknown.
        """
        return self._binary

    @property
    def thread(self) -> _Optional[Thread]:
        r"""
        B{Property:} The thread of the result.
        """
        return self._thread

    def __eq__(self, other: "MemoryRangeSymbolResult") -> bool:  # type: ignore
        return (
            self._ring == other._ring
            and self._process is not None
            and other._process is not None
            and self._process.name == other._process.name
            and self._process.pid == other._process.pid
            and self._process.ppid == other._process.ppid
            and self._thread is not None
            and other._thread is not None
            and self._thread.id == other._thread.id
            and self._thread.owner_process_id == other._thread.owner_process_id
            and (
                (self._binary is None and other._binary is None)
                or (self._binary is not None and other._binary is not None and self._binary.path == other._binary.path)
            )
            and self._call_symbol == other._call_symbol
        )

    def __ne__(self, other: "MemoryRangeSymbolResult") -> bool:  # type: ignore
        return not (self == other)

    def __str__(self) -> str:
        memory_accesses = "\nmemory accesses:"
        for m in self._memory_accesses:
            memory_accesses += f"\n\t{m}, "
        memory_accesses += "\n"
        return (
            f"ring: {self._ring}, process: {self._process}, "
            f"thread: {self._thread}, binary: {self._binary}, "
            f"symbol: {self._call_symbol.symbol}[{self._call_symbol.start_context}, "
            f"{self._call_symbol.end_context}[ {memory_accesses}"
        )

    def format_as_html(self):
        r"""
        Return an HTML-formatted string representation of this instance.

        Information
        ===========

        @returns: C{String}
        """
        memory_accesses = "<p>memory accesses:</p><ol>"
        for m in self._memory_accesses:
            memory_accesses += f"<li>{m.format_as_html()}</li>"
        memory_accesses += "</ol>"
        return (
            f"ring: {self._ring}, process: {self._process if self._process is not None else 'unknown'}, "
            f" thread: {self._thread if self._thread is not None else 'unknown'}, binary: {self._binary}, "
            f"symbol: {self._call_symbol.symbol}[{self._call_symbol.start_context}, "
            f"{self._call_symbol.end_context}[ {memory_accesses}"
        )

    def _repr_html_(self):
        r"""
        Representation used by Jupyter Notebook when an instance of this class is displayed in a cell.
        """
        return "<p>{}</p>".format(self.format_as_html())
# %% [markdown]
# ### MemoryRangeSymbolFinder
#
# This class represents the main logic of this script
# %%
class MemoryRangeSymbolFinder(object):
    r"""
    This class is a helper class to search for all symbols that access a specific memory range.
    Results can be filtered by processes, ring, binaries, excluded binaries, excluded symbols
    and a context range.
    The symbols that access this memory range are returned.

    Examples
    ========

    >>> # Search all symbols that access the memory range [ds:0xfffff8800115e180 ; 128]
    >>> # filtered by the process `svchost.exe`, translating the range at the context #410545055.
    >>> processes = server.ossi.executed_processes('svchost.exe')
    >>> memory_range = MemoryRange.from_string("[ds:0xfffff8800115e180 ; 128]")
    >>> context = server.trace.context_before(410545055)
    >>> symbol_mem_finder = MemoryRangeSymbolFinder(
    ...     trace=server.trace, memory_range=memory_range,
    ...     translation_context=context, processes=processes)
    >>> for r in symbol_mem_finder.query():
    ...     print(r)
    ring: 0, process: svchost.exe (1004), thread: 2256, binary: c:/windows/system32/drivers/cng.sys,
    symbol: cng!AesCbcDecrypt[Context before #25208343, Context before #25212457[
    memory accesses:
        [#25208488 xor r8d, dword ptr ds:[r11+rax*4+0x800]]Read access at
        @phy:0x36411e0 (virtual address: lin:0xfffff8800115e1e0) of size 4,
    ...
    """

    def __init__(
        self,
        trace: Trace,
        memory_range: MemoryRange,
        translation_context: _Optional[Context] = None,
        from_context: _Optional[Context] = None,
        to_context: _Optional[Context] = None,
        ring_policy: RingPolicy = RingPolicy.All,
        processes: _Optional[_Iterable[Process]] = None,
        included_binaries: _Optional[_Iterable[Binary]] = None,
        excluded_binaries: _Optional[_Iterable[Binary]] = None,
        excluded_symbols: _Optional[_Iterable[Symbol]] = None,
        operation: _Optional[MemoryAccessOperation] = None,
    ) -> None:
        r"""
        Initialize a C{MemoryRangeSymbolFinder}

        Information
        ===========

        @param trace: the trace where symbols will be looked for.
        @param memory_range: the memory range that is accessed by the returned symbols.
        @param translation_context: context used to translate the memory range when it is virtual.
        @param from_context: the context where the search will be started.
        @param to_context: the context where the search will be ended.
        @param ring_policy: ring policy to search for.
        @param processes: processes to limit the search to. If None, the search is not restricted by process.
        @param included_binaries: binaries that must be included in the search.
                                  If None, all binaries will be included.
                                  When a binary is not included, all its symbols and their memory accesses
                                  are ignored.
        @param excluded_binaries: binaries that must be excluded from the search. If None nothing will be excluded.
                                  Accesses performed in this binary are reported, but using the first caller
                                  binary that is not excluded. Note that inclusion is applied before exclusion.
        @param excluded_symbols: symbols that must be excluded from the search. If None nothing will be excluded.
                                 Accesses performed in this symbol are reported, but using the first caller
                                 symbol that is not excluded.
        @param operation: limit results to accesses performing the specified operation.

        @raises TypeError: if trace is not a C{reven2.trace.Trace}.
        @raises ValueError: If provided memory range is virtual and the translation_context is None.
        """
        if not isinstance(trace, Trace):
            raise TypeError("You must provide a valid trace")
        self._trace = trace

        if isinstance(memory_range.address, _address.PhysicalAddress):
            self._physical_memory_ranges = [_cast(MemoryRange[_address.PhysicalAddress], memory_range)]
        elif translation_context is None:
            raise ValueError("You must provide a context for the translation if the memory range is virtual")
        else:
            self._physical_memory_ranges = [mem_range for mem_range in memory_range.translate(translation_context)]

        self._from_context = from_context
        self._to_context = to_context
        self._ring_policy = ring_policy
        self._processes = None if processes is None else [process for process in processes]
        self._included_binaries = None if included_binaries is None else {binary.name for binary in included_binaries}
        self._excluded_binaries = set() if excluded_binaries is None else {binary.name for binary in excluded_binaries}
        self._excluded_symbols = set() if excluded_symbols is None else {symbol.name for symbol in excluded_symbols}
        self._operation = operation
    def filter_by_processes(self, processes: _Iterable[Process]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of processes to limit the search in, and return the `self` object.

        Information
        ===========

        @param processes: processes to limit the search in.
        @returns : self object
        """
        if self._processes is None:
            self._processes = []
        self._processes += [process for process in processes]
        return self

    def filter_by_ring(self, ring_policy: RingPolicy) -> "MemoryRangeSymbolFinder":
        r"""
        Update the ring policy to search for and return the `self` object.

        Information
        ===========

        @param ring_policy: ring policy to search for.
        @returns : self object
        """
        self._ring_policy = ring_policy
        return self

    def from_context(self, context: Context) -> "MemoryRangeSymbolFinder":
        r"""
        Update the context where the search will be started and return the `self` object.

        Information
        ===========

        @param context: context where the search will be started.
        @returns : self object
        """
        self._from_context = context
        return self

    def to_context(self, context: Context) -> "MemoryRangeSymbolFinder":
        r"""
        Update the context where the search will be ended and return the `self` object.

        Information
        ===========

        @param context: context where the search will be ended.
        @returns : self object
        """
        self._to_context = context
        return self

    def include_bnaries(self, binaries: _Iterable[Binary]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of binaries that must be included in the search and return the `self` object.

        Information
        ===========

        @param binaries: binaries that must be included in the search.
        @returns : self object
        """
        if self._included_binaries is None:
            self._included_binaries = {binary.name for binary in binaries}
        else:
            self._included_binaries.update([binary.name for binary in binaries])
        return self

    def exclude_bnaries(self, binaries: _Iterable[Binary]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of binaries that must be excluded from the search and return the `self` object.

        Information
        ===========

        @param binaries: binaries that must be excluded from the search.
        @returns : self object
        """
        self._excluded_binaries.update([binary.name for binary in binaries])
        return self

    def exclude_symbols(self, symbols: _Iterable[Symbol]) -> "MemoryRangeSymbolFinder":
        r"""
        Extend the list of symbols that must be excluded from the search and return the `self` object.

        Information
        ===========

        @param symbols: symbols that must be excluded from the search.
        @returns : self object
        """
        self._excluded_symbols.update([symbol.name for symbol in symbols])
        return self

    def filter_by_memory_access_operation(
        self, operation: _Optional[MemoryAccessOperation] = None
    ) -> "MemoryRangeSymbolFinder":
        r"""
        Update the memory access operation to limit results to accesses performing this
        operation and return the `self` object.

        Information
        ===========

        @param operation: limit results to accesses performing the specified operation.
        @returns : self object
        """
        self._operation = operation
        return self

    def _is_the_same_stack(self, stack1: Stack, stack2: Stack) -> bool:
        # we assume that two stacks are the same if the first contexts of their first frames are the same
        frame1 = next(stack1.frames())
        frame2 = next(stack2.frames())
        return frame1.first_context == frame2.first_context
    def query(self) -> _Iterable[MemoryRangeSymbolResult]:
        r"""
        Iterate over all filtered contexts and yield symbols.
        Note: the same symbol can be yielded several times with different memory accesses.
        """
        # Make a copy of the variables that can modify the generated results
        operation = self._operation
        included_binaries = None if self._included_binaries is None else self._included_binaries.copy()
        excluded_binaries = self._excluded_binaries.copy()
        excluded_symbols = self._excluded_symbols.copy()

        # store last handled stack to use it if we are in the same stack
        last_stack: _Optional[Stack] = None
        # store last result to use it if we are in the same stack
        last_result: _Optional[MemoryRangeSymbolResult] = None

        # Iterate over all context range filtered by ring, processes, from_context and to_context
        for context_range in self._trace.filter(
            processes=self._processes,
            ring_policy=self._ring_policy,
            from_context=self._from_context,
            to_context=self._to_context,
        ):
            from_transition = (
                context_range.begin.transition_before()
                if context_range.begin == self._trace.last_context
                else context_range.begin.transition_after()
            )
            to_transition = (
                context_range.last.transition_before()
                if context_range.last == self._trace.last_context
                else context_range.last.transition_after()
            )

            # iterate over physical memory range
            iterators = [
                self._trace.memory_accesses(
                    address=memory_range.address,
                    size=memory_range.size,
                    from_transition=from_transition,
                    to_transition=to_transition,
                )
                for memory_range in self._physical_memory_ranges
            ]

            # iterate over all memory accesses in this range
            for memory_access in _collate(iterators, key=lambda x: x.transition.id):
                # apply filter by operation here instead of in the query, because currently
                # operation-constrained queries are not optimized in the backend
                if operation is not None and operation != memory_access.operation:
                    continue

                # get the stack at this transition
                current_context: Context = memory_access.transition.context_before()
                stack = current_context.stack
                if last_result is not None and last_stack is not None and self._is_the_same_stack(last_stack, stack):
                    # update the memory access of the last result and yield it
                    last_result._memory_accesses = [memory_access]
                    yield last_result
                    continue

                last_stack = stack

                # exclude symbols and binary
                handled_binary = None
                handled_symbol = None
                handled_symbol_found = False
                frames = [frame for frame in stack.frames()]
                frames.reverse()
                for frame in frames:
                    loc = frame.first_context.ossi.location()
                    if loc is not None and (
                        loc.binary.name in excluded_binaries
                        or (loc.symbol is not None and loc.symbol.name in excluded_symbols)
                        or ("unknown" in excluded_symbols)
                    ):
                        break

                    if loc is not None:
                        if loc.binary is not None:
                            handled_binary = loc.binary
                        if loc.symbol is not None:
                            handled_symbol = loc.symbol

                    handled_symbol_found = True
                    first_context = frame.first_context
                    handled_process = frame.first_context.ossi.process()
                    handled_thread = frame.first_context.ossi.thread()

                # ignore symbol if it is in excluded symbols or if its binary in the excluded binaries
                if not handled_symbol_found:
                    continue

                # ignore symbol if its binary isn't in the included binaries
                if (
                    included_binaries is not None
                    and handled_binary is not None
                    and handled_binary.name not in included_binaries
                ):
                    continue

                # get the end of symbol
                end_transition = (
                    first_context.transition_after().step_out()
                    if first_context != self._trace.last_context
                    else first_context.transition_before().step_out()
                )
                end_context = None if end_transition is None else end_transition.context_before()

                # get the ring of the symbol
                handled_ring = first_context.read(_arch.x64.cs) & 0x3

                last_result = MemoryRangeSymbolResult(
                    call_symbol=CallSymbol(handled_symbol, first_context, end_context),
                    memory_access=memory_access,
                    ring=handled_ring,
                    process=handled_process,
                    thread=handled_thread,
                    binary=handled_binary,
                )
                yield last_result
    def group_by_symbol_query(self) -> _Iterable[MemoryRangeSymbolResult]:
        r"""
        Iterate over all filtered contexts and yield symbols.
        Note: each symbol will be yielded only once, with a group of all its memory accesses.
        """
        # Add symbols to a stack and pop it when it is finished
        result_stack = []  # type: List[MemoryRangeSymbolResult]
        for result in self.query():
            if len(result_stack) > 0:
                # firstly, we verify if we can pop the last item from the stack
                # Item will be yielded if its end context isn't None and the current result of
                # the query has a memory access such that the before context of its transition
                # >= the end context of the last symbol in the stack
                last_end_context = result_stack[-1].call_symbol.end_context
                if (
                    last_end_context is not None
                    # len(result.memory_accesses) > 0 because the results of `query`
                    # contain exactly one memory_access by construction.
                    and result.memory_accesses[0].transition.context_before() >= last_end_context
                ):
                    res = result_stack.pop(-1)
                    yield res

            # Here we observe symbols change,
            # if the symbol is changed (result_stack[-1] != result) we add the new symbol to the stack.
            # (len(result_stack) == 0 handles the first result and the case where the stack was just emptied)
            if len(result_stack) == 0 or result_stack[-1] != result:
                # store a copy of the result, with its own list of memory accesses
                result_stack.append(result.copy())
                continue

            # the symbol didn't change, so we add the memory access of the current result
            # to the last item in the stack
            result_stack[-1]._memory_accesses += result.memory_accesses

        # yield all remaining symbols, including those with a None end context
        for result in result_stack:
            yield result
# %% [markdown]
#
# ### OutputFormat
# %%
class OutputFormat(Enum):
    r"""
    Enum describing the various possible output formats of the results

    - RAW: The results will be output using their string representation.
    - TABLE: The results will be output using pandas table format.
    - CSV: The results will be output as csv.
    - HTML: The results will be output as html table.
    """

    RAW = 0
    TABLE = 1
    CSV = 2
    HTML = 3
# %% [markdown]
# ### Main function
#
# This function is called with parameters from the [Parameters](#Parameters) cell in the notebook context,
# or with parameters from the command line in the script context.
# %%
def symbols_access_memory_range(
    server: RevenServer,
    memory_range: MemoryRange,
    context: _Optional[int],
    from_context: _Optional[int] = None,
    to_context: _Optional[int] = None,
    ring_policy: RingPolicy = RingPolicy.All,
    processes: _Optional[_Iterable[str]] = None,
    included_binaries: _Optional[_Iterable[str]] = None,
    excluded_binaries: _Optional[_Iterable[str]] = None,
    excluded_symbols: _Optional[_Iterable[str]] = None,
    operation: _Optional[MemoryAccessOperation] = None,
    grouped_by_symbol: bool = False,
    output_format: OutputFormat = OutputFormat.RAW,
    output_file: _Optional[str] = None,
) -> None:
    # declare symbol finder.
    memory_range_symbols_finder = MemoryRangeSymbolFinder(
        trace=server.trace,
        memory_range=memory_range,
        translation_context=(None if context is None else server.trace.context_before(context)),
        from_context=(None if from_context is None else server.trace.context_before(from_context)),
        to_context=(None if to_context is None else server.trace.context_before(to_context)),
        ring_policy=ring_policy,
        operation=operation,
    )

    # filter by processes
    if processes is not None:
        for process in processes:
            memory_range_symbols_finder.filter_by_processes(server.ossi.executed_processes(process))

    # include binaries
    if included_binaries is not None:
        for binary in included_binaries:
            memory_range_symbols_finder.include_bnaries(server.ossi.executed_binaries(binary))

    # exclude binaries
    if excluded_binaries is not None:
        for binary in excluded_binaries:
            memory_range_symbols_finder.exclude_bnaries(server.ossi.executed_binaries(binary))

    # exclude symbols
    if excluded_symbols is not None:
        for symbol in excluded_symbols:
            memory_range_symbols_finder.exclude_symbols(server.ossi.symbols(symbol))

    query = (
        memory_range_symbols_finder.group_by_symbol_query()
        if grouped_by_symbol
        else memory_range_symbols_finder.query()
    )

    if output_format == OutputFormat.RAW:
        print_func = display if in_notebook() else print
        if output_file is not None:
            file = open(output_file, "w")

            def fprint_func(s: MemoryRangeSymbolResult) -> None:
                file.write(str(s))
                file.write("\n")

            print_func = fprint_func

        for result in query:
            print_func(result)

        if output_file is not None:
            file.close()
    else:
        results = {  # type: ignore
            "Ring": [],
            "Process": [],
            "Thread": [],
            "Binary": [],
            "Symbol": [],
            "Start context": [],
            "Access transition": [],
            "Access operation": [],
            "Access physical": [],
            "Access linear": [],
            "Access size": [],
        }
        for result in query:
            for mem_access in result.memory_accesses:
                results["Ring"].append(result.ring)
                results["Process"].append(str(result.process) if result.process is not None else "unknown")
                results["Thread"].append(str(result.thread) if result.thread is not None else "unknown")
                results["Binary"].append(result.binary.name if result.binary is not None else "unknown")
                results["Symbol"].append(
                    result.call_symbol.symbol.name if result.call_symbol.symbol is not None else "unknown"
                )
                results["Start context"].append(str(result.call_symbol.start_context))
                results["Access transition"].append(mem_access.transition.id)
                results["Access operation"].append(mem_access.operation.name)
                results["Access physical"].append(mem_access.physical_address)
                results["Access linear"].append(mem_access.virtual_address)
                results["Access size"].append(mem_access.size)

        # type stub is installed for pandas module but it is a WIP.
        # It doesn't know the `from_dict` method of `DataFrame` class.
        # so we ignore the type here.
        df = pandas.DataFrame.from_dict(results)  # type: ignore
        if output_format == OutputFormat.TABLE:
            if output_file is not None:
                with open(output_file, "w") as file:
                    file.write(str(df))
            else:
                print(df)
        elif output_format == OutputFormat.CSV:
            print(df.to_csv()) if output_file is None else df.to_csv(output_file)
        elif output_format == OutputFormat.HTML:
            print(df.to_html()) if output_file is None else df.to_html(output_file)
# %% [markdown]
# ### Argument parsing
#
# Argument parsing function for use in the script context.
# %%
def get_memory_access_operation(operation: _Optional[str]) -> _Optional[MemoryAccessOperation]:
    if operation is None:
        return None
    if operation.lower() == "read":
        return MemoryAccessOperation.Read
    if operation.lower() == "write":
        return MemoryAccessOperation.Write
    raise ValueError(f"'operation' value should be 'read' or 'write'. Received '{operation}'.")


def get_ring_policy(ring: _Optional[int]) -> RingPolicy:
    if ring is None:
        return RingPolicy.All
    if ring == 0:
        return RingPolicy.R0Only
    if ring == 3:
        return RingPolicy.R3Only
    # Only ring 0 and ring 3 are accepted; report the value that was actually received.
    raise ValueError(f"'ring' value should be '0' or '3'. Received '{ring}'.")


def get_output_format(format: str) -> OutputFormat:
    if format.lower() == "raw":
        return OutputFormat.RAW
    if format.lower() == "table":
        return OutputFormat.TABLE
    if format.lower() == "html":
        return OutputFormat.HTML
    if format.lower() == "csv":
        return OutputFormat.CSV
    raise ValueError(f"'output format' value should be 'raw', 'table', 'html', or 'csv'. Received '{format}'.")
def script_main():
parser = argparse.ArgumentParser(description="Find all symbols that access a memory range")
parser.add_argument(
"--host",
type=str,
default="localhost",
required=False,
help='Reven host, as a string (default: "localhost")',
)
parser.add_argument(
"-p",
"--port",
type=int,
default="13370",
required=False,
help="Reven port, as an int (default: 13370)",
)
parser.add_argument(
"-m",
"--memory-range",
type=str,
required=True,
help="The memory range whose accesses to look for in symbols (e.g. [ds:0xfff5000; 2])",
)
parser.add_argument(
"-C",
"--context",
type=int,
required=False,
help="The context used to translate the memory range if it is virtual",
)
parser.add_argument(
"--from-context",
type=int,
required=False,
help="The context from where the search starts",
)
parser.add_argument(
"--to-context",
type=int,
required=False,
help="The context(not included) at which the search stops",
)
parser.add_argument(
"--ring",
type=int,
required=False,
help="Show symbols in this ring only, can be (0=ring0, 3=ring3)",
)
parser.add_argument(
"--processes",
required=False,
nargs="*",
help="Show symbols in these processes only",
)
parser.add_argument(
"--include-binaries",
required=False,
nargs="*",
help="Show symbols in these binaries only",
)
parser.add_argument(
"--exclude-binaries",
required=False,
nargs="*",
help="Don't show symbols in these binaries. Accesses that belong to these symbols will be reported with "
"the innermost symbol such that neither it nor its binary is excluded",
)
parser.add_argument(
"--exclude-symbols",
required=False,
nargs="*",
help="Don't show these symbols. Accesses that belong to these symbols will be reported with "
"the innermost non-excluded symbol",
)
parser.add_argument(
"--memory-access-operation",
choices=["read", "write"],
required=False,
help="Only show symbols that access the memory range using this operation",
)
parser.add_argument(
"--grouped-by-symbol",
action="store_true",
required=False,
default=False,
help="Group results by symbol",
)
parser.add_argument(
"-o",
"--output-file",
type=str,
required=False,
help="The target file of the results. If absent, the results will be printed on the standard output",
)
parser.add_argument(
"--output-format",
choices=["raw", "table", "csv", "html"],
required=False,
default="raw",
help="Output format of the results",
)
args = parser.parse_args()
try:
server = RevenServer(args.host, args.port)
except RuntimeError:
raise RuntimeError(f"Could not connect to the server on {args.host}:{args.port}.")
symbols_access_memory_range(
server=server,
memory_range=MemoryRange.from_string(args.memory_range),
context=args.context,
from_context=args.from_context,
to_context=args.to_context,
ring_policy=get_ring_policy(args.ring),
processes=args.processes,
included_binaries=args.include_binaries,
excluded_binaries=args.exclude_binaries,
excluded_symbols=args.exclude_symbols,
operation=get_memory_access_operation(args.memory_access_operation),
grouped_by_symbol=args.grouped_by_symbol,
output_format=get_output_format(args.output_format),
output_file=args.output_file,
)
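# %% [markdown]
# ### Note on list-valued options
#
# A minimal, standalone sketch (not part of the script's logic) of how `argparse` treats options declared with
# `nargs="*"`, such as `--processes` or `--exclude-symbols` above: an option that is absent yields `None`, while an
# option passed without any value yields an empty list. `build_filter_parser` is a hypothetical helper name used only
# for this illustration, and it mirrors just two of the declarations above.
# %%
import argparse  # noqa


def build_filter_parser() -> argparse.ArgumentParser:
    # Hypothetical helper for illustration only: it reproduces the `nargs="*"`
    # declarations of two filter options so their parsing can be observed in isolation.
    demo_parser = argparse.ArgumentParser(add_help=False)
    demo_parser.add_argument("--processes", required=False, nargs="*")
    demo_parser.add_argument("--exclude-symbols", required=False, nargs="*")
    return demo_parser


# Example (not executed here):
#   build_filter_parser().parse_args([]).processes               is None
#   build_filter_parser().parse_args(["--processes"]).processes  is an empty list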
# %% [markdown]
# ## Parameters
#
# These parameters must be filled out when running in the notebook context.
# %%
# Server connection
#
host = "localhost"
port = 37103
# Input data
memory_range = MemoryRange(address=_address.LogicalAddress(offset=0xFFFFF8800115E180), size=1)
# Or use the MemoryRange.from_string method
# memory_range = MemoryRange.from_string("[ds:0xFFFFF8800115E180; 1]")
context = 100
# context = None # can be None only when the memory range is defined by a physical address
# Output filter
from_context = None
# from_context = 10
to_context = None
# to_context = 10
ring_policy = RingPolicy.All
# ring_policy = RingPolicy.R0Only
# ring_policy = RingPolicy.R3Only
processes = None # display result for all processes in the trace
# processes = ["xxx",]
included_binaries = None
# included_binaries = ["xxx",]
excluded_binaries = None
# excluded_binaries = ["xxx",]
excluded_symbols = None
# excluded_symbols = ["xxx",]
memory_access_operation = None
# memory_access_operation = MemoryAccessOperation.Write
# memory_access_operation = MemoryAccessOperation.Read
# Output target
#
output_file = None # display results inline
# output_file = "res.csv" # write results formatted as `csv` to a file named "res.csv" in the current directory
# Output control
#
# group results by symbol
grouped_by_symbol = False
# pandas output type
output_format: OutputFormat = OutputFormat.RAW
# %% [markdown]
# ### Pandas module
#
# This cell checks whether the pandas module is installed and installs it if needed.
# %%
if in_notebook():
try:
import pandas # noqa
print("pandas already installed")
except ImportError:
print("Could not find pandas, attempting to install it from pip")
import sys
import subprocess
command = [f"{sys.executable}", "-m", "pip", "install", "pandas"]
p = subprocess.run(command)
if int(p.returncode) != 0:
raise RuntimeError("Error installing pandas")
import pandas # noqa
print("Successfully installed pandas")
else:
import pandas # noqa
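# %% [markdown]
# A possible alternative to the try/import check above, shown as a sketch only: `importlib.util.find_spec` from the
# standard library reports whether a top-level module can be found without actually importing it.
# `is_module_available` is a hypothetical helper name that the rest of this script does not use.
# %%
import importlib.util  # noqa


def is_module_available(module_name: str) -> bool:
    # Hypothetical helper for illustration only: returns True when a top-level
    # module with this name can be located by the import system, without importing it.
    return importlib.util.find_spec(module_name) is not None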
# %% [markdown]
# ### Execution cell
#
# This cell executes according to the [parameters](#Parameters) when in notebook context, or according to the
# [parsed arguments](#Argument-parsing) when in script context.
#
# When in notebook context, if the `output_file` parameter is `None`, then the report will be displayed in the last
# cell of the notebook.
# %%
if __name__ == "__main__":
if in_notebook():
try:
server = RevenServer(host, port)
except RuntimeError:
raise RuntimeError(f"Could not connect to the server on {host}:{port}.")
symbols_access_memory_range(
server=server,
memory_range=memory_range,
context=context,
from_context=from_context,
to_context=to_context,
ring_policy=ring_policy,
processes=processes,
included_binaries=included_binaries,
excluded_binaries=excluded_binaries,
excluded_symbols=excluded_symbols,
operation=memory_access_operation,
grouped_by_symbol=grouped_by_symbol,
output_format=output_format,
output_file=output_file,
)
else:
script_main()
# %%