176 lines
5.5 KiB

  1. #!/usr/bin/env python
  2. ############################################################################
  3. # Joshua R. Boverhof, David W. Robertson, LBNL
  4. # See LBNLCopyright for copyright notice!
  5. ###########################################################################
  6. import sys
  7. import unittest
  8. import ConfigParser
  9. import os
  10. from wstools.Utility import DOM
  11. from wstools.WSDLTools import WSDLReader
  12. from wstools.TimeoutSocket import TimeoutError
  13. from wstools import tests
# Directory of the wstools ``tests`` package; config.txt is read from here.
cwd = os.path.dirname(tests.__file__)
# Shared generator of WSDL locations, assigned by makeTestSuite() and
# consumed in WSDLToolsTestCase.setUp(). It is defined as None at import
# time for tox/pytest.
# NOTE(review): presumably because pytest collects the test case directly
# without going through main() -- confirm.
nameGenerator = None
  17. def makeTestSuite(section='services_by_file'):
  18. global nameGenerator
  19. cp, numTests = setUpOptions(section)
  20. nameGenerator = getOption(cp, section)
  21. suite = unittest.TestSuite()
  22. for i in range(0, numTests):
  23. suite.addTest(unittest.makeSuite(WSDLToolsTestCase, 'test_'))
  24. return suite
  25. class WSDLToolsTestCase(unittest.TestCase):
  26. def __init__(self, methodName='runTest'):
  27. unittest.TestCase.__init__(self, methodName)
  28. def setUp(self):
  29. makeTestSuite()
  30. self.path = nameGenerator.next()
  31. print self.path
  32. sys.stdout.flush()
  33. def __str__(self):
  34. teststr = unittest.TestCase.__str__(self)
  35. if hasattr(self, "path"):
  36. return "%s: %s" % (teststr, self.path)
  37. else:
  38. return "%s" % (teststr)
  39. def checkWSDLCollection(self, tag_name, component, key='name'):
  40. if self.wsdl is None:
  41. return
  42. definition = self.wsdl.document.documentElement
  43. version = DOM.WSDLUriToVersion(definition.namespaceURI)
  44. nspname = DOM.GetWSDLUri(version)
  45. for node in DOM.getElements(definition, tag_name, nspname):
  46. name = DOM.getAttr(node, key)
  47. comp = component[name]
  48. self.failUnlessEqual(eval('comp.%s' % key), name)
  49. def checkXSDCollection(self, tag_name, component, node, key='name'):
  50. for cnode in DOM.getElements(node, tag_name):
  51. name = DOM.getAttr(cnode, key)
  52. component[name]
  53. def test_all(self):
  54. try:
  55. if self.path[:7] == 'http://':
  56. self.wsdl = WSDLReader().loadFromURL(self.path)
  57. else:
  58. self.wsdl = WSDLReader().loadFromFile('tests/' + self.path)
  59. except TimeoutError:
  60. print "connection timed out"
  61. sys.stdout.flush()
  62. return
  63. except:
  64. self.path = self.path + ": load failed, unable to start"
  65. raise
  66. try:
  67. self.checkWSDLCollection('service', self.wsdl.services)
  68. except:
  69. self.path = self.path + ": wsdl.services"
  70. raise
  71. try:
  72. self.checkWSDLCollection('message', self.wsdl.messages)
  73. except:
  74. self.path = self.path + ": wsdl.messages"
  75. raise
  76. try:
  77. self.checkWSDLCollection('portType', self.wsdl.portTypes)
  78. except:
  79. self.path = self.path + ": wsdl.portTypes"
  80. raise
  81. try:
  82. self.checkWSDLCollection('binding', self.wsdl.bindings)
  83. except:
  84. self.path = self.path + ": wsdl.bindings"
  85. raise
  86. try:
  87. self.checkWSDLCollection('import', self.wsdl.imports, \
  88. key='namespace')
  89. except:
  90. self.path = self.path + ": wsdl.imports"
  91. raise
  92. try:
  93. for key in self.wsdl.types.keys():
  94. schema = self.wsdl.types[key]
  95. self.failUnlessEqual(key, schema.getTargetNamespace())
  96. definition = self.wsdl.document.documentElement
  97. version = DOM.WSDLUriToVersion(definition.namespaceURI)
  98. nspname = DOM.GetWSDLUri(version)
  99. for node in DOM.getElements(definition, 'types', nspname):
  100. for snode in DOM.getElements(node, 'schema'):
  101. tns = DOM.findTargetNS(snode)
  102. schema = self.wsdl.types[tns]
  103. self.schemaAttributesDeclarations(schema, snode)
  104. self.schemaAttributeGroupDeclarations(schema, snode)
  105. self.schemaElementDeclarations(schema, snode)
  106. self.schemaTypeDefinitions(schema, snode)
  107. except:
  108. self.path = self.path + ": wsdl.types"
  109. raise
  110. if self.wsdl.extensions:
  111. print 'No check for WSDLTools(%s) Extensions:' % (self.wsdl.name)
  112. for ext in self.wsdl.extensions:
  113. print '\t', ext
  114. def schemaAttributesDeclarations(self, schema, node):
  115. self.checkXSDCollection('attribute', schema.attr_decl, node)
  116. def schemaAttributeGroupDeclarations(self, schema, node):
  117. self.checkXSDCollection('group', schema.attr_groups, node)
  118. def schemaElementDeclarations(self, schema, node):
  119. self.checkXSDCollection('element', schema.elements, node)
  120. def schemaTypeDefinitions(self, schema, node):
  121. self.checkXSDCollection('complexType', schema.types, node)
  122. self.checkXSDCollection('simpleType', schema.types, node)
  123. def setUpOptions(section):
  124. cp = ConfigParser.ConfigParser()
  125. cp.read(cwd + '/config.txt')
  126. if not cp.sections():
  127. print 'fatal error: configuration file config.txt not present'
  128. sys.exit(0)
  129. if not cp.has_section(section):
  130. print '%s section not present in configuration file, exiting' % section
  131. sys.exit(0)
  132. return cp, len(cp.options(section))
  133. def getOption(cp, section):
  134. for name, value in cp.items(section):
  135. yield value
  136. def main():
  137. unittest.main(defaultTest="makeTestSuite")
# Run the tests only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()