Viewing file: FtssModel.py (9.9 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Source: /var/local/cvsroot/4Suite/Ft/Server/Server/Drivers/FtssModel.py,v $ $Revision: 1.29 $ $Date: 2004/07/09 04:03:17 $ """ The driver for the 4Suite RDF DBMS
Copyright 2002 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
import types from Ft.Rdf import BNODE_BASE, BNODE_BASE_LEN from Ft.Lib import Uuid from Ft.Server.Common import AclConstants
class FtssModel: """A wrapper that combines the model of the system and user models. The system model is read only"""
def __init__(self, sysModel, userModel, acl, path): self._sysModel = sysModel self._userModel = userModel self._acl = acl self._path = path
#For Versa self._driver = FtssModelDriver(sysModel._driver,userModel._driver) return def addContainer(self, container, scope=None): self._acl.verifyAcl(self._path, AclConstants.WRITE_USER_MODEL_ACCESS, 0) return self._userModel.addContainer(container, scope)
def extractContainer(self, uri, scope=None): cont = self._userModel.extractContainer(uri, scope) if not cont: cont = self._systemModel.extractContainer(uri, scope) return cont
def add(self, statements, checkSchema=1): if not isinstance(statements, types.ListType): statements = [statements] scopes = {} #for s in statements: scopes[s.scope] = None #FIXME: temp kludge for s in statements: if s.subject[:1] == "/": s.subject = "ftss://" + s.subject if s.scope[:1] == "/": s.scope = "ftss://" + s.scope scopes[s.scope] = None for k in scopes.keys(): if k: path = self._path.normalize(k+';no-traverse') else: path = self._path self._acl.verifyAcl(path, AclConstants.WRITE_USER_MODEL_ACCESS, 0) #self._acl.verifyAcl(self._path, AclConstants.WRITE_USER_MODEL_ACCESS, 0) former_state = self._userModel.setSchemaActivity(checkSchema) result = self._userModel.add(statements) self._userModel.setSchemaActivity(former_state) return result
def enableSchema(self): self._userModel.enableSchema() self._sysModel.enableSchema()
def disableSchema(self): self._userModel.disableSchema() self._sysModel.disableSchema()
def checkConsistency(self): self._userModel.checkConsistency() self._sysModel.checkConsistency()
def remove(self, statements): self._acl.verifyAcl(self._path, AclConstants.WRITE_USER_MODEL_ACCESS, 0) return self._userModel.remove(statements)
def removePattern(self, subject, predicate, object, statementUri=None, scope=None, **flags): #FIXME: Broken for regex patterns in scope. Not sure how to handle this case, anyway path = not not scope and self._path.normalize(scope+';no-traverse') or self._path self._acl.verifyAcl(path, AclConstants.WRITE_USER_MODEL_ACCESS, 0) return apply(self._userModel.removePattern, (subject, predicate, object, statementUri, scope), flags)
def contains(self, statement): return self._userModel.contains or self._sysModel.container(statement)
def containsPattern(self, subject, predicate, object, statementUri=None, scope=None, **flags): res = apply(self._userModel.containsPattern, (subject, predicate, object, statementUri, scope), flags) if not res: return apply(self._sysModel.containsPattern, (subject, predicate, object, statementUri, scope), flags) return res
def statements(self, scope=None): return self._userModel.complete(None, None, None, None, scope) + self._sysModel.complete(None, None, None, None, scope)
def size(self, scope=None): return self._userModel.size(scope) + self._sysModel.size()
def complete(self, subject, predicate, object, statementUri=None, scope=None, **flags): res = apply(self._userModel.complete, (subject, predicate, object, statementUri, scope), flags) res2 = apply(self._sysModel.complete, (subject, predicate, object, statementUri, scope), flags) res.extend(res2) return res
def exclude(self, subject, predicate, object, statementUri=None, scope=None, **flags): res = apply(self._userModel.exclude, (subject, predicate, object, statementUri, scope), flags) res2 = apply(self._sysModel.exclude, (subject, predicate, object, statementUri, scope), flags) res.extend(res2) return res
def generateUri(self): """ Generates URIs on the fly, e.g. for reified statements. Do *not* use this to generate anonymous resources. Use generateBnode instead The default method is to generate a UUID URN, but this can be easily overridden.""" return 'urn:uuid:'+Uuid.UuidAsString(Uuid.GenerateUuid())
def generateBnode(self): """ Generates blank nodes (bnodes), AKA anonymous resources """ return BNODE_BASE + Uuid.UuidAsString(Uuid.GenerateUuid())
def isBnodeLabel(self, label): """ Determines whether a label is a blank node """ return label[:BNODE_BASE_LEN] == BNODE_BASE
def versaQuery(self, querySrc, nsMapping=None, varBindings=None, scope=None): #print "Compiling Versa: %s" % querySrc exp = Versa.Compile(querySrc) con = Versa.CreateContext(model=self, nsMapping=nsMapping, varBindings=varBindings, scope=scope) #print "Evaluating Query" result = exp.evaluate(con)
stream = cStringIO.StringIO() enc, dec, rdr, wrtr = codecs.lookup("utf-8") stream = wrtr(stream) from Ft.Rdf.Parsers.Versa import Util as VersaUtil VersaUtil.ResultsToXml(result, stream) return stream.getvalue()
def versaQueryRaw(self, querySrc, nsMapping=None, varBindings=None, scope=None): #print "Compiling Versa: %s" % querySrc exp = Versa.Compile(querySrc) con = Versa.CreateContext(model=self, nsMapping=nsMapping, varBindings=varBindings, scope=scope) #print "Evaluating Query" return exp.evaluate(con)
import cStringIO, codecs from Ft.Rdf.Parsers import Versa from Ft.Rdf.Parsers.Versa import DataTypes def WriteVersaResult(result, stream, indent=''): if DataTypes.IsList(result): stream.write(indent) stream.write(u"<List>\n") for r in result: WriteVersaResult(r, stream, indent+' ') stream.write(indent) stream.write(u"</List>\n") elif DataTypes.IsSet(result): stream.write(indent) stream.write(u"<Set>\n") for r in DataTypes.ToList(result): WriteVersaResult(r, stream, indent+' ') stream.write(indent) stream.write(u"</Set>\n") elif DataTypes.IsString(result): stream.write(indent) stream.write(u"<String>%s</String>\n"%result) elif DataTypes.IsResource(result): stream.write(indent) stream.write(u"<Resource>%s</Resource>\n"%str(result)) elif DataTypes.IsNumber(result): stream.write(indent) stream.write(u"<Number>%s</Number>\n"%result) elif DataTypes.IsBoolean(result): stream.write(indent) stream.write(u"<Boolean>%s</Boolean>\n"%result) else: raise Exception('Unknown Versa result type') return
#import time
class FtssModelDriver: """ Cause Versa uses a driver directly """ def __init__(self, sysDriver, userDriver): self._sysDriver = sysDriver self._userDriver = userDriver self.props = self._userDriver.props return
def complete(self, subject, predicate, object, statementUri, scope, flags): return self._sysDriver.complete(subject, predicate, object, statementUri, scope, flags) + self._userDriver.complete(subject, predicate, object, statementUri, scope, flags)
def size(self, scope): return self._sysDriver.size(scope) + self._userDriver.size(scope)
def contains(self, subject, predicate, object, statementUri, scope, flags): return self._sysDriver.contains(subject, predicate, object, statementUri, scope, flags) + self._userDriver.contains(subject, predicate, object, statementUri, scope, flags)
def subjectsFromPredAndObjs(self, predicate, objects, scope): return self._sysDriver.subjectsFromPredAndObjs(predicate, objects, scope) + self._userDriver.subjectsFromPredAndObjs(predicate, objects, scope)
def subjectsFromPredsAndObj(self, predicates, object, scope): return self._sysDriver.subjectsFromPredsAndObj(predicates, object, scope) + self._userDriver.subjectsFromPredsAndObj(predicates, object, scope)
def objectsFromSubAndPreds(self, subject, predicates, scope): return self._sysDriver.objectsFromSubAndPreds(subject, predicates, scope) + self._userDriver.objectsFromSubAndPreds(subject, predicates, scope) def objectsFromSubsAndPred(self, subjects, predicate, scope): return self._sysDriver.objectsFromSubsAndPred(subjects, predicate, scope) + self._userDriver.objectsFromSubsAndPred(subjects, predicate, scope)
def resources(self, scope): return self._sysDriver.resources(scope) + self._userDriver.resources(scope)
def isResource(self, res): return self._sysDriver.isResource(res) or self._userDriver.isResource(res)
|