This repo contains code to mirror other repos, as well as the code being mirrored.

## Lexer Implementation

import re

from .utils import Str, classify, STRING_TYPE
from .common import is_terminal

class LexError(Exception):
    pass

class TokenDef(object):
    def __init__(self, name, value):
        assert isinstance(value, STRING_TYPE), value
        self.name = name
        self.value = value

    def __repr__(self):
        return '%s(%r, %r)' % (type(self).__name__, self.name, self.value)

class TokenDef__Str(TokenDef):
    def to_regexp(self):
        return re.escape(self.value)

    priority = 0

class TokenDef__Regexp(TokenDef):
    def to_regexp(self):
        return self.value

    priority = 1

class UnexpectedInput(LexError):
    def __init__(self, seq, lex_pos, line, column):
        context = seq[lex_pos:lex_pos+5]
        message = "No token defined for: '%s' in %r at line %d" % (seq[lex_pos], context, line)
        super(UnexpectedInput, self).__init__(message)

        self.line = line
        self.column = column
        self.context = context

class Token(Str):
    def __new__(cls, type_, value, pos_in_stream=None):
        inst = Str.__new__(cls, value)
        inst.type = type_
        inst.pos_in_stream = pos_in_stream
        inst.value = value
        return inst

    @classmethod
    def new_borrow_pos(cls, type_, value, borrow_t):
        inst = cls(type_, value, borrow_t.pos_in_stream)
        inst.line = borrow_t.line
        inst.column = borrow_t.column
        return inst

    def __repr__(self):
        return 'Token(%s, %r)' % (self.type, self.value)

class Regex:
    def __init__(self, pattern, flags=()):
        self.pattern = pattern
        self.flags = flags

def _regexp_has_newline(r):
    return '\n' in r or '\\n' in r or ('(?s)' in r and '.' in r)

def _create_unless_callback(strs):
    def unless_callback(t):
        if t in strs:
            t.type = strs[t]
        return t
    return unless_callback

def _create_unless(tokens):
    tokens_by_type = classify(tokens, type)
    assert len(tokens_by_type) <= 2, tokens_by_type.keys()

    embedded_strs = set()
    callback = {}
    for retok in tokens_by_type.get(TokenDef__Regexp, []):
        unless = {}
        for strtok in tokens_by_type.get(TokenDef__Str, []):
            m = re.match(retok.value, strtok.value)
            if m and m.group(0) == strtok.value:
                embedded_strs.add(strtok.name)
                unless[strtok.value] = strtok.name
        if unless:
            callback[retok.name] = _create_unless_callback(unless)

    tokens = [t for t in tokens if t.name not in embedded_strs]
    return tokens, callback
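
# For example (hypothetical terminals, not defined in this module): with a regexp
# terminal NAME = r'\w+' and a string terminal IF = 'if', re.match(r'\w+', 'if')
# consumes all of 'if', so _create_unless drops IF from the returned token list and
# registers a callback under 'NAME' that retypes a matched Token('NAME', 'if') to 'IF'.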

class Lexer(object):
    def __init__(self, tokens, ignore=()):
        assert all(isinstance(t, TokenDef) for t in tokens), tokens

        self.ignore = ignore
        self.newline_char = '\n'
        tokens = list(tokens)

        # Sanitization
        for t in tokens:
            try:
                re.compile(t.to_regexp())
            except:
                raise LexError("Cannot compile token: %s: %s" % (t.name, t.value))

        token_names = {t.name for t in tokens}
        assert all(t in token_names for t in ignore)

        # Init
        self.newline_types = [t.name for t in tokens if _regexp_has_newline(t.to_regexp())]
        self.ignore_types = [t for t in ignore]

        tokens, self.callback = _create_unless(tokens)
        assert all(self.callback.values())

        tokens.sort(key=lambda x: (x.priority, len(x.value)), reverse=True)

        self.tokens = tokens

        self.mres = self._build_mres(tokens, len(tokens))

    def _build_mres(self, tokens, max_size):
        # Python sets an unreasonable group limit (currently 100) in its re module
        # Worse, the only way to know we reached it is by catching an AssertionError!
        # This function recursively tries less and less groups until it's successful.
        mres = []
        while tokens:
            try:
                mre = re.compile(u'|'.join(u'(?P<%s>%s)' % (t.name, t.to_regexp()) for t in tokens[:max_size]))
            except AssertionError:  # Yes, this is what Python provides us.. :/
                return self._build_mres(tokens, max_size//2)

            mres.append((mre, {i: n for n, i in mre.groupindex.items()}))
            tokens = tokens[max_size:]
        return mres

    def lex(self, stream):
        lex_pos = 0
        line = 1
        col_start_pos = 0
        newline_types = list(self.newline_types)
        ignore_types = list(self.ignore_types)
        while True:
            for mre, type_from_index in self.mres:
                m = mre.match(stream, lex_pos)
                if m:
                    value = m.group(0)
                    type_ = type_from_index[m.lastindex]
                    if type_ not in ignore_types:
                        t = Token(type_, value, lex_pos)
                        t.line = line
                        t.column = lex_pos - col_start_pos
                        if t.type in self.callback:
                            t = self.callback[t.type](t)
                        yield t

                    if type_ in newline_types:
                        newlines = value.count(self.newline_char)
                        if newlines:
                            line += newlines
                            col_start_pos = lex_pos + value.rindex(self.newline_char)
                    lex_pos += len(value)
                    break
            else:   # no token pattern matched at lex_pos
                if lex_pos < len(stream):
                    raise UnexpectedInput(stream, lex_pos, line, lex_pos - col_start_pos)
                break

class ContextualLexer:
    def __init__(self, tokens, states, ignore=(), always_accept=()):
        tokens_by_name = {}
        for t in tokens:
            assert t.name not in tokens_by_name, t
            tokens_by_name[t.name] = t

        lexer_by_tokens = {}
        self.lexers = {}
        for state, accepts in states.items():
            key = frozenset(accepts)
            try:
                lexer = lexer_by_tokens[key]
            except KeyError:
                accepts = set(accepts)  # For python3
                accepts |= set(ignore)
                accepts |= set(always_accept)
                state_tokens = [tokens_by_name[n] for n in accepts if is_terminal(n) and n != '$end']
                lexer = Lexer(state_tokens, ignore=ignore)
                lexer_by_tokens[key] = lexer

            self.lexers[state] = lexer

        self.root_lexer = Lexer(tokens, ignore=ignore)

        self.set_parser_state(None)  # Needs to be set on the outside

    def set_parser_state(self, state):
        self.parser_state = state

    def lex(self, stream):
        lex_pos = 0
        line = 1
        col_start_pos = 0
        newline_types = list(self.root_lexer.newline_types)
        ignore_types = list(self.root_lexer.ignore_types)
        while True:
            lexer = self.lexers[self.parser_state]
            for mre, type_from_index in lexer.mres:
                m = mre.match(stream, lex_pos)
                if m:
                    value = m.group(0)
                    type_ = type_from_index[m.lastindex]
                    if type_ not in ignore_types:
                        t = Token(type_, value, lex_pos)
                        t.line = line
                        t.column = lex_pos - col_start_pos
                        if t.type in lexer.callback:
                            t = lexer.callback[t.type](t)
                        yield t

                    if type_ in newline_types:
                        newlines = value.count(lexer.newline_char)
                        if newlines:
                            line += newlines
                            col_start_pos = lex_pos + value.rindex(lexer.newline_char)
                    lex_pos += len(value)
                    break
            else:   # no token pattern matched in the current parser state
                if lex_pos < len(stream):
                    print("Allowed tokens:", lexer.tokens)
                    raise UnexpectedInput(stream, lex_pos, line, lex_pos - col_start_pos)
                break
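
For orientation, a minimal usage sketch of the basic Lexer follows. It is not part of the file above: the terminal names (NUMBER, NAME, PLUS, WS) are made up for illustration, and it assumes the module is imported as part of its package so the relative imports of Str, classify and is_terminal resolve. A few TokenDef objects are compiled into an alternation regexp, and lex() yields Token instances while skipping the ignored terminal:

    tokens = [
        TokenDef__Regexp('NUMBER', r'\d+'),
        TokenDef__Regexp('NAME', r'[a-zA-Z_]\w*'),
        TokenDef__Str('PLUS', '+'),
        TokenDef__Regexp('WS', r'[ \t]+'),      # whitespace: matched, then skipped
    ]
    lexer = Lexer(tokens, ignore=['WS'])
    for tok in lexer.lex('x + 12'):
        print(tok.type, repr(tok.value))        # roughly: NAME 'x', PLUS '+', NUMBER '12'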