Viewing file: ResourceMetaData.py (25.84 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Server/Server/Xslt/ResourceMetaData.py,v 1.36 2005/04/06 23:05:46 jkloth Exp $ """ XSLT and XPath extensions supporting the 4SS ResourceMetaData API
Copyright 2004 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
import cStringIO
from Ft.Xml.XPath import Conversions from Ft.Xml.Xslt import XsltElement, ContentInfo, AttributeInfo from Ft.Xml.Xslt import OutputParameters from Ft.Server.Common import ResourceTypes, Schema, AclConstants from Ft.Server import RESERVED_NAMESPACE from Ft.Server.Server import FtServerServerException
from Ns import SCORE_NS import FtssXsltBase
def ResourceType(context, path=None):
    """
    Look up the RDF resource type of a repository resource.

    path - the repository path of the resource; it is converted with
           Conversions.StringValue before the resource is fetched

    Returns the entry of Schema.g_rdfResourceTypes keyed by the
    resource's resourceType attribute.
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    type_code = resource.resourceType
    return Schema.g_rdfResourceTypes[type_code]
def GetAbsolutePath(context, path=None):
    """
    Return the full path of the resource within the system
    (the original notes say aliases are resolved if needed).

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    return resource.getAbsolutePath()
def GetName(context, path=None):
    """
    Return the name of the resource, as reported by its getName().

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    return resource.getName()
def GetCreationDate(context, path=None):
    """
    Return the creation date of the resource, as reported by its
    getCreationDate().

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    return resource.getCreationDate()
def GetLastModifiedDate(context, path=None):
    """
    Return the date the resource was last modified, as reported by its
    getLastModifiedDate().

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    return resource.getLastModifiedDate()
def GetSize(context, path=None):
    """
    Return the size of the resource, as reported by its getSize().

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    return resource.getSize()
def GetContent(context, path=None, encoding="LATIN-1"):
    """
    Return the content of a resource, decoded to Unicode, for use as
    an XPath string object.

    path - the repository path to the object to retrieve
    encoding - the encoding to use in converting the content to Unicode
    """
    # In future, this function may/should validate that the result is
    # a good XPath string.
    # NOTE(review): `encoding` is handed to unicode() as received, without
    # the Conversions.StringValue treatment given to `path` - confirm that
    # callers always supply a plain string.
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    raw = resource.getContent()
    return unicode(raw, encoding)
def GetRawContent(context, path=None, encoding="iso-8859-1"):
    """
    Return the content of a resource as a raw Unicode object.

    path - the repository path to the object to retrieve
    encoding - the encoding to use in converting the content to Unicode
    """
    # The implementation currently matches GetContent. The separate name
    # signals a different intent: the result is an arbitrary Unicode
    # object rather than a conforming XPath string.
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    raw = resource.getContent()
    return unicode(raw, encoding)
def GetMetaDataResource(context, path=None):
    """
    Return the absolute path of the metadata resource associated with
    the resource at `path`.

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    metadata = resource.getMetaDataResource()
    return metadata.getAbsolutePath()
def GetParent(context, path=None):
    """
    Return the absolute path of the parent container of the resource.

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched

    Returns the empty string when getParent() yields a false value.
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    container = resource.getParent()
    if not container:
        return ""
    return container.getAbsolutePath()
## def getRoot(self): ## """ ## Get a reference to the system repository ## """ ## self._verifyTx() ## return self._cache.getResource(self._path._documentRoot)
def GetAliases(context, path=None):
    """
    Return the aliases that reference the resource, as a list of text
    nodes each holding one alias's absolute path.

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)

    # Find a document node that can act as a factory for the result nodes.
    node = context.node
    if hasattr(node, 'ownerDocument'):
        # ownerDocument is false when the context node is itself the document
        factory = node.ownerDocument or node
    else:
        # No ownerDocument attribute: climb the parent chain to the root
        factory = node
        while factory.parentNode:
            factory = factory.parentNode

    fragment = factory.createDocumentFragment()
    for alias in resource.getAliases():
        text = factory.createTextNode(alias.getAbsolutePath())
        fragment.appendChild(text)

    return fragment.childNodes
#####################################
# ACL Interfaces
#####################################
class AddAclElement(XsltElement):
    """
    Add access for a given identifier and level to the ACL of a resource
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        'path': AttributeInfo.UriReferenceAvt(
            description=('The path of the resource where to add the ACL.'
                         ' If not given, the source document is used.'),
            ),
        'ident': AttributeInfo.StringAvt(
            required=1,
            description='The ACL identifier (a user or group identifier)',
            ),
        'access': AttributeInfo.StringAvt(
            required=1,
            description=('The access level or type. The built-in ACL'
                         ' levels are "read" | "write" | "write user model"'
                         ' | "execute" | "delete" | "change owner"'
                         ' | "change permissions"'),
            ),
        'allowed': AttributeInfo.YesNoAvt(
            default='yes',
            description='Allow or deny this type of access',
            ),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        addAcl(access, ident, allowed) on the resource found at `path`.
        """
        context.setProcessState(self)

        # Attribute value templates are evaluated in declaration order
        target = self._path.evaluate(context)
        who = self._ident.evaluate(context)
        level = self._access.evaluate(context)
        permit = self._allowed.evaluate(context)

        resource = FtssXsltBase.FetchBaseObject(processor, target)
        resource.addAcl(level, who, permit)
def GetAclByKey(context,aclKey,path=None):
    """
    Return the ACL entries of a resource for one access key.

    aclKey - the access key; converted with Conversions.StringValue
    path - the repository path of the resource, forwarded to _GetAcl
    """
    return _GetAcl(context, Conversions.StringValue(aclKey), path)
def GetAllAcl(context,path=None):
    """
    Return the ACL entries of a resource for every access key.

    path - the repository path of the resource, forwarded to _GetAcl
    """
    # A key of None asks _GetAcl for the complete ACL
    return _GetAcl(context, aclKey=None, path=path)
def _GetAcl(context,aclKey=None,path=None):
    """
    Serialize the ACL of a resource into a result tree fragment and
    return that fragment's child nodes.

    aclKey - when true, only this key is serialized, as a single AclKey
             element; otherwise every key is serialized inside an Acl
             wrapper element
    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched

    Each AclKey element carries a "key" attribute and one Ident child
    per identifier, with "name" and "allowed" ("1" or "0") attributes.
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)

    def write_key(writer, key, entries):
        # Emit one AclKey element with an Ident child per identifier
        writer.startElement("AclKey")
        writer.attribute("key", key)
        for ident, setting in entries.items():
            if setting == AclConstants.ALLOWED:
                flag = "1"
            else:
                flag = "0"
            writer.startElement("Ident")
            writer.attribute("name", ident)
            writer.attribute("allowed", flag)
            writer.endElement("Ident")
        writer.endElement("AclKey")

    processor = context.processor
    processor.pushResultTree(context.currentInstruction.baseUri)
    try:
        if aclKey:
            write_key(processor.writers[-1], aclKey,
                      resource.getAcl(aclKey))
        else:
            processor.writers[-1].startElement('Acl')
            for key, entries in resource.getAcl().items():
                write_key(processor.writers[-1], key, entries)
            processor.writers[-1].endElement('Acl')
    finally:
        # Always pop, so the result stack stays balanced even on error
        fragment = processor.popResult()
    return fragment.childNodes
class SetAclElement(XsltElement):
    """
    Reset the ACL for a resource, and then optionally add access for a
    given identifier and level to the ACL of a resource
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        'path': AttributeInfo.UriReferenceAvt(
            description=('The path of the resource where to set the ACL.'
                         ' If not given, the source document is used.'),
            ),
        'ident': AttributeInfo.StringAvt(
            required=1,
            description='The ACL identifier (user or group)',
            ),
        'access': AttributeInfo.StringAvt(
            required=1,
            description=('The access level or type. The built-in ACL'
                         ' levels are "read" | "write" | "write user model"'
                         ' | "execute" | "delete" | "change owner"'
                         ' | "change permissions"'),
            ),
        'allowed': AttributeInfo.YesNoAvt(
            default='yes',
            description='Allow or deny this type of access',
            ),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        setAcl(access, ident, allowed) on the resource found at `path`.
        """
        context.setProcessState(self)

        # Attribute value templates are evaluated in declaration order
        target = self._path.evaluate(context)
        who = self._ident.evaluate(context)
        level = self._access.evaluate(context)
        permit = self._allowed.evaluate(context)

        resource = FtssXsltBase.FetchBaseObject(processor, target)
        resource.setAcl(level, who, permit)
def VerifyAcl(context, aclKey, path=None, verifyTraverse=1):
    """
    Check an ACL key against a resource by calling its verifyAcl().

    aclKey - the access key to verify; converted with
             Conversions.StringValue
    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    verifyTraverse - converted with Conversions.BooleanValue and passed
                     through to verifyAcl()

    Returns 1 when the resource is fetched and verifyAcl() completes,
    and 0 when either step raises FtServerServerException.
    """
    path = Conversions.StringValue(path)
    aclKey = Conversions.StringValue(aclKey)
    verifyTraverse = Conversions.BooleanValue(verifyTraverse)
    try:
        obj = FtssXsltBase.FetchBaseObject(context.processor, path)
        obj.verifyAcl(aclKey, verifyTraverse)
    except FtServerServerException:
        # A server exception here (from the fetch or the check) is reported
        # as "not verified"; the exception object itself is not needed.
        return 0
    return 1
## def getAclIdentifiers(self): ## """ ## Get a list of all acl identifiers for the current users ## """ ## self._verifyTx() ## return self._driver.getAclIdentifiers()
## def getAclIdent(self): ## """ ## Get the ACL identifier for the current user ## """ ## self._verifyTx() ## return self._driver.getAclIdent()
class InheritAclElement(XsltElement):
    """
    Set the inheritance ACL of a resource
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        'path': AttributeInfo.UriReferenceAvt(
            description=('The path of the resource where to set the ACL.'
                         ' If not given, the source document is used.'),
            ),
        'access': AttributeInfo.StringAvt(
            required=1,
            description=('The access level or type. The built-in ACL'
                         ' levels are "read" | "write" | "write user model"'
                         ' | "execute" | "delete" | "change owner"'
                         ' | "change permissions"'),
            ),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        inheritAcl(access) on the resource found at `path`.
        """
        context.setProcessState(self)

        target = self._path.evaluate(context)
        level = self._access.evaluate(context)

        resource = FtssXsltBase.FetchBaseObject(processor, target)
        resource.inheritAcl(level)
class RemoveAclElement(XsltElement):
    """
    Remove an identifier's entry for an access level from the ACL of a
    resource
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        # The description used to say "add the ACL", copied from
        # add-acl, which is wrong for an element that removes an entry.
        'path' : AttributeInfo.UriReferenceAvt(
            description=('The path of the resource where to remove the ACL'
                         ' entry. If not given, the source document is'
                         ' used.')),
        'ident' : AttributeInfo.StringAvt(
            required=1,
            description='The ACL identifier (user or group)'),
        'access' : AttributeInfo.StringAvt(
            required=1,
            description=('The access level or type. The built-in ACL'
                         ' levels are "read" | "write" | "write user model"'
                         ' | "execute" | "delete" | "change owner"'
                         ' | "change permissions"')),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        removeAcl(access, ident) on the resource found at `path`.
        """
        context.setProcessState(self)

        path = self._path.evaluate(context)
        ident = self._ident.evaluate(context)
        access = self._access.evaluate(context)

        obj = FtssXsltBase.FetchBaseObject(processor, path)
        obj.removeAcl(access, ident)

        return
def GetOwner(context,path=None):
    """
    Return the absolute path of the owner of the resource.

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched

    Returns the empty string when the resource's getOwner() is None.
    """
    path = Conversions.StringValue(path)
    owner = FtssXsltBase.FetchBaseObject(context.processor, path).getOwner()
    # Identity test: "== None" would go through the owner object's own
    # comparison methods, which is not what is meant here.
    if owner is None:
        return ""
    return owner.getAbsolutePath()
class SetOwnerElement(XsltElement):
    """
    Change the owner of a resource
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        # The description used to say "change the IMT of", which does
        # not describe this element.
        'path' : AttributeInfo.UriReferenceAvt(
            description=('The path of the resource to change the owner of.'
                         ' If not given, the source document is used.')),
        'owner' : AttributeInfo.UriReferenceAvt(
            required=1,
            description='The path to the new owner'),
        }

    def instantiate(self, context, processor):
        """
        Fetch the object at the `owner` path and pass it to setOwner()
        on the resource found at `path`.
        """
        context.setProcessState(self)

        # This may return None (the 'default' from AttributeInfo)
        path = self._path.evaluate(context)
        # The new owner is passed as a fetched object, not as a path string
        owner = FtssXsltBase.FetchBaseObject(processor,
                                             self._owner.evaluate(context))

        obj = FtssXsltBase.FetchBaseObject(processor, path)
        obj.setOwner(owner)

        return
## ########################################## ## #XML Interfaces ## ##########################################
## def applyXslt(self, stylesheets, params=None, ignorePis=1, ## extParams=None, extModules=None): ## """ ## applies the specified stylesheets (with the specified transformation parameters) ## on the DOM representation of the raw file resource ## """ ## extModules = extModules or []
## params = params or {} ## params[(RESERVED_NAMESPACE, 'display-path')] = self._path.displayPath ## params[(RESERVED_NAMESPACE, 'absolute-path')] = self._path.absolutePath ## extParams = extParams or {} ## extParams[(RESERVED_NAMESPACE, 'path')] = self._path.absolutePath
## #First map the stylesheets ## if type(stylesheets) not in [type([]),type(())]: ## stylesheets = (stylesheets,)
## p = Ft.Xml.Xslt.Processor.Processor() ## p.extensionParams = extParams
## p._repository = self.getRoot() ## mods = p.registerExtensionModules( ## ['Ft.Server.Server.Xslt']+extModules ## )
## p.setDocumentReader( ## FtssInputSource.NonvalidatingReader ## )
## for s in stylesheets: ## if type(s) == type(''): ## #It is a URI ## isrc = FtssInputSource.FtssInputSourceFactory.fromUri(self.getAbsolutePath(),self._driver) ## isrc = isrc.resolve(s,'','Repo ApplyXlst') ## p.appendStylesheet(isrc) ## elif hasattr(s,'toStylesheet'): ## #A document reference ## p.appendStylesheetInstance(s.toStylesheet(self)) ## elif hasattr(s,'asStylesheet'): ## #A reference to a document in the system ## p.appendStylesheetInstance(s.asStylesheet())
## isrc = FtssInputSource.FtssInputSourceFactory.fromUri(self.getAbsolutePath(),self._driver) ## rt = p.run(isrc, ignorePis=ignorePis, topLevelParams=params) ## if extParams is not None and hasattr(p, 'extensionParams'): ## extParams.update(p.extensionParams)
## imt = p._lastOutputParams.mediaType ## return rt, imt
def AsDom(context,path=None):
    """
    Return a one-item list holding the result of asDom() on the resource.

    NOTE: per the original author, this differs from document():
    document() caches its results and always returns the same one per
    the spec, whereas as-dom returns the latest version in the repo.

    path - the repository path of the resource; converted with
           Conversions.StringValue before the resource is fetched
    """
    target = Conversions.StringValue(path)
    resource = FtssXsltBase.FetchBaseObject(context.processor, target)
    dom = resource.asDom()
    return [dom]
## def toXPathContext(self,nss=None): ## """ ## Create an XPath Context with the src of this RawFile ## """ ## self._verifyTx() ## return self._driver.getContext(self._path,nss)
class XUpdateElement(XsltElement):
    """
    Allows XML content to be updated with the XUpdate protocol

    The content of the fcore:x-update element is a template for the
    XUpdate document source.
    """

    content = ContentInfo.Template

    legalAttrs = {
        'path' : AttributeInfo.UriReferenceAvt(
            description='The path of the resource to apply XUpdate to'),
        }

    def instantiate(self, context, processor):
        """
        Instantiate the element's children into an in-memory buffer
        using the XML output method, then pass the captured text to
        xUpdate() on the resource found at `path`.
        """
        context.setProcessState(self)

        # This may return None (the 'default' from AttributeInfo)
        path = self._path.evaluate(context)

        stream = cStringIO.StringIO()
        # Start from the current writer's output parameters, but force
        # the XML output method for the captured XUpdate source.
        params = processor.writers[-1]._outputParams.clone()
        params.method = (None, u'xml')
        processor.addHandler(params, stream)
        try:
            self.processChildren(context, processor)
        finally:
            # Remove the temporary handler even if a child instruction
            # fails, so the processor's writer stack stays balanced.
            processor.removeHandler()
        content = stream.getvalue()

        obj = FtssXsltBase.FetchBaseObject(processor, path)
        obj.xUpdate(content)
        return
## ##################################### ## #Query Interfaces ## ##################################### ## def hasResource(self,path): ## """ ## Query if the system has a resource specified by the path ## """ ## self._verifyTx() ## path = self._basePath.normalize(path,0) ## if self._driver.hasResource(path): ## return self._driver.getType(path) ## return 0
## ##################################### ## #Retrieval Interfaces ## ##################################### ## def fetchResource(self,path): ## """ ## Fetch an resource from the system. ## """ ## self._verifyTx() ## path = self._basePath.normalize(path) ## return self._fetchResource(path)
## ########################################## ## #Mutation Interfaces ## ##########################################
class SetContentElement(XsltElement):
    """
    Set the string content of the resource with the given path,
    document definition and type.  The output of the body of the
    element makes up the content of the resulting document, either
    literally or as a result of the contained XSLT instructions
    (according to the literal-content attribute).  All the attributes
    available on the xsl:output instruction are also allowed on this
    element, in order to control the creation of the content.
    If the resource is a document whose definition specifies a full text
    index, then the content is re-indexed
    """

    content = ContentInfo.Template

    legalAttrs = {
        'path' : AttributeInfo.UriReferenceAvt(
            description='The path of the resource to modify'),
        'literal-content' : AttributeInfo.YesNo(
            default='no',
            description=('If yes, treat the content literally, i.e. do not'
                         ' execute any XSLT instructions or extensions')),
        'base-path' : AttributeInfo.UriReferenceAvt(
            description='The base to use for relative paths'),
        'content-as-string' : AttributeInfo.YesNo(
            description='Deprecated - use literal-content now'),

        # Output control, mirroring the attributes of xsl:output
        'method' : AttributeInfo.QNameAvt(),
        'version' : AttributeInfo.NMTokenAvt(),
        'encoding' : AttributeInfo.StringAvt(),
        'omit-xml-declaration' : AttributeInfo.YesNoAvt(),
        'standalone' : AttributeInfo.YesNoAvt(),
        'doctype-public' : AttributeInfo.StringAvt(),
        'doctype-system' : AttributeInfo.StringAvt(),
        'cdata-section-elements' : AttributeInfo.QNamesAvt(),
        'indent' : AttributeInfo.YesNoAvt(),
        'media-type' : AttributeInfo.StringAvt(),
        }

    doesSetup = True

    def setup(self):
        """
        Create the OutputParameters object that instantiate() fills in
        from this element's attributes.
        """
        self._output_parameters = OutputParameters.OutputParameters()
        return

    def instantiate(self, context, processor):
        """
        Render the element body into an in-memory buffer and store the
        text with setContent() on the target resource.

        When base-path is given, `path` is fetched through the base
        resource, or through its parent if the base is not a container.
        Otherwise `path` is resolved directly.
        """
        context.setProcessState(self)

        path = self._path.evaluate(context)
        basePath = self._base_path.evaluate(context)
        literal_content = self._literal_content
        if self._content_as_string is not None:
            # The deprecated attribute, when present, overrides
            # literal-content.
            print("fcore:set-content/@content-as-string is deprecated. Please use @literal-content instead")
            literal_content = self._content_as_string

        stream = cStringIO.StringIO()
        self._output_parameters.avtParse(self, context)
        processor.addHandler(self._output_parameters, stream)
        try:
            if literal_content:
                # Copy the children verbatim instead of instantiating them
                from Ft.Xml.Xslt.CopyOfElement import CopyNode
                for child in self.children:
                    CopyNode(processor, child)
            else:
                self.processChildren(context, processor)
        finally:
            # Remove the temporary handler even if rendering fails, so
            # the processor's writer stack stays balanced.
            processor.removeHandler()

        text = stream.getvalue()

        if basePath:
            base = FtssXsltBase.FetchBaseObject(processor, basePath)
            if not base.isResourceType(ResourceTypes.ResourceType.CONTAINER):
                base = base.getParent()
            # fetchResource is a method of the repository object and
            # takes the path only; the processor used to be passed as
            # an extra leading argument here.
            obj = base.fetchResource(path)
        else:
            obj = FtssXsltBase.FetchBaseObject(processor, path)
        obj.setContent(text)

        return
class DeleteElement(XsltElement): """ Delete this resource """
content = ContentInfo.Empty
legalAttrs = { 'path' : AttributeInfo.UriReferenceAvt( description='The path of the resource to delete'), }
def instantiate(self, context, processor): context.setProcessState(self) # This may return None (the 'default' from AttributeInfo) path = self._path.evaluate(context) obj = FtssXsltBase.FetchBaseObject(processor, path) print "Deleting resource", obj.getAbsolutePath() obj.delete() return
## ################################### ## #Creation Interfaces ## ###################################
class AddAliasElement(XsltElement):
    """
    Adds an alias for this resource to the repository.
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        'path': AttributeInfo.UriReferenceAvt(
            description=('The path of the resource to add an alias too.'
                         ' Defaults to current.'),
            ),
        'alias': AttributeInfo.UriReferenceAvt(
            required=1,
            description='The path of the new alias',
            ),
        'doc-def': AttributeInfo.UriReferenceAvt(
            description='The document definition to use for the new alias',
            ),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        addAlias(alias, doc-def) on the resource found at `path`.
        """
        context.setProcessState(self)

        # The evaluated path may be None (the 'default' from AttributeInfo)
        target = self._path.evaluate(context)
        alias_path = self._alias.evaluate(context)
        doc_def = self._doc_def.evaluate(context)

        resource = FtssXsltBase.FetchBaseObject(processor, target)
        resource.addAlias(alias_path, doc_def)
######################################
# Delete Interface
######################################
class RemoveAliasElement(XsltElement):
    """
    Remove an alias for this resource from the repository.
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        # Both descriptions used to be copies of the add-alias ones
        # ("add an alias to", "the new alias"), which is wrong for an
        # element that removes an alias.
        'path' : AttributeInfo.UriReferenceAvt(
            description=('The path of the resource to remove an alias from.'
                         ' Defaults to current.')),
        'alias' : AttributeInfo.UriReferenceAvt(
            required=1,
            description='The path of the alias to remove'),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        removeAlias(alias) on the resource found at `path`.
        """
        context.setProcessState(self)

        # This may return None (the 'default' from AttributeInfo)
        path = self._path.evaluate(context)
        alias = self._alias.evaluate(context)

        obj = FtssXsltBase.FetchBaseObject(processor, path)

        obj.removeAlias(alias)

        return
########################################
# Temporary file Interfaces
########################################
class MarkTemporaryElement(XsltElement):
    """
    Mark a resource as temporary
    """

    # The element takes no child content
    content = ContentInfo.Empty

    legalAttrs = {
        'path': AttributeInfo.UriReferenceAvt(
            description=('The path of the resource to mark as temporary.'
                         ' Defaults to current.'),
            ),
        'time-to-live': AttributeInfo.NumberAvt(
            required=1,
            description=('The time in seconds from now that the resource is'
                         ' to live.'),
            ),
        }

    def instantiate(self, context, processor):
        """
        Evaluate the attributes against the current context and call
        markTemporary() on the resource found at `path`, passing the
        time-to-live converted with int().
        """
        context.setProcessState(self)

        # The evaluated path may be None (the 'default' from AttributeInfo)
        target = self._path.evaluate(context)
        lifetime = self._time_to_live.evaluate(context)
        seconds = int(lifetime)

        resource = FtssXsltBase.FetchBaseObject(processor, target)
        resource.markTemporary(seconds)
# XPath extension functions defined by this module, keyed by
# (namespace, local-name).
ExtFunctions = {
    # Content and basic metadata
    (SCORE_NS, 'get-content'): GetContent,
    (SCORE_NS, 'get-raw-content'): GetRawContent,
    (SCORE_NS, 'resource-type'): ResourceType,
    (SCORE_NS, 'get-metadata-resource'): GetMetaDataResource,
    (SCORE_NS, 'get-creation-date'): GetCreationDate,
    (SCORE_NS, 'get-last-modified-date'): GetLastModifiedDate,
    (SCORE_NS, 'get-size'): GetSize,
    (SCORE_NS, 'get-absolute-path'): GetAbsolutePath,
    (SCORE_NS, 'get-name'): GetName,
    # Ownership and access control
    (SCORE_NS, 'get-owner'): GetOwner,
    (SCORE_NS, 'get-acl-by-key'): GetAclByKey,
    (SCORE_NS, 'get-all-acl'): GetAllAcl,
    # Hierarchy and aliases
    (SCORE_NS, 'get-parent'): GetParent,
    (SCORE_NS, 'get-aliases'): GetAliases,
    (SCORE_NS, 'verify-acl'): VerifyAcl,
    (SCORE_NS, 'as-dom'): AsDom,
    }
# XSLT extension elements defined by this module, keyed by
# (namespace, local-name).
ExtElements = {
    # Content mutation
    (SCORE_NS, 'set-content'): SetContentElement,
    (SCORE_NS, 'x-update'): XUpdateElement,
    # Access control
    (SCORE_NS, 'set-acl'): SetAclElement,
    (SCORE_NS, 'inherit-acl'): InheritAclElement,
    (SCORE_NS, 'add-acl'): AddAclElement,
    (SCORE_NS, 'remove-acl'): RemoveAclElement,
    # Lifetime, aliases, ownership, deletion
    (SCORE_NS, 'mark-temporary'): MarkTemporaryElement,
    (SCORE_NS, 'add-alias'): AddAliasElement,
    (SCORE_NS, 'remove-alias'): RemoveAliasElement,
    (SCORE_NS, 'set-owner'): SetOwnerElement,
    (SCORE_NS, 'delete-resource'): DeleteElement,
    }
|