Viewing file: schema.py (3.61 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
'''An example of 4RDF schema support'''
from Ft.Rdf.RdfsHandler import RdfsConstraintViolation from Ft.Xml.Domlette import NonvalidatingReader from Ft.Lib import Uri
def SchemaDemo():
    """Run the 4RDF schema walkthrough, printing each step to stdout.

    Steps, in order:

    1. Initialize the Memory driver, create a db named 'test' and call
       begin() on it.
    2. Build a Model backed by that db and an RdfsHandler, then print
       every statement the fresh model holds.
    3. Parse spamschema.rdfs and deserialize it into the model with
       schema handling suspended, then resume it and run
       checkConsistency().
    4. Parse spamdata.rdf and deserialize it with schema handling left on.
    5. Print a few isInstance / isSubClass queries.
    6. Try to add a statement that is expected to raise
       RdfsConstraintViolation, and report the exception.

    Takes no arguments and returns None.  Both input files are referenced
    by bare file name (presumably resolved against the current working
    directory -- confirm against Uri.OsPathToUri).
    """
    def heading(title, rule, spaced=True):
        # Section banner: title plus underline.  Every banner except the
        # very first one is preceded by an empty line.
        if spaced:
            print("")
        print(title)
        print(rule)

    def dump(rdf_model):
        # Print every statement currently in the model, one per line.
        for stmt in rdf_model.complete(None, None, None):
            print(stmt)

    heading("Initializing driver",
            "===================",
            spaced=False)
    from Ft.Rdf.Drivers import Memory
    Memory.InitializeModule()
    store = Memory.CreateDb('test')
    store.begin()
    print("done.")

    heading("Creating RDF model",
            "==================")
    from Ft.Rdf import Model, RdfsHandler, Statement
    handler = RdfsHandler.RdfsHandler()
    model = Model.Model(store, handler)
    print("done.")

    heading("New model seeded with RDF meta-model statements",
            "===============================================")
    # A brand-new model is expected to already carry the RDF meta-model.
    dump(model)

    heading("Reading spamschema.rdfs",
            "=======================")
    # The schema is parsed exactly like any other RDF document.
    schema_uri = Uri.OsPathToUri('spamschema.rdfs', attemptAbsolute=True)
    schema_doc = NonvalidatingReader.parseUri(schema_uri)
    print("done.")

    heading("Deserializing spamschema.rdfs into the model",
            "============================================")
    from Ft.Rdf.Serializers.Dom import Serializer
    dom_serializer = Serializer()

    # For a batch add, schema handling is switched off so the statements
    # go in more quickly; they are checked afterwards in one pass.
    model.suspendSchema()
    dom_serializer.deserialize(model, schema_doc,
                               scope='http://schema.spam.com/')
    model.resumeSchema()
    print("done.")

    heading("Checking against schema",
            "=======================")
    model.checkConsistency()

    heading("Model with RDF schema read in",
            "=============================")
    dump(model)

    heading("Reading in spamdata.rdf",
            "=======================")
    # The instance data is loaded with schema handling left enabled.
    data_uri = Uri.OsPathToUri('spamdata.rdf', attemptAbsolute=True)
    data_doc = NonvalidatingReader.parseUri(data_uri)
    print("done.")

    heading("Deserializing spamdata.rdf into the model",
            "=========================================")
    dom_serializer.deserialize(model, data_doc, 'http://spam.com/data')

    heading("Model with RDF data read in",
            "===========================")
    dump(model)

    heading("Misc tests",
            "==========")
    person = 'http://schema.spam.com/#Person'
    represents = 'http://schema.spam.com/#represents'
    print(handler.isInstance('http://rblrebels.com/~robelee', person))
    print(handler.isInstance('http://rblrebels.com/~spartacus', person))
    print(handler.isSubClass('http://schema.spam.com/#Moderator', person))

    # Build a statement meant to violate the schema: 'represents' has a
    # range of http://schema.spam.com#Organization, and 'ILLEGAL' is not
    # one.
    offending = Statement.Statement('http://rblrebels.com/~robelee',
                                    represents,
                                    'ILLEGAL')
    print(model.complete('http://rblrebels.com/~spartacus', represents, None))
    print(model.complete(represents, None, None))

    print("--------------------------------------------------")
    print("If schemas are truely working, then this line should raise an exception")
    print("--------------------------------------------------")
    try:
        model.add(offending)

        # Only reached if the add above is accepted; the demo expects it
        # to be rejected before control gets here.
        store.commit()
    except RdfsConstraintViolation as e:
        print("RdfsConstraintViolation exception raised as expected:")
        print(e)
if __name__ == '__main__':
    # Run the walkthrough only when executed as a script, not on import.
    SchemaDemo()
|