from dataclasses import dataclass
import json
import re
from typing import Dict, Optional, Tuple
from math import ceil, log10
import datetime

# Marker line in the blog post; everything after it is review content.
START_OF_REVIEWS = '## The actual reviews:'


# helper classes and functions

@dataclass
class LexingContext:
    """Holds the raw text of every source file so a Span can be resolved
    back to its source lines for error reporting."""

    sources: Dict[str, str]

    def get_nth_line_bounds(self, source_name: str, n: int) -> Optional[Tuple[int, int]]:
        """Return (start, end) byte offsets of the n-th line (0-based) of
        ``source_name``, or None if the file has fewer than n lines.

        NOTE: ``end`` is -1 when the n-th line is the last one and has no
        trailing newline (str.find convention, kept from the original).
        """
        if source_name not in self.sources:
            raise KeyError("Unknown source file \"{}\"!".format(source_name))
        source = self.sources[source_name]
        start = 0
        for _ in range(n):
            next_start = source.find('\n', start)
            if next_start == -1:
                return None
            start = next_start + 1
        return start, source.find('\n', start)

    def get_lines_containing(self, span: 'Span'):
        """Return (lines, start_offset, first_line_no) for the source lines
        touched by ``span``: the list of line strings, the byte offset of
        the first of them, and its 1-based line number.

        Returns None when the span starts beyond the last newline.
        """
        if span.source_name not in self.sources:
            raise KeyError("Unknown source file \"{}\"!".format(span.source_name))
        source = self.sources[span.source_name]
        start = 0
        line_no = 0
        while True:
            next_start = source.find('\n', start)
            line_no += 1
            # handle eof
            if next_start == -1:
                return None
            # as long as the next newline comes before the span's start we
            # have not reached the first relevant line yet
            if next_start < span.start:
                start = next_start + 1
                continue
            # if the whole span is on one line, we are done
            if next_start >= span.end:
                return [source[start:next_start]], start, line_no
            # multi-line span: extend to the first newline at/after span.end.
            # BUGFIX: the original looped forever when the span ended after
            # the last newline (str.find returning -1 restarted the search
            # from the beginning of the file).
            while next_start != -1 and next_start < span.end:
                next_start = source.find('\n', next_start + 1)
            if next_start == -1:
                next_start = len(source)
            return source[start:next_start].split('\n'), start, line_no


@dataclass(frozen=True)
class Span:
    start: int
    """ Start of tokens location in source file, global byte offset in file """
    end: int
    """ End of tokens location in source file, global byte offset in file """
    source_name: str
    context: LexingContext

    def union(self, *spans: 'Span') -> 'Span':
        """Smallest span covering this span and all ``spans`` (same file)."""
        if not spans:
            # BUGFIX: min()/max() below fail on an empty argument expansion
            return self
        for span in spans:
            assert span.source_name == self.source_name
            assert span.context == self.context
        return Span(
            start=min(self.start, *(span.start for span in spans)),
            end=max(self.end, *(span.end for span in spans)),
            source_name=self.source_name,
            context=self.context
        )

    def transform(self, start: int = 0, end: int = 0) -> 'Span':
        """Return a copy shifted by ``start``/``end`` offsets, e.g.
        ``transform(end=-1)`` drops a trailing newline from the region."""
        return Span(self.start + start, self.end + end,
                    self.source_name, self.context)

    def __repr__(self):
        return "{}(start={},end={},source_name={})".format(
            self.__class__.__name__, self.start, self.end, self.source_name
        )


def create_span_context_str(span: Span, message: str, color: str = '\033[31m'):
    """Render ``span`` with its source line(s), a caret underline and
    ``message``, highlighting the spanned region with ANSI ``color``."""
    lines, offset_into_file, line_no = span.context.get_lines_containing(span)
    relative_offset = span.start - offset_into_file
    annotation_len = span.end - span.start
    # width of the widest printed line number (the last line shown);
    # replaces the fragile ceil(log10(...)) computation
    digit_len = max(1, len(str(line_no + len(lines) - 1)))
    output_str = ">>> In file {}:{}\n".format(span.source_name, line_no)
    for i, source_line in enumerate(lines):
        # BUGFIX: measure the line before inserting ANSI codes; the original
        # measured the colorized string, counting the escape bytes
        plain_len = len(source_line)
        colored_line = (
            source_line[:relative_offset]
            + color
            + source_line[relative_offset:relative_offset + annotation_len]
            + '\033[0m'
            + source_line[relative_offset + annotation_len:]
        )
        output_str += '{:>{}d}: {}\n'.format(line_no + i, digit_len, colored_line)
        if relative_offset > plain_len:
            continue
        # TODO: handle multi-line underlines
        output_str += "{}{}{}{}\n".format(
            color,
            ' ' * (relative_offset + digit_len + 2),
            '^' * min(annotation_len, plain_len - relative_offset),
            '\033[0m'
        )
        if annotation_len > plain_len - relative_offset:
            # BUGFIX: shrink the remaining annotation *before* resetting the
            # offset; the original zeroed relative_offset first and so
            # subtracted the whole line length instead of the remainder
            annotation_len -= plain_len - relative_offset
            relative_offset = 0
    if message:
        output_str += color
        output_str += ' ' * (relative_offset + digit_len + 2) + '|\n'
        for message_line in message.split("\n"):
            output_str += ' ' * (relative_offset + digit_len + 2) + message_line + '\n'
    return output_str + '\033[0m'


def print_warning(span: Span, message: str, color="\033[33m"):
    """Print a warning (yellow by default) with source context."""
    print(create_span_context_str(span, "Warning: " + message, color))


class ParseError(Exception):
    """Parsing failure, optionally carrying the offending Span."""
    span: Optional[Span]
    message: str

    def __init__(self, msg: str, span: Optional[Span] = None) -> None:
        super().__init__((msg, span))
        self.span = span
        self.message = msg

    def print_context_message(self):
        """Print the error, with source context when a span is available."""
        if not self.span:
            print("\n".join(">>> {}".format(line) for line in self.message.split('\n')))
        else:
            print(create_span_context_str(self.span, self.message))


class EndOfInputError(ParseError):
    """Raised when the parser runs off the end of the input."""

    def __init__(self, span: Span, search_str: Optional[str] = None) -> None:
        if search_str:
            super().__init__(
                f"Unexpected end-of-input in {span.source_name} while scanning for {search_str}!",
                span)
        else:
            super().__init__(f"Unexpected end-of-input in {span.source_name}!", span)


def to_json_field_name(field_name: str) -> str:
    """Normalize a human heading like 'Rating / Score' to 'rating_score'."""
    return re.sub(r'[^\w\d]+', '_', field_name).lower().strip('_')


## parser

class MarkdownBlogParser:
    """Hand-rolled parser for the pesto-review blog markdown.

    Reviews start after START_OF_REVIEWS; each review is a heading
    '### Company "name" (variant)' followed by '*Field:* value' items and
    markdown tables. A heading containing 'template' terminates parsing.
    """

    def __init__(self, source: str) -> None:
        self.fname = source
        with open(source, 'r') as f:
            self.content = f.read()
        # jump straight to the review section (ValueError if marker missing)
        self.pos = self.content.index(START_OF_REVIEWS) + len(START_OF_REVIEWS)
        self.context = LexingContext({source: self.content})
        self.size = len(self.content)
        self.reviews = []
        self.consume_whitespace()

    def peek(self, offset: int = 0) -> Optional[str]:
        """Character at pos+offset, or None past end-of-input."""
        if self.pos + offset >= self.size:
            return None
        return self.content[self.pos + offset]

    def startswith(self, *patterns: str, offset: int = 0):
        """Return the longest of ``patterns`` present at pos+offset, else False."""
        # match longest first
        for pattern in sorted(patterns, key=len, reverse=True):
            if self.content.startswith(pattern, self.pos + offset):
                return pattern
        return False

    def consume_whitespace(self):
        """Skip whitespace; raises EndOfInputError at end-of-file (this is
        how parsing terminates when no template heading follows)."""
        while self.pos < self.size and self.content[self.pos] in '\n\r\t ':
            self.pos += 1
        if self.pos == self.size:
            raise EndOfInputError(Span(self.pos - 1, self.pos, self.fname, self.context), "Whitespace")

    def read_until(self, pattern: str, inclusive=True) -> Tuple[str, Span]:
        """Consume and return (text, span) up to ``pattern`` (including it
        when ``inclusive``). Raises EndOfInputError if the pattern never
        occurs."""
        start = self.pos
        # PERF: str.find instead of the original char-by-char loop, which
        # sliced the remaining content on every step (quadratic overall)
        pos = self.content.find(pattern, self.pos)
        if pos == -1:
            raise EndOfInputError(Span(start, self.size, self.fname, self.context), pattern)
        if inclusive:
            pos += len(pattern)
        self.pos = pos
        return self.content[start:pos], Span(start, pos, self.fname, self.context)

    def parse(self):
        """Parse reviews until the 'template' heading; returns the list."""
        # iterative rather than recursive: the original recursed once per
        # review and could hit the recursion limit on long blogs
        while True:
            line, span = self.read_until('\n', inclusive=True)
            result = re.fullmatch(r'### ([\w\s]+)\s+("[^"]+")[ \t]*(\([^)]+\))?\n', line)
            if not result:
                raise ParseError("Expected review heading of form '### Company \"pesto name\" (variant)\n'", span.transform(end=-1))
            # now we get the first bit of info!
            company, name, variant = (result.group(x) for x in (1, 2, 3))
            self.current_review = {
                'company': company,
                'name': name.strip()[1:-1],                             # drop quotes
                'variant': variant.strip()[1:-1] if variant else None,  # drop parens
            }
            # the trailing template review marks the end of the real ones
            if 'template' in line.lower():
                return self.reviews
            # parse inner review fields
            while self.inner_review_parse():
                pass
            # add review to global list
            self.reviews.append(self.current_review)

    def inner_review_parse(self):
        """Parse one '*Field:* value' item or one table of the current
        review. Returns None at the next review heading, True otherwise."""
        # read until next thing
        self.consume_whitespace()
        if self.startswith('### '):
            return None  # we are done!
        # we have an item:
        if self.startswith('*'):
            token = '*'
            if self.startswith('**'):
                token = '**'
            self.pos += len(token)
            title, span = self.read_until(token, False)
            self.pos += len(token)
            # BUGFIX: endswith instead of title[-1] so an empty field title
            # raises the intended ParseError rather than an IndexError
            if not title.endswith(':'):
                raise ParseError("Expected field declaration like '*Date:*'", span)
            field_name = to_json_field_name(title)
            value, span = self.read_until('\n\n')  # field values end at a blank line
            self.current_review[field_name] = value.strip()
            return True
        # we have a table! how exciting!
        if self.startswith('|'):
            # skip headers
            headers, span = self.read_until('\n')
            headers = headers.split('|')
            if not len(headers) == 4:
                raise ParseError("Expected table header here (like '|Category | Rating / Score |'", span.transform(end=-1))
            # the table is named after its value (second) column
            table_name = to_json_field_name(headers[2])
            # skip alignment col
            line, span = self.read_until('\n')
            if not len(line.split('|')) == len(headers):
                raise ParseError("Alignment row seems invalid, must contain the same number of '|' as headers!", span.transform(end=-1))
            values = dict()
            while self.peek() == '|':
                line, span = self.read_until('\n')
                line = line.split('|')
                if len(line) != len(headers):
                    raise ParseError("Content row seems invalid, must contain the same number of '|' as headers!", span.transform(end=-1))
                values[to_json_field_name(line[1])] = line[2].strip()
            self.current_review[table_name] = values
            return True
        raise ParseError("Unexpected input!", Span(self.pos, self.pos + 1, self.fname, self.context))


class ReviewPostprocessor:
    """Converts raw string review fields into structured values.

    A field is transformed by the instance method of the same name, when
    one exists; otherwise it is passed through unchanged.
    """

    def __init__(self) -> None:
        pass

    def process_all(self, dicts):
        """Process a list of review dicts."""
        return [self.process(d) for d in dicts]

    def process(self, review: dict) -> dict:
        """Process one review by dispatching each field to its handler."""
        def noop(value):
            return value
        return {
            field: getattr(self, field, noop)(value)
            for field, value in review.items()
        }

    def ingredients(self, ingredients: str):
        """'a, b, c.' -> ['a', 'b', 'c']"""
        return [x.strip() for x in ingredients.rstrip('.').split(',')]

    def rating_value(self, table: Dict[str, str]):
        """Copy the table, adding a float '<key>_percent' for every 'x/y'
        rating entry."""
        new = dict()
        for key, value in table.items():
            new[key] = value
            if '/' in value:
                x, y = value.split('/')
                new[key + '_percent'] = float(x) / float(y)
        return new

    def final_verdict(self, verdict: str):
        """Star string -> {'string': ..., 'value': filled-star fraction}."""
        return {
            'string': verdict,
            'value': verdict.count('★') / len(verdict)
        }


if __name__ == '__main__':
    parser = MarkdownBlogParser('blog.md')
    try:
        reviews = ReviewPostprocessor().process_all(parser.parse())
        with open("reviews.json", 'w') as f:
            json.dump({
                'reviews': reviews,
                'created': str(datetime.date.today())
            }, f, indent=2)
    except ParseError as err:
        err.print_context_message()