"My name is Earley" from .utils import classify class MatchFailed(object): pass class AbortParseMatch(Exception): pass class Rule(object): def __init__(self, name, symbols, postprocess): self.name = name self.symbols = symbols self.postprocess = postprocess class State(object): def __init__(self, rule, expect, reference, data=None): self.rule = rule self.expect = expect self.reference = reference self.data = data or [] self.is_complete = (self.expect == len(self.rule.symbols)) if not self.is_complete: self.expect_symbol = self.rule.symbols[self.expect] self.is_literal = isinstance(self.expect_symbol, dict) if self.is_literal: self.expect_symbol = self.expect_symbol['literal'] assert isinstance(self.expect_symbol, (str, unicode)), self.expect_symbol def next_state(self, data): return State(self.rule, self.expect+1, self.reference, self.data + [data]) def consume_terminal(self, inp): if not self.is_complete and self.is_literal: # PORT: originally tests regexp if self.expect_symbol == inp.type: return self.next_state(inp) def consume_nonterminal(self, inp): if not self.is_complete and not self.is_literal: if self.expect_symbol == inp: return self.next_state(inp) def process(self, location, ind, table, rules, added_rules): if self.is_complete: # Completed a rule if self.rule.postprocess: try: # self.data = self.rule.postprocess(self.data, self.reference) # import pdb # pdb.set_trace() self.data = self.rule.postprocess(self.data) except AbortParseMatch: self.data = MatchFailed if self.data is not MatchFailed: for s in table[self.reference]: x = s.consume_nonterminal(self.rule.name) if x: x.data[-1] = self.data x.epsilon_closure(location, ind, table) else: exp = self.rule.symbols[self.expect] if isinstance(exp, dict): return for r in rules[exp]: assert r.name == exp if r not in added_rules: if r.symbols: added_rules.add(r) State(r, 0, location).epsilon_closure(location, ind, table) else: # Empty rule new_copy = self.consume_nonterminal(r.name) if r.postprocess: new_copy.data[-1] = r.postprocess([]) # new_copy.data[-1] = r.postprocess([], self.reference) else: new_copy.data[-1] = [] new_copy.epsilon_closure(location, ind, table) def epsilon_closure(self, location, ind, table, result=None): col = table[location] if not result: result = col result.append(self) if not self.is_complete: for i in xrange(ind): state = col[i] if state.is_complete and state.reference == location: x = self.consume_nonterminal(state.rule.name) if x: x.data[-1] = state.data x.epsilon_closure(location, ind, table) class Parser(object): def __init__(self, rules, start=None): self.rules = [Rule(r['name'], r['symbols'], r.get('postprocess', None)) for r in rules] self.rules_by_name = classify(self.rules, lambda r: r.name) self.start = start or self.rules[0].name def advance_to(self, table, n, added_rules): for w, s in enumerate(table[n]): s.process(n, w, table, self.rules_by_name, added_rules) def parse(self, stream): initial_rules = set(self.rules_by_name[self.start]) table = [[State(r, 0, 0) for r in initial_rules]] self.advance_to(table, 0, initial_rules) for pos, token in enumerate(stream): table.append([]) for s in table[pos]: x = s.consume_terminal(token) if x: table[pos + 1].append(x) self.advance_to(table, pos + 1, set()) if not table[-1]: raise Exception('Error at line {t.line}:{t.column}'.format(t=stream[pos])) return list(self.finish(table)) def finish(self, table): for t in table[-1]: if (t.rule.name == self.start and t.expect == len(t.rule.symbols) and t.reference == 0 and t.data != MatchFailed): yield t.data