This repository contains the code used to mirror other repositories, as well as the code that is being mirrored.

## Lexer Implementation

import re

from .utils import Str, classify
from .common import is_terminal, PatternStr, PatternRE, TokenDef

###{standalone

class LexError(Exception):
    pass


class UnexpectedInput(LexError):
    def __init__(self, seq, lex_pos, line, column, allowed=None, considered_rules=None):
        context = seq[lex_pos:lex_pos+5]
        message = "No token defined for: '%s' in %r at line %d col %d" % (seq[lex_pos], context, line, column)
        if allowed:
            message += '\n\nExpecting: %s\n' % allowed

        super(UnexpectedInput, self).__init__(message)

        self.line = line
        self.column = column
        self.context = context
        self.allowed = allowed
        self.considered_rules = considered_rules


class Token(Str):
    def __new__(cls, type_, value, pos_in_stream=None, line=None, column=None):
        self = super(Token, cls).__new__(cls, value)
        self.type = type_
        self.pos_in_stream = pos_in_stream
        self.value = value
        self.line = line
        self.column = column
        return self

    @classmethod
    def new_borrow_pos(cls, type_, value, borrow_t):
        return cls(type_, value, borrow_t.pos_in_stream, line=borrow_t.line, column=borrow_t.column)
    def __reduce__(self):
        # Argument order must match Token.__new__: (type_, value, pos_in_stream, line, column)
        return (self.__class__, (self.type, self.value, self.pos_in_stream, self.line, self.column))
    def __repr__(self):
        return 'Token(%s, %r)' % (self.type, self.value)

    def __deepcopy__(self, memo):
        return Token(self.type, self.value, self.pos_in_stream, self.line, self.column)

    def __eq__(self, other):
        if isinstance(other, Token) and self.type != other.type:
            return False

        return Str.__eq__(self, other)

    __hash__ = Str.__hash__
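
# Usage sketch (illustrative, assuming Str is a plain str subclass from .utils):
# a Token compares equal to its text while carrying lexer metadata.
#
#     t = Token('NUMBER', '42', pos_in_stream=0, line=1, column=0)
#     t == '42'                    # True -- falls through to Str.__eq__
#     (t.type, t.line)             # ('NUMBER', 1)
#     Token('NAME', '42') == t     # False -- differing types short-circuit __eq__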

class LineCounter:
    def __init__(self):
        self.newline_char = '\n'
        self.char_pos = 0
        self.line = 1
        self.column = 0
        self.line_start_pos = 0

    def feed(self, token, test_newline=True):
        """Consume a token and calculate the new line & column.

        As an optional optimization, set test_newline=False if token doesn't contain a newline.
        """
        if test_newline:
            newlines = token.count(self.newline_char)
            if newlines:
                self.line += newlines
                self.line_start_pos = self.char_pos + token.rindex(self.newline_char) + 1

        self.char_pos += len(token)
        self.column = self.char_pos - self.line_start_pos
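
# Usage sketch (illustrative): feeding text that spans a newline updates the line,
# column and absolute position together.
#
#     lc = LineCounter()
#     lc.feed('ab\ncd')
#     (lc.line, lc.column, lc.char_pos)   # (2, 2, 5)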

class _Lex:
    "Built to serve both Lexer and ContextualLexer"

    def __init__(self, lexer):
        self.lexer = lexer

    def lex(self, stream, newline_types, ignore_types):
        newline_types = list(newline_types)
        ignore_types = list(ignore_types)
        line_ctr = LineCounter()

        t = None
        while True:
            lexer = self.lexer
            for mre, type_from_index in lexer.mres:
                m = mre.match(stream, line_ctr.char_pos)
                if m:
                    value = m.group(0)
                    type_ = type_from_index[m.lastindex]
                    if type_ not in ignore_types:
                        t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
                        if t.type in lexer.callback:
                            t = lexer.callback[t.type](t)
                        yield t
                    else:
                        if type_ in lexer.callback:
                            t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
                            lexer.callback[type_](t)

                    line_ctr.feed(value, type_ in newline_types)
                    if t:
                        t.end_line = line_ctr.line
                        t.end_column = line_ctr.column
                    break
            else:
                if line_ctr.char_pos < len(stream):
                    raise UnexpectedInput(stream, line_ctr.char_pos, line_ctr.line, line_ctr.column)
                break
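
# Note on the loop above: the for/else pairing drives termination. A successful
# match breaks out of the for loop and the while loop resumes scanning from the
# new char_pos; if no compiled pattern matches, the else branch either raises
# UnexpectedInput (input remains) or ends the generator (end of stream reached).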

class UnlessCallback:
    def __init__(self, mres):
        self.mres = mres

    def __call__(self, t):
        for mre, type_from_index in self.mres:
            m = mre.match(t.value)
            if m:
                value = m.group(0)
                t.type = type_from_index[m.lastindex]
                break
        return t
###}

def _create_unless(tokens):
    tokens_by_type = classify(tokens, lambda t: type(t.pattern))
    assert len(tokens_by_type) <= 2, tokens_by_type.keys()

    embedded_strs = set()
    callback = {}
    for retok in tokens_by_type.get(PatternRE, []):
        unless = [] # {}
        for strtok in tokens_by_type.get(PatternStr, []):
            s = strtok.pattern.value
            m = re.match(retok.pattern.to_regexp(), s)
            if m and m.group(0) == s:
                unless.append(strtok)
                if strtok.pattern.flags <= retok.pattern.flags:
                    embedded_strs.add(strtok)
        if unless:
            callback[retok.name] = UnlessCallback(build_mres(unless, match_whole=True))

    tokens = [t for t in tokens if t not in embedded_strs]
    return tokens, callback
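
# Sketch of what _create_unless achieves (illustrative, assuming a grammar with a
# NAME regexp token and an "if" keyword string token): the keyword's literal text
# also matches NAME, so the keyword is dropped from the main token list and an
# UnlessCallback attached to NAME re-tags any exact match of "if" back to the
# keyword's type after lexing.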

def _build_mres(tokens, max_size, match_whole):
    # Python sets an unreasonable group limit (currently 100) in its re module.
    # Worse, the only way to know we reached it is by catching an AssertionError!
    # This function recursively tries fewer and fewer groups until it succeeds.
    postfix = '$' if match_whole else ''
    mres = []
    while tokens:
        try:
            mre = re.compile(u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp() + postfix) for t in tokens[:max_size]))
        except AssertionError:  # Yes, this is what Python provides us.. :/
            return _build_mres(tokens, max_size//2, match_whole)

        mres.append((mre, {i: n for n, i in mre.groupindex.items()}))
        tokens = tokens[max_size:]
    return mres


def build_mres(tokens, match_whole=False):
    return _build_mres(tokens, len(tokens), match_whole)
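
# Usage sketch (illustrative): each entry in the result pairs a compiled
# alternation with a reverse group-index map, so the lexer can recover the token
# name from m.lastindex. Assuming two hypothetical token definitions INT -> \d+
# and PLUS -> \+, the compiled pattern is roughly r'(?P<INT>\d+)|(?P<PLUS>\+)'
# and the accompanying map is {1: 'INT', 2: 'PLUS'}.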

def _regexp_has_newline(r):
    return '\n' in r or '\\n' in r or ('(?s)' in r and '.' in r)

class Lexer:
    def __init__(self, tokens, ignore=(), user_callbacks={}):
        assert all(isinstance(t, TokenDef) for t in tokens), tokens

        tokens = list(tokens)

        # Sanitization
        for t in tokens:
            try:
                re.compile(t.pattern.to_regexp())
            except:
                raise LexError("Cannot compile token %s: %s" % (t.name, t.pattern))

            if t.pattern.min_width == 0:
                raise LexError("Lexer does not allow zero-width tokens. (%s: %s)" % (t.name, t.pattern))

        assert set(ignore) <= {t.name for t in tokens}

        # Init
        self.newline_types = [t.name for t in tokens if _regexp_has_newline(t.pattern.to_regexp())]
        self.ignore_types = list(ignore)

        tokens.sort(key=lambda x: (-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name))

        tokens, self.callback = _create_unless(tokens)
        assert all(self.callback.values())

        for type_, f in user_callbacks.items():
            assert type_ not in self.callback
            self.callback[type_] = f

        self.tokens = tokens

        self.mres = build_mres(tokens)

    def lex(self, stream):
        return _Lex(self).lex(stream, self.newline_types, self.ignore_types)
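
# Usage sketch (illustrative): given token definitions for, say, INT (\d+) and
# WS (\s+) with WS passed in `ignore`, Lexer([...], ignore=['WS']).lex('1 2')
# yields two INT Tokens and silently consumes the whitespace between them, since
# ignored types are still matched and fed to the LineCounter but never yielded.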

class ContextualLexer:
    def __init__(self, tokens, states, ignore=(), always_accept=(), user_callbacks={}):
        tokens_by_name = {}
        for t in tokens:
            assert t.name not in tokens_by_name, t
            tokens_by_name[t.name] = t

        lexer_by_tokens = {}
        self.lexers = {}
        for state, accepts in states.items():
            key = frozenset(accepts)
            try:
                lexer = lexer_by_tokens[key]
            except KeyError:
                accepts = set(accepts) | set(ignore) | set(always_accept)
                state_tokens = [tokens_by_name[n] for n in accepts if is_terminal(n) and n != '$END']
                lexer = Lexer(state_tokens, ignore=ignore, user_callbacks=user_callbacks)
                lexer_by_tokens[key] = lexer

            self.lexers[state] = lexer

        self.root_lexer = Lexer(tokens, ignore=ignore, user_callbacks=user_callbacks)

        self.set_parser_state(None)  # Needs to be set on the outside

    def set_parser_state(self, state):
        self.parser_state = state

    def lex(self, stream):
        l = _Lex(self.lexers[self.parser_state])
        for x in l.lex(stream, self.root_lexer.newline_types, self.root_lexer.ignore_types):
            yield x
            l.lexer = self.lexers[self.parser_state]
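
# Note on ContextualLexer.lex: after each token is yielded, the parser is
# expected to call set_parser_state() with its new state; the generator then
# swaps in the matching per-state Lexer (l.lexer) before scanning the next
# token, which is what lets each parser state accept only its own terminals.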