## Lexer Implementation

import re

from .utils import Str, classify


class LexError(Exception):
    pass


class TokenDef(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return '%s(%r, %r)' % (type(self).__name__, self.name, self.value)


class TokenDef__Str(TokenDef):
    def to_regexp(self):
        return re.escape(self.value)

    priority = 0


class TokenDef__Regexp(TokenDef):
    def to_regexp(self):
        return self.value

    priority = 1


class UnexpectedInput(LexError):
    def __init__(self, seq, lex_pos, line, column):
        context = seq[lex_pos:lex_pos+5]
        message = "No token defined for: '%s' in %r at line %d" % (seq[lex_pos], context, line)

        super(UnexpectedInput, self).__init__(message)

        self.line = line
        self.column = column
        self.context = context


class Token(Str):
    def __new__(cls, type_, value, pos_in_stream=None):
        inst = Str.__new__(cls, value)
        inst.type = type_
        inst.pos_in_stream = pos_in_stream
        inst.value = value
        return inst

    @classmethod
    def new_borrow_pos(cls, type_, value, borrow_t):
        inst = cls(type_, value, borrow_t.pos_in_stream)
        inst.line = borrow_t.line
        inst.column = borrow_t.column
        return inst

    def __repr__(self):
        return 'Token(%s, %r)' % (self.type, self.value)


class Regex:
    def __init__(self, pattern, flags=()):
        self.pattern = pattern
        self.flags = flags


def _regexp_has_newline(r):
    return '\n' in r or '\\n' in r or ('(?s)' in r and '.' in r)
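
# Illustration (hedged, not in the original file): _regexp_has_newline(r'\n')
# and _regexp_has_newline('(?s).*') are truthy, while _regexp_has_newline('a+')
# is not. Tokens whose patterns can consume a newline are exactly the ones
# that trigger the line-counter updates in Lexer.lex() below.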


def _create_unless_callback(strs):
    def unless_callback(t):
        if t in strs:
            t.type = strs[t]
        return t
    return unless_callback


def _create_unless(tokens):
    tokens_by_type = classify(tokens, type)
    assert len(tokens_by_type) <= 2, tokens_by_type.keys()

    embedded_strs = set()
    callback = {}
    for retok in tokens_by_type.get(TokenDef__Regexp, []):
        unless = {}
        for strtok in tokens_by_type.get(TokenDef__Str, []):
            m = re.match(retok.value, strtok.value)
            if m and m.group(0) == strtok.value:
                embedded_strs.add(strtok.name)
                unless[strtok.value] = strtok.name
        if unless:
            callback[retok.name] = _create_unless_callback(unless)

    tokens = [t for t in tokens if t.name not in embedded_strs]
    return tokens, callback
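
# Example (hedged; the names are illustrative only): given
# TokenDef__Regexp('NAME', r'[a-zA-Z_]\w*') and TokenDef__Str('IF', 'if'),
# the NAME regexp fully matches the string 'if'. _create_unless therefore
# drops IF from the token list and installs a callback on NAME that retypes
# any 'if' match back to IF at lex time, so keywords don't need their own
# alternative in the combined regexp.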


class Lexer(object):
    def __init__(self, tokens, ignore=()):
        assert all(isinstance(t, TokenDef) for t in tokens)

        self.ignore = ignore
        self.newline_char = '\n'
        tokens = list(tokens)

        # Sanitization
        for t in tokens:
            try:
                re.compile(t.to_regexp())
            except re.error:
                raise LexError("Cannot compile token %s: %s" % (t.name, t.value))

        token_names = {t.name for t in tokens}
        assert all(t in token_names for t in ignore)

        # Init
        self.newline_types = [t.name for t in tokens if _regexp_has_newline(t.to_regexp())]
        self.ignore_types = [t for t in ignore]

        tokens, self.callback = _create_unless(tokens)
        assert all(self.callback.values())

        # Order matters: in the combined regexp, the first alternative that
        # matches wins, so sort regexps before strings, longer patterns first.
        tokens.sort(key=lambda x: (x.priority, len(x.value)), reverse=True)

        self.tokens = tokens

        self.mres = self._build_mres(tokens, len(tokens))

    def _build_mres(self, tokens, max_size):
        # Python sets an unreasonable group limit (currently 100) in its re module
        # Worse, the only way to know we reached it is by catching an AssertionError!
        # This function recursively tries less and less groups until it's successful.
        mres = []
        while tokens:
            try:
                mre = re.compile(u'|'.join(u'(?P<%s>%s)' % (t.name, t.to_regexp()) for t in tokens[:max_size]))
            except AssertionError:  # Yes, this is what Python provides us.. :/
                return self._build_mres(tokens, max_size//2)

            mres.append((mre, {i: n for n, i in mre.groupindex.items()}))
            tokens = tokens[max_size:]
        return mres
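
    # Illustration (hedged, not part of the original file): on interpreters
    # that still enforce the 100-named-group limit, a compile such as
    #     re.compile('|'.join('(?P<T%d>%d)' % (i, i) for i in range(150)))
    # dies with AssertionError, so _build_mres retries with max_size=75 and
    # returns two compiled regexps instead of one.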

    def lex(self, stream):
        lex_pos = 0
        line = 1
        col_start_pos = 0
        newline_types = list(self.newline_types)
        ignore_types = list(self.ignore_types)
        while True:
            for mre, type_from_index in self.mres:
                m = mre.match(stream, lex_pos)
                if m:
                    value = m.group(0)
                    type_ = type_from_index[m.lastindex]
                    if type_ not in ignore_types:
                        t = Token(type_, value, lex_pos)
                        t.line = line
                        t.column = lex_pos - col_start_pos
                        if t.type in self.callback:
                            t = self.callback[t.type](t)
                        yield t
                    if type_ in newline_types:
                        newlines = value.count(self.newline_char)
                        if newlines:
                            line += newlines
                            col_start_pos = lex_pos + value.rindex(self.newline_char)
                    lex_pos += len(value)
                    break
            else:
                # No regexp matched at lex_pos: either we reached the end of
                # the stream, or it contains text no token definition covers.
                if lex_pos < len(stream):
                    raise UnexpectedInput(stream, lex_pos, line, lex_pos - col_start_pos)
                break
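

# ---------------------------------------------------------------------------
# Minimal usage sketch (hedged; not part of the original module). The token
# names and sample input below are illustrative only, and running this block
# requires the package-relative import of Str/classify above to resolve
# (e.g. via `python -m <package>.lexer`).
if __name__ == '__main__':
    token_defs = [
        TokenDef__Regexp('NAME', r'[a-zA-Z_]\w*'),
        TokenDef__Regexp('NUMBER', r'\d+'),
        TokenDef__Str('IF', 'if'),   # fully matched by NAME's regexp, so
                                     # _create_unless retypes it at lex time
        TokenDef__Str('PLUS', '+'),
        TokenDef__Regexp('WS', r'\s+'),
    ]
    lexer = Lexer(token_defs, ignore=['WS'])
    for tok in lexer.lex('if x + 12'):
        print('%s %r line=%d col=%d' % (tok.type, tok.value, tok.line, tok.column))
    # Expected token sequence (roughly): IF 'if', NAME 'x', PLUS '+', NUMBER '12'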