Viewing file: Serialize.py (22.96 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Server/Common/Install/Serialize.py,v 1.36 2005/03/19 09:28:44 mbrown Exp $ """ Serialization and Deserialization of setup files.
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
from __future__ import generators
import sys, os, posixpath from xml.dom import Node
from Ft.Lib import Uri from Ft.Server.Common import ResourceTypes, AclConstants from Ft.Server.Common.Install import PRODUCT_NAMESPACE, InstallUtil from Ft.Server.Common.Schema import DC from Ft.Server.Common.Util import IMT_MAP from Ft.Server.Server import FtServerServerException, Error from Ft.Xml import EMPTY_NAMESPACE from Ft.Xml import Domlette from Ft.Xml.XPath import Conversions
g_defaultFileMap = {} for ext,imt in IMT_MAP.items(): if imt == 'text/xml': klass = InstallUtil.XmlDocument else: klass = InstallUtil.RawFile g_defaultFileMap[ext] = (None, imt, klass)
#Add some RDF stuff g_defaultFileMap['.rdf'] = (None,None, InstallUtil.RdfDocument) g_defaultFileMap['.daml'] = (None,None, InstallUtil.RdfDocument) g_defaultFileMap['.owl'] = (None,None, InstallUtil.RdfDocument) g_defaultFileMap['.rdfs'] = (None,None, InstallUtil.RdfDocument)
# os.walk (a generator) is available in Python 2.3+ try: from os import walk as _walk except ImportError: # define our own def _walk(top, topdown=True, onerror=None): """os.walk() from Python 2.4""" from os import listdir from os.path import join, isdir, islink try: names = listdir(top) except error, err: if onerror is not None: onerror(err) return
dirs, nondirs = [], [] for name in names: if isdir(join(top, name)): dirs.append(name) else: nondirs.append(name)
if topdown: yield top, dirs, nondirs for name in dirs: path = join(top, name) if not islink(path): for x in _walk(path, topdown, onerror): yield x if not topdown: yield top, dirs, nondirs
def Deserialize(stream, refUri=''): """ Deserialize a stream that references a setup file by URI """ reader = Domlette.NonvalidatingReader dom = reader.parseStream(stream, refUri)
if (dom.documentElement.namespaceURI, dom.documentElement.localName) != (PRODUCT_NAMESPACE, 'Product'): raise FtServerServerException(Error.CONFIG_MISSING_PROPERTY, property="'Product' in namespace %r" % PRODUCT_NAMESPACE)
version = dom.documentElement.getAttributeNS(EMPTY_NAMESPACE, 'version') description = '' title = ''
resources = []
# don't forget: # stream is the setup XML itself # refUri is the URI of the stream # src, href are URIs, possibly relative to refUri # path is an absolute path in the repo for child in dom.documentElement.childNodes: if (child.namespaceURI, child.localName) == (DC, 'Description'): description = Conversions.StringValue(child) elif (child.namespaceURI, child.localName) == (DC, 'Title'): title = Conversions.StringValue(child) elif child.namespaceURI != PRODUCT_NAMESPACE: continue elif child.localName == 'Include': ValidateAttribute(child,EMPTY_NAMESPACE, 'href') href = Uri.PathResolve([refUri, child.getAttributeNS(EMPTY_NAMESPACE, 'href')]) f = Uri.UrlOpen(href) childprod = Deserialize(f, refUri=href) f.close() resources.extend(childprod.resourceList) elif child.localName == 'Container': ValidateAttribute(child,EMPTY_NAMESPACE, 'path') path = child.getAttributeNS(EMPTY_NAMESPACE, 'path') acl, owner = LoadAcl(child) docDef = child.getAttributeNS(EMPTY_NAMESPACE, 'doc-def') or None cont = InstallUtil.Container(path, acl, owner, None, docDef) resources.append(cont) elif child.localName == 'UriReferenceFile': imt = child.getAttributeNS(EMPTY_NAMESPACE, 'imt') or 'text/plain' ValidateAttribute(child,EMPTY_NAMESPACE,'path') path = child.getAttributeNS(EMPTY_NAMESPACE, 'path') href = Uri.PathResolve([refUri, child.getAttributeNS(EMPTY_NAMESPACE, 'href')]) acl, owner = LoadAcl(child) urf = InstallUtil.UriReferenceFile(path, acl, owner, imt, href) resources.append(urf) elif child.localName == 'ExtensionModule': ValidateAttribute(child,EMPTY_NAMESPACE,'file') file = child.getAttributeNS(EMPTY_NAMESPACE, 'file') file = Uri.PathResolve([refUri, file]) #file = Uri.UriToOsPath(file) extmod = InstallUtil.ExtensionModule(file) resources.append(extmod) elif child.localName == 'RawFile': imt = child.getAttributeNS(EMPTY_NAMESPACE, 'imt') or 'text/plain' ValidateAttribute(child,EMPTY_NAMESPACE,'path') path = child.getAttributeNS(EMPTY_NAMESPACE, 'path') ValidateAttribute(child,EMPTY_NAMESPACE,'src') src = Uri.PathResolve([refUri, child.getAttributeNS(EMPTY_NAMESPACE, 'src')]) acl, owner = LoadAcl(child) rf = InstallUtil.RawFile(path, acl, owner, imt) rf.setPath(contentUri=src) resources.append(rf) elif child.localName in g_typeMap: klass = g_typeMap[child.localName] imt = child.getAttributeNS(EMPTY_NAMESPACE, 'imt') or 'text/xml' ValidateAttribute(child,EMPTY_NAMESPACE,'path') path = child.getAttributeNS(EMPTY_NAMESPACE, 'path') docDefName = child.getAttributeNS(EMPTY_NAMESPACE, 'doc-def') or None
ValidateAttribute(child,EMPTY_NAMESPACE,'src') src = child.getAttributeNS(EMPTY_NAMESPACE, 'src') src = Uri.PathResolve([refUri, src])
acl, owner = LoadAcl(child) doc = klass(path, acl, owner, imt, docDefName) doc.setPath(contentUri=src) if child.hasAttributeNS(EMPTY_NAMESPACE, 'encoding'): doc.encoding = child.getAttributeNS(EMPTY_NAMESPACE, 'encoding') resources.append(doc) elif child.localName == 'Alias': ValidateAttribute(child,EMPTY_NAMESPACE,'reference') ref = child.getAttributeNS(EMPTY_NAMESPACE, 'reference') ValidateAttribute(child,EMPTY_NAMESPACE,'path') path = child.getAttributeNS(EMPTY_NAMESPACE, 'path')
acl,owner = LoadAcl(child) al = InstallUtil.Alias(path, acl, owner, None, None, ref) resources.append(al) elif child.localName == 'User': name = child.getAttributeNS(EMPTY_NAMESPACE, 'name') base_path = child.getAttributeNS(EMPTY_NAMESPACE, 'base-path') ValidateAttribute(child,EMPTY_NAMESPACE,'name') #FIXME: temporary for backwards compat if not base_path: base_path = child.getAttributeNS(EMPTY_NAMESPACE, 'base-uri') print "%s element for %s uses obsolete attribute 'base-uri'. Please amend to 'base-path'"%(child.localName, name)
#Try to use the passwdHash if it is there passwdHash = None password = None if child.hasAttributeNS(EMPTY_NAMESPACE, 'passwdHash'): passwdHash = child.getAttributeNS(EMPTY_NAMESPACE, 'passwdHash') else: password = child.getAttributeNS(EMPTY_NAMESPACE, 'password')
docDefName = child.getAttributeNS(EMPTY_NAMESPACE, 'doc-def') or None acl,owner = LoadAcl(child) data={} for sChild in child.childNodes: if (sChild.namespaceURI, sChild.localName) == (PRODUCT_NAMESPACE,'Entry'): key = sChild.getAttributeNS(EMPTY_NAMESPACE,'key') if not sChild.firstChild: value = '' else: value = sChild.firstChild.nodeValue data[key]=value
u = InstallUtil.User(name, acl, owner, None, docDefName, base_path, password, data, passwdHash = passwdHash) resources.append(u)
elif child.localName == 'Group': ValidateAttribute(child,EMPTY_NAMESPACE,'name') name = child.getAttributeNS(EMPTY_NAMESPACE, 'name') base_path = child.getAttributeNS(EMPTY_NAMESPACE, 'base-path') #FIXME: temporary for backwards compat if not base_path: base_path = child.getAttributeNS(EMPTY_NAMESPACE, 'base-uri') print "%s element for %s uses obsolete attribute 'base-uri'. Please amend to 'base-path'"%(child.localName, name) docDefName = child.getAttributeNS(EMPTY_NAMESPACE, 'doc-def') or None acl, owner = LoadAcl(child) members = [] for sChild in child.childNodes: if (sChild.namespaceURI, sChild.localName) == (PRODUCT_NAMESPACE,'Member'): m = sChild.getAttributeNS(EMPTY_NAMESPACE, 'name') if not m: raise SyntaxError("Invalid member definition in Group %s, must specifiy a name attribute" % name) members.append(m)
g = InstallUtil.Group(name, acl, owner, None, None, base_path, members) resources.append(g) elif child.localName == 'Mirror': ValidateAttribute(child,EMPTY_NAMESPACE,'path') path = child.getAttributeNS(EMPTY_NAMESPACE, 'path') #if path[-1] == '/': path = path[:-1] ValidateAttribute(child,EMPTY_NAMESPACE,'src') src = Uri.PathResolve([refUri, child.getAttributeNS(EMPTY_NAMESPACE, 'src')]) fm = g_defaultFileMap.copy() for sChild in child.childNodes: if (sChild.namespaceURI, sChild.localName) == (PRODUCT_NAMESPACE, 'FileMapping'): ext = sChild.getAttributeNS(EMPTY_NAMESPACE, 'extension') dd = sChild.getAttributeNS(EMPTY_NAMESPACE, 'doc-def') or None imt = sChild.getAttributeNS(EMPTY_NAMESPACE, 'imt') or None rt = sChild.getAttributeNS(EMPTY_NAMESPACE, 'resource-type') or None klass = g_typeMap.get(rt, None) if klass is None and dd is not None: klass= InstallUtil.XmlDocument if klass is None: klass = InstallUtil.RawFile if ext[0] != '.': ext = '.' + ext fm[ext] = (dd, imt, klass) acl, owner = LoadAcl(child) cont = InstallUtil.Container(path, acl, owner, None, None) resources.append(cont) resources.extend(MirrorDir(path, src, fm)) else: raise Exception("Unknown definition in the '%s' namespace, %s" % (PRODUCT_NAMESPACE, child.localName))
product = InstallUtil.Product(resources) product.version = version product.name = title product.description = description return product
def ValidateAttribute(node,ns,ln): if not node.hasAttributeNS(ns,ln): raise ValueError("Element: '%s:%s' does not have required the attribute '%s'" % (node.prefix,node.localName,ln))
def Serialize(product, refUri=''): """ Serialize a product list into a string """ #Now create the setup file #Because install is smart enough to figure out dependencies, just throw in the information rt = ""
rt += """<ftssp:Product xmlns:ftssp='%s' xmlns:dc='http://purl.org/dc/elements/1.1/' version = '%s'>\n""" % (PRODUCT_NAMESPACE, product.version or '0') rt += """ <dc:Description>%s</dc:Description> <dc:Title>%s</dc:Title>\n""" % (product.description or '',product.name or '')
refUri = refUri + '/' * (refUri[-1] != '/')
#Add the resources for res in product.resourceList: if res.resourceType == ResourceTypes.ResourceType.CONTAINER: if res.docDef: ddString = "doc-def = '%s' " % docDef else: ddString = ''
rt += " <ftssp:Container path='%s' %s>\n" % (res.path,ddString) rt += SerializeAclAndOwner(res) rt += " </ftssp:Container>\n" elif res.resourceType == ResourceTypes.ResourceType.RAW_FILE: if res.contentUri[:len(refUri)] == refUri: lfn = res.contentUri[len(refUri):] else: lfn = res.contentUri rt += " <ftssp:RawFile path='%s' src='%s' imt='%s'>\n" % (res.path, lfn, res.imt) rt += SerializeAclAndOwner(res) rt += " </ftssp:RawFile>\n" elif res.resourceType in g_xmldocMapping: if res.docDef: ddString = "doc-def = '%s'" % res.docDef else: ddString = ''
if res.contentUri[:len(refUri)] == refUri: lfn = res.contentUri[len(refUri):] else: lfn = res.contentUri
rt += " <ftssp:%s path='%s' src='%s' imt='%s' %s>\n" % (g_xmldocMapping[res.resourceType], res.path, lfn, res.imt, ddString ) rt += SerializeAclAndOwner(res) rt += " </ftssp:%s>\n" % g_xmldocMapping[res.resourceType] elif res.resourceType == ResourceTypes.ResourceType.USER: if res.docDef: ddString = "doc-def = '%s'" % res.docDef else: ddString = ''
if res.password is None: pwString = "passwdHash = '%s'" % res.passwordHash else: pwString = "password = '%s'" % res.password
rt += " <ftssp:User name='%s' %s imt='%s' %s base-path='%s'>\n" % (res.userName, pwString, res.imt, ddString, res.basePath) rt += SerializeAclAndOwner(res) rt += " </ftssp:User>\n" elif res.resourceType == ResourceTypes.ResourceType.URI_REFERENCE_FILE: if res.href[:len(refUri)] == refUri: lfn = res.href[len(refUri):] else: lfn = res.href
rt += " <ftssp:UriReferenceFile path='%s' href='%s' imt='%s'>\n" % (res.path, lfn, res.imt) rt += SerializeAclAndOwner(res) rt += " </ftssp:UriReferenceFile>\n" elif res.resourceType == ResourceTypes.ResourceType.GROUP: if res.docDef: ddString = "doc-def = '%s'" % res.docDef else: ddString = ''
rt += " <ftssp:Group name='%s' imt='%s' %s base-path='%s'>\n" % (res.groupName, res.imt, ddString, res.basePath, ) for m in res.members: rt += " <ftssp:Member name='%s'/>\n" % (m)
rt += SerializeAclAndOwner(res) rt += " </ftssp:Group>\n"
elif res.resourceType == ResourceTypes.ResourceType.ALIAS: if res.docDef: ddString = "doc-def = '%s'" % res.docDef else: ddString = ''
rt += " <ftssp:Alias reference='%s' imt='%s' %s path='%s'>\n" % (res.reference, res.imt, ddString, res.path,) rt += SerializeAclAndOwner(res) rt += " </ftssp:Alias>\n" elif isinstance(res,InstallUtil.ExtensionModule): rt += " <ftssp:ExtensionModule file='%s'/>\n" % res._file
rt += """</ftssp:Product>""" return rt
def SerializeAclAndOwner(res): r = " <ftssp:Acl>\n" for typ,ident,allowed in res.acl: if ident == AclConstants.SUPER_USER_GROUP_NAME: r += " <ftssp:SuperUserAccess type='%s' allowed='%d'/>\n""" % (typ, not not allowed) elif ident == AclConstants.WORLD_GROUP_NAME: r += " <ftssp:WorldAccess type='%s' allowed='%d'/>\n""" % (typ, not not allowed) else: r += " <ftssp:Access ident='%s' type='%s' allowed='%d'/>\n""" % (ident, typ, not not allowed) r += " </ftssp:Acl>\n"
if res.owner != None: r += " <ftssp:Owner ident='%s'/>\n" % res.owner return r
g_typeMap = {'XsltDocument' : InstallUtil.XsltDocument, 'RdfDocument' : InstallUtil.RdfDocument, 'SchematronDocument' : InstallUtil.SchematronDocument, 'XPathDocumentDefinition' : InstallUtil.XPathDocumentDefinition, 'XsltDocumentDefinition' : InstallUtil.XsltDocumentDefinition, 'XmlDocument' : InstallUtil.XmlDocument, 'Server' : InstallUtil.Server, 'Command' : InstallUtil.Command, }
g_xmldocMapping = {ResourceTypes.ResourceType.XSLT_DOCUMENT : 'XsltDocument', ResourceTypes.ResourceType.SCHEMATRON_DOCUMENT : 'SchematronDocument', ResourceTypes.ResourceType.RDF_DOCUMENT : 'RdfDocument', ResourceTypes.ResourceType.XML_DOCUMENT : 'XmlDocument', ResourceTypes.ResourceType.XPATH_DOCUMENT_DEFINITION : 'XPathDocumentDefinition', ResourceTypes.ResourceType.XSLT_DOCUMENT_DEFINITION : 'XsltDocumentDefinition', ResourceTypes.ResourceType.SERVER : 'Server', ResourceTypes.ResourceType.COMMAND : 'Command', }
def LoadAcl(node): acl = [] owner = None for child in node.childNodes: if (child.namespaceURI,child.localName) == (PRODUCT_NAMESPACE, 'Acl'): for subChild in child.childNodes: if (subChild.namespaceURI,subChild.localName) == (PRODUCT_NAMESPACE,'Access'): #Generic Access allowed = subChild.getAttributeNS(EMPTY_NAMESPACE, 'allowed') == '1' acl.append((subChild.getAttributeNS(EMPTY_NAMESPACE, 'type'), subChild.getAttributeNS(EMPTY_NAMESPACE, 'ident'), allowed)) elif (subChild.namespaceURI,subChild.localName) == (PRODUCT_NAMESPACE, 'SuperUserAccess'): allowed = subChild.getAttributeNS(EMPTY_NAMESPACE, 'allowed') == '1' acl.append((subChild.getAttributeNS(EMPTY_NAMESPACE, 'type'), AclConstants.SUPER_USER_GROUP_NAME, allowed)) elif (subChild.namespaceURI, subChild.localName) == (PRODUCT_NAMESPACE, 'WorldAccess'): allowed = subChild.getAttributeNS(EMPTY_NAMESPACE, 'allowed') == '1' acl.append((subChild.getAttributeNS(EMPTY_NAMESPACE, 'type'), AclConstants.WORLD_GROUP_NAME, allowed)) elif (child.namespaceURI,child.localName) == (PRODUCT_NAMESPACE, 'Owner'): owner = child.getAttributeNS(EMPTY_NAMESPACE, 'ident') return acl, owner
def MirrorDirGen(path, src, fileMap, srcIsUri=True): """ Wrapper for CopyDirGen().
It optionally accepts a 'file' URI for src, and verifies that src refers to a directory. It does not accept an initial list of resources to append to. """ if srcIsUri: src = Uri.UriToOsPath(src, attemptAbsolute=False) if not os.path.isdir(src): raise Exception("%s is not a directory" % src) #print path,src,fileMap res = CopyDir(path, src, fileMap, []) #print res return res
def MirrorDir(path, src, fileMap, srcIsUri=True): """ For compatibility, same as MirrorDirGen, but returns a complete list, rather than a generator. """ return list(MirrorDirGen(path, src, fileMap, srcIsUri))
def CopyDir(path, src, fileMap={}, resources=[]): """ For compatibility, same as CopyDir, but returns a complete list, rather than a generator. """ return list(CopyDirGen(path, src, fileMap, resources))
def CopyDirGen(path, src, fileMap={}, resources=[]): """ Returns a generator of objects representing resources to add to the repository. The resources mirror a current snapshot of a local file system directory.
path = destination repo path (an existing Container) src = file system source path (a directory) must be a valid absolute URI if srcIsUri is True (default) must be a local path if srcIsUri is False fileMap = mapping of filename extensions to resource classes
A tree like /root/dir1/dir2 will be returned as a list like ['/root', '/root/dir1', '/root/dir1/dir2']. Files in each directory will be listed immediately following that directory. So files in dir1 will come before the entry for dir2. Directories are processed breadth-first.
path = destination repo path (an existing Container) src = file system source path (a directory) fileMap = mapping of filename extensions to resource classes resources = list of resource objects to append to """ #FIXME: get ACL defaults from file system? acl = [] owner = None
# yield the members of the original list for r in resources: yield r
# walk the file system srclen = len(src) for dirpath, subdirs, files in _walk(src, topdown=True):
# convert file system dir path to dest repo path if dirpath == src: container_path = path else: container_path = posixpath.join(path, posixpath.join(*dirpath[srclen+1:].split(os.sep))) # yield a Container object for the dir, but not the first one yield InstallUtil.Container(container_path, acl, owner, None, None)
# yield an appropriate object for each non-directory for fname in files: ext = os.path.splitext(fname)[1] if ext not in fileMap: print ("CopyDirGen(): Ignoring file with unknown extension: %s" % fname) continue
dd, imt, klass = fileMap[ext] repo_path = posixpath.join(container_path, fname) if klass is InstallUtil.RawFile: obj = klass(repo_path, acl, owner, imt) else: obj = klass(repo_path, acl, owner, imt, dd) fpath = os.path.join(dirpath, fname) obj.setPath(Uri.OsPathToUri(fpath, attemptAbsolute=False)) yield obj
|