This repo contains the code used to mirror other repos, as well as the code being mirrored.


## Lexer Implementation

import re

from .utils import Str
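# NOTE: Str is assumed here to be a simple str subclass defined in the
# sibling utils module (e.g. "class Str(str): pass"), so that Token
# instances below behave like plain strings while still carrying lexer
# metadata as attributes.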

class LexError(Exception):
    pass


class Token(Str):
    def __new__(cls, type, value, pos_in_stream=None):
        inst = Str.__new__(cls, value)
        inst.type = type
        inst.pos_in_stream = pos_in_stream
        inst.value = value
        return inst

    @classmethod
    def new_borrow_pos(cls, type, value, borrow_t):
        inst = cls(type, value, borrow_t.pos_in_stream)
        inst.line = borrow_t.line
        inst.column = borrow_t.column
        return inst

    def __repr__(self):
        return 'Token(%s, %s)' % (self.type, self.value)
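# Because Token ultimately subclasses str (via Str, per the assumption
# above), a token compares equal to its text -- Token('NUMBER', '42') == '42'
# -- while .type, .line and .column carry the lexing metadata alongside it.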

class Regex:
    def __init__(self, pattern, flags=()):
        self.pattern = pattern
        self.flags = flags

class Lexer(object):
    def __init__(self, tokens, callbacks, ignore=()):
        self.ignore = ignore
        self.newline_char = '\n'
        tokens = list(tokens)

        # Sanitization: every pattern must compile on its own, and every
        # ignored token name must actually be defined.
        token_names = {t[0] for t in tokens}
        for t in tokens:
            try:
                re.compile(t[1])
            except re.error:
                raise LexError("Cannot compile token: %s: %s" % t)
        assert all(t in token_names for t in ignore)

        # Init
        self.tokens = tokens
        self.callbacks = callbacks
        self.token_types = list(token_names)
        self.type_index = {name: i for i, name in enumerate(self.token_types)}

        # Tokens whose pattern may match a newline; needed for line counting in lex()
        self.newline_types = [self.type_index[t[0]] for t in tokens
                              if '\n' in t[1] or '\\n' in t[1] or '(?s)' in t[1]]
        self.ignore_types = [self.type_index[t] for t in ignore]

        self.mres = self._build_mres(tokens, len(tokens))
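    # self.mres is a list of (compiled_regex, {group_index: type_index})
    # pairs; see _build_mres below for why there may be more than one.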

    def _build_mres(self, tokens, max_size):
        # Python sets an unreasonable group limit (currently 100) in its re module
        # Worse, the only way to know we reached it is by catching an AssertionError!
        # This function recursively tries fewer and fewer groups until it's successful.
        mres = []
        while tokens:
            try:
                mre = re.compile(u'|'.join(u'(?P<%s>%s)' % t for t in tokens[:max_size]))
            except AssertionError:  # Yes, this is what Python provides us.. :/
                return self._build_mres(tokens, max_size // 2)
            mres.append((mre, {i: self.type_index[n] for n, i in mre.groupindex.items()}))
            tokens = tokens[max_size:]
        return mres
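    # NOTE: the 100-group limit described above applies to older Pythons;
    # CPython 3.7 lifted it, so on modern interpreters the first compile
    # normally succeeds and self.mres holds a single pattern.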

    def lex(self, stream):
        lex_pos = 0
        line = 1
        col_start_pos = 0
        newline_types = list(self.newline_types)
        ignore_types = list(self.ignore_types)
        while True:
            for mre, type_from_index in self.mres:
                m = mre.match(stream, lex_pos)
                if m:
                    value = m.group(0)
                    # Python's alternation is first-match, not longest-match,
                    # so earlier token definitions take precedence.
                    type_num = type_from_index[m.lastindex]
                    if type_num not in ignore_types:
                        t = Token(self.token_types[type_num], value, lex_pos)
                        t.line = line
                        t.column = lex_pos - col_start_pos
                        if t.type in self.callbacks:
                            t = self.callbacks[t.type](t)
                        yield t
                    if type_num in newline_types:
                        # Keep line/column bookkeeping correct across multi-line matches
                        newlines = value.count(self.newline_char)
                        if newlines:
                            line += newlines
                            col_start_pos = lex_pos + value.rindex(self.newline_char)
                    lex_pos += len(value)
                    break
            else:
                if lex_pos < len(stream):
                    context = stream[lex_pos:lex_pos + 5]
                    raise LexError("No token defined for: '%s' in %s at line %d"
                                   % (stream[lex_pos], context, line))
                break
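
To make the interface concrete, here is a minimal usage sketch. The token set, the empty callbacks dict, and the input are invented for illustration; it assumes token definitions are (name, pattern) 2-tuples, as the sanitization loop above implies.

# Hypothetical token definitions -- (name, pattern) 2-tuples
tokens = [
    ('NUMBER', r'\d+'),
    ('PLUS', r'\+'),
    ('NEWLINE', r'\n'),
    ('WS', r'[ \t]+'),
]

lexer = Lexer(tokens, callbacks={}, ignore=('WS', 'NEWLINE'))
for tok in lexer.lex('1 + 2\n+ 30'):
    print(repr(tok), tok.line, tok.column)

# Token(NUMBER, 1) 1 0
# Token(PLUS, +) 1 2
# Token(NUMBER, 2) 1 4
# Token(PLUS, +) 2 1
# Token(NUMBER, 30) 2 3

Note that NEWLINE must still be declared even though it is ignored: unmatched characters raise LexError, and the line counter only advances when an emitted or ignored token's type is in newline_types.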