diff --git a/.gitignore b/.gitignore index d75edea..57e5c62 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ venv -__pycache__ \ No newline at end of file +__pycache__ +.mypy_cache diff --git a/requirements.txt b/requirements.txt index e69de29..7a447fe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -0,0 +1 @@ +pyelftools~=0.27 \ No newline at end of file diff --git a/riscemu/CPU.py b/riscemu/CPU.py index a51103d..962ef85 100644 --- a/riscemu/CPU.py +++ b/riscemu/CPU.py @@ -34,6 +34,9 @@ class CPU: It is initialized with a configuration and a list of instruction sets. """ + + INS_XLEN = 1 + def __init__(self, conf: RunConfig, instruction_sets: List[Type['riscemu.InstructionSet']]): """ Creates a CPU instance. @@ -44,8 +47,8 @@ class CPU: # setup CPU states self.pc = 0 self.cycle = 0 - self.exit = False - self.exit_code = 0 + self.exit: bool = False + self.exit_code: int = 0 self.conf = conf self.active_debug = False # if a debugging session is currently runnign @@ -94,7 +97,7 @@ class CPU: print(FMT_CPU + '[CPU] Allocated {} bytes of stack'.format(self.stack.size) + FMT_NONE) print(FMT_CPU + '[CPU] Started running from 0x{:08X} ({})'.format(le.run_ptr, le.name) + FMT_NONE) - self.__run() + self._run() def continue_from_debugger(self, verbose=True): """ @@ -102,7 +105,7 @@ class CPU: :param verbose: If True, will print each executed instruction to STDOUT """ - self.__run(verbose) + self._run(verbose) def step(self): """ @@ -115,15 +118,15 @@ class CPU: self.cycle += 1 ins = self.mmu.read_ins(self.pc) print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins)) - self.pc += 1 + self.pc += self.INS_XLEN self.run_instruction(ins) except LaunchDebuggerException: print(FMT_CPU + "[CPU] Returning to debugger!" + FMT_NONE) except RiscemuBaseException as ex: - self.pc -= 1 + self.pc -= self.INS_XLEN print(ex.message()) - def __run(self, verbose=False): + def _run(self, verbose=False): if self.pc <= 0: return False ins = None @@ -133,20 +136,19 @@ class CPU: ins = self.mmu.read_ins(self.pc) if verbose: print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins)) - self.pc += 1 + self.pc += self.INS_XLEN self.run_instruction(ins) except RiscemuBaseException as ex: if not isinstance(ex, LaunchDebuggerException): print(FMT_ERROR + "[CPU] excpetion caught at 0x{:08X}: {}:".format(self.pc - 1, ins) + FMT_NONE) print(ex.message()) - self.pc -= 1 + self.pc -= self.INS_XLEN if self.active_debug: print(FMT_CPU + "[CPU] Returning to debugger!" + FMT_NONE) return if self.conf.debug_on_exception: - launch_debug_session(self, self.mmu, self.regs, - "Exception encountered, launching debug:") + launch_debug_session(self, self.mmu, self.regs, "Exception encountered, launching debug:") if self.exit: print() @@ -178,7 +180,8 @@ class CPU: """ Returns a representation of the CPU and some of its state. """ - return "CPU(pc=0x{:08X}, cycle={}, exit={}, instructions={})".format( + return "{}(pc=0x{:08X}, cycle={}, exit={}, instructions={})".format( + self.__class__.__name__, self.pc, self.cycle, self.exit, diff --git a/riscemu/IO/IOModule.py b/riscemu/IO/IOModule.py new file mode 100644 index 0000000..21d6a97 --- /dev/null +++ b/riscemu/IO/IOModule.py @@ -0,0 +1,22 @@ +from abc import ABC, abstractmethod + + +class IOModule(ABC): + addr: int + size: int + + def __init__(self, addr: int, size: int): + self.addr = addr + self.size = size + + @abstractmethod + def read(self, addr: int, size: int): + pass + + @abstractmethod + def write(self, addr: int, data: bytearray, size: int): + pass + + def contains(self, addr, size: int = 0): + return self.addr <= addr < self.addr + self.size and \ + self.addr <= addr + size <= self.addr + self.size diff --git a/riscemu/IO/TextIO.py b/riscemu/IO/TextIO.py new file mode 100644 index 0000000..d48d465 --- /dev/null +++ b/riscemu/IO/TextIO.py @@ -0,0 +1,95 @@ +from .IOModule import IOModule +from ..priv.Exceptions import InstructionAccessFault +from ..helpers import int_from_bytes +from threading import Thread +import time + + +def _window_loop(textIO: 'TextIO'): + #textIO.set_sg_window(None) + #return + try: + import PySimpleGUI as sg + + logs = sg.Text(font="monospace") + col = sg.Column([[logs]], size=(640, 400), scrollable=True) + window = sg.Window("TextIO:{:x}".format(textIO.addr), [[col]]) + lines = list() + + window.finalize() + textIO.set_sg_window(window) + while True: + e, v = window.read() + if e == sg.WINDOW_CLOSED: + window.close() + textIO.set_sg_window(None) + break + if e == 'putlog': + lines.insert(0, v[0]) + logs.update(value='\n'.join(lines) + '\n') + col.contents_changed() + + except ImportError: + print("[TextIO] module works best with PySimpleGui!") + textIO.set_sg_window(None) + + +class TextIO(IOModule): + def __init__(self, addr: int, buflen: int = 128): + super(TextIO, self).__init__(addr, buflen + 4) + self.buff = bytearray(buflen) + self.current_line = "" + self.sg_window = None + self.start_buffer = list() + + self.thread = Thread(target=_window_loop, args=(self,)) + self.thread.start() + time.sleep(0.1) + + def set_sg_window(self, window): + if self.sg_window is not None and window is not None: + raise Exception("cannot set window twice!") + self.sg_window = window + + buff = self.start_buffer + self.start_buffer = None if window is None else list() + + for line in buff: + self._present(line) + + def read(self, addr: int, size: int) -> bytearray: + raise InstructionAccessFault(addr) + + def write(self, addr: int, data: bytearray, size: int): + if addr == self.addr: + if size > 4: + raise InstructionAccessFault(addr) + if int_from_bytes(data[0:4]) > 0: + self._print() + return + buff_start = addr - self.addr - 4 + self.buff[buff_start:buff_start + size] = data[0:size] + + def _print(self): + buff = self.buff + self.buff = bytearray(self.size) + if b'\x00' in buff: + buff = buff.split(b'\x00')[0] + text = buff.decode('ascii') + if '\n' in text: + lines = text.split("\n") + lines[0] = self.current_line + lines[0] + for line in lines[:-1]: + self._present(line) + self.current_line = lines[-1] + else: + self.current_line += text + + def _present(self, text: str): + if self.sg_window is not None: + self.sg_window.write_event_value('putlog', text) + elif self.start_buffer is not None: + self.start_buffer.append(text) + else: + print("[TextIO:{:x}] {}".format(self.addr, text)) + diff --git a/riscemu/IO/__init__.py b/riscemu/IO/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/riscemu/MMU.py b/riscemu/MMU.py index e4e4bfc..0d9c8ff 100644 --- a/riscemu/MMU.py +++ b/riscemu/MMU.py @@ -47,9 +47,11 @@ class MMU: The global symbol table """ + last_ins_sec: Optional[LoadedMemorySection] + def __init__(self, conf: RunConfig): """ - Create a new MMU, respeccting the active RunConfiguration + Create a new MMU, respecting the active RunConfiguration :param conf: The config to respect """ @@ -58,6 +60,7 @@ class MMU: self.first_free_addr: int = 0x100 self.conf: RunConfig = conf self.global_symbols: Dict[str, int] = dict() + self.last_ins_sec = None def load_bin(self, exe: Executable) -> LoadedExecutable: """ @@ -140,7 +143,11 @@ class MMU: :param addr: The location :return: The Instruction """ + sec = self.last_ins_sec + if sec is not None and sec.base <= addr < sec.base + sec.size: + return sec.read_instruction(addr - sec.base) sec = self.get_sec_containing(addr) + self.last_ins_sec = sec if sec is None: print(FMT_MEM + "[MMU] Trying to read instruction form invalid region! " "Have you forgotten an exit syscall or ret statement?" + FMT_NONE) diff --git a/riscemu/Registers.py b/riscemu/Registers.py index 2a58006..3cc10db 100644 --- a/riscemu/Registers.py +++ b/riscemu/Registers.py @@ -95,15 +95,19 @@ class Registers: :return: If the operation was successful """ if reg == 'zero': - print("[Registers.set] trying to set read-only register: {}".format(reg)) return False - if reg not in Registers.all_registers(): - raise InvalidRegisterException(reg) + #if reg not in Registers.all_registers(): + # raise InvalidRegisterException(reg) # replace fp register with s1, as these are the same register if reg == 'fp': reg = 's1' if mark_set: self.last_set = reg + # check 32 bit signed bounds + if val < -2147483648: + val = -2147483648 + elif val > 2147483647: + val = 2147483647 self.vals[reg] = val return True @@ -114,8 +118,8 @@ class Registers: :param mark_read: If the register should be markes as "last read" (only used internally) :return: The contents of register reg """ - if reg not in Registers.all_registers(): - raise InvalidRegisterException(reg) + #if reg not in Registers.all_registers(): + # raise InvalidRegisterException(reg) if reg == 'fp': reg = 's0' if mark_read: diff --git a/riscemu/colors.py b/riscemu/colors.py index 8b5ddbb..c2a7182 100644 --- a/riscemu/colors.py +++ b/riscemu/colors.py @@ -26,3 +26,4 @@ FMT_PARSE = FMT_CYAN + FMT_BOLD FMT_CPU = FMT_BLUE + FMT_BOLD FMT_SYSCALL = FMT_YELLOW + FMT_BOLD FMT_DEBUG = FMT_MAGENTA + FMT_BOLD +FMT_CSR = FMT_ORANGE + FMT_BOLD \ No newline at end of file diff --git a/riscemu/debug.py b/riscemu/debug.py index 9e6704d..d8a656d 100644 --- a/riscemu/debug.py +++ b/riscemu/debug.py @@ -45,9 +45,7 @@ def launch_debug_session(cpu: 'CPU', mmu: 'MMU', reg: 'Registers', prompt=""): print("Invalid arg count!") return bin = mmu.get_bin_containing(cpu.pc) - if bin is None: - print(FMT_DEBUG + '[Debugger] Not in a section, can\'t execute instructions!' + FMT_NONE) - return + ins = LoadedInstruction(name, list(args), bin) print(FMT_DEBUG + "Running instruction " + ins + FMT_NONE) cpu.run_instruction(ins) diff --git a/riscemu/decoder/__init__.py b/riscemu/decoder/__init__.py new file mode 100644 index 0000000..e68396a --- /dev/null +++ b/riscemu/decoder/__init__.py @@ -0,0 +1 @@ +from .decoder import decode, RISCV_REGS diff --git a/riscemu/decoder/__main__.py b/riscemu/decoder/__main__.py new file mode 100644 index 0000000..98bc146 --- /dev/null +++ b/riscemu/decoder/__main__.py @@ -0,0 +1,17 @@ +if __name__ == '__main__': + import code + import readline + import rlcompleter + + from .decoder import * + from .formats import * + from .instruction_table import * + from .regs import RISCV_REGS + + sess_vars = globals() + sess_vars.update(locals()) + + readline.set_completer(rlcompleter.Completer(sess_vars).complete) + readline.set_completer(rlcompleter.Completer(sess_vars).complete) + readline.parse_and_bind("tab: complete") + code.InteractiveConsole(sess_vars).interact(banner="Interaktive decoding session started...", exitmsg="Closing...") diff --git a/riscemu/decoder/decoder.py b/riscemu/decoder/decoder.py new file mode 100644 index 0000000..4bdaed8 --- /dev/null +++ b/riscemu/decoder/decoder.py @@ -0,0 +1,88 @@ +from .instruction_table import * +from typing import Tuple, List + + +def print_ins(ins: int): + print(" f7 rs2 rs1 f3 rd op") + print( + f"0b{ins >> 25 :07b}_{(ins >> 20) & 0b11111:05b}_{(ins >> 15) & 0b11111:05b}_{(ins >> 12) & 0b111:03b}_{(ins >> 7) & 0b11111:05b}_{ins & 0b1111111:07b}"); + + +STATIC_INSN: Dict[int, Tuple[str, List[int], int]] = { + 0x00000013: ("nop", [], 0x00000013), + 0x00008067: ("ret", [], 0x00008067), + 0xfe010113: ("addi", [2, 2, -32], 0xfe010113), + 0x02010113: ("addi", [2, 2, 32], 0x02010113), + 0x00100073: ("ebreak", [], 0x00100073), + 0x00000073: ("ecall", [], 0x00000073), + 0x30200073: ("mret", [], 0x30200073), + 0x00200073: ("uret", [], 0x00200073), + 0x10200073: ("sret", [], 0x10200073), + 0x10500073: ("wfi", [], 0x10500073), +} + + +def int_from_ins(insn: bytearray): + return int.from_bytes(insn, 'little') + + +def name_from_insn(ins: int): + opcode = op(ins) + if opcode not in RV32: + print_ins(ins) + raise RuntimeError(f"Invalid opcode: {opcode:0x} in insn {ins:x}") + dec = RV32[opcode] + + if isinstance(dec, str): + return dec + + fun3 = funct3(ins) + if fun3 not in dec: + print_ins(ins) + raise RuntimeError(f"Invalid funct3: {fun3:0x} in insn {ins:x}") + + dec = dec[fun3] + if isinstance(dec, str): + return dec + + if opcode == 0x1c and fun3 == 0: + # we have ecall/ebreak + token = imm110(ins) + if token in dec: + return dec[token] + print_ins(ins) + raise RuntimeError(f"Invalid instruction in ebreak/ecall region: {ins:x}") + + fun7 = funct7(ins) + if opcode == 0b1011 and fun3 == 0b10: + # ignore the two aq/lr bits located in the fun7 block + # riscemu has no memory reordering, therefore we don't need to look at these bits ever + fun7 = fun7 >> 2 + + if fun7 in dec: + if opcode == 0x0C or (opcode == 0x04 and fun3 == 5): + dec = dec[fun7] + return dec + print("unknown instruction?!") + return dec[fun7] + + print_ins(ins) + raise RuntimeError(f"Invalid instruction: {ins:x}") + + +def decode(ins: Union[bytearray, bytes]) -> Tuple[str, List[int], int]: + insn = int_from_ins(ins) + + if insn & 3 != 3: + print_ins(insn) + raise RuntimeError("Not a RV32 instruction!") + + if insn in STATIC_INSN: + return STATIC_INSN[insn] + + opcode = op(insn) + if opcode not in INSTRUCTION_ARGS_DECODER: + print_ins(insn) + raise RuntimeError("No instruction decoder found for instruction") + + return name_from_insn(insn), INSTRUCTION_ARGS_DECODER[opcode](insn), insn diff --git a/riscemu/decoder/formats.py b/riscemu/decoder/formats.py new file mode 100644 index 0000000..2f9af85 --- /dev/null +++ b/riscemu/decoder/formats.py @@ -0,0 +1,117 @@ +from typing import Dict, Callable, List, Union +from .regs import RISCV_REGS + +def op(ins: int): + return (ins >> 2) & 31 + + +def rd(ins: int): + return (ins >> 7) & 31 + + +def funct3(ins: int): + return (ins >> 12) & 7 + + +def rs1(ins: int): + return (ins >> 15) & 31 + + +def rs2(ins: int): + return (ins >> 20) & 31 + + +def funct7(ins: int): + return ins >> 25 + + +def imm110(ins: int): + return ins >> 20 + + +def imm3112(ins: int): + return ins >> 12 + + +def imm_i(ins: int): + return sign_extend(imm110(ins), 12) + + +def imm_s(ins: int): + num = (funct7(ins) << 5) + rd(ins) + return sign_extend(num, 12) + + +def imm_b(ins: int): + lower = rd(ins) + higher = funct7(ins) + + num = (lower & 0b11110) + ((higher & 0b0111111) << 5) + ((lower & 1) << 11) + ((higher >> 6) << 12) + return sign_extend(num, 13) + + +def imm_u(ins: int): + return sign_extend(imm3112(ins), 20) + + +def imm_j(ins: int): + return sign_extend( + (((ins >> 21) & 0b1111111111) << 1) + + (((ins >> 20) & 1) << 11) + + (((ins >> 12) & 0b11111111) << 12) + + (((ins >> 31) & 1) << 20), 21 + ) + + +def sign_extend(num, bits): + sign_mask = 1 << (bits - 1) + return (num & (sign_mask - 1)) - (num & sign_mask) + + +def decode_i(ins: int) -> List[int]: + return [rd(ins), rs1(ins), imm_i(ins)] + + +def decode_b(ins: int) -> List[int]: + return [rs1(ins), rs2(ins), imm_b(ins)] + + +def decode_u(ins: int) -> List[int]: + return [rd(ins), imm_u(ins)] + + +def decode_r(ins: int) -> List[int]: + return [rd(ins), rs1(ins), rs2(ins)] + + +def decode_s(ins: int) -> List[int]: + return [rs2(ins), rs1(ins), imm_s(ins)] + + +def decode_j(ins: int) -> List[int]: + return [rd(ins), imm_j(ins)] + + +def decode_i_shamt(ins: int) -> List[int]: + if funct3(ins) in (1, 5): + return [rd(ins), rs1(ins), rs2(ins)] + return decode_i(ins) + + +def decode_i_unsigned(ins: int) -> List[int]: + return [rd(ins), rs1(ins), imm110(ins)] + + +INSTRUCTION_ARGS_DECODER: Dict[int, Callable[[int], List[int]]] = { + 0x00: decode_i, + 0x04: decode_i_shamt, + 0x05: decode_u, + 0x08: decode_s, + 0x0C: decode_r, + 0x0D: decode_u, + 0x18: decode_b, + 0x19: decode_i, + 0x1b: decode_j, + 0x1c: decode_i_unsigned, + 0b1011: decode_r +} diff --git a/riscemu/decoder/instruction_table.py b/riscemu/decoder/instruction_table.py new file mode 100644 index 0000000..84dbd2e --- /dev/null +++ b/riscemu/decoder/instruction_table.py @@ -0,0 +1,81 @@ +from collections import defaultdict +from .formats import * + +tbl = lambda: defaultdict(tbl) + +RV32 = tbl() +RV32[0x1b] = "jal" +RV32[0x0D] = "lui" +RV32[0x05] = "auipc" +RV32[0x19][0] = "jalr" + +RV32[0x04][0] = "addi" +RV32[0x04][1] = "slli" +RV32[0x04][2] = "slti" +RV32[0x04][3] = "sltiu" +RV32[0x04][4] = "xori" +RV32[0x04][5][0x00] = "srli" +RV32[0x04][5][0x20] = "srai" +RV32[0x04][6] = "ori" +RV32[0x04][7] = "andi" + +RV32[0x18][0] = "beq" +RV32[0x18][1] = "bne" +RV32[0x18][4] = "blt" +RV32[0x18][5] = "bge" +RV32[0x18][6] = "bltu" +RV32[0x18][7] = "bgeu" + +RV32[0x00][0] = "lb" +RV32[0x00][1] = "lh" +RV32[0x00][2] = "lw" +RV32[0x00][4] = "lbu" +RV32[0x00][5] = "lhu" + +RV32[0x08][0] = "sb" +RV32[0x08][1] = "sh" +RV32[0x08][2] = "sw" + +RV32[0x1c][1] = "csrrw" +RV32[0x1c][2] = "csrrs" +RV32[0x1c][3] = "csrrc" +RV32[0x1c][5] = "csrrwi" +RV32[0x1c][6] = "csrrsi" +RV32[0x1c][7] = "csrrci" + +RV32[0x1c][0][0] = "ecall" +RV32[0x1c][0][1] = "ebreak" + +RV32[0x0C][0][0] = "add" +RV32[0x0C][0][32] = "sub" +RV32[0x0C][1][0] = "sll" +RV32[0x0C][2][0] = "slt" +RV32[0x0C][3][0] = "sltu" +RV32[0x0C][4][0] = "xor" +RV32[0x0C][5][0] = "srl" +RV32[0x0C][5][32] = "sra" +RV32[0x0C][6][0] = "or" +RV32[0x0C][7][0] = "and" + +# rv32m +RV32[0x0C][0][1] = "mul" +RV32[0x0C][1][1] = "mulh" +RV32[0x0C][2][1] = "mulhsu" +RV32[0x0C][3][1] = "mulhu" +RV32[0x0C][4][1] = "div" +RV32[0x0C][5][1] = "divu" +RV32[0x0C][6][1] = "rem" +RV32[0x0C][7][1] = "remu" + +# rv32a +RV32[0b1011][0b10][0b00010] = "lr.w" +RV32[0b1011][0b10][0b00011] = "sc.w" +RV32[0b1011][0b10][0b00001] = "amoswap.w" +RV32[0b1011][0b10][0b00000] = "amoadd.w" +RV32[0b1011][0b10][0b00100] = "amoxor.w" +RV32[0b1011][0b10][0b01100] = "amoand.w" +RV32[0b1011][0b10][0b01000] = "amoor.w" +RV32[0b1011][0b10][0b10000] = "amomin.w" +RV32[0b1011][0b10][0b10100] = "amomax.w" +RV32[0b1011][0b10][0b11000] = "amominu.w" +RV32[0b1011][0b10][0b11100] = "amomaxu.w" diff --git a/riscemu/decoder/regs.py b/riscemu/decoder/regs.py new file mode 100644 index 0000000..44d70ac --- /dev/null +++ b/riscemu/decoder/regs.py @@ -0,0 +1,6 @@ +RISCV_REGS = [ + 'zero', 'ra', 'sp', 'gp', 'tp', 't0', 't1', 't2', + 's0', 's1', 'a0', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', + 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11', + 't3', 't4', 't5', 't6' +] diff --git a/riscemu/helpers.py b/riscemu/helpers.py index 97458cc..85b64af 100644 --- a/riscemu/helpers.py +++ b/riscemu/helpers.py @@ -33,19 +33,14 @@ def int_to_bytes(val, bytes=4, unsigned=False) -> bytearray: """ if unsigned and val < 0: raise NumberFormatException("unsigned negative number!") - return bytearray([ - (val >> ((bytes - i - 1) * 8)) & 0xFF for i in range(bytes) - ]) + return bytearray(to_unsigned(val, bytes).to_bytes(bytes, 'little')) def int_from_bytes(bytes, unsigned=False) -> int: """ byte -> int (two's complement) """ - num = 0 - for b in bytes: - num = num << 8 - num += b + num = int.from_bytes(bytes, 'little') if unsigned: return num @@ -55,7 +50,7 @@ def int_from_bytes(bytes, unsigned=False) -> int: def to_unsigned(num: int, bytes=4) -> int: if num < 0: - return 2 ** (bytes * 8) + num + return (2 ** (bytes * 8)) + num return num diff --git a/riscemu/instructions/InstructionSet.py b/riscemu/instructions/InstructionSet.py index f5d0f01..6b55e7d 100644 --- a/riscemu/instructions/InstructionSet.py +++ b/riscemu/instructions/InstructionSet.py @@ -29,8 +29,6 @@ class InstructionSet(ABC): """ self.name = self.__class__.__name__ self.cpu = cpu - self.mmu = cpu.mmu - self.regs = cpu.regs def load(self) -> Dict[str, Callable[['LoadedInstruction'], None]]: """ @@ -132,6 +130,14 @@ class InstructionSet(ABC): def pc(self, val): self.cpu.pc = val + @property + def mmu(self): + return self.cpu.mmu + + @property + def regs(self): + return self.cpu.regs + def __repr__(self): return "InstructionSet[{}] with {} instructions".format( self.__class__.__name__, diff --git a/riscemu/instructions/RV32A.py b/riscemu/instructions/RV32A.py new file mode 100644 index 0000000..9432c83 --- /dev/null +++ b/riscemu/instructions/RV32A.py @@ -0,0 +1,78 @@ +from .InstructionSet import InstructionSet, LoadedInstruction +from ..Exceptions import INS_NOT_IMPLEMENTED +from ..helpers import int_from_bytes, int_to_bytes, to_unsigned, to_signed + + +class RV32A(InstructionSet): + """ + The RV32A instruction set. Currently, load-reserved and store conditionally are not supported + due to limitations in the way the MMU is implemented. Maybe a later implementation will add support + for this? + """ + + def instruction_lr_w(self, ins: 'LoadedInstruction'): + INS_NOT_IMPLEMENTED(ins) + + def instruction_sc_w(self, ins: 'LoadedInstruction'): + INS_NOT_IMPLEMENTED(ins) + + def instruction_amoswap_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + if dest == 'zero': + self.mmu.write(addr, int_to_bytes(addr, 4)) + else: + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(val, 4)) + self.regs.set(dest, old) + + def instruction_amoadd_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(old + val, 4)) + self.regs.set(dest, old) + + def instruction_amoand_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(old & val, 4)) + self.regs.set(dest, old) + + def instruction_amoor_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(old | val, 4)) + self.regs.set(dest, old) + + def instruction_amoxor_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(old ^ val, 4)) + self.regs.set(dest, old) + + def instruction_amomax_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(max(old, val), 4)) + self.regs.set(dest, old) + + def instruction_amomaxu_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + val = to_unsigned(val) + old = int_from_bytes(self.mmu.read(addr, 4), unsigned=True) + + self.mmu.write(addr, int_to_bytes(to_signed(max(old, val)), 4)) + self.regs.set(dest, old) + + def instruction_amomin_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + old = int_from_bytes(self.mmu.read(addr, 4)) + self.mmu.write(addr, int_to_bytes(min(old, val), 4)) + self.regs.set(dest, old) + + def instruction_amominu_w(self, ins: 'LoadedInstruction'): + dest, addr, val = self.parse_rd_rs_rs(ins) + val = to_unsigned(val) + old = int_from_bytes(self.mmu.read(addr, 4), unsigned=True) + + self.mmu.write(addr, int_to_bytes(to_signed(min(old, val)), 4)) + self.regs.set(dest, old) diff --git a/riscemu/instructions/RV32I.py b/riscemu/instructions/RV32I.py index 20a64aa..d8681d0 100644 --- a/riscemu/instructions/RV32I.py +++ b/riscemu/instructions/RV32I.py @@ -154,8 +154,7 @@ class RV32I(InstructionSet): ASSERT_LEN(ins.args, 2) reg = ins.get_reg(0) imm = to_unsigned(ins.get_imm(1)) - self.pc += (imm << 12) - self.regs.set(reg, self.pc) + self.regs.set(reg, self.pc + (imm << 12)) def instruction_xor(self, ins: 'LoadedInstruction'): rd, rs1, rs2 = self.parse_rd_rs_rs(ins) diff --git a/riscemu/instructions/__init__.py b/riscemu/instructions/__init__.py index fb6a17c..65bda29 100644 --- a/riscemu/instructions/__init__.py +++ b/riscemu/instructions/__init__.py @@ -9,7 +9,8 @@ This package holds all instruction sets, available to the processor from .InstructionSet import InstructionSet from .RV32M import RV32M from .RV32I import RV32I +from .RV32A import RV32A InstructionSetDict = { - v.__name__: v for v in [RV32I, RV32M] + v.__name__: v for v in [RV32I, RV32M, RV32A] } diff --git a/riscemu/priv/CSR.py b/riscemu/priv/CSR.py new file mode 100644 index 0000000..4a2cc7b --- /dev/null +++ b/riscemu/priv/CSR.py @@ -0,0 +1,138 @@ +from typing import Dict, Union, Callable, Optional +from collections import defaultdict +from .privmodes import PrivModes +from .Exceptions import InstructionAccessFault +from ..helpers import to_signed +from ..colors import FMT_CSR, FMT_NONE + +from .CSRConsts import CSR_NAME_TO_ADDR, MSTATUS_LEN_2, MSTATUS_OFFSETS + + +class CSR: + """ + This holds all Control and Status Registers (CSR) + """ + regs: Dict[int, int] + """ + All Control and Status Registers are stored here + """ + + virtual_regs: Dict[int, Callable[[], int]] + """ + list of virtual CSR registers, with values computed on read + """ + + listeners: Dict[int, Callable[[int, int], None]] + + mstatus_cache: Dict[str, int] + mstatus_cache_dirty = True + + def __init__(self): + self.regs = defaultdict(lambda: 0) + self.listeners = defaultdict(lambda: (lambda x, y: None)) + self.virtual_regs = dict() + self.mstatus_cache = dict() + # TODO: implement write masks (bitmasks which control writeable bits in registers + + def set(self, addr: Union[str, int], val: int): + addr = self._name_to_addr(addr) + if addr is None: + return + val = to_signed(val) + self.listeners[addr](self.regs[addr], val) + if addr == 0x300: + self.mstatus_cache_dirty = True + self.regs[addr] = val + + def get(self, addr: Union[str, int]) -> int: + addr = self._name_to_addr(addr) + if addr is None: + raise RuntimeError(f"Invalid CSR name: {addr}!") + if addr in self.virtual_regs: + return self.virtual_regs[addr]() + return self.regs[addr] + + def set_listener(self, addr: Union[str, int], listener: Callable[[int, int], None]): + addr = self._name_to_addr(addr) + if addr is None: + print("unknown csr address name: {}".format(addr)) + return + self.listeners[addr] = listener + + # mstatus properties + def set_mstatus(self, name: str, val: int): + """ + Set mstatus bits using this helper. mstatus is a 32 bit register, holding various machine status flags + Setting them by hand is super painful, so this helper allows you to set specific bits. + + Please make sure your supplied value has the correct width! + + :param name: + :param val: + :return: + """ + size = 2 if name in MSTATUS_LEN_2 else 1 + off = MSTATUS_OFFSETS[name] + mask = (2 ** size - 1) << off + old_val = self.get('mstatus') + erased = old_val & (~mask) + new_val = erased | (val << off) + self.set('mstatus', new_val) + + def get_mstatus(self, name) -> int: + if not self.mstatus_cache_dirty and name in self.mstatus_cache: + return self.mstatus_cache[name] + + size = 2 if name in MSTATUS_LEN_2 else 1 + off = MSTATUS_OFFSETS[name] + mask = (2 ** size - 1) << off + val = (self.get('mstatus') & mask) >> off + if self.mstatus_cache_dirty: + self.mstatus_cache = dict(name=val) + else: + self.mstatus_cache[name] = val + return val + + def callback(self, addr: Union[str, int]): + def inner(func: Callable[[int, int], None]): + self.set_listener(addr, func) + return func + + return inner + + def assert_can_read(self, mode: PrivModes, addr: int): + if (addr >> 8) & 3 > mode.value: + raise InstructionAccessFault(addr) + + def assert_can_write(self, mode: PrivModes, addr: int): + if (addr >> 8) & 3 > mode.value or addr >> 10 == 11: + raise InstructionAccessFault(addr) + + def _name_to_addr(self, addr: Union[str, int]) -> Optional[int]: + if isinstance(addr, str): + if addr not in CSR_NAME_TO_ADDR: + print("Unknown CSR register {}".format(addr)) + return None + return CSR_NAME_TO_ADDR[addr] + return addr + + def virtual_register(self, addr: Union[str, int]): + addr = self._name_to_addr(addr) + if addr is None: + print("unknown csr address name: {}".format(addr)) + + def inner(func: Callable[[], int]): + self.virtual_regs[addr] = func + return func + + return inner + + def dump_mstatus(self): + print(FMT_CSR + "[CSR] dumping mstatus:") + i = 0 + for name in MSTATUS_OFFSETS: + print(" {:<5} {}".format(name, self.get_mstatus(name)), end="") + if i % 6 == 5: + print() + i += 1 + print(FMT_NONE) diff --git a/riscemu/priv/CSRConsts.py b/riscemu/priv/CSRConsts.py new file mode 100644 index 0000000..b8b75bb --- /dev/null +++ b/riscemu/priv/CSRConsts.py @@ -0,0 +1,81 @@ +from typing import Dict, Tuple + +MCAUSE_TRANSLATION: Dict[Tuple[int, int], str]= { + (1, 0): 'User software interrupt', + (1, 1): 'Supervisor software interrupt', + (1, 3): 'Machine software interrupt', + (1, 4): 'User timer interrupt', + (1, 5): 'Supervisor timer interrupt', + (1, 7): 'Machine timer interrupt', + (1, 8): 'User external interrupt', + (1, 9): 'Supervisor external interrupt', + (1, 11): 'Machine external interrupt', + (0, 0): 'Instruction address misaligned', + (0, 1): 'Instruction access fault', + (0, 2): 'Illegal instruction', + (0, 3): 'Breakpoint', + (0, 4): 'Load address misaligned', + (0, 5): 'Load access fault', + (0, 6): 'Store/AMO address misaligned', + (0, 7): 'Store/AMO access fault', + (0, 8): 'environment call from user mode', + (0, 9): 'environment call from supervisor mode', + (0, 11): 'environment call from machine mode', + (0, 12): 'Instruction page fault', + (0, 13): 'Load page fault', + (0, 15): 'Store/AMO page fault', +} +""" +Assigns tuple (interrupt bit, exception code) to their respective readable names +""" + +MSTATUS_OFFSETS: Dict[str, int] = { + 'uie': 0, + 'sie': 1, + 'mie': 3, + 'upie': 4, + 'spie': 5, + 'mpie': 7, + 'spp': 8, + 'mpp': 11, + 'fs': 13, + 'xs': 15, + 'mpriv': 17, + 'sum': 18, + 'mxr': 19, + 'tvm': 20, + 'tw': 21, + 'tsr': 22, + 'sd': 31 +} +""" +Offsets for all mstatus bits +""" + +MSTATUS_LEN_2 = ('mpp', 'fs', 'xs') +""" +All mstatus parts that have length 2. All other mstatus parts have length 1 +""" + +CSR_NAME_TO_ADDR: Dict[str, int] = { + 'mstatus': 0x300, + 'misa': 0x301, + 'mie': 0x304, + 'mtvec': 0x305, + 'mepc': 0x341, + 'mcause': 0x342, + 'mtval': 0x343, + 'mip': 0x344, + 'mvendorid': 0xF11, + 'marchid': 0xF12, + 'mimpid': 0xF13, + 'mhartid': 0xF14, + 'time': 0xc01, + 'timeh': 0xc81, + 'halt': 0x789, + 'mtimecmp': 0x780, + 'mtimecmph': 0x781, +} +""" +Translation for named registers +""" \ No newline at end of file diff --git a/riscemu/priv/ElfLoader.py b/riscemu/priv/ElfLoader.py new file mode 100644 index 0000000..9f8a172 --- /dev/null +++ b/riscemu/priv/ElfLoader.py @@ -0,0 +1,158 @@ +from dataclasses import dataclass +from typing import List, Dict, Tuple + +from .Exceptions import * +from ..Exceptions import RiscemuBaseException +from ..Executable import MemoryFlags, LoadedMemorySection +from ..decoder import decode, RISCV_REGS +from ..helpers import FMT_PARSE, FMT_NONE, FMT_GREEN, FMT_BOLD + +FMT_ELF = FMT_GREEN + FMT_BOLD + +if typing.TYPE_CHECKING: + from elftools.elf.elffile import ELFFile + from elftools.elf.sections import Section, SymbolTableSection + +# This requires pyelftools package! + +INCLUDE_SEC = ('.text', '.stack', '.bss', '.sdata', '.sbss') + + +class ElfExecutable: + sections: List['ElfLoadedMemorySection'] + sections_by_name: Dict[str, 'ElfLoadedMemorySection'] + symbols: Dict[str, int] + run_ptr: int + + def __init__(self, name: str): + self.sections = list() + self.sections_by_name = dict() + self.symbols = dict() + + try: + from elftools.elf.elffile import ELFFile + from elftools.elf.sections import Section, SymbolTableSection + + with open(name, 'rb') as f: + print(FMT_ELF + "[ElfLoader] Loading elf executable from: {}".format(name) + FMT_NONE) + self._read_elf(ELFFile(f)) + except ImportError as e: + print(FMT_PARSE + "[ElfLoader] Cannot load elf files without PyElfTools package! You can install them using pip install pyelftools!" + FMT_NONE) + raise e + + def _read_elf(self, elf: 'ELFFile'): + if not elf.header.e_machine == 'EM_RISCV': + raise InvalidElfException("Not a RISC-V elf file!") + if not elf.header.e_ident.EI_CLASS == 'ELFCLASS32': + raise InvalidElfException("Only 32bit executables are supported!") + + self.run_ptr = elf.header.e_entry + + from elftools.elf.sections import SymbolTableSection + for sec in elf.iter_sections(): + if isinstance(sec, SymbolTableSection): + self._parse_symtab(sec) + continue + + if sec.name not in INCLUDE_SEC: + continue + + self.add_sec(self._lms_from_elf_sec(sec, 'kernel')) + + def _lms_from_elf_sec(self, sec: 'Section', owner: str): + is_code = sec.name in ('.text',) + data = bytearray(sec.data()) + flags = MemoryFlags(is_code, is_code) + print(FMT_ELF + "[ElfLoader] Section {} at: {:X}".format(sec.name, sec.header.sh_addr) + FMT_NONE) + return ElfLoadedMemorySection( + sec.name, + sec.header.sh_addr, + sec.data_size, + data, + flags, + owner + ) + + def _parse_symtab(self, symtab: 'SymbolTableSection'): + self.symbols = { + sym.name: sym.entry.st_value for sym in symtab.iter_symbols() if sym.name + } + + def add_sec(self, new_sec: 'ElfLoadedMemorySection'): + for sec in self.sections: + if sec.base < sec.end <= new_sec.base or sec.end > sec.base >= new_sec.end: + continue + else: + print(FMT_ELF + "[ElfLoader] Invalid elf layout: Two sections overlap: \n\t{}\n\t{}".format( + sec, new_sec + ) + FMT_NONE) + raise RuntimeError("Cannot load elf with overlapping sections!") + + self.sections.append(new_sec) + self.sections_by_name[new_sec.name] = new_sec + + +class InvalidElfException(RiscemuBaseException): + def __init__(self, msg: str): + super().__init__() + self.msg = msg + + def message(self): + return FMT_PARSE + "{}(\"{}\")".format(self.__class__.__name__, self.msg) + FMT_NONE + + +@dataclass(frozen=True) +class ElfInstruction: + name: str + args: List[int] + encoded: int + + def get_imm(self, num: int) -> int: + return self.args[num] + + def get_imm_reg(self, num: int) -> Tuple[int, int]: + return self.args[-1], self.args[-2] + + def get_reg(self, num: int) -> str: + return RISCV_REGS[self.args[num]] + + def __repr__(self) -> str: + if self.name == 'jal' and self.args[0] == 0: + return "j {}".format(self.args[1]) + elif self.name in ('lw', 'lh', 'lb', 'lbu', 'lhu', 'sw', 'sh', 'sb'): + args = "{}, {}({})".format( + RISCV_REGS[self.args[0]], self.args[2], RISCV_REGS[self.args[1]] + ) + else: + args = ", ".join(map(str, self.args)) + return "{:<8} {}".format( + self.name, + args + ) + + +class ElfLoadedMemorySection(LoadedMemorySection): + ins_cache: List[Optional[ElfInstruction]] + """ + A fast cache for accessing pre-decoded instructions + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__setattr__('ins_cache', [None] * (self.size // 4)) + + def read_instruction(self, offset): + if self.ins_cache[offset//4] is not None: + return self.ins_cache[offset//4] + if not self.flags.executable: + print(FMT_PARSE + "Reading instruction from non-executable memory!" + FMT_NONE) + raise InstructionAccessFault(offset + self.base) + if offset % 4 != 0: + raise InstructionAddressMisalignedTrap(offset + self.base) + ins = ElfInstruction(*decode(self.content[offset:offset + 4])) + self.ins_cache[offset // 4] = ins + return ins + + @property + def end(self): + return self.size + self.base diff --git a/riscemu/priv/Exceptions.py b/riscemu/priv/Exceptions.py new file mode 100644 index 0000000..fee6217 --- /dev/null +++ b/riscemu/priv/Exceptions.py @@ -0,0 +1,91 @@ +from typing import Optional, NewType +from enum import Enum +from .privmodes import PrivModes +from .CSRConsts import MCAUSE_TRANSLATION + +import typing + +if typing.TYPE_CHECKING: + from .ElfLoader import ElfInstruction + + +class CpuTrapType(Enum): + TIMER = 1 + SOFTWARE = 2 + EXTERNAL = 3 + EXCEPTION = 4 + + +class CpuTrap(BaseException): + code: int + """ + 31-bit value encoding the exception code in the mstatus register + """ + interrupt: int + """ + The isInterrupt bit in the mstatus register + """ + + mtval: int + """ + contents of the mtval register + """ + + type: CpuTrapType + """ + The type (timer, external, software) of the trap + """ + + priv: PrivModes + """ + The privilege level this trap targets + """ + + def __init__(self, code: int, mtval, type: CpuTrapType, priv: PrivModes = PrivModes.MACHINE): + self.interrupt = 0 if type == CpuTrapType.EXCEPTION else 1 + self.code = code + self.mtval = mtval + self.priv = priv + self.type = type + + @property + def mcause(self): + return (self.interrupt << 31) + self.code + + def __repr__(self): + name = "Reserved interrupt({}, {})".format(self.interrupt, self.code) + + if (self.interrupt, self.code) in MCAUSE_TRANSLATION: + name = MCAUSE_TRANSLATION[(self.interrupt, self.code)] + "({}, {})".format(self.interrupt, self.code) + + return "{} {{priv={}, type={}, mtval={:x}}}".format( + name, self.priv.name, self.type.name, self.mtval + ) + + def __str__(self): + return self.__repr__() + + +class IllegalInstructionTrap(CpuTrap): + def __init__(self, ins: 'ElfInstruction'): + super().__init__(2, ins.encoded, CpuTrapType.EXCEPTION) + + +class InstructionAddressMisalignedTrap(CpuTrap): + def __init__(self, addr: int): + super().__init__(0, addr, CpuTrapType.EXCEPTION) + + +class InstructionAccessFault(CpuTrap): + def __init__(self, addr: int): + super().__init__(1, addr, CpuTrapType.EXCEPTION) + + +class TimerInterrupt(CpuTrap): + def __init__(self): + super().__init__(7, 0, CpuTrapType.TIMER) + + +class EcallTrap(CpuTrap): + def __init__(self, mode: PrivModes): + super().__init__(mode.value + 8, 0, CpuTrapType.EXCEPTION) diff --git a/riscemu/priv/ImageLoader.py b/riscemu/priv/ImageLoader.py new file mode 100644 index 0000000..88c8f6b --- /dev/null +++ b/riscemu/priv/ImageLoader.py @@ -0,0 +1,119 @@ +""" +Laods a memory image with debug information into memory +""" + +from .PrivMMU import PrivMMU +from ..Config import RunConfig +from ..Executable import Executable, LoadedExecutable, LoadedMemorySection, LoadedInstruction, MemoryFlags +from .ElfLoader import ElfInstruction, ElfLoadedMemorySection, InstructionAccessFault, InstructionAddressMisalignedTrap +from ..decoder import decode +from ..IO.IOModule import IOModule +from .privmodes import PrivModes +from ..colors import FMT_ERROR, FMT_NONE +import json + +from functools import lru_cache +from typing import Dict, List, Tuple, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from .PrivCPU import PrivCPU + + +class MemoryImageMMU(PrivMMU): + io: List[IOModule] + data: bytearray + io_start: int + debug_info: Dict[str, Dict[str, Dict[str, str]]] + + def __init__(self, file_name: str, io_start: int = 0xFF0000): + super(MemoryImageMMU, self).__init__(conf=RunConfig()) + + with open(file_name, 'rb') as memf: + data = memf.read() + with open(file_name + '.dbg', 'r') as dbgf: + debug_info: Dict = json.load(dbgf) + + self.data = bytearray(data) + # TODO: super wasteful memory allocation happening here + if len(data) < io_start: + self.data += bytearray(io_start - len(data)) + self.debug_info = debug_info + self.io_start = io_start + self.io = list() + + def get_entrypoint(self): + try: + start = self.debug_info['symbols']['kernel'].get('_start', None) + if start is not None: + return start + return self.debug_info['symbols']['kernel'].get('_ftext') + except KeyError: + print(FMT_ERROR + '[MMU] cannot find kernel entry in debug information! Falling back to 0x100' + FMT_NONE) + return 0x100 + + @lru_cache(maxsize=2048) + def read_ins(self, addr: int) -> ElfInstruction: + if addr >= self.io_start: + raise InstructionAccessFault(addr) + if addr % 4 != 0: + raise InstructionAddressMisalignedTrap(addr) + + return ElfInstruction(*decode(self.data[addr:addr + 4])) + + def read(self, addr: int, size: int) -> bytearray: + if addr < 0x100: + pc = self.cpu.pc + text_sec = self.get_sec_containing(pc) + print(FMT_ERROR + "[MMU] possible null dereference (read {:x}) from (pc={:x},sec={},rel={:x})".format( + addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base + ) + FMT_NONE) + if addr >= self.io_start: + return self.io_at(addr).read(addr, size) + return self.data[addr: addr + size] + + def write(self, addr: int, size: int, data): + if addr < 0x100: + pc = self.cpu.pc + text_sec = self.get_sec_containing(pc) + print(FMT_ERROR + "[MMU] possible null dereference (write {:x}) from (pc={:x},sec={},rel={:x})".format( + addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base + ) + FMT_NONE) + + if addr >= self.io_start: + return self.io_at(addr).write(addr, data, size) + self.data[addr:addr + size] = data[0:size] + + def io_at(self, addr) -> IOModule: + for mod in self.io: + if mod.contains(addr): + return mod + raise InstructionAccessFault(addr) + + def add_io(self, io: IOModule): + self.io.append(io) + + def __repr__(self): + return "MemoryImageMMU()" + + @lru_cache(maxsize=32) + def get_sec_containing(self, addr: int) -> Optional[LoadedMemorySection]: + next_sec = len(self.data) + for sec_addr, name in reversed(self.debug_info['sections'].items()): + if addr >= int(sec_addr): + owner, name = name.split(':') + base = int(sec_addr) + size = next_sec - base + flags = MemoryFlags('.text' in name, '.text' in name) + return ElfLoadedMemorySection(name, base, size, self.data[base:next_sec], flags, owner) + else: + next_sec = int(sec_addr) + + def translate_address(self, addr: int): + sec = self.get_sec_containing(addr) + if sec.name == '.empty': + return "" + symbs = self.debug_info['symbols'][sec.owner] + for sym, val in reversed(symbs.items()): + if addr >= val: + return "{}{:+x} ({}:{})".format(sym, addr - val, sec.owner, sec.name) + return "{}:{}{:+x}".format(sec.owner, sec.name, addr - sec.base) diff --git a/riscemu/priv/PrivCPU.py b/riscemu/priv/PrivCPU.py new file mode 100644 index 0000000..e3886ac --- /dev/null +++ b/riscemu/priv/PrivCPU.py @@ -0,0 +1,244 @@ +""" +RiscEmu (c) 2021 Anton Lydike + +SPDX-License-Identifier: MIT +""" +import time + +from riscemu.CPU import * +from .CSR import CSR +from .ElfLoader import ElfExecutable +from .ImageLoader import MemoryImageMMU +from .Exceptions import * +from .PrivMMU import PrivMMU +from ..IO import TextIO +from .PrivRV32I import PrivRV32I +from .privmodes import PrivModes +from ..instructions import RV32A, RV32M +import json + +if typing.TYPE_CHECKING: + from riscemu import Executable, LoadedExecutable, LoadedInstruction + from riscemu.instructions.InstructionSet import InstructionSet + + +class PrivCPU(CPU): + """ + This is a CPU that has different modes, instruction sets and registers. + + It should support M and U Mode, but no U-Mode Traps. + + This allows us to + """ + + csr: CSR + """ + Reference to the control and status registers + """ + + TIME_RESOLUTION_NS: int = 1000000 + """ + controls the resolution of the time csr register (in nanoseconds) + """ + + INS_XLEN = 4 + """ + Size of an instruction in memory. Should be 4, but since our loading code is shit, instruction take up + the equivalent of "1 byte" (this is actually impossible) + """ + + def __init__(self, conf, mmu: PrivMMU): + super().__init__(conf, [PrivRV32I, RV32M, RV32A]) + self.mode: PrivModes = PrivModes.MACHINE + + mmu.set_cpu(self) + self.pc = mmu.get_entrypoint() + self.mmu = mmu + + if hasattr(self.mmu, 'add_io'): + self.mmu.add_io(TextIO.TextIO(0xff0000, 64)) + + self.syscall_int = None + self.launch_debug = False + self.pending_traps: List[CpuTrap] = list() + + self._time_start = 0 + self._time_timecmp = 0 + self._time_interrupt_enabled = False + + # performance counters + self._perf_counters = list() + + # init csr + self._init_csr() + + def _run(self, verbose=False): + if self.pc <= 0: + return False + ins = None + try: + while not self.exit: + self.step(verbose) + except RiscemuBaseException as ex: + if isinstance(ex, LaunchDebuggerException): + self.launch_debug = True + self.pc += self.INS_XLEN + else: + print(FMT_ERROR + "[CPU] exception caught at 0x{:08X}: {}:".format(self.pc - 1, ins) + FMT_NONE) + print(ex.message()) + if self.conf.debug_on_exception: + self.launch_debug = True + if self.exit: + print() + print(FMT_CPU + "Program exited with code {}".format(self.exit_code) + FMT_NONE) + sys.exit(self.exit_code) + elif self.launch_debug: + self.launch_debug = False + launch_debug_session(self, self.mmu, self.regs, + "Launching debugger:") + if not self.active_debug: + self._run(verbose) + else: + print() + print(FMT_CPU + "Program stopped without exiting - perhaps you stopped the debugger?" + FMT_NONE) + + def load(self, e: riscemu.Executable): + raise NotImplementedError("Not supported!") + + def run_loaded(self, le: 'riscemu.LoadedExecutable'): + raise NotImplementedError("Not supported!") + + def get_tokenizer(self, tokenizer_input): + raise NotImplementedError("Not supported!") + + def run(self, verbose: bool = False): + print(FMT_CPU + '[CPU] Started running from 0x{:08X} ({})'.format(self.pc, "kernel") + FMT_NONE) + self._time_start = time.perf_counter_ns() // self.TIME_RESOLUTION_NS + self._run(verbose) + + def _init_csr(self): + # set up CSR + self.csr = CSR() + self.csr.set('mhartid', 0) # core id + # TODO: set correct value + self.csr.set('mimpid', 0) # implementation id + # set mxl to 1 (32 bit) and set bits for i and m isa + self.csr.set('misa', (1 << 30) + (1 << 8) + (1 << 12)) # available ISA + + # CSR write callbacks: + + @self.csr.callback('halt') + def halt(old: int, new: int): + if new != 0: + self.exit = True + self.exit_code = new + + @self.csr.callback('mstatus') + def mstatus(old: int, new: int): + pass + + @self.csr.callback('mtimecmp') + def mtimecmp(old, new): + self._time_timecmp = (self.csr.get('mtimecmph') << 32) + new + self._time_interrupt_enabled = True + + @self.csr.callback('mtimecmph') + def mtimecmph(old, new): + self._time_timecmp = (new << 32) + self.csr.get('mtimecmp') + self._time_interrupt_enabled = True + + # virtual CSR registers: + + @self.csr.virtual_register('time') + def get_time(): + return (time.perf_counter_ns() // self.TIME_RESOLUTION_NS - self._time_start) & (2 ** 32 - 1) + + @self.csr.virtual_register('timeh') + def get_timeh(): + return (time.perf_counter_ns() // self.TIME_RESOLUTION_NS - self._time_start) >> 32 + + # add minstret and mcycle counters + + def _handle_trap(self, trap: CpuTrap): + # implement trap handling! + self.pending_traps.append(trap) + + def step(self, verbose=True): + try: + self.cycle += 1 + if self.cycle % 20 == 0: + self._timer_step() + self._check_interrupt() + ins = self.mmu.read_ins(self.pc) + if verbose and self.mode == PrivModes.USER: + print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins)) + self.run_instruction(ins) + self.pc += self.INS_XLEN + except CpuTrap as trap: + self._handle_trap(trap) + + def _timer_step(self): + if not self._time_interrupt_enabled: + return + if self._time_timecmp <= (time.perf_counter_ns() // self.TIME_RESOLUTION_NS) - self._time_start: + self.pending_traps.append(TimerInterrupt()) + self._time_interrupt_enabled = False + + def _check_interrupt(self): + if not (len(self.pending_traps) > 0 and self.csr.get_mstatus('mie')): + return + # select best interrupt + # TODO: actually select based on the official ranking + trap = self.pending_traps.pop() # use the most recent trap + if not isinstance(trap, TimerInterrupt) or True: + print(FMT_CPU + "[CPU] [{}] taking trap {}!".format(self.cycle, trap) + FMT_NONE) + + if trap.priv != PrivModes.MACHINE: + print(FMT_CPU + "[CPU] Trap not targeting machine mode encountered! - undefined behaviour!" + FMT_NONE) + raise Exception("Undefined behaviour!") + + if self.mode != PrivModes.USER: + print(FMT_CPU + "[CPU] Trap triggered outside of user mode?!" + FMT_NONE) + + self.csr.set_mstatus('mpie', self.csr.get_mstatus('mie')) + self.csr.set_mstatus('mpp', self.mode.value) + self.csr.set_mstatus('mie', 0) + self.csr.set('mcause', trap.mcause) + self.csr.set('mepc', self.pc-self.INS_XLEN) + self.csr.set('mtval', trap.mtval) + self.mode = trap.priv + mtvec = self.csr.get('mtvec') + if mtvec & 0b11 == 0: + self.pc = mtvec + if mtvec & 0b11 == 1: + self.pc = (mtvec & 0b11111111111111111111111111111100) + (trap.code * 4) + self.record_perf_profile() + if len(self._perf_counters) % 100 == 0: + self.show_perf() + + def show_perf(self): + timed = 0 + cycled = 0 + cps_list = list() + + print(FMT_CPU + "[CPU] Performance overview:") + for time_ns, cycle in self._perf_counters: + if cycled == 0: + cycled = cycle + timed = time_ns + continue + cps = (cycle - cycled) / (time_ns - timed) * 1000000000 + + # print(" {:03d} cycles in {:08d}ns ({:.2f} cycles/s)".format( + # cycle - cycled, + # time_ns - timed, + # cps + # )) + cycled = cycle + timed = time_ns + cps_list.append(cps) + print(" on average {:.0f} instructions/s".format(sum(cps_list) / len(cps_list)) + FMT_NONE) + self._perf_counters = list() + + def record_perf_profile(self): + self._perf_counters.append((time.perf_counter_ns(), self.cycle)) diff --git a/riscemu/priv/PrivMMU.py b/riscemu/priv/PrivMMU.py new file mode 100644 index 0000000..28f2ecf --- /dev/null +++ b/riscemu/priv/PrivMMU.py @@ -0,0 +1,41 @@ +from ..MMU import * +from abc import abstractmethod + +import typing + +from .ElfLoader import ElfExecutable + +if typing.TYPE_CHECKING: + from .PrivCPU import PrivCPU + + +class PrivMMU(MMU): + cpu: 'PrivCPU' + + @abstractmethod + def get_entrypoint(self) -> int: + raise + + def set_cpu(self, cpu: 'PrivCPU'): + self.cpu = cpu + + def translate_address(self, addr: int): + return "" + +class LoadedElfMMU(PrivMMU): + def __init__(self, elf: ElfExecutable): + super().__init__(conf=RunConfig()) + self.entrypoint = elf.symbols['_start'] + + self.binaries.append(elf) + for sec in elf.sections: + self.sections.append(sec) + + def load_bin(self, exe: Executable) -> LoadedExecutable: + raise NotImplementedError("This is a privMMU, it's initialized with a single ElfExecutable!") + + def allocate_section(self, name: str, req_size: int, flag: MemoryFlags): + raise NotImplementedError("Not supported!") + + def get_entrypoint(self): + return self.entrypoint diff --git a/riscemu/priv/PrivRV32I.py b/riscemu/priv/PrivRV32I.py new file mode 100644 index 0000000..ef56a75 --- /dev/null +++ b/riscemu/priv/PrivRV32I.py @@ -0,0 +1,160 @@ +""" +RiscEmu (c) 2021 Anton Lydike + +SPDX-License-Identifier: MIT +""" + +from ..instructions.RV32I import * +from ..Exceptions import INS_NOT_IMPLEMENTED +from .Exceptions import * +from .privmodes import PrivModes +from ..colors import FMT_CPU, FMT_NONE +import typing + +if typing.TYPE_CHECKING: + from riscemu.priv.PrivCPU import PrivCPU + + +class PrivRV32I(RV32I): + cpu: 'PrivCPU' + """ + This is an extension of RV32I, written for the PrivCPU class + """ + + def instruction_csrrw(self, ins: 'LoadedInstruction'): + rd, rs, csr_addr = self.parse_crs_ins(ins) + old_val = None + if rd != 'zero': + self.cpu.csr.assert_can_read(self.cpu.mode, csr_addr) + old_val = self.cpu.csr.get(csr_addr) + if rs != 'zero': + new_val = self.regs.get(rs) + self.cpu.csr.assert_can_write(self.cpu.mode, csr_addr) + self.cpu.csr.set(csr_addr, new_val) + if old_val is not None: + self.regs.set(rd, old_val) + + def instruction_csrrs(self, ins: 'LoadedInstruction'): + rd, rs, csr_addr = self.parse_crs_ins(ins) + if rs != 'zero': + # oh no, this should not happen! + INS_NOT_IMPLEMENTED(ins) + if rd != 'zero': + self.cpu.csr.assert_can_read(self.cpu.mode, csr_addr) + old_val = self.cpu.csr.get(csr_addr) + self.regs.set(rd, old_val) + + + def instruction_csrrc(self, ins: 'LoadedInstruction'): + INS_NOT_IMPLEMENTED(ins) + + def instruction_csrrsi(self, ins: 'LoadedInstruction'): + INS_NOT_IMPLEMENTED(ins) + + def instruction_csrrwi(self, ins: 'LoadedInstruction'): + ASSERT_LEN(ins.args, 3) + rd, imm, addr = ins.get_reg(0), ins.get_imm(1), ins.get_imm(2) + if rd != 'zero': + self.cpu.csr.assert_can_read(self.cpu.mode, addr) + old_val = self.cpu.csr.get(addr) + self.regs.set(rd, old_val) + self.cpu.csr.assert_can_write(self.cpu.mode, addr) + self.cpu.csr.set(addr, imm) + + + def instruction_csrrci(self, ins: 'LoadedInstruction'): + INS_NOT_IMPLEMENTED(ins) + + def instruction_mret(self, ins: 'LoadedInstruction'): + if self.cpu.mode != PrivModes.MACHINE: + print("MRET not inside machine level code!") + raise IllegalInstructionTrap(ins) + # retore mie + mpie = self.cpu.csr.get_mstatus('mpie') + self.cpu.csr.set_mstatus('mie', mpie) + # restore priv + mpp = self.cpu.csr.get_mstatus('mpp') + self.cpu.mode = PrivModes(mpp) + # restore pc + mepc = self.cpu.csr.get('mepc') + self.cpu.pc = mepc + + sec = self.mmu.get_sec_containing(mepc) + if sec is not None: + print(FMT_CPU + "[CPU] [{}] returning to mode {} in {}".format( + self.cpu.cycle, + PrivModes(mpp).name, + self.mmu.translate_address(mepc) + ) + FMT_NONE) + + def instruction_uret(self, ins: 'LoadedInstruction'): + raise IllegalInstructionTrap(ins) + + def instruction_sret(self, ins: 'LoadedInstruction'): + raise IllegalInstructionTrap(ins) + + def instruction_scall(self, ins: 'LoadedInstruction'): + """ + Overwrite the scall from userspace RV32I + """ + raise EcallTrap(self.cpu.mode) + + def instruction_beq(self, ins: 'LoadedInstruction'): + rs1, rs2, dst = self.parse_rs_rs_imm(ins) + if rs1 == rs2: + self.pc += dst - 4 + + def instruction_bne(self, ins: 'LoadedInstruction'): + rs1, rs2, dst = self.parse_rs_rs_imm(ins) + if rs1 != rs2: + self.pc += dst - 4 + + def instruction_blt(self, ins: 'LoadedInstruction'): + rs1, rs2, dst = self.parse_rs_rs_imm(ins) + if rs1 < rs2: + self.pc += dst - 4 + + def instruction_bge(self, ins: 'LoadedInstruction'): + rs1, rs2, dst = self.parse_rs_rs_imm(ins) + if rs1 >= rs2: + self.pc += dst - 4 + + def instruction_bltu(self, ins: 'LoadedInstruction'): + rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False) + if rs1 < rs2: + self.pc += dst - 4 + + def instruction_bgeu(self, ins: 'LoadedInstruction'): + rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False) + if rs1 >= rs2: + self.pc += dst - 4 + + # technically deprecated + def instruction_j(self, ins: 'LoadedInstruction'): + raise NotImplementedError("Should never be reached!") + + def instruction_jal(self, ins: 'LoadedInstruction'): + ASSERT_LEN(ins.args, 2) + reg = ins.get_reg(0) + addr = ins.get_imm(1) + self.regs.set(reg, self.pc) + self.pc += addr - 4 + + def instruction_jalr(self, ins: 'LoadedInstruction'): + ASSERT_LEN(ins.args, 3) + rd, rs, imm = self.parse_rd_rs_imm(ins) + self.regs.set(rd, self.pc) + self.pc = rs + imm - 4 + + def instruction_sbreak(self, ins: 'LoadedInstruction'): + raise LaunchDebuggerException() + + def parse_crs_ins(self, ins: 'LoadedInstruction'): + ASSERT_LEN(ins.args, 3) + return ins.get_reg(0), ins.get_reg(1), ins.get_imm(2) + + def parse_mem_ins(self, ins: 'LoadedInstruction') -> Tuple[str, int]: + ASSERT_LEN(ins.args, 3) + addr = self.get_reg_content(ins, 1) + ins.get_imm(2) + reg = ins.get_reg(0) + return reg, addr diff --git a/riscemu/priv/__init__.py b/riscemu/priv/__init__.py new file mode 100644 index 0000000..26e488f --- /dev/null +++ b/riscemu/priv/__init__.py @@ -0,0 +1,15 @@ +""" +RiscEmu (c) 2021 Anton Lydike + +SPDX-License-Identifier: MIT + +The priv Module holds everything necessary for emulating privileged risc-v assembly + + +Running priv is only preferable to the normal userspace emulator, if you actually want to emulate the whole system. + +Syscalls will have to be intercepted by your assembly code. + + +The PrivCPU Implements the Risc-V M/U Model, meaning there is machine mode and user mode. No PMP or paging is available. +""" \ No newline at end of file diff --git a/riscemu/priv/__main__.py b/riscemu/priv/__main__.py new file mode 100644 index 0000000..f9d3640 --- /dev/null +++ b/riscemu/priv/__main__.py @@ -0,0 +1,37 @@ +from .PrivCPU import PrivCPU, RunConfig +from .ImageLoader import MemoryImageMMU +from .PrivMMU import LoadedElfMMU +from .ElfLoader import ElfExecutable + +from ..Tokenizer import RiscVInput +from ..ExecutableParser import ExecutableParser + +import sys + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='RISC-V privileged architecture emulator', prog='riscemu') + + parser.add_argument('--kernel', type=str, help='Kernel elf loaded with user programs', nargs='?') + parser.add_argument('--image', type=str, help='Memory image containing kernel', nargs='?') + + parser.add_argument('-v', '--verbose', help="Verbose output", action='store_true') + + args = parser.parse_args() + mmu = None + + if args.kernel is not None: + mmu = LoadedElfMMU(ElfExecutable(args.kernel)) + elif args.image is not None: + mmu = MemoryImageMMU(args.image) + + if mmu is None: + print("You must specify one of --kernel or --image for running in privilege mode!") + sys.exit(1) + + cpu = PrivCPU(RunConfig(), mmu) + cpu.run(args.verbose) + + + diff --git a/riscemu/priv/privmodes.py b/riscemu/priv/privmodes.py new file mode 100644 index 0000000..1e58bc2 --- /dev/null +++ b/riscemu/priv/privmodes.py @@ -0,0 +1,7 @@ +from enum import IntEnum + + +class PrivModes(IntEnum): + USER = 0 + SUPER = 1 + MACHINE = 3