179 lines
5.8 KiB

  1. #!/usr/bin/env python
  2. ############################################################################
  3. # Joshua R. Boverhof, David W. Robertson, LBNL
  4. # See LBNLCopyright for copyright notice!
  5. ###########################################################################
  6. import sys
  7. import unittest
  8. import os
  9. import inspect
  10. cmd_folder = os.path.abspath(os.path.join(os.path.split(inspect.getfile(
  11. inspect.currentframe()))[0], ".."))
  12. if cmd_folder not in sys.path:
  13. sys.path.insert(0, cmd_folder)
  14. from wstools.Utility import DOM
  15. from wstools.WSDLTools import WSDLReader
  16. from wstools.TimeoutSocket import TimeoutError
  17. try:
  18. import configparser
  19. except:
  20. from six.moves import configparser
  21. cwd = 'tests'
  22. # that's for tox/pytest
  23. nameGenerator = None
  24. def makeTestSuite(section='services_by_file'):
  25. global nameGenerator
  26. cp, numTests = setUpOptions(section)
  27. nameGenerator = getOption(cp, section)
  28. suite = unittest.TestSuite()
  29. for i in range(0, numTests):
  30. suite.addTest(unittest.makeSuite(WSDLToolsTestCase, 'test_'))
  31. return suite
  32. class WSDLToolsTestCase(unittest.TestCase):
  33. def __init__(self, methodName='runTest'):
  34. unittest.TestCase.__init__(self, methodName)
  35. def setUp(self):
  36. makeTestSuite()
  37. if hasattr(nameGenerator, '__next__'):
  38. self.path = nameGenerator.__next__()
  39. else:
  40. self.path = nameGenerator.next()
  41. #print(self.path)
  42. sys.stdout.flush()
  43. def __str__(self):
  44. teststr = unittest.TestCase.__str__(self)
  45. if hasattr(self, "path"):
  46. return "%s: %s" % (teststr, self.path)
  47. else:
  48. return "%s" % (teststr)
  49. def checkWSDLCollection(self, tag_name, component, key='name'):
  50. if self.wsdl is None:
  51. return
  52. definition = self.wsdl.document.documentElement
  53. version = DOM.WSDLUriToVersion(definition.namespaceURI)
  54. nspname = DOM.GetWSDLUri(version)
  55. for node in DOM.getElements(definition, tag_name, nspname):
  56. name = DOM.getAttr(node, key)
  57. comp = component[name]
  58. self.failUnlessEqual(eval('comp.%s' % key), name)
  59. def checkXSDCollection(self, tag_name, component, node, key='name'):
  60. for cnode in DOM.getElements(node, tag_name):
  61. name = DOM.getAttr(cnode, key)
  62. component[name]
  63. def test_all(self):
  64. try:
  65. if self.path[:7] == 'http://':
  66. self.wsdl = WSDLReader().loadFromURL(self.path)
  67. else:
  68. self.wsdl = WSDLReader().loadFromFile('tests/' + self.path)
  69. except TimeoutError:
  70. print("connection timed out")
  71. sys.stdout.flush()
  72. return
  73. except:
  74. self.path = self.path + ": load failed, unable to start"
  75. raise
  76. try:
  77. self.checkWSDLCollection('service', self.wsdl.services)
  78. except:
  79. self.path = self.path + ": wsdl.services"
  80. raise
  81. try:
  82. self.checkWSDLCollection('message', self.wsdl.messages)
  83. except:
  84. self.path = self.path + ": wsdl.messages"
  85. raise
  86. try:
  87. self.checkWSDLCollection('portType', self.wsdl.portTypes)
  88. except:
  89. self.path = self.path + ": wsdl.portTypes"
  90. raise
  91. try:
  92. self.checkWSDLCollection('binding', self.wsdl.bindings)
  93. except:
  94. self.path = self.path + ": wsdl.bindings"
  95. raise
  96. try:
  97. self.checkWSDLCollection('import', self.wsdl.imports,
  98. key='namespace')
  99. except:
  100. self.path = self.path + ": wsdl.imports"
  101. raise
  102. try:
  103. for key in self.wsdl.types.keys():
  104. schema = self.wsdl.types[key]
  105. self.failUnlessEqual(key, schema.getTargetNamespace())
  106. definition = self.wsdl.document.documentElement
  107. version = DOM.WSDLUriToVersion(definition.namespaceURI)
  108. nspname = DOM.GetWSDLUri(version)
  109. for node in DOM.getElements(definition, 'types', nspname):
  110. for snode in DOM.getElements(node, 'schema'):
  111. tns = DOM.findTargetNS(snode)
  112. schema = self.wsdl.types[tns]
  113. self.schemaAttributesDeclarations(schema, snode)
  114. self.schemaAttributeGroupDeclarations(schema, snode)
  115. self.schemaElementDeclarations(schema, snode)
  116. self.schemaTypeDefinitions(schema, snode)
  117. except:
  118. self.path = self.path + ": wsdl.types"
  119. raise
  120. if self.wsdl.extensions:
  121. print('No check for WSDLTools(%s) Extensions:' % (self.wsdl.name))
  122. for ext in self.wsdl.extensions:
  123. print('\t', ext)
  124. def schemaAttributesDeclarations(self, schema, node):
  125. self.checkXSDCollection('attribute', schema.attr_decl, node)
  126. def schemaAttributeGroupDeclarations(self, schema, node):
  127. self.checkXSDCollection('group', schema.attr_groups, node)
  128. def schemaElementDeclarations(self, schema, node):
  129. self.checkXSDCollection('element', schema.elements, node)
  130. def schemaTypeDefinitions(self, schema, node):
  131. self.checkXSDCollection('complexType', schema.types, node)
  132. self.checkXSDCollection('simpleType', schema.types, node)
  133. def setUpOptions(section):
  134. cp = configparser.ConfigParser()
  135. cp.optionxform = str
  136. cp.read(cwd + '/config.txt')
  137. if not cp.sections():
  138. print('fatal error: configuration file config.txt not present')
  139. sys.exit(0)
  140. if not cp.has_section(section):
  141. print('%s section not present in configuration file, exiting' % section)
  142. sys.exit(0)
  143. return cp, len(cp.options(section))
  144. def getOption(cp, section):
  145. for name, value in cp.items(section):
  146. yield value