started with base type overhaul

assembly-parser-rework
Anton Lydike 3 years ago
parent 0488a9d6bc
commit 5538034f8b

@ -1,6 +1,6 @@
MIT License
Copyright (c) 2021 Anton Lydike
Copyright (c) 2021-2022 Anton Lydike
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

@ -9,7 +9,7 @@ on them.
import sys
from typing import Tuple, List, Dict, Callable, Type
from .base_types import MemoryFlags
from .types import MemoryFlags
from .syscall import SyscallInterface, get_syscall_symbols
from .exceptions import RiscemuBaseException, LaunchDebuggerException
from .MMU import MMU
@ -23,7 +23,7 @@ import riscemu
import typing
if typing.TYPE_CHECKING:
from . import base_types, LoadedExecutable, LoadedInstruction
from . import types, LoadedExecutable, LoadedInstruction
from .instructions.InstructionSet import InstructionSet
@ -34,7 +34,7 @@ class CPU:
It is initialized with a configuration and a list of instruction sets.
"""
INS_XLEN = 1
INS_XLEN = 4
def __init__(self, conf: RunConfig, instruction_sets: List[Type['riscemu.InstructionSet']]):
"""
@ -70,34 +70,6 @@ class CPU:
if conf.include_scall_symbols:
self.mmu.global_symbols.update(get_syscall_symbols())
def get_tokenizer(self, tokenizer_input):
    """
    Build a tokenizer that knows every instruction name of this CPU

    :param tokenizer_input: an instance of the RiscVTokenizerInput class
    :return: a RiscVTokenizer built from the input and the CPU's instruction names
    """
    known_instructions = self.all_instructions()
    return RiscVTokenizer(tokenizer_input, known_instructions)
def load(self, e: riscemu.base_types):
    """
    Hand an executable to the MMU so it gets loaded into memory

    :param e: the executable to load
    :return: the result of ``self.mmu.load_bin``
    """
    loaded = self.mmu.load_bin(e)
    return loaded
def run_loaded(self, le: 'riscemu.LoadedExecutable'):
    """
    Start executing a loaded executable

    Sets the program counter to the executable's run pointer, allocates a stack
    section when the configured stack size is positive, then enters the run loop.

    :param le: the loaded executable to run
    """
    self.pc = le.run_ptr

    if self.conf.stack_size > 0:
        stack_flags = MemoryFlags(False, False)
        self.stack = self.mmu.allocate_section("stack", self.conf.stack_size, stack_flags)
        # sp is initialised to the address just past the allocated stack section
        stack_top = self.stack.base + self.stack.size
        self.regs.set('sp', stack_top)
        allocated_msg = '[CPU] Allocated {} bytes of stack'.format(self.stack.size)
        print(FMT_CPU + allocated_msg + FMT_NONE)

    started_msg = '[CPU] Started running from 0x{:08X} ({})'.format(le.run_ptr, le.name)
    print(FMT_CPU + started_msg + FMT_NONE)
    self._run()
def continue_from_debugger(self, verbose=True):
"""
called from the debugger to continue running
@ -157,24 +129,6 @@ class CPU:
print()
print(FMT_CPU + "Program stopped without exiting - perhaps you stopped the debugger?" + FMT_NONE)
def run_instruction(self, ins: 'LoadedInstruction'):
    """
    Dispatch one instruction to the handler registered under its name

    :param ins: The instruction to execute
    :raises RuntimeError: if no handler is registered for ``ins.name``
    """
    if ins.name not in self.instructions:
        # this should never be reached, as unknown instructions are imparseable
        raise RuntimeError("Unknown instruction: {}".format(ins))
    handler = self.instructions[ins.name]
    handler(ins)
def all_instructions(self) -> List[str]:
    """
    List the names of all instructions this CPU can execute.

    :return: the keys of the instruction table, in the table's iteration order
    """
    return [name for name in self.instructions.keys()]
def __repr__(self):
"""
Returns a representation of the CPU and some of its state.

@ -4,17 +4,20 @@ RiscEmu (c) 2021 Anton Lydike
SPDX-License-Identifier: MIT
"""
from .base_types import InstructionContext, Instruction, MemorySection, MemoryFlags, T_RelativeAddress, T_AbsoluteAddress, \
Program
from .helpers import align_addr, int_from_bytes
from .exceptions import OutOfMemoryException, InvalidAllocationException
from typing import Dict, List, Optional
from .colors import *
from typing import Dict, List, Tuple, Optional
from .exceptions import InvalidAllocationException
from .helpers import align_addr, int_from_bytes
from .types import Instruction, MemorySection, MemoryFlags, T_AbsoluteAddress, \
Program
class MMU:
"""
The MemoryManagementUnit (handles loading binaries, and reading/writing data)
The MemoryManagementUnit. This provides a unified interface for reading/writing data from/to memory.
It also provides various translations for addresses.
"""
max_size = 0xFFFFFFFF
@ -62,9 +65,9 @@ class MMU:
return None
def get_bin_containing(self, addr: T_AbsoluteAddress) -> Optional[Program]:
for exe in self.binaries:
if exe.base_addr <= addr < exe.base_addr + exe.size:
return exe
for program in self.programs:
if program.base <= addr < program.base + program.size:
return program
return None
def read_ins(self, addr: T_AbsoluteAddress) -> Instruction:
@ -140,7 +143,68 @@ class MMU:
def read_int(self, addr: int) -> int:
    """
    Read four bytes starting at ``addr`` and convert them using ``int_from_bytes``.
    """
    raw = self.read(addr, 4)
    return int_from_bytes(raw)
def translate_address(self, address: T_AbsoluteAddress) -> str:
    """
    Produce a printable form of an absolute address.

    For now this is simply ``str(address)``.
    """
    # FIXME: proper implementation using the debug info
    rendered = str(address)
    return rendered
def has_continous_free_region(self, start: int, end: int) -> bool:
    """
    Check whether the half-open address range ``[start, end)`` is unoccupied.

    A section occupies ``[base, base + size)``, so a region may begin exactly at
    the end of one section and may end exactly at the base of the next one.
    The scan assumes ``self.sections`` is ordered by ascending base address.

    :param start: first address of the requested region (inclusive)
    :param end: first address past the requested region (exclusive)
    :return: True if no section overlaps the requested region
    """
    # if we have no sections we are all good
    if not self.sections:
        return True

    # if the last section ends at or before the start we are also good
    last = self.sections[-1]
    if start >= last.base + last.size:
        return True

    for sec in self.sections:
        # skip all sections that end at or before the required start point
        if sec.base + sec.size <= start:
            continue
        # this is the first section that reaches past the start point; the region
        # is free exactly when that section begins at or after the requested end
        return sec.base >= end

    # if all sections end before the requested start we are good
    # technically we shouldn't ever reach this point, but better safe than sorry
    return True
def load_program(self, program: Program, align_to: int = 4):
    """
    Place a program into memory and register it with this MMU.

    If the program requests a base address, the region ``[base, base + size)`` must
    be free. Otherwise the program is placed at the first aligned address after the
    last known section (or at address 0 when nothing has been loaded yet).

    :param program: the program to load
    :param align_to: alignment used when the MMU chooses the load address
    :raises InvalidAllocationException: if the requested region is occupied
    """
    if program.base is not None:
        region_end = program.base + program.size
        if not self.has_continous_free_region(program.base, region_end):
            print(FMT_MEM + "Cannot load program {} into desired space (0x{:0x}-0x{:0x}), area occupied.".format(
                program.name, program.base, region_end
            ) + FMT_NONE)
            raise InvalidAllocationException(
                "Area occupied", program.name, program.size, MemoryFlags(False, True)
            )
        at_addr = program.base
    else:
        if self.sections:
            first_guaranteed_free_address = self.sections[-1].base + self.sections[-1].size
        else:
            # nothing is loaded yet, so there is no last section to place the program after
            first_guaranteed_free_address = 0
        at_addr = align_addr(first_guaranteed_free_address, align_to)

    # trigger the load event to set all addresses in the binary
    program.loaded_trigger(at_addr)

    # add program and sections to internal state
    self.programs.append(program)
    self.sections += program.sections
    self._update_state()

    # load all global symbols from program
    self.global_symbols.update(
        {key: program.context.labels[key] for key in program.global_labels}
    )
    # inject reference to global symbol table into program context
    # FIXME: this is pretty unclean and should probably be solved in a better way in the future
    program.context.global_symbol_dict = self.global_symbols
def _update_state(self):
self.programs.sort(key=lambda bin: bin.base)
self.sections.sort(key=lambda sec: sec.base)
def __repr__(self):
return "MMU(\n\t{}\n)".format(
"\n\t".join(repr(x) for x in self.sections)
"\n\t".join(repr(x) for x in self.programs)
)

@ -22,7 +22,7 @@ from .CPU import CPU
from .config import RunConfig
from .parser import tokenize, parse_tokens, parse_program_from_file
from .parser import tokenize, parse_tokens, AssemblyFileLoader
__author__ = "Anton Lydike <Anton@Lydike.com>"
__copyright__ = "Copyright 2021 Anton Lydike"

@ -2,14 +2,20 @@ from typing import Optional, Tuple, Union, List
from enum import Enum, auto
from typing import Optional, Tuple, Union
from .helpers import parse_numeric_argument, align_addr, int_to_bytes
from .base_types import Program, T_RelativeAddress, InstructionContext, Instruction
from .helpers import parse_numeric_argument, align_addr, int_to_bytes, get_section_base_name
from .types import Program, T_RelativeAddress, InstructionContext, Instruction
from .colors import FMT_PARSE, FMT_NONE
from .exceptions import ParseException, ASSERT_LEN, ASSERT_NOT_NULL
from .tokenizer import Token
from .types import BinaryDataMemorySection, InstructionMemorySection
from .base import BinaryDataMemorySection, InstructionMemorySection
INSTRUCTION_SECTION_NAMES = ('.text', '.init', '.fini')
"""
A tuple containing all section names which contain executable code (instead of data).
The first segment of each section name (the first segment of ".text.main" is ".text") is checked
against this list to determine the type of the section.
"""
class MemorySectionType(Enum):
@ -64,17 +70,21 @@ class ParseContext:
if self.section is None:
return
if self.section.type == MemorySectionType.Data:
section = BinaryDataMemorySection(self.section.data, self.section.name, self.context, self.program)
section = BinaryDataMemorySection(
self.section.data, self.section.name, self.context, self.program.name, self.section.base
)
self.program.add_section(section)
elif self.section.type == MemorySectionType.Instructions:
section = InstructionMemorySection(self.section.data, self.section.name, self.context, self.program)
section = InstructionMemorySection(
self.section.data, self.section.name, self.context, self.program.name, self.section.base
)
self.program.add_section(section)
self.section = None
def new_section(self, name: str, type: MemorySectionType):
def new_section(self, name: str, type: MemorySectionType, alignment: int = 4):
base = 0
if self.section is not None:
base = align_addr(self.section.current_address(), 4)
base = align_addr(self.section.current_address(), alignment)
print("base at {}".format(base))
self._finalize_section()
self.section = CurrentSection(name, type, base)
@ -94,10 +104,6 @@ def ASSERT_IN_SECTION_TYPE(context: ParseContext, type: MemorySectionType):
)
def get_section_base_name(section_name: str) -> str:
    """
    Return the first dotted segment of a section name, e.g. ".text" for ".text.main".
    """
    segments = section_name.split('.')
    return '.' + segments[1]
class AssemblerDirectives:
"""
This class represents a collection of all assembler directives as documented by

@ -0,0 +1,81 @@
"""
This file contains a base implementation of Instruction, and MemorySection.
This aims to be a simple base, usable for everyone who needs the basic functionality, but doesn't
want to set up their own subtypes of Instruction and MemorySection
"""
from typing import List, Tuple
from .exceptions import MemoryAccessException
from .helpers import parse_numeric_argument
from .types import Instruction, MemorySection, MemoryFlags, InstructionContext, T_RelativeAddress, \
T_AbsoluteAddress, Program
class SimpleInstruction(Instruction):
    """
    A basic Instruction that stores its arguments as strings and resolves them on access.
    """

    def __init__(self, name: str, args: Tuple[str, ...], context: InstructionContext, addr: T_RelativeAddress):
        """
        :param name: the instruction name
        :param args: the raw argument strings, in source order
        :param context: the context used to resolve labels
        :param addr: the address of this instruction, passed along when resolving labels
        """
        self.context = context
        self.name = name
        self.args = args
        self.addr = addr

    def get_imm(self, num: int) -> int:
        """
        Return argument ``num`` as an immediate.

        The argument is first looked up as a label; if the context does not
        resolve it, it is parsed as a numeric literal.
        """
        resolved_label = self.context.resolve_label(self.args[num], self.addr)
        if resolved_label is None:
            return parse_numeric_argument(self.args[num])
        return resolved_label

    def get_imm_reg(self, num: int) -> Tuple[int, str]:
        """
        Return ``(immediate, register)`` where the register is argument ``num``
        and the immediate is argument ``num + 1``.
        """
        return self.get_imm(num + 1), self.get_reg(num)

    def get_reg(self, num: int) -> str:
        """
        Return argument ``num`` unchanged, as a register name.
        """
        return self.args[num]
class InstructionMemorySection(MemorySection):
    """
    A memory section backed by a list of instructions, each occupying four bytes.

    Raw byte access is rejected; only aligned instruction fetches are served.
    """

    def __init__(self, instructions: List[Instruction], name: str, context: InstructionContext, owner: str, base: int = 0):
        self.name = name
        self.owner = owner
        self.base = base
        self.context = context
        self.instructions = instructions
        self.size = 4 * len(instructions)
        self.flags = MemoryFlags(True, True)

    def read(self, offset: T_RelativeAddress, size: int) -> bytearray:
        """Always raises: instruction sections expose no raw bytes."""
        raise MemoryAccessException("Cannot read raw bytes from instruction section", self.base + offset, size, 'read')

    def write(self, offset: T_RelativeAddress, size: int, data: bytearray):
        """Always raises: instruction sections cannot be written as raw bytes."""
        raise MemoryAccessException("Cannot write raw bytes to instruction section", self.base + offset, size, 'write')

    def read_ins(self, offset: T_RelativeAddress) -> Instruction:
        """
        Fetch the instruction stored at the section-relative ``offset``.

        :raises MemoryAccessException: if ``offset`` is not a multiple of four
        """
        index, misalignment = divmod(offset, 4)
        if misalignment != 0:
            raise MemoryAccessException("Unaligned instruction fetch!", self.base + offset, 4, 'instruction fetch')
        return self.instructions[index]
class BinaryDataMemorySection(MemorySection):
    """
    A memory section backed by a bytearray; it serves raw reads and writes but no instructions.
    """

    def __init__(self, data: bytearray, name: str, context: InstructionContext, owner: str, base: int = 0, flags: MemoryFlags = None):
        if flags is None:
            # default flags when none are given
            flags = MemoryFlags(False, False)
        self.name = name
        self.owner = owner
        self.base = base
        self.context = context
        self.data = data
        self.size = len(data)
        self.flags = flags

    def read(self, offset: T_RelativeAddress, size: int) -> bytearray:
        """
        Return ``size`` bytes starting at the section-relative ``offset``.

        :raises MemoryAccessException: if the range reaches past the end of the section
        """
        stop = offset + size
        if stop > self.size:
            raise MemoryAccessException("Out of bounds access in {}".format(self), offset, size, 'read')
        return self.data[offset:stop]

    def write(self, offset: T_RelativeAddress, size: int, data: bytearray):
        """
        Store the first ``size`` bytes of ``data`` at the section-relative ``offset``.

        :raises MemoryAccessException: if the range reaches past the end of the section
            or ``data`` holds fewer than ``size`` bytes
        """
        stop = offset + size
        if stop > self.size:
            raise MemoryAccessException("Out of bounds access in {}".format(self), offset, size, 'write')
        payload = data[0:size]
        if len(payload) != size:
            raise MemoryAccessException("Invalid write parameter sizing", offset, size, 'write')
        self.data[offset:stop] = payload

    def read_ins(self, offset: T_RelativeAddress) -> Instruction:
        """Always raises: data sections hold no instructions."""
        raise MemoryAccessException("Tried reading instruction on non-executable section {}".format(self),
                                    offset, 4, 'instruction fetch')

@ -1,188 +0,0 @@
"""
RiscEmu (c) 2021 Anton Lydike
SPDX-License-Identifier: MIT
This file contains base classes which represent loaded programs
"""
import re
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Set
from .colors import FMT_MEM, FMT_NONE, FMT_UNDERLINE, FMT_ORANGE
from .exceptions import ParseException
from .helpers import format_bytes
T_RelativeAddress = int
T_AbsoluteAddress = int
NUMBER_SYMBOL_PATTERN = re.compile(r'^\d+[fb]$')
@dataclass(frozen=True)
class MemoryFlags:
    """
    Immutable pair of flags describing a memory section: read-only and executable.
    """
    read_only: bool
    executable: bool

    def __repr__(self):
        access = 'ro' if self.read_only else 'rw'
        execute = 'x' if self.executable else '-'
        return "{}({},{})".format(self.__class__.__name__, access, execute)
class InstructionContext:
    base_address: T_AbsoluteAddress
    """
    The address where the instruction block is placed
    """

    labels: Dict[str, T_RelativeAddress]
    """
    This dictionary maps all labels to their relative position of the instruction block
    """

    numbered_labels: Dict[str, List[T_RelativeAddress]]
    """
    This dictionary maps numbered labels (which can occur multiple times) to a list of (block-relative) addresses where
    the label was placed
    """

    def __init__(self):
        self.base_address = 0
        self.labels = dict()
        self.numbered_labels = defaultdict(list)

    def resolve_label(self, symbol: str, address_at: Optional[T_RelativeAddress] = None) -> Optional[T_RelativeAddress]:
        """
        Look up the address a symbol refers to.

        Numbered references such as ``1b`` / ``1f`` resolve to the nearest
        occurrence of the numbered label before / after ``address_at``.
        Every other symbol is looked up in ``labels``.

        :param symbol: the label name or numbered reference
        :param address_at: the position the reference is made from (needed for numbered references)
        :return: the resolved address, or None if nothing matches
        :raises ParseException: if a numbered reference is given without ``address_at``
        """
        if not NUMBER_SYMBOL_PATTERN.match(symbol):
            return self.labels.get(symbol, None)

        if address_at is None:
            raise ParseException("Cannot resolve relative symbol {} without an address!".format(symbol))

        candidates = self.numbered_labels.get(symbol[:-1], [])
        if symbol[-1] == 'b':
            # closest occurrence strictly before the reference
            return max([addr for addr in candidates if addr < address_at], default=None)
        # closest occurrence strictly after the reference
        return min([addr for addr in candidates if addr > address_at], default=None)
class Instruction(ABC):
    """
    Abstract interface of a single instruction with a name and a tuple of arguments.
    """
    name: str
    args: tuple

    @abstractmethod
    def get_imm(self, num: int) -> int:
        """
        parse and get immediate argument
        """

    @abstractmethod
    def get_imm_reg(self, num: int) -> Tuple[int, str]:
        """
        parse and get an argument imm(reg)
        """

    @abstractmethod
    def get_reg(self, num: int) -> str:
        """
        parse and get a register argument
        """

    def __repr__(self):
        joined_args = ", ".join(self.args)
        return "{} {}".format(self.name, joined_args)
@dataclass
class MemorySection(ABC):
    """
    Abstract base for a named region of memory.

    Offsets passed to the accessors are relative to the section; ``base`` is
    added wherever an absolute address is printed.
    """
    name: str
    flags: MemoryFlags
    size: int
    base: T_AbsoluteAddress
    owner: str
    context: InstructionContext

    @abstractmethod
    def read(self, offset: T_RelativeAddress, size: int) -> bytearray:
        """Read ``size`` bytes starting at ``offset``."""

    @abstractmethod
    def write(self, offset: T_RelativeAddress, size: int, data: bytearray):
        """Write ``size`` bytes of ``data`` starting at ``offset``."""

    @abstractmethod
    def read_ins(self, offset: T_RelativeAddress) -> Instruction:
        """Read the instruction located at ``offset``."""

    def dump(self, start: T_RelativeAddress, end: Optional[T_RelativeAddress], fmt: str = 'hex',
             bytes_per_row: int = 16, rows: int = 10, group: int = 4):
        """
        Print the contents of the section between two offsets.

        :param start: first offset to print; if ``end`` is None this is instead the
            highlighted centre of a window of ``rows`` rows
        :param end: offset to stop at (exclusive), or None to print a window around ``start``
        :param fmt: format passed through to ``format_bytes`` (data sections only)
        :param bytes_per_row: bytes printed per row; forced to 4 for executable sections
        :param rows: number of rows in the window when ``end`` is None
        :param group: byte group size passed through to ``format_bytes``
        """
        if self.flags.executable:
            bytes_per_row = 4
        highlight = None
        if end is None:
            half_window = bytes_per_row * (rows // 2)
            end = start + half_window
            highlight = start
            start = start - half_window

        if self.flags.executable:
            print(FMT_MEM + "{}, viewing {} instructions:".format(
                self, (end - start) // 4
            ) + FMT_NONE)

            for addr in range(start, end, 4):
                if addr == highlight:
                    print(FMT_UNDERLINE + FMT_ORANGE, end='')
                print("0x{:x}: {}{}".format(
                    self.base + addr, self.read_ins(addr), FMT_NONE
                ))
        else:
            print(FMT_MEM + "{}, viewing {} bytes:".format(
                self, (end - start)
            ) + FMT_NONE)

            for addr in range(start, end, bytes_per_row):
                # With an explicit end there is nothing to highlight; subtracting from
                # None would raise a TypeError. Negative indices already occur for rows
                # past the highlighted address, so -1 is used here as "no highlight"
                # (NOTE(review): confirm format_bytes treats negative indices that way).
                hi_ind = (highlight - addr) // group if highlight is not None else -1

                print("0x{:x}: {}{}".format(
                    self.base + addr, format_bytes(self.read(addr, bytes_per_row), fmt, group, hi_ind), FMT_NONE
                ))

    def __repr__(self):
        return "{}[{}] at 0x{:08X} (size={}bytes, flags={}, owner={})".format(
            self.__class__.__name__,
            self.name,
            self.base,
            self.size,
            self.flags,
            self.owner
        )
class Program:
    """
    A named collection of memory sections sharing one InstructionContext,
    plus the set of labels the program exports globally.
    """
    name: str
    context: InstructionContext
    global_labels: Set[str]
    sections: List[MemorySection]
    base: T_AbsoluteAddress = 0

    def __init__(self, name: str, base: int = 0):
        self.name = name
        self.base = base
        self.sections = []
        self.global_labels = set()
        self.context = InstructionContext()

    def add_section(self, sec: MemorySection):
        """Append a section to this program."""
        self.sections.append(sec)

    def __repr__(self):
        section_names = [s.name for s in self.sections]
        template = "{}(name={},context={},globals={},sections={},base={})"
        return template.format(
            self.__class__.__name__, self.name, self.context, self.global_labels,
            section_names, self.base
        )

@ -7,7 +7,7 @@ SPDX-License-Identifier: MIT
import typing
from .registers import Registers
from .colors import FMT_DEBUG, FMT_NONE
from .base_types import Instruction
from .types import Instruction
from .helpers import *
if typing.TYPE_CHECKING:

@ -5,8 +5,11 @@ SPDX-License-Identifier: MIT
"""
from abc import abstractmethod
from .base_types import Instruction
from .colors import *
import typing
if typing.TYPE_CHECKING:
from .types import Instruction
class RiscemuBaseException(BaseException):
@ -112,7 +115,7 @@ class InvalidAllocationException(RiscemuBaseException):
class UnimplementedInstruction(RiscemuBaseException):
def __init__(self, ins: Instruction):
def __init__(self, ins: 'Instruction'):
self.ins = ins
def message(self):

@ -139,3 +139,9 @@ class Peekable(Generic[T], Iterator[T]):
def is_empty(self) -> bool:
return self.peek() is None
def get_section_base_name(section_name: str) -> str:
    """
    Return the first dotted segment of a section name, e.g. ".text" for ".text.main".

    A name without any dot is reported as invalid and then treated as if it were
    its own base segment (``"text"`` yields ``".text"``), instead of failing with
    an IndexError right after the warning was printed.
    """
    segments = section_name.split('.')
    if len(segments) < 2:
        print(FMT_PARSE + f"Invalid section {section_name}, not starting with a dot!" + FMT_NONE)
        return '.' + section_name
    return '.' + segments[1]

@ -10,7 +10,7 @@ from abc import ABC
from ..CPU import CPU
from ..helpers import to_unsigned
from ..exceptions import ASSERT_LEN, ASSERT_IN
from ..base_types import Instruction
from ..types import Instruction
class InstructionSet(ABC):

@ -11,7 +11,7 @@ from ..colors import FMT_DEBUG, FMT_NONE
from ..debug import launch_debug_session
from ..exceptions import LaunchDebuggerException
from ..syscall import Syscall
from ..base_types import Instruction
from ..types import Instruction
class RV32I(InstructionSet):

@ -5,15 +5,15 @@ SPDX-License-Identifier: MIT
"""
import os
import re
from typing import Dict, Tuple, Iterable, Callable
from typing import Dict, Tuple, Iterable, Callable, List
from .helpers import Peekable
from .assembler import MemorySectionType, ParseContext, AssemblerDirectives
from .base_types import Program
from .types import Program, T_ParserOpts, ProgramLoader
from .colors import FMT_PARSE
from .exceptions import ParseException
from .tokenizer import Token, TokenType, tokenize
from .types import SimpleInstruction
from .base import SimpleInstruction
def parse_instruction(token: Token, args: Tuple[str], context: ParseContext):
@ -53,7 +53,6 @@ def parse_tokens(name: str, tokens_iter: Iterable[Token]) -> Program:
for token, args in composite_tokenizer(Peekable[Token](tokens_iter)):
if token.type not in PARSERS:
raise ParseException("Unexpected token type: {}, {}".format(token, args))
print("{} {}".format(token, args))
PARSERS[token.type](token, args, context)
return context.finalize()
@ -92,9 +91,37 @@ def take_arguments(tokens: Peekable[Token]) -> Iterable[str]:
next(tokens)
break
break
#raise ParseException("Expected newline, instead got {}".format(tokens.peek()))
# raise ParseException("Expected newline, instead got {}".format(tokens.peek()))
def parse_program_from_file(path: str) -> Program:
    """
    Tokenize and parse the file at ``path``; the program is named after the last path component.
    """
    program_name = os.path.split(path)[-1]
    with open(path, 'r') as source:
        return parse_tokens(program_name, tokenize(source))
class AssemblyFileLoader(ProgramLoader):
    """
    This class loads assembly files written by hand. It understands some assembler directives and supports most
    pseudo instructions. It does very little verification of source correctness.

    It also supports numbered jump targets and properly supports local and global scope (.globl assembly directive)

    The AssemblyFileLoader loads .asm, .S and .s files by default, and acts as a weak fallback to all other filetypes.
    """

    def parse(self) -> Program:
        """
        Tokenize and parse the file at ``self.source_path`` into a Program named ``self.filename``.
        """
        with open(self.source_path, 'r') as f:
            return parse_tokens(self.filename, tokenize(f))

    @classmethod
    def can_parse(cls, source_path: str) -> float:
        """
        Rate how confident this loader is that it can parse the given file.

        It also acts as a weak fallback if no other loaders want to take the file.

        :param source_path: the path to the source file
        :return: 1 for .asm, .S and .s files, 0.01 for everything else
        """
        # gcc recognizes these file extensions as assembly. So we will do too.
        if source_path.split('.')[-1] in ('asm', 'S', 's'):
            return 1
        return 0.01

    @classmethod
    def get_options(cls, argv: List[str]) -> Tuple[List[str], T_ParserOpts]:
        """
        This loader takes no options: the arguments are returned untouched
        together with an empty options dict.
        """
        return argv, {}

@ -1,11 +1,9 @@
from dataclasses import dataclass
from typing import List, Dict, Tuple
from typing import List
from .Exceptions import *
from ..exceptions import RiscemuBaseException
from ..base_types import MemoryFlags, LoadedMemorySection
from ..decoder import decode, RISCV_REGS, format_ins
from .types import ElfMemorySection
from ..helpers import FMT_PARSE, FMT_NONE, FMT_GREEN, FMT_BOLD
from ..types import MemoryFlags, Program, ProgramLoader, T_ParserOpts
FMT_ELF = FMT_GREEN + FMT_BOLD
@ -13,41 +11,53 @@ if typing.TYPE_CHECKING:
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import Section, SymbolTableSection
# This requires pyelftools package!
INCLUDE_SEC = ('.text', '.stack', '.bss', '.sdata', '.sbss')
class ElfExecutable:
sections: List['ElfLoadedMemorySection']
sections_by_name: Dict[str, 'ElfLoadedMemorySection']
symbols: Dict[str, int]
run_ptr: int
class ElfBinaryFileLoader(ProgramLoader):
"""
Loads compiled elf binaries (checks for the magic sequence 7f45 4c46)
This loader respects local and global symbols.
"""
program: Program
def __init__(self, name: str):
self.sections = list()
self.sections_by_name = dict()
self.symbols = dict()
def __init__(self, source_path: str, options: T_ParserOpts):
super().__init__(source_path, options)
self.program = Program(self.filename)
@classmethod
def can_parse(cls, source_path: str) -> float:
with open(source_path, 'rb') as f:
if f.read(4) == b'\x7f\x45\x4c\x46':
return 1
return 0
@classmethod
def get_options(cls, argv: list[str]) -> [List[str], T_ParserOpts]:
return argv, {}
def parse(self) -> Program:
try:
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import Section, SymbolTableSection
with open(name, 'rb') as f:
print(FMT_ELF + "[ElfLoader] Loading elf executable from: {}".format(name) + FMT_NONE)
with open(self.source_path, 'rb') as f:
print(FMT_ELF + "[ElfLoader] Loading elf executable from: {}".format(self.source_path) + FMT_NONE)
self._read_elf(ELFFile(f))
except ImportError as e:
print(FMT_PARSE + "[ElfLoader] Cannot load elf files without PyElfTools package! You can install them using pip install pyelftools!" + FMT_NONE)
print(FMT_PARSE + "[ElfLoader] Cannot load elf files without PyElfTools package! You can install them "
"using pip install pyelftools!" + FMT_NONE)
raise e
return self.program
def _read_elf(self, elf: 'ELFFile'):
if not elf.header.e_machine == 'EM_RISCV':
raise InvalidElfException("Not a RISC-V elf file!")
if not elf.header.e_ident.EI_CLASS == 'ELFCLASS32':
raise InvalidElfException("Only 32bit executables are supported!")
self.run_ptr = elf.header.e_entry
from elftools.elf.sections import SymbolTableSection
for sec in elf.iter_sections():
if isinstance(sec, SymbolTableSection):
@ -57,29 +67,31 @@ class ElfExecutable:
if sec.name not in INCLUDE_SEC:
continue
self.add_sec(self._lms_from_elf_sec(sec, 'kernel'))
self._add_sec(self._lms_from_elf_sec(sec, self.filename))
def _lms_from_elf_sec(self, sec: 'Section', owner: str):
is_code = sec.name in ('.text',)
data = bytearray(sec.data())
if len(data) < sec.data_size:
data += bytearray(len(data) - sec.data_size)
flags = MemoryFlags(is_code, is_code)
print(FMT_ELF + "[ElfLoader] Section {} at: {:X}".format(sec.name, sec.header.sh_addr) + FMT_NONE)
return ElfLoadedMemorySection(
sec.name,
sec.header.sh_addr,
sec.data_size,
data,
flags,
owner
return ElfMemorySection(
data, sec.name, self.program.context, owner, sec.header.sh_addr, flags
)
def _parse_symtab(self, symtab: 'SymbolTableSection'):
self.symbols = {
sym.name: sym.entry.st_value for sym in symtab.iter_symbols() if sym.name
}
def add_sec(self, new_sec: 'ElfLoadedMemorySection'):
for sec in self.sections:
for sym in symtab.iter_symbols():
if not sym.name:
continue
self.program.context.labels[sym.name] = sym.entry.st_value
# check if it has st_visibility bit set
if sym.entry.st_shndx == 1: # STB_GLOBAL = 1
self.program.global_labels.add(sym.name)
print(FMT_PARSE + "LOADED GLOBAL SYMBOL {}: {}".format(sym.name, sym.entry.st_value) + FMT_NONE)
def _add_sec(self, new_sec: 'ElfMemorySection'):
for sec in self.program.sections:
if sec.base < sec.end <= new_sec.base or sec.end > sec.base >= new_sec.end:
continue
else:
@ -88,78 +100,4 @@ class ElfExecutable:
) + FMT_NONE)
raise RuntimeError("Cannot load elf with overlapping sections!")
self.sections.append(new_sec)
self.sections_by_name[new_sec.name] = new_sec
class InvalidElfException(RiscemuBaseException):
    """
    Raised when an elf file is rejected by the loader; carries a human readable reason.
    """

    def __init__(self, msg: str):
        super().__init__()
        self.msg = msg

    def message(self):
        described = "{}(\"{}\")".format(self.__class__.__name__, self.msg)
        return FMT_PARSE + described + FMT_NONE
@dataclass(frozen=True)
class ElfInstruction:
    """
    A decoded instruction: its name, numeric arguments and the encoded word.
    """
    name: str
    args: List[int]
    encoded: int

    def get_imm(self, num: int) -> int:
        """Return argument ``num`` as is."""
        return self.args[num]

    def get_imm_reg(self, num: int) -> Tuple[int, int]:
        """Return the last and the second to last argument; ``num`` is not used."""
        immediate = self.args[-1]
        register = self.args[-2]
        return immediate, register

    def get_reg(self, num: int) -> str:
        """Return the register name that argument ``num`` indexes in RISCV_REGS."""
        return RISCV_REGS[self.args[num]]

    def __repr__(self) -> str:
        # a few common encodings are printed as their pseudo instruction
        if self.name == 'jal' and self.args[0] == 0:
            return "j {}".format(self.args[1])
        if self.name == 'addi':
            if self.args[2] == 0:
                return "mv {}, {}".format(self.get_reg(0), self.get_reg(1))
            if self.args[1] == 0:
                return "li {}, {}".format(self.get_reg(0), self.args[2])
        if self.name == 'ret' and len(self.args) == 0:
            return "ret"
        return format_ins(self.encoded, self.name)
class ElfLoadedMemorySection(LoadedMemorySection):
    """
    A loaded memory section that decodes instructions from its content on demand.
    """
    ins_cache: List[Optional[ElfInstruction]]
    """
    A fast cache for accessing pre-decoded instructions
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # one cache slot per four bytes of section content
        self.__setattr__('ins_cache', [None] * (self.size // 4))

    def read_instruction(self, offset):
        """
        Decode (or fetch from the cache) the instruction at the section-relative ``offset``.

        The permission and alignment checks run before the cache lookup, so a
        misaligned offset can never be answered with the cached instruction of
        the word it falls into.

        :raises InstructionAccessFault: if the section is not executable
        :raises InstructionAddressMisalignedTrap: if ``offset`` is not a multiple of four
        """
        if not self.flags.executable:
            print(FMT_PARSE + "Reading instruction from non-executable memory!" + FMT_NONE)
            raise InstructionAccessFault(offset + self.base)
        if offset % 4 != 0:
            raise InstructionAddressMisalignedTrap(offset + self.base)

        slot = offset // 4
        cached = self.ins_cache[slot]
        if cached is not None:
            return cached

        ins = ElfInstruction(*decode(self.content[offset:offset + 4]))
        self.ins_cache[slot] = ins
        return ins

    @property
    def end(self):
        """The first address past this section (``base + size``)."""
        return self.size + self.base
self.program.add_section(new_sec)

@ -5,6 +5,9 @@ from .CSRConsts import MCAUSE_TRANSLATION
import typing
from .. import RiscemuBaseException
from ..colors import FMT_PARSE, FMT_NONE
if typing.TYPE_CHECKING:
from .ElfLoader import ElfInstruction
@ -52,14 +55,17 @@ class CpuTrap(BaseException):
def mcause(self):
return (self.interrupt << 31) + self.code
def message(self) -> str:
return ""
def __repr__(self):
name = "Reserved interrupt({}, {})".format(self.interrupt, self.code)
if (self.interrupt, self.code) in MCAUSE_TRANSLATION:
name = MCAUSE_TRANSLATION[(self.interrupt, self.code)] + "({}, {})".format(self.interrupt, self.code)
return "{} {{priv={}, type={}, mtval={:x}}}".format(
name, self.priv.name, self.type.name, self.mtval
return "{} {{priv={}, type={}, mtval={:x}}} {}".format(
name, self.priv.name, self.type.name, self.mtval, self.message()
)
def __str__(self):
@ -89,3 +95,29 @@ class TimerInterrupt(CpuTrap):
class EcallTrap(CpuTrap):
def __init__(self, mode: PrivModes):
super().__init__(mode.value + 8, 0, CpuTrapType.EXCEPTION)
class InvalidElfException(RiscemuBaseException):
    """
    Signals that an elf file was rejected; carries a human readable reason.
    """

    def __init__(self, msg: str):
        super().__init__()
        self.msg = msg

    def message(self):
        described = "{}(\"{}\")".format(self.__class__.__name__, self.msg)
        return FMT_PARSE + described + FMT_NONE
class LoadAccessFault(CpuTrap):
    """
    Exception-type trap with code 5 that records the failed memory operation.
    """

    def __init__(self, msg, addr, size, op):
        super().__init__(5, addr, CpuTrapType.EXCEPTION)
        self.msg = msg
        self.addr = addr
        self.size = size
        self.op = op

    def message(self):
        template = "(During {} at 0x{:08x} of size {}: {})"
        return template.format(self.op, self.addr, self.size, self.msg)

@ -2,124 +2,74 @@
Loads a memory image with debug information into memory
"""
import json
from functools import lru_cache
from typing import Dict, List, Optional, TYPE_CHECKING
from .ElfLoader import ElfInstruction, ElfLoadedMemorySection, InstructionAccessFault, InstructionAddressMisalignedTrap
from .PrivMMU import PrivMMU
from ..config import RunConfig
from ..base_types import LoadedMemorySection, MemoryFlags
from ..IO.IOModule import IOModule
from ..colors import FMT_ERROR, FMT_NONE, FMT_MEM
from ..decoder import decode
if TYPE_CHECKING:
pass
class MemoryImageMMU(PrivMMU):
io: List[IOModule]
data: bytearray
io_start: int
debug_info: Dict[str, Dict[str, Dict[str, str]]]
def __init__(self, file_name: str, io_start: int = 0xFF0000):
super(MemoryImageMMU, self).__init__(conf=RunConfig())
with open(file_name, 'rb') as memf:
data = memf.read()
with open(file_name + '.dbg', 'r') as dbgf:
debug_info: Dict = json.load(dbgf)
self.data = bytearray(data)
# TODO: super wasteful memory allocation happening here
if len(data) < io_start:
self.data += bytearray(io_start - len(data))
self.debug_info = debug_info
self.io_start = io_start
self.io = list()
def get_entrypoint(self):
try:
start = self.debug_info['symbols']['kernel'].get('_start', None)
if start is not None:
return start
return self.debug_info['symbols']['kernel'].get('_ftext')
except KeyError:
print(FMT_ERROR + '[MMU] cannot find kernel entry in debug information! Falling back to 0x100' + FMT_NONE)
return 0x100
@lru_cache(maxsize=2048)
def read_ins(self, addr: int) -> ElfInstruction:
if addr >= self.io_start:
raise InstructionAccessFault(addr)
if addr % 4 != 0:
raise InstructionAddressMisalignedTrap(addr)
return ElfInstruction(*decode(self.data[addr:addr + 4]))
def read(self, addr: int, size: int) -> bytearray:
if addr < 0x100:
pc = self.cpu.pc
text_sec = self.get_sec_containing(pc)
print(FMT_ERROR + "[MMU] possible null dereference (read {:x}) from (pc={:x},sec={},rel={:x})".format(
addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base
) + FMT_NONE)
if addr >= self.io_start:
return self.io_at(addr).read(addr, size)
return self.data[addr: addr + size]
def write(self, addr: int, size: int, data):
if addr < 0x100:
pc = self.cpu.pc
text_sec = self.get_sec_containing(pc)
print(FMT_ERROR + "[MMU] possible null dereference (write {:x}) from (pc={:x},sec={},rel={:x})".format(
addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base
) + FMT_NONE)
if addr >= self.io_start:
return self.io_at(addr).write(addr, data, size)
self.data[addr:addr + size] = data[0:size]
def io_at(self, addr) -> IOModule:
for mod in self.io:
if mod.contains(addr):
return mod
raise InstructionAccessFault(addr)
def add_io(self, io: IOModule):
self.io.append(io)
def __repr__(self):
return "MemoryImageMMU()"
@lru_cache(maxsize=32)
def get_sec_containing(self, addr: int) -> Optional[LoadedMemorySection]:
    """
    Build a section object for the section of the debug information which contains addr.

    The sections of the debug information are visited last-to-first. The first one that starts at
    or below addr is returned, it extends up to the start of the section visited before it (or up
    to the end of the image data for the first section visited).

    :param addr: the address to look up
    :return: the section, or None if every section starts above addr
    """
    upper_bound = len(self.data)
    for raw_start, qualified_name in reversed(self.debug_info['sections'].items()):
        start = int(raw_start)
        if addr < start:
            # this section starts behind addr, it bounds the next candidate
            upper_bound = start
            continue
        # entries are named "<owner>:<section name>"
        owner, section_name = qualified_name.split(':')
        is_text = '.text' in section_name
        return ElfLoadedMemorySection(
            section_name, start, upper_bound - start, self.data[start:upper_bound],
            MemoryFlags(is_text, is_text), owner
        )
    return None
def translate_address(self, addr: int):
    """
    Describe an address relative to a symbol, or relative to its section if no symbol fits.

    :param addr: the address to describe
    :return: "<empty>" inside the '.empty' section, "symbol+offset (owner:section)" if a symbol
             of the owning program is located at or below addr, "owner:section+offset" otherwise
    """
    section = self.get_sec_containing(addr)
    if section.name == '.empty':
        return "<empty>"
    owner_symbols = self.debug_info['symbols'][section.owner]
    # visit the symbols last-to-first and take the first one located at or below addr
    match = next(
        ((label, value) for label, value in reversed(owner_symbols.items()) if value <= addr),
        None
    )
    if match is None:
        return "{}:{}{:+x}".format(section.owner, section.name, addr - section.base)
    label, value = match
    return "{}{:+x} ({}:{})".format(label, addr - value, section.owner, section.name)
def label(self, symb: str):
    """
    Print every program of the debug information which defines the given symbol, with its value.

    :param symb: the name of the symbol to look up
    """
    print(FMT_MEM + "Looking up symbol {}".format(symb))
    symbol_tables = self.debug_info['symbols']
    for owner in symbol_tables:
        table = symbol_tables[owner]
        if symb not in table:
            continue
        print(" Hit in {}: {} = {}".format(owner, symb, table[symb]))
    # reset the color set by the first line
    print(FMT_NONE, end="")
import os.path
from typing import List, Iterable
from .ElfLoader import ElfMemorySection
from .types import MemoryImageDebugInfos
from ..assembler import INSTRUCTION_SECTION_NAMES
from ..colors import FMT_NONE, FMT_PARSE
from ..helpers import get_section_base_name
from ..types import MemoryFlags, ProgramLoader, Program, T_ParserOpts
class MemoryImageLoader(ProgramLoader):
    """
    Program loader for raw memory images (``*.img`` files).

    If the loader options contain a ``debug`` entry, it is used as the path of a debug information
    file and the image is split into one Program per entry of that file. Without it, the whole
    image is loaded as one single section.
    """

    @classmethod
    def can_parse(cls, source_path: str) -> float:
        """
        :param source_path: the path of the source file
        :return: 1 if the path ends in ``.img``, 0 otherwise
        """
        # note: comparing source_path.split('.')[-1] to '.img' can never match, split drops the dot
        if source_path.endswith('.img'):
            return 1
        return 0

    @classmethod
    def get_options(cls, argv: list[str]) -> [List[str], T_ParserOpts]:
        """
        This loader takes no command line options.

        :param argv: the command line args list
        :return: the untouched args and an empty options dictionary
        """
        return argv, {}

    def parse(self) -> Iterable[Program]:
        """
        Load the image and yield the programs it contains.

        :return: one Program per program listed in the debug information, or one single Program
                 holding the whole image if there is no debug information
        """
        debug_path = self.options.get('debug', False)
        if not debug_path:
            # no debug information available, the image can only be loaded as one blob
            yield self.parse_no_debug()
            return

        with open(debug_path, 'r') as debug_file:
            debug_info = MemoryImageDebugInfos.load(debug_file.read())

        with open(self.source_path, 'rb') as source_file:
            data: bytearray = bytearray(source_file.read())

        for name, sections in debug_info.sections.items():
            program = Program(name)

            for sec_name, (start, size) in sections.items():
                # the first section listed for a program determines the program base
                if program.base is None:
                    program.base = start

                in_code_sec = get_section_base_name(sec_name) in INSTRUCTION_SECTION_NAMES
                program.add_section(
                    ElfMemorySection(
                        data[start:start + size], sec_name, program.context,
                        name, start, MemoryFlags(in_code_sec, in_code_sec)
                    )
                )

            program.context.labels.update(debug_info.symbols.get(name, dict()))
            program.global_labels.update(debug_info.globals.get(name, set()))

            yield program

    def parse_no_debug(self) -> Program:
        """
        Load the whole image as one section located at address zero, after printing a warning.

        :return: a Program named after the source file, holding that one section
        """
        print(FMT_PARSE + "[MemoryImageLoader] Warning: loading memory image without debug information!" + FMT_NONE)

        with open(self.source_path, 'rb') as source_file:
            data: bytes = source_file.read()

        p = Program(self.filename)
        p.add_section(ElfMemorySection(
            bytearray(data), 'memory image contents', p.context, p.name, 0, MemoryFlags(False, True)
        ))
        return p

    @classmethod
    def instantiate(cls, source_path: str, options: T_ParserOpts) -> 'ProgramLoader':
        """
        Create a loader for the image, picking up ``<source_path>.dbg`` as debug information
        if such a file exists.

        :param source_path: the path to the memory image
        :param options: the parsed loader options
        :return: a loader for the given image
        """
        debug_path = source_path + '.dbg'
        if os.path.exists(debug_path):
            return cls(source_path, dict(**options, debug=debug_path))
        return cls(source_path, options)

@ -15,7 +15,7 @@ from ..IO import TextIO
from ..instructions import RV32A, RV32M
if typing.TYPE_CHECKING:
from riscemu import base_types, LoadedExecutable, LoadedInstruction
from riscemu import types, LoadedExecutable, LoadedInstruction
from riscemu.instructions.InstructionSet import InstructionSet
@ -25,7 +25,7 @@ class PrivCPU(CPU):
It should support M and U Mode, but no U-Mode Traps.
This allows us to
This is meant to emulate whole operating systems.
"""
csr: CSR
@ -44,17 +44,11 @@ class PrivCPU(CPU):
the equivalent of "1 byte" (this is actually impossible)
"""
def __init__(self, conf, mmu: PrivMMU):
def __init__(self, conf):
super().__init__(conf, [PrivRV32I, RV32M, RV32A])
# start in machine mode
self.mode: PrivModes = PrivModes.MACHINE
mmu.set_cpu(self)
self.pc = mmu.get_entrypoint()
self.mmu = mmu
if hasattr(self.mmu, 'add_io'):
self.mmu.add_io(TextIO.TextIO(0xff0000, 64))
self.syscall_int = None
self.launch_debug = False
self.pending_traps: List[CpuTrap] = list()

@ -21,7 +21,7 @@ class PrivRV32I(RV32I):
This is an extension of RV32I, written for the PrivCPU class
"""
def instruction_csrrw(self, ins: 'LoadedInstruction'):
def instruction_csrrw(self, ins: 'Instruction'):
rd, rs, csr_addr = self.parse_crs_ins(ins)
old_val = None
if rd != 'zero':
@ -34,7 +34,7 @@ class PrivRV32I(RV32I):
if old_val is not None:
self.regs.set(rd, old_val)
def instruction_csrrs(self, ins: 'LoadedInstruction'):
def instruction_csrrs(self, ins: 'Instruction'):
rd, rs, csr_addr = self.parse_crs_ins(ins)
if rs != 'zero':
# oh no, this should not happen!
@ -45,13 +45,13 @@ class PrivRV32I(RV32I):
self.regs.set(rd, old_val)
def instruction_csrrc(self, ins: 'LoadedInstruction'):
def instruction_csrrc(self, ins: 'Instruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_csrrsi(self, ins: 'LoadedInstruction'):
def instruction_csrrsi(self, ins: 'Instruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_csrrwi(self, ins: 'LoadedInstruction'):
def instruction_csrrwi(self, ins: 'Instruction'):
ASSERT_LEN(ins.args, 3)
rd, imm, addr = ins.get_reg(0), ins.get_imm(1), ins.get_imm(2)
if rd != 'zero':
@ -62,10 +62,10 @@ class PrivRV32I(RV32I):
self.cpu.csr.set(addr, imm)
def instruction_csrrci(self, ins: 'LoadedInstruction'):
def instruction_csrrci(self, ins: 'Instruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_mret(self, ins: 'LoadedInstruction'):
def instruction_mret(self, ins: 'Instruction'):
if self.cpu.mode != PrivModes.MACHINE:
print("MRET not inside machine level code!")
raise IllegalInstructionTrap(ins)
@ -90,53 +90,53 @@ class PrivRV32I(RV32I):
if self.cpu.conf.verbosity > 1:
self.regs.dump_reg_a()
def instruction_uret(self, ins: 'LoadedInstruction'):
def instruction_uret(self, ins: 'Instruction'):
raise IllegalInstructionTrap(ins)
def instruction_sret(self, ins: 'LoadedInstruction'):
def instruction_sret(self, ins: 'Instruction'):
raise IllegalInstructionTrap(ins)
def instruction_scall(self, ins: 'LoadedInstruction'):
def instruction_scall(self, ins: 'Instruction'):
"""
Overwrite the scall from userspace RV32I
"""
raise EcallTrap(self.cpu.mode)
def instruction_beq(self, ins: 'LoadedInstruction'):
def instruction_beq(self, ins: 'Instruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 == rs2:
self.pc += dst - 4
def instruction_bne(self, ins: 'LoadedInstruction'):
def instruction_bne(self, ins: 'Instruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 != rs2:
self.pc += dst - 4
def instruction_blt(self, ins: 'LoadedInstruction'):
def instruction_blt(self, ins: 'Instruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 < rs2:
self.pc += dst - 4
def instruction_bge(self, ins: 'LoadedInstruction'):
def instruction_bge(self, ins: 'Instruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 >= rs2:
self.pc += dst - 4
def instruction_bltu(self, ins: 'LoadedInstruction'):
def instruction_bltu(self, ins: 'Instruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False)
if rs1 < rs2:
self.pc += dst - 4
def instruction_bgeu(self, ins: 'LoadedInstruction'):
def instruction_bgeu(self, ins: 'Instruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False)
if rs1 >= rs2:
self.pc += dst - 4
# technically deprecated
def instruction_j(self, ins: 'LoadedInstruction'):
def instruction_j(self, ins: 'Instruction'):
raise NotImplementedError("Should never be reached!")
def instruction_jal(self, ins: 'LoadedInstruction'):
def instruction_jal(self, ins: 'Instruction'):
ASSERT_LEN(ins.args, 2)
reg = ins.get_reg(0)
addr = ins.get_imm(1)
@ -148,20 +148,20 @@ class PrivRV32I(RV32I):
self.regs.set(reg, self.pc)
self.pc += addr - 4
def instruction_jalr(self, ins: 'LoadedInstruction'):
def instruction_jalr(self, ins: 'Instruction'):
ASSERT_LEN(ins.args, 3)
rd, rs, imm = self.parse_rd_rs_imm(ins)
self.regs.set(rd, self.pc)
self.pc = rs + imm - 4
def instruction_sbreak(self, ins: 'LoadedInstruction'):
def instruction_sbreak(self, ins: 'Instruction'):
raise LaunchDebuggerException()
def parse_crs_ins(self, ins: 'LoadedInstruction'):
def parse_crs_ins(self, ins: 'Instruction'):
ASSERT_LEN(ins.args, 3)
return ins.get_reg(0), ins.get_reg(1), ins.get_imm(2)
def parse_mem_ins(self, ins: 'LoadedInstruction') -> Tuple[str, int]:
def parse_mem_ins(self, ins: 'Instruction') -> Tuple[str, int]:
ASSERT_LEN(ins.args, 3)
addr = self.get_reg_content(ins, 1) + ins.get_imm(2)
reg = ins.get_reg(0)

@ -0,0 +1,140 @@
import json
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from typing import Tuple, Dict, Set
from riscemu import MemoryAccessException
from riscemu.priv.Exceptions import InstructionAccessFault, InstructionAddressMisalignedTrap, LoadAccessFault
from riscemu.types import Instruction, InstructionContext, T_RelativeAddress, MemoryFlags, T_AbsoluteAddress
from riscemu.base import BinaryDataMemorySection
from riscemu.colors import FMT_NONE, FMT_PARSE
from riscemu.decoder import format_ins, RISCV_REGS, decode
@dataclass(frozen=True)
class ElfInstruction(Instruction):
    """
    An instruction decoded from its binary encoding. All arguments are integers,
    register arguments are indices into RISCV_REGS.
    """
    name: str
    args: Tuple[int]
    encoded: int

    def get_imm(self, num: int) -> int:
        """
        :param num: the index of the argument
        :return: the argument at that index, as it was decoded
        """
        return self.args[num]

    def get_imm_reg(self, num: int) -> Tuple[int, int]:
        """
        :param num: not used
        :return: the last and the second to last argument, in that order
        """
        last = self.args[-1]
        second_to_last = self.args[-2]
        return last, second_to_last

    def get_reg(self, num: int) -> str:
        """
        :param num: the index of the argument
        :return: the RISCV_REGS entry for the register number stored in that argument
        """
        register_number = self.args[num]
        return RISCV_REGS[register_number]

    def __repr__(self) -> str:
        """
        :return: the instruction as text, common encodings are shown as j, mv, li or ret
        """
        mnemonic, operands = self.name, self.args
        if mnemonic == 'jal' and operands[0] == 0:
            return "j {}".format(operands[1])
        if mnemonic == 'addi':
            if operands[2] == 0:
                return "mv {}, {}".format(self.get_reg(0), self.get_reg(1))
            if operands[1] == 0:
                return "li {}, {}".format(self.get_reg(0), operands[2])
        if mnemonic == 'ret' and not operands:
            return "ret"
        return format_ins(self.encoded, mnemonic)
class ElfMemorySection(BinaryDataMemorySection):
    """
    A binary memory section which decodes instructions from its data on demand.
    """

    def __init__(self, data: bytearray, name: str, context: InstructionContext, owner: str, base: int,
                 flags: MemoryFlags):
        super().__init__(data, name, context, owner, base=base, flags=flags)
        # Memoize decoded instructions per section. A class-level lru_cache on the method would
        # key on (self, offset): it needs the section to be hashable, is shared by (and keeps
        # alive) all sections, and write() would drop the decoded instructions of every section.
        self.read_ins = lru_cache(maxsize=128)(self.read_ins)

    def read_ins(self, offset):
        """
        Decode the instruction located at the given offset into this section.

        :param offset: section-relative offset of the instruction
        :return: the decoded instruction
        :raises InstructionAccessFault: if the section is not executable
        :raises InstructionAddressMisalignedTrap: if offset is not a multiple of four
        """
        if not self.flags.executable:
            print(FMT_PARSE + "Reading instruction from non-executable memory!" + FMT_NONE)
            raise InstructionAccessFault(offset + self.base)
        if offset % 4 != 0:
            raise InstructionAddressMisalignedTrap(offset + self.base)
        return ElfInstruction(*decode(self.data[offset:offset + 4]))

    def write(self, offset: T_RelativeAddress, size: int, data: bytearray):
        """
        Write to the section and forget all instructions decoded from it so far.

        :param offset: section-relative offset to write to
        :param size: the number of bytes to write
        :param data: the bytes to write
        :raises LoadAccessFault: if the section is read-only
        """
        if self.flags.read_only:
            raise LoadAccessFault('read-only section', offset + self.base, size, 'write')
        # the write may replace instructions which were decoded before
        self.read_ins.cache_clear()
        return super(ElfMemorySection, self).write(offset, size, data)

    @property
    def end(self):
        """
        :return: the first address behind this section
        """
        return self.size + self.base
class MemoryImageDebugInfos:
    """
    Debug information accompanying a memory image: the section layout, the symbols and the
    global symbols of every program in the image. It is stored as json, see serialize and load.
    """

    VERSION = '1'
    """
    Schema version
    """

    base: 'T_AbsoluteAddress' = 0
    """
    The base address where the image starts. Defaults to zero.
    """

    sections: Dict[str, Dict[str, Tuple[int, int]]]
    """
    This dictionary maps a program and section to (start address, section length)
    """

    symbols: Dict[str, Dict[str, int]]
    """
    This dictionary maps a program and a symbol to a value
    """

    globals: Dict[str, Set[str]]
    """
    This dictionary contains the list of all global symbols of a given program
    """

    def __init__(self,
                 sections: Dict[str, Dict[str, Tuple[int, int]]],
                 symbols: Dict[str, Dict[str, int]],
                 globals: Dict[str, Set[str]],
                 base: int = 0
                 ):
        self.sections = sections
        self.symbols = symbols
        self.globals = globals
        self.base = base

    def serialize(self) -> str:
        """
        Serialize the debug information to json. The schema version is stored under the
        key ``VERSION``, which load() requires.

        :return: the json text
        """

        def serialize(obj: object):
            # json.dumps calls this hook for objects it cannot encode by itself and expects a
            # json-encodable replacement (returning an encoded string would nest json in json)
            if isinstance(obj, defaultdict):
                return dict(obj)
            if isinstance(obj, (set, tuple)):
                return list(obj)
            return "<<unserializable {}>>".format(getattr(obj, '__qualname__', '{unknown}'))

        return json.dumps(
            dict(
                VERSION=self.VERSION,
                sections=self.sections, symbols=self.symbols, globals=self.globals, base=self.base
            ),
            default=serialize
        )

    @classmethod
    def load(cls, serialized_str: str) -> 'MemoryImageDebugInfos':
        """
        Restore debug information from the json text produced by serialize().

        :param serialized_str: the json text
        :return: the restored debug information
        :raises RuntimeError: if the version is missing or its major part differs from VERSION
        """
        json_obj: dict = json.loads(serialized_str)

        if 'VERSION' not in json_obj:
            raise RuntimeError("Unknown MemoryImageDebugInfo version!")

        version: str = json_obj.pop('VERSION')

        # compare major version
        if not isinstance(version, str) or version.split('.')[0] != cls.VERSION.split('.')[0]:
            raise RuntimeError(
                "Unknown MemoryImageDebugInfo version! This emulator expects version {}, debug info version {}".format(
                    cls.VERSION, version
                )
            )

        # json knows neither tuples nor sets, restore the documented types
        if 'sections' in json_obj:
            json_obj['sections'] = {
                program: {sec_name: tuple(span) for sec_name, span in secs.items()}
                for program, secs in json_obj['sections'].items()
            }
        if 'globals' in json_obj:
            json_obj['globals'] = {
                program: set(names) for program, names in json_obj['globals'].items()
            }

        return cls(**json_obj)

    @classmethod
    def builder(cls) -> 'MemoryImageDebugInfos':
        """
        :return: an empty instance whose dictionaries create missing program entries on access
        """
        return cls(
            defaultdict(dict), defaultdict(dict), defaultdict(set)
        )

@ -18,9 +18,9 @@ if typing.TYPE_CHECKING:
from . import CPU
SYSCALLS = {
63: 'read',
64: 'write',
93: 'exit',
63: 'read',
64: 'write',
93: 'exit',
1024: 'open',
1025: 'close',
}
@ -35,6 +35,7 @@ OPEN_MODES = {
}
"""All available file open modes"""
@dataclass(frozen=True)
class Syscall:
"""
@ -199,4 +200,4 @@ class SyscallInterface:
return "{}(\n\tfiles={}\n)".format(
self.__class__.__name__,
self.open_files
)
)

@ -133,7 +133,3 @@ def split_whitespace_respecting_quotes(line: str) -> Iterable[str]:
if part:
yield part

@ -1,74 +1,412 @@
from typing import List, Tuple
from .exceptions import MemoryAccessException
from .helpers import parse_numeric_argument
from .base_types import Instruction, MemorySection, MemoryFlags, InstructionContext, T_RelativeAddress, \
T_AbsoluteAddress, Program
"""
RiscEmu (c) 2021 Anton Lydike
SPDX-License-Identifier: MIT
This file contains abstract base classes and types, bundling only the absolute basic functionality
See base.py for some basic implementations of these classes
"""
import os
import re
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Set, Union, Generator, Iterator, Callable, Type
from . import MMU, InstructionSet
from .assembler import get_section_base_name
from .colors import FMT_MEM, FMT_NONE, FMT_UNDERLINE, FMT_ORANGE, FMT_PARSE, FMT_RED, FMT_BOLD
from .exceptions import ParseException
from .helpers import format_bytes
# define some base type aliases so we can keep track of absolute and relative addresses
T_RelativeAddress = int
T_AbsoluteAddress = int
# parser options are just dictionaries with arbitrary values
T_ParserOpts = Dict[str, any]
NUMBER_SYMBOL_PATTERN = re.compile(r'^\d+[fb]$')
@dataclass(frozen=True)
class MemoryFlags:
    """
    Access flags of a memory section. The textual form is three characters long:
    'r', then '-' for read-only or 'w' for writable, then 'x' for executable or '-' if not.
    """
    read_only: bool
    executable: bool

    def __repr__(self):
        write_flag = '-' if self.read_only else 'w'
        exec_flag = 'x' if self.executable else '-'
        return "r{}{}".format(write_flag, exec_flag)
class InstructionContext:
    """
    The context instructions of one block are resolved in: the address of the block
    and the labels which are known inside of it.
    """

    base_address: T_AbsoluteAddress
    """
    The address where the instruction block is placed
    """

    labels: Dict[str, T_RelativeAddress]
    """
    This dictionary maps all labels to their relative position of the instruction block
    """

    numbered_labels: Dict[str, List[T_RelativeAddress]]
    """
    This dictionary maps numbered labels (which can occur multiple times) to a list of (block-relative) addresses where
    the label was placed
    """

    global_symbol_dict: Dict[str, T_AbsoluteAddress]
    """
    Maps global symbols to their absolute address, it is consulted for symbols which are not labels of this context
    """

    def __init__(self):
        self.labels = dict()
        self.numbered_labels = defaultdict(list)
        self.base_address = 0
        self.global_symbol_dict = dict()

    def resolve_label(self, symbol: str, address_at: Optional[T_RelativeAddress] = None) -> Optional[T_AbsoluteAddress]:
        """
        Resolve a symbol to an absolute address.

        Numbered symbols (a number followed by 'b' or 'f', e.g. '1b') resolve to the closest label
        with that number before ('b') or after ('f') address_at. Other symbols are looked up in
        the labels of this context first, and in the global symbols if they are no label.

        :param symbol: the symbol to resolve
        :param address_at: block-relative address the symbol is used at, required for numbered symbols
        :return: the absolute address of the symbol, or None if it is not known
        :raises ParseException: if a numbered symbol is resolved without address_at
        """
        if NUMBER_SYMBOL_PATTERN.match(symbol):
            if address_at is None:
                raise ParseException("Cannot resolve relative symbol {} without an address!".format(symbol))

            number, direction = symbol[:-1], symbol[-1]
            candidates = self.numbered_labels.get(number, [])
            if direction == 'b':
                target = max((addr for addr in candidates if addr < address_at), default=None)
            else:
                target = min((addr for addr in candidates if addr > address_at), default=None)

            if target is None:
                return None
            # numbered labels are stored block-relative, just like named labels
            return target + self.base_address

        if symbol not in self.labels:
            return self.global_symbol_dict.get(symbol, None)
        value = self.labels[symbol]
        if value is None:
            return None
        return value + self.base_address
class SimpleInstruction(Instruction):
def __init__(self, name: str, args: Tuple[str], context: InstructionContext, addr: T_RelativeAddress):
self.context = context
self.name = name
self.args = args
self.addr = addr
class Instruction(ABC):
name: str
args: tuple
@abstractmethod
def get_imm(self, num: int) -> int:
resolved_label = self.context.resolve_label(self.args[num], self.addr)
if resolved_label is None:
return parse_numeric_argument(self.args[num])
return resolved_label
"""
parse and get immediate argument
"""
pass
@abstractmethod
def get_imm_reg(self, num: int) -> Tuple[int, str]:
return self.get_imm(num + 1), self.get_reg(num)
"""
parse and get an argument imm(reg)
"""
pass
@abstractmethod
def get_reg(self, num: int) -> str:
return self.args[num]
"""
parse and get an register argument
"""
pass
def __repr__(self):
return "{} {}".format(self.name, ", ".join(self.args))
class InstructionMemorySection(MemorySection):
def __init__(self, instructions: List[Instruction], name: str, context: InstructionContext, owner: Program, base: int = 0):
self.name = name
self.base = base
self.context = context
self.size = len(instructions) * 4
self.flags = MemoryFlags(True, True)
self.instructions = instructions
self.owner = owner.name
@dataclass
class MemorySection(ABC):
name: str
flags: MemoryFlags
size: int
base: T_AbsoluteAddress
owner: str
context: InstructionContext
@property
def end(self):
return self.base + self.size
@abstractmethod
def read(self, offset: T_RelativeAddress, size: int) -> bytearray:
raise MemoryAccessException("Cannot read raw bytes from instruction section", self.base + offset, size, 'read')
pass
@abstractmethod
def write(self, offset: T_RelativeAddress, size: int, data: bytearray):
raise MemoryAccessException("Cannot write raw bytes to instruction section", self.base + offset, size, 'write')
pass
@abstractmethod
def read_ins(self, offset: T_RelativeAddress) -> Instruction:
if offset % 4 != 0:
raise MemoryAccessException("Unaligned instruction fetch!", self.base + offset, 4, 'instruction fetch')
return self.instructions[offset // 4]
pass
# Print a part of this section: one decoded instruction per line for executable sections,
# rows of formatted bytes for all other sections.
#
# start: section-relative offset where the dump starts (or its highlighted center, see below)
# end: section-relative offset where the dump ends; if omitted, a window around start is shown
# fmt, group: passed on to format_bytes for non-executable sections
# bytes_per_row, rows: layout of the output, they also determine the window size if end is omitted
def dump(self, start: T_RelativeAddress, end: Optional[T_RelativeAddress] = None, fmt: str = 'hex',
bytes_per_row: int = 16, rows: int = 10, group: int = 4):
# executable sections are always walked in steps of four bytes
if self.flags.executable:
bytes_per_row = 4
highlight = None
# without an explicit end, show rows // 2 rows before and after start (clamped to the section)
# and highlight start; presumably all three assignments below belong to this branch
if end is None:
end = min(start + (bytes_per_row * (rows // 2)), self.size - 1)
highlight = start
start = max(0, start - (bytes_per_row * (rows // 2)))
if self.flags.executable:
print(FMT_MEM + "{}, viewing {} instructions:".format(
self, (end - start) // 4
) + FMT_NONE)
for addr in range(start, end, 4):
if addr == highlight:
print(FMT_UNDERLINE + FMT_ORANGE, end='')
print("0x{:04x}: {}{}".format(
self.base + addr, self.read_ins(addr), FMT_NONE
))
else:
print(FMT_MEM + "{}, viewing {} bytes:".format(
self, (end - start)
) + FMT_NONE)
# full rows are printed up to end rounded down to a multiple of bytes_per_row
aligned_end = end - (end % bytes_per_row) if end % bytes_per_row != 0 else end
for addr in range(start, aligned_end, bytes_per_row):
# index of the group to highlight within this row, -1 if nothing is highlighted
hi_ind = (highlight - addr) // group if highlight is not None else -1
print("0x{:04x}: {}{}".format(
self.base + addr, format_bytes(self.read(addr, bytes_per_row), fmt, group, hi_ind), FMT_NONE
))
# the remaining bytes behind the last full row form one shorter row
if aligned_end != end:
hi_ind = (highlight - aligned_end) // group if highlight is not None else -1
print("0x{:04x}: {}{}".format(
self.base + aligned_end, format_bytes(
self.read(aligned_end, end % bytes_per_row), fmt, group, hi_ind
), FMT_NONE
))
def dump_all(self, *args, **kwargs):
    """
    Dump the section from offset zero up to its size, all further arguments are passed on to dump.
    """
    whole_section = (0, self.size)
    self.dump(*whole_section, *args, **kwargs)
def __repr__(self):
    """
    :return: class name, section name, base address, size, flags and owner of this section
    """
    fields = (
        self.__class__.__name__,
        self.name,
        self.base,
        self.size,
        self.flags,
        self.owner,
    )
    return "{}[{}] at 0x{:08X} (size={}bytes, flags={}, owner={})".format(*fields)
class Program:
"""
This represents a collection of sections which together form an executable program
When you want to create a program which can be located anywhere in memory, set base to None,
this signals the other components, that this is relocatable. Set the base of each section to
the offset in the program, and everything will be taken care of for you.
"""
name: str
context: InstructionContext
global_labels: Set[str]
sections: List[MemorySection]
base: Optional[T_AbsoluteAddress]
is_loaded: bool
@property
def size(self):
    """
    :return: the distance from the program base (or from zero, if the base is None) to the end
             of the last section in the section list, zero for a program without sections
    """
    if not self.sections:
        return 0
    last = self.sections[-1]
    if self.base is not None:
        return (last.base - self.base) + last.size
    return last.base + last.size
class BinaryDataMemorySection(MemorySection):
def __init__(self, data: bytearray, name: str, context: InstructionContext, owner: Program, base: int = 0):
def __init__(self, name: str, base: Optional[int] = None):
self.name = name
self.context = InstructionContext()
self.sections = []
self.global_labels = set()
self.base = base
self.context = context
self.size = len(data)
self.flags = MemoryFlags(False, False)
self.data = data
self.owner = owner.name
self.loaded = False
def read(self, offset: T_RelativeAddress, size: int) -> bytearray:
if offset + size > self.size:
raise MemoryAccessException("Out of bounds access in {}".format(self), offset, size, 'read')
return self.data[offset:offset + size]
def add_section(self, sec: MemorySection):
# print a warning when a section is located before the programs base
if self.base is not None:
if sec.base < self.base:
print(FMT_RED + FMT_BOLD + "WARNING: memory section {} in {} is placed before program base (0x{:x})".format(
sec, self.name, self.base
) + FMT_NONE)
def write(self, offset: T_RelativeAddress, size: int, data: bytearray):
if offset + size > self.size:
raise MemoryAccessException("Out of bounds access in {}".format(self), offset, size, 'write')
if len(data[0:size]) != size:
raise MemoryAccessException("Invalid write parameter sizing", offset, size, 'write')
self.data[offset:offset + size] = data[0:size]
self.sections.append(sec)
# keep section list ordered
self.sections.sort(key=lambda section: section.base)
def read_ins(self, offset: T_RelativeAddress) -> Instruction:
raise MemoryAccessException("Tried reading instruction on non-executable section {}".format(self),
offset, 4, 'instruction fetch')
def __repr__(self):
    """
    :return: class name, program name, global labels, the names of all sections and the base
    """
    section_names = [section.name for section in self.sections]
    template = "{}(name={},globals={},sections={},base={})"
    return template.format(
        self.__class__.__name__, self.name, self.global_labels, section_names, self.base
    )
@property
def entrypoint(self):
    """
    :return: the address of the label '_start', else of the label 'main', else the start of the
             first executable '.text' section; None if the program has none of these.
             The program base is added to the result (a base of None counts as zero).
    """
    offset = self.base if self.base is not None else 0
    known_labels = self.context.labels
    for candidate in ('_start', 'main'):
        if candidate in known_labels:
            return offset + known_labels.get(candidate)
    text_section = next(
        (sec for sec in self.sections
         if get_section_base_name(sec.name) == '.text' and sec.flags.executable),
        None
    )
    if text_section is None:
        return None
    return offset + text_section.base
def loaded_trigger(self, at_addr: 'T_AbsoluteAddress'):
    """
    This trigger is called when the binary is loaded and its final address in memory is determined

    This will do a small sanity check to prevent programs loading twice, or at addresses they don't
    expect to be loaded.

    :param at_addr: the address where the program will be located
    :raises RuntimeError: if the program is marked as loaded and at_addr differs from its base
    """
    # NOTE(review): nothing in this method sets is_loaded; read it defensively in case the
    # attribute was never assigned - confirm who is responsible for setting it
    if getattr(self, 'is_loaded', False):
        if at_addr != self.base:
            raise RuntimeError("Program loaded twice at different addresses! This will probably break things!")
        return

    if self.base is not None and self.base != at_addr:
        print(FMT_MEM + 'WARNING: Program loaded at different address then expected! (loaded at {}, '
                        'but expects to be loaded at {})'.format(at_addr, self.base) + FMT_NONE)

    if self.base != at_addr:
        # a program without a base holds program-relative section bases and moves by the whole
        # load address; a placed program only moves by the distance to its old base
        offset = at_addr if self.base is None else at_addr - self.base
        for sec in self.sections:
            sec.base += offset

    self.base = at_addr
    self.context.base_address = at_addr
class ProgramLoader(ABC):
    """
    A program loader is always specific to a given source file. It is a place to store all state
    concerning the parsing and loading of that specific source file, including options.
    """

    def __init__(self, source_path: str, options: T_ParserOpts):
        self.source_path = source_path
        self.options = options
        # the last component of the source path
        self.filename = os.path.split(self.source_path)[-1]

    @classmethod
    @abstractmethod
    def can_parse(cls, source_path: str) -> float:
        """
        Return confidence that the file located at source_path
        should be parsed and loaded by this loader

        :param source_path: the path of the source file
        :return: the confidence that this file belongs to this parser
        """

    @classmethod
    @abstractmethod
    def get_options(cls, argv: list[str]) -> Tuple[List[str], T_ParserOpts]:
        """
        parse command line args into an options dictionary

        :param argv: the command line args list
        :return: all remaining command line args and the parser options object
        """

    @classmethod
    def instantiate(cls, source_path: str, options: T_ParserOpts) -> 'ProgramLoader':
        """
        Instantiate a loader for the given source file with the required arguments

        :param source_path: the path to the source file
        :param options: the parsed options (guaranteed to come from this class's get_options method)
        :return: An instance of a ProgramLoader for the specified source
        """
        return cls(source_path, options)

    @abstractmethod
    def parse(self) -> Union[Program, Iterator[Program]]:
        """
        Parse the source file this loader was created for

        :return: the program, or an iterator over the programs, contained in the source file
        """
class CPU(ABC):
    """
    Base class of all CPUs: it holds the MMU, the program counter and cycle count, and maps
    instruction names to the callables implementing them.
    """

    # static cpu configuration
    INS_XLEN: int = 4

    # housekeeping variables
    mmu: MMU
    pc: T_AbsoluteAddress
    cycle: int
    halted: bool

    # debugging context
    debugger_active: bool

    # instruction information
    instructions: Dict[str, Callable[[Instruction], None]]
    instruction_sets: Set[InstructionSet]

    def __init__(self, mmu: MMU, instruction_sets: List[Type[InstructionSet]]):
        """
        :param mmu: the MMU this CPU accesses memory through
        :param instruction_sets: the instruction set classes to instantiate for this CPU
        """
        self.mmu = mmu

        self.instruction_sets = set()
        self.instructions = dict()

        # every instruction set is created for this cpu and contributes the instructions it loads
        for set_class in instruction_sets:
            ins_set = set_class(self)
            self.instructions.update(ins_set.load())
            self.instruction_sets.add(ins_set)

        self.cycle = 0
        self.pc = 0
        # __repr__ reads halted, so it has to exist from the start
        self.halted = False
        self.debugger_active = False

        self.sections = list()
        self.programs = list()

    def run_instruction(self, ins: Instruction):
        """
        Execute a single instruction

        :param ins: The instruction to execute
        :raises RuntimeError: if no loaded instruction set provides the instruction
        """
        if ins.name in self.instructions:
            self.instructions[ins.name](ins)
        else:
            # this should never be reached, as unknown instructions are imparseable
            raise RuntimeError("Unknown instruction: {}".format(ins))

    def load_program(self, program: Program):
        """
        Load a program by handing it to the MMU

        :param program: the program to load
        """
        self.mmu.load_program(program)

    def __repr__(self):
        """
        Returns a representation of the CPU and some of its state.
        """
        return "{}(pc=0x{:08X}, cycle={}, halted={} instructions={})".format(
            self.__class__.__name__,
            self.pc,
            self.cycle,
            self.halted,
            " ".join(s.name for s in self.instruction_sets)
        )

Loading…
Cancel
Save