kernel-mode #1

Manually merged
anton merged 69 commits from kernel-mode into master 3 years ago

3
.gitignore vendored

@ -1,2 +1,3 @@
venv
__pycache__
__pycache__
.mypy_cache

@ -0,0 +1 @@
pyelftools~=0.27

@ -34,6 +34,9 @@ class CPU:
It is initialized with a configuration and a list of instruction sets.
"""
INS_XLEN = 1
def __init__(self, conf: RunConfig, instruction_sets: List[Type['riscemu.InstructionSet']]):
"""
Creates a CPU instance.
@ -44,8 +47,8 @@ class CPU:
# setup CPU states
self.pc = 0
self.cycle = 0
self.exit = False
self.exit_code = 0
self.exit: bool = False
self.exit_code: int = 0
self.conf = conf
self.active_debug = False # if a debugging session is currently runnign
@ -94,7 +97,7 @@ class CPU:
print(FMT_CPU + '[CPU] Allocated {} bytes of stack'.format(self.stack.size) + FMT_NONE)
print(FMT_CPU + '[CPU] Started running from 0x{:08X} ({})'.format(le.run_ptr, le.name) + FMT_NONE)
self.__run()
self._run()
def continue_from_debugger(self, verbose=True):
"""
@ -102,7 +105,7 @@ class CPU:
:param verbose: If True, will print each executed instruction to STDOUT
"""
self.__run(verbose)
self._run(verbose)
def step(self):
"""
@ -115,15 +118,15 @@ class CPU:
self.cycle += 1
ins = self.mmu.read_ins(self.pc)
print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins))
self.pc += 1
self.pc += self.INS_XLEN
self.run_instruction(ins)
except LaunchDebuggerException:
print(FMT_CPU + "[CPU] Returning to debugger!" + FMT_NONE)
except RiscemuBaseException as ex:
self.pc -= 1
self.pc -= self.INS_XLEN
print(ex.message())
def __run(self, verbose=False):
def _run(self, verbose=False):
if self.pc <= 0:
return False
ins = None
@ -133,20 +136,19 @@ class CPU:
ins = self.mmu.read_ins(self.pc)
if verbose:
print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins))
self.pc += 1
self.pc += self.INS_XLEN
self.run_instruction(ins)
except RiscemuBaseException as ex:
if not isinstance(ex, LaunchDebuggerException):
print(FMT_ERROR + "[CPU] excpetion caught at 0x{:08X}: {}:".format(self.pc - 1, ins) + FMT_NONE)
print(ex.message())
self.pc -= 1
self.pc -= self.INS_XLEN
if self.active_debug:
print(FMT_CPU + "[CPU] Returning to debugger!" + FMT_NONE)
return
if self.conf.debug_on_exception:
launch_debug_session(self, self.mmu, self.regs,
"Exception encountered, launching debug:")
launch_debug_session(self, self.mmu, self.regs, "Exception encountered, launching debug:")
if self.exit:
print()
@ -178,7 +180,8 @@ class CPU:
"""
Returns a representation of the CPU and some of its state.
"""
return "CPU(pc=0x{:08X}, cycle={}, exit={}, instructions={})".format(
return "{}(pc=0x{:08X}, cycle={}, exit={}, instructions={})".format(
self.__class__.__name__,
self.pc,
self.cycle,
self.exit,

@ -0,0 +1,22 @@
from abc import ABC, abstractmethod
class IOModule(ABC):
addr: int
size: int
def __init__(self, addr: int, size: int):
self.addr = addr
self.size = size
@abstractmethod
def read(self, addr: int, size: int):
pass
@abstractmethod
def write(self, addr: int, data: bytearray, size: int):
pass
def contains(self, addr, size: int = 0):
return self.addr <= addr < self.addr + self.size and \
self.addr <= addr + size <= self.addr + self.size

@ -0,0 +1,95 @@
from .IOModule import IOModule
from ..priv.Exceptions import InstructionAccessFault
from ..helpers import int_from_bytes
from threading import Thread
import time
def _window_loop(textIO: 'TextIO'):
#textIO.set_sg_window(None)
#return
try:
import PySimpleGUI as sg
logs = sg.Text(font="monospace")
col = sg.Column([[logs]], size=(640, 400), scrollable=True)
window = sg.Window("TextIO:{:x}".format(textIO.addr), [[col]])
lines = list()
window.finalize()
textIO.set_sg_window(window)
while True:
e, v = window.read()
if e == sg.WINDOW_CLOSED:
window.close()
textIO.set_sg_window(None)
break
if e == 'putlog':
lines.insert(0, v[0])
logs.update(value='\n'.join(lines) + '\n')
col.contents_changed()
except ImportError:
print("[TextIO] module works best with PySimpleGui!")
textIO.set_sg_window(None)
class TextIO(IOModule):
def __init__(self, addr: int, buflen: int = 128):
super(TextIO, self).__init__(addr, buflen + 4)
self.buff = bytearray(buflen)
self.current_line = ""
self.sg_window = None
self.start_buffer = list()
self.thread = Thread(target=_window_loop, args=(self,))
self.thread.start()
time.sleep(0.1)
def set_sg_window(self, window):
if self.sg_window is not None and window is not None:
raise Exception("cannot set window twice!")
self.sg_window = window
buff = self.start_buffer
self.start_buffer = None if window is None else list()
for line in buff:
self._present(line)
def read(self, addr: int, size: int) -> bytearray:
raise InstructionAccessFault(addr)
def write(self, addr: int, data: bytearray, size: int):
if addr == self.addr:
if size > 4:
raise InstructionAccessFault(addr)
if int_from_bytes(data[0:4]) > 0:
self._print()
return
buff_start = addr - self.addr - 4
self.buff[buff_start:buff_start + size] = data[0:size]
def _print(self):
buff = self.buff
self.buff = bytearray(self.size)
if b'\x00' in buff:
buff = buff.split(b'\x00')[0]
text = buff.decode('ascii')
if '\n' in text:
lines = text.split("\n")
lines[0] = self.current_line + lines[0]
for line in lines[:-1]:
self._present(line)
self.current_line = lines[-1]
else:
self.current_line += text
def _present(self, text: str):
if self.sg_window is not None:
self.sg_window.write_event_value('putlog', text)
elif self.start_buffer is not None:
self.start_buffer.append(text)
else:
print("[TextIO:{:x}] {}".format(self.addr, text))

@ -47,9 +47,11 @@ class MMU:
The global symbol table
"""
last_ins_sec: Optional[LoadedMemorySection]
def __init__(self, conf: RunConfig):
"""
Create a new MMU, respeccting the active RunConfiguration
Create a new MMU, respecting the active RunConfiguration
:param conf: The config to respect
"""
@ -58,6 +60,7 @@ class MMU:
self.first_free_addr: int = 0x100
self.conf: RunConfig = conf
self.global_symbols: Dict[str, int] = dict()
self.last_ins_sec = None
def load_bin(self, exe: Executable) -> LoadedExecutable:
"""
@ -140,7 +143,11 @@ class MMU:
:param addr: The location
:return: The Instruction
"""
sec = self.last_ins_sec
if sec is not None and sec.base <= addr < sec.base + sec.size:
return sec.read_instruction(addr - sec.base)
sec = self.get_sec_containing(addr)
self.last_ins_sec = sec
if sec is None:
print(FMT_MEM + "[MMU] Trying to read instruction form invalid region! "
"Have you forgotten an exit syscall or ret statement?" + FMT_NONE)

@ -95,15 +95,19 @@ class Registers:
:return: If the operation was successful
"""
if reg == 'zero':
print("[Registers.set] trying to set read-only register: {}".format(reg))
return False
if reg not in Registers.all_registers():
raise InvalidRegisterException(reg)
#if reg not in Registers.all_registers():
# raise InvalidRegisterException(reg)
# replace fp register with s1, as these are the same register
if reg == 'fp':
reg = 's1'
if mark_set:
self.last_set = reg
# check 32 bit signed bounds
if val < -2147483648:
val = -2147483648
elif val > 2147483647:
val = 2147483647
self.vals[reg] = val
return True
@ -114,8 +118,8 @@ class Registers:
:param mark_read: If the register should be markes as "last read" (only used internally)
:return: The contents of register reg
"""
if reg not in Registers.all_registers():
raise InvalidRegisterException(reg)
#if reg not in Registers.all_registers():
# raise InvalidRegisterException(reg)
if reg == 'fp':
reg = 's0'
if mark_read:

@ -26,3 +26,4 @@ FMT_PARSE = FMT_CYAN + FMT_BOLD
FMT_CPU = FMT_BLUE + FMT_BOLD
FMT_SYSCALL = FMT_YELLOW + FMT_BOLD
FMT_DEBUG = FMT_MAGENTA + FMT_BOLD
FMT_CSR = FMT_ORANGE + FMT_BOLD

@ -45,9 +45,7 @@ def launch_debug_session(cpu: 'CPU', mmu: 'MMU', reg: 'Registers', prompt=""):
print("Invalid arg count!")
return
bin = mmu.get_bin_containing(cpu.pc)
if bin is None:
print(FMT_DEBUG + '[Debugger] Not in a section, can\'t execute instructions!' + FMT_NONE)
return
ins = LoadedInstruction(name, list(args), bin)
print(FMT_DEBUG + "Running instruction " + ins + FMT_NONE)
cpu.run_instruction(ins)

@ -0,0 +1 @@
from .decoder import decode, RISCV_REGS

@ -0,0 +1,17 @@
if __name__ == '__main__':
import code
import readline
import rlcompleter
from .decoder import *
from .formats import *
from .instruction_table import *
from .regs import RISCV_REGS
sess_vars = globals()
sess_vars.update(locals())
readline.set_completer(rlcompleter.Completer(sess_vars).complete)
readline.set_completer(rlcompleter.Completer(sess_vars).complete)
readline.parse_and_bind("tab: complete")
code.InteractiveConsole(sess_vars).interact(banner="Interaktive decoding session started...", exitmsg="Closing...")

@ -0,0 +1,88 @@
from .instruction_table import *
from typing import Tuple, List
def print_ins(ins: int):
print(" f7 rs2 rs1 f3 rd op")
print(
f"0b{ins >> 25 :07b}_{(ins >> 20) & 0b11111:05b}_{(ins >> 15) & 0b11111:05b}_{(ins >> 12) & 0b111:03b}_{(ins >> 7) & 0b11111:05b}_{ins & 0b1111111:07b}");
STATIC_INSN: Dict[int, Tuple[str, List[int], int]] = {
0x00000013: ("nop", [], 0x00000013),
0x00008067: ("ret", [], 0x00008067),
0xfe010113: ("addi", [2, 2, -32], 0xfe010113),
0x02010113: ("addi", [2, 2, 32], 0x02010113),
0x00100073: ("ebreak", [], 0x00100073),
0x00000073: ("ecall", [], 0x00000073),
0x30200073: ("mret", [], 0x30200073),
0x00200073: ("uret", [], 0x00200073),
0x10200073: ("sret", [], 0x10200073),
0x10500073: ("wfi", [], 0x10500073),
}
def int_from_ins(insn: bytearray):
return int.from_bytes(insn, 'little')
def name_from_insn(ins: int):
opcode = op(ins)
if opcode not in RV32:
print_ins(ins)
raise RuntimeError(f"Invalid opcode: {opcode:0x} in insn {ins:x}")
dec = RV32[opcode]
if isinstance(dec, str):
return dec
fun3 = funct3(ins)
if fun3 not in dec:
print_ins(ins)
raise RuntimeError(f"Invalid funct3: {fun3:0x} in insn {ins:x}")
dec = dec[fun3]
if isinstance(dec, str):
return dec
if opcode == 0x1c and fun3 == 0:
# we have ecall/ebreak
token = imm110(ins)
if token in dec:
return dec[token]
print_ins(ins)
raise RuntimeError(f"Invalid instruction in ebreak/ecall region: {ins:x}")
fun7 = funct7(ins)
if opcode == 0b1011 and fun3 == 0b10:
# ignore the two aq/lr bits located in the fun7 block
# riscemu has no memory reordering, therefore we don't need to look at these bits ever
fun7 = fun7 >> 2
if fun7 in dec:
if opcode == 0x0C or (opcode == 0x04 and fun3 == 5):
dec = dec[fun7]
return dec
print("unknown instruction?!")
return dec[fun7]
print_ins(ins)
raise RuntimeError(f"Invalid instruction: {ins:x}")
def decode(ins: Union[bytearray, bytes]) -> Tuple[str, List[int], int]:
insn = int_from_ins(ins)
if insn & 3 != 3:
print_ins(insn)
raise RuntimeError("Not a RV32 instruction!")
if insn in STATIC_INSN:
return STATIC_INSN[insn]
opcode = op(insn)
if opcode not in INSTRUCTION_ARGS_DECODER:
print_ins(insn)
raise RuntimeError("No instruction decoder found for instruction")
return name_from_insn(insn), INSTRUCTION_ARGS_DECODER[opcode](insn), insn

@ -0,0 +1,117 @@
from typing import Dict, Callable, List, Union
from .regs import RISCV_REGS
def op(ins: int):
return (ins >> 2) & 31
def rd(ins: int):
return (ins >> 7) & 31
def funct3(ins: int):
return (ins >> 12) & 7
def rs1(ins: int):
return (ins >> 15) & 31
def rs2(ins: int):
return (ins >> 20) & 31
def funct7(ins: int):
return ins >> 25
def imm110(ins: int):
return ins >> 20
def imm3112(ins: int):
return ins >> 12
def imm_i(ins: int):
return sign_extend(imm110(ins), 12)
def imm_s(ins: int):
num = (funct7(ins) << 5) + rd(ins)
return sign_extend(num, 12)
def imm_b(ins: int):
lower = rd(ins)
higher = funct7(ins)
num = (lower & 0b11110) + ((higher & 0b0111111) << 5) + ((lower & 1) << 11) + ((higher >> 6) << 12)
return sign_extend(num, 13)
def imm_u(ins: int):
return sign_extend(imm3112(ins), 20)
def imm_j(ins: int):
return sign_extend(
(((ins >> 21) & 0b1111111111) << 1) +
(((ins >> 20) & 1) << 11) +
(((ins >> 12) & 0b11111111) << 12) +
(((ins >> 31) & 1) << 20), 21
)
def sign_extend(num, bits):
sign_mask = 1 << (bits - 1)
return (num & (sign_mask - 1)) - (num & sign_mask)
def decode_i(ins: int) -> List[int]:
return [rd(ins), rs1(ins), imm_i(ins)]
def decode_b(ins: int) -> List[int]:
return [rs1(ins), rs2(ins), imm_b(ins)]
def decode_u(ins: int) -> List[int]:
return [rd(ins), imm_u(ins)]
def decode_r(ins: int) -> List[int]:
return [rd(ins), rs1(ins), rs2(ins)]
def decode_s(ins: int) -> List[int]:
return [rs2(ins), rs1(ins), imm_s(ins)]
def decode_j(ins: int) -> List[int]:
return [rd(ins), imm_j(ins)]
def decode_i_shamt(ins: int) -> List[int]:
if funct3(ins) in (1, 5):
return [rd(ins), rs1(ins), rs2(ins)]
return decode_i(ins)
def decode_i_unsigned(ins: int) -> List[int]:
return [rd(ins), rs1(ins), imm110(ins)]
INSTRUCTION_ARGS_DECODER: Dict[int, Callable[[int], List[int]]] = {
0x00: decode_i,
0x04: decode_i_shamt,
0x05: decode_u,
0x08: decode_s,
0x0C: decode_r,
0x0D: decode_u,
0x18: decode_b,
0x19: decode_i,
0x1b: decode_j,
0x1c: decode_i_unsigned,
0b1011: decode_r
}

@ -0,0 +1,81 @@
from collections import defaultdict
from .formats import *
tbl = lambda: defaultdict(tbl)
RV32 = tbl()
RV32[0x1b] = "jal"
RV32[0x0D] = "lui"
RV32[0x05] = "auipc"
RV32[0x19][0] = "jalr"
RV32[0x04][0] = "addi"
RV32[0x04][1] = "slli"
RV32[0x04][2] = "slti"
RV32[0x04][3] = "sltiu"
RV32[0x04][4] = "xori"
RV32[0x04][5][0x00] = "srli"
RV32[0x04][5][0x20] = "srai"
RV32[0x04][6] = "ori"
RV32[0x04][7] = "andi"
RV32[0x18][0] = "beq"
RV32[0x18][1] = "bne"
RV32[0x18][4] = "blt"
RV32[0x18][5] = "bge"
RV32[0x18][6] = "bltu"
RV32[0x18][7] = "bgeu"
RV32[0x00][0] = "lb"
RV32[0x00][1] = "lh"
RV32[0x00][2] = "lw"
RV32[0x00][4] = "lbu"
RV32[0x00][5] = "lhu"
RV32[0x08][0] = "sb"
RV32[0x08][1] = "sh"
RV32[0x08][2] = "sw"
RV32[0x1c][1] = "csrrw"
RV32[0x1c][2] = "csrrs"
RV32[0x1c][3] = "csrrc"
RV32[0x1c][5] = "csrrwi"
RV32[0x1c][6] = "csrrsi"
RV32[0x1c][7] = "csrrci"
RV32[0x1c][0][0] = "ecall"
RV32[0x1c][0][1] = "ebreak"
RV32[0x0C][0][0] = "add"
RV32[0x0C][0][32] = "sub"
RV32[0x0C][1][0] = "sll"
RV32[0x0C][2][0] = "slt"
RV32[0x0C][3][0] = "sltu"
RV32[0x0C][4][0] = "xor"
RV32[0x0C][5][0] = "srl"
RV32[0x0C][5][32] = "sra"
RV32[0x0C][6][0] = "or"
RV32[0x0C][7][0] = "and"
# rv32m
RV32[0x0C][0][1] = "mul"
RV32[0x0C][1][1] = "mulh"
RV32[0x0C][2][1] = "mulhsu"
RV32[0x0C][3][1] = "mulhu"
RV32[0x0C][4][1] = "div"
RV32[0x0C][5][1] = "divu"
RV32[0x0C][6][1] = "rem"
RV32[0x0C][7][1] = "remu"
# rv32a
RV32[0b1011][0b10][0b00010] = "lr.w"
RV32[0b1011][0b10][0b00011] = "sc.w"
RV32[0b1011][0b10][0b00001] = "amoswap.w"
RV32[0b1011][0b10][0b00000] = "amoadd.w"
RV32[0b1011][0b10][0b00100] = "amoxor.w"
RV32[0b1011][0b10][0b01100] = "amoand.w"
RV32[0b1011][0b10][0b01000] = "amoor.w"
RV32[0b1011][0b10][0b10000] = "amomin.w"
RV32[0b1011][0b10][0b10100] = "amomax.w"
RV32[0b1011][0b10][0b11000] = "amominu.w"
RV32[0b1011][0b10][0b11100] = "amomaxu.w"

@ -0,0 +1,6 @@
RISCV_REGS = [
'zero', 'ra', 'sp', 'gp', 'tp', 't0', 't1', 't2',
's0', 's1', 'a0', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7',
's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11',
't3', 't4', 't5', 't6'
]

@ -33,19 +33,14 @@ def int_to_bytes(val, bytes=4, unsigned=False) -> bytearray:
"""
if unsigned and val < 0:
raise NumberFormatException("unsigned negative number!")
return bytearray([
(val >> ((bytes - i - 1) * 8)) & 0xFF for i in range(bytes)
])
return bytearray(to_unsigned(val, bytes).to_bytes(bytes, 'little'))
def int_from_bytes(bytes, unsigned=False) -> int:
"""
byte -> int (two's complement)
"""
num = 0
for b in bytes:
num = num << 8
num += b
num = int.from_bytes(bytes, 'little')
if unsigned:
return num
@ -55,7 +50,7 @@ def int_from_bytes(bytes, unsigned=False) -> int:
def to_unsigned(num: int, bytes=4) -> int:
if num < 0:
return 2 ** (bytes * 8) + num
return (2 ** (bytes * 8)) + num
return num

@ -29,8 +29,6 @@ class InstructionSet(ABC):
"""
self.name = self.__class__.__name__
self.cpu = cpu
self.mmu = cpu.mmu
self.regs = cpu.regs
def load(self) -> Dict[str, Callable[['LoadedInstruction'], None]]:
"""
@ -132,6 +130,14 @@ class InstructionSet(ABC):
def pc(self, val):
self.cpu.pc = val
@property
def mmu(self):
return self.cpu.mmu
@property
def regs(self):
return self.cpu.regs
def __repr__(self):
return "InstructionSet[{}] with {} instructions".format(
self.__class__.__name__,

@ -0,0 +1,78 @@
from .InstructionSet import InstructionSet, LoadedInstruction
from ..Exceptions import INS_NOT_IMPLEMENTED
from ..helpers import int_from_bytes, int_to_bytes, to_unsigned, to_signed
class RV32A(InstructionSet):
"""
The RV32A instruction set. Currently, load-reserved and store conditionally are not supported
due to limitations in the way the MMU is implemented. Maybe a later implementation will add support
for this?
"""
def instruction_lr_w(self, ins: 'LoadedInstruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_sc_w(self, ins: 'LoadedInstruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_amoswap_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
if dest == 'zero':
self.mmu.write(addr, int_to_bytes(addr, 4))
else:
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(val, 4))
self.regs.set(dest, old)
def instruction_amoadd_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(old + val, 4))
self.regs.set(dest, old)
def instruction_amoand_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(old & val, 4))
self.regs.set(dest, old)
def instruction_amoor_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(old | val, 4))
self.regs.set(dest, old)
def instruction_amoxor_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(old ^ val, 4))
self.regs.set(dest, old)
def instruction_amomax_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(max(old, val), 4))
self.regs.set(dest, old)
def instruction_amomaxu_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
val = to_unsigned(val)
old = int_from_bytes(self.mmu.read(addr, 4), unsigned=True)
self.mmu.write(addr, int_to_bytes(to_signed(max(old, val)), 4))
self.regs.set(dest, old)
def instruction_amomin_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
old = int_from_bytes(self.mmu.read(addr, 4))
self.mmu.write(addr, int_to_bytes(min(old, val), 4))
self.regs.set(dest, old)
def instruction_amominu_w(self, ins: 'LoadedInstruction'):
dest, addr, val = self.parse_rd_rs_rs(ins)
val = to_unsigned(val)
old = int_from_bytes(self.mmu.read(addr, 4), unsigned=True)
self.mmu.write(addr, int_to_bytes(to_signed(min(old, val)), 4))
self.regs.set(dest, old)

@ -154,8 +154,7 @@ class RV32I(InstructionSet):
ASSERT_LEN(ins.args, 2)
reg = ins.get_reg(0)
imm = to_unsigned(ins.get_imm(1))
self.pc += (imm << 12)
self.regs.set(reg, self.pc)
self.regs.set(reg, self.pc + (imm << 12))
def instruction_xor(self, ins: 'LoadedInstruction'):
rd, rs1, rs2 = self.parse_rd_rs_rs(ins)

@ -9,7 +9,8 @@ This package holds all instruction sets, available to the processor
from .InstructionSet import InstructionSet
from .RV32M import RV32M
from .RV32I import RV32I
from .RV32A import RV32A
InstructionSetDict = {
v.__name__: v for v in [RV32I, RV32M]
v.__name__: v for v in [RV32I, RV32M, RV32A]
}

@ -0,0 +1,138 @@
from typing import Dict, Union, Callable, Optional
from collections import defaultdict
from .privmodes import PrivModes
from .Exceptions import InstructionAccessFault
from ..helpers import to_signed
from ..colors import FMT_CSR, FMT_NONE
from .CSRConsts import CSR_NAME_TO_ADDR, MSTATUS_LEN_2, MSTATUS_OFFSETS
class CSR:
"""
This holds all Control and Status Registers (CSR)
"""
regs: Dict[int, int]
"""
All Control and Status Registers are stored here
"""
virtual_regs: Dict[int, Callable[[], int]]
"""
list of virtual CSR registers, with values computed on read
"""
listeners: Dict[int, Callable[[int, int], None]]
mstatus_cache: Dict[str, int]
mstatus_cache_dirty = True
def __init__(self):
self.regs = defaultdict(lambda: 0)
self.listeners = defaultdict(lambda: (lambda x, y: None))
self.virtual_regs = dict()
self.mstatus_cache = dict()
# TODO: implement write masks (bitmasks which control writeable bits in registers
def set(self, addr: Union[str, int], val: int):
addr = self._name_to_addr(addr)
if addr is None:
return
val = to_signed(val)
self.listeners[addr](self.regs[addr], val)
if addr == 0x300:
self.mstatus_cache_dirty = True
self.regs[addr] = val
def get(self, addr: Union[str, int]) -> int:
addr = self._name_to_addr(addr)
if addr is None:
raise RuntimeError(f"Invalid CSR name: {addr}!")
if addr in self.virtual_regs:
return self.virtual_regs[addr]()
return self.regs[addr]
def set_listener(self, addr: Union[str, int], listener: Callable[[int, int], None]):
addr = self._name_to_addr(addr)
if addr is None:
print("unknown csr address name: {}".format(addr))
return
self.listeners[addr] = listener
# mstatus properties
def set_mstatus(self, name: str, val: int):
"""
Set mstatus bits using this helper. mstatus is a 32 bit register, holding various machine status flags
Setting them by hand is super painful, so this helper allows you to set specific bits.
Please make sure your supplied value has the correct width!
:param name:
:param val:
:return:
"""
size = 2 if name in MSTATUS_LEN_2 else 1
off = MSTATUS_OFFSETS[name]
mask = (2 ** size - 1) << off
old_val = self.get('mstatus')
erased = old_val & (~mask)
new_val = erased | (val << off)
self.set('mstatus', new_val)
def get_mstatus(self, name) -> int:
if not self.mstatus_cache_dirty and name in self.mstatus_cache:
return self.mstatus_cache[name]
size = 2 if name in MSTATUS_LEN_2 else 1
off = MSTATUS_OFFSETS[name]
mask = (2 ** size - 1) << off
val = (self.get('mstatus') & mask) >> off
if self.mstatus_cache_dirty:
self.mstatus_cache = dict(name=val)
else:
self.mstatus_cache[name] = val
return val
def callback(self, addr: Union[str, int]):
def inner(func: Callable[[int, int], None]):
self.set_listener(addr, func)
return func
return inner
def assert_can_read(self, mode: PrivModes, addr: int):
if (addr >> 8) & 3 > mode.value:
raise InstructionAccessFault(addr)
def assert_can_write(self, mode: PrivModes, addr: int):
if (addr >> 8) & 3 > mode.value or addr >> 10 == 11:
raise InstructionAccessFault(addr)
def _name_to_addr(self, addr: Union[str, int]) -> Optional[int]:
if isinstance(addr, str):
if addr not in CSR_NAME_TO_ADDR:
print("Unknown CSR register {}".format(addr))
return None
return CSR_NAME_TO_ADDR[addr]
return addr
def virtual_register(self, addr: Union[str, int]):
addr = self._name_to_addr(addr)
if addr is None:
print("unknown csr address name: {}".format(addr))
def inner(func: Callable[[], int]):
self.virtual_regs[addr] = func
return func
return inner
def dump_mstatus(self):
print(FMT_CSR + "[CSR] dumping mstatus:")
i = 0
for name in MSTATUS_OFFSETS:
print(" {:<5} {}".format(name, self.get_mstatus(name)), end="")
if i % 6 == 5:
print()
i += 1
print(FMT_NONE)

@ -0,0 +1,81 @@
from typing import Dict, Tuple
MCAUSE_TRANSLATION: Dict[Tuple[int, int], str]= {
(1, 0): 'User software interrupt',
(1, 1): 'Supervisor software interrupt',
(1, 3): 'Machine software interrupt',
(1, 4): 'User timer interrupt',
(1, 5): 'Supervisor timer interrupt',
(1, 7): 'Machine timer interrupt',
(1, 8): 'User external interrupt',
(1, 9): 'Supervisor external interrupt',
(1, 11): 'Machine external interrupt',
(0, 0): 'Instruction address misaligned',
(0, 1): 'Instruction access fault',
(0, 2): 'Illegal instruction',
(0, 3): 'Breakpoint',
(0, 4): 'Load address misaligned',
(0, 5): 'Load access fault',
(0, 6): 'Store/AMO address misaligned',
(0, 7): 'Store/AMO access fault',
(0, 8): 'environment call from user mode',
(0, 9): 'environment call from supervisor mode',
(0, 11): 'environment call from machine mode',
(0, 12): 'Instruction page fault',
(0, 13): 'Load page fault',
(0, 15): 'Store/AMO page fault',
}
"""
Assigns tuple (interrupt bit, exception code) to their respective readable names
"""
MSTATUS_OFFSETS: Dict[str, int] = {
'uie': 0,
'sie': 1,
'mie': 3,
'upie': 4,
'spie': 5,
'mpie': 7,
'spp': 8,
'mpp': 11,
'fs': 13,
'xs': 15,
'mpriv': 17,
'sum': 18,
'mxr': 19,
'tvm': 20,
'tw': 21,
'tsr': 22,
'sd': 31
}
"""
Offsets for all mstatus bits
"""
MSTATUS_LEN_2 = ('mpp', 'fs', 'xs')
"""
All mstatus parts that have length 2. All other mstatus parts have length 1
"""
CSR_NAME_TO_ADDR: Dict[str, int] = {
'mstatus': 0x300,
'misa': 0x301,
'mie': 0x304,
'mtvec': 0x305,
'mepc': 0x341,
'mcause': 0x342,
'mtval': 0x343,
'mip': 0x344,
'mvendorid': 0xF11,
'marchid': 0xF12,
'mimpid': 0xF13,
'mhartid': 0xF14,
'time': 0xc01,
'timeh': 0xc81,
'halt': 0x789,
'mtimecmp': 0x780,
'mtimecmph': 0x781,
}
"""
Translation for named registers
"""

@ -0,0 +1,158 @@
from dataclasses import dataclass
from typing import List, Dict, Tuple
from .Exceptions import *
from ..Exceptions import RiscemuBaseException
from ..Executable import MemoryFlags, LoadedMemorySection
from ..decoder import decode, RISCV_REGS
from ..helpers import FMT_PARSE, FMT_NONE, FMT_GREEN, FMT_BOLD
FMT_ELF = FMT_GREEN + FMT_BOLD
if typing.TYPE_CHECKING:
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import Section, SymbolTableSection
# This requires pyelftools package!
INCLUDE_SEC = ('.text', '.stack', '.bss', '.sdata', '.sbss')
class ElfExecutable:
sections: List['ElfLoadedMemorySection']
sections_by_name: Dict[str, 'ElfLoadedMemorySection']
symbols: Dict[str, int]
run_ptr: int
def __init__(self, name: str):
self.sections = list()
self.sections_by_name = dict()
self.symbols = dict()
try:
from elftools.elf.elffile import ELFFile
from elftools.elf.sections import Section, SymbolTableSection
with open(name, 'rb') as f:
print(FMT_ELF + "[ElfLoader] Loading elf executable from: {}".format(name) + FMT_NONE)
self._read_elf(ELFFile(f))
except ImportError as e:
print(FMT_PARSE + "[ElfLoader] Cannot load elf files without PyElfTools package! You can install them using pip install pyelftools!" + FMT_NONE)
raise e
def _read_elf(self, elf: 'ELFFile'):
if not elf.header.e_machine == 'EM_RISCV':
raise InvalidElfException("Not a RISC-V elf file!")
if not elf.header.e_ident.EI_CLASS == 'ELFCLASS32':
raise InvalidElfException("Only 32bit executables are supported!")
self.run_ptr = elf.header.e_entry
from elftools.elf.sections import SymbolTableSection
for sec in elf.iter_sections():
if isinstance(sec, SymbolTableSection):
self._parse_symtab(sec)
continue
if sec.name not in INCLUDE_SEC:
continue
self.add_sec(self._lms_from_elf_sec(sec, 'kernel'))
def _lms_from_elf_sec(self, sec: 'Section', owner: str):
is_code = sec.name in ('.text',)
data = bytearray(sec.data())
flags = MemoryFlags(is_code, is_code)
print(FMT_ELF + "[ElfLoader] Section {} at: {:X}".format(sec.name, sec.header.sh_addr) + FMT_NONE)
return ElfLoadedMemorySection(
sec.name,
sec.header.sh_addr,
sec.data_size,
data,
flags,
owner
)
def _parse_symtab(self, symtab: 'SymbolTableSection'):
self.symbols = {
sym.name: sym.entry.st_value for sym in symtab.iter_symbols() if sym.name
}
def add_sec(self, new_sec: 'ElfLoadedMemorySection'):
for sec in self.sections:
if sec.base < sec.end <= new_sec.base or sec.end > sec.base >= new_sec.end:
continue
else:
print(FMT_ELF + "[ElfLoader] Invalid elf layout: Two sections overlap: \n\t{}\n\t{}".format(
sec, new_sec
) + FMT_NONE)
raise RuntimeError("Cannot load elf with overlapping sections!")
self.sections.append(new_sec)
self.sections_by_name[new_sec.name] = new_sec
class InvalidElfException(RiscemuBaseException):
def __init__(self, msg: str):
super().__init__()
self.msg = msg
def message(self):
return FMT_PARSE + "{}(\"{}\")".format(self.__class__.__name__, self.msg) + FMT_NONE
@dataclass(frozen=True)
class ElfInstruction:
name: str
args: List[int]
encoded: int
def get_imm(self, num: int) -> int:
return self.args[num]
def get_imm_reg(self, num: int) -> Tuple[int, int]:
return self.args[-1], self.args[-2]
def get_reg(self, num: int) -> str:
return RISCV_REGS[self.args[num]]
def __repr__(self) -> str:
if self.name == 'jal' and self.args[0] == 0:
return "j {}".format(self.args[1])
elif self.name in ('lw', 'lh', 'lb', 'lbu', 'lhu', 'sw', 'sh', 'sb'):
args = "{}, {}({})".format(
RISCV_REGS[self.args[0]], self.args[2], RISCV_REGS[self.args[1]]
)
else:
args = ", ".join(map(str, self.args))
return "{:<8} {}".format(
self.name,
args
)
class ElfLoadedMemorySection(LoadedMemorySection):
ins_cache: List[Optional[ElfInstruction]]
"""
A fast cache for accessing pre-decoded instructions
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__setattr__('ins_cache', [None] * (self.size // 4))
def read_instruction(self, offset):
if self.ins_cache[offset//4] is not None:
return self.ins_cache[offset//4]
if not self.flags.executable:
print(FMT_PARSE + "Reading instruction from non-executable memory!" + FMT_NONE)
raise InstructionAccessFault(offset + self.base)
if offset % 4 != 0:
raise InstructionAddressMisalignedTrap(offset + self.base)
ins = ElfInstruction(*decode(self.content[offset:offset + 4]))
self.ins_cache[offset // 4] = ins
return ins
@property
def end(self):
return self.size + self.base

@ -0,0 +1,91 @@
from typing import Optional, NewType
from enum import Enum
from .privmodes import PrivModes
from .CSRConsts import MCAUSE_TRANSLATION
import typing
if typing.TYPE_CHECKING:
from .ElfLoader import ElfInstruction
class CpuTrapType(Enum):
TIMER = 1
SOFTWARE = 2
EXTERNAL = 3
EXCEPTION = 4
class CpuTrap(BaseException):
code: int
"""
31-bit value encoding the exception code in the mstatus register
"""
interrupt: int
"""
The isInterrupt bit in the mstatus register
"""
mtval: int
"""
contents of the mtval register
"""
type: CpuTrapType
"""
The type (timer, external, software) of the trap
"""
priv: PrivModes
"""
The privilege level this trap targets
"""
def __init__(self, code: int, mtval, type: CpuTrapType, priv: PrivModes = PrivModes.MACHINE):
self.interrupt = 0 if type == CpuTrapType.EXCEPTION else 1
self.code = code
self.mtval = mtval
self.priv = priv
self.type = type
@property
def mcause(self):
return (self.interrupt << 31) + self.code
def __repr__(self):
name = "Reserved interrupt({}, {})".format(self.interrupt, self.code)
if (self.interrupt, self.code) in MCAUSE_TRANSLATION:
name = MCAUSE_TRANSLATION[(self.interrupt, self.code)] + "({}, {})".format(self.interrupt, self.code)
return "{} {{priv={}, type={}, mtval={:x}}}".format(
name, self.priv.name, self.type.name, self.mtval
)
def __str__(self):
return self.__repr__()
class IllegalInstructionTrap(CpuTrap):
def __init__(self, ins: 'ElfInstruction'):
super().__init__(2, ins.encoded, CpuTrapType.EXCEPTION)
class InstructionAddressMisalignedTrap(CpuTrap):
def __init__(self, addr: int):
super().__init__(0, addr, CpuTrapType.EXCEPTION)
class InstructionAccessFault(CpuTrap):
def __init__(self, addr: int):
super().__init__(1, addr, CpuTrapType.EXCEPTION)
class TimerInterrupt(CpuTrap):
def __init__(self):
super().__init__(7, 0, CpuTrapType.TIMER)
class EcallTrap(CpuTrap):
def __init__(self, mode: PrivModes):
super().__init__(mode.value + 8, 0, CpuTrapType.EXCEPTION)

@ -0,0 +1,119 @@
"""
Laods a memory image with debug information into memory
"""
from .PrivMMU import PrivMMU
from ..Config import RunConfig
from ..Executable import Executable, LoadedExecutable, LoadedMemorySection, LoadedInstruction, MemoryFlags
from .ElfLoader import ElfInstruction, ElfLoadedMemorySection, InstructionAccessFault, InstructionAddressMisalignedTrap
from ..decoder import decode
from ..IO.IOModule import IOModule
from .privmodes import PrivModes
from ..colors import FMT_ERROR, FMT_NONE
import json
from functools import lru_cache
from typing import Dict, List, Tuple, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from .PrivCPU import PrivCPU
class MemoryImageMMU(PrivMMU):
io: List[IOModule]
data: bytearray
io_start: int
debug_info: Dict[str, Dict[str, Dict[str, str]]]
def __init__(self, file_name: str, io_start: int = 0xFF0000):
super(MemoryImageMMU, self).__init__(conf=RunConfig())
with open(file_name, 'rb') as memf:
data = memf.read()
with open(file_name + '.dbg', 'r') as dbgf:
debug_info: Dict = json.load(dbgf)
self.data = bytearray(data)
# TODO: super wasteful memory allocation happening here
if len(data) < io_start:
self.data += bytearray(io_start - len(data))
self.debug_info = debug_info
self.io_start = io_start
self.io = list()
def get_entrypoint(self):
try:
start = self.debug_info['symbols']['kernel'].get('_start', None)
if start is not None:
return start
return self.debug_info['symbols']['kernel'].get('_ftext')
except KeyError:
print(FMT_ERROR + '[MMU] cannot find kernel entry in debug information! Falling back to 0x100' + FMT_NONE)
return 0x100
@lru_cache(maxsize=2048)
def read_ins(self, addr: int) -> ElfInstruction:
if addr >= self.io_start:
raise InstructionAccessFault(addr)
if addr % 4 != 0:
raise InstructionAddressMisalignedTrap(addr)
return ElfInstruction(*decode(self.data[addr:addr + 4]))
def read(self, addr: int, size: int) -> bytearray:
if addr < 0x100:
pc = self.cpu.pc
text_sec = self.get_sec_containing(pc)
print(FMT_ERROR + "[MMU] possible null dereference (read {:x}) from (pc={:x},sec={},rel={:x})".format(
addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base
) + FMT_NONE)
if addr >= self.io_start:
return self.io_at(addr).read(addr, size)
return self.data[addr: addr + size]
def write(self, addr: int, size: int, data):
if addr < 0x100:
pc = self.cpu.pc
text_sec = self.get_sec_containing(pc)
print(FMT_ERROR + "[MMU] possible null dereference (write {:x}) from (pc={:x},sec={},rel={:x})".format(
addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base
) + FMT_NONE)
if addr >= self.io_start:
return self.io_at(addr).write(addr, data, size)
self.data[addr:addr + size] = data[0:size]
def io_at(self, addr) -> IOModule:
for mod in self.io:
if mod.contains(addr):
return mod
raise InstructionAccessFault(addr)
def add_io(self, io: IOModule):
self.io.append(io)
def __repr__(self):
return "MemoryImageMMU()"
@lru_cache(maxsize=32)
def get_sec_containing(self, addr: int) -> Optional[LoadedMemorySection]:
next_sec = len(self.data)
for sec_addr, name in reversed(self.debug_info['sections'].items()):
if addr >= int(sec_addr):
owner, name = name.split(':')
base = int(sec_addr)
size = next_sec - base
flags = MemoryFlags('.text' in name, '.text' in name)
return ElfLoadedMemorySection(name, base, size, self.data[base:next_sec], flags, owner)
else:
next_sec = int(sec_addr)
def translate_address(self, addr: int):
sec = self.get_sec_containing(addr)
if sec.name == '.empty':
return "<empty>"
symbs = self.debug_info['symbols'][sec.owner]
for sym, val in reversed(symbs.items()):
if addr >= val:
return "{}{:+x} ({}:{})".format(sym, addr - val, sec.owner, sec.name)
return "{}:{}{:+x}".format(sec.owner, sec.name, addr - sec.base)

@ -0,0 +1,244 @@
"""
RiscEmu (c) 2021 Anton Lydike
SPDX-License-Identifier: MIT
"""
import time
from riscemu.CPU import *
from .CSR import CSR
from .ElfLoader import ElfExecutable
from .ImageLoader import MemoryImageMMU
from .Exceptions import *
from .PrivMMU import PrivMMU
from ..IO import TextIO
from .PrivRV32I import PrivRV32I
from .privmodes import PrivModes
from ..instructions import RV32A, RV32M
import json
if typing.TYPE_CHECKING:
from riscemu import Executable, LoadedExecutable, LoadedInstruction
from riscemu.instructions.InstructionSet import InstructionSet
class PrivCPU(CPU):
"""
This is a CPU that has different modes, instruction sets and registers.
It should support M and U Mode, but no U-Mode Traps.
This allows us to
"""
csr: CSR
"""
Reference to the control and status registers
"""
TIME_RESOLUTION_NS: int = 1000000
"""
controls the resolution of the time csr register (in nanoseconds)
"""
INS_XLEN = 4
"""
Size of an instruction in memory. Should be 4, but since our loading code is shit, instruction take up
the equivalent of "1 byte" (this is actually impossible)
"""
def __init__(self, conf, mmu: PrivMMU):
super().__init__(conf, [PrivRV32I, RV32M, RV32A])
self.mode: PrivModes = PrivModes.MACHINE
mmu.set_cpu(self)
self.pc = mmu.get_entrypoint()
self.mmu = mmu
if hasattr(self.mmu, 'add_io'):
self.mmu.add_io(TextIO.TextIO(0xff0000, 64))
self.syscall_int = None
self.launch_debug = False
self.pending_traps: List[CpuTrap] = list()
self._time_start = 0
self._time_timecmp = 0
self._time_interrupt_enabled = False
# performance counters
self._perf_counters = list()
# init csr
self._init_csr()
def _run(self, verbose=False):
if self.pc <= 0:
return False
ins = None
try:
while not self.exit:
self.step(verbose)
except RiscemuBaseException as ex:
if isinstance(ex, LaunchDebuggerException):
self.launch_debug = True
self.pc += self.INS_XLEN
else:
print(FMT_ERROR + "[CPU] exception caught at 0x{:08X}: {}:".format(self.pc - 1, ins) + FMT_NONE)
print(ex.message())
if self.conf.debug_on_exception:
self.launch_debug = True
if self.exit:
print()
print(FMT_CPU + "Program exited with code {}".format(self.exit_code) + FMT_NONE)
sys.exit(self.exit_code)
elif self.launch_debug:
self.launch_debug = False
launch_debug_session(self, self.mmu, self.regs,
"Launching debugger:")
if not self.active_debug:
self._run(verbose)
else:
print()
print(FMT_CPU + "Program stopped without exiting - perhaps you stopped the debugger?" + FMT_NONE)
def load(self, e: riscemu.Executable):
raise NotImplementedError("Not supported!")
def run_loaded(self, le: 'riscemu.LoadedExecutable'):
raise NotImplementedError("Not supported!")
def get_tokenizer(self, tokenizer_input):
raise NotImplementedError("Not supported!")
def run(self, verbose: bool = False):
print(FMT_CPU + '[CPU] Started running from 0x{:08X} ({})'.format(self.pc, "kernel") + FMT_NONE)
self._time_start = time.perf_counter_ns() // self.TIME_RESOLUTION_NS
self._run(verbose)
def _init_csr(self):
# set up CSR
self.csr = CSR()
self.csr.set('mhartid', 0) # core id
# TODO: set correct value
self.csr.set('mimpid', 0) # implementation id
# set mxl to 1 (32 bit) and set bits for i and m isa
self.csr.set('misa', (1 << 30) + (1 << 8) + (1 << 12)) # available ISA
# CSR write callbacks:
@self.csr.callback('halt')
def halt(old: int, new: int):
if new != 0:
self.exit = True
self.exit_code = new
@self.csr.callback('mstatus')
def mstatus(old: int, new: int):
pass
@self.csr.callback('mtimecmp')
def mtimecmp(old, new):
self._time_timecmp = (self.csr.get('mtimecmph') << 32) + new
self._time_interrupt_enabled = True
@self.csr.callback('mtimecmph')
def mtimecmph(old, new):
self._time_timecmp = (new << 32) + self.csr.get('mtimecmp')
self._time_interrupt_enabled = True
# virtual CSR registers:
@self.csr.virtual_register('time')
def get_time():
return (time.perf_counter_ns() // self.TIME_RESOLUTION_NS - self._time_start) & (2 ** 32 - 1)
@self.csr.virtual_register('timeh')
def get_timeh():
return (time.perf_counter_ns() // self.TIME_RESOLUTION_NS - self._time_start) >> 32
# add minstret and mcycle counters
def _handle_trap(self, trap: CpuTrap):
# implement trap handling!
self.pending_traps.append(trap)
def step(self, verbose=True):
try:
self.cycle += 1
if self.cycle % 20 == 0:
self._timer_step()
self._check_interrupt()
ins = self.mmu.read_ins(self.pc)
if verbose and self.mode == PrivModes.USER:
print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins))
self.run_instruction(ins)
self.pc += self.INS_XLEN
except CpuTrap as trap:
self._handle_trap(trap)
def _timer_step(self):
if not self._time_interrupt_enabled:
return
if self._time_timecmp <= (time.perf_counter_ns() // self.TIME_RESOLUTION_NS) - self._time_start:
self.pending_traps.append(TimerInterrupt())
self._time_interrupt_enabled = False
def _check_interrupt(self):
if not (len(self.pending_traps) > 0 and self.csr.get_mstatus('mie')):
return
# select best interrupt
# TODO: actually select based on the official ranking
trap = self.pending_traps.pop() # use the most recent trap
if not isinstance(trap, TimerInterrupt) or True:
print(FMT_CPU + "[CPU] [{}] taking trap {}!".format(self.cycle, trap) + FMT_NONE)
if trap.priv != PrivModes.MACHINE:
print(FMT_CPU + "[CPU] Trap not targeting machine mode encountered! - undefined behaviour!" + FMT_NONE)
raise Exception("Undefined behaviour!")
if self.mode != PrivModes.USER:
print(FMT_CPU + "[CPU] Trap triggered outside of user mode?!" + FMT_NONE)
self.csr.set_mstatus('mpie', self.csr.get_mstatus('mie'))
self.csr.set_mstatus('mpp', self.mode.value)
self.csr.set_mstatus('mie', 0)
self.csr.set('mcause', trap.mcause)
self.csr.set('mepc', self.pc-self.INS_XLEN)
self.csr.set('mtval', trap.mtval)
self.mode = trap.priv
mtvec = self.csr.get('mtvec')
if mtvec & 0b11 == 0:
self.pc = mtvec
if mtvec & 0b11 == 1:
self.pc = (mtvec & 0b11111111111111111111111111111100) + (trap.code * 4)
self.record_perf_profile()
if len(self._perf_counters) % 100 == 0:
self.show_perf()
def show_perf(self):
timed = 0
cycled = 0
cps_list = list()
print(FMT_CPU + "[CPU] Performance overview:")
for time_ns, cycle in self._perf_counters:
if cycled == 0:
cycled = cycle
timed = time_ns
continue
cps = (cycle - cycled) / (time_ns - timed) * 1000000000
# print(" {:03d} cycles in {:08d}ns ({:.2f} cycles/s)".format(
# cycle - cycled,
# time_ns - timed,
# cps
# ))
cycled = cycle
timed = time_ns
cps_list.append(cps)
print(" on average {:.0f} instructions/s".format(sum(cps_list) / len(cps_list)) + FMT_NONE)
self._perf_counters = list()
def record_perf_profile(self):
self._perf_counters.append((time.perf_counter_ns(), self.cycle))

@ -0,0 +1,41 @@
from ..MMU import *
from abc import abstractmethod
import typing
from .ElfLoader import ElfExecutable
if typing.TYPE_CHECKING:
from .PrivCPU import PrivCPU
class PrivMMU(MMU):
cpu: 'PrivCPU'
@abstractmethod
def get_entrypoint(self) -> int:
raise
def set_cpu(self, cpu: 'PrivCPU'):
self.cpu = cpu
def translate_address(self, addr: int):
return ""
class LoadedElfMMU(PrivMMU):
def __init__(self, elf: ElfExecutable):
super().__init__(conf=RunConfig())
self.entrypoint = elf.symbols['_start']
self.binaries.append(elf)
for sec in elf.sections:
self.sections.append(sec)
def load_bin(self, exe: Executable) -> LoadedExecutable:
raise NotImplementedError("This is a privMMU, it's initialized with a single ElfExecutable!")
def allocate_section(self, name: str, req_size: int, flag: MemoryFlags):
raise NotImplementedError("Not supported!")
def get_entrypoint(self):
return self.entrypoint

@ -0,0 +1,160 @@
"""
RiscEmu (c) 2021 Anton Lydike
SPDX-License-Identifier: MIT
"""
from ..instructions.RV32I import *
from ..Exceptions import INS_NOT_IMPLEMENTED
from .Exceptions import *
from .privmodes import PrivModes
from ..colors import FMT_CPU, FMT_NONE
import typing
if typing.TYPE_CHECKING:
from riscemu.priv.PrivCPU import PrivCPU
class PrivRV32I(RV32I):
cpu: 'PrivCPU'
"""
This is an extension of RV32I, written for the PrivCPU class
"""
def instruction_csrrw(self, ins: 'LoadedInstruction'):
rd, rs, csr_addr = self.parse_crs_ins(ins)
old_val = None
if rd != 'zero':
self.cpu.csr.assert_can_read(self.cpu.mode, csr_addr)
old_val = self.cpu.csr.get(csr_addr)
if rs != 'zero':
new_val = self.regs.get(rs)
self.cpu.csr.assert_can_write(self.cpu.mode, csr_addr)
self.cpu.csr.set(csr_addr, new_val)
if old_val is not None:
self.regs.set(rd, old_val)
def instruction_csrrs(self, ins: 'LoadedInstruction'):
rd, rs, csr_addr = self.parse_crs_ins(ins)
if rs != 'zero':
# oh no, this should not happen!
INS_NOT_IMPLEMENTED(ins)
if rd != 'zero':
self.cpu.csr.assert_can_read(self.cpu.mode, csr_addr)
old_val = self.cpu.csr.get(csr_addr)
self.regs.set(rd, old_val)
def instruction_csrrc(self, ins: 'LoadedInstruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_csrrsi(self, ins: 'LoadedInstruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_csrrwi(self, ins: 'LoadedInstruction'):
ASSERT_LEN(ins.args, 3)
rd, imm, addr = ins.get_reg(0), ins.get_imm(1), ins.get_imm(2)
if rd != 'zero':
self.cpu.csr.assert_can_read(self.cpu.mode, addr)
old_val = self.cpu.csr.get(addr)
self.regs.set(rd, old_val)
self.cpu.csr.assert_can_write(self.cpu.mode, addr)
self.cpu.csr.set(addr, imm)
def instruction_csrrci(self, ins: 'LoadedInstruction'):
INS_NOT_IMPLEMENTED(ins)
def instruction_mret(self, ins: 'LoadedInstruction'):
if self.cpu.mode != PrivModes.MACHINE:
print("MRET not inside machine level code!")
raise IllegalInstructionTrap(ins)
# retore mie
mpie = self.cpu.csr.get_mstatus('mpie')
self.cpu.csr.set_mstatus('mie', mpie)
# restore priv
mpp = self.cpu.csr.get_mstatus('mpp')
self.cpu.mode = PrivModes(mpp)
# restore pc
mepc = self.cpu.csr.get('mepc')
self.cpu.pc = mepc
sec = self.mmu.get_sec_containing(mepc)
if sec is not None:
print(FMT_CPU + "[CPU] [{}] returning to mode {} in {}".format(
self.cpu.cycle,
PrivModes(mpp).name,
self.mmu.translate_address(mepc)
) + FMT_NONE)
def instruction_uret(self, ins: 'LoadedInstruction'):
raise IllegalInstructionTrap(ins)
def instruction_sret(self, ins: 'LoadedInstruction'):
raise IllegalInstructionTrap(ins)
def instruction_scall(self, ins: 'LoadedInstruction'):
"""
Overwrite the scall from userspace RV32I
"""
raise EcallTrap(self.cpu.mode)
def instruction_beq(self, ins: 'LoadedInstruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 == rs2:
self.pc += dst - 4
def instruction_bne(self, ins: 'LoadedInstruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 != rs2:
self.pc += dst - 4
def instruction_blt(self, ins: 'LoadedInstruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 < rs2:
self.pc += dst - 4
def instruction_bge(self, ins: 'LoadedInstruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
if rs1 >= rs2:
self.pc += dst - 4
def instruction_bltu(self, ins: 'LoadedInstruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False)
if rs1 < rs2:
self.pc += dst - 4
def instruction_bgeu(self, ins: 'LoadedInstruction'):
rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False)
if rs1 >= rs2:
self.pc += dst - 4
# technically deprecated
def instruction_j(self, ins: 'LoadedInstruction'):
raise NotImplementedError("Should never be reached!")
def instruction_jal(self, ins: 'LoadedInstruction'):
ASSERT_LEN(ins.args, 2)
reg = ins.get_reg(0)
addr = ins.get_imm(1)
self.regs.set(reg, self.pc)
self.pc += addr - 4
def instruction_jalr(self, ins: 'LoadedInstruction'):
ASSERT_LEN(ins.args, 3)
rd, rs, imm = self.parse_rd_rs_imm(ins)
self.regs.set(rd, self.pc)
self.pc = rs + imm - 4
def instruction_sbreak(self, ins: 'LoadedInstruction'):
raise LaunchDebuggerException()
def parse_crs_ins(self, ins: 'LoadedInstruction'):
ASSERT_LEN(ins.args, 3)
return ins.get_reg(0), ins.get_reg(1), ins.get_imm(2)
def parse_mem_ins(self, ins: 'LoadedInstruction') -> Tuple[str, int]:
ASSERT_LEN(ins.args, 3)
addr = self.get_reg_content(ins, 1) + ins.get_imm(2)
reg = ins.get_reg(0)
return reg, addr

@ -0,0 +1,15 @@
"""
RiscEmu (c) 2021 Anton Lydike
SPDX-License-Identifier: MIT
The priv Module holds everything necessary for emulating privileged risc-v assembly
Running priv is only preferable to the normal userspace emulator, if you actually want to emulate the whole system.
Syscalls will have to be intercepted by your assembly code.
The PrivCPU Implements the Risc-V M/U Model, meaning there is machine mode and user mode. No PMP or paging is available.
"""

@ -0,0 +1,37 @@
from .PrivCPU import PrivCPU, RunConfig
from .ImageLoader import MemoryImageMMU
from .PrivMMU import LoadedElfMMU
from .ElfLoader import ElfExecutable
from ..Tokenizer import RiscVInput
from ..ExecutableParser import ExecutableParser
import sys
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='RISC-V privileged architecture emulator', prog='riscemu')
parser.add_argument('--kernel', type=str, help='Kernel elf loaded with user programs', nargs='?')
parser.add_argument('--image', type=str, help='Memory image containing kernel', nargs='?')
parser.add_argument('-v', '--verbose', help="Verbose output", action='store_true')
args = parser.parse_args()
mmu = None
if args.kernel is not None:
mmu = LoadedElfMMU(ElfExecutable(args.kernel))
elif args.image is not None:
mmu = MemoryImageMMU(args.image)
if mmu is None:
print("You must specify one of --kernel or --image for running in privilege mode!")
sys.exit(1)
cpu = PrivCPU(RunConfig(), mmu)
cpu.run(args.verbose)

@ -0,0 +1,7 @@
from enum import IntEnum
class PrivModes(IntEnum):
USER = 0
SUPER = 1
MACHINE = 3
Loading…
Cancel
Save