Viewing file: test_helper.py (22.47 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
from Ft.Xml import XPath, Domlette
from Ft.Xml.XPath import Conversions
from Ft.Server import FTSERVER_NAMESPACE from Ft.Server.Common import Schema, ResourceTypes, CreationParams, ValidationInfo, DocumentReference
from Rdf import RdfHelper from Ft.Rdf import RDF_MS_BASE
from Server import NullLogger
import sha
def GetAnonRepo(tester): from Ft.Server.Server import SCore return SCore.GetRepository(None, None, NullLogger.NullLogger(), tester.test_data['properties'])
def RetrieveSession(tester,sid,key): from Ft.Server.Server import SCore return SCore.RetrieveSession(sid, key, NullLogger.NullLogger(), tester.test_data['properties'])
def GetRepo(tester,userName = None, passwd = None): if userName is None: userName = tester.test_data['userName'] if passwd is None: passwd = tester.test_data['password'] passwd = sha.new(passwd).hexdigest() from Ft.Server.Server import SCore return SCore.GetRepository(userName, passwd, NullLogger.NullLogger(), tester.test_data['properties'])
def TestMetaData(tester, src, path, resourceType, aclKeys, owner, imt, size, others, docDef= None, ): dom = Domlette.NonvalidatingReader.parseString(src,path) mdContext = XPath.Context.Context(dom,processorNss={'ftss':FTSERVER_NAMESPACE})
#Make sure its MD is correct p = XPath.Evaluate('/ftss:MetaData/@path',context=mdContext) tester.compare(1,len(p)) tester.compare(path,Conversions.StringValue(p[0]))
p = XPath.Evaluate('/ftss:MetaData/@type',context=mdContext) tester.compare(1,len(p)) tester.compare(Schema.g_rdfResourceTypes[resourceType],Conversions.StringValue(p[0]))
p = XPath.Evaluate('/ftss:MetaData/ftss:Acl/ftss:Access/@type',context=mdContext)
testAcl = {} for k in p: testAcl[Conversions.StringValue(k)] = 1
tester.compare(len(aclKeys),len(testAcl.keys())) for k in testAcl.keys(): tester.compareIn(aclKeys,k)
p = XPath.Evaluate('/ftss:MetaData/ftss:Owner',context=mdContext) tester.compare(1,len(p)) tester.compare(owner,Conversions.StringValue(p[0]))
p = XPath.Evaluate('/ftss:MetaData/ftss:Imt',context=mdContext) tester.compare(1,len(p)) tester.compare(imt,Conversions.StringValue(p[0]))
p = XPath.Evaluate('/ftss:MetaData/ftss:Size',context=mdContext) tester.compare(1,len(p)) tester.compare(size,Conversions.StringValue(p[0]))
p = XPath.Evaluate('/ftss:MetaData/@creation-date',context=mdContext) tester.compare(1,len(p)) cd = Conversions.StringValue(p[0])
p = XPath.Evaluate('/ftss:MetaData/ftss:LastModifiedDate',context=mdContext) tester.compare(1,len(p)) md = Conversions.StringValue(p[0])
p = XPath.Evaluate('/ftss:MetaData/@document-definition',context=mdContext) if resourceType not in [ResourceTypes.ResourceType.RAW_FILE, ResourceTypes.ResourceType.URI_REFERENCE_FILE]: tester.compare(1,len(p)) if docDef: tester.compare(docDef,Conversions.StringValue(p[0])) else: tester.compare(Schema.NULL_DOCDEF,Conversions.StringValue(p[0])) else: tester.compare(0,len(p))
return cd,md
def CompareNsMap(tester,original,actual):
#Compare the dictionarys. #Cause there may be a few extra, we need to #Just make sure actual has everything that orginal has #tester.compare(len(original.keys()),len(actual.keys()))
for k,v in original.items(): tester.compare(1,actual.has_key(k)) tester.compare(v,actual[k]) def CompareRdfMaps(tester,original,actual): #Compare a list of lists
tester.compare(len(original),len(actual)) for s,p,o,t in original: found = 0 for ts,tp,to,tt in actual: if ts == s and tp == p and to == o and tt == t: found = 1 break if not found: tester.error("Mapping %s not found" % str((s,p,o,t)))
def CompareCreationParams(tester,original,actual): tester.compare(original.fullTextIndex,actual.fullTextIndex) tester.compare(original.enforceSchema,actual.enforceSchema) CompareValidationInfo(tester,original.validationInfo,actual.validationInfo)
tester.compare(len(original.extModules),len(actual.extModules)) for m in original.extModules: tester.compareIn(actual.extModules,m)
def CompareValidationInfo(tester,original,actual): tester.compare(original.validationType,actual.validationType) if original.validationType == ValidationInfo.ValidationType.NONE: return elif original.validationType == ValidationInfo.ValidationType.SCHEMATRON: CompareDocumentReference(tester,original.documentReference,actual.documentReference) elif original.validationType == ValidationInfo.ValidationType.DTD: CompareDocumentReference(tester,original.documentReference,actual.documentReference) else: raise "Finish"
def CompareDocumentReference(tester,original,actual):
tester.compare(original.referenceType,actual.referenceType) if original.referenceType == DocumentReference.DocumentReferenceType.INTERNAL: tester.compare(original.uri,actual.uri) elif original.referenceType == DocumentReference.DocumentReferenceType.EXTERNAL: tester.compare(original.uri,actual.uri) elif original.referenceType == DocumentReference.DocumentReferenceType.STRING: tester.compare(original.baseUri,actual.baseUri) tester.compare(original.data,actual.data) else: raise "Unknown"
def TestContainerRdf(tester,repo,path,size,owner,creationDate,modifiedDate,children): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual,'')
expected = GetContainerTuple(path,str(size),owner,creationDate,modifiedDate,children)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestRawFileRdf(tester,repo,path,size,owner,creationDate,modifiedDate,imt): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
expected = GetRawFileTuple(path,size,creationDate,modifiedDate,imt,owner)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestXmlDocumentRdf(tester,repo,path,size,owner,creationDate,modifiedDate,imt,docDef,docDefStmts=None): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
if creationDate is None: creationDate = model.complete(path,Schema.CREATION_DATE,None)[0].object if modifiedDate is None: modifiedDate = model.complete(path,Schema.MODIFIED_DATE,None)[0].object
expected = GetXmlDocumentTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,docDefStmts = docDefStmts)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestUriReferenceFileRdf(tester,repo,path,size,owner,creationDate,modifiedDate,imt,ref): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
if creationDate is None: creationDate = model.complete(path,Schema.CREATION_DATE,None)[0].object if modifiedDate is None: modifiedDate = model.complete(path,Schema.MODIFIED_DATE,None)[0].object
expected = GetUriReferenceFileTuple(path,size,creationDate,modifiedDate,imt,owner,ref=ref)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestDocumentDefinitionRdf(tester,repo,name,size,owner,creationDate,modifiedDate,creationParams,parents=None,typ=None): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(name,None,None) actual = RdfHelper.make_tuple(actual, '')
expected = GetDocumentDefinitionTuple(name, size, creationDate, modifiedDate, owner, creationParams, parents, typ)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestCommandRdf(tester,repo,path,size,owner,creationDate,modifiedDate,docDef,commandName,subCommands): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
expected = GetCommandTuple(path, size, creationDate, modifiedDate, owner, docDef, commandName, subCommands)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestServerRdf(tester,repo,path,size,owner,creationDate,modifiedDate,docDef,serverName,handler,running): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
expected = GetServerTuple(path, size, creationDate, modifiedDate, owner, docDef, serverName, handler, running, )
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestUserRdf(tester,repo,path,size,owner,creationDate,modifiedDate,imt,docDef,userName): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
if creationDate is None: creationDate = model.complete(path,Schema.CREATION_DATE,None)[0].object if modifiedDate is None: modifiedDate = model.complete(path,Schema.MODIFIED_DATE,None)[0].object
expected = GetUserTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,userName)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestGroupRdf(tester,repo,path,size,owner,creationDate,modifiedDate,imt,docDef,groupName,members): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
if creationDate is None: creationDate = model.complete(path,Schema.CREATION_DATE,None)[0].object if modifiedDate is None: modifiedDate = model.complete(path,Schema.MODIFIED_DATE,None)[0].object
expected = GetGroupTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,groupName,members)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def TestAliasRdf(tester,repo,path,size,owner,creationDate,modifiedDate,imt,docDef,reference): #Make sure the proper RDF was generated
model = repo.getModel() actual = model.complete(path,None,None) actual = RdfHelper.make_tuple(actual, '')
if creationDate is None: creationDate = model.complete(path,Schema.CREATION_DATE,None)[0].object if modifiedDate is None: modifiedDate = model.complete(path,Schema.MODIFIED_DATE,None)[0].object
expected = GetAliasTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,reference)
tester.compare(expected, actual, func=RdfHelper.compare_tuple)
def GetContainerTuple(path,size,owner,creationDate,modifiedDate,children):
t = [(path, Schema.DOCDEF, Schema.NULL_DOCDEF, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.CONTENT_SIZE, size, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.TYPE, Schema.g_rdfResourceTypes[ResourceTypes.ResourceType.CONTAINER], '', Schema.SYSTEM_SOURCE_URI), (path, Schema.IMT, 'text/xml', '', Schema.SYSTEM_SOURCE_URI), (path, Schema.OWNER, owner, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.CREATION_DATE, creationDate, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.MODIFIED_DATE, modifiedDate, '', Schema.SYSTEM_SOURCE_URI)] #Datastore container doesn't reflect parent/child relationships in RDF #for c in children: # t.append((path, # Schema.CONTAINER_CHILD, # c + ';metadata', # '', # Schema.SYSTEM_SOURCE_URI)) return t
def GetRawFileTuple(path,size,creationDate,modifiedDate,imt,owner,additions=None,type=ResourceTypes.ResourceType.RAW_FILE):
t = [(path, Schema.CONTENT_SIZE, str(size), '', Schema.SYSTEM_SOURCE_URI), (path, Schema.TYPE, Schema.g_rdfResourceTypes[type], '', Schema.SYSTEM_SOURCE_URI), (path, Schema.OWNER, owner, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.IMT, imt, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.CREATION_DATE, creationDate, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.MODIFIED_DATE, modifiedDate, '', Schema.SYSTEM_SOURCE_URI)]
if additions: t.extend(additions) return RdfHelper.convert_tuple(t)
def GetXmlDocumentTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,docDefStmts = None): additions=[(path, Schema.DOCDEF, docDef, '', Schema.SYSTEM_SOURCE_URI), ]
if docDefStmts: additions.extend(docDefStmts)
return GetRawFileTuple(path, size, creationDate, modifiedDate, imt, owner, additions = additions, type=ResourceTypes.ResourceType.XML_DOCUMENT, )
def GetUriReferenceFileTuple(path,size,creationDate,modifiedDate,imt,owner,ref): additions=[(path, Schema.URI_REFERENCE_LOCATION, ref, '', Schema.SYSTEM_SOURCE_URI), ]
return GetRawFileTuple(path, size, creationDate, modifiedDate, imt, owner, additions = additions, type=ResourceTypes.ResourceType.URI_REFERENCE_FILE, )
def GetCommandTuple(path,size,creationDate,modifiedDate,owner,docDef,name,subCommands): additions=[(path, Schema.DOCDEF, docDef or Schema.NULL_DOCDEF, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.COMMAND_NAME, name, '', Schema.SYSTEM_SOURCE_URI) ]
for s in subCommands: additions.append((path, Schema.COMMAND_SUBCOMMAND, s, '', Schema.SYSTEM_SOURCE_URI))
return GetRawFileTuple(path, size, creationDate, modifiedDate, 'text/xml', owner, additions = additions, type=ResourceTypes.ResourceType.COMMAND, )
def GetServerTuple(path,size,creationDate,modifiedDate,owner,docDef,name,handler,running): additions=[(path, Schema.DOCDEF, docDef or Schema.NULL_DOCDEF, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.SERVER_NAME, name, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.SERVER_HANDLER, handler, '', Schema.SYSTEM_SOURCE_URI) ]
if running: additions.append((path, Schema.SERVER_RUNNING, '1', '', Schema.SYSTEM_SOURCE_URI)) else: additions.append((path, Schema.SERVER_RUNNING, '0', '', Schema.SYSTEM_SOURCE_URI))
return GetRawFileTuple(path, size, creationDate, modifiedDate, 'text/xml', owner, additions = additions, type=ResourceTypes.ResourceType.SERVER, )
def GetDocumentDefinitionTuple(path,size,creationDate,modifiedDate,owner,creationParams,parents,typ): additions=[(path, Schema.DOCDEF, Schema.NULL_DOCDEF, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.FULL_TEXT_INDEX, str(creationParams.fullTextIndex), '', Schema.SYSTEM_SOURCE_URI) ]
if creationParams.validationInfo.validationType == ValidationInfo.ValidationType.NONE: additions.append((path, Schema.XML_VALIDATION_TYPE, Schema.NO_VALIDATION, '', Schema.SYSTEM_SOURCE_URI)) elif creationParams.validationInfo.validationType == ValidationInfo.ValidationType.SCHEMATRON: additions.append((path, Schema.XML_VALIDATION_TYPE, Schema.SCHEMATRON_VALIDATION, '', Schema.SYSTEM_SOURCE_URI)) elif creationParams.validationInfo.validationType == ValidationInfo.ValidationType.DTD: additions.append((path, Schema.XML_VALIDATION_TYPE, Schema.DTD_VALIDATION, '', Schema.SYSTEM_SOURCE_URI)) for e in creationParams.extModules: additions.append((path, Schema.XSLT_EXT_MODULE, e, '', Schema.SYSTEM_SOURCE_URI))
additions.append((path, Schema.ENFORCE_RDF_SCHEMA, str(creationParams.enforceSchema), '', Schema.SYSTEM_SOURCE_URI))
parents = parents or [] for parent in parents: additions.append((path, Schema.BASE_DOCDEF, parent, '', Schema.SYSTEM_SOURCE_URI))
return GetRawFileTuple(path, size, creationDate, modifiedDate, 'text/xml', owner, additions = additions, type=typ or ResourceTypes.ResourceType.XPATH_DOCUMENT_DEFINITION, )
def GetUserTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,userName): additions=[(path, Schema.DOCDEF, docDef, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.USER_NAME, userName, '', Schema.SYSTEM_SOURCE_URI), ]
return GetRawFileTuple(path, size, creationDate, modifiedDate, imt, owner, additions = additions, type=ResourceTypes.ResourceType.USER, )
def GetGroupTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,groupName,members): additions=[(path, Schema.DOCDEF, docDef, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.GROUP_NAME, groupName, '', Schema.SYSTEM_SOURCE_URI), ]
for m in members: additions.append((path, Schema.GROUP_MEMBER, m + ';metadata', '', Schema.SYSTEM_SOURCE_URI))
return GetRawFileTuple(path, size, creationDate, modifiedDate, imt, owner, additions = additions, type=ResourceTypes.ResourceType.GROUP, )
def GetAliasTuple(path,size,creationDate,modifiedDate,imt,owner,docDef,reference): additions=[(path, Schema.DOCDEF, docDef, '', Schema.SYSTEM_SOURCE_URI), (path, Schema.ALIAS_REFERENCE, reference, '', Schema.SYSTEM_SOURCE_URI), ]
return GetRawFileTuple(path, size, creationDate, modifiedDate, imt, owner, additions = additions, type=ResourceTypes.ResourceType.ALIAS, )
|