@@ -2,7 +2,7 @@
 
 import re
 
-from .utils import Str, classify, get_regexp_width, Py36, Serialize
+from .utils import Str, classify, get_regexp_width, Py36, Serialize, suppress
 from .exceptions import UnexpectedCharacters, LexError, UnexpectedToken
 
 ###{standalone
@@ -178,49 +178,6 @@ class LineCounter:
         self.char_pos += len(token)
         self.column = self.char_pos - self.line_start_pos + 1
 
 
-class _Lex:
-    "Built to serve both Lexer and ContextualLexer"
-    def __init__(self, lexer, state=None):
-        self.lexer = lexer
-        self.state = state
-
-    def lex(self, stream, newline_types, ignore_types):
-        newline_types = frozenset(newline_types)
-        ignore_types = frozenset(ignore_types)
-        line_ctr = LineCounter('\n' if not self.lexer.use_bytes else b'\n')
-        last_token = None
-
-        while line_ctr.char_pos < len(stream):
-            lexer = self.lexer
-            res = lexer.match(stream, line_ctr.char_pos)
-            if not res:
-                allowed = {v for m, tfi in lexer.mres for v in tfi.values()} - ignore_types
-                if not allowed:
-                    allowed = {"<END-OF-FILE>"}
-                raise UnexpectedCharacters(stream, line_ctr.char_pos, line_ctr.line, line_ctr.column, allowed=allowed, state=self.state, token_history=last_token and [last_token])
-
-            value, type_ = res
-
-            if type_ not in ignore_types:
-                t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
-                line_ctr.feed(value, type_ in newline_types)
-                t.end_line = line_ctr.line
-                t.end_column = line_ctr.column
-                t.end_pos = line_ctr.char_pos
-                if t.type in lexer.callback:
-                    t = lexer.callback[t.type](t)
-                    if not isinstance(t, Token):
-                        raise ValueError("Callbacks must return a token (returned %r)" % t)
-                yield t
-                last_token = t
-            else:
-                if type_ in lexer.callback:
-                    t2 = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
-                    lexer.callback[type_](t2)
-                line_ctr.feed(value, type_ in newline_types)
-
-
 class UnlessCallback:
     def __init__(self, mres):
@@ -308,7 +265,7 @@ class Lexer(object):
     """Lexer interface
 
     Method Signatures:
-        lex(self, stream) -> Iterator[Token]
+        lex(self, text) -> Iterator[Token]
     """
     lex = NotImplemented
 
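The Lexer interface above only requires lex(text) -> Iterator[Token]. As a rough illustration (an assumption for this note, not code from the patch), a conforming subclass could be as small as a whitespace splitter; the WhitespaceLexer name and the 'WORD' terminal are made up for the example:

    class WhitespaceLexer(Lexer):
        def lex(self, text):
            pos = 0
            for piece in text.split():
                pos = text.index(piece, pos)     # start offset of this piece
                yield Token('WORD', piece, pos)  # Token(type, value, start_pos)
                pos += len(piece)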
@@ -335,8 +292,8 @@ class TraditionalLexer(Lexer):
         assert set(conf.ignore) <= {t.name for t in terminals}
 
         # Init
-        self.newline_types = [t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp())]
-        self.ignore_types = list(conf.ignore)
+        self.newline_types = frozenset(t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp()))
+        self.ignore_types = frozenset(conf.ignore)
 
         terminals.sort(key=lambda x:(-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name))
         self.terminals = terminals
@@ -345,7 +302,6 @@ class TraditionalLexer(Lexer):
         self.use_bytes = conf.use_bytes
 
         self._mres = None
-        # self.build(g_regex_flags)
 
     def _build(self):
         terminals, self.callback = _create_unless(self.terminals, self.g_regex_flags, re_=self.re, use_bytes=self.use_bytes)
@@ -366,16 +322,61 @@ class TraditionalLexer(Lexer):
             self._build()
         return self._mres
 
-    def match(self, stream, pos):
+    def match(self, text, pos):
         for mre, type_from_index in self.mres:
-            m = mre.match(stream, pos)
+            m = mre.match(text, pos)
             if m:
                 return m.group(0), type_from_index[m.lastindex]
 
-    def lex(self, stream):
-        return _Lex(self).lex(stream, self.newline_types, self.ignore_types)
+    def make_lexer_state(self, text):
+        line_ctr = LineCounter('\n' if not self.use_bytes else b'\n')
+        return LexerState(text, line_ctr)
+
+    def lex(self, text):
+        state = self.make_lexer_state(text)
+        with suppress(EOFError):
+            while True:
+                yield self.next_token(state)
+
+    def next_token(self, lex_state):
+        text = lex_state.text
+        line_ctr = lex_state.line_ctr
+        while line_ctr.char_pos < len(text):
+            res = self.match(text, line_ctr.char_pos)
+            if not res:
+                allowed = {v for m, tfi in self.mres for v in tfi.values()} - self.ignore_types
+                if not allowed:
+                    allowed = {"<END-OF-FILE>"}
+                raise UnexpectedCharacters(text, line_ctr.char_pos, line_ctr.line, line_ctr.column, allowed=allowed, token_history=lex_state.last_token and [lex_state.last_token])
+
+            value, type_ = res
+
+            if type_ not in self.ignore_types:
+                t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
+                line_ctr.feed(value, type_ in self.newline_types)
+                t.end_line = line_ctr.line
+                t.end_column = line_ctr.column
+                t.end_pos = line_ctr.char_pos
+                if t.type in self.callback:
+                    t = self.callback[t.type](t)
+                    if not isinstance(t, Token):
+                        raise ValueError("Callbacks must return a token (returned %r)" % t)
+                lex_state.last_token = t
+                return t
+            else:
+                if type_ in self.callback:
+                    t2 = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
+                    self.callback[type_](t2)
+                line_ctr.feed(value, type_ in self.newline_types)
+
+        # EOF
+        raise EOFError(self)
+
+
+class LexerState:
+    def __init__(self, text, line_ctr, last_token=None):
+        self.text = text
+        self.line_ctr = line_ctr
+        self.last_token = last_token
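Driving the new stateful API by hand is equivalent to what TraditionalLexer.lex() does with suppress(EOFError). A minimal sketch, assuming my_lexer is an already-constructed TraditionalLexer (the input string is made up):

    state = my_lexer.make_lexer_state("some input text")
    try:
        while True:
            tok = my_lexer.next_token(state)
            print(tok.type, repr(tok.value))
    except EOFError:   # next_token() signals end of input by raising EOFError
        pass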
 
 
 class ContextualLexer(Lexer):
@@ -409,25 +410,24 @@ class ContextualLexer(Lexer):
         assert trad_conf.tokens is terminals
         self.root_lexer = TraditionalLexer(trad_conf)
 
-    def lex(self, stream, get_parser_state):
-        parser_state = get_parser_state()
-        l = _Lex(self.lexers[parser_state], parser_state)
+    def lex(self, text, get_parser_state):
+        state = self.root_lexer.make_lexer_state(text)
         try:
-            for x in l.lex(stream, self.root_lexer.newline_types, self.root_lexer.ignore_types):
-                yield x
-                parser_state = get_parser_state()
-                l.lexer = self.lexers[parser_state]
-                l.state = parser_state # For debug only, no need to worry about multithreading
+            while True:
+                lexer = self.lexers[get_parser_state()]
+                yield lexer.next_token(state)
+        except EOFError:
+            pass
         except UnexpectedCharacters as e:
             # In the contextual lexer, UnexpectedCharacters can mean that the terminal is defined,
             # but not in the current context.
             # This tests the input against the global context, to provide a nicer error.
-            root_match = self.root_lexer.match(stream, e.pos_in_stream)
+            root_match = self.root_lexer.match(text, e.pos_in_stream)
             if not root_match:
                 raise
 
             value, type_ = root_match
             t = Token(type_, value, e.pos_in_stream, e.line, e.column)
-            raise UnexpectedToken(t, e.allowed, state=e.state)
+            raise UnexpectedToken(t, e.allowed, state=get_parser_state())
 
 ###}
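The contextual lexer is driven with the text plus a zero-argument callable that reports the parser's current state, so each next_token() call uses the lexer built for that state. A hedged usage sketch; contextual_lexer, parser, and its attributes are hypothetical stand-ins for the surrounding parser loop, not names defined in this patch:

    for tok in contextual_lexer.lex(text, lambda: parser.current_state):
        parser.feed_token(tok)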