# Pesto review blog parser: extracts reviews below the START_OF_REVIEWS marker
# of a markdown blog file and emits them as structured JSON.
from dataclasses import dataclass
import json
import re
import sys
from typing import Dict, Tuple
from math import ceil, log10
import datetime
START_OF_REVIEWS = '## The actual reviews:'
# helper classes and functions
@dataclass
class LexingContext:
    """Holds the full text of every source file being parsed.

    Spans reference these sources by name; this class resolves a span back
    to its surrounding line(s) for error reporting.
    """
    # source name (file path) -> complete file contents
    sources: Dict[str,str]

    def get_nth_line_bounds(self, source_name: str, n: int):
        """Return (start, end) byte offsets of the n-th line (0-based).

        Returns None when the file has fewer than n newline-terminated lines.
        Raises KeyError for an unknown source name.
        NOTE(review): for the last line of a file with no trailing newline,
        the returned end offset is -1 (str.find miss) — callers must handle.
        """
        if source_name not in self.sources:
            raise KeyError("Unknown source file \"{}\"!".format(source_name))
        start = 0
        source = self.sources[source_name]
        # skip past n newline characters to reach the n-th line's start
        for i in range(n):
            next_start = source.find('\n', start)
            if next_start == -1:
                return None
            start = next_start + 1
        return start, source.find('\n', start)

    def get_lines_containing(self, span: 'Span'):
        """Return (lines, start_offset, line_no) for the line(s) covering span.

        lines is the list of full source lines the span touches, start_offset
        is the byte offset of the first of those lines, and line_no is its
        1-based line number. Returns None when span.start lies past the last
        newline in the file.
        """
        if span.source_name not in self.sources:
            raise KeyError("Unknown source file \"{}\"!".format(span.source_name))
        start = 0
        line_no = 0
        source = self.sources[span.source_name]
        while True:
            next_start = source.find('\n', start)
            line_no += 1
            # handle eof
            if next_start == -1:
                return None
            # as long as the next newline comes before the spans start we are good
            if next_start < span.start:
                start = next_start + 1
                continue
            # if the whole span is on one line, we are good as well
            if next_start >= span.end:
                return [ source[start:next_start] ], start, line_no
            # span crosses line boundaries: extend to the newline at/after span.end
            # NOTE(review): if span.end lies beyond the file's final newline,
            # find() keeps returning -1 and this inner scan never terminates —
            # TODO confirm spans are always newline-bounded.
            while next_start < span.end:
                next_start = source.find('\n', next_start+1)
            return source[start:next_start].split('\n'), start, line_no
@dataclass(frozen=True)
class Span:
    """An immutable half-open [start, end) byte range inside a named source."""
    start: int
    """
    Start of tokens location in source file, global byte offset in file
    """
    end: int
    """
    End of tokens location in source file, global byte offset in file
    """
    source_name: str
    # forward reference (class is defined above in this file); quoted for
    # consistency with the 'Span' annotations used elsewhere
    context: 'LexingContext'

    def union(self, *spans: 'Span') -> 'Span':
        """Return the smallest span covering this span and all given spans.

        All spans must share this span's source and context. Called with no
        arguments it returns self — previously this raised TypeError because
        min()/max() then received a single bare integer.
        """
        if not spans:
            return self
        for span in spans:
            assert span.source_name == self.source_name
            assert span.context == self.context
        return Span(
            start=min(self.start, *(span.start for span in spans)),
            end=max(self.end, *(span.end for span in spans)),
            source_name=self.source_name,
            context=self.context
        )

    def transform(self, start: int = 0, end: int = 0) -> 'Span':
        """Return a copy with start/end shifted by the given deltas."""
        return Span(self.start + start, self.end + end, self.source_name, self.context)

    def __repr__(self):
        return "{}(start={},end={},source_name={})".format(
            self.__class__.__name__,
            self.start, self.end, self.source_name
        )
def create_span_context_str(span: 'Span', message: str, color: str = '\033[31m'):
    """Render the source line(s) a span covers, with the spanned text colored,
    a caret underline beneath, and an optional message block at the end.

    color is an ANSI escape sequence (default red); the returned string always
    ends with a reset code.
    """
    lines, offset_into_file, line_no = span.context.get_lines_containing(span)
    relative_offset = span.start - offset_into_file
    annotation_len = span.end - span.start
    # width of the widest line number that will be printed
    digit_len = ceil(log10(line_no + len(lines)))
    if digit_len == 0:
        digit_len = 1
    output_str = ">>> In file {}:{}\n".format(span.source_name, line_no)
    for i, source_line in enumerate(lines):
        # wrap the annotated slice of the line in color/reset escapes
        source_line = source_line[:relative_offset] + color + source_line[relative_offset:relative_offset+annotation_len] + '\033[0m' + source_line[relative_offset+annotation_len:]
        output_str += '{:>{}d}: {}\n'.format(line_no + i, digit_len, source_line)
        if relative_offset > len(source_line):
            continue
        # TODO: handle multi-line underlines
        # NOTE(review): len(source_line) includes the escape codes injected
        # above, so underline lengths on wrapped spans are approximate
        output_str += "{}{}{}{}\n".format(
            color,
            ' ' * (relative_offset + digit_len + 2),
            '^' * min(annotation_len, len(source_line) - relative_offset),
            '\033[0m'
        )
        if annotation_len > len(source_line) - relative_offset:
            # Span continues on the next line: subtract this line's consumed
            # share BEFORE zeroing the offset. (The original reset
            # relative_offset first, subtracting the whole line length.)
            annotation_len -= len(source_line) - relative_offset
            relative_offset = 0
    if message:
        output_str += color
        output_str += ' ' * (relative_offset + digit_len + 2) + '|\n'
        for message_line in message.split("\n"):
            output_str += ' ' * (relative_offset + digit_len + 2) + message_line + '\n'
    return output_str + '\033[0m'
def print_warning(span: Span, message: str, color="\033[33m"):
    """Print a span-annotated warning (yellow by default) to stdout."""
    rendered = create_span_context_str(span, "Warning: " + message, color)
    print(rendered)
class ParseError(Exception):
    """Parsing failure, optionally carrying the source span that caused it."""
    span: Span
    message: str

    def __init__(self, msg: str, span: Span=None) -> None:
        super().__init__((msg, span))
        self.message = msg
        self.span = span

    def print_context_message(self):
        """Print the error; with a span, include annotated source context."""
        if self.span:
            print(create_span_context_str(self.span, self.message))
        else:
            prefixed = (">>> {}".format(line) for line in self.message.split('\n'))
            print("\n".join(prefixed))
class EndOfInputError(ParseError):
    """Raised when input ends while the parser still expected content."""

    def __init__(self, span: Span, search_str: str = None) -> None:
        message = f"Unexpected end-of-input in {span.source_name}"
        if search_str:
            message += f" while scanning for {search_str}"
        super().__init__(message + "!", span)
def to_json_field_name(field_name: str) -> str:
    """Normalize a display label into a snake_case JSON key.

    Runs of non-word characters collapse to a single underscore; the result
    is lowercased and stripped of leading/trailing underscores.
    """
    collapsed = re.sub(r'[^\w\d]+', '_', field_name)
    return collapsed.lower().strip('_')
## parser
class MarkdownBlogParser:
    """Parses pesto reviews out of a markdown blog file.

    Reviews live below the START_OF_REVIEWS marker. Each review starts with a
    '### Company "name" (variant)' heading followed by '*Field:* value' items
    and/or two-column markdown tables. A heading containing 'template'
    terminates parsing and is not included in the result.
    """

    def __init__(self, source: str) -> None:
        self.fname = source
        with open(source, 'r') as f:
            self.content = f.read()
        # skip everything up to and including the reviews marker;
        # raises ValueError when the marker is missing
        self.pos = self.content.index(START_OF_REVIEWS) + len(START_OF_REVIEWS)
        self.context = LexingContext({source: self.content})
        self.size = len(self.content)
        self.reviews = []
        self.consume_whitespace()

    def peek(self, offset: int = 0):
        """Return the character at pos+offset, or None past end of input."""
        if self.pos + offset >= self.size:
            return None
        return self.content[self.pos + offset]

    def startswith(self, *patterns: str, offset: int = 0):
        """Return the longest pattern matching at pos+offset, or False."""
        # match longest first so '**' wins over '*'
        for pattern in sorted(patterns, key=len, reverse=True):
            if self.content.startswith(pattern, self.pos + offset):
                return pattern
        return False

    def consume_whitespace(self):
        """Advance past whitespace; raise EndOfInputError at end of input."""
        while self.pos < self.size and self.content[self.pos] in '\n\r\t ':
            self.pos += 1
        if self.pos == self.size:
            raise EndOfInputError(Span(self.pos-1, self.pos, self.fname, self.context), "Whitespace")

    def read_until(self, pattern: str, inclusive=True) -> Tuple[str, Span]:
        """Read from pos up to pattern and return (text, span).

        The pattern itself is consumed and included when inclusive is True.
        Raises EndOfInputError when the pattern never occurs.
        """
        start = self.pos
        pos = self.pos
        # startswith(pattern, pos) avoids the O(n) slice the original
        # `self.content[pos:].startswith(...)` made on every step
        while pos < self.size and not self.content.startswith(pattern, pos):
            pos += 1
        if pos == self.size:
            raise EndOfInputError(Span(start, pos, self.fname, self.context), pattern)
        if inclusive:
            pos += len(pattern)
        self.pos = pos
        return self.content[start:pos], Span(start, pos, self.fname, self.context)

    def parse(self):
        """Parse all remaining reviews and return the accumulated list.

        Iterative where the original recursed once per review (which would
        hit the recursion limit on a long blog). Parsing ends when a heading
        containing 'template' is reached.
        """
        while True:
            line, span = self.read_until('\n', inclusive=True)
            result = re.fullmatch(r'### ([\w\s&-/]+)\s+("[^"]+")[ \t]*(\([^)]+\))?\n', line)
            if not result:
                raise ParseError("Expected review heading of form '### Company \"pesto name\" (variant)\n'", span.transform(end=-1))
            # now we get the first bit of info!
            company, name, variant = (result.group(x) for x in (1,2,3))
            self.current_review = {
                'company': company,
                'name': name.strip()[1:-1],        # drop surrounding quotes
                'variant': variant.strip()[1:-1] if variant else None,  # drop parens
            }
            # the template heading marks the end of the real reviews
            if 'template' in line.lower():
                return self.reviews
            # parse inner review fields until the next heading
            while self.inner_review_parse():
                pass
            self.reviews.append(self.current_review)

    def inner_review_parse(self):
        """Parse one field or table of the current review.

        Returns True when an item was consumed, None when the next review
        heading is reached; raises ParseError on malformed input.
        """
        # read until next thing
        self.consume_whitespace()
        if self.startswith('### '):
            # next review heading: we are done with this review
            return None
        # we have an item like '*Date:* value' (bold '**Date:**' also works):
        if self.startswith('*'):
            token = '*'
            if self.startswith('**'):
                token = '**'
            self.pos += len(token)
            title, span = self.read_until(token, False)
            self.pos += len(token)
            # endswith instead of title[-1] so an empty title raises a
            # ParseError instead of an IndexError
            if not title.endswith(':'):
                raise ParseError("Expected field declaration like '*Date:*'", span)
            field_name = to_json_field_name(title)
            value, span = self.read_until('\n\n')
            self.current_review[field_name] = value.strip()
            return True
        # we have a table! how exciting!
        if self.startswith('|'):
            headers, span = self.read_until('\n')
            headers = headers.split('|')
            if len(headers) != 4:
                raise ParseError("Expected table header here (like '|Category | Rating / Score |'", span.transform(end=-1))
            table_name = to_json_field_name(headers[2])
            # skip the alignment row (e.g. '| --- | --- |')
            line, span = self.read_until('\n')
            if len(line.split('|')) != len(headers):
                raise ParseError("Alignment row seems invalid, must contain the same number of '|' as headers!", span.transform(end=-1))
            values = dict()
            while self.peek() == '|':
                line, span = self.read_until('\n')
                line = line.split('|')
                if len(line) != len(headers):
                    raise ParseError("Content row seems invalid, must contain the same number of '|' as headers!", span.transform(end=-1))
                values[to_json_field_name(line[1])] = line[2].strip()
            self.current_review[table_name] = values
            return True
        raise ParseError("Unexpected input!", Span(self.pos, self.pos+1, self.fname, self.context))
class ReviewPostprocessor:
    """Converts raw string fields from the parser into structured values.

    Handlers are looked up by field name: each review field is run through
    the method of the same name if one exists, otherwise kept unchanged.
    """

    def __init__(self) -> None:
        pass

    def process_all(self, dicts):
        """Post-process every review dict in the given iterable."""
        return [self.process(review) for review in dicts]

    def process(self, review: dict) -> dict:
        """Run each field through its same-named handler method, if any."""
        def _identity(value):
            return value
        processed = {}
        for field, value in review.items():
            handler = getattr(self, field, _identity)
            processed[field] = handler(value)
        return processed

    def ingredients(self, ingredients: str):
        """Split a comma-separated ingredient list into trimmed items."""
        items = ingredients.rstrip('.').split(',')
        return [item.strip() for item in items]

    def images(self, images: str):
        """Split a comma-separated image list into trimmed entries."""
        return [entry.strip() for entry in images.split(',')]

    def rating_value(self, table: Dict[str, str]):
        """Copy the rating table, adding '<key>_percent' for every 'x/y' score."""
        result = dict()
        for category, score in table.items():
            result[category] = score
            if '/' in score:
                numerator, denominator = score.split('/')
                result[category + '_percent'] = float(numerator) / float(denominator)
        return result

    def final_verdict(self, verdict: str):
        """Wrap the verdict string together with a numeric 'value'.

        NOTE(review): verdict.count('') equals len(verdict) + 1, so 'value'
        always exceeds 1 — the '' literal may have lost a non-ASCII character
        in transit; confirm against the original blog source. Preserved as-is.
        """
        return {
            'string': verdict,
            'value': verdict.count('') / len(verdict)
        }
if __name__ == '__main__':
    # Construction is outside the try so a missing marker/file fails loudly.
    parser = MarkdownBlogParser('blog.md')
    try:
        processed = ReviewPostprocessor().process_all(parser.parse())

        def sort_key(review):
            # case-insensitive ordering by company, then name, then variant
            return tuple((review[key] or '').lower() for key in ('company', 'name', 'variant'))

        with open("reviews.json", 'w') as f:
            json.dump({
                'reviews': sorted(processed, key=sort_key),
                'created': str(datetime.date.today()),
            }, f, indent=2)
    except ParseError as error:
        error.print_context_message()
        sys.exit(1)