|
- """
- ################################################################################
- # Copyright (c) 2003, Pfizer
- # Copyright (c) 2001, Cayce Ullman.
- # Copyright (c) 2001, Brian Matthews.
- #
- # All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met:
- # Redistributions of source code must retain the above copyright notice, this
- # list of conditions and the following disclaimer.
- #
- # Redistributions in binary form must reproduce the above copyright notice,
- # this list of conditions and the following disclaimer in the documentation
- # and/or other materials provided with the distribution.
- #
- # Neither the name of actzero, inc. nor the names of its contributors may
- # be used to endorse or promote products derived from this software without
- # specific prior written permission.
- #
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
- # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- #
- ################################################################################
- """
-
- ident = '$Id: SOAPBuilder.py,v 1.19 2004/04/10 04:28:46 irjudson Exp $'
- from version import __version__
-
- import cgi
- import copy
- from wstools.XMLname import toXMLname, fromXMLname
- import fpconst
-
- # SOAPpy modules
- from Config import Config
- from NS import NS
- from Types import *
-
- # Test whether this Python version has Types.BooleanType
- # If it doesn't have it, then False and True are serialized as integers
- try:
- BooleanType
- pythonHasBooleanType = 1
- except NameError:
- pythonHasBooleanType = 0
-
- ################################################################################
- # SOAP Builder
- ################################################################################
- class SOAPBuilder:
- _xml_top = '<?xml version="1.0"?>\n'
- _xml_enc_top = '<?xml version="1.0" encoding="%s"?>\n'
- _env_top = '%(ENV_T)s:Envelope %(ENV_T)s:encodingStyle="%(ENC)s"' % \
- NS.__dict__
- _env_bot = '</%(ENV_T)s:Envelope>\n' % NS.__dict__
-
- # Namespaces potentially defined in the Envelope tag.
-
- _env_ns = {NS.ENC: NS.ENC_T, NS.ENV: NS.ENV_T,
- NS.XSD: NS.XSD_T, NS.XSD2: NS.XSD2_T, NS.XSD3: NS.XSD3_T,
- NS.XSI: NS.XSI_T, NS.XSI2: NS.XSI2_T, NS.XSI3: NS.XSI3_T}
-
- def __init__(self, args = (), kw = {}, method = None, namespace = None,
- header = None, methodattrs = None, envelope = 1, encoding = 'UTF-8',
- use_refs = 0, config = Config, noroot = 0):
-
- # Test the encoding, raising an exception if it's not known
- if encoding != None:
- ''.encode(encoding)
-
- self.args = args
- self.kw = kw
- self.envelope = envelope
- self.encoding = encoding
- self.method = method
- self.namespace = namespace
- self.header = header
- self.methodattrs= methodattrs
- self.use_refs = use_refs
- self.config = config
- self.out = []
- self.tcounter = 0
- self.ncounter = 1
- self.icounter = 1
- self.envns = {}
- self.ids = {}
- self.depth = 0
- self.multirefs = []
- self.multis = 0
- self.body = not isinstance(args, bodyType)
- self.noroot = noroot
-
- def build(self):
- if Config.debug: print "In build."
- ns_map = {}
-
- # Cache whether typing is on or not
- typed = self.config.typed
-
- if self.header:
- # Create a header.
- self.dump(self.header, "Header", typed = typed)
- self.header = None # Wipe it out so no one is using it.
-
- if self.body:
- # Call genns to record that we've used SOAP-ENV.
- self.depth += 1
- body_ns = self.genns(ns_map, NS.ENV)[0]
- self.out.append("<%sBody>\n" % body_ns)
-
- if self.method:
- self.depth += 1
- a = ''
- if self.methodattrs:
- for (k, v) in self.methodattrs.items():
- a += ' %s="%s"' % (k, v)
-
- if self.namespace: # Use the namespace info handed to us
- methodns, n = self.genns(ns_map, self.namespace)
- else:
- methodns, n = '', ''
-
- self.out.append('<%s%s%s%s%s>\n' % (
- methodns, self.method, n, a, self.genroot(ns_map)))
-
- try:
- if type(self.args) != TupleType:
- args = (self.args,)
- else:
- args = self.args
-
- for i in args:
- self.dump(i, typed = typed, ns_map = ns_map)
-
- if hasattr(self.config, "argsOrdering") and self.config.argsOrdering.has_key(self.method):
- for k in self.config.argsOrdering.get(self.method):
- self.dump(self.kw.get(k), k, typed = typed, ns_map = ns_map)
- else:
- for (k, v) in self.kw.items():
- self.dump(v, k, typed = typed, ns_map = ns_map)
-
- except RecursionError:
- if self.use_refs == 0:
- # restart
- b = SOAPBuilder(args = self.args, kw = self.kw,
- method = self.method, namespace = self.namespace,
- header = self.header, methodattrs = self.methodattrs,
- envelope = self.envelope, encoding = self.encoding,
- use_refs = 1, config = self.config)
- return b.build()
- raise
-
- if self.method:
- self.out.append("</%s%s>\n" % (methodns, self.method))
- self.depth -= 1
-
- if self.body:
- # dump may add to self.multirefs, but the for loop will keep
- # going until it has used all of self.multirefs, even those
- # entries added while in the loop.
-
- self.multis = 1
-
- for obj, tag in self.multirefs:
- self.dump(obj, tag, typed = typed, ns_map = ns_map)
-
- self.out.append("</%sBody>\n" % body_ns)
- self.depth -= 1
-
- if self.envelope:
- e = map (lambda ns: ' xmlns:%s="%s"' % (ns[1], ns[0]),
- self.envns.items())
-
- self.out = ['<', self._env_top] + e + ['>\n'] + \
- self.out + \
- [self._env_bot]
-
- if self.encoding != None:
- self.out.insert(0, self._xml_enc_top % self.encoding)
- return ''.join(self.out).encode(self.encoding)
-
- self.out.insert(0, self._xml_top)
- return ''.join(self.out)
-
- def gentag(self):
- if Config.debug: print "In gentag."
- self.tcounter += 1
- return "v%d" % self.tcounter
-
- def genns(self, ns_map, nsURI):
- if nsURI == None:
- return ('', '')
-
- if type(nsURI) == TupleType: # already a tuple
- if len(nsURI) == 2:
- ns, nsURI = nsURI
- else:
- ns, nsURI = None, nsURI[0]
- else:
- ns = None
-
- if ns_map.has_key(nsURI):
- return (ns_map[nsURI] + ':', '')
-
- if self._env_ns.has_key(nsURI):
- ns = self.envns[nsURI] = ns_map[nsURI] = self._env_ns[nsURI]
- return (ns + ':', '')
-
- if not ns:
- ns = "ns%d" % self.ncounter
- self.ncounter += 1
- ns_map[nsURI] = ns
- if self.config.buildWithNamespacePrefix:
- return (ns + ':', ' xmlns:%s="%s"' % (ns, nsURI))
- else:
- return ('', ' xmlns="%s"' % (nsURI))
-
- def genroot(self, ns_map):
- if self.noroot:
- return ''
-
- if self.depth != 2:
- return ''
-
- ns, n = self.genns(ns_map, NS.ENC)
- return ' %sroot="%d"%s' % (ns, not self.multis, n)
-
- # checkref checks an element to see if it needs to be encoded as a
- # multi-reference element or not. If it returns None, the element has
- # been handled and the caller can continue with subsequent elements.
- # If it returns a string, the string should be included in the opening
- # tag of the marshaled element.
-
- def checkref(self, obj, tag, ns_map):
- if self.depth < 2:
- return ''
-
- if not self.ids.has_key(id(obj)):
- n = self.ids[id(obj)] = self.icounter
- self.icounter = n + 1
-
- if self.use_refs == 0:
- return ''
-
- if self.depth == 2:
- return ' id="i%d"' % n
-
- self.multirefs.append((obj, tag))
- else:
- if self.use_refs == 0:
- raise RecursionError, "Cannot serialize recursive object"
-
- n = self.ids[id(obj)]
-
- if self.multis and self.depth == 2:
- return ' id="i%d"' % n
-
- self.out.append('<%s href="#i%d"%s/>\n' %
- (tag, n, self.genroot(ns_map)))
- return None
-
- # dumpers
-
- def dump(self, obj, tag = None, typed = 1, ns_map = {}):
- if Config.debug: print "In dump.", "obj=", obj
- ns_map = ns_map.copy()
- self.depth += 1
-
- if type(tag) not in (NoneType, StringType, UnicodeType):
- raise KeyError, "tag must be a string or None"
-
- try:
- meth = getattr(self, "dump_" + type(obj).__name__)
- except AttributeError:
- if type(obj) == LongType:
- obj_type = "integer"
- elif pythonHasBooleanType and type(obj) == BooleanType:
- obj_type = "boolean"
- else:
- obj_type = type(obj).__name__
-
- self.out.append(self.dumper(None, obj_type, obj, tag, typed,
- ns_map, self.genroot(ns_map)))
- else:
- meth(obj, tag, typed, ns_map)
-
-
- self.depth -= 1
-
- # generic dumper
- def dumper(self, nsURI, obj_type, obj, tag, typed = 1, ns_map = {},
- rootattr = '', id = '',
- xml = '<%(tag)s%(type)s%(id)s%(attrs)s%(root)s>%(data)s</%(tag)s>\n'):
- if Config.debug: print "In dumper."
-
- if nsURI == None:
- nsURI = self.config.typesNamespaceURI
-
- tag = tag or self.gentag()
-
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
-
- a = n = t = ''
- if typed and obj_type:
- ns, n = self.genns(ns_map, nsURI)
- ins = self.genns(ns_map, self.config.schemaNamespaceURI)[0]
- t = ' %stype="%s%s"%s' % (ins, ns, obj_type, n)
-
- try: a = obj._marshalAttrs(ns_map, self)
- except: pass
-
- try: data = obj._marshalData()
- except:
- if (obj_type != "string"): # strings are already encoded
- data = cgi.escape(str(obj))
- else:
- data = obj
-
-
- return xml % {"tag": tag, "type": t, "data": data, "root": rootattr,
- "id": id, "attrs": a}
-
- def dump_float(self, obj, tag, typed = 1, ns_map = {}):
- if Config.debug: print "In dump_float."
- tag = tag or self.gentag()
-
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
-
- if Config.strict_range:
- doubleType(obj)
-
- if fpconst.isPosInf(obj):
- obj = "INF"
- elif fpconst.isNegInf(obj):
- obj = "-INF"
- elif fpconst.isNaN(obj):
- obj = "NaN"
- else:
- obj = str(obj)
-
- # Note: python 'float' is actually a SOAP 'double'.
- self.out.append(self.dumper(None, "double", obj, tag, typed, ns_map,
- self.genroot(ns_map)))
-
- def dump_string(self, obj, tag, typed = 0, ns_map = {}):
- if Config.debug: print "In dump_string."
- tag = tag or self.gentag()
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
-
- id = self.checkref(obj, tag, ns_map)
- if id == None:
- return
-
- try: data = obj._marshalData()
- except: data = obj
-
- self.out.append(self.dumper(None, "string", cgi.escape(data), tag,
- typed, ns_map, self.genroot(ns_map), id))
-
- dump_str = dump_string # For Python 2.2+
- dump_unicode = dump_string
-
- def dump_None(self, obj, tag, typed = 0, ns_map = {}):
- if Config.debug: print "In dump_None."
- tag = tag or self.gentag()
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
- ns = self.genns(ns_map, self.config.schemaNamespaceURI)[0]
-
- self.out.append('<%s %snull="1"%s/>\n' %
- (tag, ns, self.genroot(ns_map)))
-
- dump_NoneType = dump_None # For Python 2.2+
-
- def dump_list(self, obj, tag, typed = 1, ns_map = {}):
- if Config.debug: print "In dump_list.", "obj=", obj
- tag = tag or self.gentag()
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
-
- if type(obj) == InstanceType:
- data = obj.data
- else:
- data = obj
-
- id = self.checkref(obj, tag, ns_map)
- if id == None:
- return
-
- try:
- sample = data[0]
- empty = 0
- except:
- # preserve type if present
- if getattr(obj,"_typed",None) and getattr(obj,"_type",None):
- if getattr(obj, "_complexType", None):
- sample = typedArrayType(typed=obj._type,
- complexType = obj._complexType)
- sample._typename = obj._type
- obj._ns = NS.URN
- else:
- sample = typedArrayType(typed=obj._type)
- else:
- sample = structType()
- empty = 1
-
- # First scan list to see if all are the same type
- same_type = 1
-
- if not empty:
- for i in data[1:]:
- if type(sample) != type(i) or \
- (type(sample) == InstanceType and \
- sample.__class__ != i.__class__):
- same_type = 0
- break
-
- ndecl = ''
- if same_type:
- if (isinstance(sample, structType)) or \
- type(sample) == DictType or \
- (isinstance(sample, anyType) and \
- (getattr(sample, "_complexType", None) and \
- sample._complexType)): # force to urn struct
- try:
- tns = obj._ns or NS.URN
- except:
- tns = NS.URN
-
- ns, ndecl = self.genns(ns_map, tns)
-
- try:
- typename = sample._typename
- except:
- typename = "SOAPStruct"
-
- t = ns + typename
-
- elif isinstance(sample, anyType):
- ns = sample._validNamespaceURI(self.config.typesNamespaceURI,
- self.config.strictNamespaces)
- if ns:
- ns, ndecl = self.genns(ns_map, ns)
- t = ns + sample._type
- else:
- t = 'ur-type'
- else:
- typename = type(sample).__name__
-
- # For Python 2.2+
- if type(sample) == StringType: typename = 'string'
-
- # HACK: python 'float' is actually a SOAP 'double'.
- if typename=="float": typename="double"
- t = self.genns(ns_map, self.config.typesNamespaceURI)[0] + \
- typename
-
- else:
- t = self.genns(ns_map, self.config.typesNamespaceURI)[0] + \
- "ur-type"
-
- try: a = obj._marshalAttrs(ns_map, self)
- except: a = ''
-
- ens, edecl = self.genns(ns_map, NS.ENC)
- ins, idecl = self.genns(ns_map, self.config.schemaNamespaceURI)
-
- self.out.append(
- '<%s %sarrayType="%s[%d]" %stype="%sArray"%s%s%s%s%s%s>\n' %
- (tag, ens, t, len(data), ins, ens, ndecl, edecl, idecl,
- self.genroot(ns_map), id, a))
-
- typed = not same_type
-
- try: elemsname = obj._elemsname
- except: elemsname = "item"
-
- for i in data:
- self.dump(i, elemsname, typed, ns_map)
-
- self.out.append('</%s>\n' % tag)
-
- dump_tuple = dump_list
-
- def dump_dictionary(self, obj, tag, typed = 1, ns_map = {}):
- if Config.debug: print "In dump_dictionary."
- tag = tag or self.gentag()
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
-
- id = self.checkref(obj, tag, ns_map)
- if id == None:
- return
-
- try: a = obj._marshalAttrs(ns_map, self)
- except: a = ''
-
- self.out.append('<%s%s%s%s>\n' %
- (tag, id, a, self.genroot(ns_map)))
-
- for (k, v) in obj.items():
- if k[0] != "_":
- self.dump(v, k, 1, ns_map)
-
- self.out.append('</%s>\n' % tag)
-
- dump_dict = dump_dictionary # For Python 2.2+
-
- def dump_instance(self, obj, tag, typed = 1, ns_map = {}):
- if Config.debug: print "In dump_instance.", "obj=", obj, "tag=", tag
- if not tag:
- # If it has a name use it.
- if isinstance(obj, anyType) and obj._name:
- tag = obj._name
- else:
- tag = self.gentag()
- tag = toXMLname(tag) # convert from SOAP 1.2 XML name encoding
-
- if isinstance(obj, arrayType): # Array
- self.dump_list(obj, tag, typed, ns_map)
- return
-
- if isinstance(obj, faultType): # Fault
- cns, cdecl = self.genns(ns_map, NS.ENC)
- vns, vdecl = self.genns(ns_map, NS.ENV)
- self.out.append('''<%sFault %sroot="1"%s%s>
- <faultcode>%s</faultcode>
- <faultstring>%s</faultstring>
- ''' % (vns, cns, vdecl, cdecl, obj.faultcode, obj.faultstring))
- if hasattr(obj, "detail"):
- self.dump(obj.detail, "detail", typed, ns_map)
- self.out.append("</%sFault>\n" % vns)
- return
-
- r = self.genroot(ns_map)
-
- try: a = obj._marshalAttrs(ns_map, self)
- except: a = ''
-
- if isinstance(obj, voidType): # void
- self.out.append("<%s%s%s></%s>\n" % (tag, a, r, tag))
- return
-
- id = self.checkref(obj, tag, ns_map)
- if id == None:
- return
-
- if isinstance(obj, structType):
- # Check for namespace
- ndecl = ''
- ns = obj._validNamespaceURI(self.config.typesNamespaceURI,
- self.config.strictNamespaces)
- if ns:
- ns, ndecl = self.genns(ns_map, ns)
- tag = ns + tag
- self.out.append("<%s%s%s%s%s>\n" % (tag, ndecl, id, a, r))
-
- keylist = obj.__dict__.keys()
-
- # first write out items with order information
- if hasattr(obj, '_keyord'):
- for i in range(len(obj._keyord)):
- self.dump(obj._aslist(i), obj._keyord[i], 1, ns_map)
- keylist.remove(obj._keyord[i])
-
- # now write out the rest
- for k in keylist:
- if (k[0] != "_"):
- self.dump(getattr(obj,k), k, 1, ns_map)
-
- if isinstance(obj, bodyType):
- self.multis = 1
-
- for v, k in self.multirefs:
- self.dump(v, k, typed = typed, ns_map = ns_map)
-
- self.out.append('</%s>\n' % tag)
-
- elif isinstance(obj, anyType):
- t = ''
-
- if typed:
- ns = obj._validNamespaceURI(self.config.typesNamespaceURI,
- self.config.strictNamespaces)
- if ns:
- ons, ondecl = self.genns(ns_map, ns)
- ins, indecl = self.genns(ns_map,
- self.config.schemaNamespaceURI)
- t = ' %stype="%s%s"%s%s' % \
- (ins, ons, obj._type, ondecl, indecl)
-
- self.out.append('<%s%s%s%s%s>%s</%s>\n' %
- (tag, t, id, a, r, obj._marshalData(), tag))
-
- else: # Some Class
- self.out.append('<%s%s%s>\n' % (tag, id, r))
-
- for (k, v) in obj.__dict__.items():
- if k[0] != "_":
- self.dump(v, k, 1, ns_map)
-
- self.out.append('</%s>\n' % tag)
-
-
- ################################################################################
- # SOAPBuilder's more public interface
- ################################################################################
- def buildSOAP(args=(), kw={}, method=None, namespace=None, header=None,
- methodattrs=None,envelope=1,encoding='UTF-8',config=Config,noroot = 0):
- t = SOAPBuilder(args=args,kw=kw, method=method, namespace=namespace,
- header=header, methodattrs=methodattrs,envelope=envelope,
- encoding=encoding, config=config,noroot=noroot)
- return t.build()
|