|
- ## Lexer Implementation
-
- import re
-
- from .utils import Str, classify, get_regexp_width, Py36, Serialize
- from .exceptions import UnexpectedCharacters, LexError, UnexpectedToken
-
- ###{standalone
- from copy import copy
-
- class Pattern(Serialize):
-
- def __init__(self, value, flags=()):
- self.value = value
- self.flags = frozenset(flags)
-
- def __repr__(self):
- return repr(self.to_regexp())
-
- # Pattern Hashing assumes all subclasses have a different priority!
- def __hash__(self):
- return hash((type(self), self.value, self.flags))
- def __eq__(self, other):
- return type(self) == type(other) and self.value == other.value and self.flags == other.flags
-
- def to_regexp(self):
- raise NotImplementedError()
-
- if Py36:
- # Python 3.6 changed syntax for flags in regular expression
- def _get_flags(self, value):
- for f in self.flags:
- value = ('(?%s:%s)' % (f, value))
- return value
-
- else:
- def _get_flags(self, value):
- for f in self.flags:
- value = ('(?%s)' % f) + value
- return value
-
-
- class PatternStr(Pattern):
- __serialize_fields__ = 'value', 'flags'
-
- type = "str"
-
- def to_regexp(self):
- return self._get_flags(re.escape(self.value))
-
- @property
- def min_width(self):
- return len(self.value)
- max_width = min_width
-
- class PatternRE(Pattern):
- __serialize_fields__ = 'value', 'flags', '_width'
-
- type = "re"
-
- def to_regexp(self):
- return self._get_flags(self.value)
-
- _width = None
- def _get_width(self):
- if self._width is None:
- self._width = get_regexp_width(self.to_regexp())
- return self._width
-
- @property
- def min_width(self):
- return self._get_width()[0]
- @property
- def max_width(self):
- return self._get_width()[1]
-
-
- class TerminalDef(Serialize):
- __serialize_fields__ = 'name', 'pattern', 'priority'
- __serialize_namespace__ = PatternStr, PatternRE
-
- def __init__(self, name, pattern, priority=1):
- assert isinstance(pattern, Pattern), pattern
- self.name = name
- self.pattern = pattern
- self.priority = priority
-
- def __repr__(self):
- return '%s(%r, %r)' % (type(self).__name__, self.name, self.pattern)
-
-
- class Token(Str):
- """A string with meta-information, that is produced by the lexer.
-
- When parsing text, the resulting chunks of the input that haven't been discarded,
- will end up in the tree as Token instances. The Token class inherits from Python's ``str``,
- so normal string comparisons and operations will work as expected.
-
- Attributes:
- type: Name of the token (as specified in grammar)
- value: Value of the token (redundant, as ``token.value == token`` will always be true)
- pos_in_stream: The index of the token in the text
- line: The line of the token in the text (starting with 1)
- column: The column of the token in the text (starting with 1)
- end_line: The line where the token ends
- end_column: The next column after the end of the token. For example,
- if the token is a single character with a column value of 4,
- end_column will be 5.
- end_pos: the index where the token ends (basically ``pos_in_stream + len(token)``)
- """
- __slots__ = ('type', 'pos_in_stream', 'value', 'line', 'column', 'end_line', 'end_column', 'end_pos')
-
- def __new__(cls, type_, value, pos_in_stream=None, line=None, column=None, end_line=None, end_column=None, end_pos=None):
- try:
- self = super(Token, cls).__new__(cls, value)
- except UnicodeDecodeError:
- value = value.decode('latin1')
- self = super(Token, cls).__new__(cls, value)
-
- self.type = type_
- self.pos_in_stream = pos_in_stream
- self.value = value
- self.line = line
- self.column = column
- self.end_line = end_line
- self.end_column = end_column
- self.end_pos = end_pos
- return self
-
- def update(self, type_=None, value=None):
- return Token.new_borrow_pos(
- type_ if type_ is not None else self.type,
- value if value is not None else self.value,
- self
- )
-
- @classmethod
- def new_borrow_pos(cls, type_, value, borrow_t):
- return cls(type_, value, borrow_t.pos_in_stream, borrow_t.line, borrow_t.column, borrow_t.end_line, borrow_t.end_column, borrow_t.end_pos)
-
- def __reduce__(self):
- return (self.__class__, (self.type, self.value, self.pos_in_stream, self.line, self.column, ))
-
- def __repr__(self):
- return 'Token(%r, %r)' % (self.type, self.value)
-
- def __deepcopy__(self, memo):
- return Token(self.type, self.value, self.pos_in_stream, self.line, self.column)
-
- def __eq__(self, other):
- if isinstance(other, Token) and self.type != other.type:
- return False
-
- return Str.__eq__(self, other)
-
- __hash__ = Str.__hash__
-
-
- class LineCounter:
- def __init__(self, newline_char):
- self.newline_char = newline_char
- self.char_pos = 0
- self.line = 1
- self.column = 1
- self.line_start_pos = 0
-
- def feed(self, token, test_newline=True):
- """Consume a token and calculate the new line & column.
-
- As an optional optimization, set test_newline=False is token doesn't contain a newline.
- """
- if test_newline:
- newlines = token.count(self.newline_char)
- if newlines:
- self.line += newlines
- self.line_start_pos = self.char_pos + token.rindex(self.newline_char) + 1
-
- self.char_pos += len(token)
- self.column = self.char_pos - self.line_start_pos + 1
-
- class _Lex:
- "Built to serve both Lexer and ContextualLexer"
- def __init__(self, lexer, state=None):
- self.lexer = lexer
- self.state = state
-
- def lex(self, stream, newline_types, ignore_types):
- newline_types = frozenset(newline_types)
- ignore_types = frozenset(ignore_types)
- line_ctr = LineCounter('\n' if not self.lexer.use_bytes else b'\n')
- last_token = None
-
- while line_ctr.char_pos < len(stream):
- lexer = self.lexer
- res = lexer.match(stream, line_ctr.char_pos)
- if not res:
- allowed = {v for m, tfi in lexer.mres for v in tfi.values()} - ignore_types
- if not allowed:
- allowed = {"<END-OF-FILE>"}
- raise UnexpectedCharacters(stream, line_ctr.char_pos, line_ctr.line, line_ctr.column, allowed=allowed, state=self.state, token_history=last_token and [last_token])
-
- value, type_ = res
-
- if type_ not in ignore_types:
- t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
- line_ctr.feed(value, type_ in newline_types)
- t.end_line = line_ctr.line
- t.end_column = line_ctr.column
- t.end_pos = line_ctr.char_pos
- if t.type in lexer.callback:
- t = lexer.callback[t.type](t)
- if not isinstance(t, Token):
- raise ValueError("Callbacks must return a token (returned %r)" % t)
- yield t
- last_token = t
- else:
- if type_ in lexer.callback:
- t2 = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
- lexer.callback[type_](t2)
- line_ctr.feed(value, type_ in newline_types)
-
-
-
-
- class UnlessCallback:
- def __init__(self, mres):
- self.mres = mres
-
- def __call__(self, t):
- for mre, type_from_index in self.mres:
- m = mre.match(t.value)
- if m:
- t.type = type_from_index[m.lastindex]
- break
- return t
-
- class CallChain:
- def __init__(self, callback1, callback2, cond):
- self.callback1 = callback1
- self.callback2 = callback2
- self.cond = cond
-
- def __call__(self, t):
- t2 = self.callback1(t)
- return self.callback2(t) if self.cond(t2) else t2
-
-
-
-
-
- def _create_unless(terminals, g_regex_flags, re_, use_bytes):
- tokens_by_type = classify(terminals, lambda t: type(t.pattern))
- assert len(tokens_by_type) <= 2, tokens_by_type.keys()
- embedded_strs = set()
- callback = {}
- for retok in tokens_by_type.get(PatternRE, []):
- unless = [] # {}
- for strtok in tokens_by_type.get(PatternStr, []):
- if strtok.priority > retok.priority:
- continue
- s = strtok.pattern.value
- m = re_.match(retok.pattern.to_regexp(), s, g_regex_flags)
- if m and m.group(0) == s:
- unless.append(strtok)
- if strtok.pattern.flags <= retok.pattern.flags:
- embedded_strs.add(strtok)
- if unless:
- callback[retok.name] = UnlessCallback(build_mres(unless, g_regex_flags, re_, match_whole=True, use_bytes=use_bytes))
-
- terminals = [t for t in terminals if t not in embedded_strs]
- return terminals, callback
-
-
- def _build_mres(terminals, max_size, g_regex_flags, match_whole, re_, use_bytes):
- # Python sets an unreasonable group limit (currently 100) in its re module
- # Worse, the only way to know we reached it is by catching an AssertionError!
- # This function recursively tries less and less groups until it's successful.
- postfix = '$' if match_whole else ''
- mres = []
- while terminals:
- pattern = u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp() + postfix) for t in terminals[:max_size])
- if use_bytes:
- pattern = pattern.encode('latin-1')
- try:
- mre = re_.compile(pattern, g_regex_flags)
- except AssertionError: # Yes, this is what Python provides us.. :/
- return _build_mres(terminals, max_size//2, g_regex_flags, match_whole, re_, use_bytes)
-
- # terms_from_name = {t.name: t for t in terminals[:max_size]}
- mres.append((mre, {i:n for n,i in mre.groupindex.items()} ))
- terminals = terminals[max_size:]
- return mres
-
- def build_mres(terminals, g_regex_flags, re_, use_bytes, match_whole=False):
- return _build_mres(terminals, len(terminals), g_regex_flags, match_whole, re_, use_bytes)
-
- def _regexp_has_newline(r):
- r"""Expressions that may indicate newlines in a regexp:
- - newlines (\n)
- - escaped newline (\\n)
- - anything but ([^...])
- - any-char (.) when the flag (?s) exists
- - spaces (\s)
- """
- return '\n' in r or '\\n' in r or '\\s' in r or '[^' in r or ('(?s' in r and '.' in r)
-
- class Lexer(object):
- """Lexer interface
-
- Method Signatures:
- lex(self, stream) -> Iterator[Token]
- """
- lex = NotImplemented
-
-
- class TraditionalLexer(Lexer):
-
- def __init__(self, conf):
- terminals = list(conf.tokens)
- assert all(isinstance(t, TerminalDef) for t in terminals), terminals
-
- self.re = conf.re_module
-
- if not conf.skip_validation:
- # Sanitization
- for t in terminals:
- try:
- self.re.compile(t.pattern.to_regexp(), conf.g_regex_flags)
- except self.re.error:
- raise LexError("Cannot compile token %s: %s" % (t.name, t.pattern))
-
- if t.pattern.min_width == 0:
- raise LexError("Lexer does not allow zero-width terminals. (%s: %s)" % (t.name, t.pattern))
-
- assert set(conf.ignore) <= {t.name for t in terminals}
-
- # Init
- self.newline_types = [t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp())]
- self.ignore_types = list(conf.ignore)
-
- terminals.sort(key=lambda x:(-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name))
- self.terminals = terminals
- self.user_callbacks = conf.callbacks
- self.g_regex_flags = conf.g_regex_flags
- self.use_bytes = conf.use_bytes
-
- self._mres = None
- # self.build(g_regex_flags)
-
- def _build(self):
- terminals, self.callback = _create_unless(self.terminals, self.g_regex_flags, re_=self.re, use_bytes=self.use_bytes)
- assert all(self.callback.values())
-
- for type_, f in self.user_callbacks.items():
- if type_ in self.callback:
- # Already a callback there, probably UnlessCallback
- self.callback[type_] = CallChain(self.callback[type_], f, lambda t: t.type == type_)
- else:
- self.callback[type_] = f
-
- self._mres = build_mres(terminals, self.g_regex_flags, self.re, self.use_bytes)
-
- @property
- def mres(self):
- if self._mres is None:
- self._build()
- return self._mres
-
- def match(self, stream, pos):
- for mre, type_from_index in self.mres:
- m = mre.match(stream, pos)
- if m:
- return m.group(0), type_from_index[m.lastindex]
-
- def lex(self, stream):
- return _Lex(self).lex(stream, self.newline_types, self.ignore_types)
-
-
-
-
- class ContextualLexer(Lexer):
-
- def __init__(self, conf, states, always_accept=()):
- terminals = list(conf.tokens)
- tokens_by_name = {}
- for t in terminals:
- assert t.name not in tokens_by_name, t
- tokens_by_name[t.name] = t
-
- trad_conf = copy(conf)
- trad_conf.tokens = terminals
-
- lexer_by_tokens = {}
- self.lexers = {}
- for state, accepts in states.items():
- key = frozenset(accepts)
- try:
- lexer = lexer_by_tokens[key]
- except KeyError:
- accepts = set(accepts) | set(conf.ignore) | set(always_accept)
- state_tokens = [tokens_by_name[n] for n in accepts if n and n in tokens_by_name]
- lexer_conf = copy(trad_conf)
- lexer_conf.tokens = state_tokens
- lexer = TraditionalLexer(lexer_conf)
- lexer_by_tokens[key] = lexer
-
- self.lexers[state] = lexer
-
- assert trad_conf.tokens is terminals
- self.root_lexer = TraditionalLexer(trad_conf)
-
- def lex(self, stream, get_parser_state):
- parser_state = get_parser_state()
- l = _Lex(self.lexers[parser_state], parser_state)
- try:
- for x in l.lex(stream, self.root_lexer.newline_types, self.root_lexer.ignore_types):
- yield x
- parser_state = get_parser_state()
- l.lexer = self.lexers[parser_state]
- l.state = parser_state # For debug only, no need to worry about multithreading
- except UnexpectedCharacters as e:
- # In the contextual lexer, UnexpectedCharacters can mean that the terminal is defined,
- # but not in the current context.
- # This tests the input against the global context, to provide a nicer error.
- root_match = self.root_lexer.match(stream, e.pos_in_stream)
- if not root_match:
- raise
-
- value, type_ = root_match
- t = Token(type_, value, e.pos_in_stream, e.line, e.column)
- raise UnexpectedToken(t, e.allowed, state=e.state)
-
- ###}
|