# The file was automatically generated by Lark v0.9.0
__version__ = "0.9.0"

#
#
#   Lark Stand-alone Generator Tool
# ----------------------------------
# Generates a stand-alone LALR(1) parser with a standard lexer
#
# Git:    https://github.com/erezsh/lark
# Author: Erez Shinan (erezshin@gmail.com)
#
#
#    >>> LICENSE
#
#    This tool and its generated code use a separate license from Lark,
#    and are subject to the terms of the Mozilla Public License, v. 2.0.
#    If a copy of the MPL was not distributed with this
#    file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
#    If you wish to purchase a commercial license for this tool and its
#    generated code, you may contact me via email or otherwise.
#
#    If MPL2 is incompatible with your free or open-source project,
#    contact me and we'll work it out.
#
#

import os
from io import open


class LarkError(Exception):
    pass


class GrammarError(LarkError):
    pass


class ParseError(LarkError):
    pass


class LexError(LarkError):
    pass


class UnexpectedEOF(ParseError):
    def __init__(self, expected):
        self.expected = expected

        message = ("Unexpected end-of-input. Expected one of: \n\t* %s\n"
                   % '\n\t* '.join(x.name for x in self.expected))
        super(UnexpectedEOF, self).__init__(message)


class UnexpectedInput(LarkError):
    #--
    pos_in_stream = None

    def get_context(self, text, span=40):
        #--
        pos = self.pos_in_stream
        start = max(pos - span, 0)
        end = pos + span
        if not isinstance(text, bytes):
            before = text[start:pos].rsplit('\n', 1)[-1]
            after = text[pos:end].split('\n', 1)[0]
            return before + after + '\n' + ' ' * len(before) + '^\n'
        else:
            before = text[start:pos].rsplit(b'\n', 1)[-1]
            after = text[pos:end].split(b'\n', 1)[0]
            return (before + after + b'\n' + b' ' * len(before) + b'^\n').decode("ascii", "backslashreplace")

    def match_examples(self, parse_fn, examples, token_type_match_fallback=False, use_accepts=False):
        #--
        assert self.state is not None, "Not supported for this exception"

        if isinstance(examples, dict):
            examples = examples.items()

        candidate = (None, False)
        for i, (label, example) in enumerate(examples):
            assert not isinstance(example, STRING_TYPE)

            for j, malformed in enumerate(example):
                try:
                    parse_fn(malformed)
                except UnexpectedInput as ut:
                    if ut.state == self.state:
                        if use_accepts and ut.accepts != self.accepts:
                            logger.debug("Different accepts with same state[%d]: %s != %s at example [%s][%s]" %
                                         (self.state, self.accepts, ut.accepts, i, j))
                            continue
                        try:
                            if ut.token == self.token:
                                ##
                                logger.debug("Exact Match at example [%s][%s]" % (i, j))
                                return label

                            if token_type_match_fallback:
                                ##
                                if (ut.token.type == self.token.type) and not candidate[-1]:
                                    logger.debug("Token Type Fallback at example [%s][%s]" % (i, j))
                                    candidate = label, True

                        except AttributeError:
                            pass
                        if not candidate[0]:
                            logger.debug("Same State match at example [%s][%s]" % (i, j))
                            candidate = label, False

        return candidate[0]


class UnexpectedCharacters(LexError, UnexpectedInput):
    def __init__(self, seq, lex_pos, line, column, allowed=None, considered_tokens=None, state=None, token_history=None):
        self.line = line
        self.column = column
        self.pos_in_stream = lex_pos
        self.state = state

        self.allowed = allowed
        self.considered_tokens = considered_tokens

        if isinstance(seq, bytes):
            _s = seq[lex_pos:lex_pos+1].decode("ascii", "backslashreplace")
        else:
            _s = seq[lex_pos]

        message = "No terminal defined for '%s' at line %d col %d" % (_s, line, column)
        message += '\n\n' + self.get_context(seq)
        if allowed:
            message += '\nExpecting: %s\n' % allowed
        if token_history:
            message += '\nPrevious tokens: %s\n' % ', '.join(repr(t) for t in
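        # -- Editor's sketch (not generated code): UnexpectedInput.match_examples
        # above is meant for mapping parse errors to user-friendly labels by
        # replaying known-bad snippets. A minimal, hypothetical use, assuming
        # `parser` is a Lark/standalone parser instance:
        #
        #     try:
        #         parser.parse('{"key" 1}')
        #     except UnexpectedInput as u:
        #         label = u.match_examples(parser.parse, {
        #             'missing colon': ['{"key" 1}'],
        #             'unclosed string': ['{"key": "value}'],
        #         })
        #         print(label or 'unknown error')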
token_history) super(UnexpectedCharacters, self).__init__(message) class UnexpectedToken(ParseError, UnexpectedInput): #-- def __init__(self, token, expected, considered_rules=None, state=None, puppet=None): self.line = getattr(token, 'line', '?') self.column = getattr(token, 'column', '?') self.pos_in_stream = getattr(token, 'pos_in_stream', None) self.state = state self.token = token self.expected = expected ## self.considered_rules = considered_rules self.puppet = puppet ## ## self.accepts = puppet and puppet.accepts() message = ("Unexpected token %r at line %s, column %s.\n" "Expected one of: \n\t* %s\n" % (token, self.line, self.column, '\n\t* '.join(self.accepts or self.expected))) super(UnexpectedToken, self).__init__(message) class VisitError(LarkError): #-- def __init__(self, rule, obj, orig_exc): self.obj = obj self.orig_exc = orig_exc message = 'Error trying to process rule "%s":\n\n%s' % (rule, orig_exc) super(VisitError, self).__init__(message) import logging logger = logging.getLogger("lark") logger.addHandler(logging.StreamHandler()) ## ## logger.setLevel(logging.CRITICAL) def classify(seq, key=None, value=None): d = {} for item in seq: k = key(item) if (key is not None) else item v = value(item) if (value is not None) else item if k in d: d[k].append(v) else: d[k] = [v] return d def _deserialize(data, namespace, memo): if isinstance(data, dict): if '__type__' in data: ## class_ = namespace[data['__type__']] return class_.deserialize(data, memo) elif '@' in data: return memo[data['@']] return {key:_deserialize(value, namespace, memo) for key, value in data.items()} elif isinstance(data, list): return [_deserialize(value, namespace, memo) for value in data] return data class Serialize(object): def memo_serialize(self, types_to_memoize): memo = SerializeMemoizer(types_to_memoize) return self.serialize(memo), memo.serialize() def serialize(self, memo=None): if memo and memo.in_types(self): return {'@': memo.memoized.get(self)} fields = getattr(self, '__serialize_fields__') res = {f: _serialize(getattr(self, f), memo) for f in fields} res['__type__'] = type(self).__name__ postprocess = getattr(self, '_serialize', None) if postprocess: postprocess(res, memo) return res @classmethod def deserialize(cls, data, memo): namespace = getattr(cls, '__serialize_namespace__', {}) namespace = {c.__name__:c for c in namespace} fields = getattr(cls, '__serialize_fields__') if '@' in data: return memo[data['@']] inst = cls.__new__(cls) for f in fields: try: setattr(inst, f, _deserialize(data[f], namespace, memo)) except KeyError as e: raise KeyError("Cannot find key for class", cls, e) postprocess = getattr(inst, '_deserialize', None) if postprocess: postprocess() return inst class SerializeMemoizer(Serialize): __serialize_fields__ = 'memoized', def __init__(self, types_to_memoize): self.types_to_memoize = tuple(types_to_memoize) self.memoized = Enumerator() def in_types(self, value): return isinstance(value, self.types_to_memoize) def serialize(self): return _serialize(self.memoized.reversed(), None) @classmethod def deserialize(cls, data, namespace, memo): return _deserialize(data, namespace, memo) try: STRING_TYPE = basestring except NameError: ## STRING_TYPE = str import types from functools import wraps, partial from contextlib import contextmanager Str = type(u'') try: classtype = types.ClassType ## except AttributeError: classtype = type ## def smart_decorator(f, create_decorator): if isinstance(f, types.FunctionType): return wraps(f)(create_decorator(f, True)) elif isinstance(f, 
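    # -- Editor's sketch (not generated code): the Serialize machinery above
    # gives the grammar objects defined further below a JSON-safe round-trip,
    # with SerializeMemoizer de-duplicating shared instances. Mirroring how
    # Lark._load uses it later in this file:
    #
    #     data, memo_ser = rule.memo_serialize([TerminalDef])
    #     memo = SerializeMemoizer.deserialize(
    #         memo_ser, {'Rule': Rule, 'TerminalDef': TerminalDef}, {})
    #     rule2 = Rule.deserialize(data, memo)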
(classtype, type, types.BuiltinFunctionType)): return wraps(f)(create_decorator(f, False)) elif isinstance(f, types.MethodType): return wraps(f)(create_decorator(f.__func__, True)) elif isinstance(f, partial): ## return wraps(f.func)(create_decorator(lambda *args, **kw: f(*args[1:], **kw), True)) else: return create_decorator(f.__func__.__call__, True) try: import regex except ImportError: regex = None import sys, re Py36 = (sys.version_info[:2] >= (3, 6)) import sre_parse import sre_constants categ_pattern = re.compile(r'\\p{[A-Za-z_]+}') def get_regexp_width(expr): if regex: ## ## ## regexp_final = re.sub(categ_pattern, 'A', expr) else: if re.search(categ_pattern, expr): raise ImportError('`regex` module must be installed in order to use Unicode categories.', expr) regexp_final = expr try: return [int(x) for x in sre_parse.parse(regexp_final).getwidth()] except sre_constants.error: raise ValueError(expr) from collections import OrderedDict class Meta: def __init__(self): self.empty = True class Tree(object): #-- def __init__(self, data, children, meta=None): self.data = data self.children = children self._meta = meta @property def meta(self): if self._meta is None: self._meta = Meta() return self._meta def __repr__(self): return 'Tree(%s, %s)' % (self.data, self.children) def _pretty_label(self): return self.data def _pretty(self, level, indent_str): if len(self.children) == 1 and not isinstance(self.children[0], Tree): return [ indent_str*level, self._pretty_label(), '\t', '%s' % (self.children[0],), '\n'] l = [ indent_str*level, self._pretty_label(), '\n' ] for n in self.children: if isinstance(n, Tree): l += n._pretty(level+1, indent_str) else: l += [ indent_str*(level+1), '%s' % (n,), '\n' ] return l def pretty(self, indent_str=' '): #-- return ''.join(self._pretty(0, indent_str)) def __eq__(self, other): try: return self.data == other.data and self.children == other.children except AttributeError: return False def __ne__(self, other): return not (self == other) def __hash__(self): return hash((self.data, tuple(self.children))) def iter_subtrees(self): #-- queue = [self] subtrees = OrderedDict() for subtree in queue: subtrees[id(subtree)] = subtree queue += [c for c in reversed(subtree.children) if isinstance(c, Tree) and id(c) not in subtrees] del queue return reversed(list(subtrees.values())) def find_pred(self, pred): #-- return filter(pred, self.iter_subtrees()) def find_data(self, data): #-- return self.find_pred(lambda t: t.data == data) from inspect import getmembers, getmro class Discard(Exception): #-- pass ## class _Decoratable: #-- @classmethod def _apply_decorator(cls, decorator, **kwargs): mro = getmro(cls) assert mro[0] is cls libmembers = {name for _cls in mro[1:] for name, _ in getmembers(_cls)} for name, value in getmembers(cls): ## if name.startswith('_') or (name in libmembers and name not in cls.__dict__): continue if not callable(value): continue ## if hasattr(cls.__dict__[name], 'vargs_applied') or hasattr(value, 'vargs_applied'): continue static = isinstance(cls.__dict__[name], (staticmethod, classmethod)) setattr(cls, name, decorator(value, static=static, **kwargs)) return cls def __class_getitem__(cls, _): return cls class Transformer(_Decoratable): #-- __visit_tokens__ = True ## def __init__(self, visit_tokens=True): self.__visit_tokens__ = visit_tokens def _call_userfunc(self, tree, new_children=None): ## children = new_children if new_children is not None else tree.children try: f = getattr(self, tree.data) except AttributeError: return 
self.__default__(tree.data, children, tree.meta) else: try: wrapper = getattr(f, 'visit_wrapper', None) if wrapper is not None: return f.visit_wrapper(f, tree.data, children, tree.meta) else: return f(children) except (GrammarError, Discard): raise except Exception as e: raise VisitError(tree.data, tree, e) def _call_userfunc_token(self, token): try: f = getattr(self, token.type) except AttributeError: return self.__default_token__(token) else: try: return f(token) except (GrammarError, Discard): raise except Exception as e: raise VisitError(token.type, token, e) def _transform_children(self, children): for c in children: try: if isinstance(c, Tree): yield self._transform_tree(c) elif self.__visit_tokens__ and isinstance(c, Token): yield self._call_userfunc_token(c) else: yield c except Discard: pass def _transform_tree(self, tree): children = list(self._transform_children(tree.children)) return self._call_userfunc(tree, children) def transform(self, tree): return self._transform_tree(tree) def __mul__(self, other): return TransformerChain(self, other) def __default__(self, data, children, meta): #-- return Tree(data, children, meta) def __default_token__(self, token): #-- return token class InlineTransformer(Transformer): ## def _call_userfunc(self, tree, new_children=None): ## children = new_children if new_children is not None else tree.children try: f = getattr(self, tree.data) except AttributeError: return self.__default__(tree.data, children, tree.meta) else: return f(*children) class TransformerChain(object): def __init__(self, *transformers): self.transformers = transformers def transform(self, tree): for t in self.transformers: tree = t.transform(tree) return tree def __mul__(self, other): return TransformerChain(*self.transformers + (other,)) class Transformer_InPlace(Transformer): #-- def _transform_tree(self, tree): ## return self._call_userfunc(tree) def transform(self, tree): for subtree in tree.iter_subtrees(): subtree.children = list(self._transform_children(subtree.children)) return self._transform_tree(tree) class Transformer_NonRecursive(Transformer): #-- def transform(self, tree): ## rev_postfix = [] q = [tree] while q: t = q.pop() rev_postfix.append( t ) if isinstance(t, Tree): q += t.children ## stack = [] for x in reversed(rev_postfix): if isinstance(x, Tree): size = len(x.children) if size: args = stack[-size:] del stack[-size:] else: args = [] stack.append(self._call_userfunc(x, args)) else: stack.append(x) t ,= stack ## return t class Transformer_InPlaceRecursive(Transformer): #-- def _transform_tree(self, tree): tree.children = list(self._transform_children(tree.children)) return self._call_userfunc(tree) ## class VisitorBase: def _call_userfunc(self, tree): return getattr(self, tree.data, self.__default__)(tree) def __default__(self, tree): #-- return tree def __class_getitem__(cls, _): return cls class Visitor(VisitorBase): #-- def visit(self, tree): for subtree in tree.iter_subtrees(): self._call_userfunc(subtree) return tree def visit_topdown(self,tree): for subtree in tree.iter_subtrees_topdown(): self._call_userfunc(subtree) return tree class Visitor_Recursive(VisitorBase): #-- def visit(self, tree): for child in tree.children: if isinstance(child, Tree): self.visit(child) self._call_userfunc(tree) return tree def visit_topdown(self,tree): self._call_userfunc(tree) for child in tree.children: if isinstance(child, Tree): self.visit_topdown(child) return tree def visit_children_decor(func): #-- @wraps(func) def inner(cls, tree): values = 
cls.visit_children(tree) return func(cls, values) return inner class Interpreter(_Decoratable): #-- def visit(self, tree): f = getattr(self, tree.data) wrapper = getattr(f, 'visit_wrapper', None) if wrapper is not None: return f.visit_wrapper(f, tree.data, tree.children, tree.meta) else: return f(tree) def visit_children(self, tree): return [self.visit(child) if isinstance(child, Tree) else child for child in tree.children] def __getattr__(self, name): return self.__default__ def __default__(self, tree): return self.visit_children(tree) ## def _apply_decorator(obj, decorator, **kwargs): try: _apply = obj._apply_decorator except AttributeError: return decorator(obj, **kwargs) else: return _apply(decorator, **kwargs) def _inline_args__func(func): @wraps(func) def create_decorator(_f, with_self): if with_self: def f(self, children): return _f(self, *children) else: def f(self, children): return _f(*children) return f return smart_decorator(func, create_decorator) def inline_args(obj): ## return _apply_decorator(obj, _inline_args__func) def _visitor_args_func_dec(func, visit_wrapper=None, static=False): def create_decorator(_f, with_self): if with_self: def f(self, *args, **kwargs): return _f(self, *args, **kwargs) else: def f(self, *args, **kwargs): return _f(*args, **kwargs) return f if static: f = wraps(func)(create_decorator(func, False)) else: f = smart_decorator(func, create_decorator) f.vargs_applied = True f.visit_wrapper = visit_wrapper return f def _vargs_inline(f, data, children, meta): return f(*children) def _vargs_meta_inline(f, data, children, meta): return f(meta, *children) def _vargs_meta(f, data, children, meta): return f(children, meta) ## def _vargs_tree(f, data, children, meta): return f(Tree(data, children, meta)) def v_args(inline=False, meta=False, tree=False, wrapper=None): #-- if tree and (meta or inline): raise ValueError("Visitor functions cannot combine 'tree' with 'meta' or 'inline'.") func = None if meta: if inline: func = _vargs_meta_inline else: func = _vargs_meta elif inline: func = _vargs_inline elif tree: func = _vargs_tree if wrapper is not None: if func is not None: raise ValueError("Cannot use 'wrapper' along with 'tree', 'meta' or 'inline'.") func = wrapper def _visitor_args_dec(obj): return _apply_decorator(obj, _visitor_args_func_dec, visit_wrapper=func) return _visitor_args_dec class Indenter: def __init__(self): self.paren_level = None self.indent_level = None assert self.tab_len > 0 def handle_NL(self, token): if self.paren_level > 0: return yield token indent_str = token.rsplit('\n', 1)[1] ## indent = indent_str.count(' ') + indent_str.count('\t') * self.tab_len if indent > self.indent_level[-1]: self.indent_level.append(indent) yield Token.new_borrow_pos(self.INDENT_type, indent_str, token) else: while indent < self.indent_level[-1]: self.indent_level.pop() yield Token.new_borrow_pos(self.DEDENT_type, indent_str, token) assert indent == self.indent_level[-1], '%s != %s' % (indent, self.indent_level[-1]) def _process(self, stream): for token in stream: if token.type == self.NL_type: for t in self.handle_NL(token): yield t else: yield token if token.type in self.OPEN_PAREN_types: self.paren_level += 1 elif token.type in self.CLOSE_PAREN_types: self.paren_level -= 1 assert self.paren_level >= 0 while len(self.indent_level) > 1: self.indent_level.pop() yield Token(self.DEDENT_type, '') assert self.indent_level == [0], self.indent_level def process(self, stream): self.paren_level = 0 self.indent_level = [0] return self._process(stream) ## @property 
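    # -- Editor's sketch (not generated code): Indenter is an abstract
    # postlexer; a subclass supplies the terminal names and tab width that the
    # methods above read. A typical Python-like configuration (terminal names
    # are illustrative and must exist in the grammar):
    #
    #     class TreeIndenter(Indenter):
    #         NL_type = '_NL'
    #         OPEN_PAREN_types = []
    #         CLOSE_PAREN_types = []
    #         INDENT_type = '_INDENT'
    #         DEDENT_type = '_DEDENT'
    #         tab_len = 8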
def always_accept(self): return (self.NL_type,) class Symbol(Serialize): __slots__ = ('name',) is_term = NotImplemented def __init__(self, name): self.name = name def __eq__(self, other): assert isinstance(other, Symbol), other return self.is_term == other.is_term and self.name == other.name def __ne__(self, other): return not (self == other) def __hash__(self): return hash(self.name) def __repr__(self): return '%s(%r)' % (type(self).__name__, self.name) fullrepr = property(__repr__) class Terminal(Symbol): __serialize_fields__ = 'name', 'filter_out' is_term = True def __init__(self, name, filter_out=False): self.name = name self.filter_out = filter_out @property def fullrepr(self): return '%s(%r, %r)' % (type(self).__name__, self.name, self.filter_out) class NonTerminal(Symbol): __serialize_fields__ = 'name', is_term = False class RuleOptions(Serialize): __serialize_fields__ = 'keep_all_tokens', 'expand1', 'priority', 'template_source', 'empty_indices' def __init__(self, keep_all_tokens=False, expand1=False, priority=None, template_source=None, empty_indices=()): self.keep_all_tokens = keep_all_tokens self.expand1 = expand1 self.priority = priority self.template_source = template_source self.empty_indices = empty_indices def __repr__(self): return 'RuleOptions(%r, %r, %r, %r)' % ( self.keep_all_tokens, self.expand1, self.priority, self.template_source ) class Rule(Serialize): #-- __slots__ = ('origin', 'expansion', 'alias', 'options', 'order', '_hash') __serialize_fields__ = 'origin', 'expansion', 'order', 'alias', 'options' __serialize_namespace__ = Terminal, NonTerminal, RuleOptions def __init__(self, origin, expansion, order=0, alias=None, options=None): self.origin = origin self.expansion = expansion self.alias = alias self.order = order self.options = options or RuleOptions() self._hash = hash((self.origin, tuple(self.expansion))) def _deserialize(self): self._hash = hash((self.origin, tuple(self.expansion))) def __str__(self): return '<%s : %s>' % (self.origin.name, ' '.join(x.name for x in self.expansion)) def __repr__(self): return 'Rule(%r, %r, %r, %r)' % (self.origin, self.expansion, self.alias, self.options) def __hash__(self): return self._hash def __eq__(self, other): if not isinstance(other, Rule): return False return self.origin == other.origin and self.expansion == other.expansion from copy import copy class Pattern(Serialize): def __init__(self, value, flags=()): self.value = value self.flags = frozenset(flags) def __repr__(self): return repr(self.to_regexp()) ## def __hash__(self): return hash((type(self), self.value, self.flags)) def __eq__(self, other): return type(self) == type(other) and self.value == other.value and self.flags == other.flags def to_regexp(self): raise NotImplementedError() if Py36: ## def _get_flags(self, value): for f in self.flags: value = ('(?%s:%s)' % (f, value)) return value else: def _get_flags(self, value): for f in self.flags: value = ('(?%s)' % f) + value return value class PatternStr(Pattern): __serialize_fields__ = 'value', 'flags' type = "str" def to_regexp(self): return self._get_flags(re.escape(self.value)) @property def min_width(self): return len(self.value) max_width = min_width class PatternRE(Pattern): __serialize_fields__ = 'value', 'flags', '_width' type = "re" def to_regexp(self): return self._get_flags(self.value) _width = None def _get_width(self): if self._width is None: self._width = get_regexp_width(self.to_regexp()) return self._width @property def min_width(self): return self._get_width()[0] @property def 
max_width(self): return self._get_width()[1] class TerminalDef(Serialize): __serialize_fields__ = 'name', 'pattern', 'priority' __serialize_namespace__ = PatternStr, PatternRE def __init__(self, name, pattern, priority=1): assert isinstance(pattern, Pattern), pattern self.name = name self.pattern = pattern self.priority = priority def __repr__(self): return '%s(%r, %r)' % (type(self).__name__, self.name, self.pattern) class Token(Str): #-- __slots__ = ('type', 'pos_in_stream', 'value', 'line', 'column', 'end_line', 'end_column', 'end_pos') def __new__(cls, type_, value, pos_in_stream=None, line=None, column=None, end_line=None, end_column=None, end_pos=None): try: self = super(Token, cls).__new__(cls, value) except UnicodeDecodeError: value = value.decode('latin1') self = super(Token, cls).__new__(cls, value) self.type = type_ self.pos_in_stream = pos_in_stream self.value = value self.line = line self.column = column self.end_line = end_line self.end_column = end_column self.end_pos = end_pos return self def update(self, type_=None, value=None): return Token.new_borrow_pos( type_ if type_ is not None else self.type, value if value is not None else self.value, self ) @classmethod def new_borrow_pos(cls, type_, value, borrow_t): return cls(type_, value, borrow_t.pos_in_stream, borrow_t.line, borrow_t.column, borrow_t.end_line, borrow_t.end_column, borrow_t.end_pos) def __reduce__(self): return (self.__class__, (self.type, self.value, self.pos_in_stream, self.line, self.column, )) def __repr__(self): return 'Token(%s, %r)' % (self.type, self.value) def __deepcopy__(self, memo): return Token(self.type, self.value, self.pos_in_stream, self.line, self.column) def __eq__(self, other): if isinstance(other, Token) and self.type != other.type: return False return Str.__eq__(self, other) __hash__ = Str.__hash__ class LineCounter: def __init__(self, newline_char): self.newline_char = newline_char self.char_pos = 0 self.line = 1 self.column = 1 self.line_start_pos = 0 def feed(self, token, test_newline=True): #-- if test_newline: newlines = token.count(self.newline_char) if newlines: self.line += newlines self.line_start_pos = self.char_pos + token.rindex(self.newline_char) + 1 self.char_pos += len(token) self.column = self.char_pos - self.line_start_pos + 1 class _Lex: #-- def __init__(self, lexer, state=None): self.lexer = lexer self.state = state def lex(self, stream, newline_types, ignore_types): newline_types = frozenset(newline_types) ignore_types = frozenset(ignore_types) line_ctr = LineCounter('\n' if not self.lexer.use_bytes else b'\n') last_token = None while line_ctr.char_pos < len(stream): lexer = self.lexer res = lexer.match(stream, line_ctr.char_pos) if not res: allowed = {v for m, tfi in lexer.mres for v in tfi.values()} - ignore_types if not allowed: allowed = {""} raise UnexpectedCharacters(stream, line_ctr.char_pos, line_ctr.line, line_ctr.column, allowed=allowed, state=self.state, token_history=last_token and [last_token]) value, type_ = res if type_ not in ignore_types: t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column) line_ctr.feed(value, type_ in newline_types) t.end_line = line_ctr.line t.end_column = line_ctr.column t.end_pos = line_ctr.char_pos if t.type in lexer.callback: t = lexer.callback[t.type](t) if not isinstance(t, Token): raise ValueError("Callbacks must return a token (returned %r)" % t) yield t last_token = t else: if type_ in lexer.callback: t2 = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column) lexer.callback[type_](t2) 
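                    # (Editor's note: as this branch shows, a terminal listed in
                    # `ignore` still reaches a user callback registered for it,
                    # e.g. lexer_callbacks={'WS': fn}; the callback's return
                    # value is discarded and the token is not yielded.)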
line_ctr.feed(value, type_ in newline_types) class UnlessCallback: def __init__(self, mres): self.mres = mres def __call__(self, t): for mre, type_from_index in self.mres: m = mre.match(t.value) if m: t.type = type_from_index[m.lastindex] break return t class CallChain: def __init__(self, callback1, callback2, cond): self.callback1 = callback1 self.callback2 = callback2 self.cond = cond def __call__(self, t): t2 = self.callback1(t) return self.callback2(t) if self.cond(t2) else t2 def _create_unless(terminals, g_regex_flags, re_, use_bytes): tokens_by_type = classify(terminals, lambda t: type(t.pattern)) assert len(tokens_by_type) <= 2, tokens_by_type.keys() embedded_strs = set() callback = {} for retok in tokens_by_type.get(PatternRE, []): unless = [] ## for strtok in tokens_by_type.get(PatternStr, []): if strtok.priority > retok.priority: continue s = strtok.pattern.value m = re_.match(retok.pattern.to_regexp(), s, g_regex_flags) if m and m.group(0) == s: unless.append(strtok) if strtok.pattern.flags <= retok.pattern.flags: embedded_strs.add(strtok) if unless: callback[retok.name] = UnlessCallback(build_mres(unless, g_regex_flags, re_, match_whole=True, use_bytes=use_bytes)) terminals = [t for t in terminals if t not in embedded_strs] return terminals, callback def _build_mres(terminals, max_size, g_regex_flags, match_whole, re_, use_bytes): ## ## ## postfix = '$' if match_whole else '' mres = [] while terminals: pattern = u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp() + postfix) for t in terminals[:max_size]) if use_bytes: pattern = pattern.encode('latin-1') try: mre = re_.compile(pattern, g_regex_flags) except AssertionError: ## return _build_mres(terminals, max_size//2, g_regex_flags, match_whole, re_, use_bytes) ## mres.append((mre, {i:n for n,i in mre.groupindex.items()} )) terminals = terminals[max_size:] return mres def build_mres(terminals, g_regex_flags, re_, use_bytes, match_whole=False): return _build_mres(terminals, len(terminals), g_regex_flags, match_whole, re_, use_bytes) def _regexp_has_newline(r): #-- return '\n' in r or '\\n' in r or '\\s' in r or '[^' in r or ('(?s' in r and '.' in r) class Lexer(object): #-- lex = NotImplemented class TraditionalLexer(Lexer): def __init__(self, conf): terminals = list(conf.tokens) assert all(isinstance(t, TerminalDef) for t in terminals), terminals self.re = conf.re_module if not conf.skip_validation: ## for t in terminals: try: self.re.compile(t.pattern.to_regexp(), conf.g_regex_flags) except self.re.error: raise LexError("Cannot compile token %s: %s" % (t.name, t.pattern)) if t.pattern.min_width == 0: raise LexError("Lexer does not allow zero-width terminals. 
(%s: %s)" % (t.name, t.pattern)) assert set(conf.ignore) <= {t.name for t in terminals} ## self.newline_types = [t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp())] self.ignore_types = list(conf.ignore) terminals.sort(key=lambda x:(-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name)) self.terminals = terminals self.user_callbacks = conf.callbacks self.g_regex_flags = conf.g_regex_flags self.use_bytes = conf.use_bytes self._mres = None ## def _build(self): terminals, self.callback = _create_unless(self.terminals, self.g_regex_flags, re_=self.re, use_bytes=self.use_bytes) assert all(self.callback.values()) for type_, f in self.user_callbacks.items(): if type_ in self.callback: ## self.callback[type_] = CallChain(self.callback[type_], f, lambda t: t.type == type_) else: self.callback[type_] = f self._mres = build_mres(terminals, self.g_regex_flags, self.re, self.use_bytes) @property def mres(self): if self._mres is None: self._build() return self._mres def match(self, stream, pos): for mre, type_from_index in self.mres: m = mre.match(stream, pos) if m: return m.group(0), type_from_index[m.lastindex] def lex(self, stream): return _Lex(self).lex(stream, self.newline_types, self.ignore_types) class ContextualLexer(Lexer): def __init__(self, conf, states, always_accept=()): terminals = list(conf.tokens) tokens_by_name = {} for t in terminals: assert t.name not in tokens_by_name, t tokens_by_name[t.name] = t trad_conf = copy(conf) trad_conf.tokens = terminals lexer_by_tokens = {} self.lexers = {} for state, accepts in states.items(): key = frozenset(accepts) try: lexer = lexer_by_tokens[key] except KeyError: accepts = set(accepts) | set(conf.ignore) | set(always_accept) state_tokens = [tokens_by_name[n] for n in accepts if n and n in tokens_by_name] lexer_conf = copy(trad_conf) lexer_conf.tokens = state_tokens lexer = TraditionalLexer(lexer_conf) lexer_by_tokens[key] = lexer self.lexers[state] = lexer assert trad_conf.tokens is terminals self.root_lexer = TraditionalLexer(trad_conf) def lex(self, stream, get_parser_state): parser_state = get_parser_state() l = _Lex(self.lexers[parser_state], parser_state) try: for x in l.lex(stream, self.root_lexer.newline_types, self.root_lexer.ignore_types): yield x parser_state = get_parser_state() l.lexer = self.lexers[parser_state] l.state = parser_state ## except UnexpectedCharacters as e: ## ## ## root_match = self.root_lexer.match(stream, e.pos_in_stream) if not root_match: raise value, type_ = root_match t = Token(type_, value, e.pos_in_stream, e.line, e.column) raise UnexpectedToken(t, e.allowed, state=e.state) class LexerConf(Serialize): __serialize_fields__ = 'tokens', 'ignore', 'g_regex_flags', 'use_bytes' __serialize_namespace__ = TerminalDef, def __init__(self, tokens, re_module, ignore=(), postlex=None, callbacks=None, g_regex_flags=0, skip_validation=False, use_bytes=False): self.tokens = tokens ## self.ignore = ignore self.postlex = postlex self.callbacks = callbacks or {} self.g_regex_flags = g_regex_flags self.re_module = re_module self.skip_validation = skip_validation self.use_bytes = use_bytes from functools import partial, wraps from itertools import repeat, product class ExpandSingleChild: def __init__(self, node_builder): self.node_builder = node_builder def __call__(self, children): if len(children) == 1: return children[0] else: return self.node_builder(children) class PropagatePositions: def __init__(self, node_builder): self.node_builder = node_builder def __call__(self, children): res = 
self.node_builder(children) ## if isinstance(res, Tree): res_meta = res.meta for c in children: if isinstance(c, Tree): child_meta = c.meta if not child_meta.empty: res_meta.line = child_meta.line res_meta.column = child_meta.column res_meta.start_pos = child_meta.start_pos res_meta.empty = False break elif isinstance(c, Token): res_meta.line = c.line res_meta.column = c.column res_meta.start_pos = c.pos_in_stream res_meta.empty = False break for c in reversed(children): if isinstance(c, Tree): child_meta = c.meta if not child_meta.empty: res_meta.end_line = child_meta.end_line res_meta.end_column = child_meta.end_column res_meta.end_pos = child_meta.end_pos res_meta.empty = False break elif isinstance(c, Token): res_meta.end_line = c.end_line res_meta.end_column = c.end_column res_meta.end_pos = c.end_pos res_meta.empty = False break return res class ChildFilter: def __init__(self, to_include, append_none, node_builder): self.node_builder = node_builder self.to_include = to_include self.append_none = append_none def __call__(self, children): filtered = [] for i, to_expand, add_none in self.to_include: if add_none: filtered += [None] * add_none if to_expand: filtered += children[i].children else: filtered.append(children[i]) if self.append_none: filtered += [None] * self.append_none return self.node_builder(filtered) class ChildFilterLALR(ChildFilter): #-- def __call__(self, children): filtered = [] for i, to_expand, add_none in self.to_include: if add_none: filtered += [None] * add_none if to_expand: if filtered: filtered += children[i].children else: ## filtered = children[i].children else: filtered.append(children[i]) if self.append_none: filtered += [None] * self.append_none return self.node_builder(filtered) class ChildFilterLALR_NoPlaceholders(ChildFilter): #-- def __init__(self, to_include, node_builder): self.node_builder = node_builder self.to_include = to_include def __call__(self, children): filtered = [] for i, to_expand in self.to_include: if to_expand: if filtered: filtered += children[i].children else: ## filtered = children[i].children else: filtered.append(children[i]) return self.node_builder(filtered) def _should_expand(sym): return not sym.is_term and sym.name.startswith('_') def maybe_create_child_filter(expansion, keep_all_tokens, ambiguous, _empty_indices): ## if _empty_indices: assert _empty_indices.count(False) == len(expansion) s = ''.join(str(int(b)) for b in _empty_indices) empty_indices = [len(ones) for ones in s.split('0')] assert len(empty_indices) == len(expansion)+1, (empty_indices, len(expansion)) else: empty_indices = [0] * (len(expansion)+1) to_include = [] nones_to_add = 0 for i, sym in enumerate(expansion): nones_to_add += empty_indices[i] if keep_all_tokens or not (sym.is_term and sym.filter_out): to_include.append((i, _should_expand(sym), nones_to_add)) nones_to_add = 0 nones_to_add += empty_indices[len(expansion)] if _empty_indices or len(to_include) < len(expansion) or any(to_expand for i, to_expand,_ in to_include): if _empty_indices or ambiguous: return partial(ChildFilter if ambiguous else ChildFilterLALR, to_include, nones_to_add) else: ## return partial(ChildFilterLALR_NoPlaceholders, [(i, x) for i,x,_ in to_include]) class AmbiguousExpander: #-- def __init__(self, to_expand, tree_class, node_builder): self.node_builder = node_builder self.tree_class = tree_class self.to_expand = to_expand def __call__(self, children): def _is_ambig_tree(child): return hasattr(child, 'data') and child.data == '_ambig' ## ## ## ## ambiguous = [] for i, child in 
enumerate(children): if _is_ambig_tree(child): if i in self.to_expand: ambiguous.append(i) to_expand = [j for j, grandchild in enumerate(child.children) if _is_ambig_tree(grandchild)] child.expand_kids_by_index(*to_expand) if not ambiguous: return self.node_builder(children) expand = [ iter(child.children) if i in ambiguous else repeat(child) for i, child in enumerate(children) ] return self.tree_class('_ambig', [self.node_builder(list(f[0])) for f in product(zip(*expand))]) def maybe_create_ambiguous_expander(tree_class, expansion, keep_all_tokens): to_expand = [i for i, sym in enumerate(expansion) if keep_all_tokens or ((not (sym.is_term and sym.filter_out)) and _should_expand(sym))] if to_expand: return partial(AmbiguousExpander, to_expand, tree_class) def ptb_inline_args(func): @wraps(func) def f(children): return func(*children) return f def inplace_transformer(func): @wraps(func) def f(children): ## tree = Tree(func.__name__, children) return func(tree) return f def apply_visit_wrapper(func, name, wrapper): if wrapper is _vargs_meta or wrapper is _vargs_meta_inline: raise NotImplementedError("Meta args not supported for internal transformer") @wraps(func) def f(children): return wrapper(func, name, children, None) return f class ParseTreeBuilder: def __init__(self, rules, tree_class, propagate_positions=False, keep_all_tokens=False, ambiguous=False, maybe_placeholders=False): self.tree_class = tree_class self.propagate_positions = propagate_positions self.always_keep_all_tokens = keep_all_tokens self.ambiguous = ambiguous self.maybe_placeholders = maybe_placeholders self.rule_builders = list(self._init_builders(rules)) def _init_builders(self, rules): for rule in rules: options = rule.options keep_all_tokens = self.always_keep_all_tokens or options.keep_all_tokens expand_single_child = options.expand1 wrapper_chain = list(filter(None, [ (expand_single_child and not rule.alias) and ExpandSingleChild, maybe_create_child_filter(rule.expansion, keep_all_tokens, self.ambiguous, options.empty_indices if self.maybe_placeholders else None), self.propagate_positions and PropagatePositions, self.ambiguous and maybe_create_ambiguous_expander(self.tree_class, rule.expansion, keep_all_tokens), ])) yield rule, wrapper_chain def create_callback(self, transformer=None): callbacks = {} for rule, wrapper_chain in self.rule_builders: user_callback_name = rule.alias or rule.options.template_source or rule.origin.name try: f = getattr(transformer, user_callback_name) ## wrapper = getattr(f, 'visit_wrapper', None) if wrapper is not None: f = apply_visit_wrapper(f, user_callback_name, wrapper) else: if isinstance(transformer, InlineTransformer): f = ptb_inline_args(f) elif isinstance(transformer, Transformer_InPlace): f = inplace_transformer(f) except AttributeError: f = partial(self.tree_class, user_callback_name) for w in wrapper_chain: f = w(f) if rule in callbacks: raise GrammarError("Rule '%s' already exists" % (rule,)) callbacks[rule] = f return callbacks class LALR_Parser(object): def __init__(self, parser_conf, debug=False): assert all(r.options.priority is None for r in parser_conf.rules), "LALR doesn't yet support prioritization" analysis = LALR_Analyzer(parser_conf, debug=debug) analysis.compute_lalr() callbacks = parser_conf.callbacks self._parse_table = analysis.parse_table self.parser_conf = parser_conf self.parser = _Parser(analysis.parse_table, callbacks, debug) @classmethod def deserialize(cls, data, memo, callbacks): inst = cls.__new__(cls) inst._parse_table = 
IntParseTable.deserialize(data, memo) inst.parser = _Parser(inst._parse_table, callbacks) return inst def serialize(self, memo): return self._parse_table.serialize(memo) def parse(self, *args): return self.parser.parse(*args) class _Parser: def __init__(self, parse_table, callbacks, debug=False): self.parse_table = parse_table self.callbacks = callbacks self.debug = debug def parse(self, seq, start, set_state=None, value_stack=None, state_stack=None): token = None stream = iter(seq) states = self.parse_table.states start_state = self.parse_table.start_states[start] end_state = self.parse_table.end_states[start] state_stack = state_stack or [start_state] value_stack = value_stack or [] if set_state: set_state(start_state) def get_action(token): state = state_stack[-1] try: return states[state][token.type] except KeyError: expected = {s for s in states[state].keys() if s.isupper()} try: puppet = ParserPuppet(self, state_stack, value_stack, start, stream, set_state) except NameError: ## puppet = None raise UnexpectedToken(token, expected, state=state, puppet=puppet) def reduce(rule): size = len(rule.expansion) if size: s = value_stack[-size:] del state_stack[-size:] del value_stack[-size:] else: s = [] value = self.callbacks[rule](s) _action, new_state = states[state_stack[-1]][rule.origin.name] assert _action is Shift state_stack.append(new_state) value_stack.append(value) ## try: for token in stream: while True: action, arg = get_action(token) assert arg != end_state if action is Shift: state_stack.append(arg) value_stack.append(token) if set_state: set_state(arg) break ## else: reduce(arg) except Exception as e: if self.debug: print("") print("STATE STACK DUMP") print("----------------") for i, s in enumerate(state_stack): print('%d)' % i , s) print("") raise token = Token.new_borrow_pos('$END', '', token) if token else Token('$END', '', 0, 1, 1) while True: _action, arg = get_action(token) assert(_action is Reduce) reduce(arg) if state_stack[-1] == end_state: return value_stack[-1] class Action: def __init__(self, name): self.name = name def __str__(self): return self.name def __repr__(self): return str(self) Shift = Action('Shift') Reduce = Action('Reduce') class ParseTable: def __init__(self, states, start_states, end_states): self.states = states self.start_states = start_states self.end_states = end_states def serialize(self, memo): tokens = Enumerator() rules = Enumerator() states = { state: {tokens.get(token): ((1, arg.serialize(memo)) if action is Reduce else (0, arg)) for token, (action, arg) in actions.items()} for state, actions in self.states.items() } return { 'tokens': tokens.reversed(), 'states': states, 'start_states': self.start_states, 'end_states': self.end_states, } @classmethod def deserialize(cls, data, memo): tokens = data['tokens'] states = { state: {tokens[token]: ((Reduce, Rule.deserialize(arg, memo)) if action==1 else (Shift, arg)) for token, (action, arg) in actions.items()} for state, actions in data['states'].items() } return cls(states, data['start_states'], data['end_states']) class IntParseTable(ParseTable): @classmethod def from_ParseTable(cls, parse_table): enum = list(parse_table.states) state_to_idx = {s:i for i,s in enumerate(enum)} int_states = {} for s, la in parse_table.states.items(): la = {k:(v[0], state_to_idx[v[1]]) if v[0] is Shift else v for k,v in la.items()} int_states[ state_to_idx[s] ] = la start_states = {start:state_to_idx[s] for start, s in parse_table.start_states.items()} end_states = {start:state_to_idx[s] for start, s in 
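        # -- Editor's note (not generated code): per ParseTable.serialize
        # above, each action serializes as (0, target_state) for Shift or
        # (1, serialized_rule) for Reduce; IntParseTable then renumbers whole
        # states to ints, which keeps the DATA blob at the end of this file
        # compact.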
                      parse_table.end_states.items()}
        return cls(int_states, start_states, end_states)


def get_frontend(parser, lexer):
    if parser=='lalr':
        if lexer is None:
            raise ValueError('The LALR parser requires use of a lexer')
        elif lexer == 'standard':
            return LALR_TraditionalLexer
        elif lexer == 'contextual':
            return LALR_ContextualLexer
        elif issubclass(lexer, Lexer):
            class LALR_CustomLexerWrapper(LALR_CustomLexer):
                def __init__(self, lexer_conf, parser_conf, options=None):
                    super(LALR_CustomLexerWrapper, self).__init__(
                        lexer, lexer_conf, parser_conf, options=options)
                def init_lexer(self):
                    self.lexer = lexer(self.lexer_conf)
            return LALR_CustomLexerWrapper
        else:
            raise ValueError('Unknown lexer: %s' % lexer)
    elif parser=='earley':
        if lexer=='standard':
            return Earley
        elif lexer=='dynamic':
            return XEarley
        elif lexer=='dynamic_complete':
            return XEarley_CompleteLex
        elif lexer=='contextual':
            raise ValueError('The Earley parser does not support the contextual lexer')
        else:
            raise ValueError('Unknown lexer: %s' % lexer)
    elif parser == 'cyk':
        if lexer == 'standard':
            return CYK
        else:
            raise ValueError('The CYK parser requires using the standard lexer.')
    else:
        raise ValueError('Unknown parser: %s' % parser)


class _ParserFrontend(Serialize):
    def _parse(self, input, start, *args):
        if start is None:
            start = self.start
            if len(start) > 1:
                raise ValueError("Lark initialized with more than 1 possible start rule. Must specify which start rule to parse", start)
            start ,= start
        return self.parser.parse(input, start, *args)


def _get_lexer_callbacks(transformer, terminals):
    result = {}
    for terminal in terminals:
        callback = getattr(transformer, terminal.name, None)
        if callback is not None:
            result[terminal.name] = callback
    return result


class WithLexer(_ParserFrontend):
    lexer = None
    parser = None
    lexer_conf = None
    start = None

    __serialize_fields__ = 'parser', 'lexer_conf', 'start'
    __serialize_namespace__ = LexerConf,

    def __init__(self, lexer_conf, parser_conf, options=None):
        self.lexer_conf = lexer_conf
        self.start = parser_conf.start
        self.postlex = lexer_conf.postlex

    @classmethod
    def deserialize(cls, data, memo, callbacks, postlex, transformer, re_module):
        inst = super(WithLexer, cls).deserialize(data, memo)
        inst.postlex = postlex
        inst.parser = LALR_Parser.deserialize(inst.parser, memo, callbacks)
        terminals = [item for item in memo.values() if isinstance(item, TerminalDef)]
        inst.lexer_conf.callbacks = _get_lexer_callbacks(transformer, terminals)
        inst.lexer_conf.re_module = re_module
        inst.lexer_conf.skip_validation=True
        inst.init_lexer()
        return inst

    def _serialize(self, data, memo):
        data['parser'] = data['parser'].serialize(memo)

    def lex(self, *args):
        stream = self.lexer.lex(*args)
        return self.postlex.process(stream) if self.postlex else stream

    def parse(self, text, start=None):
        token_stream = self.lex(text)
        return self._parse(token_stream, start)

    def init_traditional_lexer(self):
        self.lexer = TraditionalLexer(self.lexer_conf)


class LALR_WithLexer(WithLexer):
    def __init__(self, lexer_conf, parser_conf, options=None):
        debug = options.debug if options else False
        self.parser = LALR_Parser(parser_conf, debug=debug)
        WithLexer.__init__(self, lexer_conf, parser_conf, options)

        self.init_lexer()

    def init_lexer(self, **kw):
        raise NotImplementedError()


class LALR_TraditionalLexer(LALR_WithLexer):
    def init_lexer(self):
        self.init_traditional_lexer()


class LALR_ContextualLexer(LALR_WithLexer):
    def init_lexer(self):
        states = {idx:list(t.keys()) for idx, t in self.parser._parse_table.states.items()}
        always_accept = self.postlex.always_accept if self.postlex else ()
        self.lexer = ContextualLexer(self.lexer_conf, states, always_accept=always_accept)

    def parse(self, text, start=None):
        parser_state = [None]
        def set_parser_state(s):
            parser_state[0] = s
        token_stream = self.lex(text, lambda: parser_state[0])
        return self._parse(token_stream, start, set_parser_state)


class LarkOptions(Serialize):
    #--
    OPTIONS_DOC = """
    **=== General ===**

    start
            The start symbol. Either a string, or a list of strings for multiple possible starts (Default: "start")
    debug
            Display debug information, such as warnings (default: False)
    transformer
            Applies the transformer to every parse tree (equivalent to applying it after the parse, but faster)
    propagate_positions
            Propagates (line, column, end_line, end_column) attributes into all tree branches.
    maybe_placeholders
            When True, the ``[]`` operator returns ``None`` when not matched.
            When ``False``,  ``[]`` behaves like the ``?`` operator, and returns no value at all.
            (default= ``False``. Recommended to set to ``True``)
    regex
            When True, uses the ``regex`` module instead of the stdlib ``re``.
    cache
            Cache the results of the Lark grammar analysis, for x2 to x3 faster loading. LALR only for now.

            - When ``False``, does nothing (default)
            - When ``True``, caches to a temporary file in the local directory
            - When given a string, caches to the path pointed by the string

    g_regex_flags
            Flags that are applied to all terminals (both regex and strings)
    keep_all_tokens
            Prevent the tree builder from automagically removing "punctuation" tokens (default: False)

    **=== Algorithm ===**

    parser
            Decides which parser engine to use. Accepts "earley" or "lalr". (Default: "earley").
            (there is also a "cyk" option for legacy)
    lexer
            Decides whether or not to use a lexer stage

            - "auto" (default): Choose for me based on the parser
            - "standard": Use a standard lexer
            - "contextual": Stronger lexer (only works with parser="lalr")
            - "dynamic": Flexible and powerful (only with parser="earley")
            - "dynamic_complete": Same as dynamic, but tries *every* variation of tokenizing possible.
    ambiguity
            Decides how to handle ambiguity in the parse. Only relevant if parser="earley"

            - "resolve" - The parser will automatically choose the simplest derivation
              (it chooses consistently: greedy for tokens, non-greedy for rules)
            - "explicit": The parser will return all derivations wrapped in "_ambig" tree nodes (i.e. a forest).

    **=== Misc. / Domain Specific ===**

    postlex
            Lexer post-processing (Default: None) Only works with the standard and contextual lexers.
    priority
            How priorities should be evaluated - auto, none, normal, invert (Default: auto)
    lexer_callbacks
            Dictionary of callbacks for the lexer. May alter tokens during lexing. Use with caution.
    use_bytes
            Accept an input of type ``bytes`` instead of ``str`` (Python 3 only).
    edit_terminals
            A callback for editing the terminals before parse.
    """
    if __doc__:
        __doc__ += OPTIONS_DOC

    _defaults = {
        'debug': False,
        'keep_all_tokens': False,
        'tree_class': None,
        'cache': False,
        'postlex': None,
        'parser': 'earley',
        'lexer': 'auto',
        'transformer': None,
        'start': 'start',
        'priority': 'auto',
        'ambiguity': 'auto',
        'regex': False,
        'propagate_positions': False,
        'lexer_callbacks': {},
        'maybe_placeholders': False,
        'edit_terminals': None,
        'g_regex_flags': 0,
        'use_bytes': False,
    }

    def __init__(self, options_dict):
        o = dict(options_dict)

        options = {}
        for name, default in self._defaults.items():
            if name in o:
                value = o.pop(name)
                if isinstance(default, bool) and name not in ('cache', 'use_bytes'):
                    value = bool(value)
            else:
                value = default

            options[name] = value

        if isinstance(options['start'], STRING_TYPE):
            options['start'] = [options['start']]

        self.__dict__['options'] = options

        assert self.parser in ('earley', 'lalr', 'cyk', None)

        if self.parser == 'earley' and self.transformer:
            raise ValueError('Cannot specify an embedded transformer when using the Earley algorithm. '
                             'Please use your transformer on the resulting parse tree, or use a different algorithm (i.e. LALR)')

        if o:
            raise ValueError("Unknown options: %s" % o.keys())

    def __getattr__(self, name):
        try:
            return self.options[name]
        except KeyError as e:
            raise AttributeError(e)

    def __setattr__(self, name, value):
        assert name in self.options
        self.options[name] = value

    def serialize(self, memo):
        return self.options

    @classmethod
    def deserialize(cls, data, memo):
        return cls(data)


class Lark(Serialize):
    #--
    def __init__(self, grammar, **options):
        self.options = LarkOptions(options)

        ##
        use_regex = self.options.regex
        if use_regex:
            if regex:
                re_module = regex
            else:
                raise ImportError('`regex` module must be installed if calling `Lark(regex=True)`.')
        else:
            re_module = re

        ##
        try:
            self.source = grammar.name
        except AttributeError:
            self.source = '<string>'

        ##
        try:
            read = grammar.read
        except AttributeError:
            pass
        else:
            grammar = read()

        assert isinstance(grammar, STRING_TYPE)
        self.grammar_source = grammar
        if self.options.use_bytes:
            if not isascii(grammar):
                raise ValueError("Grammar must be ascii only, when use_bytes=True")
            if sys.version_info[0] == 2 and self.options.use_bytes != 'force':
                raise NotImplementedError("`use_bytes=True` may have issues on python2. "
                                          "Use `use_bytes='force'` to use it at your own risk.")

        cache_fn = None
        if self.options.cache:
            if self.options.parser != 'lalr':
                raise NotImplementedError("cache only works with parser='lalr' for now")
            if isinstance(self.options.cache, STRING_TYPE):
                cache_fn = self.options.cache
            else:
                if self.options.cache is not True:
                    raise ValueError("cache argument must be bool or str")
                unhashable = ('transformer', 'postlex', 'lexer_callbacks', 'edit_terminals')
                from .
import __version__ options_str = ''.join(k+str(v) for k, v in options.items() if k not in unhashable) s = grammar + options_str + __version__ md5 = hashlib.md5(s.encode()).hexdigest() cache_fn = '.lark_cache_%s.tmp' % md5 if FS.exists(cache_fn): logger.debug('Loading grammar from cache: %s', cache_fn) with FS.open(cache_fn, 'rb') as f: self._load(f, self.options.transformer, self.options.postlex) return if self.options.lexer == 'auto': if self.options.parser == 'lalr': self.options.lexer = 'contextual' elif self.options.parser == 'earley': self.options.lexer = 'dynamic' elif self.options.parser == 'cyk': self.options.lexer = 'standard' else: assert False, self.options.parser lexer = self.options.lexer assert lexer in ('standard', 'contextual', 'dynamic', 'dynamic_complete') or issubclass(lexer, Lexer) if self.options.ambiguity == 'auto': if self.options.parser == 'earley': self.options.ambiguity = 'resolve' else: disambig_parsers = ['earley', 'cyk'] assert self.options.parser in disambig_parsers, ( 'Only %s supports disambiguation right now') % ', '.join(disambig_parsers) if self.options.priority == 'auto': if self.options.parser in ('earley', 'cyk', ): self.options.priority = 'normal' elif self.options.parser in ('lalr', ): self.options.priority = None elif self.options.priority in ('invert', 'normal'): assert self.options.parser in ('earley', 'cyk'), "priorities are not supported for LALR at this time" assert self.options.priority in ('auto', None, 'normal', 'invert'), 'invalid priority option specified: {}. options are auto, none, normal, invert.'.format(self.options.priority) assert self.options.ambiguity not in ('resolve__antiscore_sum', ), 'resolve__antiscore_sum has been replaced with the option priority="invert"' assert self.options.ambiguity in ('resolve', 'explicit', 'auto', ) ## self.grammar = load_grammar(grammar, self.source, re_module) ## self.terminals, self.rules, self.ignore_tokens = self.grammar.compile(self.options.start) if self.options.edit_terminals: for t in self.terminals: self.options.edit_terminals(t) self._terminals_dict = {t.name: t for t in self.terminals} ## ## if self.options.priority == 'invert': for rule in self.rules: if rule.options.priority is not None: rule.options.priority = -rule.options.priority ## ## ## elif self.options.priority == None: for rule in self.rules: if rule.options.priority is not None: rule.options.priority = None ## lexer_callbacks = (_get_lexer_callbacks(self.options.transformer, self.terminals) if self.options.transformer else {}) lexer_callbacks.update(self.options.lexer_callbacks) self.lexer_conf = LexerConf(self.terminals, re_module, self.ignore_tokens, self.options.postlex, lexer_callbacks, self.options.g_regex_flags, use_bytes=self.options.use_bytes) if self.options.parser: self.parser = self._build_parser() elif lexer: self.lexer = self._build_lexer() if cache_fn: logger.debug('Saving grammar to cache: %s', cache_fn) with FS.open(cache_fn, 'wb') as f: self.save(f) ## __doc__ += "\nOptions:\n" + LarkOptions.OPTIONS_DOC __serialize_fields__ = 'parser', 'rules', 'options' def _build_lexer(self): return TraditionalLexer(self.lexer_conf) def _prepare_callbacks(self): self.parser_class = get_frontend(self.options.parser, self.options.lexer) self._parse_tree_builder = ParseTreeBuilder(self.rules, self.options.tree_class or Tree, self.options.propagate_positions, self.options.keep_all_tokens, self.options.parser!='lalr' and self.options.ambiguity=='explicit', self.options.maybe_placeholders) self._callbacks = 
self._parse_tree_builder.create_callback(self.options.transformer) def _build_parser(self): self._prepare_callbacks() parser_conf = ParserConf(self.rules, self._callbacks, self.options.start) return self.parser_class(self.lexer_conf, parser_conf, options=self.options) def save(self, f): #-- data, m = self.memo_serialize([TerminalDef, Rule]) pickle.dump({'data': data, 'memo': m}, f) @classmethod def load(cls, f): #-- inst = cls.__new__(cls) return inst._load(f) def _load(self, f, transformer=None, postlex=None): if isinstance(f, dict): d = f else: d = pickle.load(f) memo = d['memo'] data = d['data'] assert memo memo = SerializeMemoizer.deserialize(memo, {'Rule': Rule, 'TerminalDef': TerminalDef}, {}) options = dict(data['options']) if transformer is not None: options['transformer'] = transformer if postlex is not None: options['postlex'] = postlex self.options = LarkOptions.deserialize(options, memo) re_module = regex if self.options.regex else re self.rules = [Rule.deserialize(r, memo) for r in data['rules']] self.source = '' self._prepare_callbacks() self.parser = self.parser_class.deserialize( data['parser'], memo, self._callbacks, self.options.postlex, self.options.transformer, re_module ) return self @classmethod def _load_from_dict(cls, data, memo, transformer=None, postlex=None): inst = cls.__new__(cls) return inst._load({'data': data, 'memo': memo}, transformer, postlex) @classmethod def open(cls, grammar_filename, rel_to=None, **options): #-- if rel_to: basepath = os.path.dirname(rel_to) grammar_filename = os.path.join(basepath, grammar_filename) with open(grammar_filename, encoding='utf8') as f: return cls(f, **options) def __repr__(self): return 'Lark(open(%r), parser=%r, lexer=%r, ...)' % (self.source, self.options.parser, self.options.lexer) def lex(self, text): #-- if not hasattr(self, 'lexer'): self.lexer = self._build_lexer() stream = self.lexer.lex(text) if self.options.postlex: return self.options.postlex.process(stream) return stream def get_terminal(self, name): #-- return self._terminals_dict[name] def parse(self, text, start=None, on_error=None): #-- try: return self.parser.parse(text, start=start) except UnexpectedToken as e: if on_error is None: raise while True: if not on_error(e): raise e try: return e.puppet.resume_parse() except UnexpectedToken as e2: e = e2 DATA = ( {'parser': {'parser': {'tokens': {0: 'RBRACE', 1: 'COMMA', 2: 'RSQB', 3: '$END', 4: '__object_star_1', 5: 'COLON', 6: 'LBRACE', 7: 'value', 8: 'string', 9: 'object', 10: 'TRUE', 11: 'SIGNED_NUMBER', 12: 'LSQB', 13: 'NULL', 14: 'FALSE', 15: 'array', 16: 'ESCAPED_STRING', 17: '__array_star_0', 18: 'pair', 19: 'start'}, 'states': {0: {0: (1, {'@': 12}), 1: (1, {'@': 12})}, 1: {1: (1, {'@': 13}), 2: (1, {'@': 13}), 0: (1, {'@': 13}), 3: (1, {'@': 13})}, 2: {1: (1, {'@': 14}), 2: (1, {'@': 14}), 0: (1, {'@': 14}), 3: (1, {'@': 14})}, 3: {0: (0, 25), 1: (0, 32)}, 4: {4: (0, 3), 1: (0, 27), 0: (0, 33)}, 5: {0: (1, {'@': 15}), 1: (1, {'@': 15})}, 6: {}, 7: {1: (0, 23), 2: (0, 2)}, 8: {1: (1, {'@': 16}), 2: (1, {'@': 16})}, 9: {1: (1, {'@': 17}), 2: (1, {'@': 17}), 5: (1, {'@': 17}), 0: (1, {'@': 17}), 3: (1, {'@': 17})}, 10: {1: (1, {'@': 18}), 2: (1, {'@': 18}), 0: (1, {'@': 18}), 3: (1, {'@': 18})}, 11: {1: (1, {'@': 19}), 2: (1, {'@': 19}), 0: (1, {'@': 19}), 3: (1, {'@': 19})}, 12: {1: (1, {'@': 20}), 2: (1, {'@': 20}), 0: (1, {'@': 20}), 3: (1, {'@': 20})}, 13: {5: (0, 22)}, 14: {6: (0, 21), 7: (0, 29), 8: (0, 12), 9: (0, 1), 10: (0, 16), 11: (0, 11), 12: (0, 26), 13: (0, 30), 14: (0, 15), 15: (0, 10), 
16: (0, 9)}, 15: {1: (1, {'@': 21}), 2: (1, {'@': 21}), 0: (1, {'@': 21}), 3: (1, {'@': 21})}, 16: {1: (1, {'@': 22}), 2: (1, {'@': 22}), 0: (1, {'@': 22}), 3: (1, {'@': 22})}, 17: {1: (1, {'@': 23}), 2: (1, {'@': 23}), 0: (1, {'@': 23}), 3: (1, {'@': 23})}, 18: {2: (0, 24), 1: (0, 14), 17: (0, 7)}, 19: {1: (1, {'@': 24}), 2: (1, {'@': 24}), 0: (1, {'@': 24}), 3: (1, {'@': 24})}, 20: {0: (1, {'@': 25}), 1: (1, {'@': 25})}, 21: {8: (0, 13), 18: (0, 4), 16: (0, 9), 0: (0, 19)}, 22: {6: (0, 21), 8: (0, 12), 9: (0, 1), 10: (0, 16), 11: (0, 11), 12: (0, 26), 13: (0, 30), 14: (0, 15), 15: (0, 10), 7: (0, 20), 16: (0, 9)}, 23: {6: (0, 21), 7: (0, 8), 9: (0, 1), 8: (0, 12), 10: (0, 16), 11: (0, 11), 12: (0, 26), 13: (0, 30), 14: (0, 15), 15: (0, 10), 16: (0, 9)}, 24: {1: (1, {'@': 26}), 2: (1, {'@': 26}), 0: (1, {'@': 26}), 3: (1, {'@': 26})}, 25: {1: (1, {'@': 27}), 2: (1, {'@': 27}), 0: (1, {'@': 27}), 3: (1, {'@': 27})}, 26: {6: (0, 21), 10: (0, 16), 12: (0, 26), 13: (0, 30), 14: (0, 15), 7: (0, 18), 8: (0, 12), 16: (0, 9), 9: (0, 1), 11: (0, 11), 15: (0, 10), 2: (0, 17)}, 27: {8: (0, 13), 18: (0, 0), 16: (0, 9)}, 28: {6: (0, 21), 10: (0, 16), 12: (0, 26), 13: (0, 30), 8: (0, 12), 16: (0, 9), 19: (0, 6), 9: (0, 1), 11: (0, 11), 7: (0, 31), 15: (0, 10), 14: (0, 15)}, 29: {1: (1, {'@': 28}), 2: (1, {'@': 28})}, 30: {1: (1, {'@': 29}), 2: (1, {'@': 29}), 0: (1, {'@': 29}), 3: (1, {'@': 29})}, 31: {3: (1, {'@': 30})}, 32: {18: (0, 5), 8: (0, 13), 16: (0, 9)}, 33: {1: (1, {'@': 31}), 2: (1, {'@': 31}), 0: (1, {'@': 31}), 3: (1, {'@': 31})}}, 'start_states': {'start': 28}, 'end_states': {'start': 6}}, 'lexer_conf': {'tokens': [{'@': 0}, {'@': 1}, {'@': 2}, {'@': 3}, {'@': 4}, {'@': 5}, {'@': 6}, {'@': 7}, {'@': 8}, {'@': 9}, {'@': 10}, {'@': 11}], 'ignore': ['WS'], 'g_regex_flags': 0, 'use_bytes': False, '__type__': 'LexerConf'}, 'start': ['start'], '__type__': 'LALR_ContextualLexer'}, 'rules': [{'@': 30}, {'@': 13}, {'@': 18}, {'@': 20}, {'@': 19}, {'@': 22}, {'@': 21}, {'@': 29}, {'@': 14}, {'@': 26}, {'@': 23}, {'@': 27}, {'@': 31}, {'@': 24}, {'@': 25}, {'@': 17}, {'@': 28}, {'@': 16}, {'@': 12}, {'@': 15}], 'options': {'debug': False, 'keep_all_tokens': False, 'tree_class': None, 'cache': False, 'postlex': None, 'parser': 'lalr', 'lexer': 'contextual', 'transformer': None, 'start': ['start'], 'priority': None, 'ambiguity': 'auto', 'regex': False, 'propagate_positions': False, 'lexer_callbacks': {}, 'maybe_placeholders': False, 'edit_terminals': None, 'g_regex_flags': 0, 'use_bytes': False}, '__type__': 'Lark'} ) MEMO = ( {0: {'name': 'ESCAPED_STRING', 'pattern': {'value': '\\".*?(?
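# -- Editor's sketch (not generated code): the MEMO blob above is truncated in
# this copy. A complete standalone module reconstructs the parser from the
# DATA/MEMO blobs (typically exposing a `Lark_StandAlone` entry point; the name
# is assumed here since that part of the file is missing). Assuming that,
# driving the embedded JSON grammar looks roughly like:
#
#     parser = Lark_StandAlone()
#     tree = parser.parse('{"list": [1, 2.5, true, null, "txt"]}')
#     print(tree.pretty())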