Viewing file: ContainerImp.py (26.65 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Server/Server/SCore/ContainerImp.py,v 1.40 2005/04/11 16:51:12 jkloth Exp $ """ Container repository resource class.
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
import time, warnings
import XmlDocumentImp
from Ft.Lib import Uri, Time from Ft.Rdf import RDF_MS_BASE from Ft.Server import FTSERVER_NAMESPACE from Ft.Server.Common import ResourceTypes, Schema from Ft.Server.Common import DocumentDefinitionSerialization, XmlLib from Ft.Server.Common import AclConstants, SchematronStylesheet from Ft.Server.Common.ResourceTypes import ResourceType from Ft.Server.Server import FtServerServerException, Error from Ft.Server.Server.Drivers import FtssInputSource from Ft.Server.Server.Lib import SwishHelper from Ft.Xml import XPath, Domlette, Xslt from Ft.Xml.XLink import XLINK_NAMESPACE from Ft.Xml.XPath import Context from Ft.Xml.Xslt import XSL_NAMESPACE, StylesheetReader from Ft.Server.Server.Drivers.FtssDriver import DataStoreContainer
class ContainerImp(XmlDocumentImp.XmlDocumentImp, DataStoreContainer): """ A repository container """ resourceType = ResourceType.CONTAINER def __init__(self, path, driver, cache): XmlDocumentImp.XmlDocumentImp.__init__(self, path, driver, cache) return
def _delete(self): for c in self[:]: c.delete() return XmlDocumentImp.XmlDocumentImp._delete(self)
def getBasePath(self): """ Get the path of this object as a base path """ self._verifyTx() return self._path
################################### #XML representation ################################### def getContent(self): """ Overidden to do return XML serialization of datastore container """ self._verifyTx() return str(self._driver._driver.childReferenceXML(self._path))
def setContent(self,src): """ Raises NOT_SUPPORTED """ self._verifyTx() raise FtServerServerException(Error.NOT_SUPPORTED, reason='Cant provide XML content for datastore containers')
################################### #Creation Interfaces ################################### def createDocument(self, path, src, docDef=None, imt='text/xml', forcedType=None): """ Creates a document with the given document definition name, path, type, and source. if the type is not specified it will attempt to infer the type based on IMT and the src. If path is an empty string, then a UUID will be generated for the XML, XSLT, and Schematron documents. If the document definition specifies a full text index, then the content is indexed """ if not isinstance(src, str): raise TypeError("src for %s must be a string object, not %s" % (path,type(src)))
self._verifyTx() if forcedType is None: #Peek at the document to see what type it is #We always want to pass a base path, so if not given, use #the base object's if path: if path.startswith('/'): tmpPath = path else: tmpPath = self._basePath.normalize(path).absolutePath else: tmpPath = self._basePath.absolutePath dom = FtssInputSource.NonvalidatingReader.parseString( src, tmpPath, self._driver)
processorNss = {'ftss':FTSERVER_NAMESPACE, 'xlink':XLINK_NAMESPACE, 'dc':Schema.DC, } con = Context.Context(dom, processorNss=processorNss) if dom.documentElement.namespaceURI == FTSERVER_NAMESPACE: #One of our own if dom.documentElement.localName == 'Container': #Create a container, verify the children if len(XPath.Evaluate('/ftss:Container/ftss:Children/ftss:ChildReference', context=con)): raise FtServerServerException(Error.INVALID_XML, msg="You cannot specify children in a container") forcedType = ResourceType.CONTAINER elif dom.documentElement.localName == 'DocDef': #Just an XPATH Doc Def forcedType = ResourceType.XPATH_DOCUMENT_DEFINITION elif dom.documentElement.localName == 'Command': #Just a Command doc forcedType = ResourceType.COMMAND elif dom.documentElement.localName == 'Server': #Just a Server doc forcedType = ResourceType.SERVER elif dom.documentElement.localName == 'Alias': #Just a Alias doc forcedType = ResourceType.ALIAS elif dom.documentElement.localName == 'User': #Just a User doc forcedType = ResourceType.USER elif dom.documentElement.localName == 'Group': #Just a Group doc forcedType = ResourceType.GROUP elif dom.documentElement.localName == 'NsMappings': #Just a NsMappings doc forcedType = ResourceType.XML_DOCUMENT else: raise FtServerServerException(Error.INVALID_XML, msg = "Unknown element in the %s namespace: %s" % (FTSERVER_NAMESPACE, dom.documentElement.localName))
elif (dom.documentElement.namespaceURI, dom.documentElement.localName) == (XSL_NAMESPACE, 'stylesheet') or \ (dom.documentElement.namespaceURI, dom.documentElement.localName) == (XSL_NAMESPACE, 'transform') or \ dom.documentElement.hasAttributeNS(XSL_NAMESPACE, 'version'): if len(XPath.Evaluate('/*/ftss:CreationParams', context=con)) or \ len(XPath.Evaluate('/*/ftss:BaseNames', context=con)): #An XSLT Document Definition forcedType = ResourceType.XSLT_DOCUMENT_DEFINITION else: #Just an XSLT document forcedType = ResourceType.XSLT_DOCUMENT elif (dom.documentElement.namespaceURI, dom.documentElement.localName) == ('http://www.ascc.net/xml/schematron', 'schema'): #Just a schematron document forcedType = ResourceType.SCHEMATRON_DOCUMENT elif (dom.documentElement.namespaceURI, dom.documentElement.localName) == (RDF_MS_BASE, 'RDF'): #Just a RDF document forcedType = ResourceType.RDF_DOCUMENT else: #Just an XML Document, let it fall through forcedType = ResourceType.XML_DOCUMENT
if forcedType in ResourceTypes.XML_DOCUMENTS + [ResourceType.RAW_FILE] and not path: #Generate a URI path = Uri.BASIC_RESOLVER.generate()[9:]
path = self._basePath.normalize(path)
if docDef: docDefPath = self._basePath.normalize(docDef) if self._driver.getType(docDefPath) not in ResourceTypes.DOCUMENT_DEFINITIONS: raise FtServerServerException(Error.INVALID_PATH, path=docDefPath.displayPath, type='Document Definition') dd = self._fetchResource(docDefPath) cp = dd.getCreationParams() if cp.validationInfo is not None: if not cp.validationInfo.validate(self.getRoot(), src,path.absolutePath): raise FtServerServerException(Error.VALIDATION_ERROR, message='Document Failed Validation') else: docDefPath = None
if forcedType in [ResourceType.XML_DOCUMENT, ResourceType.XSLT_DOCUMENT, ResourceType.COMMAND, ResourceType.SERVER, ResourceType.SCHEMATRON_DOCUMENT]:
if forcedType == ResourceType.SERVER: #Make sure it is a SU if not AclConstants.SUPER_USER_GROUP_NAME in self._driver.getAclIdentifiers(): raise FtServerServerException(Error.PERMISSION_DENIED, level = 'create server', path=self.getDisplayPath())
#import XmlDocumentImp md = XmlDocumentImp.NewXmlDocumentXml( self._driver, path.name, {}, self._driver.getAclIdent(), imt, src, docDefPath and docDefPath.absolutePath or None, forcedType, )
self._driver.createResource(path, md, src) if forcedType == ResourceType.XSLT_DOCUMENT:
isrc = FtssInputSource.FtssInputSourceFactory.fromString(src,path.absolutePath,self._driver)
res = self._fetchResource(path)
reader = StylesheetReader.StylesheetReader()
# register extension elements with the reader # NOTE: this is only needed for compiled stylesheets as # the Processor interface will take care of this functions, elements = res.getExtFunctionsAndElements() reader.addExtensionElementMapping(elements)
sty = reader.fromSrc(isrc)
#Compile the stylesheet self._driver.setResourceCache( path, sty.root ) elif forcedType == ResourceType.SCHEMATRON_DOCUMENT: sch = SchematronStylesheet.ParseSchematron(src, path.absolutePath,self._driver).root self._driver.setResourceCache( path, sch ) elif forcedType == ResourceType.XPATH_DOCUMENT_DEFINITION: r = FtssInputSource.NonvalidatingReader dom = r.parseString(src, path.absolutePath, self._driver) bns, ns_maps, rdf_maps, creationParams = DocumentDefinitionSerialization.Deserialize(dom) dd = self.createXPathDocumentDefinition(path.absolutePath, ns_maps, rdf_maps, creationParams, docDef=docDefPath) for b in bns: dd.addBaseDocumentDefinition(self.fetchResource(b)) dd.setContent(src) elif forcedType == ResourceType.RDF_DOCUMENT: import RdfDocumentImp md = XmlDocumentImp.NewXmlDocumentXml( self._driver, path.name, {}, self._driver.getAclIdent(), imt, src, None, forcedType, )
# res.setContent(src) set the source to the actual content. # Changed create to now do that because of the unneeded steps. #dummySrc = XmlLib.MakeString('<dummy>Place holder for RDF with scope of %s</dummy>' % path.absolutePath) #self._driver.createResource(path, md, dummySrc) self._driver.createResource(path, md, src)
res = self._fetchResource(path) # Change setContent to newContent so as to not remove statements # since there shouldn't be any yet! #res.setContent(src) res.newContent(src) elif forcedType == ResourceType.CONTAINER: self.createContainer(path, createParents=0, docDef=docDefPath, actualContent=src) elif forcedType == ResourceType.ALIAS: r = FtssInputSource.NonvalidatingReader dom = r.parseString(src, path.absolutePath,self._driver) processorNss = {'ftss':FTSERVER_NAMESPACE, } con = Context.Context(dom, processorNss=processorNss) reference = XPath.Evaluate('string(/ftss:Alias/@reference)', context=con) if not reference: raise FtServerServerException(Error.INVALID_XML, msg="You must specify a reference attribute") temp = self.fetchResource(reference) obj = temp.addAlias(path.absolutePath) obj.setContent(src) elif forcedType == ResourceType.USER: r = FtssInputSource.NonvalidatingReader dom = r.parseString(src, path.absolutePath, self._driver) processorNss = {'ftss':FTSERVER_NAMESPACE, } con = Context.Context(dom,processorNss = processorNss) pw = XPath.Evaluate('string(/ftss:User/ftss:PasswdHash)',context = con) if not pw: raise FtServerServerException(Error.INVALID_XML, msg="You must specify a password element attribute") base = self._fetchResource(path.getParentPath()) obj = base.createUser(path.name,pw) obj.setContent(src) elif forcedType == ResourceType.GROUP: r = FtssInputSource.NonvalidatingReader dom = r.parseString(src, path.absolutePath,self._driver) processorNss = {'ftss':FTSERVER_NAMESPACE, 'xlink':XLINK_NAMESPACE } con = Context.Context(dom,processorNss = processorNss) members = XPath.Evaluate('/ftss:Group/ftss:Members/ftss:MemberReference/@xlink:href',context = con)
memberObjs = [] for m in members: if not m.value[-9:] == ';metadata': raise FtServerServerException(Error.INVALID_XML, msg="Member reference must specify a reference to meta data") memberObjs.append(self.getRoot().fetchResource(m.value[:-9])) base = self._fetchResource(path.getParentPath()) obj = base.createGroup(path.name) obj.setContent(src) elif forcedType == ResourceType.XSLT_DOCUMENT_DEFINITION: self.createXsltDocumentDefinition(path.absolutePath, src, None, docDef=docDefPath) else: raise FtServerServerException(Error.UNKNOWN_RESOURCE_TYPE, type=forcedType)
#See if it needs text indexing obj = self._fetchResource(path) if obj.getDocumentDefinition() and obj.getDocumentDefinition().getCreationParams().fullTextIndex: SwishHelper.Index(path.absolutePath, obj.getContent(), self.getRoot()) return obj
def createGroup(self, name, docDef=None): """ Creates a new group. You need write access to the system group container """ self._verifyTx() #Make sure they have permissions if not AclConstants.USERS_GROUP_NAME in self._driver.getAclIdentifiers(): if not AclConstants.SUPER_USER_GROUP_NAME in self._driver.getAclIdentifiers(): raise FtServerServerException(Error.PERMISSION_DENIED, level='create group', path=self.getDisplayPath())
#See if it is a valid Acl Ident if name in self.getRoot().getAllUserNames(): raise FtServerServerException(Error.GROUP_EXISTS, groupName=name) if name in self.getRoot().getAllGroupNames(): raise FtServerServerException(Error.GROUP_EXISTS, groupName=name) if name in [AclConstants.WORLD_GROUP_NAME, AclConstants.USERS_GROUP_NAME, AclConstants.OWNER, AclConstants.ANONYMOUS_USER_NAME]: raise FtServerServerException(Error.GROUP_EXISTS, groupName=name)
groupPath = self._basePath.normalize(name)
acl = self._driver.defaultAcl(name, self._driver.getAclIdent())
import GroupImp md,content = GroupImp.NewGroupXml(self._driver, name, groupPath.name, acl, self._driver.getAclIdent(), docDef)
self._driver.createResource(groupPath, md, content) return self._fetchResource(groupPath)
def createUser(self, name, passHash, docDef=None): """ Create a new user. You must have write access to te system group container """ self._verifyTx() #Make sure they have permissions if not AclConstants.USERS_GROUP_NAME in self._driver.getAclIdentifiers(): if not AclConstants.SUPER_USER_GROUP_NAME in self._driver.getAclIdentifiers(): raise FtServerServerException(Error.PERMISSION_DENIED, level='create user', path=self.getDisplayPath())
#See if it is a valid Acl Ident if name in self.getRoot().getAllUserNames(): raise FtServerServerException(Error.USER_EXISTS, userName=name) if name in self.getRoot().getAllGroupNames(): raise FtServerServerException(Error.USER_EXISTS, userName=name)
if name in [AclConstants.WORLD_GROUP_NAME, AclConstants.USERS_GROUP_NAME, AclConstants.OWNER, AclConstants.ANONYMOUS_USER_NAME]: raise FtServerServerException(Error.USER_EXISTS, userName=name)
userPath = self._basePath.normalize(name)
acl = self._driver.defaultAcl(name, self._driver.getAclIdent())
import UserImp md, content = UserImp.NewUserXml(self._driver, name, userPath.name, passHash, acl, self._driver.getAclIdent(), docDef) self._driver.createResource(userPath, md, content) return self._fetchResource(userPath)
def createXPathDocumentDefinition(self, path, nss, maps, cp, docDef=None): content = DocumentDefinitionSerialization.Serialize([], nss, maps, cp)
path = self._basePath.normalize(path)
if docDef: docDefPath = self._basePath.normalize(docDef)
if self._driver.getType(docDefPath) not in ResourceTypes.DOCUMENT_DEFINITIONS: raise FtServerServerException(Error.INVALID_PATH, path=docDefPath.displayPath, type='Document Definition') else: docDefPath = None
import XPathDocumentDefinitionImp md = XPathDocumentDefinitionImp.NewDocumentDefinitionXml( self._driver, path.name, {}, self._driver.getAclIdent(), 'text/xml', content, docDefPath and docDefPath.absolutePath or None, )
self._driver.createResource(path, md, content) return self._fetchResource(path)
def createXsltDocumentDefinition(self, path, xsltSrc, cp, docDef=None): """Create an XSLT document definition from pieces"""
path = self._basePath.normalize(path)
if docDef: docDefPath = self._basePath.normalize(docDef) if self._driver.getType(docDefPath) not in ResourceTypes.DOCUMENT_DEFINITIONS: raise FtServerServerException(Error.INVALID_PATH, path=docDefPath.displayPath, type='Document Definition') else: docDefPath = None
import XsltDocumentDefinitionImp md = XsltDocumentDefinitionImp.NewDocumentDefinitionXml( self._driver, path.name, {}, self._driver.getAclIdent(), 'text/xml', xsltSrc, docDefPath and docDefPath.absolutePath or None, )
self._driver.createResource(path, md, xsltSrc) self._driver.xupdateContent(path, XsltDocumentDefinitionImp.NEW_SHEET_XUPDATE)
dd = self._fetchResource(path)
reader = StylesheetReader.StylesheetReader() # register extension elements with the reader # NOTE: this is only needed for compiled stylesheets as # the Processor interface will take care of this functions, elements = dd.getExtFunctionsAndElements() reader.addExtensionElementMapping(elements)
isrc = FtssInputSource.FtssInputSourceFactory.fromString(xsltSrc,path.absolutePath,self._driver)
sty = reader.fromSrc(isrc) #Compile the stylesheet self._driver.setResourceCache( path, sty.root ) if cp is not None: dd.setCreationParams(cp) return dd
def createRawFile(self, path, imt, src): """ Creates a raw file resource, using the specified, path, internet media type, and source string. """
if not isinstance(src, str): raise TypeError("src must be a string object")
self._verifyTx() if not path: path = Uri.BASIC_RESOLVER.generate()[9:]
path = self._basePath.normalize(path) import RawFileImp md = RawFileImp.NewRawFileXml(self._driver, path.name, {}, self._driver.getAclIdent(), imt, src)
self._driver.createResource(path, md, src) return self._fetchResource(path)
def createUriReferenceFile(self, path, imt, srcUri): """ Create an URI Reference file. An URI Reference file is a reference to a URI """
self._verifyTx() path = self._basePath.normalize(path)
import UriReferenceFileImp md = UriReferenceFileImp.NewUriReferenceFileXml( self._driver, path.name, {}, self._driver.getAclIdent(), imt, srcUri, ) self._driver.createResource( path, md, "Place holder for Uri Reference File %s"%srcUri) return self._fetchResource(path)
###################################### #Delete Interface ###################################### def deleteResource(self, path, traverseAliases=None): """ Delete any resource from the system. """ if (traverseAliases != None): warnings.warn("You are using a deprecated traverse alias feature. Please specify with path arguments ;traverse or ;no-traverse", DeprecationWarning, 2) if traverseAliases: path += ";traverse" else: path += ";no-traverse" res = self.fetchResource(path) res.delete() return
######################################## #Temporary file Interfaces ######################################## def setPathTemporary(self, path, timeToLive): """ Marks the specified repository object as temporary """ path = self._basePath.normalize(path) m = self._driver.getSystemModel() timeToLive = Time.FromPythonTime(time.time() + timeToLive) #see if it was already marked if len(m.complete(path.absolutePath, Schema.TIME_TO_LIVE, None)) > 0: #Update xu = XmlLib.MakeString(UPDATE_TIME_TO_LIVE_XUPDATE%(FTSERVER_NAMESPACE, XLINK_NAMESPACE, timeToLive)) else: #Set xu = XmlLib.MakeString(SET_TIME_TO_LIVE_XUPDATE%(FTSERVER_NAMESPACE, XLINK_NAMESPACE, timeToLive)) self._driver.xupdateMetaData(path, xu) return
######################################### #Access interfaces ######################################### #User List and Dict interfaces def __len__(self): """Overides the length operator""" self._verifyTx() return len(self._getChildren())
def __getitem__(self, i): """Implements list access for containers and repositories""" self._verifyTx() if isinstance(i, (str, unicode)): #Dict interface childNames = self.keys() if not i in childNames: raise KeyError(i) i = childNames.index(i)
return self.fetchResource(self._getChildren()[i] + ';no-traverse')
def __getslice__(self, i, j): """Implements list slice access for containers and repositories""" i = max(i, 0); j = min(j, len(self._getChildren())) res = [] for ctr in range(i, j): res.append(self[ctr]) return res
def index(self, item): """ User List interfaces """ self._verifyTx() for ctr in xrange(len(self)): if self[ctr] == item: return ctr return -1
def keys(self): """User Dict interface""" self._verifyTx() return self._getChildren()
def iterkeys(self): """User Dict interface""" return iter(self.keys())
def items(self): """User Dict interface""" self._verifyTx() return map(None, self.keys(), self.values())
def iteritems(self): """User Dict interface""" return iter(self.items())
def values(self): """User Dict interface""" self._verifyTx() return self[:]
def itervalues(self): """User Dict interface""" return iter(self.values())
def has_key(self, key): """User Dict interface""" return key in self.keys()
__contains__ = has_key
def get(self, key, failobj=None): """User Dict interface""" try: return self[key] except KeyError: return failobj
UPDATE_TIME_TO_LIVE_XUPDATE="""<xupdate:modifications version="1.0" xmlns:xupdate="http://www.xmldb.org/xupdate" xmlns:ftss="%s" xmlns:xlink="%s" > <xupdate:update select="/ftss:MetaData/ftss:TimeToLive">%s</xupdate:update> </xupdate:modifications> """
SET_TIME_TO_LIVE_XUPDATE="""<xupdate:modifications version="1.0" xmlns:xupdate="http://www.xmldb.org/xupdate" xmlns:ftss="%s" xmlns:xlink="%s" > <xupdate:append select="/ftss:MetaData"><ftss:TimeToLive>%s</ftss:TimeToLive></xupdate:append> </xupdate:modifications> """
|