Search in memory
Purpose
Search the memory at a specific context for a string or for an array of bytes.
The memory range to search in is defined by a starting address and a search_size.
All unmapped addresses are ignored during the search.
How to use
usage: search_in_memory.py [-h] --host HOST -p PORT --transition TRANSITION
--address ADDRESS --pattern PATTERN
[--search-size SEARCH_SIZE] [--backward]
optional arguments:
-h, --help show this help message and exit
--host HOST Reven host, as a string (default: "localhost")
-p PORT, --port PORT Reven port, as an int (default: 13370)
--transition TRANSITION
transition id. the context before this id will be
searched
--address ADDRESS The start address of the memory area to search in. It
can be a hex offset as 0xfff123 (same as ds:0xfff123),
hex offset prefixed by segment register as
gs:0xfff123, hex offset prefixed by hex segment index
as 0x20:0xfff123, hex offset prefixed by 'lin' for
linear address, or offset prefixed by 'phy' for
physical address.
--pattern PATTERN pattern that will be searched. It can be a normal
string as 'test', or a string of bytes as
'\x01\x02\x03\x04'.Maximum accepted length is 4096
--search-size SEARCH_SIZE
The size of memory area to search in. accepted value
can take a suffix, like 1000, 10kb or 10mb.Default
value is 1000mb
--backward If present the search will go in backward direction.
Known limitations
-
Currently, this script cannot handle logical addresses that are not aligned on a memory page (4K) with their corresponding physical address. In 64 bits, this can happen mainly for the
gs
andfs
segment registers. If you encounter this limitation, you can manually translate your virtual address using itstranslate
method, and then restart the search on the resulting physical address (limiting the search range to 4K, so as to remain in the boundaries of the virtual page). -
Pattern length must be less than or equal to the page size (4k).
Supported versions
Reven 2.6+.
Supported perimeter
Any Reven scenario.
Dependencies
None.
Source
import argparse
import sys
from copy import copy
import reven2
import reven2.address as _address
import reven2.arch.x64 as x64_regs
"""
# Search in memory
## Purpose
Search the memory at a specific context for a string or for an array of bytes.
The memory range to search in is defined by a starting address and a search_size.
All unmapped addresses are ignored during the search.
## How to use
```bash
usage: search_in_memory.py [-h] --host HOST -p PORT --transition TRANSITION
--address ADDRESS --pattern PATTERN
[--search-size SEARCH_SIZE] [--backward]
optional arguments:
-h, --help show this help message and exit
--host HOST Reven host, as a string (default: "localhost")
-p PORT, --port PORT Reven port, as an int (default: 13370)
--transition TRANSITION
transition id. the context before this id will be
searched
--address ADDRESS The start address of the memory area to search in. It
can be a hex offset as 0xfff123 (same as ds:0xfff123),
hex offset prefixed by segment register as
gs:0xfff123, hex offset prefixed by hex segment index
as 0x20:0xfff123, hex offset prefixed by 'lin' for
linear address, or offset prefixed by 'phy' for
physical address.
--pattern PATTERN pattern that will be searched. It can be a normal
string as 'test', or a string of bytes as
'\x01\x02\x03\x04'.Maximum accepted length is 4096
--search-size SEARCH_SIZE
The size of memory area to search in. accepted value
can take a suffix, like 1000, 10kb or 10mb.Default
value is 1000mb
--backward If present the search will go in backward direction.
```
## Known limitations
- Currently, this script cannot handle logical addresses that are not aligned on a memory page (4K)
with their corresponding physical address. In 64 bits, this can happen mainly for
the `gs` and `fs` segment registers.
If you encounter this limitation, you can manually translate your virtual address
using its `translate` method, and then restart the search on the resulting physical address
(limiting the search range to 4K, so as to remain in the boundaries of the virtual page).
- Pattern length must be less than or equal to the page size (4k).
## Supported versions
Reven 2.6+.
## Supported perimeter
Any Reven scenario.
## Dependencies
None.
"""
class MemoryFinder(object):
r"""
This class is a helper class to search the memory at a specific context for a string or for an array of bytes.
The memory range to search in is defined by a starting address and a search_size.
The matching addresses are returned.
Known limitation
================
Currently, this class cannot handle logical addresses that are not aligned on a memory page (4K)
with their corresponding physical address. In 64 bits, this can happen mainly for
the `gs` and `fs` segment registers.
If you encounter this limitation, you can manually translate your virtual address
using its `translate` method, and then restart the search on the resulting physical address
(limiting the search range to 4K, so as to remain in the boundaries of the virtual page).
Pattern length must be less than or equal to page size (4k).
Examples
========
>>> # Search the first context starting from the address ds:0xfffff123123 for the string 'string'
>>> # Search_size default value is 1000MB.
>>> # Memory range to search in is: [ds:0xfffff123123, ds:0xfffff123123 + 1000MB]
>>> for address, progress in MemoryFinder(context, 0xfffff123123).query('string'):
... sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
... if address:
... print("found match at {}".format(address))
found match at ds:0xfffff123444
...
>>> # Search the first context starting from the address lin:0xfffff123123 for the
>>> # array of bytes '\\x35\\xfe\\x0e\\x4a'
>>> # Search size default value is 1000MB
>>> # Memory range to search in is: [lin:0xfffff123123, lin:0xfffff123123 + 1000MB]
>>> address = reven2.address.LinearAddress(0xfffff123123)
>>> for address, progress in MemoryFinder(context, address).query('\\x35\\xfe\\x0e\\x4a'):
... sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
... if address:
... print("found match at {}".format(address))
found match at ds:0xfffff125229
...
>>> # Search the first context starting from the address gs:0x180 for the string 'string'
>>> # Search size value is 100MB
>>> # Memory range to search in is: [gs:0x180, ds:0x180 + 100MB]
>>> address = reven2.address.LogicalAddress(0x180, reven2.arch.x64.gs)
>>> for address, progress in MemoryFinder(context, address, 100*1024*1024).query('string'):
... sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
... if address:
... print("found match at {}".format(address))
found match at ds:0xfffff123444
...
>>> # Search the first context starting from the address ds:0xfffff123123 for the string 'string'
>>> # in backward direction.
>>> # Search_size default value is 1000MB.
>>> # Memory range to search in is: [ds:0xfffff123123, ds:0xfffff123123 + 1000MB]
>>> for address, progress in MemoryFinder(context, 0xfffff123123).query('string', False):
... sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
... if address:
... print("found match at {}".format(address))
found match at ds:0xfffff123004
...
"""
page_size = 0x1000
progress_step = 0x10000
def __init__(self, context, address, search_size=1000 * 1024**2):
r"""
Initialize a C{MemoryFinder} from context and address
Information
===========
@param context: C{reven2.trace.Context} where searching will be done.
@param address: a class from C{reven2.address} the address where the search will be started.
@param search_size: an C{Integer} representing the size, in bytes, of the search range.
@raises TypeError: if context is not a C{reven2.trace.Context} or address is not a C{Integer} or
one of the address classes on C{reven2.address}.
@raises RunTimeError: If the address is a virtual address that is not aligned to its
corresponding physical address.
"""
if not isinstance(context, reven2.trace.Context):
raise TypeError("context must be an instance of reven2.trace.Context class")
self._context = context
search_addr = copy(address)
if not isinstance(search_addr, _address._AbstractAddress):
try:
# if address is of type int make it a logical address with ds as segment register
search_addr = _address.LogicalAddress(address)
except TypeError:
raise TypeError(
"address must be an instance of a class from reven2.address " "module or an integer value."
)
self._search_size = search_size
self._start = search_addr
@property
def search_size(self):
return self._search_size
def query(self, pattern, is_forward=True):
r"""
Iterate the search range looking for the specified pattern.
This method returns a generator of tuples, of the form C{(A, processed_bytes)}, such that:
- C{processed_bytes} indicates the number of bytes already processed in the search range.
- C{A} is either an address of the same type as the input address, or C{None}.
If an address is returned, it corresponds to an address matching the searched pattern.
C{None} is returned every 40KB of the search range, as a means of indicating progress.
Information
===========
@param pattern: A C{str} or C{bytearray}. The pattern to look for in memory.
Note: C{str} pattern is converted to bytearray using ascci encoding.
@param is_forward: C{bool}, C{True} to search in forward direction and C{False}
to search in backward direction
@returns: a generator of tuples, where the tuples are either:
- C{(None, processed_bytes)} every 40KB of the search range,
- C{(matching_address, processed_bytes)} each time a matching_address is found.
"""
# pattern is a byte array or a string
search_pattern = copy(pattern)
if not isinstance(search_pattern, bytearray):
if isinstance(search_pattern, str):
search_pattern = bytearray(str.encode(pattern))
else:
raise RuntimeError("Cannot parse pattern, bad format.")
if len(search_pattern) > self.page_size:
raise RuntimeError("Maximum length of pattern must be less than or equal to %d." % self.page_size)
return self._search(search_pattern, is_forward)
def _search(self, pattern, is_forward):
def loop_condition(curr, end):
return curr < end if is_forward else curr > end
cross_page_addition = len(pattern) - 1
iteration_step = self.page_size if is_forward else -self.page_size
curr = self._start
end = curr + self._search_size if is_forward else curr - self._search_size
prev = None
progress = 0
# first loop detects the first mapped address, then test if it aligned
# this step is only applied for logical address
if not isinstance(curr, reven2.address.PhysicalAddress):
while loop_condition(curr, end):
phy = curr.translate(self._context)
if phy is None:
curr += iteration_step
progress += self.page_size
if progress % self.progress_step == 0:
yield None, progress
continue
# linear -> physical alignment is guaranteed on 4k boundary:
# If linear is 0xxxxx123, physical will be 0xyyyy123
# logical -> linear alignment is not guaranteed because segment offset goes down to the byte
# (or at least down to less than 4k): logical gs:0x123 could be linear 0xzzzzz456
# Problem is: gs:0x0 might not be at start of page, 0x0:0x1000 might span on two pages
# instead of one. To solve: we need to translate logical -> physical for start address,
# and take note of offset to use that to compute actual start of page
# currently, we don't treat the case where logical -> linear alignment isn't valid.
if curr.offset % self.page_size != phy.offset % self.page_size:
raise RuntimeError(
"The provided address is not aligned on a memory page (4K)"
"with their corresponding physical address. Only aligned "
"addresses can be handled."
)
break
# second loop starts the search
while loop_condition(curr, end):
# get offset between current address and the start of the page
# This offset is zero except in the first iteration may be different to zero
offset = curr.offset % self.page_size
# compute the length of the buffer to read.
# This buffer length equals the page size except in the first iteration may be different
buffer_length = self.page_size if offset == 0 else (self.page_size - offset if is_forward else offset)
# the iteration step to go forward or backward
iteration_step = buffer_length if is_forward else -buffer_length
# compute the address to read it.
# in forward this address is the current address,
# in backward we have to read until the current address so it is current - buffer length
read_address = curr if is_forward else curr - buffer_length
# if the read buffer will exceed the search range adjust it
if is_forward and read_address + buffer_length > end:
buffer_length = end.offset - read_address.offset
elif not is_forward and read_address < end:
read_address = end
buffer_length = curr.offset - read_address.offset
try:
buffer = self._context.read(read_address, buffer_length, raw=True)
except Exception:
curr += iteration_step
progress += self.page_size
prev = None
if progress % self.progress_step == 0:
yield None, progress
continue
# Add necessary bytes from previous page to allow cross-page matches
addr_offset = 0
if prev is not None:
if is_forward:
prev_buf_len = -len(prev) if cross_page_addition > len(prev) else -cross_page_addition
buffer = prev[prev_buf_len:] + buffer if prev_buf_len < 0 else buffer
addr_offset = prev_buf_len
else:
prev_buf_len = len(prev) if cross_page_addition > len(prev) else cross_page_addition
buffer = buffer + prev[:prev_buf_len]
index = 0
addr_res = []
while True:
index = buffer.find(pattern, index)
if index == -1:
break
addr_res.append(read_address + index + addr_offset)
index += 1
for addr in addr_res if is_forward else reversed(addr_res):
yield addr, progress
progress += self.page_size
prev = buffer
curr += iteration_step
def parse_address(string_address):
segments = [x64_regs.ds, x64_regs.cs, x64_regs.es, x64_regs.ss, x64_regs.gs, x64_regs.fs]
def _str_to_seg(str_reg):
for segment in segments:
if str_reg == segment.name:
return segment
return None
try:
# Try to parse address as offset only as 0xfff123.
return _address.LogicalAddress(int(string_address, base=16))
except ValueError:
pass
# Try to parse address as prefex:offset as 0x32:0xfff123, gs:0xfff123, lin:0xfff123 or phy:0xff123.
res = string_address.split(":")
if len(res) != 2:
raise RuntimeError("Cannot parse address, bad format")
try:
offset = int(res[1].strip(), base=16)
except ValueError:
raise RuntimeError("Cannot parse address, bad format")
try:
# Try to parse it as 0x32:0xfff123.
segment_index = int(res[0].strip(), base=16)
return _address.LogicalAddressSegmentIndex(segment_index, offset)
except ValueError:
pass
lower_res0 = res[0].lower().strip()
# Try parse it as ds:0xfff123, cs::0xfff123, es::0xfff123, ss::0xfff123, gs::0xfff123 or fs::0xfff123.
sreg = _str_to_seg(lower_res0)
if sreg:
return _address.LogicalAddress(offset, sreg)
elif lower_res0 == "lin":
# Try parse it as lin:0xfff123.
return _address.LinearAddress(offset)
elif lower_res0 == "phy":
# Try parse it as phy:0xfff123.
return _address.PhysicalAddress(offset)
else:
raise RuntimeError("Cannot parse address, bad format")
def parse_search_size(string_size):
try:
# try to convert it to int
return int(string_size)
except ValueError:
pass
# try to convert it to int without the two last char
lower_string = string_size.lower()
ssize = lower_string[:-2]
try:
size = int(ssize)
except ValueError:
raise RuntimeError("Cannot parse search size, bad format")
# convert it according to its suffix
if lower_string.endswith("kb"):
return size * 1024
elif lower_string.endswith("mb"):
return size * 1024 * 1024
else:
raise RuntimeError("Cannot parse search size, bad format")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--host", type=str, default="localhost", required=True, help='Reven host, as a string (default: "localhost")'
)
parser.add_argument(
"-p", "--port", type=int, default="13370", required=True, help="Reven port, as an int (default: 13370)"
)
parser.add_argument(
"--transition", type=int, required=True, help="transition id. the context before this id will be searched"
)
parser.add_argument(
"--address",
type=str,
required=True,
help="The start address of the memory area to search in. "
"It can be a hex offset as 0xfff123 (same as ds:0xfff123), "
"hex offset prefixed by segment register as gs:0xfff123, "
"hex offset prefixed by hex segment index as 0x20:0xfff123, "
"hex offset prefixed by 'lin' for linear address, "
"or offset prefixed by 'phy' for physical address.",
)
parser.add_argument(
"--pattern",
type=str,
required=True,
help="pattern that will be searched. "
"It can be a normal string as 'test', "
"or a string of bytes as '\\x01\\x02\\x03\\x04'."
"Maximum accepted length is 4096",
)
parser.add_argument(
"--search-size",
type=str,
default="1000mb",
help="The size of memory area to search in. "
"accepted value can take a suffix, like 1000, 10kb or 10mb."
"Default value is 1000mb",
)
parser.add_argument(
"--backward", default=False, action="store_true", help="If present the search will go in backward direction."
)
args = parser.parse_args()
try:
pattern = bytearray(map(ord, bytearray(map(ord, args.pattern.strip())).decode("unicode_escape")))
except Exception as e:
raise RuntimeError("Cannot parse pattern, bad format(%s)" % str(e))
address = parse_address(args.address.strip())
reven_server = reven2.RevenServer(args.host, args.port)
context = reven_server.trace.context_before(args.transition)
finder = MemoryFinder(context, address, parse_search_size(args.search_size.strip()))
for address, progress in finder.query(pattern, not args.backward):
sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), "%"))
if address:
print("found match at {}".format(address))