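"""
Parse the pesto reviews from a markdown blog post ('blog.md') into structured
JSON ('reviews.json').

Everything after the '## The actual reviews:' heading is parsed; source spans
are tracked so that parse errors can be reported with line context.
"""
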
from dataclasses import dataclass
import json
import re
import sys
from typing import Dict, Tuple
from math import ceil, log10
import datetime
START_OF_REVIEWS = '## The actual reviews:'
# helper classes and functions
@dataclass
class LexingContext:
    sources: Dict[str, str]

    def get_nth_line_bounds(self, source_name: str, n: int):
        if source_name not in self.sources:
            raise KeyError("Unknown source file \"{}\"!".format(source_name))
        start = 0
        source = self.sources[source_name]
        for _ in range(n):
            next_start = source.find('\n', start)
            if next_start == -1:
                return None
            start = next_start + 1
        return start, source.find('\n', start)

    def get_lines_containing(self, span: 'Span'):
        if span.source_name not in self.sources:
            raise KeyError("Unknown source file \"{}\"!".format(span.source_name))
        start = 0
        line_no = 0
        source = self.sources[span.source_name]
        while True:
            next_start = source.find('\n', start)
            line_no += 1
            # handle eof
            if next_start == -1:
                return None
            # as long as the next newline comes before the span's start we are good
            if next_start < span.start:
                start = next_start + 1
                continue
            # if the whole span is on one line, we are good as well
            if next_start >= span.end:
                return [source[start:next_start]], start, line_no
            # otherwise extend to the newline that ends the span
            while next_start < span.end:
                next_start = source.find('\n', next_start + 1)
            return source[start:next_start].split('\n'), start, line_no
@dataclass(frozen=True)
class Span:
    start: int
    """
    Start of the token's location in the source file, as a global byte offset
    """
    end: int
    """
    End of the token's location in the source file, as a global byte offset
    """
    source_name: str
    context: LexingContext

    def union(self, *spans: 'Span'):
        for span in spans:
            assert span.source_name == self.source_name
            assert span.context == self.context
        return Span(
            start=min(self.start, *(span.start for span in spans)),
            end=max(self.end, *(span.end for span in spans)),
            source_name=self.source_name,
            context=self.context
        )

    def transform(self, start: int = 0, end: int = 0):
        return Span(self.start + start, self.end + end, self.source_name, self.context)

    def __repr__(self):
        return "{}(start={},end={},source_name={})".format(
            self.__class__.__name__,
            self.start, self.end, self.source_name
        )
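
# For example, some_span.transform(end=-1) yields a span that is one byte
# shorter at the end; the parser below uses this to keep trailing newlines
# out of error underlines.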
def create_span_context_str(span: Span, message: str, color: str = '\033[31m'):
    lines, offset_into_file, line_no = span.context.get_lines_containing(span)
    relative_offset = span.start - offset_into_file
    annotation_len = span.end - span.start
    # width of the largest printed line number
    digit_len = ceil(log10(line_no + len(lines)))
    if digit_len == 0:
        digit_len = 1
    output_str = ">>> In file {}:{}\n".format(span.source_name, line_no)
    for i, source_line in enumerate(lines):
        # remember the length before ANSI codes are spliced in
        raw_len = len(source_line)
        source_line = (
            source_line[:relative_offset]
            + color
            + source_line[relative_offset:relative_offset + annotation_len]
            + '\033[0m'
            + source_line[relative_offset + annotation_len:]
        )
        output_str += '{:>{}d}: {}\n'.format(line_no + i, digit_len, source_line)
        if relative_offset > raw_len:
            continue
        # TODO: handle multi-line underlines
        output_str += "{}{}{}{}\n".format(
            color,
            ' ' * (relative_offset + digit_len + 2),
            '^' * min(annotation_len, raw_len - relative_offset),
            '\033[0m'
        )
        # carry leftover annotation length over to the next line
        if annotation_len > raw_len - relative_offset:
            annotation_len -= raw_len - relative_offset
            relative_offset = 0
    if message:
        output_str += color
        output_str += ' ' * (relative_offset + digit_len + 2) + '|\n'
        for message_line in message.split("\n"):
            output_str += ' ' * (relative_offset + digit_len + 2) + message_line + '\n'
    return output_str + '\033[0m'
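
# The rendered diagnostic looks roughly like this (ANSI colors omitted, the
# heading line is made up for illustration):
#
#   >>> In file blog.md:12
#   12: ### SomeBrand "some pesto"
#       ^^^^^^^^^^^^^^^^^^^^^^^^^^
#       |
#       Warning: ...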
def print_warning(span: Span, message: str, color="\033[33m"):
    print(create_span_context_str(span, "Warning: " + message, color))


class ParseError(Exception):
    span: Span
    message: str

    def __init__(self, msg: str, span: Span = None) -> None:
        super().__init__((msg, span))
        self.span = span
        self.message = msg

    def print_context_message(self):
        if not self.span:
            print("\n".join(">>> {}".format(line) for line in self.message.split('\n')))
        else:
            print(create_span_context_str(self.span, self.message))


class EndOfInputError(ParseError):
    def __init__(self, span: Span, search_str: str = None) -> None:
        if search_str:
            super().__init__(f"Unexpected end-of-input in {span.source_name} while scanning for {search_str}!", span)
        else:
            super().__init__(f"Unexpected end-of-input in {span.source_name}!", span)
def to_json_field_name(field_name: str) -> str:
    return re.sub(r'[^\w\d]+', '_', field_name).lower().strip('_')
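# e.g. 'Rating / Value' -> 'rating_value', 'Date:' -> 'date'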
## parser
class MarkdownBlogParser:
    def __init__(self, source: str) -> None:
        self.fname = source
        with open(source, 'r') as f:
            self.content = f.read()
        self.pos = self.content.index(START_OF_REVIEWS) + len(START_OF_REVIEWS)
        self.context = LexingContext({source: self.content})
        self.size = len(self.content)
        self.reviews = []
        self.consume_whitespace()

    def peek(self, offset: int = 0):
        if self.pos + offset >= self.size:
            return None
        return self.content[self.pos + offset]

    def startswith(self, *patterns: str, offset: int = 0):
        # match longest first
        for pattern in sorted(patterns, key=len, reverse=True):
            if self.content.startswith(pattern, self.pos + offset):
                return pattern
        return False

    def consume_whitespace(self):
        while self.pos < self.size and self.content[self.pos] in '\n\r\t ':
            self.pos += 1
        if self.pos == self.size:
            raise EndOfInputError(Span(self.pos - 1, self.pos, self.fname, self.context), "Whitespace")

    def read_until(self, pattern: str, inclusive=True) -> Tuple[str, Span]:
        start = self.pos
        pos = self.pos
        while pos < self.size and not self.content.startswith(pattern, pos):
            pos += 1
        if pos == self.size:
            raise EndOfInputError(Span(start, pos, self.fname, self.context), pattern)
        if inclusive:
            pos += len(pattern)
        self.pos = pos
        return self.content[start:pos], Span(start, pos, self.fname, self.context)
    def parse(self):
        line, span = self.read_until('\n', inclusive=True)
        result = re.fullmatch(r'### ([\w\s&-/!]+)\s+("[^"]+")[ \t]*(\([^)]+\))?\n', line)
        if not result:
            raise ParseError("Expected review heading of form '### Company \"pesto name\" (variant)'", span.transform(end=-1))
        # now we get the first bit of info!
        company, name, variant = (result.group(x) for x in (1, 2, 3))
        self.current_review = {
            'company': company,
            'name': name.strip()[1:-1],
            'variant': variant.strip()[1:-1] if variant else None,
        }
        if 'template' in line.lower():
            return self.reviews
        # parse inner review fields
        while self.inner_review_parse():
            pass
        # add review to global list
        self.reviews.append(self.current_review)
        # and next review!
        return self.parse()
    def inner_review_parse(self):
        # read until next thing
        self.consume_whitespace()
        if self.startswith('### '):
            # we are done, the next review starts here!
            return None
        # we have an item:
        if self.startswith('*'):
            token = '*'
            if self.startswith('**'):
                token = '**'
            self.pos += len(token)
            title, span = self.read_until(token, False)
            self.pos += len(token)
            if title[-1] != ':':
                raise ParseError("Expected field declaration like '*Date:*'", span)
            field_name = to_json_field_name(title)
            value, span = self.read_until('\n\n')
            self.current_review[field_name] = value.strip()
            return True
        # we have a table! how exciting!
        if self.startswith('|'):
            # skip headers
            headers, span = self.read_until('\n')
            headers = headers.split('|')
            if not len(headers) == 4:
                raise ParseError("Expected table header here (like '| Category | Rating / Score |')", span.transform(end=-1))
            table_name = to_json_field_name(headers[2])
            # skip alignment row
            line, span = self.read_until('\n')
            if not len(line.split('|')) == len(headers):
                raise ParseError("Alignment row seems invalid, must contain the same number of '|' as headers!", span.transform(end=-1))
            values = dict()
            while self.peek() == '|':
                line, span = self.read_until('\n')
                line = line.split('|')
                if len(line) != len(headers):
                    raise ParseError("Content row seems invalid, must contain the same number of '|' as headers!", span.transform(end=-1))
                values[to_json_field_name(line[1])] = line[2].strip()
            self.current_review[table_name] = values
            return True
        raise ParseError("Unexpected input!", Span(self.pos, self.pos + 1, self.fname, self.context))
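
# A review section in the blog is expected to look roughly like this (names
# and values are made up for illustration):
#
#   ### SomeCompany "some pesto name" (some variant)
#
#   *Date:* 2021-01-01
#
#   *Ingredients:* basil, oil, salt.
#
#   | Category | Rating / Value |
#   |----------|----------------|
#   | Taste    | 4/5            |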
class ReviewPostprocessor:
    def __init__(self) -> None:
        pass

    def process_all(self, dicts):
        return [
            self.process(d) for d in dicts
        ]

    def process(self, review: dict) -> dict:
        def noop(input):
            return input
        # dispatch each field to the method of the same name, if one exists
        return {
            field: getattr(self, field, noop)(value)
            for field, value in review.items()
        }

    def ingredients(self, ingredients: str):
        return [
            x.strip() for x in ingredients.rstrip('.').split(',')
        ]

    def images(self, images: str):
        return [
            x.strip() for x in images.split(',')
        ]

    def rating_value(self, table: Dict[str, str]):
        new = dict()
        for key, value in table.items():
            new[key] = value
            if '/' in value:
                x, y = value.split('/')
                new[key + '_percent'] = float(x) / float(y)
        return new
    def final_verdict(self, verdict: str):
        # assumes the verdict is rendered as a row of filled/empty stars, e.g. '★★★☆☆'
        return {
            'string': verdict,
            'value': verdict.count('★') / len(verdict)
        }
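
# For example, rating_value turns {'taste': '4/5'} into
# {'taste': '4/5', 'taste_percent': 0.8} (illustrative values).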
if __name__ == '__main__':
    parser = MarkdownBlogParser('blog.md')
    try:
        reviews = ReviewPostprocessor().process_all(parser.parse())
        with open("reviews.json", 'w') as f:
            json.dump({
                'reviews': sorted(reviews, key=lambda review: tuple((review[key] or '').lower() for key in ('company', 'name', 'variant'))),
                'created': str(datetime.date.today())
            }, f, indent=2)
    except ParseError as err:
        err.print_context_message()
        sys.exit(1)