# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. ident = "$Id$" import weakref from cStringIO import StringIO from Namespaces import OASIS, XMLNS, WSA, WSA_LIST, WSRF_V1_2, WSRF from Utility import Collection, CollectionNS, DOM, ElementProxy, basejoin from XMLSchema import XMLSchema, SchemaReader, WSDLToolsAdapter class WSDLReader: """A WSDLReader creates WSDL instances from urls and xml data.""" # Custom subclasses of WSDLReader may wish to implement a caching # strategy or other optimizations. Because application needs vary # so widely, we don't try to provide any caching by default. def loadFromStream(self, stream, name=None): """Return a WSDL instance loaded from a stream object.""" document = DOM.loadDocument(stream) wsdl = WSDL() if name: wsdl.location = name elif hasattr(stream, 'name'): wsdl.location = stream.name wsdl.load(document) return wsdl def loadFromURL(self, url): """Return a WSDL instance loaded from the given url.""" document = DOM.loadFromURL(url) wsdl = WSDL() wsdl.location = url wsdl.load(document) return wsdl def loadFromString(self, data): """Return a WSDL instance loaded from an xml string.""" return self.loadFromStream(StringIO(data)) def loadFromFile(self, filename): """Return a WSDL instance loaded from the given file.""" file = open(filename, 'rb') try: wsdl = self.loadFromStream(file) finally: file.close() return wsdl class WSDL: """A WSDL object models a WSDL service description. WSDL objects may be created manually or loaded from an xml representation using a WSDLReader instance.""" def __init__(self, targetNamespace=None, strict=1): self.targetNamespace = targetNamespace or 'urn:this-document.wsdl' self.documentation = '' self.location = None self.document = None self.name = None self.services = CollectionNS(self) self.messages = CollectionNS(self) self.portTypes = CollectionNS(self) self.bindings = CollectionNS(self) self.imports = Collection(self) self.types = Types(self) self.extensions = [] self.strict = strict def __del__(self): if self.document is not None: self.document.unlink() version = '1.1' def addService(self, name, documentation='', targetNamespace=None): if self.services.has_key(name): raise WSDLError( 'Duplicate service element: %s' % name ) item = Service(name, documentation) if targetNamespace: item.targetNamespace = targetNamespace self.services[name] = item return item def addMessage(self, name, documentation='', targetNamespace=None): if self.messages.has_key(name): raise WSDLError( 'Duplicate message element: %s.' % name ) item = Message(name, documentation) if targetNamespace: item.targetNamespace = targetNamespace self.messages[name] = item return item def addPortType(self, name, documentation='', targetNamespace=None): if self.portTypes.has_key(name): raise WSDLError( 'Duplicate portType element: name' ) item = PortType(name, documentation) if targetNamespace: item.targetNamespace = targetNamespace self.portTypes[name] = item return item def addBinding(self, name, type, documentation='', targetNamespace=None): if self.bindings.has_key(name): raise WSDLError( 'Duplicate binding element: %s' % name ) item = Binding(name, type, documentation) if targetNamespace: item.targetNamespace = targetNamespace self.bindings[name] = item return item def addImport(self, namespace, location): item = ImportElement(namespace, location) self.imports[namespace] = item return item def toDom(self): """ Generate a DOM representation of the WSDL instance. Not dealing with generating XML Schema, thus the targetNamespace of all XML Schema elements or types used by WSDL message parts needs to be specified via import information items. """ namespaceURI = DOM.GetWSDLUri(self.version) self.document = DOM.createDocument(namespaceURI ,'wsdl:definitions') # Set up a couple prefixes for easy reading. child = DOM.getElement(self.document, None) child.setAttributeNS(None, 'targetNamespace', self.targetNamespace) child.setAttributeNS(XMLNS.BASE, 'xmlns:wsdl', namespaceURI) child.setAttributeNS(XMLNS.BASE, 'xmlns:xsd', 'http://www.w3.org/1999/XMLSchema') child.setAttributeNS(XMLNS.BASE, 'xmlns:soap', 'http://schemas.xmlsoap.org/wsdl/soap/') child.setAttributeNS(XMLNS.BASE, 'xmlns:tns', self.targetNamespace) if self.name: child.setAttributeNS(None, 'name', self.name) # wsdl:import for item in self.imports: item.toDom() # wsdl:message for item in self.messages: item.toDom() # wsdl:portType for item in self.portTypes: item.toDom() # wsdl:binding for item in self.bindings: item.toDom() # wsdl:service for item in self.services: item.toDom() def load(self, document): # We save a reference to the DOM document to ensure that elements # saved as "extensions" will continue to have a meaningful context # for things like namespace references. The lifetime of the DOM # document is bound to the lifetime of the WSDL instance. self.document = document definitions = DOM.getElement(document, 'definitions', None, None) if definitions is None: raise WSDLError( 'Missing element.' ) self.version = DOM.WSDLUriToVersion(definitions.namespaceURI) NS_WSDL = DOM.GetWSDLUri(self.version) self.targetNamespace = DOM.getAttr(definitions, 'targetNamespace', None, None) self.name = DOM.getAttr(definitions, 'name', None, None) self.documentation = GetDocumentation(definitions) # # Retrieve all 's, append all children of imported # document to main document. First iteration grab all original # 's from document, second iteration grab all # "imported" from document, etc break out when # no more 's. # imported = [] base_location = self.location do_it = True while do_it: do_it = False for element in DOM.getElements(definitions, 'import', NS_WSDL): location = DOM.getAttr(element, 'location') if base_location is not None: location = basejoin(base_location, location) if location not in imported: do_it = True self._import(document, element, base_location) imported.append(location) else: definitions.removeChild(element) base_location = None # # No more 's, now load up all other # WSDL information items. # for element in DOM.getElements(definitions, None, None): targetNamespace = DOM.getAttr(element, 'targetNamespace') localName = element.localName if not DOM.nsUriMatch(element.namespaceURI, NS_WSDL): if localName == 'schema': tns = DOM.getAttr(element, 'targetNamespace') reader = SchemaReader(base_url=self.imports[tns].location) schema = reader.loadFromNode(WSDLToolsAdapter(self), element) # schema.setBaseUrl(self.location) self.types.addSchema(schema) else: self.extensions.append(element) continue elif localName == 'message': name = DOM.getAttr(element, 'name') docs = GetDocumentation(element) message = self.addMessage(name, docs, targetNamespace) parts = DOM.getElements(element, 'part', NS_WSDL) message.load(parts) continue elif localName == 'portType': name = DOM.getAttr(element, 'name') docs = GetDocumentation(element) ptype = self.addPortType(name, docs, targetNamespace) #operations = DOM.getElements(element, 'operation', NS_WSDL) #ptype.load(operations) ptype.load(element) continue elif localName == 'binding': name = DOM.getAttr(element, 'name') type = DOM.getAttr(element, 'type', default=None) if type is None: raise WSDLError( 'Missing type attribute for binding %s.' % name ) type = ParseQName(type, element) docs = GetDocumentation(element) binding = self.addBinding(name, type, docs, targetNamespace) operations = DOM.getElements(element, 'operation', NS_WSDL) binding.load(operations) binding.load_ex(GetExtensions(element)) continue elif localName == 'service': name = DOM.getAttr(element, 'name') docs = GetDocumentation(element) service = self.addService(name, docs, targetNamespace) ports = DOM.getElements(element, 'port', NS_WSDL) service.load(ports) service.load_ex(GetExtensions(element)) continue elif localName == 'types': self.types.documentation = GetDocumentation(element) base_location = DOM.getAttr(element, 'base-location') if base_location: element.removeAttribute('base-location') base_location = base_location or self.location reader = SchemaReader(base_url=base_location) for item in DOM.getElements(element, None, None): if item.localName == 'schema': schema = reader.loadFromNode(WSDLToolsAdapter(self), item) # XXX could have been imported #schema.setBaseUrl(self.location) schema.setBaseUrl(base_location) self.types.addSchema(schema) else: self.types.addExtension(item) # XXX remove the attribute # element.removeAttribute('base-location') continue def _import(self, document, element, base_location=None): '''Algo take element's children, clone them, and add them to the main document. Support for relative locations is a bit complicated. The orig document context is lost, so we need to store base location in DOM elements representing , by creating a special temporary "base-location" attribute, and , by resolving the relative "location" and storing it as "location". document -- document we are loading element -- DOM Element representing base_location -- location of document from which this was gleaned. ''' namespace = DOM.getAttr(element, 'namespace', default=None) location = DOM.getAttr(element, 'location', default=None) if namespace is None or location is None: raise WSDLError( 'Invalid import element (missing namespace or location).' ) if base_location: location = basejoin(base_location, location) element.setAttributeNS(None, 'location', location) obimport = self.addImport(namespace, location) obimport._loaded = 1 importdoc = DOM.loadFromURL(location) try: if location.find('#') > -1: idref = location.split('#')[-1] imported = DOM.getElementById(importdoc, idref) else: imported = importdoc.documentElement if imported is None: raise WSDLError( 'Import target element not found for: %s' % location ) imported_tns = DOM.findTargetNS(imported) if imported_tns != namespace: return if imported.localName == 'definitions': imported_nodes = imported.childNodes else: imported_nodes = [imported] parent = element.parentNode parent.removeChild(element) for node in imported_nodes: if node.nodeType != node.ELEMENT_NODE: continue child = DOM.importNode(document, node, 1) parent.appendChild(child) child.setAttribute('targetNamespace', namespace) attrsNS = imported._attrsNS for attrkey in attrsNS.keys(): if attrkey[0] == DOM.NS_XMLNS: attr = attrsNS[attrkey].cloneNode(1) child.setAttributeNode(attr) #XXX Quick Hack, should be in WSDL Namespace. if child.localName == 'import': rlocation = child.getAttributeNS(None, 'location') alocation = basejoin(location, rlocation) child.setAttribute('location', alocation) elif child.localName == 'types': child.setAttribute('base-location', location) finally: importdoc.unlink() return location class Element: """A class that provides common functions for WSDL element classes.""" def __init__(self, name=None, documentation=''): self.name = name self.documentation = documentation self.extensions = [] def addExtension(self, item): item.parent = weakref.ref(self) self.extensions.append(item) def getWSDL(self): """Return the WSDL object that contains this information item.""" parent = self while 1: # skip any collections if isinstance(parent, WSDL): return parent try: parent = parent.parent() except: break return None class ImportElement(Element): def __init__(self, namespace, location): self.namespace = namespace self.location = location # def getWSDL(self): # """Return the WSDL object that contains this Message Part.""" # return self.parent().parent() def toDom(self): wsdl = self.getWSDL() ep = ElementProxy(None, DOM.getElement(wsdl.document, None)) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'import') epc.setAttributeNS(None, 'namespace', self.namespace) epc.setAttributeNS(None, 'location', self.location) _loaded = None class Types(Collection): default = lambda self,k: k.targetNamespace def __init__(self, parent): Collection.__init__(self, parent) self.documentation = '' self.extensions = [] def addSchema(self, schema): name = schema.targetNamespace self[name] = schema return schema def addExtension(self, item): self.extensions.append(item) class Message(Element): def __init__(self, name, documentation=''): Element.__init__(self, name, documentation) self.parts = Collection(self) def addPart(self, name, type=None, element=None): if self.parts.has_key(name): raise WSDLError( 'Duplicate message part element: %s' % name ) if type is None and element is None: raise WSDLError( 'Missing type or element attribute for part: %s' % name ) item = MessagePart(name) item.element = element item.type = type self.parts[name] = item return item def load(self, elements): for element in elements: name = DOM.getAttr(element, 'name') part = MessagePart(name) self.parts[name] = part elemref = DOM.getAttr(element, 'element', default=None) typeref = DOM.getAttr(element, 'type', default=None) if typeref is None and elemref is None: raise WSDLError( 'No type or element attribute for part: %s' % name ) if typeref is not None: part.type = ParseTypeRef(typeref, element) if elemref is not None: part.element = ParseTypeRef(elemref, element) # def getElementDeclaration(self): # """Return the XMLSchema.ElementDeclaration instance or None""" # element = None # if self.element: # nsuri,name = self.element # wsdl = self.getWSDL() # if wsdl.types.has_key(nsuri) and wsdl.types[nsuri].elements.has_key(name): # element = wsdl.types[nsuri].elements[name] # return element # # def getTypeDefinition(self): # """Return the XMLSchema.TypeDefinition instance or None""" # type = None # if self.type: # nsuri,name = self.type # wsdl = self.getWSDL() # if wsdl.types.has_key(nsuri) and wsdl.types[nsuri].types.has_key(name): # type = wsdl.types[nsuri].types[name] # return type # def getWSDL(self): # """Return the WSDL object that contains this Message Part.""" # return self.parent().parent() def toDom(self): wsdl = self.getWSDL() ep = ElementProxy(None, DOM.getElement(wsdl.document, None)) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'message') epc.setAttributeNS(None, 'name', self.name) for part in self.parts: part.toDom(epc._getNode()) class MessagePart(Element): def __init__(self, name): Element.__init__(self, name, '') self.element = None self.type = None # def getWSDL(self): # """Return the WSDL object that contains this Message Part.""" # return self.parent().parent().parent().parent() def getTypeDefinition(self): wsdl = self.getWSDL() nsuri,name = self.type schema = wsdl.types.get(nsuri, {}) return schema.get(name) def getElementDeclaration(self): wsdl = self.getWSDL() nsuri,name = self.element schema = wsdl.types.get(nsuri, {}) return schema.get(name) def toDom(self, node): """node -- node representing message""" wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'part') epc.setAttributeNS(None, 'name', self.name) if self.element is not None: ns,name = self.element prefix = epc.getPrefix(ns) epc.setAttributeNS(None, 'element', '%s:%s'%(prefix,name)) elif self.type is not None: ns,name = self.type prefix = epc.getPrefix(ns) epc.setAttributeNS(None, 'type', '%s:%s'%(prefix,name)) class PortType(Element): '''PortType has a anyAttribute, thus must provide for an extensible mechanism for supporting such attributes. ResourceProperties is specified in WS-ResourceProperties. wsa:Action is specified in WS-Address. Instance Data: name -- name attribute resourceProperties -- optional. wsr:ResourceProperties attribute, value is a QName this is Parsed into a (namespaceURI, name) that represents a Global Element Declaration. operations ''' def __init__(self, name, documentation=''): Element.__init__(self, name, documentation) self.operations = Collection(self) self.resourceProperties = None # def getWSDL(self): # return self.parent().parent() def getTargetNamespace(self): return self.targetNamespace or self.getWSDL().targetNamespace def getResourceProperties(self): return self.resourceProperties def addOperation(self, name, documentation='', parameterOrder=None): item = Operation(name, documentation, parameterOrder) self.operations[name] = item return item def load(self, element): self.name = DOM.getAttr(element, 'name') self.documentation = GetDocumentation(element) self.targetNamespace = DOM.getAttr(element, 'targetNamespace') for nsuri in WSRF_V1_2.PROPERTIES.XSD_LIST: if DOM.hasAttr(element, 'ResourceProperties', nsuri): rpref = DOM.getAttr(element, 'ResourceProperties', nsuri) self.resourceProperties = ParseQName(rpref, element) NS_WSDL = DOM.GetWSDLUri(self.getWSDL().version) elements = DOM.getElements(element, 'operation', NS_WSDL) for element in elements: name = DOM.getAttr(element, 'name') docs = GetDocumentation(element) param_order = DOM.getAttr(element, 'parameterOrder', default=None) if param_order is not None: param_order = param_order.split(' ') operation = self.addOperation(name, docs, param_order) item = DOM.getElement(element, 'input', None, None) if item is not None: name = DOM.getAttr(item, 'name') docs = GetDocumentation(item) msgref = DOM.getAttr(item, 'message') message = ParseQName(msgref, item) for WSA in WSA_LIST: action = DOM.getAttr(item, 'Action', WSA.ADDRESS, None) if action: break operation.setInput(message, name, docs, action) item = DOM.getElement(element, 'output', None, None) if item is not None: name = DOM.getAttr(item, 'name') docs = GetDocumentation(item) msgref = DOM.getAttr(item, 'message') message = ParseQName(msgref, item) for WSA in WSA_LIST: action = DOM.getAttr(item, 'Action', WSA.ADDRESS, None) if action: break operation.setOutput(message, name, docs, action) for item in DOM.getElements(element, 'fault', None): name = DOM.getAttr(item, 'name') docs = GetDocumentation(item) msgref = DOM.getAttr(item, 'message') message = ParseQName(msgref, item) for WSA in WSA_LIST: action = DOM.getAttr(item, 'Action', WSA.ADDRESS, None) if action: break operation.addFault(message, name, docs, action) def toDom(self): wsdl = self.getWSDL() ep = ElementProxy(None, DOM.getElement(wsdl.document, None)) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'portType') epc.setAttributeNS(None, 'name', self.name) if self.resourceProperties: ns,name = self.resourceProperties prefix = epc.getPrefix(ns) epc.setAttributeNS(WSRF.PROPERTIES.LATEST, 'ResourceProperties', '%s:%s'%(prefix,name)) for op in self.operations: op.toDom(epc._getNode()) class Operation(Element): def __init__(self, name, documentation='', parameterOrder=None): Element.__init__(self, name, documentation) self.parameterOrder = parameterOrder self.faults = Collection(self) self.input = None self.output = None def getWSDL(self): """Return the WSDL object that contains this Operation.""" return self.parent().parent().parent().parent() def getPortType(self): return self.parent().parent() def getInputAction(self): """wsa:Action attribute""" return GetWSAActionInput(self) def getInputMessage(self): if self.input is None: return None wsdl = self.getPortType().getWSDL() return wsdl.messages[self.input.message] def getOutputAction(self): """wsa:Action attribute""" return GetWSAActionOutput(self) def getOutputMessage(self): if self.output is None: return None wsdl = self.getPortType().getWSDL() return wsdl.messages[self.output.message] def getFaultAction(self, name): """wsa:Action attribute""" return GetWSAActionFault(self, name) def getFaultMessage(self, name): wsdl = self.getPortType().getWSDL() return wsdl.messages[self.faults[name].message] def addFault(self, message, name, documentation='', action=None): if self.faults.has_key(name): raise WSDLError( 'Duplicate fault element: %s' % name ) item = MessageRole('fault', message, name, documentation, action) self.faults[name] = item return item def setInput(self, message, name='', documentation='', action=None): self.input = MessageRole('input', message, name, documentation, action) self.input.parent = weakref.ref(self) return self.input def setOutput(self, message, name='', documentation='', action=None): self.output = MessageRole('output', message, name, documentation, action) self.output.parent = weakref.ref(self) return self.output def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'operation') epc.setAttributeNS(None, 'name', self.name) node = epc._getNode() if self.input: self.input.toDom(node) if self.output: self.output.toDom(node) for fault in self.faults: fault.toDom(node) class MessageRole(Element): def __init__(self, type, message, name='', documentation='', action=None): Element.__init__(self, name, documentation) self.message = message self.type = type self.action = action def getWSDL(self): """Return the WSDL object that contains this information item.""" parent = self while 1: # skip any collections if isinstance(parent, WSDL): return parent try: parent = parent.parent() except: break return None def getMessage(self): """Return the WSDL object that represents the attribute message (namespaceURI, name) tuple """ wsdl = self.getWSDL() return wsdl.messages[self.message] def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), self.type) if not isinstance(self.message, basestring) and len(self.message) == 2: ns,name = self.message prefix = epc.getPrefix(ns) epc.setAttributeNS(None, 'message', '%s:%s' %(prefix,name)) else: epc.setAttributeNS(None, 'message', self.message) if self.action: epc.setAttributeNS(WSA.ADDRESS, 'Action', self.action) if self.name: epc.setAttributeNS(None, 'name', self.name) class Binding(Element): def __init__(self, name, type, documentation=''): Element.__init__(self, name, documentation) self.operations = Collection(self) self.type = type # def getWSDL(self): # """Return the WSDL object that contains this binding.""" # return self.parent().parent() def getPortType(self): """Return the PortType object associated with this binding.""" return self.getWSDL().portTypes[self.type] def findBinding(self, kind): for item in self.extensions: if isinstance(item, kind): return item return None def findBindings(self, kind): return [ item for item in self.extensions if isinstance(item, kind) ] def addOperationBinding(self, name, documentation=''): item = OperationBinding(name, documentation) self.operations[name] = item return item def load(self, elements): for element in elements: name = DOM.getAttr(element, 'name') docs = GetDocumentation(element) opbinding = self.addOperationBinding(name, docs) opbinding.load_ex(GetExtensions(element)) item = DOM.getElement(element, 'input', None, None) if item is not None: #TODO: addInputBinding? mbinding = MessageRoleBinding('input') mbinding.documentation = GetDocumentation(item) opbinding.input = mbinding mbinding.load_ex(GetExtensions(item)) mbinding.parent = weakref.ref(opbinding) item = DOM.getElement(element, 'output', None, None) if item is not None: mbinding = MessageRoleBinding('output') mbinding.documentation = GetDocumentation(item) opbinding.output = mbinding mbinding.load_ex(GetExtensions(item)) mbinding.parent = weakref.ref(opbinding) for item in DOM.getElements(element, 'fault', None): name = DOM.getAttr(item, 'name') mbinding = MessageRoleBinding('fault', name) mbinding.documentation = GetDocumentation(item) opbinding.faults[name] = mbinding mbinding.load_ex(GetExtensions(item)) mbinding.parent = weakref.ref(opbinding) def load_ex(self, elements): for e in elements: ns, name = e.namespaceURI, e.localName if ns in DOM.NS_SOAP_BINDING_ALL and name == 'binding': transport = DOM.getAttr(e, 'transport', default=None) style = DOM.getAttr(e, 'style', default='document') ob = SoapBinding(transport, style) self.addExtension(ob) continue elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'binding': verb = DOM.getAttr(e, 'verb') ob = HttpBinding(verb) self.addExtension(ob) continue else: self.addExtension(e) def toDom(self): wsdl = self.getWSDL() ep = ElementProxy(None, DOM.getElement(wsdl.document, None)) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'binding') epc.setAttributeNS(None, 'name', self.name) ns,name = self.type prefix = epc.getPrefix(ns) epc.setAttributeNS(None, 'type', '%s:%s' %(prefix,name)) node = epc._getNode() for ext in self.extensions: ext.toDom(node) for op_binding in self.operations: op_binding.toDom(node) class OperationBinding(Element): def __init__(self, name, documentation=''): Element.__init__(self, name, documentation) self.input = None self.output = None self.faults = Collection(self) # def getWSDL(self): # """Return the WSDL object that contains this binding.""" # return self.parent().parent().parent().parent() def getBinding(self): """Return the parent Binding object of the operation binding.""" return self.parent().parent() def getOperation(self): """Return the abstract Operation associated with this binding.""" return self.getBinding().getPortType().operations[self.name] def findBinding(self, kind): for item in self.extensions: if isinstance(item, kind): return item return None def findBindings(self, kind): return [ item for item in self.extensions if isinstance(item, kind) ] def addInputBinding(self, binding): if self.input is None: self.input = MessageRoleBinding('input') self.input.parent = weakref.ref(self) self.input.addExtension(binding) return binding def addOutputBinding(self, binding): if self.output is None: self.output = MessageRoleBinding('output') self.output.parent = weakref.ref(self) self.output.addExtension(binding) return binding def addFaultBinding(self, name, binding): fault = self.get(name, None) if fault is None: fault = MessageRoleBinding('fault', name) fault.addExtension(binding) return binding def load_ex(self, elements): for e in elements: ns, name = e.namespaceURI, e.localName if ns in DOM.NS_SOAP_BINDING_ALL and name == 'operation': soapaction = DOM.getAttr(e, 'soapAction', default=None) style = DOM.getAttr(e, 'style', default=None) ob = SoapOperationBinding(soapaction, style) self.addExtension(ob) continue elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'operation': location = DOM.getAttr(e, 'location') ob = HttpOperationBinding(location) self.addExtension(ob) continue else: self.addExtension(e) def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), 'operation') epc.setAttributeNS(None, 'name', self.name) node = epc._getNode() for ext in self.extensions: ext.toDom(node) if self.input: self.input.toDom(node) if self.output: self.output.toDom(node) for fault in self.faults: fault.toDom(node) class MessageRoleBinding(Element): def __init__(self, type, name='', documentation=''): Element.__init__(self, name, documentation) self.type = type def findBinding(self, kind): for item in self.extensions: if isinstance(item, kind): return item return None def findBindings(self, kind): return [ item for item in self.extensions if isinstance(item, kind) ] def load_ex(self, elements): for e in elements: ns, name = e.namespaceURI, e.localName if ns in DOM.NS_SOAP_BINDING_ALL and name == 'body': encstyle = DOM.getAttr(e, 'encodingStyle', default=None) namespace = DOM.getAttr(e, 'namespace', default=None) parts = DOM.getAttr(e, 'parts', default=None) use = DOM.getAttr(e, 'use', default=None) if use is None: raise WSDLError( 'Invalid soap:body binding element.' ) ob = SoapBodyBinding(use, namespace, encstyle, parts) self.addExtension(ob) continue elif ns in DOM.NS_SOAP_BINDING_ALL and name == 'fault': encstyle = DOM.getAttr(e, 'encodingStyle', default=None) namespace = DOM.getAttr(e, 'namespace', default=None) name = DOM.getAttr(e, 'name', default=None) use = DOM.getAttr(e, 'use', default=None) if use is None or name is None: raise WSDLError( 'Invalid soap:fault binding element.' ) ob = SoapFaultBinding(name, use, namespace, encstyle) self.addExtension(ob) continue elif ns in DOM.NS_SOAP_BINDING_ALL and name in ( 'header', 'headerfault' ): encstyle = DOM.getAttr(e, 'encodingStyle', default=None) namespace = DOM.getAttr(e, 'namespace', default=None) message = DOM.getAttr(e, 'message') part = DOM.getAttr(e, 'part') use = DOM.getAttr(e, 'use') if name == 'header': _class = SoapHeaderBinding else: _class = SoapHeaderFaultBinding message = ParseQName(message, e) ob = _class(message, part, use, namespace, encstyle) self.addExtension(ob) continue elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'urlReplacement': ob = HttpUrlReplacementBinding() self.addExtension(ob) continue elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'urlEncoded': ob = HttpUrlEncodedBinding() self.addExtension(ob) continue elif ns in DOM.NS_MIME_BINDING_ALL and name == 'multipartRelated': ob = MimeMultipartRelatedBinding() self.addExtension(ob) ob.load_ex(GetExtensions(e)) continue elif ns in DOM.NS_MIME_BINDING_ALL and name == 'content': part = DOM.getAttr(e, 'part', default=None) type = DOM.getAttr(e, 'type', default=None) ob = MimeContentBinding(part, type) self.addExtension(ob) continue elif ns in DOM.NS_MIME_BINDING_ALL and name == 'mimeXml': part = DOM.getAttr(e, 'part', default=None) ob = MimeXmlBinding(part) self.addExtension(ob) continue else: self.addExtension(e) def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), self.type) node = epc._getNode() for item in self.extensions: if item: item.toDom(node) class Service(Element): def __init__(self, name, documentation=''): Element.__init__(self, name, documentation) self.ports = Collection(self) def getWSDL(self): return self.parent().parent() def addPort(self, name, binding, documentation=''): item = Port(name, binding, documentation) self.ports[name] = item return item def load(self, elements): for element in elements: name = DOM.getAttr(element, 'name', default=None) docs = GetDocumentation(element) binding = DOM.getAttr(element, 'binding', default=None) if name is None or binding is None: raise WSDLError( 'Invalid port element.' ) binding = ParseQName(binding, element) port = self.addPort(name, binding, docs) port.load_ex(GetExtensions(element)) def load_ex(self, elements): for e in elements: self.addExtension(e) def toDom(self): wsdl = self.getWSDL() ep = ElementProxy(None, DOM.getElement(wsdl.document, None)) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), "service") epc.setAttributeNS(None, "name", self.name) node = epc._getNode() for port in self.ports: port.toDom(node) class Port(Element): def __init__(self, name, binding, documentation=''): Element.__init__(self, name, documentation) self.binding = binding # def getWSDL(self): # return self.parent().parent().getWSDL() def getService(self): """Return the Service object associated with this port.""" return self.parent().parent() def getBinding(self): """Return the Binding object that is referenced by this port.""" wsdl = self.getService().getWSDL() return wsdl.bindings[self.binding] def getPortType(self): """Return the PortType object that is referenced by this port.""" wsdl = self.getService().getWSDL() binding = wsdl.bindings[self.binding] return wsdl.portTypes[binding.type] def getAddressBinding(self): """A convenience method to obtain the extension element used as the address binding for the port.""" for item in self.extensions: if isinstance(item, SoapAddressBinding) or \ isinstance(item, HttpAddressBinding): return item raise WSDLError( 'No address binding found in port.' ) def load_ex(self, elements): for e in elements: ns, name = e.namespaceURI, e.localName if ns in DOM.NS_SOAP_BINDING_ALL and name == 'address': location = DOM.getAttr(e, 'location', default=None) ob = SoapAddressBinding(location) self.addExtension(ob) continue elif ns in DOM.NS_HTTP_BINDING_ALL and name == 'address': location = DOM.getAttr(e, 'location', default=None) ob = HttpAddressBinding(location) self.addExtension(ob) continue else: self.addExtension(e) def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLUri(wsdl.version), "port") epc.setAttributeNS(None, "name", self.name) ns,name = self.binding prefix = epc.getPrefix(ns) epc.setAttributeNS(None, "binding", "%s:%s" %(prefix,name)) node = epc._getNode() for ext in self.extensions: ext.toDom(node) class SoapBinding: def __init__(self, transport, style='rpc'): self.transport = transport self.style = style def getWSDL(self): return self.parent().getWSDL() def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLSoapBindingUri(wsdl.version), 'binding') if self.transport: epc.setAttributeNS(None, "transport", self.transport) if self.style: epc.setAttributeNS(None, "style", self.style) class SoapAddressBinding: def __init__(self, location): self.location = location def getWSDL(self): return self.parent().getWSDL() def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLSoapBindingUri(wsdl.version), 'address') epc.setAttributeNS(None, "location", self.location) class SoapOperationBinding: def __init__(self, soapAction=None, style=None): self.soapAction = soapAction self.style = style def getWSDL(self): return self.parent().getWSDL() def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLSoapBindingUri(wsdl.version), 'operation') if self.soapAction: epc.setAttributeNS(None, 'soapAction', self.soapAction) if self.style: epc.setAttributeNS(None, 'style', self.style) class SoapBodyBinding: def __init__(self, use, namespace=None, encodingStyle=None, parts=None): if not use in ('literal', 'encoded'): raise WSDLError( 'Invalid use attribute value: %s' % use ) self.encodingStyle = encodingStyle self.namespace = namespace if type(parts) in (type(''), type(u'')): parts = parts.split() self.parts = parts self.use = use def getWSDL(self): return self.parent().getWSDL() def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLSoapBindingUri(wsdl.version), 'body') epc.setAttributeNS(None, "use", self.use) epc.setAttributeNS(None, "namespace", self.namespace) class SoapFaultBinding: def __init__(self, name, use, namespace=None, encodingStyle=None): if not use in ('literal', 'encoded'): raise WSDLError( 'Invalid use attribute value: %s' % use ) self.encodingStyle = encodingStyle self.namespace = namespace self.name = name self.use = use def getWSDL(self): return self.parent().getWSDL() def toDom(self, node): wsdl = self.getWSDL() ep = ElementProxy(None, node) epc = ep.createAppendElement(DOM.GetWSDLSoapBindingUri(wsdl.version), 'body') epc.setAttributeNS(None, "use", self.use) epc.setAttributeNS(None, "name", self.name) if self.namespace is not None: epc.setAttributeNS(None, "namespace", self.namespace) if self.encodingStyle is not None: epc.setAttributeNS(None, "encodingStyle", self.encodingStyle) class SoapHeaderBinding: def __init__(self, message, part, use, namespace=None, encodingStyle=None): if not use in ('literal', 'encoded'): raise WSDLError( 'Invalid use attribute value: %s' % use ) self.encodingStyle = encodingStyle self.namespace = namespace self.message = message self.part = part self.use = use tagname = 'header' class SoapHeaderFaultBinding(SoapHeaderBinding): tagname = 'headerfault' class HttpBinding: def __init__(self, verb): self.verb = verb class HttpAddressBinding: def __init__(self, location): self.location = location class HttpOperationBinding: def __init__(self, location): self.location = location class HttpUrlReplacementBinding: pass class HttpUrlEncodedBinding: pass class MimeContentBinding: def __init__(self, part=None, type=None): self.part = part self.type = type class MimeXmlBinding: def __init__(self, part=None): self.part = part class MimeMultipartRelatedBinding: def __init__(self): self.parts = [] def load_ex(self, elements): for e in elements: ns, name = e.namespaceURI, e.localName if ns in DOM.NS_MIME_BINDING_ALL and name == 'part': self.parts.append(MimePartBinding()) continue class MimePartBinding: def __init__(self): self.items = [] def load_ex(self, elements): for e in elements: ns, name = e.namespaceURI, e.localName if ns in DOM.NS_MIME_BINDING_ALL and name == 'content': part = DOM.getAttr(e, 'part', default=None) type = DOM.getAttr(e, 'type', default=None) ob = MimeContentBinding(part, type) self.items.append(ob) continue elif ns in DOM.NS_MIME_BINDING_ALL and name == 'mimeXml': part = DOM.getAttr(e, 'part', default=None) ob = MimeXmlBinding(part) self.items.append(ob) continue elif ns in DOM.NS_SOAP_BINDING_ALL and name == 'body': encstyle = DOM.getAttr(e, 'encodingStyle', default=None) namespace = DOM.getAttr(e, 'namespace', default=None) parts = DOM.getAttr(e, 'parts', default=None) use = DOM.getAttr(e, 'use', default=None) if use is None: raise WSDLError( 'Invalid soap:body binding element.' ) ob = SoapBodyBinding(use, namespace, encstyle, parts) self.items.append(ob) continue class WSDLError(Exception): pass def DeclareNSPrefix(writer, prefix, nsuri): if writer.hasNSPrefix(nsuri): return writer.declareNSPrefix(prefix, nsuri) def ParseTypeRef(value, element): parts = value.split(':', 1) if len(parts) == 1: return (DOM.findTargetNS(element), value) nsuri = DOM.findNamespaceURI(parts[0], element) return (nsuri, parts[1]) def ParseQName(value, element): nameref = value.split(':', 1) if len(nameref) == 2: nsuri = DOM.findNamespaceURI(nameref[0], element) name = nameref[-1] else: nsuri = DOM.findTargetNS(element) name = nameref[-1] return nsuri, name def GetDocumentation(element): docnode = DOM.getElement(element, 'documentation', None, None) if docnode is not None: return DOM.getElementText(docnode) return '' def GetExtensions(element): return [ item for item in DOM.getElements(element, None, None) if item.namespaceURI != DOM.NS_WSDL ] def GetWSAActionFault(operation, name): """Find wsa:Action attribute, and return value or WSA.FAULT for the default. """ attr = operation.faults[name].action if attr is not None: return attr return WSA.FAULT def GetWSAActionInput(operation): """Find wsa:Action attribute, and return value or the default.""" attr = operation.input.action if attr is not None: return attr portType = operation.getPortType() targetNamespace = portType.getTargetNamespace() ptName = portType.name msgName = operation.input.name if not msgName: msgName = operation.name + 'Request' if targetNamespace.endswith('/'): return '%s%s/%s' %(targetNamespace, ptName, msgName) return '%s/%s/%s' %(targetNamespace, ptName, msgName) def GetWSAActionOutput(operation): """Find wsa:Action attribute, and return value or the default.""" attr = operation.output.action if attr is not None: return attr targetNamespace = operation.getPortType().getTargetNamespace() ptName = operation.getPortType().name msgName = operation.output.name if not msgName: msgName = operation.name + 'Response' if targetNamespace.endswith('/'): return '%s%s/%s' %(targetNamespace, ptName, msgName) return '%s/%s/%s' %(targetNamespace, ptName, msgName) def FindExtensions(object, kind, t_type=type(())): if isinstance(kind, t_type): result = [] namespaceURI, name = kind return [ item for item in object.extensions if hasattr(item, 'nodeType') \ and DOM.nsUriMatch(namespaceURI, item.namespaceURI) \ and item.name == name ] return [ item for item in object.extensions if isinstance(item, kind) ] def FindExtension(object, kind, t_type=type(())): if isinstance(kind, t_type): namespaceURI, name = kind for item in object.extensions: if hasattr(item, 'nodeType') \ and DOM.nsUriMatch(namespaceURI, item.namespaceURI) \ and item.name == name: return item else: for item in object.extensions: if isinstance(item, kind): return item return None class SOAPCallInfo: """SOAPCallInfo captures the important binding information about a SOAP operation, in a structure that is easier to work with than raw WSDL structures.""" def __init__(self, methodName): self.methodName = methodName self.inheaders = [] self.outheaders = [] self.inparams = [] self.outparams = [] self.retval = None encodingStyle = DOM.NS_SOAP_ENC documentation = '' soapAction = None transport = None namespace = None location = None use = 'encoded' style = 'rpc' def addInParameter(self, name, type, namespace=None, element_type=0): """Add an input parameter description to the call info.""" parameter = ParameterInfo(name, type, namespace, element_type) self.inparams.append(parameter) return parameter def addOutParameter(self, name, type, namespace=None, element_type=0): """Add an output parameter description to the call info.""" parameter = ParameterInfo(name, type, namespace, element_type) self.outparams.append(parameter) return parameter def setReturnParameter(self, name, type, namespace=None, element_type=0): """Set the return parameter description for the call info.""" parameter = ParameterInfo(name, type, namespace, element_type) self.retval = parameter return parameter def addInHeaderInfo(self, name, type, namespace, element_type=0, mustUnderstand=0): """Add an input SOAP header description to the call info.""" headerinfo = HeaderInfo(name, type, namespace, element_type) if mustUnderstand: headerinfo.mustUnderstand = 1 self.inheaders.append(headerinfo) return headerinfo def addOutHeaderInfo(self, name, type, namespace, element_type=0, mustUnderstand=0): """Add an output SOAP header description to the call info.""" headerinfo = HeaderInfo(name, type, namespace, element_type) if mustUnderstand: headerinfo.mustUnderstand = 1 self.outheaders.append(headerinfo) return headerinfo def getInParameters(self): """Return a sequence of the in parameters of the method.""" return self.inparams def getOutParameters(self): """Return a sequence of the out parameters of the method.""" return self.outparams def getReturnParameter(self): """Return param info about the return value of the method.""" return self.retval def getInHeaders(self): """Return a sequence of the in headers of the method.""" return self.inheaders def getOutHeaders(self): """Return a sequence of the out headers of the method.""" return self.outheaders class ParameterInfo: """A ParameterInfo object captures parameter binding information.""" def __init__(self, name, type, namespace=None, element_type=0): if element_type: self.element_type = 1 if namespace is not None: self.namespace = namespace self.name = name self.type = type element_type = 0 namespace = None default = None class HeaderInfo(ParameterInfo): """A HeaderInfo object captures SOAP header binding information.""" def __init__(self, name, type, namespace, element_type=None): ParameterInfo.__init__(self, name, type, namespace, element_type) mustUnderstand = 0 actor = None def callInfoFromWSDL(port, name): """Return a SOAPCallInfo given a WSDL port and operation name.""" wsdl = port.getService().getWSDL() binding = port.getBinding() portType = binding.getPortType() operation = portType.operations[name] opbinding = binding.operations[name] messages = wsdl.messages callinfo = SOAPCallInfo(name) addrbinding = port.getAddressBinding() if not isinstance(addrbinding, SoapAddressBinding): raise ValueError, 'Unsupported binding type.' callinfo.location = addrbinding.location soapbinding = binding.findBinding(SoapBinding) if soapbinding is None: raise ValueError, 'Missing soap:binding element.' callinfo.transport = soapbinding.transport callinfo.style = soapbinding.style or 'document' soap_op_binding = opbinding.findBinding(SoapOperationBinding) if soap_op_binding is not None: callinfo.soapAction = soap_op_binding.soapAction callinfo.style = soap_op_binding.style or callinfo.style parameterOrder = operation.parameterOrder if operation.input is not None: message = messages[operation.input.message] msgrole = opbinding.input mime = msgrole.findBinding(MimeMultipartRelatedBinding) if mime is not None: raise ValueError, 'Mime bindings are not supported.' else: for item in msgrole.findBindings(SoapHeaderBinding): part = messages[item.message].parts[item.part] header = callinfo.addInHeaderInfo( part.name, part.element or part.type, item.namespace, element_type = part.element and 1 or 0 ) header.encodingStyle = item.encodingStyle body = msgrole.findBinding(SoapBodyBinding) if body is None: raise ValueError, 'Missing soap:body binding.' callinfo.encodingStyle = body.encodingStyle callinfo.namespace = body.namespace callinfo.use = body.use if body.parts is not None: parts = [] for name in body.parts: parts.append(message.parts[name]) else: parts = message.parts.values() for part in parts: callinfo.addInParameter( part.name, part.element or part.type, element_type = part.element and 1 or 0 ) if operation.output is not None: try: message = messages[operation.output.message] except KeyError: if self.strict: raise RuntimeError( "Recieved message not defined in the WSDL schema: %s" % operation.output.message) else: message = wsdl.addMessage(operation.output.message) print "Warning:", \ "Recieved message not defined in the WSDL schema.", \ "Adding it." print "Message:", operation.output.message msgrole = opbinding.output mime = msgrole.findBinding(MimeMultipartRelatedBinding) if mime is not None: raise ValueError, 'Mime bindings are not supported.' else: for item in msgrole.findBindings(SoapHeaderBinding): part = messages[item.message].parts[item.part] header = callinfo.addOutHeaderInfo( part.name, part.element or part.type, item.namespace, element_type = part.element and 1 or 0 ) header.encodingStyle = item.encodingStyle body = msgrole.findBinding(SoapBodyBinding) if body is None: raise ValueError, 'Missing soap:body binding.' callinfo.encodingStyle = body.encodingStyle callinfo.namespace = body.namespace callinfo.use = body.use if body.parts is not None: parts = [] for name in body.parts: parts.append(message.parts[name]) else: parts = message.parts.values() if parts: for part in parts: callinfo.addOutParameter( part.name, part.element or part.type, element_type = part.element and 1 or 0 ) return callinfo