Viewing file: __init__.py (5.83 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
__revision__ = '$Id: __init__.py,v 1.13 2003/11/20 01:14:18 jkloth Exp $'
import os, tempfile, time, sha, socket from Ft.Lib.TestSuite import TestMode
from Ft.Server.Server import Launcher from Ft.Server.Server.Lib import LogUtil, ConfigFile from Ft.Server.FtRpc import FtServerFtRpcException from Ft.Server.Client import Error, FtServerClientException
def PreprocessFiles(dirs, files): """ This function is responsible for sorting and trimming the file and directory lists as needed for proper testing. """ from Ft.Lib.TestSuite import RemoveTests, SortTests
ignored_files = [] RemoveTests(files, ignored_files)
ordered_files = [] SortTests(files, ordered_files)
ignored_dirs = [] RemoveTests(dirs, ignored_dirs)
ordered_dirs = ['Core','Commands'] SortTests(dirs, ordered_dirs)
return (dirs, files)
# -- run modes -------------------------------------------------------
class TestLauncher(Launcher.Launcher): def info(*args): pass
_sha_hash = lambda str: sha.new(str).hexdigest()
# The Dynamic and/or Private Ports are those from 49152 through 65535 FTRPC_HOST = 'localhost' FTRPC_PORT = 58803
class _ServerMode(TestMode.TestMode):
databaseInited = 0
def __init__(self): self.dbName = 'test_db' TestMode.TestMode.__init__(self, "Core Server", 1)
def _init(self, tester): if not tester.test_data.has_key('userName'): tester.test_data['userName'] = 'root' tester.test_data['password'] = 'root'
tester.test_data['ftrpc-host'] = FTRPC_HOST tester.test_data['ftrpc-port'] = FTRPC_PORT
success = 1 if not self.databaseInited: self.cfgFileName = tempfile.mktemp() f = open(self.cfgFileName, 'w') f.write(CFG_FILE) f.close() success = self.createRepo(tester) _ServerMode.databaseInited = 1 return success
def _pre(self, tester): tester.startTest("Start Server") launcher = TestLauncher(tester.test_data['userName'], _sha_hash(tester.test_data['password']), self._properties) launcher.start() time.sleep(2) tester.testDone() return
def _post(self, tester): tester.startTest("Stop Server") launcher = TestLauncher(tester.test_data['userName'], _sha_hash(tester.test_data['password']), self._properties) launcher.stop() tester.testDone() return def createRepo(self,tester): from Ft.Server.Server import SCore from Ft.Server.Server.Commands import Init from Ft.Server.Common.Install import InstallUtil
# Needed because of class attribute modifications in these files. # Without this, strange exceptions occur when running the complete # Server test suite. reload(InstallUtil) reload(Init) self._properties = ConfigFile.Read(self.cfgFileName)['TestCore'] self._properties['ConfigFile'] = self.cfgFileName self._properties['CoreId'] = 'TestCore'
tester.test_data['properties'] = self._properties
tester.startTest("Initialize repository")
# See if there is already a server present s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.connect((FTRPC_HOST, FTRPC_PORT)) except socket.error: # No servers listening on the host:port we want pass else: tester.error("Unable to test the client code as there is a server " "listening on %s:%d" % (FTRPC_HOST, FTRPC_PORT)) tester.testDone() return 0
username = tester.test_data['userName'] password = tester.test_data['password'] Init.DoInit(self._properties, username, _sha_hash(password), ['Core'], users=[(username, password)], quiet=1, confirm=0, destroyRepo=1)
repo = SCore.GetRepository(username, _sha_hash(password), LogUtil.NullLogger(), self._properties) # Add the FtRpc Server if repo.hasResource('/ftss/servers'): repo.deleteResource('/ftss/servers') serverRoot = repo.createContainer('/ftss/servers')
serverRoot.createDocument('FtRpc.server', FTRPC_SERVER)
# Add a clean docdefs container if repo.hasResource('/ftss/docdefs'): repo.deleteResource('/ftss/docdefs') repo.createContainer('/ftss/docdefs')
# Add a clean commands container if repo.hasResource('/ftss/commands'): repo.deleteResource('/ftss/commands') repo.createContainer('/ftss/commands')
repo.txCommit()
tester.testDone() # Success! return 1
MODES = [_ServerMode(), ]
CFG_FILE="""<rdf:RDF xmlns:rdf='http://www.w3.org/1999/02/22-rdf-syntax-ns#' xmlns='http://xmlns.4suite.org/4ss/properties#' > <Core rdf:ID='TestCore'> <Driver rdf:parseType='Resource'> <rdf:type resource='http://xmlns.4suite.org/4ss/properties#FlatFile'/> <Root>testserver</Root> </Driver>
<ListenAddress>%s</ListenAddress> <PidFile>%s</PidFile> <LogFile>%s</LogFile> <LogLevel>debug</LogLevel> </Core> </rdf:RDF>""" % (FTRPC_HOST, tempfile.mktemp(), os.path.join(tempfile.tempdir, 'log'))
FTRPC_SERVER = """<?xml version="1.0"?> <Server xmlns='http://xmlns.4suite.org/reserved' xmlns:dc='http://purl.org/dc/elements/1.1/'> <dc:Description>This is a test server</dc:Description> <Module>FtRpc</Module> <Handler>ftrpc</Handler> <Port>%d</Port> <Status running='1'/> <LogLevel>notice</LogLevel> <ServerName>FtRpc</ServerName> </Server>""" % FTRPC_PORT
|