Reven API Cookbook

Welcome to the Reven API Cookbook.

This book is a collection of examples that demonstrate good practices to accomplish common tasks using the Reven API.

Pre-requisites

This book assumes that you have already read the Python API quick start guide.

Furthermore, all the examples of this book assume that you are in an environment where you can import reven2 successfully.
If this is not the case, please refer to the installation documentation of the Python API before starting this guide.

64-bit examples

This book assumes that you are analyzing scenarios in a 64-bit context. If you are analyzing a scenario in a 32-bit context, read from the 32-bit variants of registers and use 32-bit addresses instead.

So, if an example uses regs.rax, replace the register with regs.eax and the type with its 32-bit counterpart, as in the following example:
ctx.read(regs.rax, U64) # 64-bit
ctx.read(regs.eax, U32) # 32-bit
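
As a reminder of why the 32-bit variant goes with a narrower type: eax is the low 32 bits of rax. The plain-Python sketch below illustrates this on a made-up value; it does not use the Reven API, and the register value is purely hypothetical.

```python
# Hypothetical 64-bit register value, chosen only for illustration.
rax = 0x1122334455667788

# eax aliases the low 32 bits of rax.
eax = rax & 0xFFFFFFFF

print(hex(eax))  # 0x55667788
```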

Common abbreviations and variable names

Reven scripts tend to re-use the same abbreviations and variable names for common objects.

Abbreviation  Description
server        A reven2.RevenServer instance representing a connection to a Reven server
pm            A reven2.preview.project_manager.ProjectManager instance representing a connection to the Project Manager
trace         A reven2.trace.Trace instance, usually obtained from server.trace
tr            A reven2.trace.Transition instance, usually obtained from trace.transition(tr_id)
ctx           A reven2.trace.Context instance, usually obtained from e.g. trace.context_before(tr_id)

Connecting to a server

Server variable

The other examples of this book assume that your Python environment contains a server variable that represents a connection to a Reven server.

From its host and port

Reven v2.0.0
# Connecting to a reven server
hostname = "localhost"
port = 13370
server = reven2.RevenServer(hostname, port)

From the scenario's name

Reven v2.3.0
Edition Enterprise
API preview

You can use a feature of the Workflow API to get a connection to a server from the scenario's name, rather than by specifying a port.

From the CLI:

from reven2.preview.project_manager import ProjectManager
pm = ProjectManager("http://localhost:8880")  # URL to the Reven Project Manager
connection = pm.connect("cve-2016-7255")  # No need to specify "13370"
server = connection.server

From a script:

with pm.connect("cve-2016-7255") as server:
    # TODO: use the server
    pass

This is useful, as the server port will typically change at each reopening of the scenario, while the scenario name remains the same.

If no server is open for that particular scenario when executing the ProjectManager.connect method call, then a new one will be started.

Checking server information

Getting basic server information

print(f"Scenario '{server.scenario_name}' on {server.host}:{server.port}")

Sample output:

Scenario 'CVE-2021-21166-Chrome' on localhost:13370

Server OS information

Reven v2.12.0
OS Windows 64-bit
OS Linux 64-bit

Starting with Reven 2.12, the server provides information on the OS of the running scenario:

print(server.ossi.os())

Sample output:

Windows x64 10.0 (Windows 10)

Checking that the OS is within the perimeter supported by the current script

server.ossi.os().expect(
    reven2.ossi.Os(windows_version=reven2.ossi.WindowsVersion.Windows7),
    reven2.ossi.Os(windows_version=reven2.ossi.WindowsVersion.Windows8),
)

Sample output:

reven2.ossi.os.OsError: Got 'Windows x64 10.0.18362 (Windows 10)', expected one of: 'Windows 7', 'Windows 8'

Doing something different depending on the OS

os = server.ossi.os()
os.expect(reven2.ossi.Os(family=reven2.ossi.OsFamily.Linux))
if os.kernel_version.major == 4:
    # do something with a kernel 4.x
    pass
elif os.kernel_version.major == 5:
    # do something with a kernel 5.x
    pass
else:
    raise reven2.ossi.OsError(got=os, message="Expected kernel major version 4 or 5")

Manipulating transitions and contexts

Getting a transition or context from a transition id

Reven v2.2.0
tr = server.trace.transition(1234)
ctx_before = server.trace.context_before(1234)
ctx_after = server.trace.context_after(1234)

Context <-> Transition

Transition -> Context

Reven v2.2.0
ctx_before = tr.context_before()
ctx_after = tr.context_after()

Context -> Transition

Reven v2.6.0
if ctx != server.trace.first_context:
    tr_before = ctx.transition_before()
if ctx != server.trace.last_context:
    tr_after = ctx.transition_after()
Reven v2.2.0
if ctx != server.trace.context_before(0):
    tr_before = ctx.transition_before()
if ctx != server.trace.context_after(server.trace.transition_count - 1):
    tr_after = ctx.transition_after()
There are not always transitions around a context

While transition.context_before/context_after() always works, one must handle the case where a context is the first/last of the trace, in which case no transition before/after it can be accessed.

Trying to access the transition before the first context, or after the last one, will trigger an IndexError.

Getting the next/previous context and transition

next_tr = tr + 1
prev_tr = tr - 1
next_ctx = ctx + 1
prev_ctx = ctx - 1
next_next_tr = tr + 2
# ...
There is not always a next/previous transition/context

Make sure that the resulting transition/context is in range when adding/subtracting an offset to generate a new transition/context.

Trying to access a transition/context out-of-range will trigger an IndexError.
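
One way to handle the out-of-range case is to wrap the offset in a small helper that returns None instead of raising. The sketch below assumes that the IndexError is raised by the addition itself; if it is raised later in your version of the API, wrap that access instead. try_offset and FakeTransition are hypothetical names: the latter is only a stand-in so that the example runs without a Reven server.

```python
def try_offset(item, offset):
    """Return item + offset, or None if the result would be out of range."""
    try:
        return item + offset
    except IndexError:
        return None


class FakeTransition:
    """Stand-in for illustration only: valid ids are 0..count-1."""

    def __init__(self, id, count):
        self.id = id
        self.count = count

    def __add__(self, offset):
        new_id = self.id + offset
        if not 0 <= new_id < self.count:
            raise IndexError(new_id)
        return FakeTransition(new_id, self.count)


last = FakeTransition(9, count=10)
print(try_offset(last, 1))      # None: there is no transition after the last one
print(try_offset(last, -2).id)  # 7
```

With real objects, the same helper would be called as try_offset(tr, 1) or try_offset(ctx, -1).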

Iterating on a range of transitions/contexts

Reven v2.2.0
for tr in server.trace.transitions(0, 1000):
    print(tr)
for ctx in server.trace.contexts():
    print(ctx)

Getting the first/last context/transition in the trace

Reven v2.6.0
first_tr = server.trace.first_transition
last_tr = server.trace.last_transition
first_ctx = server.trace.first_context
last_ctx = server.trace.last_context
Reven v2.2.0
first_tr = server.trace.transition(0)
last_tr = server.trace.transition(server.trace.transition_count - 1)
first_ctx = server.trace.context_before(0)
last_ctx = server.trace.context_after(server.trace.transition_count - 1)

Reading values or structs from registers or memory

Reven v2.2.0

Common imports for easy access

from reven2.address import LinearAddress, LogicalAddress, LogicalAddressSegmentIndex, PhysicalAddress
from reven2.arch import x64 as regs
from reven2.types import *

Getting the current ring

def current_ring(ctx):
    return ctx.read(regs.cs) & 3
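
The mask works because, on x86, the two low bits of the CS selector hold the current privilege level. The following plain-Python sketch shows the arithmetic on two example selector values (0x10 and 0x33, chosen for illustration); ring_from_cs is a hypothetical name and does not use the Reven API.

```python
def ring_from_cs(cs_selector: int) -> int:
    """Extract the privilege level stored in the two low bits of CS."""
    return cs_selector & 3


print(ring_from_cs(0x10))  # 0 (kernel mode)
print(ring_from_cs(0x33))  # 3 (user mode)
```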

Reading as a type

Integer types

ctx.read(regs.rax, U8)
ctx.read(regs.rax, U16)
ctx.read(regs.rax, I16)
ctx.read(regs.rax, BigEndian(U16))

Sample output:

96
35680
-29856
24715
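
These four values are consistent with the two low bytes of rax being 0x60 and 0x8b. The sketch below reproduces them in plain Python with int.from_bytes, to show how width, signedness and endianness change the interpretation of the same bytes; it assumes those two byte values and does not use the Reven API.

```python
# The two low bytes of rax, in memory (little-endian) order.
low_bytes = bytes([0x60, 0x8B])

u8 = low_bytes[0]                                       # like U8
u16 = int.from_bytes(low_bytes, "little")               # like U16
i16 = int.from_bytes(low_bytes, "little", signed=True)  # like I16
be_u16 = int.from_bytes(low_bytes, "big")               # like BigEndian(U16)

print(u8, u16, i16, be_u16)  # 96 35680 -29856 24715
```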

String

ctx.read(LogicalAddress(0xffffe00041cac2ea), CString(encoding=Encoding.Utf16, max_character_count=1000))

Sample output:

u'Network Store Interface Service'

Array

list(ctx.read(LogicalAddress(0xffffe00041cac2ea), Array(U8, 4)))

Sample output:

[78, 0, 101, 0]
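
The array is read from the same address as the string example above, so these four bytes are the first two UTF-16 code units of that string. A short plain-Python check, independent of the Reven API:

```python
raw = bytes([78, 0, 101, 0])

# Little-endian UTF-16: each character takes two bytes here.
text = raw.decode("utf-16-le")

print(text)  # Ne
```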

Dereferencing pointers, reading the stack

Reading [rsp+0x20] manually:

addr = LogicalAddress(0x20) + ctx.read(regs.rsp, USize)
ctx.read(addr, U64)

Reading [rsp+0x20] using deref:

ctx.deref(regs.rsp, Pointer(U64, base_address=LogicalAddress(0x20)))

Sample output:

10738

Reading struct values

Reven v2.11.0
OS Windows 64-bit
OS Windows 32-bit

Getting a struct type from a binary:

object_attributes_ty = next(server.ossi.executed_binaries("ntoskrnl")).exact_type("_OBJECT_ATTRIBUTES")

Sample output of printing object_attributes_ty:

StructKind.Struct _OBJECT_ATTRIBUTES /* 0x30 */ {
    /* 0x0 */ Length : U32,
    /* 0x8 */ RootDirectory : void*,
    /* 0x10 */ ObjectName : _UNICODE_STRING*,
    /* 0x18 */ Attributes : U32,
    /* 0x20 */ SecurityDescriptor : void*,
    /* 0x28 */ SecurityQualityOfService : void*,
}

Reading a struct instance from a source (register, memory, etc):

# Reading structure from pointer stored in r8
object_attributes = ctx.read(reven2.address.LogicalAddress(0xffffe00041cac2ea), object_attributes_ty)
# Read one field according to declared type
object_attributes.field("Length").read()

Sample output:

48

You can also:

# Read one field, forcing its type to int-like. Useful for IDE type hints
object_attributes.field("Attributes").read_int()
# Get whole struct content as bytes
object_attributes.as_bytes()
# Dereferences a pointer-like field, read it as an inner struct, display its type
print(object_attributes.field("ObjectName").deref_struct().type)

Sample output:

0
b'0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x98\xd1]\x06\x00\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
StructKind.Struct _UNICODE_STRING /* 0x10 */ {
    /* 0x0 */ Length : U16,
    /* 0x2 */ MaximumLength : U16,
    /* 0x8 */ Buffer : U16*,
}

Parsing a raw buffer as a type

U16.parse(b"\x10\x20")
BigEndian(U16).parse(b"\x10\x20")
Array(U8, 2).parse(b"\x10\x20")

Sample output:

8208
4128
[16, 32]

Accessing the framebuffer memory to take a screenshot

Reven v2.12.0
image = ctx.framebuffer.image()
# save the screenshot as a png file
image.save("/tmp/framebuffer.png")
# display the screenshot in the default image viewer
image.show()
# display the screenshot in a Jupyter Notebook
display(image)
# access RGB values from a pixel as a tuple
image.getpixel((123, 42))

Moving to the beginning of a function

Reven v2.9.0
from reven2.trace import Transition
from typing import Tuple, Optional
def call_bounds(tr: Transition) -> Tuple[Optional[Transition], Optional[Transition]]:
    """
    Given a transition anywhere inside of a function, this function returns
    the transition at the beginning of the function call and the transition at
    the end of the function call.
    """
    return (tr.step_out(is_forward=False), tr.step_out(is_forward=True))

Searching for function calls

Looking for an exact symbol name

Reven v2.2.0
symbol_name = "CreateProcessW"
binary_hint = r"kernelbase\.dll"
try:
    symbol = next(server.ossi.symbols(f"^{symbol_name}$", binary_hint=binary_hint))
except StopIteration:
    raise RuntimeError(f"Could not find symbol '{symbol_name}' with binary hint '{binary_hint}'")
for ctx in server.trace.search.symbol(symbol):
    print(ctx)
Handling of the case where the symbol is not in the trace

When looking for an exact symbol, you need to prepare for the case where the symbol is not available in the trace. Note that a symbol can be available even if it is never called.

In the example, this manifests as an iterator with no element: calling next on it raises a StopIteration, which we catch and convert to a more specific error.
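
If several scripts need this kind of lookup, the try/except can be factored into a small helper. The sketch below is plain Python; first_or_error is a hypothetical name, not part of the Reven API, and it accepts any iterable.

```python
def first_or_error(iterable, message):
    """Return the first element of iterable, or raise RuntimeError(message)."""
    for item in iterable:
        return item
    raise RuntimeError(message)


# Works with any iterator; an empty one triggers the error.
print(first_or_error(iter(["a", "b"]), "nothing found"))  # a

# With a server, the lookup above would become (not run here):
# symbol = first_or_error(
#     server.ossi.symbols(f"^{symbol_name}$", binary_hint=binary_hint),
#     f"Could not find symbol '{symbol_name}'",
# )
```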

Symbol pattern is a regular expression

When looking for a single, exact symbol, you need to enclose the symbol's name in the pattern with ^$ because the pattern is actually a regular expression. Failure to do that could result in several matching symbols, with the risk that the rest of your code actually chooses the wrong one.
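
The effect of the anchors can be checked with Python's own re module. The sketch below assumes that the symbol search behaves like an unanchored re.search, and uses a hand-written list of example symbol names rather than a real trace.

```python
import re

# Example symbol names, listed by hand for illustration only.
names = ["CreateProcessW", "CreateProcessWithLogonW", "CreateProcessWithTokenW"]

# Without anchors, the pattern matches any name that contains it.
unanchored = [n for n in names if re.search("CreateProcessW", n)]

# With ^ and $, only the exact name matches.
anchored = [n for n in names if re.search("^CreateProcessW$", n)]

print(unanchored)  # all three names
print(anchored)    # ['CreateProcessW']
```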

Looking for multiple symbols

Reven v2.2.0

The example builds an iterator of tuples, where the first element is the context of the call and the second is the name of the called symbol.

from itertools import repeat
def first_symbol(symbol_name):
    return next(server.ossi.symbols(f"^{symbol_name}$", binary_hint=binary))
binary = "c:/windows/system32/ntoskrnl.exe"
symbols = ["NtCreateFile", "NtOpenFile", "NtOpenDirectoryObject"]
symbols_name = [(first_symbol(symbol), symbol) for symbol in symbols]
symbols_name = [zip(server.trace.search.symbol(symbol[0]), repeat(symbol[1])) for symbol in symbols_name]
for ctx_name in reven2.util.collate(symbols_name, lambda ctx_name: ctx_name[0]):
    print(f"{ctx_name[1]}: {ctx_name[0]}")

Sample output:

NtCreateFile: Context before #4468509
NtCreateFile: Context before #4479526
NtCreateFile: Context before #6451786
NtCreateFile: Context before #6852400
NtCreateFile: Context before #7666717
NtCreateFile: Context before #8067013
NtCreateFile: Context before #8298671
NtCreateFile: Context before #8648240
NtOpenFile: Context before #26656294
NtCreateFile: Context before #35251786
NtOpenFile: Context before #36420358
NtOpenFile: Context before #43268534
NtOpenDirectoryObject: Context before #43420816
NtOpenFile: Context before #43450170
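
The interleaving seen in this output is that of a sorted merge: each per-symbol search yields results in trace order, and the results are combined into a single ordered stream. The sketch below shows the same idea with heapq.merge from the standard library on made-up transition ids. It is an analogue for illustration, not a description of how reven2.util.collate is implemented.

```python
import heapq
from itertools import repeat

# Hypothetical transition ids per symbol, each list already sorted.
create_calls = zip([10, 40, 70], repeat("NtCreateFile"))
open_calls = zip([25, 55], repeat("NtOpenFile"))

# Merge the sorted streams, ordering by the first tuple element.
merged = list(heapq.merge(create_calls, open_calls, key=lambda pair: pair[0]))

for position, name in merged:
    print(f"{name}: #{position}")
```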

Looking for a crash

System crashes

Reven v2.2.0
OS Windows 64-bit

Look for the symbol KeBugCheckEx in ntoskrnl:

crash_symbol = next(server.ossi.symbols("^KeBugCheckEx$", binary_hint="ntoskrnl"))
for ctx in server.trace.search.symbol(crash_symbol):
    print(f"System crash at {ctx}")

Process crashes

Reven v2.3.0
OS Windows 64-bit

Look for the symbol KiUserExceptionDispatch in ntdll:

crash_symbol = next(server.ossi.symbols("^KiUserExceptionDispatch$", binary_hint="ntdll"))
for ctx in server.trace.search.symbol(crash_symbol):
    process = ctx.ossi.process()
    print(f"{process.name} crashed at {ctx}")

Finding out when a memory location is accessed

Getting all the memory accesses of a range of addresses

Reven v2.2.0
for access in server.trace.memory_accesses(0xffff88007fc03000, 4096):
    print(access)

Sample output:

[#39 call 0xffffffff81611fe0 ($+0x165133)]Write access at @phy:0x7fc03ec8 (virtual address: lin:0xffff88007fc03ec8) of size 8
[#41 call qword ptr [0xffffffff81c24448]]Write access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#42 push rdx]Write access at @phy:0x7fc03eb8 (virtual address: lin:0xffff88007fc03eb8) of size 8
[#48 pop rdx]Read access at @phy:0x7fc03eb8 (virtual address: lin:0xffff88007fc03eb8) of size 8
[#49 ret ]Read access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#51 push rdi]Write access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#52 popfq ]Read access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#54 ret ]Read access at @phy:0x7fc03ec8 (virtual address: lin:0xffff88007fc03ec8) of size 8
[#60 call 0xffffffff814abe30 ($-0x108f)]Write access at @phy:0x7fc03ec8 (virtual address: lin:0xffff88007fc03ec8) of size 8
[#62 push r14]Write access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8

Getting all the memory accesses on a range of transitions

Reven v2.9.0
for access in server.trace.memory_accesses(from_transition=server.trace.transition(1000),
                                           to_transition=server.trace.transition(1010)):
    print(access)

Sample output:

[#1005 mov qword ptr [rsp+0x40], r14]Write access at @phy:0x6645a9b0 (virtual address: lin:0xfffffe0ff31db9b0) of size 8
[#1007 mov qword ptr [rsp+0x98], r14]Write access at @phy:0x6645aa08 (virtual address: lin:0xfffffe0ff31dba08) of size 8
[#1008 or dword ptr [rsp+0x90], 0x2]Read access at @phy:0x6645aa00 (virtual address: lin:0xfffffe0ff31dba00) of size 4
[#1008 or dword ptr [rsp+0x90], 0x2]Write access at @phy:0x6645aa00 (virtual address: lin:0xfffffe0ff31dba00) of size 4

Finding the memory accesses at a transition

Reven v2.2.0
for access in tr.memory_accesses():
    print(access)

Sample output:

[MemoryAccess(transition=Transition(id=42), physical_address=PhysicalAddress(offset=0x7fc03eb8), size=8, operation=MemoryAccessOperation.Write, virtual_address=LinearAddress(offset=0xffff88007fc03eb8))]

Getting all the memory accesses on the framebuffer

Reven v2.12.0
framebuffer_memory = server.trace.first_context.framebuffer.memory_range
for access in server.trace.memory_accesses(address_range=framebuffer_memory):
    print(access)

Sample output:

[#462107 mov dword ptr ds:[rax-0x4], ecx]Write access at @phy:0xfd31d404 (virtual address: lin:0x7fd74fb16404) of size 4
[#462116 mov dword ptr ds:[rax-0x4], ecx]Write access at @phy:0xfd31d408 (virtual address: lin:0x7fd74fb16408) of size 4
[#462125 mov dword ptr ds:[rax-0x4], ecx]Write access at @phy:0xfd31d40c (virtual address: lin:0x7fd74fb1640c) of size 4

Displaying the value of arguments and return value of a call

Reven v2.10.0
API preview
OS Windows 64-bit
OS Linux 64-bit

When the prototype is known

Automatically detecting the calling convention from the OS

Reven v2.12.0
from typing import Optional
def auto_calling_convention(
    server: reven2.RevenServer,
) -> Optional[reven2.preview.prototypes.callconv_helper.CallConvHelper]:
    prototypes = reven2.preview.prototypes.RevenPrototypes(server)
    if server.ossi.os().architecture != reven2.ossi.Architecture.X64:
        return None
    if server.ossi.os().family == reven2.ossi.OsFamily.Windows:
        return prototypes.calling_conventions.Ms64
    elif server.ossi.os().family == reven2.ossi.OsFamily.Linux:
        return prototypes.calling_conventions.Sysv64
    return None

Windows 64-bit example

call_tr = tr.step_out(is_forward=False)
import reven2.preview.prototypes
prototypes = reven2.preview.prototypes.RevenPrototypes(server)
call_conv = auto_calling_convention(server)
prototype = "char * __cdecl OaGetEnv(char const *);"
f = prototypes.parse_one_function(prototype, call_conv)
call = f.call_site_values(call_tr)
call.arg_n(0)
call.ret()

Sample output:

'OACACHEPARAMS'
0

Linux 64-bit example

Use the Sysv64 calling convention.

call_tr = tr.step_out(is_forward=False)
import reven2.preview.prototypes
prototypes = reven2.preview.prototypes.RevenPrototypes(server)
call_conv = auto_calling_convention(server)
prototype = "struct FILE; FILE* fopen64(const char *filename, const char *mode);"
f = prototypes.parse_one_function(prototype, call_conv)
call = f.call_site_values(call_tr)
call.args()
call.ret()

Sample output:

{'filename': '/proc/spl/kstat/zfs/arcstats', 'mode': 'r'}
0

Using a default prototype

Example with 5 parameters.

call_tr = tr.step_out(is_forward=False)
import reven2.preview.prototypes
prototypes = reven2.preview.prototypes.RevenPrototypes(server)
call_conv = prototypes.calling_conventions.Ms64
prototype = "void* f(void* p0, void* p1, void* p2, void* p3, void* p4);"
f = prototypes.parse_one_function(prototype, call_conv)
call = f.call_site_values(call_tr)
call.args()
call.ret()

Sample output:

{'p0': 18446735287469384880, 'p1': 0, 'p2': 18446735287473289024, 'p3': 0, 'p4': 0}
1364968393473
Limitations apply

Using a default prototype works as long as the replaced parameters behave as void*.

This is notably not the case for structs passed by value that are larger than a pointer (in some calling conventions) and for floating-point arguments (in Sysv64).

Searching for values in memory

Searching for values in a range of memory at a single context

If the value you are looking for might be in a range of memory at a specific point in the trace, you can search for it with the following:

pattern = bytearray([0x77, 0x43])
for match_address in ctx.search_in_memory(pattern, ctx.read(regs.rsp), 0x1000000):
    print(f"'{pattern}' is found starting at @{match_address}")
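
To make the shape of the result concrete (one start address per match), here is a plain-Python analogue that scans an in-memory bytes buffer for the same pattern. find_all and the buffer contents are hypothetical; the sketch only mirrors what the search reports, not how Reven performs it.

```python
def find_all(buffer: bytes, pattern: bytes, base_address: int = 0):
    """Yield the address of every (possibly overlapping) match of pattern."""
    start = 0
    while True:
        index = buffer.find(pattern, start)
        if index == -1:
            return
        yield base_address + index
        start = index + 1


# Made-up memory content, pretending it is mapped at address 0x1000.
memory = bytes([0x00, 0x77, 0x43, 0x10, 0x77, 0x43])
matches = list(find_all(memory, bytes([0x77, 0x43]), base_address=0x1000))

print([hex(m) for m in matches])  # ['0x1001', '0x1004']
```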

Searching for string-like values accessed in the trace

Reven v2.2.0

If the value you are looking for looks like a string indexed by the strings resource (defaults to strings of printable characters of length 5-128, possibly UTF-16 encoded), then you can use the strings search API, which will be the fastest way:

from_tr = server.trace.first_transition
to_tr = server.trace.last_transition
for string in server.trace.strings("Network"):
    for access in string.memory_accesses(from_tr, to_tr):
        print(access)

Sample output:

[#4653465 mov word ptr [rcx], ax]Write access at @phy:0x18e8252c (virtual address: lin:0x3a252c) of size 2
[#4653475 mov word ptr [rcx], ax]Write access at @phy:0x18e8252e (virtual address: lin:0x3a252e) of size 2
[#4653485 mov word ptr [rcx], ax]Write access at @phy:0x18e82530 (virtual address: lin:0x3a2530) of size 2
[#4653495 mov word ptr [rcx], ax]Write access at @phy:0x18e82532 (virtual address: lin:0x3a2532) of size 2
[#4653505 mov word ptr [rcx], ax]Write access at @phy:0x18e82534 (virtual address: lin:0x3a2534) of size 2
[#4653515 mov word ptr [rcx], ax]Write access at @phy:0x18e82536 (virtual address: lin:0x3a2536) of size 2
[#4653525 mov word ptr [rcx], ax]Write access at @phy:0x18e82538 (virtual address: lin:0x3a2538) of size 2
[#4653535 mov word ptr [rcx], ax]Write access at @phy:0x18e8253a (virtual address: lin:0x3a253a) of size 2
[#4653545 mov word ptr [rcx], ax]Write access at @phy:0x18e8253c (virtual address: lin:0x3a253c) of size 2
[#4653555 mov word ptr [rcx], ax]Write access at @phy:0x18e8253e (virtual address: lin:0x3a253e) of size 2
[#4653565 mov word ptr [rcx], ax]Write access at @phy:0x18e82540 (virtual address: lin:0x3a2540) of size 2
[#4653575 mov word ptr [rcx], ax]Write access at @phy:0x18e82542 (virtual address: lin:0x3a2542) of size 2
[#4653585 mov word ptr [rcx], ax]Write access at @phy:0x18e82544 (virtual address: lin:0x3a2544) of size 2
[#4653595 mov word ptr [rcx], ax]Write access at @phy:0x18e82546 (virtual address: lin:0x3a2546) of size 2
[#4653605 mov word ptr [rcx], ax]Write access at @phy:0x18e82548 (virtual address: lin:0x3a2548) of size 2
[#4653615 mov word ptr [rcx], ax]Write access at @phy:0x18e8254a (virtual address: lin:0x3a254a) of size 2
[#4653625 mov word ptr [rcx], ax]Write access at @phy:0x18e8254c (virtual address: lin:0x3a254c) of size 2
Unaccessed values are not found by this method

This method only finds values in memory that are accessed (read from, written to) during the portion of the trace where the search takes place.

If you don't know whether your value is accessed in the trace, you can combine this method with a search in memory at a context.

Searching for other kinds of values accessed in the trace

Reven v2.6.0
from_ctx = server.trace.first_context
to_ctx = server.trace.last_context
for match in server.trace.search.memory(b"\xc0\xfc\x75\x02", from_ctx, to_ctx).matches():
    print(match)

Sample output:

id: 0 | @lin:0xab600 (mapped at Context before #5665813) | [Context before #5665813 - Context after #16899165] | 1 access(es)
id: 1 | @lin:0xbadf0 (mapped at Context before #5666826) | [Context before #5666826 - Context after #16899165] | 1 access(es)
id: 2 | @lin:0xac810 (mapped at Context before #5666966) | [Context before #5666966 - Context before #6141251] | 1 access(es)
id: 3 | @lin:0xb0935 (mapped at Context before #6140550) | [Context before #6140550 - Context after #16899165] | 1 access(es)
id: 4 | @lin:0xab790 (mapped at Context before #6143172) | [Context before #6143172 - Context after #16899165] | 1 access(es)
id: 5 | @lin:0xffffe0016aec9800 (mapped at Context before #6144279) | [Context before #6144279 - Context before #6379441] | 2 access(es)
id: 6 | @lin:0xffffe0016a7accc0 (mapped at Context before #6152909) | [Context before #6152909 - Context before #6372050] | 1 access(es)
id: 7 | @lin:0xc7e605 (mapped at Context before #6218204) | [Context before #6218204 - Context after #16899165] | 15 access(es)
id: 8 | @lin:0xc67780 (mapped at Context before #6235773) | [Context before #6235773 - Context after #16899165] | 10 access(es)
id: 9 | @lin:0x29feb26 (mapped at Context before #6360085) | [Context before #6360085 - Context before #6360107] | 2 access(es)
id: 10 | @lin:0x29feb26 (mapped at Context before #6360108) | [Context before #6360108 - Context before #6360254] | 3 access(es)
id: 11 | @lin:0x29feb26 (mapped at Context before #6360255) | [Context before #6360255 - Context after #16899165] | 2 access(es)
id: 12 | @lin:0x29feb06 (mapped at Context before #6360265) | [Context before #6360265 - Context before #6360264] | 0 access(es)
id: 13 | @lin:0x29feb06 (mapped at Context before #6360265) | [Context before #6360265 - Context after #16899165] | 2 access(es)
id: 14 | @lin:0x29fe946 (mapped at Context before #6360440) | [Context before #6360440 - Context before #6360439] | 0 access(es)
id: 15 | @lin:0x29fe946 (mapped at Context before #6360440) | [Context before #6360440 - Context before #6360458] | 2 access(es)
Unaccessed values are not found by this method

This method only finds values in memory that are accessed (read from, written to) during the portion of the trace where the search takes place.

If you don't know whether your value is accessed in the trace, you can combine this method with a search in memory at a context.

Listing the processes, binaries in the trace

Listing the processes executed in the trace

Reven v2.10.0
OS Windows 64-bit
for process in server.ossi.executed_processes():
    print(process)

Sample output:

cmd.exe (2320)
chat_client.exe (2832)
conhost.exe (2704)
cmd.exe (2716)
ShellExperienceHost.exe (2044)
svchost.exe (876)
conhost.exe (2596)
...

Listing the binaries executed in the trace

Reven v2.2.0
for binary in server.ossi.executed_binaries():
    print(binary)

Sample output:

c:/windows/system32/wevtsvc.dll
c:/windows/system32/oleacc.dll
c:/windows/system32/inputswitch.dll
c:/windows/explorer.exe
c:/windows/system32/ci.dll
c:/windows/system32/drivers/pciidex.sys
c:/windows/system32/drivers/intelide.sys

Finding a single binary in the trace

Reven v2.2.0
def find_one_binary(binary_path):
    """
    Return the binary corresponding to the passed portion of its path if any,
    None if there isn't one, and throws if there would be two matches or more.
    """
    query = server.ossi.executed_binaries(binary_path)
    try:
        first = next(query)
    except StopIteration:
        return None
    try:
        second = next(query)
        raise ValueError(f"Found multiple binaries '{first}' and '{second}' for query '{binary_path}'")
    except StopIteration:
        return first

Finding the base address where a binary has been loaded

Because a binary can be loaded multiple times at different addresses in a trace, we recover the base address from a Context where the binary is executed.

Finding the base address of the first instance of a binary in the trace

Reven v2.2.0
def find_first_base_address(binary: reven2.ossi.ossi.Binary):
    for ctx in server.trace.search.binary(binary):
        return ctx.ossi.location().base_address

Finding all the base addresses of a binary in a specified process

Reven v2.10.0
def find_base_address_in_process(binary: reven2.ossi.ossi.Binary, process: reven2.ossi.process.Process):
    for ctx_range in server.trace.filter(processes=(process,)):
        for ctx in server.trace.search.binary(binary, ctx_range.begin, ctx_range.end):
            return ctx.ossi.location().base_address

Focusing on a portion of the trace

Filtering on processes/ring

Reven v2.10.0
processes = list(server.ossi.executed_processes("chat"))
for ctx_range in server.trace.filter(processes, reven2.filter.RingPolicy.R3Only):
    first_context = next(iter(ctx_range))
    print(f"{ctx_range}: {first_context.ossi.process()}\t| {first_context.ossi.location()}")

Sample output:

[Context before #5662909, Context before #5669296]: chat_client.exe (2816) | ntdll!ZwDeviceIoControlFile+0x14
[Context before #5670173, Context before #5671124]: chat_client.exe (2816) | ntdll!NtSetIoCompletion+0x14
[Context before #5672523, Context before #5673027]: chat_client.exe (2816) | ntdll!NtRemoveIoCompletionEx+0x14
[Context before #5678502, Context before #5678522]: chat_client.exe (2816) | chat_client!<tokio_timer::clock::clock::Clock as tokio_timer::timer::now::Now>::now+0x24
[Context before #5678723, Context before #5679392]: chat_client.exe (2816) | ntdll!ZwQueryPerformanceCounter+0x14
...

Following the dataflow

Finding out from where data comes

The following example displays all the processes and functions that a tainted buffer goes through:

import reven2.preview.taint as tainting
tainter = tainting.Tainter(server.trace)
taint = tainter.simple_taint("rax", to_context=tr.context_before(), is_forward=False)
last_symbol = None
last_process = None
for access in taint.accesses().all():
    ctx_after = access.transition.context_after()
    new_process = ctx_after.ossi.process()
    new_symbol = ctx_after.ossi.location().symbol
    if new_symbol is None:
        if last_symbol is not None:
            print(f"{new_process}: ???")
        last_symbol = None
        last_process = new_process
        continue
    if (last_symbol is not None
        and last_process is not None
        and new_symbol == last_symbol
        and new_process.pid == last_process.pid
    ):
        continue
    last_symbol = new_symbol
    last_process = new_process
    print(f"{new_process}: {new_symbol}")

Sample output:

conhost.exe (2704): win32kfull!memcpy
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): ???
chat_client.exe (2832): ntdll!memcpy
chat_client.exe (2832): chat_client!<alloc::vec::Vec<T> as alloc::vec::SpecExtend<T, I>>::from_iter
chat_client.exe (2832): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2832): ntdll!memcpy
chat_client.exe (2832): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2832): chat_client!<alloc::vec::Vec<T> as alloc::vec::SpecExtend<T, I>>::from_iter
chat_client.exe (2832): ntdll!RtlpAllocateHeapInternal
chat_client.exe (2832): chat_client!<alloc::vec::Vec<T> as alloc::vec::SpecExtend<T, I>>::from_iter
chat_client.exe (2832): msvcrt!memcpy
chat_client.exe (2832): netio!memcpy
chat_server.exe (648): netio!memcpy
chat_server.exe (648): ???
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!bytes::buf::buf_mut::BufMut::put
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!bytes::buf::buf_mut::BufMut::put
chat_server.exe (648): chat_server!<&'a str as bytes::buf::into_buf::IntoBuf>::into_buf
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<futures::sync::mpsc::UnboundedReceiver<T> as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!<futures::sync::mpsc::Receiver<T>>::next_message
chat_server.exe (648): chat_server!<futures::sync::mpsc::queue::Queue<T>>::pop
chat_server.exe (648): chat_server!<futures::sync::mpsc::Sender<T>>::queue_push_and_signal
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::Bytes as core::clone::Clone>::clone
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as core::clone::Clone>::clone
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as core::clone::Clone>::clone
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<chat_server::Lines as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_off
chat_server.exe (648): chat_server!bytes::bytes::Inner::set_start
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_off
chat_server.exe (648): chat_server!<chat_server::Lines as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_to
chat_server.exe (648): chat_server!bytes::bytes::Inner::shallow_clone_sync
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_to
chat_server.exe (648): chat_server!<chat_server::Lines as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!<tokio_tcp::stream::TcpStream as tokio_io::async_read::AsyncRead>::read_buf
chat_server.exe (648): chat_server!mio::sys::windows::tcp::TcpStream::readv
chat_server.exe (648): chat_server!<&'a std::net::tcp::TcpStream as std::io::Read>::read
chat_server.exe (648): ws2_32!recv
chat_server.exe (648): mswsock!WSPRecv
chat_server.exe (648): ntoskrnl!IopXxxControlFile
chat_server.exe (648): ???
chat_server.exe (648): ntoskrnl!IopXxxControlFile
chat_server.exe (648): ntoskrnl!NtDeviceIoControlFile
chat_server.exe (648): ntoskrnl!KiSystemCall64
chat_server.exe (648): mswsock!WSPRecv
chat_server.exe (648): ws2_32!recv
chat_server.exe (648): chat_server!mio::sys::windows::tcp::TcpStream::readv
chat_client.exe (2816): ???
chat_client.exe (2816): netio!RtlCopyMdlToBuffer
chat_client.exe (2816): ???
chat_client.exe (2816): tcpip!TcpIndicateData
chat_client.exe (2816): ndis!NdisAdvanceNetBufferDataStart
chat_client.exe (2816): tcpip!IppPrevalidateLoopbackReceive
chat_client.exe (2816): tcpip!IpNlpFastContinueSendDatagram
chat_client.exe (2816): tcpip!IppSendDatagramsCommon
chat_client.exe (2816): tcpip!IppPreparePacketChecksum
chat_client.exe (2816): tcpip!IppSendDatagramsCommon
chat_client.exe (2816): tcpip!TcpTcbSend
chat_client.exe (2816): netio!NetioExtendNetBuffer
chat_client.exe (2816): tcpip!TcpSegmentTcbSend
chat_client.exe (2816): tcpip!TcpBeginTcbSend
chat_client.exe (2816): netio!NetioAllocateAndReferenceNetBufferListNetBufferMdlAndData
chat_client.exe (2816): ndis!NdisAllocateNetBufferList
chat_client.exe (2816): netio!NetioAllocateAndReferenceNetBufferListNetBufferMdlAndData
chat_client.exe (2816): tcpip!TcpBeginTcbSend
chat_client.exe (2816): tcpip!TcpTcbSend
chat_client.exe (2816): tcpip!TcpEnqueueTcbSend
chat_client.exe (2816): ???
chat_client.exe (2816): chat_client!<std::net::tcp::TcpStream as miow::net::TcpStreamExt>::write_overlapped
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::StreamImp::schedule_write
chat_client.exe (2816): chat_client!mio::poll::SetReadiness::set_readiness
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::StreamImp::schedule_write
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::TcpStream::writev
chat_client.exe (2816): chat_client!<iovec::IoVec as core::ops::deref::Deref>::deref
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::TcpStream::writev
chat_client.exe (2816): chat_client!mio::sys::windows::selector::ReadyBinding::get_buffer
chat_client.exe (2816): chat_client!<mio::net::tcp::TcpStream as std::io::Write>::write
chat_client.exe (2816): chat_client!iovec::IoVec::from_bytes
chat_client.exe (2816): chat_client!<mio::net::tcp::TcpStream as std::io::Write>::write
chat_client.exe (2816): chat_client!<tokio_reactor::poll_evented::PollEvented<E> as std::io::Write>::write
chat_client.exe (2816): chat_client!<tokio_reactor::poll_evented::PollEvented<E>>::poll_write_ready chat_client.exe (2816): chat_client!<tokio_reactor::poll_evented::PollEvented<E> as std::io::Write>::write chat_client.exe (2816): chat_client!_ZN99_$LT$tokio_io.._tokio_codec..framed_write..FramedWrite2$LT$T$GT$$u20$as$u20$futures..sink..Sink$GT$13poll_complete17h99e4d chat_client.exe (2816): chat_client!bytes::buf::buf_mut::BufMut::put chat_client.exe (2816): chat_client!<&'a str as bytes::buf::into_buf::IntoBuf>::into_buf chat_client.exe (2816): chat_client!_ZN99_$LT$tokio_io.._tokio_codec..framed_write..FramedWrite2$LT$T$GT$$u20$as$u20$futures..sink..Sink$GT$10start_send17h590abca8 chat_client.exe (2816): chat_client!<futures::stream::split::SplitSink<S> as futures::sink::Sink>::start_send chat_client.exe (2816): chat_client!<futures::stream::forward::Forward<T, U>>::try_start_send chat_client.exe (2816): chat_client!<futures::stream::forward::Forward<T, U> as futures::future::Future>::poll chat_client.exe (2816): chat_client!<futures::stream::map_err::MapErr<S, F> as futures::stream::Stream>::poll chat_client.exe (2816): chat_client!<futures::sync::mpsc::Receiver<T> as futures::stream::Stream>::poll chat_client.exe (2816): chat_client!<futures::sync::mpsc::Receiver<T>>::next_message chat_client.exe (2816): chat_client!<futures::sync::mpsc::queue::Queue<T>>::pop chat_client.exe (2816): chat_client!<futures::sync::mpsc::Sender<T>>::do_send chat_client.exe (2816): chat_client!<futures::sink::send::Send<S> as futures::future::Future>::poll chat_client.exe (2816): chat_client!futures::future::Future::wait chat_client.exe (2816): chat_client!chat_client::read_stdin chat_client.exe (2816): chat_client!<std::io::stdio::Stdin as std::io::Read>::read chat_client.exe (2816): chat_client!<std::io::buffered::BufReader<R> as std::io::Read>::read chat_client.exe (2816): chat_client!<std::io::buffered::BufReader<R> as std::io::BufRead>::fill_buf chat_client.exe (2816): 
chat_client!std::sys::windows::stdio::Stdin::read chat_client.exe (2816): chat_client!<std::io::cursor::Cursor<T> as std::io::Read>::read chat_client.exe (2816): chat_client!std::sys::windows::stdio::Stdin::read chat_client.exe (2816): ntdll!RtlpFreeHeap chat_client.exe (2816): chat_client!std::sys::windows::stdio::Stdin::read chat_client.exe (2816): chat_client!alloc::string::String::from_utf16 chat_client.exe (2816): chat_client!alloc::string::String::push chat_client.exe (2816): ntdll!RtlpReAllocateHeapInternal chat_client.exe (2816): chat_client!alloc::string::String::push chat_client.exe (2816): ntdll!RtlpReAllocateHeapInternal chat_client.exe (2816): chat_client!alloc::string::String::push chat_client.exe (2816): ntdll!RtlpReAllocateHeapInternal chat_client.exe (2816): chat_client!alloc::string::String::push chat_client.exe (2816): ntdll!RtlpAllocateHeapInternal chat_client.exe (2816): chat_client!alloc::string::String::from_utf16 chat_client.exe (2816): kernelbase!StrRetToStrW

Displaying data nicely in Jupyter notebooks

Detecting if the script is executed in a notebook or not

def in_notebook():
    """
    Detect if we are currently running a Jupyter notebook.

    This is used to display rendered results inline in Jupyter when we are executing in the context of a Jupyter
    notebook, or to display raw results on the standard output when we are executing in the context of a script.
    """
    try:
        from IPython import get_ipython  # type: ignore

        if get_ipython() is None or ("IPKernelApp" not in get_ipython().config):
            return False
    except ImportError:
        return False
    return True

Displaying a screenshot from the trace

Reven v2.12.0
Environment Jupyter
ctx = server.trace.first_context
display(ctx.framebuffer.image())

Sample output:

Trace screenshot

Displaying in tables

from IPython.display import HTML, display


# helper functions to output html tables
def table_line(cells):
    line = ""
    for cell in cells:
        line += "<td>{}</td>".format(cell)
    return "<tr>{}</tr>".format(line)


def display_table(title, headers, html_lines):
    header_line = ""
    for header in headers:
        header_line += "<th>{}</th>".format(header)
    header_line = "<tr>{}</tr>".format(header_line)
    display(HTML("""<h2>{}</h2><table>{} {}</table>""".format(title, header_line, html_lines)))
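The helpers above take `html_lines` as a single, already concatenated string of `<tr>` rows. A minimal sketch of how that string could be assembled follows. It assumes cell values may contain characters that need HTML escaping; the `build_rows` name and the sample rows are hypothetical and not part of the Reven API.

```python
import html


def table_line(cells):
    # Same shape as the helper above, but each cell is escaped so that symbol
    # names such as "Vec<T>" are not interpreted as HTML tags.
    line = "".join("<td>{}</td>".format(html.escape(str(cell))) for cell in cells)
    return "<tr>{}</tr>".format(line)


def build_rows(rows):
    # Concatenate one <tr> per row into the single string display_table expects.
    return "".join(table_line(row) for row in rows)


# Hypothetical sample data, for illustration only.
rows = [(1, "ntdll!memcpy"), (2, "alloc::vec::Vec<T>")]
html_lines = build_rows(rows)
print(html_lines)
```

In a notebook, the result could then be passed on as `display_table("Symbols", ["#", "Symbol"], html_lines)`.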

Pretty printing

Basics

Reven v2.7.0
Environment Jupyter
from IPython.display import HTML, display

display(HTML(f"<strong>{tr.format_as_html()}</strong>"))

Sample output:

Transition displayed strong

Transition

Reven v2.5.0
Environment Jupyter
import reven2
from IPython.display import HTML, display

import re
import itertools
import html

all_regs = {}
for reg in reven2.arch.helpers.x64_registers():
    all_regs[reg.name] = reg


def tokenize_string(string):
    return re.split(" |dword|ptr|\\[|\\]|\\+|\\*|,", string)


def tokenize_instruction(transition):
    if transition.instruction is None:
        return []
    return tokenize_string(str(transition.instruction))


def get_pretty_print_tr(tr, show_context=False, show_symbol=False):
    output = ""

    if show_symbol:
        output += "<span>{}</span><br/>".format(html.escape(str(tr.context_before().ossi.location())))

    output += tr._repr_html_()
    output += " <code>{}</code>".format(str(tr).split(" ", 1)[1])
    output += "<br/>"

    if not show_context:
        return '<p style="font-family:monospace" class="tex2jax_ignore">' + output + "</p>"

    # Registers named in the instruction, with their values before and after
    instr_elements = tokenize_instruction(tr)
    done_already = []
    print_data = []
    for elem in instr_elements:
        if elem in done_already:
            continue
        done_already.append(elem)
        if elem in all_regs:
            before = tr.context_before().read(all_regs[elem])
            after = tr.context_after().read(all_regs[elem])
            if before == after or elem in ["rip"]:
                print_data.append("{} = {:x}".format(elem, before))
            else:
                print_data.append("{} = {:x} to {:x}".format(elem, before, after))
    output += ", ".join(print_data) + "<br/>"

    # Memory accesses; fetch one extra item to know whether the list was truncated
    print_data = []
    max_items = 4
    accesses = list(itertools.islice(tr.memory_accesses(), max_items + 1))
    for acc in accesses[:max_items]:
        elem = "{}[{:#x}]:{}".format(
            "R" if acc.operation == reven2.memhist.MemoryAccessOperation.Read else "W",
            acc.virtual_address.offset,
            acc.size,
        )
        try:
            before = tr.context_before().read(acc.virtual_address, acc.size)
            after = tr.context_after().read(acc.virtual_address, acc.size)
            if before == after:
                print_data.append("{} = {:x}".format(elem, before))
            else:
                print_data.append("{} = {:x} to {:x}".format(elem, before, after))
        except Exception:
            print_data.append(elem + " = ?")
    if len(accesses) > max_items:
        print_data.append("...")
    output += ", ".join(print_data)

    return '<p style="font-family:monospace" class="tex2jax_ignore">' + output + "</p>"


def pretty_print_tr(tr, show_context=False, show_symbol=False):
    display(HTML(get_pretty_print_tr(tr, show_context=show_context, show_symbol=show_symbol)))

Sample output:

Pretty transitions

Address / Hexdump

Reven v2.5.0
Environment Jupyter

Pretty print the content of a buffer at a specified address, in 0x10 increments. If passed a register, it will be dereferenced.

import reven2
from reven2.address import LinearAddress  # shortcut when reading addresses
from IPython.display import HTML, display


def print_buffer(b, addr=0, col=16, highlights=()):
    style_highlight = '<span style="background-color:yellow">'
    style_highlight_off = '</span>'
    output = '<code style="background-color:white">\n'
    prev_all_zeros = 0
    for i in range(int(len(b) / col)):
        total_sum = sum(b[i*col:(i+1)*col])
        for h in highlights:
            if h in range(i*col, col):
                total_sum = 0
        total_sum_next = sum(b[(i+1)*col:(i+2)*col])
        if total_sum == 0:
            prev_all_zeros += 1
        else:
            prev_all_zeros = 0
        # Collapse runs of all-zero rows into a single "..." line
        if prev_all_zeros > 0 and total_sum_next == 0 and len(b) >= (i+2)*col:
            if prev_all_zeros == 1:
                output += "...\n"
            continue
        output += "{:016x}:".format(i*col + addr)
        # Hexadecimal column
        for j in range(col):
            offset = j + i*col
            if offset >= len(b):
                break
            if j % 8 == 0:
                output += " "
            total_sum += b[offset]
            if offset in highlights:
                output += style_highlight
            output += "{:02x}".format(b[offset])
            if offset in highlights:
                output += style_highlight_off
            output += " "
        output += "- "
        # Printable ASCII column
        for j in range(col):
            offset = j + i*col
            if offset >= len(b):
                break
            c = b[offset]
            if offset in highlights:
                output += style_highlight
            if c >= 32 and c <= 126:
                output += "{}".format(chr(c))
            else:
                output += "."
            if offset in highlights:
                output += style_highlight_off
        output += "\n"
    output += "</code>"
    display(HTML(output))


def pretty_print_addr(ctx, address, size):
    if isinstance(address, reven2.arch.register.Register):
        address = ctx.read(address)
    if isinstance(address, int):
        address = LinearAddress(address)

    print_buffer(ctx.read(address, size), address.offset)

Sample output:

Pretty addresses
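When the script runs outside a notebook, the HTML rendering above is of little use. The following is a minimal plain-text sketch of the same hexdump layout, suitable for the standard output. The `hexdump_lines` name is hypothetical, and this version does not collapse zero rows or support highlights.

```python
def hexdump_lines(data, addr=0, col=16):
    """Return a list of text lines, one per `col` bytes of `data`."""
    lines = []
    for start in range(0, len(data), col):
        chunk = data[start:start + col]
        hex_part = " ".join("{:02x}".format(byte) for byte in chunk)
        # Printable ASCII is shown as-is, everything else as "."
        text_part = "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in chunk)
        # Pad the hex column so the text column stays aligned on a short last row
        lines.append(
            "{:016x}: {:<{width}} - {}".format(addr + start, hex_part, text_part, width=col * 3 - 1)
        )
    return lines


for line in hexdump_lines(b"Reven\x00\x01", addr=0x1000):
    print(line)
```

Returning the lines rather than printing them directly keeps the helper easy to reuse, for instance to write the dump to a log file.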

Exporting tabular data to work with reporting tools

Pre-requisites: install pandas

Reven v2.9.0
Environment Jupyter

This example requires the pandas Python package:

# From a code cell of a Jupyter notebook
try:
    import pandas
    print("pandas already installed")
except ImportError:
    print("Could not find pandas, attempting to install it from pip")
    import sys

    output = !{sys.executable} -m pip install pandas; echo $?  # noqa
    success = output[-1]

    for line in output[0:-1]:
        print(line)

    if int(success) != 0:
        raise RuntimeError("Error installing pandas")
    import pandas
    print("Successfully installed pandas")

Building tabular data from filters

Reven v2.10.0
Dependency pandas
res = []
header = ["Context range", "Process", "Location"]
processes = list(server.ossi.executed_processes("chat"))
for ctx_range in server.trace.filter(processes, reven2.filter.RingPolicy.R3Only):
    first_context = next(iter(ctx_range))
    res.append((ctx_range,
                first_context.ossi.process(),
                first_context.ossi.location()))

df = pandas.DataFrame.from_records(res, columns=header)

Displaying tabular data in a Jupyter notebook

Reven v2.9.0
Environment Jupyter
Dependency pandas
res = []
header = ["Context range", "Process", "Location"]
processes = list(server.ossi.executed_processes("chat"))
for ctx_range in server.trace.filter(processes, reven2.filter.RingPolicy.R3Only):
    first_context = next(iter(ctx_range))
    res.append((ctx_range,
                first_context.ossi.process(),
                first_context.ossi.location()))

df = pandas.DataFrame.from_records(res, columns=header)

# display table truncated in the middle
display(df)

Sample output:

Jupyter rendered table

Exporting tabular data to csv

Reven v2.10.0
Dependency pandas
df.to_csv("data.csv")

This can then be opened in e.g. spreadsheet software:

Spreadsheet

Managing bookmarks

Working with bookmarks starting with a specific "tag"

Let's consider only bookmarks whose description starts with "auto".

Adding

for i in range(0, 100):
    server.bookmarks.add(server.trace.transition(i), f"auto: {i}")

Listing

for bookmark in server.bookmarks.all():
    if not bookmark.description.startswith("auto"):
        continue
    print(bookmark)

Removing

server.bookmarks.remove_if(lambda bookmark: bookmark.description.startswith("auto:"))
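Note that the listing example matches the prefix "auto" while the removal example matches "auto:", so the two do not select exactly the same bookmarks. One way to keep them consistent is to build the predicate once and reuse it. The sketch below uses stand-in objects instead of real bookmarks, and the `has_tag` helper is hypothetical.

```python
from types import SimpleNamespace


def has_tag(tag):
    """Build a predicate matching bookmarks whose description starts with '<tag>:'."""
    prefix = tag + ":"
    return lambda bookmark: bookmark.description.startswith(prefix)


# Stand-in objects for illustration only: real bookmarks come from server.bookmarks.all()
bookmarks = [
    SimpleNamespace(description="auto: 0"),
    SimpleNamespace(description="automatic analysis notes"),
    SimpleNamespace(description="manual"),
]

is_auto = has_tag("auto")
matching = [b.description for b in bookmarks if is_auto(b)]
print(matching)
```

The same predicate could then be passed to `server.bookmarks.remove_if(is_auto)`, so that listing and removal agree on what counts as tagged.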

Finding more about the semantics of an instruction

Pre-requisites: install and setup Capstone

Environment Jupyter

This example requires the capstone Python package:

# From a code cell of a Jupyter notebook
try:
    import capstone
    print("capstone already installed")
except ImportError:
    print("Could not find capstone, attempting to install it from pip")
    import sys
    import subprocess

    command = [f"{sys.executable}", "-m", "pip", "install", "capstone"]
    p = subprocess.run(command)

    if int(p.returncode) != 0:
        raise RuntimeError("Error installing capstone")

    import capstone  # noqa
    print("Successfully installed capstone")

Using Capstone

Since we need to maintain several objects that we would need to pass to the various functions, the easiest way is to use classes:

import capstone
import reven2


class DisassembledInstruction:
    def __init__(self, _tr: reven2.trace.Transition, _cs_insn):
        self._tr = _tr
        self._cs_insn = _cs_insn

    def _read_transition_reg(self, reg: reven2.arch.register.Register):
        """
        Read the value of a register during computations performed by the instruction.

        For PC, it is the value after the instruction.
        For other registers, it is the value before the instruction.
        """
        if reg in [reven2.arch.x64.rip, reven2.arch.x64.eip]:
            return self._tr.pc + self._cs_insn.size
        else:
            return self._tr.context_before().read(reg)

    def dereferenced_address(self, op_index: int):
        from reven2.arch.register import Register

        cs_op = self._cs_insn.operands[op_index]
        if cs_op.type != capstone.CS_OP_MEM:
            raise IndexError("The selected operand is not a memory operand")

        dereferenced_address = 0
        if cs_op.value.mem.base != 0:
            base_reg = Register.from_name(self._cs_insn.reg_name(cs_op.value.mem.base))
            dereferenced_address += self._read_transition_reg(base_reg)

        if cs_op.value.mem.index != 0:
            index_reg = Register.from_name(self._cs_insn.reg_name(cs_op.value.mem.index))
            index = self._read_transition_reg(index_reg)
            dereferenced_address += (cs_op.value.mem.scale * index)

        dereferenced_address += cs_op.value.mem.disp

        # mask the address depending on mode
        mask = 0xFFFF_FFFF_FFFF_FFFF if self._tr.mode == reven2.trace.Mode.X86_64 else 0xFFFF_FFFF
        return dereferenced_address & mask

    @property
    def capstone_instruction(self):
        return self._cs_insn

    @property
    def transition(self):
        return self._tr


class Disassembler:
    def __init__(self):
        self._md_64 = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
        self._md_64.detail = True
        self._md_32 = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32)
        self._md_32.detail = True

    def disassemble(self, tr: reven2.trace.Transition):
        """
        Attempt to disassemble with Capstone the instruction associated to the passed transition.

        Returns None if there is no instruction to disassemble.
        """
        if tr.instruction is not None:
            instruction = tr.instruction
        elif tr.exception is not None and tr.exception.related_instruction is not None:
            instruction = tr.exception.related_instruction
        else:
            return None

        if tr.mode == reven2.trace.Mode.X86_64:
            md = self._md_64
        elif tr.mode == reven2.trace.Mode.X86_32:
            md = self._md_32
        else:
            raise ValueError(f"Unsupported mode '{tr.mode}'")

        cs_insn = next(md.disasm(instruction.raw, instruction.size))
        return DisassembledInstruction(tr, cs_insn)

Disassembling Reven instructions

Reven v2.10.0
Dependency capstone
dsm = Disassembler()
insn = dsm.disassemble(tr)
# Access the capstone instruction
insn.capstone_instruction

Depending on your use-case you may want to skip disassembling instructions related to exceptions, as these are not always (fully) executed.

Compute dereferenced address

Reven v2.2.0
Dependency capstone
hex(insn.dereferenced_address(0))

Sample output:

'0xfffff8024cfacfb0'
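The computation performed by `dereferenced_address` is the usual x86 effective-address formula, base + index * scale + displacement, truncated to the address width of the current mode. A minimal standalone sketch of that arithmetic follows. The `effective_address` helper and the register values are hypothetical, and segment bases are not taken into account.

```python
def effective_address(base=0, index=0, scale=1, disp=0, bits=64):
    """Compute base + index * scale + disp, truncated to the address width."""
    mask = (1 << bits) - 1
    return (base + index * scale + disp) & mask


# Hypothetical register values, for illustration only
print(hex(effective_address(base=0x1000, index=0x10, scale=8, disp=-0x8)))
# The sum wraps around in 32-bit mode
print(hex(effective_address(base=0xFFFF_FFF0, disp=0x20, bits=32)))
```

The mask matters because displacements can be negative and sums can overflow: without it, the result could fall outside the valid address range.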

Convert capstone flags to Reven register flags

Reven v2.2.0
Dependency capstone
test_eflags = {
    capstone.x86.X86_EFLAGS_TEST_OF: reven2.arch.x64.of,
    capstone.x86.X86_EFLAGS_TEST_SF: reven2.arch.x64.sf,
    capstone.x86.X86_EFLAGS_TEST_ZF: reven2.arch.x64.zf,
    capstone.x86.X86_EFLAGS_TEST_PF: reven2.arch.x64.pf,
    capstone.x86.X86_EFLAGS_TEST_CF: reven2.arch.x64.cf,
    capstone.x86.X86_EFLAGS_TEST_NT: reven2.arch.x64.nt,
    capstone.x86.X86_EFLAGS_TEST_DF: reven2.arch.x64.df,
    capstone.x86.X86_EFLAGS_TEST_RF: reven2.arch.x64.rf,
    capstone.x86.X86_EFLAGS_TEST_IF: reven2.arch.x64.if_,
    capstone.x86.X86_EFLAGS_TEST_TF: reven2.arch.x64.tf,
    capstone.x86.X86_EFLAGS_TEST_AF: reven2.arch.x64.af,
}

for flag, reg in test_eflags.items():
    if not insn.capstone_instruction.eflags & flag:
        # flag not tested by this instruction, skip
        continue
    print(f"{reg} is tested by the instruction")

Accessing handles

Reven v2.11.0
OS Windows 64-bit

Listing the file handles owned by the process

import reven2.preview.windows as windows

ctx = windows.Context(ctx)

# Focus on process handles, ignore the rest
for handle in ctx.handles(kernel_handles=False, special_handles=False):
    try:
        # Request FileObject handles only, otherwise raise exception
        obj = handle.object(windows.FileObject)
    except ValueError:
        continue
    print(f"{handle}: {obj.filename_with_device}")

Sample output:

Handle 0x4 (object: lin:0xffffc8016fc2c760): \Device\ConDrv\Reference
Handle 0x44 (object: lin:0xffffc8016fa92810): \Device\HarddiskVolume2\reven
Handle 0x48 (object: lin:0xffffc8016fa94a70): \Device\ConDrv\Connect
Handle 0x50 (object: lin:0xffffc8016fa921d0): \Device\ConDrv\Input
Handle 0x54 (object: lin:0xffffc8016fa91eb0): \Device\ConDrv\Output
Handle 0x58 (object: lin:0xffffc8016fa91eb0): \Device\ConDrv\Output
Handle 0xa4 (object: lin:0xffffc8016f8e3b00): \Device\HarddiskVolume2\reven\output

Getting a handle by value

import reven2.preview.windows as windows

ctx = windows.Context(ctx)
handle = ctx.handle(0xa4)
print(handle.object())

Sample output:

File object "\Device\HarddiskVolume2\reven\output" (lin:0xffffc8016f8e3b00)

Managing ranges of memory

Reven v2.11.0

A range of memory is a very common concept. Storing, updating, merging or iterating on such ranges are operations that are likely to be necessary to users of the Reven API.

Reven provides a "canonical" implementation in the API that spares users from having to deal with multiple reimplementations.

Creating & updating memory ranges

import reven2
import reven2.memory_range as mr

# First range
r1 = mr.MemoryRange(reven2.address.LogicalAddress(0x1000), 0x1000)

# This range overlaps with the previous one
r2 = mr.MemoryRange(reven2.address.LogicalAddress(0x1500), 0x1000)

# This range does not overlap
r3 = mr.MemoryRange(reven2.address.LogicalAddress(0x5000), 0x1000)

print(r1)

# The ranges overlap: the union is a single range
print(r1.union(r2))

# The ranges do not overlap: the union is None
print(r1.union(r2).union(r3))

Sample output:

[ds:0x1000; 4096]
[ds:0x1000; 5376]
None
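To make the arithmetic behind this output explicit, here is a minimal sketch of the same union semantics on plain `(start, size)` tuples. It is not the Reven implementation: the `union` function is hypothetical, and the choice to require a real overlap (ranges that merely touch are treated as disjoint) is an assumption of this sketch.

```python
def union(a, b):
    """Union of two (start, size) ranges, or None if they do not overlap."""
    (a_start, a_size), (b_start, b_size) = a, b
    a_end, b_end = a_start + a_size, b_start + b_size
    if a_start >= b_end or b_start >= a_end:
        return None
    start = min(a_start, b_start)
    return (start, max(a_end, b_end) - start)


r1 = (0x1000, 0x1000)
r2 = (0x1500, 0x1000)
r3 = (0x5000, 0x1000)

print(union(r1, r2))
print(union(union(r1, r2), r3))
```

The merged range starts at 0x1000 and ends at 0x2500, hence the size of 0x1500 (5376) bytes seen in the sample output.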

Translating a range of virtual addresses

Users might need to access the physical addresses of a buffer. However, a range of memory may be mapped onto non-contiguous physical pages, rendering the conversion error-prone.

The MemoryRange object offers a translation method that is guaranteed to be correct.

virtual_range = mr.MemoryRange(reven2.address.LogicalAddress(0x10000), 0x2000)
for phy in virtual_range.translate(ctx):
    print(phy)

Sample output:

[phy:0x59ce000; 4096]
[phy:0x764f000; 4096]

This is the only recommended path

As mentioned above, it is easy to get the translation wrong: translating a single virtual address into a physical address is straightforward, but translating a whole buffer is more complex, because the buffer may span several pages that are not contiguous in physical memory. Please use the MemoryRange object for this purpose.
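To illustrate why a buffer cannot be translated in one step, the sketch below splits a virtual range at page boundaries, which is the granularity at which each piece would have to be translated separately. It only shows the splitting step, assumes 4 KiB pages (larger page sizes exist), and uses a hypothetical `split_by_page` helper; it is not a substitute for `MemoryRange.translate`.

```python
PAGE_SIZE = 0x1000  # assumption: 4 KiB pages only


def split_by_page(start, size, page_size=PAGE_SIZE):
    """Split [start, start + size) into (start, size) chunks that never cross a page boundary."""
    chunks = []
    end = start + size
    while start < end:
        next_boundary = (start // page_size + 1) * page_size
        chunk_end = min(end, next_boundary)
        chunks.append((start, chunk_end - start))
        start = chunk_end
    return chunks


# The range from the example above covers exactly two pages
print([(hex(s), n) for s, n in split_by_page(0x10000, 0x2000)])
# Even a small buffer can straddle two pages
print([(hex(s), n) for s, n in split_by_page(0x10FF0, 0x20)])
```

Each resulting chunk may map to an unrelated physical page, which is consistent with the two separate 4096-byte physical ranges in the sample output.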

Working with multiple ranges

The API provides objects to manage multiple ranges, depending on the situation.

Set of ranges

# First range
r1 = mr.MemoryRange(reven2.address.LogicalAddress(0x1000), 0x1000)

# This range overlaps with the previous one
r2 = mr.MemoryRange(reven2.address.LogicalAddress(0x1500), 0x1000)

# This range does not overlap
r3 = mr.MemoryRange(reven2.address.LogicalAddress(0x5000), 0x1000)

# Ranges that overlap are merged
s = mr.MemoryRangeSet([r1, r2, r3])
for r in s:
    print(r)

Sample output:

[ds:0x1000; 5376]
[ds:0x5000; 4096]
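The merging behaviour shown in this output can be sketched on plain `(start, size)` tuples as follows. This is an illustration of the idea only, not the `MemoryRangeSet` implementation; the `merge_ranges` name is hypothetical, and only ranges that really overlap are merged.

```python
def merge_ranges(ranges):
    """Merge overlapping (start, size) ranges and return them sorted by start."""
    merged = []
    for start, size in sorted(ranges):
        end = start + size
        if merged and start < merged[-1][1]:
            # Overlaps the previous range: extend it if needed
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [(start, end - start) for start, end in merged]


print(merge_ranges([(0x5000, 0x1000), (0x1000, 0x1000), (0x1500, 0x1000)]))
```

Sorting first means each range only has to be compared with the last merged one, so the input order does not matter.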

Map of ranges

Maps of ranges are useful to associate data with these memory ranges. For example, you might want to associate buffers with a particular named source.

mem_map = mr.MemoryRangeMap(
    items=((r1, "from r1"), (r2, "from r2"), (r3, "from r3")),
    # Do not handle subtraction:
    subtract=lambda a: None,
    # Concatenate strings on merge:
    merge=lambda a, b: a + ", " + b)

for r, source in mem_map:
    print(r, source)

Sample output:

[ds:0x1000; 1280] from r1
[ds:0x1500; 2816] from r1, from r2
[ds:0x2000; 1280] from r2
[ds:0x5000; 4096] from r3
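The way the map splits overlapping ranges into disjoint segments, combining the associated data where inputs overlap, can be sketched on plain `(start, size)` tuples. This is one possible way to obtain the segmentation shown above, not the `MemoryRangeMap` implementation; the `split_map` function is hypothetical and does not model the `subtract` callback.

```python
from functools import reduce


def split_map(items, merge):
    """Split (start, size) ranges carrying a value into disjoint segments.

    Where several input ranges cover the same segment, their values are
    combined with `merge`, in input order.
    """
    # Every range start or end is a potential segment boundary
    bounds = sorted({b for (start, size), _ in items for b in (start, start + size)})
    segments = []
    for lo, hi in zip(bounds, bounds[1:]):
        values = [value for (start, size), value in items if start <= lo and hi <= start + size]
        if values:
            segments.append(((lo, hi - lo), reduce(merge, values)))
    return segments


items = [
    ((0x1000, 0x1000), "from r1"),
    ((0x1500, 0x1000), "from r2"),
    ((0x5000, 0x1000), "from r3"),
]
for (start, size), source in split_map(items, lambda a, b: a + ", " + b):
    print(hex(start), size, source)
```

Segments that no input range covers, such as the gap between 0x2500 and 0x5000, are simply left out of the result.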