Merge pull request #1 from AntonLydike/kernel-mode
Adding limited privileged emulation using the `riscemu.priv` modulefloat_support
commit
0b34aea520
@ -1,2 +1,3 @@
|
||||
venv
|
||||
__pycache__
|
||||
__pycache__
|
||||
.mypy_cache
|
||||
|
@ -0,0 +1 @@
|
||||
pyelftools~=0.27
|
@ -0,0 +1,22 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class IOModule(ABC):
|
||||
addr: int
|
||||
size: int
|
||||
|
||||
def __init__(self, addr: int, size: int):
|
||||
self.addr = addr
|
||||
self.size = size
|
||||
|
||||
@abstractmethod
|
||||
def read(self, addr: int, size: int):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def write(self, addr: int, data: bytearray, size: int):
|
||||
pass
|
||||
|
||||
def contains(self, addr, size: int = 0):
|
||||
return self.addr <= addr < self.addr + self.size and \
|
||||
self.addr <= addr + size <= self.addr + self.size
|
@ -0,0 +1,92 @@
|
||||
from .IOModule import IOModule
|
||||
from ..priv.Exceptions import InstructionAccessFault
|
||||
from ..helpers import int_from_bytes
|
||||
from threading import Thread
|
||||
import time
|
||||
|
||||
|
||||
def _window_loop(textIO: 'TextIO'):
|
||||
try:
|
||||
import PySimpleGUI as sg
|
||||
|
||||
logs = sg.Text(font="monospace")
|
||||
col = sg.Column([[logs]], size=(640, 400), scrollable=True)
|
||||
window = sg.Window("TextIO:{:x}".format(textIO.addr), [[col]])
|
||||
lines = list()
|
||||
|
||||
window.finalize()
|
||||
textIO.set_sg_window(window)
|
||||
while True:
|
||||
e, v = window.read()
|
||||
if e == sg.WINDOW_CLOSED:
|
||||
window.close()
|
||||
textIO.set_sg_window(None)
|
||||
break
|
||||
if e == 'putlog':
|
||||
lines.insert(0, v[0])
|
||||
logs.update(value='\n'.join(lines) + '\n')
|
||||
col.contents_changed()
|
||||
|
||||
except ImportError:
|
||||
print("[TextIO] window disabled - please install PySimpleGui!")
|
||||
textIO.set_sg_window(None)
|
||||
|
||||
class TextIO(IOModule):
|
||||
def __init__(self, addr: int, buflen: int = 128):
|
||||
super(TextIO, self).__init__(addr, buflen + 4)
|
||||
self.buff = bytearray(buflen)
|
||||
self.current_line = ""
|
||||
self.sg_window = None
|
||||
self.start_buffer = list()
|
||||
|
||||
self.thread = Thread(target=_window_loop, args=(self,))
|
||||
self.thread.start()
|
||||
time.sleep(0.1)
|
||||
|
||||
def set_sg_window(self, window):
|
||||
if self.sg_window is not None and window is not None:
|
||||
raise Exception("cannot set window twice!")
|
||||
self.sg_window = window
|
||||
|
||||
buff = self.start_buffer
|
||||
self.start_buffer = None if window is None else list()
|
||||
|
||||
for line in buff:
|
||||
self._present(line)
|
||||
|
||||
def read(self, addr: int, size: int) -> bytearray:
|
||||
raise InstructionAccessFault(addr)
|
||||
|
||||
def write(self, addr: int, data: bytearray, size: int):
|
||||
if addr == self.addr:
|
||||
if size > 4:
|
||||
raise InstructionAccessFault(addr)
|
||||
if int_from_bytes(data[0:4]) > 0:
|
||||
self._print()
|
||||
return
|
||||
buff_start = addr - self.addr - 4
|
||||
self.buff[buff_start:buff_start + size] = data[0:size]
|
||||
|
||||
def _print(self):
|
||||
buff = self.buff
|
||||
self.buff = bytearray(self.size)
|
||||
if b'\x00' in buff:
|
||||
buff = buff.split(b'\x00')[0]
|
||||
text = buff.decode('ascii')
|
||||
if '\n' in text:
|
||||
lines = text.split("\n")
|
||||
lines[0] = self.current_line + lines[0]
|
||||
for line in lines[:-1]:
|
||||
self._present(line)
|
||||
self.current_line = lines[-1]
|
||||
else:
|
||||
self.current_line += text
|
||||
|
||||
def _present(self, text: str):
|
||||
if self.sg_window is not None:
|
||||
self.sg_window.write_event_value('putlog', text)
|
||||
elif self.start_buffer is not None:
|
||||
self.start_buffer.append(text)
|
||||
else:
|
||||
print("[TextIO:{:x}] {}".format(self.addr, text))
|
||||
|
@ -0,0 +1 @@
|
||||
from .decoder import decode, RISCV_REGS
|
@ -0,0 +1,17 @@
|
||||
if __name__ == '__main__':
|
||||
import code
|
||||
import readline
|
||||
import rlcompleter
|
||||
|
||||
from .decoder import *
|
||||
from .formats import *
|
||||
from .instruction_table import *
|
||||
from .regs import RISCV_REGS
|
||||
|
||||
sess_vars = globals()
|
||||
sess_vars.update(locals())
|
||||
|
||||
readline.set_completer(rlcompleter.Completer(sess_vars).complete)
|
||||
readline.set_completer(rlcompleter.Completer(sess_vars).complete)
|
||||
readline.parse_and_bind("tab: complete")
|
||||
code.InteractiveConsole(sess_vars).interact(banner="Interaktive decoding session started...", exitmsg="Closing...")
|
@ -0,0 +1,88 @@
|
||||
from .instruction_table import *
|
||||
from typing import Tuple, List
|
||||
|
||||
|
||||
def print_ins(ins: int):
|
||||
print(" f7 rs2 rs1 f3 rd op")
|
||||
print(
|
||||
f"0b{ins >> 25 :07b}_{(ins >> 20) & 0b11111:05b}_{(ins >> 15) & 0b11111:05b}_{(ins >> 12) & 0b111:03b}_{(ins >> 7) & 0b11111:05b}_{ins & 0b1111111:07b}");
|
||||
|
||||
|
||||
STATIC_INSN: Dict[int, Tuple[str, List[int], int]] = {
|
||||
0x00000013: ("nop", [], 0x00000013),
|
||||
0x00008067: ("ret", [], 0x00008067),
|
||||
0xfe010113: ("addi", [2, 2, -32], 0xfe010113),
|
||||
0x02010113: ("addi", [2, 2, 32], 0x02010113),
|
||||
0x00100073: ("ebreak", [], 0x00100073),
|
||||
0x00000073: ("ecall", [], 0x00000073),
|
||||
0x30200073: ("mret", [], 0x30200073),
|
||||
0x00200073: ("uret", [], 0x00200073),
|
||||
0x10200073: ("sret", [], 0x10200073),
|
||||
0x10500073: ("wfi", [], 0x10500073),
|
||||
}
|
||||
|
||||
|
||||
def int_from_ins(insn: bytearray):
|
||||
return int.from_bytes(insn, 'little')
|
||||
|
||||
|
||||
def name_from_insn(ins: int):
|
||||
opcode = op(ins)
|
||||
if opcode not in RV32:
|
||||
print_ins(ins)
|
||||
raise RuntimeError(f"Invalid opcode: {opcode:0x} in insn {ins:x}")
|
||||
dec = RV32[opcode]
|
||||
|
||||
if isinstance(dec, str):
|
||||
return dec
|
||||
|
||||
fun3 = funct3(ins)
|
||||
if fun3 not in dec:
|
||||
print_ins(ins)
|
||||
raise RuntimeError(f"Invalid funct3: {fun3:0x} in insn {ins:x}")
|
||||
|
||||
dec = dec[fun3]
|
||||
if isinstance(dec, str):
|
||||
return dec
|
||||
|
||||
if opcode == 0x1c and fun3 == 0:
|
||||
# we have ecall/ebreak
|
||||
token = imm110(ins)
|
||||
if token in dec:
|
||||
return dec[token]
|
||||
print_ins(ins)
|
||||
raise RuntimeError(f"Invalid instruction in ebreak/ecall region: {ins:x}")
|
||||
|
||||
fun7 = funct7(ins)
|
||||
if opcode == 0b1011 and fun3 == 0b10:
|
||||
# ignore the two aq/lr bits located in the fun7 block
|
||||
# riscemu has no memory reordering, therefore we don't need to look at these bits ever
|
||||
fun7 = fun7 >> 2
|
||||
|
||||
if fun7 in dec:
|
||||
if opcode == 0x0C or (opcode == 0x04 and fun3 == 5):
|
||||
dec = dec[fun7]
|
||||
return dec
|
||||
print("unknown instruction?!")
|
||||
return dec[fun7]
|
||||
|
||||
print_ins(ins)
|
||||
raise RuntimeError(f"Invalid instruction: {ins:x}")
|
||||
|
||||
|
||||
def decode(ins: Union[bytearray, bytes]) -> Tuple[str, List[int], int]:
|
||||
insn = int_from_ins(ins)
|
||||
|
||||
if insn & 3 != 3:
|
||||
print_ins(insn)
|
||||
raise RuntimeError("Not a RV32 instruction!")
|
||||
|
||||
if insn in STATIC_INSN:
|
||||
return STATIC_INSN[insn]
|
||||
|
||||
opcode = op(insn)
|
||||
if opcode not in INSTRUCTION_ARGS_DECODER:
|
||||
print_ins(insn)
|
||||
raise RuntimeError("No instruction decoder found for instruction")
|
||||
|
||||
return name_from_insn(insn), INSTRUCTION_ARGS_DECODER[opcode](insn), insn
|
@ -0,0 +1,117 @@
|
||||
from typing import Dict, Callable, List, Union
|
||||
from .regs import RISCV_REGS
|
||||
|
||||
def op(ins: int):
|
||||
return (ins >> 2) & 31
|
||||
|
||||
|
||||
def rd(ins: int):
|
||||
return (ins >> 7) & 31
|
||||
|
||||
|
||||
def funct3(ins: int):
|
||||
return (ins >> 12) & 7
|
||||
|
||||
|
||||
def rs1(ins: int):
|
||||
return (ins >> 15) & 31
|
||||
|
||||
|
||||
def rs2(ins: int):
|
||||
return (ins >> 20) & 31
|
||||
|
||||
|
||||
def funct7(ins: int):
|
||||
return ins >> 25
|
||||
|
||||
|
||||
def imm110(ins: int):
|
||||
return ins >> 20
|
||||
|
||||
|
||||
def imm3112(ins: int):
|
||||
return ins >> 12
|
||||
|
||||
|
||||
def imm_i(ins: int):
|
||||
return sign_extend(imm110(ins), 12)
|
||||
|
||||
|
||||
def imm_s(ins: int):
|
||||
num = (funct7(ins) << 5) + rd(ins)
|
||||
return sign_extend(num, 12)
|
||||
|
||||
|
||||
def imm_b(ins: int):
|
||||
lower = rd(ins)
|
||||
higher = funct7(ins)
|
||||
|
||||
num = (lower & 0b11110) + ((higher & 0b0111111) << 5) + ((lower & 1) << 11) + ((higher >> 6) << 12)
|
||||
return sign_extend(num, 13)
|
||||
|
||||
|
||||
def imm_u(ins: int):
|
||||
return sign_extend(imm3112(ins), 20)
|
||||
|
||||
|
||||
def imm_j(ins: int):
|
||||
return sign_extend(
|
||||
(((ins >> 21) & 0b1111111111) << 1) +
|
||||
(((ins >> 20) & 1) << 11) +
|
||||
(((ins >> 12) & 0b11111111) << 12) +
|
||||
(((ins >> 31) & 1) << 20), 21
|
||||
)
|
||||
|
||||
|
||||
def sign_extend(num, bits):
|
||||
sign_mask = 1 << (bits - 1)
|
||||
return (num & (sign_mask - 1)) - (num & sign_mask)
|
||||
|
||||
|
||||
def decode_i(ins: int) -> List[int]:
|
||||
return [rd(ins), rs1(ins), imm_i(ins)]
|
||||
|
||||
|
||||
def decode_b(ins: int) -> List[int]:
|
||||
return [rs1(ins), rs2(ins), imm_b(ins)]
|
||||
|
||||
|
||||
def decode_u(ins: int) -> List[int]:
|
||||
return [rd(ins), imm_u(ins)]
|
||||
|
||||
|
||||
def decode_r(ins: int) -> List[int]:
|
||||
return [rd(ins), rs1(ins), rs2(ins)]
|
||||
|
||||
|
||||
def decode_s(ins: int) -> List[int]:
|
||||
return [rs2(ins), rs1(ins), imm_s(ins)]
|
||||
|
||||
|
||||
def decode_j(ins: int) -> List[int]:
|
||||
return [rd(ins), imm_j(ins)]
|
||||
|
||||
|
||||
def decode_i_shamt(ins: int) -> List[int]:
|
||||
if funct3(ins) in (1, 5):
|
||||
return [rd(ins), rs1(ins), rs2(ins)]
|
||||
return decode_i(ins)
|
||||
|
||||
|
||||
def decode_i_unsigned(ins: int) -> List[int]:
|
||||
return [rd(ins), rs1(ins), imm110(ins)]
|
||||
|
||||
|
||||
INSTRUCTION_ARGS_DECODER: Dict[int, Callable[[int], List[int]]] = {
|
||||
0x00: decode_i,
|
||||
0x04: decode_i_shamt,
|
||||
0x05: decode_u,
|
||||
0x08: decode_s,
|
||||
0x0C: decode_r,
|
||||
0x0D: decode_u,
|
||||
0x18: decode_b,
|
||||
0x19: decode_i,
|
||||
0x1b: decode_j,
|
||||
0x1c: decode_i_unsigned,
|
||||
0b1011: decode_r
|
||||
}
|
@ -0,0 +1,81 @@
|
||||
from collections import defaultdict
|
||||
from .formats import *
|
||||
|
||||
tbl = lambda: defaultdict(tbl)
|
||||
|
||||
RV32 = tbl()
|
||||
RV32[0x1b] = "jal"
|
||||
RV32[0x0D] = "lui"
|
||||
RV32[0x05] = "auipc"
|
||||
RV32[0x19][0] = "jalr"
|
||||
|
||||
RV32[0x04][0] = "addi"
|
||||
RV32[0x04][1] = "slli"
|
||||
RV32[0x04][2] = "slti"
|
||||
RV32[0x04][3] = "sltiu"
|
||||
RV32[0x04][4] = "xori"
|
||||
RV32[0x04][5][0x00] = "srli"
|
||||
RV32[0x04][5][0x20] = "srai"
|
||||
RV32[0x04][6] = "ori"
|
||||
RV32[0x04][7] = "andi"
|
||||
|
||||
RV32[0x18][0] = "beq"
|
||||
RV32[0x18][1] = "bne"
|
||||
RV32[0x18][4] = "blt"
|
||||
RV32[0x18][5] = "bge"
|
||||
RV32[0x18][6] = "bltu"
|
||||
RV32[0x18][7] = "bgeu"
|
||||
|
||||
RV32[0x00][0] = "lb"
|
||||
RV32[0x00][1] = "lh"
|
||||
RV32[0x00][2] = "lw"
|
||||
RV32[0x00][4] = "lbu"
|
||||
RV32[0x00][5] = "lhu"
|
||||
|
||||
RV32[0x08][0] = "sb"
|
||||
RV32[0x08][1] = "sh"
|
||||
RV32[0x08][2] = "sw"
|
||||
|
||||
RV32[0x1c][1] = "csrrw"
|
||||
RV32[0x1c][2] = "csrrs"
|
||||
RV32[0x1c][3] = "csrrc"
|
||||
RV32[0x1c][5] = "csrrwi"
|
||||
RV32[0x1c][6] = "csrrsi"
|
||||
RV32[0x1c][7] = "csrrci"
|
||||
|
||||
RV32[0x1c][0][0] = "ecall"
|
||||
RV32[0x1c][0][1] = "ebreak"
|
||||
|
||||
RV32[0x0C][0][0] = "add"
|
||||
RV32[0x0C][0][32] = "sub"
|
||||
RV32[0x0C][1][0] = "sll"
|
||||
RV32[0x0C][2][0] = "slt"
|
||||
RV32[0x0C][3][0] = "sltu"
|
||||
RV32[0x0C][4][0] = "xor"
|
||||
RV32[0x0C][5][0] = "srl"
|
||||
RV32[0x0C][5][32] = "sra"
|
||||
RV32[0x0C][6][0] = "or"
|
||||
RV32[0x0C][7][0] = "and"
|
||||
|
||||
# rv32m
|
||||
RV32[0x0C][0][1] = "mul"
|
||||
RV32[0x0C][1][1] = "mulh"
|
||||
RV32[0x0C][2][1] = "mulhsu"
|
||||
RV32[0x0C][3][1] = "mulhu"
|
||||
RV32[0x0C][4][1] = "div"
|
||||
RV32[0x0C][5][1] = "divu"
|
||||
RV32[0x0C][6][1] = "rem"
|
||||
RV32[0x0C][7][1] = "remu"
|
||||
|
||||
# rv32a
|
||||
RV32[0b1011][0b10][0b00010] = "lr.w"
|
||||
RV32[0b1011][0b10][0b00011] = "sc.w"
|
||||
RV32[0b1011][0b10][0b00001] = "amoswap.w"
|
||||
RV32[0b1011][0b10][0b00000] = "amoadd.w"
|
||||
RV32[0b1011][0b10][0b00100] = "amoxor.w"
|
||||
RV32[0b1011][0b10][0b01100] = "amoand.w"
|
||||
RV32[0b1011][0b10][0b01000] = "amoor.w"
|
||||
RV32[0b1011][0b10][0b10000] = "amomin.w"
|
||||
RV32[0b1011][0b10][0b10100] = "amomax.w"
|
||||
RV32[0b1011][0b10][0b11000] = "amominu.w"
|
||||
RV32[0b1011][0b10][0b11100] = "amomaxu.w"
|
@ -0,0 +1,6 @@
|
||||
RISCV_REGS = [
|
||||
'zero', 'ra', 'sp', 'gp', 'tp', 't0', 't1', 't2',
|
||||
's0', 's1', 'a0', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7',
|
||||
's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11',
|
||||
't3', 't4', 't5', 't6'
|
||||
]
|
@ -0,0 +1,78 @@
|
||||
from .InstructionSet import InstructionSet, LoadedInstruction
|
||||
from ..Exceptions import INS_NOT_IMPLEMENTED
|
||||
from ..helpers import int_from_bytes, int_to_bytes, to_unsigned, to_signed
|
||||
|
||||
|
||||
class RV32A(InstructionSet):
|
||||
"""
|
||||
The RV32A instruction set. Currently, load-reserved and store conditionally are not supported
|
||||
due to limitations in the way the MMU is implemented. Maybe a later implementation will add support
|
||||
for this?
|
||||
"""
|
||||
|
||||
def instruction_lr_w(self, ins: 'LoadedInstruction'):
|
||||
INS_NOT_IMPLEMENTED(ins)
|
||||
|
||||
def instruction_sc_w(self, ins: 'LoadedInstruction'):
|
||||
INS_NOT_IMPLEMENTED(ins)
|
||||
|
||||
def instruction_amoswap_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
if dest == 'zero':
|
||||
self.mmu.write(addr, int_to_bytes(addr, 4))
|
||||
else:
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(val, 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amoadd_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(old + val, 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amoand_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(old & val, 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amoor_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(old | val, 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amoxor_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(old ^ val, 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amomax_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(max(old, val), 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amomaxu_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
val = to_unsigned(val)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4), unsigned=True)
|
||||
|
||||
self.mmu.write(addr, int_to_bytes(to_signed(max(old, val)), 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amomin_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4))
|
||||
self.mmu.write(addr, int_to_bytes(min(old, val), 4))
|
||||
self.regs.set(dest, old)
|
||||
|
||||
def instruction_amominu_w(self, ins: 'LoadedInstruction'):
|
||||
dest, addr, val = self.parse_rd_rs_rs(ins)
|
||||
val = to_unsigned(val)
|
||||
old = int_from_bytes(self.mmu.read(addr, 4), unsigned=True)
|
||||
|
||||
self.mmu.write(addr, int_to_bytes(to_signed(min(old, val)), 4))
|
||||
self.regs.set(dest, old)
|
@ -0,0 +1,138 @@
|
||||
from typing import Dict, Union, Callable, Optional
|
||||
from collections import defaultdict
|
||||
from .privmodes import PrivModes
|
||||
from .Exceptions import InstructionAccessFault
|
||||
from ..helpers import to_signed
|
||||
from ..colors import FMT_CSR, FMT_NONE
|
||||
|
||||
from .CSRConsts import CSR_NAME_TO_ADDR, MSTATUS_LEN_2, MSTATUS_OFFSETS
|
||||
|
||||
|
||||
class CSR:
|
||||
"""
|
||||
This holds all Control and Status Registers (CSR)
|
||||
"""
|
||||
regs: Dict[int, int]
|
||||
"""
|
||||
All Control and Status Registers are stored here
|
||||
"""
|
||||
|
||||
virtual_regs: Dict[int, Callable[[], int]]
|
||||
"""
|
||||
list of virtual CSR registers, with values computed on read
|
||||
"""
|
||||
|
||||
listeners: Dict[int, Callable[[int, int], None]]
|
||||
|
||||
mstatus_cache: Dict[str, int]
|
||||
mstatus_cache_dirty = True
|
||||
|
||||
def __init__(self):
|
||||
self.regs = defaultdict(lambda: 0)
|
||||
self.listeners = defaultdict(lambda: (lambda x, y: None))
|
||||
self.virtual_regs = dict()
|
||||
self.mstatus_cache = dict()
|
||||
# TODO: implement write masks (bitmasks which control writeable bits in registers
|
||||
|
||||
def set(self, addr: Union[str, int], val: int):
|
||||
addr = self._name_to_addr(addr)
|
||||
if addr is None:
|
||||
return
|
||||
val = to_signed(val)
|
||||
self.listeners[addr](self.regs[addr], val)
|
||||
if addr == 0x300:
|
||||
self.mstatus_cache_dirty = True
|
||||
self.regs[addr] = val
|
||||
|
||||
def get(self, addr: Union[str, int]) -> int:
|
||||
addr = self._name_to_addr(addr)
|
||||
if addr is None:
|
||||
raise RuntimeError(f"Invalid CSR name: {addr}!")
|
||||
if addr in self.virtual_regs:
|
||||
return self.virtual_regs[addr]()
|
||||
return self.regs[addr]
|
||||
|
||||
def set_listener(self, addr: Union[str, int], listener: Callable[[int, int], None]):
|
||||
addr = self._name_to_addr(addr)
|
||||
if addr is None:
|
||||
print("unknown csr address name: {}".format(addr))
|
||||
return
|
||||
self.listeners[addr] = listener
|
||||
|
||||
# mstatus properties
|
||||
def set_mstatus(self, name: str, val: int):
|
||||
"""
|
||||
Set mstatus bits using this helper. mstatus is a 32 bit register, holding various machine status flags
|
||||
Setting them by hand is super painful, so this helper allows you to set specific bits.
|
||||
|
||||
Please make sure your supplied value has the correct width!
|
||||
|
||||
:param name:
|
||||
:param val:
|
||||
:return:
|
||||
"""
|
||||
size = 2 if name in MSTATUS_LEN_2 else 1
|
||||
off = MSTATUS_OFFSETS[name]
|
||||
mask = (2 ** size - 1) << off
|
||||
old_val = self.get('mstatus')
|
||||
erased = old_val & (~mask)
|
||||
new_val = erased | (val << off)
|
||||
self.set('mstatus', new_val)
|
||||
|
||||
def get_mstatus(self, name) -> int:
|
||||
if not self.mstatus_cache_dirty and name in self.mstatus_cache:
|
||||
return self.mstatus_cache[name]
|
||||
|
||||
size = 2 if name in MSTATUS_LEN_2 else 1
|
||||
off = MSTATUS_OFFSETS[name]
|
||||
mask = (2 ** size - 1) << off
|
||||
val = (self.get('mstatus') & mask) >> off
|
||||
if self.mstatus_cache_dirty:
|
||||
self.mstatus_cache = dict(name=val)
|
||||
else:
|
||||
self.mstatus_cache[name] = val
|
||||
return val
|
||||
|
||||
def callback(self, addr: Union[str, int]):
|
||||
def inner(func: Callable[[int, int], None]):
|
||||
self.set_listener(addr, func)
|
||||
return func
|
||||
|
||||
return inner
|
||||
|
||||
def assert_can_read(self, mode: PrivModes, addr: int):
|
||||
if (addr >> 8) & 3 > mode.value:
|
||||
raise InstructionAccessFault(addr)
|
||||
|
||||
def assert_can_write(self, mode: PrivModes, addr: int):
|
||||
if (addr >> 8) & 3 > mode.value or addr >> 10 == 11:
|
||||
raise InstructionAccessFault(addr)
|
||||
|
||||
def _name_to_addr(self, addr: Union[str, int]) -> Optional[int]:
|
||||
if isinstance(addr, str):
|
||||
if addr not in CSR_NAME_TO_ADDR:
|
||||
print("Unknown CSR register {}".format(addr))
|
||||
return None
|
||||
return CSR_NAME_TO_ADDR[addr]
|
||||
return addr
|
||||
|
||||
def virtual_register(self, addr: Union[str, int]):
|
||||
addr = self._name_to_addr(addr)
|
||||
if addr is None:
|
||||
print("unknown csr address name: {}".format(addr))
|
||||
|
||||
def inner(func: Callable[[], int]):
|
||||
self.virtual_regs[addr] = func
|
||||
return func
|
||||
|
||||
return inner
|
||||
|
||||
def dump_mstatus(self):
|
||||
print(FMT_CSR + "[CSR] dumping mstatus:")
|
||||
i = 0
|
||||
for name in MSTATUS_OFFSETS:
|
||||
print(" {:<5} {}".format(name, self.get_mstatus(name)), end="")
|
||||
if i % 6 == 5:
|
||||
print()
|
||||
i += 1
|
||||
print(FMT_NONE)
|
@ -0,0 +1,81 @@
|
||||
from typing import Dict, Tuple
|
||||
|
||||
MCAUSE_TRANSLATION: Dict[Tuple[int, int], str]= {
|
||||
(1, 0): 'User software interrupt',
|
||||
(1, 1): 'Supervisor software interrupt',
|
||||
(1, 3): 'Machine software interrupt',
|
||||
(1, 4): 'User timer interrupt',
|
||||
(1, 5): 'Supervisor timer interrupt',
|
||||
(1, 7): 'Machine timer interrupt',
|
||||
(1, 8): 'User external interrupt',
|
||||
(1, 9): 'Supervisor external interrupt',
|
||||
(1, 11): 'Machine external interrupt',
|
||||
(0, 0): 'Instruction address misaligned',
|
||||
(0, 1): 'Instruction access fault',
|
||||
(0, 2): 'Illegal instruction',
|
||||
(0, 3): 'Breakpoint',
|
||||
(0, 4): 'Load address misaligned',
|
||||
(0, 5): 'Load access fault',
|
||||
(0, 6): 'Store/AMO address misaligned',
|
||||
(0, 7): 'Store/AMO access fault',
|
||||
(0, 8): 'environment call from user mode',
|
||||
(0, 9): 'environment call from supervisor mode',
|
||||
(0, 11): 'environment call from machine mode',
|
||||
(0, 12): 'Instruction page fault',
|
||||
(0, 13): 'Load page fault',
|
||||
(0, 15): 'Store/AMO page fault',
|
||||
}
|
||||
"""
|
||||
Assigns tuple (interrupt bit, exception code) to their respective readable names
|
||||
"""
|
||||
|
||||
MSTATUS_OFFSETS: Dict[str, int] = {
|
||||
'uie': 0,
|
||||
'sie': 1,
|
||||
'mie': 3,
|
||||
'upie': 4,
|
||||
'spie': 5,
|
||||
'mpie': 7,
|
||||
'spp': 8,
|
||||
'mpp': 11,
|
||||
'fs': 13,
|
||||
'xs': 15,
|
||||
'mpriv': 17,
|
||||
'sum': 18,
|
||||
'mxr': 19,
|
||||
'tvm': 20,
|
||||
'tw': 21,
|
||||
'tsr': 22,
|
||||
'sd': 31
|
||||
}
|
||||
"""
|
||||
Offsets for all mstatus bits
|
||||
"""
|
||||
|
||||
MSTATUS_LEN_2 = ('mpp', 'fs', 'xs')
|
||||
"""
|
||||
All mstatus parts that have length 2. All other mstatus parts have length 1
|
||||
"""
|
||||
|
||||
CSR_NAME_TO_ADDR: Dict[str, int] = {
|
||||
'mstatus': 0x300,
|
||||
'misa': 0x301,
|
||||
'mie': 0x304,
|
||||
'mtvec': 0x305,
|
||||
'mepc': 0x341,
|
||||
'mcause': 0x342,
|
||||
'mtval': 0x343,
|
||||
'mip': 0x344,
|
||||
'mvendorid': 0xF11,
|
||||
'marchid': 0xF12,
|
||||
'mimpid': 0xF13,
|
||||
'mhartid': 0xF14,
|
||||
'time': 0xc01,
|
||||
'timeh': 0xc81,
|
||||
'halt': 0x789,
|
||||
'mtimecmp': 0x780,
|
||||
'mtimecmph': 0x781,
|
||||
}
|
||||
"""
|
||||
Translation for named registers
|
||||
"""
|
@ -0,0 +1,158 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Dict, Tuple
|
||||
|
||||
from .Exceptions import *
|
||||
from ..Exceptions import RiscemuBaseException
|
||||
from ..Executable import MemoryFlags, LoadedMemorySection
|
||||
from ..decoder import decode, RISCV_REGS
|
||||
from ..helpers import FMT_PARSE, FMT_NONE, FMT_GREEN, FMT_BOLD
|
||||
|
||||
FMT_ELF = FMT_GREEN + FMT_BOLD
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from elftools.elf.elffile import ELFFile
|
||||
from elftools.elf.sections import Section, SymbolTableSection
|
||||
|
||||
# This requires pyelftools package!
|
||||
|
||||
INCLUDE_SEC = ('.text', '.stack', '.bss', '.sdata', '.sbss')
|
||||
|
||||
|
||||
class ElfExecutable:
|
||||
sections: List['ElfLoadedMemorySection']
|
||||
sections_by_name: Dict[str, 'ElfLoadedMemorySection']
|
||||
symbols: Dict[str, int]
|
||||
run_ptr: int
|
||||
|
||||
def __init__(self, name: str):
|
||||
self.sections = list()
|
||||
self.sections_by_name = dict()
|
||||
self.symbols = dict()
|
||||
|
||||
try:
|
||||
from elftools.elf.elffile import ELFFile
|
||||
from elftools.elf.sections import Section, SymbolTableSection
|
||||
|
||||
with open(name, 'rb') as f:
|
||||
print(FMT_ELF + "[ElfLoader] Loading elf executable from: {}".format(name) + FMT_NONE)
|
||||
self._read_elf(ELFFile(f))
|
||||
except ImportError as e:
|
||||
print(FMT_PARSE + "[ElfLoader] Cannot load elf files without PyElfTools package! You can install them using pip install pyelftools!" + FMT_NONE)
|
||||
raise e
|
||||
|
||||
def _read_elf(self, elf: 'ELFFile'):
|
||||
if not elf.header.e_machine == 'EM_RISCV':
|
||||
raise InvalidElfException("Not a RISC-V elf file!")
|
||||
if not elf.header.e_ident.EI_CLASS == 'ELFCLASS32':
|
||||
raise InvalidElfException("Only 32bit executables are supported!")
|
||||
|
||||
self.run_ptr = elf.header.e_entry
|
||||
|
||||
from elftools.elf.sections import SymbolTableSection
|
||||
for sec in elf.iter_sections():
|
||||
if isinstance(sec, SymbolTableSection):
|
||||
self._parse_symtab(sec)
|
||||
continue
|
||||
|
||||
if sec.name not in INCLUDE_SEC:
|
||||
continue
|
||||
|
||||
self.add_sec(self._lms_from_elf_sec(sec, 'kernel'))
|
||||
|
||||
def _lms_from_elf_sec(self, sec: 'Section', owner: str):
|
||||
is_code = sec.name in ('.text',)
|
||||
data = bytearray(sec.data())
|
||||
flags = MemoryFlags(is_code, is_code)
|
||||
print(FMT_ELF + "[ElfLoader] Section {} at: {:X}".format(sec.name, sec.header.sh_addr) + FMT_NONE)
|
||||
return ElfLoadedMemorySection(
|
||||
sec.name,
|
||||
sec.header.sh_addr,
|
||||
sec.data_size,
|
||||
data,
|
||||
flags,
|
||||
owner
|
||||
)
|
||||
|
||||
def _parse_symtab(self, symtab: 'SymbolTableSection'):
|
||||
self.symbols = {
|
||||
sym.name: sym.entry.st_value for sym in symtab.iter_symbols() if sym.name
|
||||
}
|
||||
|
||||
def add_sec(self, new_sec: 'ElfLoadedMemorySection'):
|
||||
for sec in self.sections:
|
||||
if sec.base < sec.end <= new_sec.base or sec.end > sec.base >= new_sec.end:
|
||||
continue
|
||||
else:
|
||||
print(FMT_ELF + "[ElfLoader] Invalid elf layout: Two sections overlap: \n\t{}\n\t{}".format(
|
||||
sec, new_sec
|
||||
) + FMT_NONE)
|
||||
raise RuntimeError("Cannot load elf with overlapping sections!")
|
||||
|
||||
self.sections.append(new_sec)
|
||||
self.sections_by_name[new_sec.name] = new_sec
|
||||
|
||||
|
||||
class InvalidElfException(RiscemuBaseException):
|
||||
def __init__(self, msg: str):
|
||||
super().__init__()
|
||||
self.msg = msg
|
||||
|
||||
def message(self):
|
||||
return FMT_PARSE + "{}(\"{}\")".format(self.__class__.__name__, self.msg) + FMT_NONE
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ElfInstruction:
|
||||
name: str
|
||||
args: List[int]
|
||||
encoded: int
|
||||
|
||||
def get_imm(self, num: int) -> int:
|
||||
return self.args[num]
|
||||
|
||||
def get_imm_reg(self, num: int) -> Tuple[int, int]:
|
||||
return self.args[-1], self.args[-2]
|
||||
|
||||
def get_reg(self, num: int) -> str:
|
||||
return RISCV_REGS[self.args[num]]
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.name == 'jal' and self.args[0] == 0:
|
||||
return "j {}".format(self.args[1])
|
||||
elif self.name in ('lw', 'lh', 'lb', 'lbu', 'lhu', 'sw', 'sh', 'sb'):
|
||||
args = "{}, {}({})".format(
|
||||
RISCV_REGS[self.args[0]], self.args[2], RISCV_REGS[self.args[1]]
|
||||
)
|
||||
else:
|
||||
args = ", ".join(map(str, self.args))
|
||||
return "{:<8} {}".format(
|
||||
self.name,
|
||||
args
|
||||
)
|
||||
|
||||
|
||||
class ElfLoadedMemorySection(LoadedMemorySection):
|
||||
ins_cache: List[Optional[ElfInstruction]]
|
||||
"""
|
||||
A fast cache for accessing pre-decoded instructions
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.__setattr__('ins_cache', [None] * (self.size // 4))
|
||||
|
||||
def read_instruction(self, offset):
|
||||
if self.ins_cache[offset//4] is not None:
|
||||
return self.ins_cache[offset//4]
|
||||
if not self.flags.executable:
|
||||
print(FMT_PARSE + "Reading instruction from non-executable memory!" + FMT_NONE)
|
||||
raise InstructionAccessFault(offset + self.base)
|
||||
if offset % 4 != 0:
|
||||
raise InstructionAddressMisalignedTrap(offset + self.base)
|
||||
ins = ElfInstruction(*decode(self.content[offset:offset + 4]))
|
||||
self.ins_cache[offset // 4] = ins
|
||||
return ins
|
||||
|
||||
@property
|
||||
def end(self):
|
||||
return self.size + self.base
|
@ -0,0 +1,91 @@
|
||||
from typing import Optional, NewType
|
||||
from enum import Enum
|
||||
from .privmodes import PrivModes
|
||||
from .CSRConsts import MCAUSE_TRANSLATION
|
||||
|
||||
import typing
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from .ElfLoader import ElfInstruction
|
||||
|
||||
|
||||
class CpuTrapType(Enum):
|
||||
TIMER = 1
|
||||
SOFTWARE = 2
|
||||
EXTERNAL = 3
|
||||
EXCEPTION = 4
|
||||
|
||||
|
||||
class CpuTrap(BaseException):
|
||||
code: int
|
||||
"""
|
||||
31-bit value encoding the exception code in the mstatus register
|
||||
"""
|
||||
interrupt: int
|
||||
"""
|
||||
The isInterrupt bit in the mstatus register
|
||||
"""
|
||||
|
||||
mtval: int
|
||||
"""
|
||||
contents of the mtval register
|
||||
"""
|
||||
|
||||
type: CpuTrapType
|
||||
"""
|
||||
The type (timer, external, software) of the trap
|
||||
"""
|
||||
|
||||
priv: PrivModes
|
||||
"""
|
||||
The privilege level this trap targets
|
||||
"""
|
||||
|
||||
def __init__(self, code: int, mtval, type: CpuTrapType, priv: PrivModes = PrivModes.MACHINE):
|
||||
self.interrupt = 0 if type == CpuTrapType.EXCEPTION else 1
|
||||
self.code = code
|
||||
self.mtval = mtval
|
||||
self.priv = priv
|
||||
self.type = type
|
||||
|
||||
@property
|
||||
def mcause(self):
|
||||
return (self.interrupt << 31) + self.code
|
||||
|
||||
def __repr__(self):
|
||||
name = "Reserved interrupt({}, {})".format(self.interrupt, self.code)
|
||||
|
||||
if (self.interrupt, self.code) in MCAUSE_TRANSLATION:
|
||||
name = MCAUSE_TRANSLATION[(self.interrupt, self.code)] + "({}, {})".format(self.interrupt, self.code)
|
||||
|
||||
return "{} {{priv={}, type={}, mtval={:x}}}".format(
|
||||
name, self.priv.name, self.type.name, self.mtval
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
|
||||
class IllegalInstructionTrap(CpuTrap):
|
||||
def __init__(self, ins: 'ElfInstruction'):
|
||||
super().__init__(2, ins.encoded, CpuTrapType.EXCEPTION)
|
||||
|
||||
|
||||
class InstructionAddressMisalignedTrap(CpuTrap):
|
||||
def __init__(self, addr: int):
|
||||
super().__init__(0, addr, CpuTrapType.EXCEPTION)
|
||||
|
||||
|
||||
class InstructionAccessFault(CpuTrap):
|
||||
def __init__(self, addr: int):
|
||||
super().__init__(1, addr, CpuTrapType.EXCEPTION)
|
||||
|
||||
|
||||
class TimerInterrupt(CpuTrap):
|
||||
def __init__(self):
|
||||
super().__init__(7, 0, CpuTrapType.TIMER)
|
||||
|
||||
|
||||
class EcallTrap(CpuTrap):
|
||||
def __init__(self, mode: PrivModes):
|
||||
super().__init__(mode.value + 8, 0, CpuTrapType.EXCEPTION)
|
@ -0,0 +1,125 @@
|
||||
"""
|
||||
Laods a memory image with debug information into memory
|
||||
"""
|
||||
|
||||
import json
|
||||
from functools import lru_cache
|
||||
from typing import Dict, List, Optional, TYPE_CHECKING
|
||||
|
||||
from .ElfLoader import ElfInstruction, ElfLoadedMemorySection, InstructionAccessFault, InstructionAddressMisalignedTrap
|
||||
from .PrivMMU import PrivMMU
|
||||
from ..Config import RunConfig
|
||||
from ..Executable import LoadedMemorySection, MemoryFlags
|
||||
from ..IO.IOModule import IOModule
|
||||
from ..colors import FMT_ERROR, FMT_NONE, FMT_MEM
|
||||
from ..decoder import decode
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
class MemoryImageMMU(PrivMMU):
|
||||
io: List[IOModule]
|
||||
data: bytearray
|
||||
io_start: int
|
||||
debug_info: Dict[str, Dict[str, Dict[str, str]]]
|
||||
|
||||
def __init__(self, file_name: str, io_start: int = 0xFF0000):
|
||||
super(MemoryImageMMU, self).__init__(conf=RunConfig())
|
||||
|
||||
with open(file_name, 'rb') as memf:
|
||||
data = memf.read()
|
||||
with open(file_name + '.dbg', 'r') as dbgf:
|
||||
debug_info: Dict = json.load(dbgf)
|
||||
|
||||
self.data = bytearray(data)
|
||||
# TODO: super wasteful memory allocation happening here
|
||||
if len(data) < io_start:
|
||||
self.data += bytearray(io_start - len(data))
|
||||
self.debug_info = debug_info
|
||||
self.io_start = io_start
|
||||
self.io = list()
|
||||
|
||||
def get_entrypoint(self):
|
||||
try:
|
||||
start = self.debug_info['symbols']['kernel'].get('_start', None)
|
||||
if start is not None:
|
||||
return start
|
||||
return self.debug_info['symbols']['kernel'].get('_ftext')
|
||||
except KeyError:
|
||||
print(FMT_ERROR + '[MMU] cannot find kernel entry in debug information! Falling back to 0x100' + FMT_NONE)
|
||||
return 0x100
|
||||
|
||||
@lru_cache(maxsize=2048)
|
||||
def read_ins(self, addr: int) -> ElfInstruction:
|
||||
if addr >= self.io_start:
|
||||
raise InstructionAccessFault(addr)
|
||||
if addr % 4 != 0:
|
||||
raise InstructionAddressMisalignedTrap(addr)
|
||||
|
||||
return ElfInstruction(*decode(self.data[addr:addr + 4]))
|
||||
|
||||
def read(self, addr: int, size: int) -> bytearray:
|
||||
if addr < 0x100:
|
||||
pc = self.cpu.pc
|
||||
text_sec = self.get_sec_containing(pc)
|
||||
print(FMT_ERROR + "[MMU] possible null dereference (read {:x}) from (pc={:x},sec={},rel={:x})".format(
|
||||
addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base
|
||||
) + FMT_NONE)
|
||||
if addr >= self.io_start:
|
||||
return self.io_at(addr).read(addr, size)
|
||||
return self.data[addr: addr + size]
|
||||
|
||||
def write(self, addr: int, size: int, data):
|
||||
if addr < 0x100:
|
||||
pc = self.cpu.pc
|
||||
text_sec = self.get_sec_containing(pc)
|
||||
print(FMT_ERROR + "[MMU] possible null dereference (write {:x}) from (pc={:x},sec={},rel={:x})".format(
|
||||
addr, pc, text_sec.owner + ':' + text_sec.name, pc - text_sec.base
|
||||
) + FMT_NONE)
|
||||
|
||||
if addr >= self.io_start:
|
||||
return self.io_at(addr).write(addr, data, size)
|
||||
self.data[addr:addr + size] = data[0:size]
|
||||
|
||||
def io_at(self, addr) -> IOModule:
|
||||
for mod in self.io:
|
||||
if mod.contains(addr):
|
||||
return mod
|
||||
raise InstructionAccessFault(addr)
|
||||
|
||||
def add_io(self, io: IOModule):
|
||||
self.io.append(io)
|
||||
|
||||
def __repr__(self):
|
||||
return "MemoryImageMMU()"
|
||||
|
||||
@lru_cache(maxsize=32)
|
||||
def get_sec_containing(self, addr: int) -> Optional[LoadedMemorySection]:
|
||||
next_sec = len(self.data)
|
||||
for sec_addr, name in reversed(self.debug_info['sections'].items()):
|
||||
if addr >= int(sec_addr):
|
||||
owner, name = name.split(':')
|
||||
base = int(sec_addr)
|
||||
size = next_sec - base
|
||||
flags = MemoryFlags('.text' in name, '.text' in name)
|
||||
return ElfLoadedMemorySection(name, base, size, self.data[base:next_sec], flags, owner)
|
||||
else:
|
||||
next_sec = int(sec_addr)
|
||||
|
||||
def translate_address(self, addr: int):
|
||||
sec = self.get_sec_containing(addr)
|
||||
if sec.name == '.empty':
|
||||
return "<empty>"
|
||||
symbs = self.debug_info['symbols'][sec.owner]
|
||||
for sym, val in reversed(symbs.items()):
|
||||
if addr >= val:
|
||||
return "{}{:+x} ({}:{})".format(sym, addr - val, sec.owner, sec.name)
|
||||
return "{}:{}{:+x}".format(sec.owner, sec.name, addr - sec.base)
|
||||
|
||||
def symbol(self, symb: str):
|
||||
print(FMT_MEM + "Looking up symbol {}".format(symb))
|
||||
for owner, symbs in self.debug_info['symbols'].items():
|
||||
if symb in symbs:
|
||||
print(" Hit in {}: {} = {}".format(owner, symb, symbs[symb]))
|
||||
print(FMT_NONE, end="")
|
@ -0,0 +1,246 @@
|
||||
"""
|
||||
RiscEmu (c) 2021 Anton Lydike
|
||||
|
||||
SPDX-License-Identifier: MIT
|
||||
"""
|
||||
import time
|
||||
|
||||
from riscemu.CPU import *
|
||||
from .CSR import CSR
|
||||
from .Exceptions import *
|
||||
from .PrivMMU import PrivMMU
|
||||
from .PrivRV32I import PrivRV32I
|
||||
from .privmodes import PrivModes
|
||||
from ..IO import TextIO
|
||||
from ..instructions import RV32A, RV32M
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from riscemu import Executable, LoadedExecutable, LoadedInstruction
|
||||
from riscemu.instructions.InstructionSet import InstructionSet
|
||||
|
||||
|
||||
class PrivCPU(CPU):
|
||||
"""
|
||||
This is a CPU that has different modes, instruction sets and registers.
|
||||
|
||||
It should support M and U Mode, but no U-Mode Traps.
|
||||
|
||||
This allows us to
|
||||
"""
|
||||
|
||||
csr: CSR
|
||||
"""
|
||||
Reference to the control and status registers
|
||||
"""
|
||||
|
||||
TIME_RESOLUTION_NS: int = 10000000
|
||||
"""
|
||||
controls the resolution of the time csr register (in nanoseconds)
|
||||
"""
|
||||
|
||||
INS_XLEN = 4
|
||||
"""
|
||||
Size of an instruction in memory. Should be 4, but since our loading code is shit, instruction take up
|
||||
the equivalent of "1 byte" (this is actually impossible)
|
||||
"""
|
||||
|
||||
def __init__(self, conf, mmu: PrivMMU):
|
||||
super().__init__(conf, [PrivRV32I, RV32M, RV32A])
|
||||
self.mode: PrivModes = PrivModes.MACHINE
|
||||
|
||||
mmu.set_cpu(self)
|
||||
self.pc = mmu.get_entrypoint()
|
||||
self.mmu = mmu
|
||||
|
||||
if hasattr(self.mmu, 'add_io'):
|
||||
self.mmu.add_io(TextIO.TextIO(0xff0000, 64))
|
||||
|
||||
self.syscall_int = None
|
||||
self.launch_debug = False
|
||||
self.pending_traps: List[CpuTrap] = list()
|
||||
|
||||
self._time_start = 0
|
||||
self._time_timecmp = 0
|
||||
self._time_interrupt_enabled = False
|
||||
|
||||
# performance counters
|
||||
self._perf_counters = list()
|
||||
|
||||
# init csr
|
||||
self._init_csr()
|
||||
|
||||
def _run(self, verbose=False):
|
||||
if self.pc <= 0:
|
||||
return False
|
||||
ins = None
|
||||
try:
|
||||
while not self.exit:
|
||||
self.step(verbose)
|
||||
except RiscemuBaseException as ex:
|
||||
if isinstance(ex, LaunchDebuggerException):
|
||||
self.launch_debug = True
|
||||
self.pc += self.INS_XLEN
|
||||
|
||||
if self.exit:
|
||||
print()
|
||||
print(FMT_CPU + "Program exited with code {}".format(self.exit_code) + FMT_NONE)
|
||||
sys.exit(self.exit_code)
|
||||
elif self.launch_debug:
|
||||
self.launch_debug = False
|
||||
launch_debug_session(self, self.mmu, self.regs,
|
||||
"Launching debugger:")
|
||||
if not self.active_debug:
|
||||
self._run(verbose)
|
||||
else:
|
||||
print()
|
||||
print(FMT_CPU + "Program stopped without exiting - perhaps you stopped the debugger?" + FMT_NONE)
|
||||
|
||||
def load(self, e: riscemu.Executable):
|
||||
raise NotImplementedError("Not supported!")
|
||||
|
||||
def run_loaded(self, le: 'riscemu.LoadedExecutable'):
|
||||
raise NotImplementedError("Not supported!")
|
||||
|
||||
def get_tokenizer(self, tokenizer_input):
|
||||
raise NotImplementedError("Not supported!")
|
||||
|
||||
def run(self, verbose: bool = False):
|
||||
print(FMT_CPU + '[CPU] Started running from 0x{:08X} ({})'.format(self.pc, "kernel") + FMT_NONE)
|
||||
self._time_start = time.perf_counter_ns() // self.TIME_RESOLUTION_NS
|
||||
self._run(self.conf.verbosity > 1)
|
||||
|
||||
def _init_csr(self):
|
||||
# set up CSR
|
||||
self.csr = CSR()
|
||||
self.csr.set('mhartid', 0) # core id
|
||||
# TODO: set correct value
|
||||
self.csr.set('mimpid', 0) # implementation id
|
||||
# set mxl to 1 (32 bit) and set bits for i and m isa
|
||||
self.csr.set('misa', (1 << 30) + (1 << 8) + (1 << 12)) # available ISA
|
||||
|
||||
# CSR write callbacks:
|
||||
|
||||
@self.csr.callback('halt')
|
||||
def halt(old: int, new: int):
|
||||
if new != 0:
|
||||
self.exit = True
|
||||
self.exit_code = new
|
||||
|
||||
@self.csr.callback('mstatus')
|
||||
def mstatus(old: int, new: int):
|
||||
pass
|
||||
|
||||
@self.csr.callback('mtimecmp')
|
||||
def mtimecmp(old, new):
|
||||
self._time_timecmp = (self.csr.get('mtimecmph') << 32) + new
|
||||
self._time_interrupt_enabled = True
|
||||
|
||||
@self.csr.callback('mtimecmph')
|
||||
def mtimecmph(old, new):
|
||||
self._time_timecmp = (new << 32) + self.csr.get('mtimecmp')
|
||||
self._time_interrupt_enabled = True
|
||||
|
||||
# virtual CSR registers:
|
||||
|
||||
@self.csr.virtual_register('time')
|
||||
def get_time():
|
||||
return (time.perf_counter_ns() // self.TIME_RESOLUTION_NS - self._time_start) & (2 ** 32 - 1)
|
||||
|
||||
@self.csr.virtual_register('timeh')
|
||||
def get_timeh():
|
||||
return (time.perf_counter_ns() // self.TIME_RESOLUTION_NS - self._time_start) >> 32
|
||||
|
||||
# add minstret and mcycle counters
|
||||
|
||||
def _handle_trap(self, trap: CpuTrap):
|
||||
# implement trap handling!
|
||||
self.pending_traps.append(trap)
|
||||
|
||||
def step(self, verbose=True):
|
||||
try:
|
||||
self.cycle += 1
|
||||
if self.cycle % 20 == 0:
|
||||
self._timer_step()
|
||||
self._check_interrupt()
|
||||
ins = self.mmu.read_ins(self.pc)
|
||||
if verbose and self.mode == PrivModes.USER:
|
||||
print(FMT_CPU + " Running 0x{:08X}:{} {}".format(self.pc, FMT_NONE, ins))
|
||||
self.run_instruction(ins)
|
||||
self.pc += self.INS_XLEN
|
||||
except CpuTrap as trap:
|
||||
self._handle_trap(trap)
|
||||
if trap.interrupt == 0 and not isinstance(trap, EcallTrap):
|
||||
print(FMT_CPU + "[CPU] Trap {} encountered at {} (0x{:x})".format(
|
||||
trap,
|
||||
self.mmu.translate_address(self.pc),
|
||||
self.pc
|
||||
) + FMT_NONE)
|
||||
if self.conf.debug_on_exception:
|
||||
raise LaunchDebuggerException()
|
||||
self.pc += self.INS_XLEN
|
||||
|
||||
def _timer_step(self):
|
||||
if not self._time_interrupt_enabled:
|
||||
return
|
||||
if self._time_timecmp <= (time.perf_counter_ns() // self.TIME_RESOLUTION_NS) - self._time_start:
|
||||
self.pending_traps.append(TimerInterrupt())
|
||||
self._time_interrupt_enabled = False
|
||||
|
||||
def _check_interrupt(self):
|
||||
if not (len(self.pending_traps) > 0 and self.csr.get_mstatus('mie')):
|
||||
return
|
||||
# select best interrupt
|
||||
# TODO: actually select based on the official ranking
|
||||
trap = self.pending_traps.pop() # use the most recent trap
|
||||
if self.conf.verbosity > 0:
|
||||
print(FMT_CPU + "[CPU] taking trap {}!".format(trap) + FMT_NONE)
|
||||
|
||||
if trap.priv != PrivModes.MACHINE:
|
||||
print(FMT_CPU + "[CPU] Trap not targeting machine mode encountered! - undefined behaviour!" + FMT_NONE)
|
||||
raise Exception("Undefined behaviour!")
|
||||
|
||||
if self.mode != PrivModes.USER:
|
||||
print(FMT_CPU + "[CPU] Trap triggered outside of user mode?!" + FMT_NONE)
|
||||
|
||||
self.csr.set_mstatus('mpie', self.csr.get_mstatus('mie'))
|
||||
self.csr.set_mstatus('mpp', self.mode.value)
|
||||
self.csr.set_mstatus('mie', 0)
|
||||
self.csr.set('mcause', trap.mcause)
|
||||
self.csr.set('mepc', self.pc - self.INS_XLEN)
|
||||
self.csr.set('mtval', trap.mtval)
|
||||
self.mode = trap.priv
|
||||
mtvec = self.csr.get('mtvec')
|
||||
if mtvec & 0b11 == 0:
|
||||
self.pc = mtvec
|
||||
if mtvec & 0b11 == 1:
|
||||
self.pc = (mtvec & 0b11111111111111111111111111111100) + (trap.code * 4)
|
||||
self.record_perf_profile()
|
||||
if len(self._perf_counters) % 100 == 0:
|
||||
self.show_perf()
|
||||
|
||||
def show_perf(self):
|
||||
timed = 0
|
||||
cycled = 0
|
||||
cps_list = list()
|
||||
|
||||
print(FMT_CPU + "[CPU] Performance overview:")
|
||||
for time_ns, cycle in self._perf_counters:
|
||||
if cycled == 0:
|
||||
cycled = cycle
|
||||
timed = time_ns
|
||||
continue
|
||||
cps = (cycle - cycled) / (time_ns - timed) * 1000000000
|
||||
|
||||
# print(" {:03d} cycles in {:08d}ns ({:.2f} cycles/s)".format(
|
||||
# cycle - cycled,
|
||||
# time_ns - timed,
|
||||
# cps
|
||||
# ))
|
||||
cycled = cycle
|
||||
timed = time_ns
|
||||
cps_list.append(cps)
|
||||
print(" on average {:.0f} instructions/s".format(sum(cps_list) / len(cps_list)) + FMT_NONE)
|
||||
self._perf_counters = list()
|
||||
|
||||
def record_perf_profile(self):
|
||||
self._perf_counters.append((time.perf_counter_ns(), self.cycle))
|
@ -0,0 +1,42 @@
|
||||
from ..MMU import *
|
||||
from abc import abstractmethod
|
||||
|
||||
import typing
|
||||
|
||||
from .ElfLoader import ElfExecutable
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from .PrivCPU import PrivCPU
|
||||
|
||||
|
||||
class PrivMMU(MMU):
|
||||
cpu: 'PrivCPU'
|
||||
|
||||
@abstractmethod
|
||||
def get_entrypoint(self) -> int:
|
||||
raise
|
||||
|
||||
def set_cpu(self, cpu: 'PrivCPU'):
|
||||
self.cpu = cpu
|
||||
|
||||
def translate_address(self, addr: int):
|
||||
return ""
|
||||
|
||||
|
||||
class LoadedElfMMU(PrivMMU):
|
||||
def __init__(self, elf: ElfExecutable):
|
||||
super().__init__(conf=RunConfig())
|
||||
self.entrypoint = elf.symbols['_start']
|
||||
|
||||
self.binaries.append(elf)
|
||||
for sec in elf.sections:
|
||||
self.sections.append(sec)
|
||||
|
||||
def load_bin(self, exe: Executable) -> LoadedExecutable:
|
||||
raise NotImplementedError("This is a privMMU, it's initialized with a single ElfExecutable!")
|
||||
|
||||
def allocate_section(self, name: str, req_size: int, flag: MemoryFlags):
|
||||
raise NotImplementedError("Not supported!")
|
||||
|
||||
def get_entrypoint(self):
|
||||
return self.entrypoint
|
@ -0,0 +1,168 @@
|
||||
"""
|
||||
RiscEmu (c) 2021 Anton Lydike
|
||||
|
||||
SPDX-License-Identifier: MIT
|
||||
"""
|
||||
|
||||
from ..instructions.RV32I import *
|
||||
from ..Exceptions import INS_NOT_IMPLEMENTED
|
||||
from .Exceptions import *
|
||||
from .privmodes import PrivModes
|
||||
from ..colors import FMT_CPU, FMT_NONE
|
||||
import typing
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from riscemu.priv.PrivCPU import PrivCPU
|
||||
|
||||
|
||||
class PrivRV32I(RV32I):
|
||||
cpu: 'PrivCPU'
|
||||
"""
|
||||
This is an extension of RV32I, written for the PrivCPU class
|
||||
"""
|
||||
|
||||
def instruction_csrrw(self, ins: 'LoadedInstruction'):
|
||||
rd, rs, csr_addr = self.parse_crs_ins(ins)
|
||||
old_val = None
|
||||
if rd != 'zero':
|
||||
self.cpu.csr.assert_can_read(self.cpu.mode, csr_addr)
|
||||
old_val = self.cpu.csr.get(csr_addr)
|
||||
if rs != 'zero':
|
||||
new_val = self.regs.get(rs)
|
||||
self.cpu.csr.assert_can_write(self.cpu.mode, csr_addr)
|
||||
self.cpu.csr.set(csr_addr, new_val)
|
||||
if old_val is not None:
|
||||
self.regs.set(rd, old_val)
|
||||
|
||||
def instruction_csrrs(self, ins: 'LoadedInstruction'):
|
||||
rd, rs, csr_addr = self.parse_crs_ins(ins)
|
||||
if rs != 'zero':
|
||||
# oh no, this should not happen!
|
||||
INS_NOT_IMPLEMENTED(ins)
|
||||
if rd != 'zero':
|
||||
self.cpu.csr.assert_can_read(self.cpu.mode, csr_addr)
|
||||
old_val = self.cpu.csr.get(csr_addr)
|
||||
self.regs.set(rd, old_val)
|
||||
|
||||
|
||||
def instruction_csrrc(self, ins: 'LoadedInstruction'):
|
||||
INS_NOT_IMPLEMENTED(ins)
|
||||
|
||||
def instruction_csrrsi(self, ins: 'LoadedInstruction'):
|
||||
INS_NOT_IMPLEMENTED(ins)
|
||||
|
||||
def instruction_csrrwi(self, ins: 'LoadedInstruction'):
|
||||
ASSERT_LEN(ins.args, 3)
|
||||
rd, imm, addr = ins.get_reg(0), ins.get_imm(1), ins.get_imm(2)
|
||||
if rd != 'zero':
|
||||
self.cpu.csr.assert_can_read(self.cpu.mode, addr)
|
||||
old_val = self.cpu.csr.get(addr)
|
||||
self.regs.set(rd, old_val)
|
||||
self.cpu.csr.assert_can_write(self.cpu.mode, addr)
|
||||
self.cpu.csr.set(addr, imm)
|
||||
|
||||
|
||||
def instruction_csrrci(self, ins: 'LoadedInstruction'):
|
||||
INS_NOT_IMPLEMENTED(ins)
|
||||
|
||||
def instruction_mret(self, ins: 'LoadedInstruction'):
|
||||
if self.cpu.mode != PrivModes.MACHINE:
|
||||
print("MRET not inside machine level code!")
|
||||
raise IllegalInstructionTrap(ins)
|
||||
# retore mie
|
||||
mpie = self.cpu.csr.get_mstatus('mpie')
|
||||
self.cpu.csr.set_mstatus('mie', mpie)
|
||||
# restore priv
|
||||
mpp = self.cpu.csr.get_mstatus('mpp')
|
||||
self.cpu.mode = PrivModes(mpp)
|
||||
# restore pc
|
||||
mepc = self.cpu.csr.get('mepc')
|
||||
self.cpu.pc = mepc - self.cpu.INS_XLEN
|
||||
|
||||
if self.cpu.conf.verbosity > 0:
|
||||
sec = self.mmu.get_sec_containing(mepc)
|
||||
if sec is not None:
|
||||
print(FMT_CPU + "[CPU] returning to mode {} in {} (0x{:x})".format(
|
||||
PrivModes(mpp).name,
|
||||
self.mmu.translate_address(mepc),
|
||||
mepc
|
||||
) + FMT_NONE)
|
||||
if self.cpu.conf.verbosity > 1:
|
||||
self.regs.dump_reg_a()
|
||||
|
||||
def instruction_uret(self, ins: 'LoadedInstruction'):
|
||||
raise IllegalInstructionTrap(ins)
|
||||
|
||||
def instruction_sret(self, ins: 'LoadedInstruction'):
|
||||
raise IllegalInstructionTrap(ins)
|
||||
|
||||
def instruction_scall(self, ins: 'LoadedInstruction'):
|
||||
"""
|
||||
Overwrite the scall from userspace RV32I
|
||||
"""
|
||||
raise EcallTrap(self.cpu.mode)
|
||||
|
||||
def instruction_beq(self, ins: 'LoadedInstruction'):
|
||||
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
|
||||
if rs1 == rs2:
|
||||
self.pc += dst - 4
|
||||
|
||||
def instruction_bne(self, ins: 'LoadedInstruction'):
|
||||
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
|
||||
if rs1 != rs2:
|
||||
self.pc += dst - 4
|
||||
|
||||
def instruction_blt(self, ins: 'LoadedInstruction'):
|
||||
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
|
||||
if rs1 < rs2:
|
||||
self.pc += dst - 4
|
||||
|
||||
def instruction_bge(self, ins: 'LoadedInstruction'):
|
||||
rs1, rs2, dst = self.parse_rs_rs_imm(ins)
|
||||
if rs1 >= rs2:
|
||||
self.pc += dst - 4
|
||||
|
||||
def instruction_bltu(self, ins: 'LoadedInstruction'):
|
||||
rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False)
|
||||
if rs1 < rs2:
|
||||
self.pc += dst - 4
|
||||
|
||||
def instruction_bgeu(self, ins: 'LoadedInstruction'):
|
||||
rs1, rs2, dst = self.parse_rs_rs_imm(ins, signed=False)
|
||||
if rs1 >= rs2:
|
||||
self.pc += dst - 4
|
||||
|
||||
# technically deprecated
|
||||
def instruction_j(self, ins: 'LoadedInstruction'):
|
||||
raise NotImplementedError("Should never be reached!")
|
||||
|
||||
def instruction_jal(self, ins: 'LoadedInstruction'):
|
||||
ASSERT_LEN(ins.args, 2)
|
||||
reg = ins.get_reg(0)
|
||||
addr = ins.get_imm(1)
|
||||
if reg == 'ra' and self.cpu.mode == PrivModes.USER and self.cpu.conf.verbosity > 1:
|
||||
print(FMT_CPU + 'Jumping to {} (0x{:x})'.format(
|
||||
self.mmu.translate_address(self.pc + addr),
|
||||
self.pc + addr
|
||||
) + FMT_NONE)
|
||||
self.regs.set(reg, self.pc)
|
||||
self.pc += addr - 4
|
||||
|
||||
def instruction_jalr(self, ins: 'LoadedInstruction'):
|
||||
ASSERT_LEN(ins.args, 3)
|
||||
rd, rs, imm = self.parse_rd_rs_imm(ins)
|
||||
self.regs.set(rd, self.pc)
|
||||
self.pc = rs + imm - 4
|
||||
|
||||
def instruction_sbreak(self, ins: 'LoadedInstruction'):
|
||||
raise LaunchDebuggerException()
|
||||
|
||||
def parse_crs_ins(self, ins: 'LoadedInstruction'):
|
||||
ASSERT_LEN(ins.args, 3)
|
||||
return ins.get_reg(0), ins.get_reg(1), ins.get_imm(2)
|
||||
|
||||
def parse_mem_ins(self, ins: 'LoadedInstruction') -> Tuple[str, int]:
|
||||
ASSERT_LEN(ins.args, 3)
|
||||
addr = self.get_reg_content(ins, 1) + ins.get_imm(2)
|
||||
reg = ins.get_reg(0)
|
||||
return reg, addr
|
@ -0,0 +1,15 @@
|
||||
"""
|
||||
RiscEmu (c) 2021 Anton Lydike
|
||||
|
||||
SPDX-License-Identifier: MIT
|
||||
|
||||
The priv Module holds everything necessary for emulating privileged risc-v assembly
|
||||
|
||||
|
||||
Running priv is only preferable to the normal userspace emulator, if you actually want to emulate the whole system.
|
||||
|
||||
Syscalls will have to be intercepted by your assembly code.
|
||||
|
||||
|
||||
The PrivCPU Implements the Risc-V M/U Model, meaning there is machine mode and user mode. No PMP or paging is available.
|
||||
"""
|
@ -0,0 +1,35 @@
|
||||
from .PrivCPU import PrivCPU, RunConfig
|
||||
from .ImageLoader import MemoryImageMMU
|
||||
from .PrivMMU import LoadedElfMMU
|
||||
from .ElfLoader import ElfExecutable
|
||||
|
||||
import sys
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description='RISC-V privileged architecture emulator', prog='riscemu')
|
||||
|
||||
parser.add_argument('--kernel', type=str, help='Kernel elf loaded with user programs', nargs='?')
|
||||
parser.add_argument('--image', type=str, help='Memory image containing kernel', nargs='?')
|
||||
parser.add_argument('--debug-exceptions', help='Launch the interactive debugger when an exception is generated', action='store_true')
|
||||
|
||||
parser.add_argument('-v', '--verbose', help="Verbosity level (can be used multiple times)", action='count', default=0)
|
||||
|
||||
args = parser.parse_args()
|
||||
mmu = None
|
||||
|
||||
if args.kernel is not None:
|
||||
mmu = LoadedElfMMU(ElfExecutable(args.kernel))
|
||||
elif args.image is not None:
|
||||
mmu = MemoryImageMMU(args.image)
|
||||
|
||||
if mmu is None:
|
||||
print("You must specify one of --kernel or --image for running in privilege mode!")
|
||||
sys.exit(1)
|
||||
|
||||
cpu = PrivCPU(RunConfig(verbosity=args.verbose, debug_on_exception=args.debug_exceptions), mmu)
|
||||
cpu.run()
|
||||
|
||||
|
||||
|
@ -0,0 +1,7 @@
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
class PrivModes(IntEnum):
|
||||
USER = 0
|
||||
SUPER = 1
|
||||
MACHINE = 3
|
Loading…
Reference in New Issue