#!/usr/bin/env python ############################################################################ # Joshua R. Boverhof, David W. Robertson, LBNL # See LBNLCopyright for copyright notice! ########################################################################### import sys import unittest import ConfigParser import os from wstools.Utility import DOM from wstools.WSDLTools import WSDLReader from wstools.TimeoutSocket import TimeoutError cwd = 'tests' # that's for tox/pytest nameGenerator = None def makeTestSuite(section='services_by_file'): global nameGenerator cp, numTests = setUpOptions(section) nameGenerator = getOption(cp, section) suite = unittest.TestSuite() for i in range(0, numTests): suite.addTest(unittest.makeSuite(WSDLToolsTestCase, 'test_')) return suite class WSDLToolsTestCase(unittest.TestCase): def __init__(self, methodName='runTest'): unittest.TestCase.__init__(self, methodName) def setUp(self): makeTestSuite() self.path = nameGenerator.next() print self.path sys.stdout.flush() def __str__(self): teststr = unittest.TestCase.__str__(self) if hasattr(self, "path"): return "%s: %s" % (teststr, self.path) else: return "%s" % (teststr) def checkWSDLCollection(self, tag_name, component, key='name'): if self.wsdl is None: return definition = self.wsdl.document.documentElement version = DOM.WSDLUriToVersion(definition.namespaceURI) nspname = DOM.GetWSDLUri(version) for node in DOM.getElements(definition, tag_name, nspname): name = DOM.getAttr(node, key) comp = component[name] self.failUnlessEqual(eval('comp.%s' % key), name) def checkXSDCollection(self, tag_name, component, node, key='name'): for cnode in DOM.getElements(node, tag_name): name = DOM.getAttr(cnode, key) component[name] def test_all(self): try: if self.path[:7] == 'http://': self.wsdl = WSDLReader().loadFromURL(self.path) else: self.wsdl = WSDLReader().loadFromFile('tests/' + self.path) except TimeoutError: print "connection timed out" sys.stdout.flush() return except: self.path = self.path + ": load failed, unable to start" raise try: self.checkWSDLCollection('service', self.wsdl.services) except: self.path = self.path + ": wsdl.services" raise try: self.checkWSDLCollection('message', self.wsdl.messages) except: self.path = self.path + ": wsdl.messages" raise try: self.checkWSDLCollection('portType', self.wsdl.portTypes) except: self.path = self.path + ": wsdl.portTypes" raise try: self.checkWSDLCollection('binding', self.wsdl.bindings) except: self.path = self.path + ": wsdl.bindings" raise try: self.checkWSDLCollection('import', self.wsdl.imports, \ key='namespace') except: self.path = self.path + ": wsdl.imports" raise try: for key in self.wsdl.types.keys(): schema = self.wsdl.types[key] self.failUnlessEqual(key, schema.getTargetNamespace()) definition = self.wsdl.document.documentElement version = DOM.WSDLUriToVersion(definition.namespaceURI) nspname = DOM.GetWSDLUri(version) for node in DOM.getElements(definition, 'types', nspname): for snode in DOM.getElements(node, 'schema'): tns = DOM.findTargetNS(snode) schema = self.wsdl.types[tns] self.schemaAttributesDeclarations(schema, snode) self.schemaAttributeGroupDeclarations(schema, snode) self.schemaElementDeclarations(schema, snode) self.schemaTypeDefinitions(schema, snode) except: self.path = self.path + ": wsdl.types" raise if self.wsdl.extensions: print 'No check for WSDLTools(%s) Extensions:' % (self.wsdl.name) for ext in self.wsdl.extensions: print '\t', ext def schemaAttributesDeclarations(self, schema, node): self.checkXSDCollection('attribute', schema.attr_decl, node) def schemaAttributeGroupDeclarations(self, schema, node): self.checkXSDCollection('group', schema.attr_groups, node) def schemaElementDeclarations(self, schema, node): self.checkXSDCollection('element', schema.elements, node) def schemaTypeDefinitions(self, schema, node): self.checkXSDCollection('complexType', schema.types, node) self.checkXSDCollection('simpleType', schema.types, node) def setUpOptions(section): cp = ConfigParser.ConfigParser() cp.read(cwd + '/config.txt') if not cp.sections(): print 'fatal error: configuration file config.txt not present' sys.exit(0) if not cp.has_section(section): print '%s section not present in configuration file, exiting' % section sys.exit(0) return cp, len(cp.options(section)) def getOption(cp, section): for name, value in cp.items(section): yield value