This repo contains code to mirror other repos, as well as the code that is being mirrored.

## Lexer Implementation

import re
import sre_parse

from .utils import Str, classify
from .common import is_terminal, PatternStr, PatternRE, TokenDef

class LexError(Exception):
    pass

class UnexpectedInput(LexError):
    def __init__(self, seq, lex_pos, line, column):
        context = seq[lex_pos:lex_pos+5]
        message = "No token defined for: '%s' in %r at line %d" % (seq[lex_pos], context, line)
        super(UnexpectedInput, self).__init__(message)

        self.line = line
        self.column = column
        self.context = context

class Token(Str):
    def __new__(cls, type_, value, pos_in_stream=None, line=None, column=None):
        inst = Str.__new__(cls, value)
        inst.type = type_
        inst.pos_in_stream = pos_in_stream
        inst.value = value
        inst.line = line
        inst.column = column
        return inst

    @classmethod
    def new_borrow_pos(cls, type_, value, borrow_t):
        inst = cls(type_, value, borrow_t.pos_in_stream)
        inst.line = borrow_t.line
        inst.column = borrow_t.column
        return inst

    def __repr__(self):
        return 'Token(%s, %r)' % (self.type, self.value)

    def __deepcopy__(self, memo):
        return Token(self.type, self.value, self.pos_in_stream, self.line, self.column)
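
# Note: Token subclasses Str (imported from .utils), which is used here as a
# plain-str wrapper -- Str.__new__(cls, value) builds the instance from its text.
# A Token therefore compares and hashes like its string value, which is what
# lets unless_callback() below look a Token up in a dict keyed by literal strings.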

class Regex:
    def __init__(self, pattern, flags=()):
        self.pattern = pattern
        self.flags = flags

def _regexp_has_newline(r):
    return '\n' in r or '\\n' in r or ('(?s)' in r and '.' in r)

def _create_unless_callback(strs):
    def unless_callback(t):
        if t in strs:
            t.type = strs[t]
        return t
    return unless_callback

def _create_unless(tokens):
    tokens_by_type = classify(tokens, lambda t: type(t.pattern))
    assert len(tokens_by_type) <= 2, tokens_by_type.keys()

    embedded_strs = set()
    callback = {}
    for retok in tokens_by_type.get(PatternRE, []):
        unless = {}
        for strtok in tokens_by_type.get(PatternStr, []):
            s = strtok.pattern.value
            m = re.match(retok.pattern.value, s)
            if m and m.group(0) == s:
                embedded_strs.add(strtok.name)
                unless[s] = strtok.name
        if unless:
            callback[retok.name] = _create_unless_callback(unless)

    tokens = [t for t in tokens if t.name not in embedded_strs]
    return tokens, callback
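
# _create_unless() handles literal tokens that are fully matched by some regexp
# token: the literal is dropped from the token list, and the regexp token gets a
# callback that re-types any match equal to that literal back to the literal's
# name (e.g. a keyword that would otherwise be swallowed by a general name rule).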

class Lexer(object):
    def __init__(self, tokens, ignore=()):
        assert all(isinstance(t, TokenDef) for t in tokens), tokens

        self.ignore = ignore
        self.newline_char = '\n'
        tokens = list(tokens)

        # Sanitization
        for t in tokens:
            try:
                re.compile(t.pattern.to_regexp())
            except:
                raise LexError("Cannot compile token: %s: %s" % (t.name, t.pattern))

            width = sre_parse.parse(t.pattern.to_regexp()).getwidth()
            if width[0] == 0:
                raise LexError("Lexer does not allow zero-width tokens. (%s: %s)" % (t.name, t.pattern))

        token_names = {t.name for t in tokens}
        for t in ignore:
            if t not in token_names:
                raise LexError("Token '%s' was marked to ignore but it is not defined!" % t)

        # Init
        self.newline_types = [t.name for t in tokens if _regexp_has_newline(t.pattern.to_regexp())]
        self.ignore_types = [t for t in ignore]

        tokens, self.callback = _create_unless(tokens)
        assert all(self.callback.values())

        tokens.sort(key=lambda x: (x.pattern.priority, len(x.pattern.value)), reverse=True)

        self.tokens = tokens
        self.mres = self._build_mres(tokens, len(tokens))

    def _build_mres(self, tokens, max_size):
        # Python sets an unreasonable group limit (currently 100) in its re module.
        # Worse, the only way to know we reached it is by catching an AssertionError!
        # This function recursively tries fewer and fewer groups until it succeeds.
        mres = []
        while tokens:
            try:
                mre = re.compile(u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp()) for t in tokens[:max_size]))
            except AssertionError:  # Yes, this is what Python provides us.. :/
                return self._build_mres(tokens, max_size//2)

            mres.append((mre, {i: n for n, i in mre.groupindex.items()}))
            tokens = tokens[max_size:]
        return mres

    def lex(self, stream):
        lex_pos = 0
        line = 1
        col_start_pos = 0
        newline_types = list(self.newline_types)
        ignore_types = list(self.ignore_types)
        while True:
            for mre, type_from_index in self.mres:
                m = mre.match(stream, lex_pos)
                if m:
                    value = m.group(0)
                    type_ = type_from_index[m.lastindex]
                    if type_ not in ignore_types:
                        t = Token(type_, value, lex_pos, line, lex_pos - col_start_pos)
                        if t.type in self.callback:
                            t = self.callback[t.type](t)
                        yield t

                    if type_ in newline_types:
                        newlines = value.count(self.newline_char)
                        if newlines:
                            line += newlines
                            col_start_pos = lex_pos + value.rindex(self.newline_char)
                    lex_pos += len(value)
                    break
            else:
                if lex_pos < len(stream):
                    raise UnexpectedInput(stream, lex_pos, line, lex_pos - col_start_pos)
                break
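
# ContextualLexer builds one Lexer per parser state, restricted to the tokens
# that state can actually accept (plus ignored and always-accepted ones), and a
# root_lexer over the full token set whose newline/ignore type lists drive line
# counting and skipping in lex(). The parser is expected to call
# set_parser_state() as it advances, so lexing follows the parse context.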

class ContextualLexer:
    def __init__(self, tokens, states, ignore=(), always_accept=()):
        tokens_by_name = {}
        for t in tokens:
            assert t.name not in tokens_by_name, t
            tokens_by_name[t.name] = t

        lexer_by_tokens = {}
        self.lexers = {}
        for state, accepts in states.items():
            key = frozenset(accepts)
            try:
                lexer = lexer_by_tokens[key]
            except KeyError:
                accepts = set(accepts)  # For python3
                accepts |= set(ignore)
                accepts |= set(always_accept)
                state_tokens = [tokens_by_name[n] for n in accepts if is_terminal(n) and n != '$end']
                lexer = Lexer(state_tokens, ignore=ignore)
                lexer_by_tokens[key] = lexer

            self.lexers[state] = lexer

        self.root_lexer = Lexer(tokens, ignore=ignore)

        self.set_parser_state(None)  # Needs to be set on the outside

    def set_parser_state(self, state):
        self.parser_state = state

    def lex(self, stream):
        lex_pos = 0
        line = 1
        col_start_pos = 0
        newline_types = list(self.root_lexer.newline_types)
        ignore_types = list(self.root_lexer.ignore_types)
        while True:
            lexer = self.lexers[self.parser_state]
            for mre, type_from_index in lexer.mres:
                m = mre.match(stream, lex_pos)
                if m:
                    value = m.group(0)
                    type_ = type_from_index[m.lastindex]
                    if type_ not in ignore_types:
                        t = Token(type_, value, lex_pos, line, lex_pos - col_start_pos)
                        if t.type in lexer.callback:
                            t = lexer.callback[t.type](t)
                        yield t

                    if type_ in newline_types:
                        newlines = value.count(lexer.newline_char)
                        if newlines:
                            line += newlines
                            col_start_pos = lex_pos + value.rindex(lexer.newline_char)
                    lex_pos += len(value)
                    break
            else:
                if lex_pos < len(stream):
                    print("Allowed tokens:", lexer.tokens)
                    raise UnexpectedInput(stream, lex_pos, line, lex_pos - col_start_pos)
                break
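
For illustration, here is a minimal usage sketch of the Lexer above. The TokenDef, PatternRE, and PatternStr constructors live in the .common module, which is only imported at the top of the file and not defined here, so their exact signatures are an assumption; treat this as a sketch rather than documented API.

# Hypothetical usage sketch. TokenDef / PatternRE / PatternStr come from .common,
# which is not part of this file, so their constructor signatures are assumed.
from .common import PatternRE, PatternStr, TokenDef  # assumed signatures
from .lexer import Lexer                              # assumed module path for the file above

token_defs = [
    TokenDef('NUMBER', PatternRE('[0-9]+')),
    TokenDef('PLUS', PatternStr('+')),
    TokenDef('WS', PatternRE('[ \t]+')),
]

lexer = Lexer(token_defs, ignore=['WS'])
for tok in lexer.lex('1 + 22'):
    print(tok.type, repr(tok.value), tok.line, tok.column)
# Expected token stream (WS is skipped): NUMBER '1', PLUS '+', NUMBER '22'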