From f5a889e40ced497654c03cb3e2bb24f7737417d4 Mon Sep 17 00:00:00 2001 From: Griatch Date: Sun, 2 Sep 2012 10:10:22 +0200 Subject: [PATCH] Added new process-pool runner based on AMPoule (integrated into Evennia). This allows e.g. utils.utils.run_async to offload long-running functions to a completely different subprocess entirely, offering real parallelism. Implementation is still experimental, notably not all objects can be transferred safely across the wire; also there is no concept of updating caches yet - so adding an object from the subprocess side will not be known in the main thread yet (since caches cannot yet tell the underlying database has changed). --- src/commands/default/batchprocess.py | 95 ++- src/server/__init__.py | 7 +- src/server/portal.py | 2 +- src/server/server.py | 65 +- src/settings_default.py | 41 +- src/utils/ampoule/COPYING.txt | 23 + src/utils/ampoule/EVENNIA.txt | 20 + src/utils/ampoule/__init__.py | 4 + src/utils/ampoule/child.py | 53 ++ src/utils/ampoule/commands.py | 11 + src/utils/ampoule/iampoule.py | 24 + src/utils/ampoule/main.py | 301 +++++++++ src/utils/ampoule/pool.py | 414 ++++++++++++ src/utils/ampoule/rpool.py | 67 ++ src/utils/ampoule/service.py | 69 ++ src/utils/ampoule/test/__init__.py | 0 src/utils/ampoule/test/test_process.py | 867 +++++++++++++++++++++++++ src/utils/ampoule/test/test_proxy.py | 49 ++ src/utils/ampoule/util.py | 46 ++ src/utils/idmapper/EVENNIA.txt | 24 + src/utils/idmapper/base.py | 5 + src/utils/utils.py | 195 +++++- 22 files changed, 2322 insertions(+), 60 deletions(-) create mode 100644 src/utils/ampoule/COPYING.txt create mode 100644 src/utils/ampoule/EVENNIA.txt create mode 100644 src/utils/ampoule/__init__.py create mode 100644 src/utils/ampoule/child.py create mode 100644 src/utils/ampoule/commands.py create mode 100644 src/utils/ampoule/iampoule.py create mode 100644 src/utils/ampoule/main.py create mode 100644 src/utils/ampoule/pool.py create mode 100644 src/utils/ampoule/rpool.py create 
mode 100644 src/utils/ampoule/service.py create mode 100644 src/utils/ampoule/test/__init__.py create mode 100644 src/utils/ampoule/test/test_process.py create mode 100644 src/utils/ampoule/test/test_proxy.py create mode 100644 src/utils/ampoule/util.py create mode 100644 src/utils/idmapper/EVENNIA.txt diff --git a/src/commands/default/batchprocess.py b/src/commands/default/batchprocess.py index d82fa4263c..2e248ed7de 100644 --- a/src/commands/default/batchprocess.py +++ b/src/commands/default/batchprocess.py @@ -31,8 +31,8 @@ from src.utils import utils # limit symbols for API inclusion __all__ = ("CmdBatchCommands", "CmdBatchCode") -HEADER_WIDTH = 70 -UTF8_ERROR = \ +_HEADER_WIDTH = 70 +_UTF8_ERROR = \ """ {rDecode error in '%s'.{n @@ -50,6 +50,35 @@ UTF8_ERROR = \ The (first) error was found with a character on line %s in the file. """ +_PROCPOOL_BATCHCMD_SOURCE = """ +from src.commands.default.batchprocess import batch_cmd_exec, step_pointer, BatchSafeCmdSet +caller.ndb.batch_stack = commands +caller.ndb.batch_stackptr = 0 +caller.ndb.batch_batchmode = "batch_commands" +caller.cmdset.add(BatchSafeCmdSet) +for inum in range(len(commands)): + print "command:", inum + caller.cmdset.add(BatchSafeCmdSet) + if not batch_cmd_exec(caller): + break + step_pointer(caller, 1) +print "leaving run ..." +""" +_PROCPOOL_BATCHCODE_SOURCE = """ +from src.commands.default.batchprocess import batch_code_exec, step_pointer, BatchSafeCmdSet +caller.ndb.batch_stack = commands +caller.ndb.batch_stackptr = 0 +caller.ndb.batch_batchmode = "batch_code" +caller.cmdset.add(BatchSafeCmdSet) +for inum in range(len(commands)): + print "command:", inum + caller.cmdset.add(BatchSafeCmdSet) + if not batch_code_exec(caller): + break + step_pointer(caller, 1) +print "leaving run ..." 
+""" + #------------------------------------------------------------ # Helper functions @@ -59,7 +88,7 @@ def format_header(caller, entry): """ Formats a header """ - width = HEADER_WIDTH - 10 + width = _HEADER_WIDTH - 10 entry = entry.strip() header = utils.crop(entry, width=width) ptr = caller.ndb.batch_stackptr + 1 @@ -215,7 +244,7 @@ class CmdBatchCommands(MuxCommand): commands = BATCHCMD.parse_file(python_path) except UnicodeDecodeError, err: lnum = err.linenum - caller.msg(UTF8_ERROR % (python_path, lnum)) + caller.msg(_UTF8_ERROR % (python_path, lnum)) return if not commands: @@ -242,16 +271,24 @@ class CmdBatchCommands(MuxCommand): show_curr(caller) else: caller.msg("Running Batch-command processor - Automatic mode for %s (this might take some time) ..." % python_path) - # add the 'safety' cmdset in case the batch processing adds cmdsets to us - for inum in range(len(commands)): - # loop through the batch file - if not batch_cmd_exec(caller): - return - step_pointer(caller, 1) - # clean out the safety cmdset and clean out all other temporary attrs. - string = " Batchfile '%s' applied." % python_path - caller.msg("{G%s" % string) - purge_processor(caller) + + if settings.PROCPOOL_ENABLED: + # run in parallel process + def callback(r): + caller.msg(" {GBatchfile '%s' applied." % python_path) + purge_processor(caller) + utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback) + else: + # run in-process (might block) + for inum in range(len(commands)): + # loop through the batch file + if not batch_cmd_exec(caller): + return + step_pointer(caller, 1) + # clean out the safety cmdset and clean out all other temporary attrs. + string = " Batchfile '%s' applied." 
% python_path + caller.msg("{G%s" % string) + purge_processor(caller) class CmdBatchCode(MuxCommand): """ @@ -293,7 +330,7 @@ class CmdBatchCode(MuxCommand): codes = BATCHCODE.parse_file(python_path) except UnicodeDecodeError, err: lnum = err.linenum - caller.msg(UTF8_ERROR % (python_path, lnum)) + caller.msg(_UTF8_ERROR % (python_path, lnum)) return if not codes: @@ -325,15 +362,25 @@ class CmdBatchCode(MuxCommand): show_curr(caller) else: caller.msg("Running Batch-code processor - Automatic mode for %s ..." % python_path) - # add the 'safety' cmdset in case the batch processing adds cmdsets to us - for inum in range(len(codes)): - # loop through the batch file - if not batch_code_exec(caller): - return - step_pointer(caller, 1) - string = " Batchfile '%s' applied." % python_path - caller.msg("{G%s" % string) - purge_processor(caller) + + if settings.PROCPOOL_ENABLED: + # run in parallel process + def callback(r): + caller.msg(" {GBatchfile '%s' applied." % python_path) + purge_processor(caller) + utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=codes, caller=caller, at_return=callback) + else: + # run in-process (will block) + for inum in range(len(codes)): + # loop through the batch file + if not batch_code_exec(caller): + return + step_pointer(caller, 1) + # clean out the safety cmdset and clean out all other temporary attrs. + string = " Batchfile '%s' applied." % python_path + caller.msg("{G%s" % string) + purge_processor(caller) + #------------------------------------------------------------ # State-commands for the interactive batch processor modes diff --git a/src/server/__init__.py b/src/server/__init__.py index 1e2c610945..cfe668ac71 100644 --- a/src/server/__init__.py +++ b/src/server/__init__.py @@ -1,11 +1,10 @@ """ -Makes it easier to import by grouping all relevant things already at this level. +Makes it easier to import by grouping all relevant things already at this level. 
-You can henceforth import most things directly from src.server +You can henceforth import most things directly from src.server Also, the initiated object manager is available as src.server.manager. """ -from src.server.models import * - +from src.server.models import * manager = ServerConfig.objects diff --git a/src/server/portal.py b/src/server/portal.py index 4b4300fb2c..19fb67b855 100644 --- a/src/server/portal.py +++ b/src/server/portal.py @@ -102,7 +102,7 @@ class Portal(object): """ print ' %(servername)s Portal (%(version)s) started.' % {'servername': SERVERNAME, 'version': VERSION} if AMP_ENABLED: - print " amp (Server): %s" % AMP_PORT + print " amp (to Server): %s" % AMP_PORT if TELNET_ENABLED: ports = ", ".join([str(port) for port in TELNET_PORTS]) ifaces = ",".join([" %s" % iface for iface in TELNET_INTERFACES if iface != '0.0.0.0']) diff --git a/src/server/server.py b/src/server/server.py index 8e02547baa..4bb18d2989 100644 --- a/src/server/server.py +++ b/src/server/server.py @@ -39,9 +39,6 @@ SERVER_RESTART = os.path.join(settings.GAME_DIR, 'server.restart') # module containing hook methods SERVER_HOOK_MODULE = mod_import(settings.AT_SERVER_STARTSTOP_MODULE) -# i18n -from django.utils.translation import ugettext as _ - #------------------------------------------------------------ # Evennia Server settings #------------------------------------------------------------ @@ -53,11 +50,25 @@ AMP_ENABLED = True AMP_HOST = settings.AMP_HOST AMP_PORT = settings.AMP_PORT +PROCPOOL_ENABLED = settings.PROCPOOL_ENABLED +PROCPOOL_DEBUG = settings.PROCPOOL_DEBUG +PROCPOOL_MIN_NPROC = settings.PROCPOOL_MIN_NPROC +PROCPOOL_MAX_NPROC = settings.PROCPOOL_MAX_NPROC +PROCPOOL_TIMEOUT = settings.PROCPOOL_TIMEOUT +PROCPOOL_IDLETIME = settings.PROCPOOL_IDLETIME +PROCPOOL_HOST = settings.PROCPOOL_HOST +PROCPOOL_PORT = settings.PROCPOOL_PORT +PROCPOOL_INTERFACE = settings.PROCPOOL_INTERFACE +PROCPOOL_UID = settings.PROCPOOL_UID +PROCPOOL_GID = settings.PROCPOOL_GID 
+PROCPOOL_DIRECTORY = settings.PROCPOOL_DIRECTORY + # server-channel mappings IMC2_ENABLED = settings.IMC2_ENABLED IRC_ENABLED = settings.IRC_ENABLED RSS_ENABLED = settings.RSS_ENABLED + #------------------------------------------------------------ # Evennia Main Server object #------------------------------------------------------------ @@ -208,7 +219,10 @@ class Evennia(object): Outputs server startup info to the terminal. """ print ' %(servername)s Server (%(version)s) started.' % {'servername': SERVERNAME, 'version': VERSION} - print ' amp (Portal): %s' % AMP_PORT + print ' amp (to Portal): %s' % AMP_PORT + if PROCPOOL_ENABLED: + print ' amp (Process Pool): %s' % PROCPOOL_PORT + def set_restart_mode(self, mode=None): """ @@ -319,6 +333,49 @@ if AMP_ENABLED: amp_service.setName("EvenniaPortal") EVENNIA.services.addService(amp_service) +# The ampoule twisted extension manages asynchronous process pools +# via an AMP port. It can be used to offload expensive operations +# to another process asynchronously. + +if PROCPOOL_ENABLED: + + from src.utils.ampoule import main as ampoule_main + from src.utils.ampoule import service as ampoule_service + from src.utils.ampoule import pool as ampoule_pool + from src.utils.ampoule.main import BOOTSTRAP as _BOOTSTRAP + from src.server.procpool import ProcPoolChild + + # for some reason absolute paths don't work here, only relative ones. 
+ apackages = ("twisted", + os.path.join(os.pardir, "src", "utils", "ampoule"), + os.path.join(os.pardir, "ev"), + os.path.join(os.pardir)) + aenv = {"DJANGO_SETTINGS_MODULE":"settings", + "DATABASE_NAME":settings.DATABASES.get("default", {}).get("NAME") or settings.DATABASE_NAME} + if PROCPOOL_DEBUG: + _BOOTSTRAP = _BOOTSTRAP % "log.startLogging(sys.stderr)" + else: + _BOOTSTRAP = _BOOTSTRAP % "" + + procpool_starter = ampoule_main.ProcessStarter(packages=apackages, + env=aenv, + path=PROCPOOL_DIRECTORY, + uid=PROCPOOL_UID, + gid=PROCPOOL_GID, + bootstrap=_BOOTSTRAP, + childReactor=os.name == 'nt' and "select" or "epoll") + procpool = ampoule_pool.ProcessPool(name="ProcPool", + min=PROCPOOL_MIN_NPROC, + max=PROCPOOL_MAX_NPROC, + recycleAfter=500, + ampChild=ProcPoolChild, + starter=procpool_starter) + procpool_service = ampoule_service.AMPouleService(procpool, + ProcPoolChild, + PROCPOOL_PORT, + PROCPOOL_INTERFACE) + procpool_service.setName("ProcPool") + EVENNIA.services.addService(procpool_service) if IRC_ENABLED: diff --git a/src/settings_default.py b/src/settings_default.py index 576974aaa1..8641e36386 100644 --- a/src/settings_default.py +++ b/src/settings_default.py @@ -324,7 +324,7 @@ IRC_ENABLED = False # discussion channel 'ievennia' is on server01.mudbytes.net:5000. IMC2_ENABLED = False IMC2_NETWORK = "server01.mudbytes.net" -IMC2_PORT = 5000 +IMC2_PORT = 5000 # this is the imc2 port, not on localhost IMC2_CLIENT_PWD = "" IMC2_SERVER_PWD = "" # RSS allows to connect RSS feeds (from forum updates, blogs etc) to @@ -337,7 +337,44 @@ RSS_ENABLED=False RSS_UPDATE_INTERVAL = 60*10 # 10 minutes ###################################################################### -# Config for Django web features +# PROCPOOL setup +###################################################################### + +# Activates the Twisted AMPoule process pool. This creates a pool +# of subprocesses. When using e.g. 
utils.run_async Evennia will then +# be able to offload long-running processes to the pool. Process pooling +# shows much better parallelism than threading (and also makes use of +# multiple processes). But it may be slower for some +# combinations of database and operating system. Also, creating +# objects from another process will require re-syncing of caches. +PROCPOOL_ENABLED = True +# relay process stdout to log (debug mode, very spammy) +PROCPOOL_DEBUG = False +# max/min size of the process pool. Will expand up to max limit on demand. +PROCPOOL_MIN_NPROC = 5 +PROCPOOL_MAX_NPROC = 20 +# after sending a command, this is the maximum time in seconds the process +# may run without returning. After this time the process will be killed +PROCPOOL_TIMEOUT = None +# maximum time (seconds) a process may idle before being pruned from pool (if pool bigger than minsize) +PROCPOOL_IDLETIME = 20 +# only change if the port clashes with something else on the system +PROCPOOL_HOST = 'localhost' +PROCPOOL_PORT = 5001 +# 0.0.0.0 means listening to all interfaces +PROCPOOL_INTERFACE = '0.0.0.0' +# user-id and group-id to run the processes as (for OS:es supporting this). +# If you plan to run unsafe code one could experiment with setting this +# to an unprivileged user. +PROCPOOL_UID = None +PROCPOOL_GID = None +# real path to a directory where all processes will be run. If +# not given, processes will be executed in game/. +PROCPOOL_DIRECTORY = None + + +###################################################################### +# Django web features ###################################################################### # While DEBUG is False, show a regular server error page on the web diff --git a/src/utils/ampoule/COPYING.txt b/src/utils/ampoule/COPYING.txt new file mode 100644 index 0000000000..89d86c8ac4 --- /dev/null +++ b/src/utils/ampoule/COPYING.txt @@ -0,0 +1,23 @@ +Copyright (c) 2008 +Valentino Volonghi +Matthew Lefkowitz +Copyright (c) 2009 Canonical Ltd. 
+ +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/src/utils/ampoule/EVENNIA.txt b/src/utils/ampoule/EVENNIA.txt new file mode 100644 index 0000000000..09cc95ef0e --- /dev/null +++ b/src/utils/ampoule/EVENNIA.txt @@ -0,0 +1,20 @@ + +AMPOULE +------- + +https://launchpad.net/ampoule + +AMPoule is a process management system using Twisted spawnProcess +functionality. It uses AMP to pipe messages to a process Pool that it +manages. The service is called ProcPool in Evennia settings. + +AMPoule's very good, but unfortunately the source is very poorly +documented. Hence the source in this directory does not conform to +Evennia's normally rigid standards - for now we try to edit it as +little as possible so as to make it easy to apply upstream updates +down the line. + +Changes made by Evennia are minor - it's mainly limiting spam to the +log and an added ability to turn this on/off through settings. 
Most +Evennia related code are found in src/server/procpool.py and +src/server/server.py. diff --git a/src/utils/ampoule/__init__.py b/src/utils/ampoule/__init__.py new file mode 100644 index 0000000000..ed99c5ef2f --- /dev/null +++ b/src/utils/ampoule/__init__.py @@ -0,0 +1,4 @@ +from pool import deferToAMPProcess, pp +from commands import Shutdown, Ping, Echo +from child import AMPChild +__version__ = "0.2.1" diff --git a/src/utils/ampoule/child.py b/src/utils/ampoule/child.py new file mode 100644 index 0000000000..c9f2a08da9 --- /dev/null +++ b/src/utils/ampoule/child.py @@ -0,0 +1,53 @@ +from twisted.python import log +from twisted.internet import error +from twisted.protocols import amp +from src.utils.ampoule.commands import Echo, Shutdown, Ping + +class AMPChild(amp.AMP): + def __init__(self): + super(AMPChild, self).__init__(self) + self.shutdown = False + + def connectionLost(self, reason): + amp.AMP.connectionLost(self, reason) + from twisted.internet import reactor + try: + reactor.stop() + except error.ReactorNotRunning: + # woa, this means that something bad happened, + # most probably we received a SIGINT. Now this is only + # a problem when you use Ctrl+C to stop the main process + # because it would send the SIGINT to child processes too. + # In all other cases receiving a SIGINT here would be an + # error condition and correctly restarted. maybe we should + # use sigprocmask? + pass + if not self.shutdown: + # if the shutdown wasn't explicit we presume that it's an + # error condition and thus we return a -1 error returncode. + import os + os._exit(-1) + + def shutdown(self): + """ + This method is needed to shutdown the child gently without + generating an exception. 
+ """ + #log.msg("Shutdown message received, goodbye.") + self.shutdown = True + return {} + Shutdown.responder(shutdown) + + def ping(self): + """ + Ping the child and return an answer + """ + return {'response': "pong"} + Ping.responder(ping) + + def echo(self, data): + """ + Echo some data through the child. + """ + return {'response': data} + Echo.responder(echo) diff --git a/src/utils/ampoule/commands.py b/src/utils/ampoule/commands.py new file mode 100644 index 0000000000..2ac61e7d04 --- /dev/null +++ b/src/utils/ampoule/commands.py @@ -0,0 +1,11 @@ +from twisted.protocols import amp + +class Shutdown(amp.Command): + responseType = amp.QuitBox + +class Ping(amp.Command): + response = [('response', amp.String())] + +class Echo(amp.Command): + arguments = [('data', amp.String())] + response = [('response', amp.String())] diff --git a/src/utils/ampoule/iampoule.py b/src/utils/ampoule/iampoule.py new file mode 100644 index 0000000000..caa3e0142a --- /dev/null +++ b/src/utils/ampoule/iampoule.py @@ -0,0 +1,24 @@ +from zope.interface import Interface + +class IStarter(Interface): + def startAMPProcess(ampChild, ampParent=None): + """ + @param ampChild: The AMP protocol spoken by the created child. + @type ampChild: L{twisted.protocols.amp.AMP} + + @param ampParent: The AMP protocol spoken by the parent. + @type ampParent: L{twisted.protocols.amp.AMP} + """ + + def startPythonProcess(prot, *args): + """ + @param prot: a L{protocol.ProcessProtocol} subclass + @type prot: L{protocol.ProcessProtocol} + + @param args: a tuple of arguments that will be passed to the + child process. + + @return: a tuple of the child process and the deferred finished. + finished triggers when the subprocess dies for any reason. 
+ """ + diff --git a/src/utils/ampoule/main.py b/src/utils/ampoule/main.py new file mode 100644 index 0000000000..965e4a5b58 --- /dev/null +++ b/src/utils/ampoule/main.py @@ -0,0 +1,301 @@ +import os +import sys +import imp +import itertools + +from zope.interface import implements + +from twisted.internet import reactor, protocol, defer, error +from twisted.python import log, util, reflect +from twisted.protocols import amp +from twisted.python import runtime +from twisted.python.compat import set + +from src.utils.ampoule import iampoule + +gen = itertools.count() + +if runtime.platform.isWindows(): + IS_WINDOWS = True + TO_CHILD = 0 + FROM_CHILD = 1 +else: + IS_WINDOWS = False + TO_CHILD = 3 + FROM_CHILD = 4 + +class AMPConnector(protocol.ProcessProtocol): + """ + A L{ProcessProtocol} subclass that can understand and speak AMP. + + @ivar amp: the children AMP process + @type amp: L{amp.AMP} + + @ivar finished: a deferred triggered when the process dies. + @type finished: L{defer.Deferred} + + @ivar name: Unique name for the connector, much like a pid. + @type name: int + """ + + def __init__(self, proto, name=None): + """ + @param proto: An instance or subclass of L{amp.AMP} + @type proto: L{amp.AMP} + + @param name: optional name of the subprocess. + @type name: int + """ + self.finished = defer.Deferred() + self.amp = proto + self.name = name + if name is None: + self.name = gen.next() + + def signalProcess(self, signalID): + """ + Send the signal signalID to the child process + + @param signalID: The signal ID that you want to send to the + corresponding child + @type signalID: C{str} or C{int} + """ + return self.transport.signalProcess(signalID) + + def connectionMade(self): + #log.msg("Subprocess %s started." 
% (self.name,)) + self.amp.makeConnection(self) + + # Transport + disconnecting = False + + def write(self, data): + if IS_WINDOWS: + self.transport.write(data) + else: + self.transport.writeToChild(TO_CHILD, data) + + def loseConnection(self): + self.transport.closeChildFD(TO_CHILD) + self.transport.closeChildFD(FROM_CHILD) + self.transport.loseConnection() + + def getPeer(self): + return ('subprocess %i' % self.name,) + + def getHost(self): + return ('Evennia Server',) + + def childDataReceived(self, childFD, data): + if childFD == FROM_CHILD: + self.amp.dataReceived(data) + return + self.errReceived(data) + + def errReceived(self, data): + for line in data.strip().splitlines(): + log.msg("FROM %s: %s" % (self.name, line)) + + def processEnded(self, status): + #log.msg("Process: %s ended" % (self.name,)) + self.amp.connectionLost(status) + if status.check(error.ProcessDone): + self.finished.callback('') + return + self.finished.errback(status) + +BOOTSTRAP = """\ +import sys + +def main(reactor, ampChildPath): + from twisted.application import reactors + reactors.installReactor(reactor) + + from twisted.python import log + %s + + from twisted.internet import reactor, stdio + from twisted.python import reflect, runtime + + ampChild = reflect.namedAny(ampChildPath) + ampChildInstance = ampChild(*sys.argv[1:-2]) + if runtime.platform.isWindows(): + stdio.StandardIO(ampChildInstance) + else: + stdio.StandardIO(ampChildInstance, %s, %s) + enter = getattr(ampChildInstance, '__enter__', None) + if enter is not None: + enter() + try: + reactor.run() + except: + if enter is not None: + info = sys.exc_info() + if not ampChildInstance.__exit__(*info): + raise + else: + raise + else: + if enter is not None: + ampChildInstance.__exit__(None, None, None) + +main(sys.argv[-2], sys.argv[-1]) +""" % ('%s', TO_CHILD, FROM_CHILD) + +# in the first spot above, either insert an empty string or +# 'log.startLogging(sys.stderr)' +# to start logging + +class ProcessStarter(object): + + 
implements(iampoule.IStarter) + + connectorFactory = AMPConnector + def __init__(self, bootstrap=BOOTSTRAP, args=(), env={}, + path=None, uid=None, gid=None, usePTY=0, + packages=(), childReactor="select"): + """ + @param bootstrap: Startup code for the child process + @type bootstrap: C{str} + + @param args: Arguments that should be supplied to every child + created. + @type args: C{tuple} of C{str} + + @param env: Environment variables that should be present in the + child environment + @type env: C{dict} + + @param path: Path in which to run the child + @type path: C{str} + + @param uid: if defined, the uid used to run the new process. + @type uid: C{int} + + @param gid: if defined, the gid used to run the new process. + @type gid: C{int} + + @param usePTY: Should the child processes use PTY processes + @type usePTY: 0 or 1 + + @param packages: A tuple of packages that should be guaranteed + to be importable in the child processes + @type packages: C{tuple} of C{str} + + @param childReactor: a string that sets the reactor for child + processes + @type childReactor: C{str} + """ + self.bootstrap = bootstrap + self.args = args + self.env = env + self.path = path + self.uid = uid + self.gid = gid + self.usePTY = usePTY + self.packages = ("ampoule",) + packages + self.packages = packages + self.childReactor = childReactor + + def __repr__(self): + """ + Represent the ProcessStarter with a string. + """ + return """ProcessStarter(bootstrap=%r, + args=%r, + env=%r, + path=%r, + uid=%r, + gid=%r, + usePTY=%r, + packages=%r, + childReactor=%r)""" % (self.bootstrap, + self.args, + self.env, + self.path, + self.uid, + self.gid, + self.usePTY, + self.packages, + self.childReactor) + + def _checkRoundTrip(self, obj): + """ + Make sure that an object will properly round-trip through 'qual' and + 'namedAny'. + + Raise a L{RuntimeError} if they aren't. 
+ """ + tripped = reflect.namedAny(reflect.qual(obj)) + if tripped is not obj: + raise RuntimeError("importing %r is not the same as %r" % + (reflect.qual(obj), obj)) + + def startAMPProcess(self, ampChild, ampParent=None, ampChildArgs=()): + """ + @param ampChild: a L{ampoule.child.AMPChild} subclass. + @type ampChild: L{ampoule.child.AMPChild} + + @param ampParent: an L{amp.AMP} subclass that implements the parent + protocol for this process pool + @type ampParent: L{amp.AMP} + """ + self._checkRoundTrip(ampChild) + fullPath = reflect.qual(ampChild) + if ampParent is None: + ampParent = amp.AMP + prot = self.connectorFactory(ampParent()) + args = ampChildArgs + (self.childReactor, fullPath) + return self.startPythonProcess(prot, *args) + + + def startPythonProcess(self, prot, *args): + """ + @param prot: a L{protocol.ProcessProtocol} subclass + @type prot: L{protocol.ProcessProtocol} + + @param args: a tuple of arguments that will be added after the + ones in L{self.args} to start the child process. + + @return: a tuple of the child process and the deferred finished. + finished triggers when the subprocess dies for any reason. + """ + spawnProcess(prot, self.bootstrap, self.args+args, env=self.env, + path=self.path, uid=self.uid, gid=self.gid, + usePTY=self.usePTY, packages=self.packages) + + # XXX: we could wait for startup here, but ... is there really any + # reason to? the pipe should be ready for writing. The subprocess + # might not start up properly, but then, a subprocess might shut down + # at any point too. So we just return amp and have this piece to be + # synchronous. 
+ return prot.amp, prot.finished + +def spawnProcess(processProtocol, bootstrap, args=(), env={}, + path=None, uid=None, gid=None, usePTY=0, + packages=()): + env = env.copy() + + pythonpath = [] + for pkg in packages: + p = os.path.split(imp.find_module(pkg)[1])[0] + if p.startswith(os.path.join(sys.prefix, 'lib')): + continue + pythonpath.append(p) + pythonpath = list(set(pythonpath)) + pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep)) + env['PYTHONPATH'] = os.pathsep.join(pythonpath) + args = (sys.executable, '-c', bootstrap) + args + # childFDs variable is needed because sometimes child processes + # misbehave and use stdout to output stuff that should really go + # to stderr. Of course child process might even use the wrong FDs + # that I'm using here, 3 and 4, so we are going to fix all these + # issues when I add support for the configuration object that can + # fix this stuff in a more configurable way. + if IS_WINDOWS: + return reactor.spawnProcess(processProtocol, sys.executable, args, + env, path, uid, gid, usePTY) + else: + return reactor.spawnProcess(processProtocol, sys.executable, args, + env, path, uid, gid, usePTY, + childFDs={0:"w", 1:"r", 2:"r", 3:"w", 4:"r"}) diff --git a/src/utils/ampoule/pool.py b/src/utils/ampoule/pool.py new file mode 100644 index 0000000000..05e59cdaa3 --- /dev/null +++ b/src/utils/ampoule/pool.py @@ -0,0 +1,414 @@ +import time +import random +import heapq +import itertools +import signal +choice = random.choice +now = time.time +count = itertools.count().next +pop = heapq.heappop + +from twisted.internet import defer, task, error +from twisted.python import log, failure + +from src.utils.ampoule import commands, main + +try: + DIE = signal.SIGKILL +except AttributeError: + # Windows doesn't have SIGKILL, let's just use SIGTERM then + DIE = signal.SIGTERM + +class ProcessPool(object): + """ + This class generalizes the functionality of a pool of + processes to which work can be dispatched. 
class ProcessPool(object):
    """
    A pool of AMP subprocesses to which commands can be dispatched.

    @ivar finished: Boolean flag, L{True} when the pool is finished.

    @ivar started: Boolean flag, L{True} when the pool is started.

    @ivar name: Optional name for the process pool

    @ivar min: Minimum number of subprocesses to set up

    @ivar max: Maximum number of subprocesses to set up

    @ivar maxIdle: Maximum number of seconds of idleness in a child

    @ivar starter: A process starter instance that provides
        L{iampoule.IStarter}.

    @ivar recycleAfter: Maximum number of calls before restarting a
        subprocess, 0 to not recycle.

    @ivar ampChild: The child AMP protocol subclass with the commands
        that the child should implement.

    @ivar ampParent: The parent AMP protocol subclass with the commands
        that the parent should implement.

    @ivar timeout: The general timeout (in seconds) for every child
        process call.
    """

    finished = False
    started = False
    name = None

    def __init__(self, ampChild=None, ampParent=None, min=5, max=20,
                 name=None, maxIdle=20, recycleAfter=500, starter=None,
                 timeout=None, timeout_signal=DIE, ampChildArgs=()):
        self.starter = starter
        self.ampChildArgs = tuple(ampChildArgs)
        if starter is None:
            self.starter = main.ProcessStarter(packages=("twisted", "ampoule"))
        self.ampParent = ampParent
        self.ampChild = ampChild
        if ampChild is None:
            from src.utils.ampoule.child import AMPChild
            self.ampChild = AMPChild
        self.min = min
        self.max = max
        self.name = name
        self.maxIdle = maxIdle
        self.recycleAfter = recycleAfter
        self.timeout = timeout
        self.timeout_signal = timeout_signal
        # FIFO of (sequence number, (deferred, command, kwargs)) calls
        # waiting for a child process to become available.
        self._queue = []

        self.processes = set()
        self.ready = set()
        self.busy = set()
        self._finishCallbacks = {}
        self._lastUsage = {}
        self._calls = {}
        # Periodically reap children that have been idle for too long.
        self.looping = task.LoopingCall(self._pruneProcesses)
        self.looping.start(maxIdle, now=False)

    def start(self, ampChild=None):
        """
        Starts the ProcessPool with a given child protocol.

        @param ampChild: a L{ampoule.child.AMPChild} subclass.
        @type ampChild: L{ampoule.child.AMPChild} subclass
        """
        if ampChild is not None and not self.started:
            self.ampChild = ampChild
        self.finished = False
        self.started = True
        return self.adjustPoolSize()

    def _pruneProcesses(self):
        """
        Remove idle processes from the pool.
        """
        n = now()
        d = []
        for child, lastUse in self._lastUsage.iteritems():
            if len(self.processes) > self.min and (n - lastUse) > self.maxIdle:
                # we are setting lastUse when processing finishes, it
                # might be processing right now
                if child not in self.busy:
                    # we need to remove this child from the ready set
                    # and the processes set because otherwise it might
                    # get calls from doWork
                    self.ready.discard(child)
                    self.processes.discard(child)
                    d.append(self.stopAWorker(child))
        return defer.DeferredList(d)

    def _pruneProcess(self, child):
        """
        Remove every trace of the process from this instance.
        """
        self.processes.discard(child)
        self.ready.discard(child)
        self.busy.discard(child)
        self._lastUsage.pop(child, None)
        self._calls.pop(child, None)
        self._finishCallbacks.pop(child, None)

    def _addProcess(self, child, finished):
        """
        Adds the newly created child process to the pool.
        """
        def restart(child, reason):
            # abnormal termination: replace the lost worker
            log.msg("FATAL: Restarting after %s" % (reason,))
            self._pruneProcess(child)
            return self.startAWorker()

        def dieGently(data, child):
            # normal termination: just forget about the child
            self._pruneProcess(child)

        self.processes.add(child)
        self.ready.add(child)
        finished.addCallback(dieGently, child
            ).addErrback(lambda reason: restart(child, reason))
        self._finishCallbacks[child] = finished
        self._lastUsage[child] = now()
        self._calls[child] = 0
        self._catchUp()

    def _catchUp(self):
        """
        If there are queued items in the list then run them.
        """
        if self._queue:
            _, (d, command, kwargs) = pop(self._queue)
            self._cb_doWork(command, **kwargs).chainDeferred(d)

    def _handleTimeout(self, child):
        """
        One of the children went timeout, we need to deal with it

        @param child: The child process
        @type child: L{child.AMPChild}
        """
        try:
            child.transport.signalProcess(self.timeout_signal)
        except error.ProcessExitedAlready:
            # don't do anything then... we are too late
            # or we were too early to call
            pass

    def startAWorker(self):
        """
        Start a worker and set it up in the system.
        """
        if self.finished:
            # Guard against a race condition: a process being recycled
            # while self.stop() runs would otherwise be recreated here.
            # stopAWorker() is asynchronous while stop() is synchronous,
            # so a chain like:
            #   pp.stopAWorker(child).addCallback(lambda _: pp.startAWorker())
            #   pp.stop()
            # could leave a dirty reactor because stop() returns before
            # the new process is created.
            return
        startAMPProcess = self.starter.startAMPProcess
        child, finished = startAMPProcess(self.ampChild,
                                          ampParent=self.ampParent,
                                          ampChildArgs=self.ampChildArgs)
        return self._addProcess(child, finished)

    def _cb_doWork(self, command, _timeout=None, _deadline=None,
                   **kwargs):
        """
        Go and call the command.

        @param command: The L{amp.Command} to be executed in the child
        @type command: L{amp.Command}

        @param _timeout: The timeout for this call only
        @type _timeout: C{int}

        @param _deadline: The deadline for this call only
        @type _deadline: C{int}
        """
        timeoutCall = None
        deadlineCall = None

        def _returned(result, child, is_error=False):
            def cancelCall(call):
                if call is not None and call.active():
                    call.cancel()
            cancelCall(timeoutCall)
            cancelCall(deadlineCall)
            self.busy.discard(child)
            if not die:
                # we are not marked to be removed, so add us back to
                # the ready set and let's see if there's some catching
                # up to do
                self.ready.add(child)
                self._catchUp()
            else:
                # We should die and we do, then we start a new worker
                # to pick up stuff from the queue otherwise we end up
                # without workers and the queue will remain there.
                self.stopAWorker(child).addCallback(lambda _: self.startAWorker())
            self._lastUsage[child] = now()
            # we can't do recycling here because it's too late and
            # the process might have received tons of calls already
            # which would make it run more calls than what is
            # configured to do.
            return result

        die = False
        child = self.ready.pop()
        self.busy.add(child)
        self._calls[child] += 1

        # Let's see if this call goes over the recycling barrier
        if self.recycleAfter and self._calls[child] >= self.recycleAfter:
            # it does so mark this child, using a closure, to be
            # removed at the end of the call.
            die = True

        # A per-call timeout of exactly 0 is meaningful (fire at once),
        # so it must not be clobbered by the global default below.
        if _timeout == 0:
            timeout = _timeout
        else:
            timeout = _timeout or self.timeout

        if timeout is not None:
            from twisted.internet import reactor
            timeoutCall = reactor.callLater(timeout, self._handleTimeout, child)

        if _deadline is not None:
            from twisted.internet import reactor
            delay = max(0, _deadline - reactor.seconds())
            deadlineCall = reactor.callLater(delay, self._handleTimeout,
                                             child)

        # If the command doesn't require a response then callRemote
        # returns nothing, so maybeDeferred guards against that too.
        return defer.maybeDeferred(child.callRemote, command, **kwargs
            ).addCallback(_returned, child
            ).addErrback(_returned, child, is_error=True)

    def callRemote(self, *args, **kwargs):
        """
        Proxy call to keep the API homogeneous across twisted's RPCs
        """
        return self.doWork(*args, **kwargs)

    def doWork(self, command, **kwargs):
        """
        Sends the command to one child.

        @param command: an L{amp.Command} type object.
        @type command: L{amp.Command}

        @param kwargs: dictionary containing the arguments for the command.
        """
        if self.ready:  # there are unused processes, let's use them
            return self._cb_doWork(command, **kwargs)
        else:
            if len(self.processes) < self.max:
                # no unused but we can start some new ones
                # since startAWorker is synchronous we won't have a
                # race condition here in case of multiple calls to
                # doWork, so we will end up in the else clause in case
                # of such calls:
                # Process pool with min=1, max=1, recycle_after=1
                # [call(Command) for x in xrange(BIG_NUMBER)]
                self.startAWorker()
                return self._cb_doWork(command, **kwargs)
            else:
                # No one is free... just queue up and wait for a process
                # to start and pick up the first item in the queue.
                d = defer.Deferred()
                self._queue.append((count(), (d, command, kwargs)))
                return d

    def stopAWorker(self, child=None):
        """
        Gently stop a child so that it's not restarted anymore

        @param child: an L{ampoule.child.AmpChild} type object, or None
            to pick an arbitrary worker.
        @type child: L{ampoule.child.AmpChild} or None
        """
        if child is None:
            if self.ready:
                child = self.ready.pop()
            else:
                child = choice(list(self.processes))
        child.callRemote(commands.Shutdown
            # Trap a race with timeout handling: if the child process
            # was shut down by a signal and the pool is stopped right
            # afterwards (as the tests do), the child AMP object is
            # still in the system and calling Shutdown on it results in
            # the same errback we got originally; trap it here so it
            # does not raise by being unhandled.
            ).addErrback(lambda reason: reason.trap(error.ProcessTerminated))
        return self._finishCallbacks[child]

    def _startSomeWorkers(self):
        """
        Start a bunch of workers until we reach the max number of them.
        """
        if len(self.processes) < self.max:
            self.startAWorker()

    def adjustPoolSize(self, min=None, max=None):
        """
        Change the pool size to be at least min and less than max,
        useful when you change the values of max and min in the instance
        and you want the pool to adapt to them.
        """
        if min is None:
            min = self.min
        if max is None:
            max = self.max

        assert min >= 0, 'minimum is negative'
        assert min <= max, 'minimum is greater than maximum'

        self.min = min
        self.max = max

        l = []
        if self.started:
            # shed workers above the new maximum ...
            for i in xrange(len(self.processes) - self.max):
                l.append(self.stopAWorker())
            # ... and top up to the new minimum.
            while len(self.processes) < self.min:
                self.startAWorker()

        return defer.DeferredList(l)

    def stop(self):
        """
        Stops the process protocol.
        """
        self.finished = True
        l = [self.stopAWorker(process) for process in self.processes]

        def _cb(_):
            if self.looping.running:
                self.looping.stop()

        return defer.DeferredList(l).addCallback(_cb)

    def dumpStats(self):
        """
        Log the current state of the pool for debugging.
        """
        log.msg("ProcessPool stats:")
        log.msg('\tworkers: %s' % len(self.processes))
        log.msg('\ttimeout: %s' % (self.timeout))
        log.msg('\tparent: %r' % (self.ampParent,))
        log.msg('\tchild: %r' % (self.ampChild,))
        log.msg('\tmax idle: %r' % (self.maxIdle,))
        log.msg('\trecycle after: %r' % (self.recycleAfter,))
        log.msg('\tProcessStarter:')
        log.msg('\t\t%r' % (self.starter,))


# module-global default pool used by deferToAMPProcess
pp = None

def deferToAMPProcess(command, **kwargs):
    """
    Helper function that sends a command to the default process pool
    and returns a deferred that fires when the result of the
    subprocess computation is ready.

    @param command: an L{amp.Command} subclass
    @param kwargs: dictionary containing the arguments for the command.

    @return: a L{defer.Deferred} with the data from the subprocess.
    """
    global pp
    if pp is None:
        # lazily create and start the global pool on first use
        pp = ProcessPool()
        return pp.start().addCallback(lambda _: pp.doWork(command, **kwargs))
    return pp.doWork(command, **kwargs)
+ Used to forward only the methods that are actually + understood correctly by them. + @type child: L{amp.AMP} + """ + amp.AMP.__init__(self) + self.wrapped = wrapped + self.child = child + + localCd = set(self._commandDispatch.keys()) + childCd = set(self.child._commandDispatch.keys()) + assert localCd.intersection(childCd) == set(["StartTLS"]), \ + "Illegal method overriding in Proxy" + + def locateResponder(self, name): + """ + This is a custom locator to forward calls to the children + processes while keeping the ProcessPool a transparent MITM. + + This way of working has a few limitations, the first of which + is the fact that children won't be able to take advantage of + any dynamic locator except for the default L{CommandLocator} + that is based on the _commandDispatch attribute added by the + metaclass. This limitation might be lifted in the future. + """ + if name == "StartTLS": + # This is a special case where the proxy takes precedence + return amp.AMP.locateResponder(self, "StartTLS") + + # Get the dict of commands from the child AMP implementation. + cd = self.child._commandDispatch + if name in cd: + # If the command is there, then we forward stuff to it. + commandClass, _responderFunc = cd[name] + # We need to wrap the doWork function because the wrapping + # call doesn't pass the command as first argument since it + # thinks that we are the actual receivers and callable is + # already the responder while it isn't. + doWork = lambda **kw: self.wrapped(commandClass, **kw) + # Now let's call the right function and wrap the result + # dictionary. + return self._wrapWithSerialization(doWork, commandClass) + # of course if the name of the command is not in the child it + # means that it might be in this class, so fallback to the + # default behavior of this module. 
+ return amp.AMP.locateResponder(self, name) + diff --git a/src/utils/ampoule/service.py b/src/utils/ampoule/service.py new file mode 100644 index 0000000000..7b24d6fb33 --- /dev/null +++ b/src/utils/ampoule/service.py @@ -0,0 +1,69 @@ +import os + +from twisted.application import service +from twisted.internet.protocol import ServerFactory + +def makeService(options): + """ + Create the service for the application + """ + ms = service.MultiService() + + from src.utils.ampoule.pool import ProcessPool + from src.utils.ampoule.main import ProcessStarter + name = options['name'] + ampport = options['ampport'] + ampinterface = options['ampinterface'] + child = options['child'] + parent = options['parent'] + min = options['min'] + max = options['max'] + maxIdle = options['max_idle'] + recycle = options['recycle'] + childReactor = options['reactor'] + timeout = options['timeout'] + + starter = ProcessStarter(packages=("twisted", "ampoule"), childReactor=childReactor) + pp = ProcessPool(child, parent, min, max, name, maxIdle, recycle, starter, timeout) + svc = AMPouleService(pp, child, ampport, ampinterface) + svc.setServiceParent(ms) + + return ms + +class AMPouleService(service.Service): + def __init__(self, pool, child, port, interface): + self.pool = pool + self.port = port + self.child = child + self.interface = interface + self.server = None + + def startService(self): + """ + Before reactor.run() is called we setup the system. + """ + service.Service.startService(self) + from src.utils.ampoule import rpool + from twisted.internet import reactor + + try: + factory = ServerFactory() + factory.protocol = lambda: rpool.AMPProxy(wrapped=self.pool.doWork, + child=self.child) + self.server = reactor.listenTCP(self.port, + factory, + interface=self.interface) + # this is synchronous when it's the startup, even though + # it returns a deferred. But we need to run it after the + # first cycle in order to wait for signal handlers to be + # installed. 
+ reactor.callLater(0, self.pool.start) + except: + import traceback + print traceback.format_exc() + + def stopService(self): + service.Service.stopService(self) + if self.server is not None: + self.server.stopListening() + return self.pool.stop() diff --git a/src/utils/ampoule/test/__init__.py b/src/utils/ampoule/test/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/utils/ampoule/test/test_process.py b/src/utils/ampoule/test/test_process.py new file mode 100644 index 0000000000..3569ed6c3d --- /dev/null +++ b/src/utils/ampoule/test/test_process.py @@ -0,0 +1,867 @@ + +from signal import SIGHUP +import math +import os +import os.path +from cStringIO import StringIO as sio +import tempfile + +from twisted.internet import error, defer, reactor +from twisted.python import failure, reflect +from twisted.trial import unittest +from twisted.protocols import amp +from src.utils.ampoule import main, child, commands, pool + +class ShouldntHaveBeenCalled(Exception): + pass + +def _raise(_): + raise ShouldntHaveBeenCalled(_) + +class _FakeT(object): + closeStdinCalled = False + def __init__(self, s): + self.s = s + + def closeStdin(self): + self.closeStdinCalled = True + + def write(self, data): + self.s.write(data) + +class FakeAMP(object): + connector = None + reason = None + def __init__(self, s): + self.s = s + + def makeConnection(self, connector): + if self.connector is not None: + raise Exception("makeConnection called twice") + self.connector = connector + + def connectionLost(self, reason): + if self.reason is not None: + raise Exception("connectionLost called twice") + self.reason = reason + + def dataReceived(self, data): + self.s.write(data) + +class Ping(amp.Command): + arguments = [('data', amp.String())] + response = [('response', amp.String())] + +class Pong(amp.Command): + arguments = [('data', amp.String())] + response = [('response', amp.String())] + +class Pid(amp.Command): + response = [('pid', amp.Integer())] + +class 
Reactor(amp.Command): + response = [('classname', amp.String())] + +class NoResponse(amp.Command): + arguments = [('arg', amp.String())] + requiresAnswer = False + +class GetResponse(amp.Command): + response = [("response", amp.String())] + +class Child(child.AMPChild): + def ping(self, data): + return self.callRemote(Pong, data=data) + Ping.responder(ping) + +class PidChild(child.AMPChild): + def pid(self): + import os + return {'pid': os.getpid()} + Pid.responder(pid) + +class NoResponseChild(child.AMPChild): + _set = False + def noresponse(self, arg): + self._set = arg + return {} + NoResponse.responder(noresponse) + + def getresponse(self): + return {"response": self._set} + GetResponse.responder(getresponse) + +class ReactorChild(child.AMPChild): + def reactor(self): + from twisted.internet import reactor + return {'classname': reactor.__class__.__name__} + Reactor.responder(reactor) + +class First(amp.Command): + arguments = [('data', amp.String())] + response = [('response', amp.String())] + +class Second(amp.Command): + pass + +class WaitingChild(child.AMPChild): + deferred = None + def first(self, data): + self.deferred = defer.Deferred() + return self.deferred.addCallback(lambda _: {'response': data}) + First.responder(first) + def second(self): + self.deferred.callback('') + return {} + Second.responder(second) + +class Die(amp.Command): + pass + +class BadChild(child.AMPChild): + def die(self): + self.shutdown = False + self.transport.loseConnection() + return {} + Die.responder(die) + + +class Write(amp.Command): + response = [("response", amp.String())] + pass + + +class Writer(child.AMPChild): + + def __init__(self, data='hello'): + child.AMPChild.__init__(self) + self.data = data + + def write(self): + return {'response': self.data} + Write.responder(write) + + +class GetCWD(amp.Command): + + response = [("cwd", amp.String())] + + +class TempDirChild(child.AMPChild): + + def __init__(self, directory=None): + child.AMPChild.__init__(self) + 
self.directory = directory + + def __enter__(self): + directory = tempfile.mkdtemp() + os.chdir(directory) + if self.directory is not None: + os.mkdir(self.directory) + os.chdir(self.directory) + + def __exit__(self, exc_type, exc_val, exc_tb): + cwd = os.getcwd() + os.chdir('..') + os.rmdir(cwd) + + def getcwd(self): + return {'cwd': os.getcwd()} + GetCWD.responder(getcwd) + + +class TestAMPConnector(unittest.TestCase): + def setUp(self): + """ + The only reason why this method exists is to let 'trial ampoule' + to install the signal handlers (#3178 for reference). + """ + super(TestAMPConnector, self).setUp() + d = defer.Deferred() + reactor.callLater(0, d.callback, None) + return d + + def _makeConnector(self, s, sa): + a = FakeAMP(sa) + ac = main.AMPConnector(a) + assert ac.name is not None + ac.transport = _FakeT(s) + return ac + + def test_protocol(self): + """ + Test that outReceived writes to AMP and that it triggers the + finished deferred once the process ended. + """ + s = sio() + sa = sio() + ac = self._makeConnector(s, sa) + + for x in xrange(99): + ac.childDataReceived(4, str(x)) + + ac.processEnded(failure.Failure(error.ProcessDone(0))) + return ac.finished.addCallback( + lambda _: self.assertEqual(sa.getvalue(), ''.join(str(x) for x in xrange(99))) + ) + + def test_protocol_failing(self): + """ + Test that a failure in the process termination is correctly + propagated to the finished deferred. + """ + s = sio() + sa = sio() + ac = self._makeConnector(s, sa) + + ac.finished.addCallback(_raise) + fail = failure.Failure(error.ProcessTerminated()) + self.assertFailure(ac.finished, error.ProcessTerminated) + ac.processEnded(fail) + + def test_startProcess(self): + """ + Test that startProcess actually starts a subprocess and that + it receives data back from the process through AMP. 
+ """ + s = sio() + a = FakeAMP(s) + STRING = "ciao" + BOOT = """\ +import sys, os +def main(arg): + os.write(4, arg) +main(sys.argv[1]) +""" + starter = main.ProcessStarter(bootstrap=BOOT, + args=(STRING,), + packages=("twisted", "ampoule")) + + amp, finished = starter.startPythonProcess(main.AMPConnector(a)) + def _eb(reason): + print reason + finished.addErrback(_eb) + return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING)) + + def test_failing_deferToProcess(self): + """ + Test failing subprocesses and the way they terminate and preserve + failing information. + """ + s = sio() + a = FakeAMP(s) + STRING = "ciao" + BOOT = """\ +import sys +def main(arg): + raise Exception(arg) +main(sys.argv[1]) +""" + starter = main.ProcessStarter(bootstrap=BOOT, args=(STRING,), packages=("twisted", "ampoule")) + ready, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored") + + self.assertFailure(finished, error.ProcessTerminated) + finished.addErrback(lambda reason: self.assertEquals(reason.getMessage(), STRING)) + return finished + + def test_env_setting(self): + """ + Test that and environment variable passed to the process starter + is correctly passed to the child process. + """ + s = sio() + a = FakeAMP(s) + STRING = "ciao" + BOOT = """\ +import sys, os +def main(): + os.write(4, os.getenv("FOOBAR")) +main() +""" + starter = main.ProcessStarter(bootstrap=BOOT, + packages=("twisted", "ampoule"), + env={"FOOBAR": STRING}) + amp, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored") + def _eb(reason): + print reason + finished.addErrback(_eb) + return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING)) + + def test_startAMPProcess(self): + """ + Test that you can start an AMP subprocess and that it correctly + accepts commands and correctly answers them. 
+ """ + STRING = "ciao" + + starter = main.ProcessStarter(packages=("twisted", "ampoule")) + c, finished = starter.startAMPProcess(child.AMPChild) + c.callRemote(commands.Echo, data=STRING + ).addCallback(lambda response: + self.assertEquals(response['response'], STRING) + ).addCallback(lambda _: c.callRemote(commands.Shutdown)) + return finished + + def test_BootstrapContext(self): + starter = main.ProcessStarter(packages=('twisted', 'ampoule')) + c, finished = starter.startAMPProcess(TempDirChild) + cwd = [] + def checkBootstrap(response): + cwd.append(response['cwd']) + self.assertNotEquals(cwd, os.getcwd()) + d = c.callRemote(GetCWD) + d.addCallback(checkBootstrap) + d.addCallback(lambda _: c.callRemote(commands.Shutdown)) + finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0]))) + return finished + + def test_BootstrapContextInstance(self): + starter = main.ProcessStarter(packages=('twisted', 'ampoule')) + c, finished = starter.startAMPProcess(TempDirChild, + ampChildArgs=('foo',)) + cwd = [] + def checkBootstrap(response): + cwd.append(response['cwd']) + self.assertTrue(cwd[0].endswith('/foo')) + d = c.callRemote(GetCWD) + d.addCallback(checkBootstrap) + d.addCallback(lambda _: c.callRemote(commands.Shutdown)) + finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0]))) + return finished + + def test_startAMPAndParentProtocol(self): + """ + Test that you can start an AMP subprocess and the children can + call methods on their parent. 
+ """ + DATA = "CIAO" + APPEND = "123" + + class Parent(amp.AMP): + def pong(self, data): + return {'response': DATA+APPEND} + Pong.responder(pong) + + starter = main.ProcessStarter(packages=("twisted", "ampoule")) + + subp, finished = starter.startAMPProcess(ampChild=Child, ampParent=Parent) + subp.callRemote(Ping, data=DATA + ).addCallback(lambda response: + self.assertEquals(response['response'], DATA+APPEND) + ).addCallback(lambda _: subp.callRemote(commands.Shutdown)) + return finished + + def test_roundtripError(self): + """ + Test that invoking a child using an unreachable class raises + a L{RunTimeError} . + """ + class Child(child.AMPChild): + pass + + starter = main.ProcessStarter(packages=("twisted", "ampoule")) + + self.assertRaises(RuntimeError, starter.startAMPProcess, ampChild=Child) + +class TestProcessPool(unittest.TestCase): + + def test_startStopWorker(self): + """ + Test that starting and stopping a worker keeps the state of + the process pool consistent. + """ + pp = pool.ProcessPool() + self.assertEquals(pp.started, False) + self.assertEquals(pp.finished, False) + self.assertEquals(pp.processes, set()) + self.assertEquals(pp._finishCallbacks, {}) + + def _checks(): + self.assertEquals(pp.started, False) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), 1) + self.assertEquals(len(pp._finishCallbacks), 1) + return pp.stopAWorker() + + def _closingUp(_): + self.assertEquals(pp.started, False) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), 0) + self.assertEquals(pp._finishCallbacks, {}) + pp.startAWorker() + return _checks().addCallback(_closingUp).addCallback(lambda _: pp.stop()) + + def test_startAndStop(self): + """ + Test that a process pool's start and stop method create the + expected number of workers and keep state consistent in the + process pool. 
+ """ + pp = pool.ProcessPool() + self.assertEquals(pp.started, False) + self.assertEquals(pp.finished, False) + self.assertEquals(pp.processes, set()) + self.assertEquals(pp._finishCallbacks, {}) + + def _checks(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + return pp.stop() + + def _closingUp(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, True) + self.assertEquals(len(pp.processes), 0) + self.assertEquals(pp._finishCallbacks, {}) + return pp.start().addCallback(_checks).addCallback(_closingUp) + + def test_adjustPoolSize(self): + """ + Test that calls to pool.adjustPoolSize are correctly handled. + """ + pp = pool.ProcessPool(min=10) + self.assertEquals(pp.started, False) + self.assertEquals(pp.finished, False) + self.assertEquals(pp.processes, set()) + self.assertEquals(pp._finishCallbacks, {}) + + def _resize1(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + return pp.adjustPoolSize(min=2, max=3) + + def _resize2(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(pp.max, 3) + self.assertEquals(pp.min, 2) + self.assertEquals(len(pp.processes), pp.max) + self.assertEquals(len(pp._finishCallbacks), pp.max) + + def _resize3(_): + self.assertRaises(AssertionError, pp.adjustPoolSize, min=-1, max=5) + self.assertRaises(AssertionError, pp.adjustPoolSize, min=5, max=1) + return pp.stop() + + return pp.start( + ).addCallback(_resize1 + ).addCallback(_resize2 + ).addCallback(_resize3) + + def test_childRestart(self): + """ + Test that a failing child process is immediately restarted. 
+ """ + pp = pool.ProcessPool(ampChild=BadChild, min=1) + STRING = "DATA" + + def _checks(_): + d = pp._finishCallbacks.values()[0] + pp.doWork(Die).addErrback(lambda _: None) + return d.addBoth(_checksAgain) + + def _checksAgain(_): + return pp.doWork(commands.Echo, data=STRING + ).addCallback(lambda result: self.assertEquals(result['response'], STRING)) + + return pp.start( + ).addCallback(_checks + ).addCallback(lambda _: pp.stop()) + + def test_parentProtocolChange(self): + """ + Test that the father can use an AMP protocol too. + """ + DATA = "CIAO" + APPEND = "123" + + class Parent(amp.AMP): + def pong(self, data): + return {'response': DATA+APPEND} + Pong.responder(pong) + + pp = pool.ProcessPool(ampChild=Child, ampParent=Parent) + def _checks(_): + return pp.doWork(Ping, data=DATA + ).addCallback(lambda response: + self.assertEquals(response['response'], DATA+APPEND) + ) + + return pp.start().addCallback(_checks).addCallback(lambda _: pp.stop()) + + + def test_deferToAMPProcess(self): + """ + Test that deferToAMPProcess works as expected. + """ + def cleanupGlobalPool(): + d = pool.pp.stop() + pool.pp = None + return d + self.addCleanup(cleanupGlobalPool) + + STRING = "CIAOOOO" + d = pool.deferToAMPProcess(commands.Echo, data=STRING) + d.addCallback(self.assertEquals, {"response": STRING}) + return d + + def test_checkStateInPool(self): + """ + Test that busy and ready lists are correctly maintained. 
+ """ + pp = pool.ProcessPool(ampChild=WaitingChild) + + DATA = "foobar" + + def _checks(_): + d = pp.callRemote(First, data=DATA) + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + self.assertEquals(len(pp.ready), pp.min-1) + self.assertEquals(len(pp.busy), 1) + child = pp.busy.pop() + pp.busy.add(child) + child.callRemote(Second) + return d + + return pp.start( + ).addCallback(_checks + ).addCallback(lambda _: pp.stop()) + + def test_growingToMax(self): + """ + Test that the pool grows over time until it reaches max processes. + """ + MAX = 5 + pp = pool.ProcessPool(ampChild=WaitingChild, min=1, max=MAX) + + def _checks(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + + D = "DATA" + d = [pp.doWork(First, data=D) for x in xrange(MAX)] + + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.max) + self.assertEquals(len(pp._finishCallbacks), pp.max) + + [child.callRemote(Second) for child in pp.processes] + return defer.DeferredList(d) + + return pp.start( + ).addCallback(_checks + ).addCallback(lambda _: pp.stop()) + + def test_growingToMaxAndShrinking(self): + """ + Test that the pool grows but after 'idle' time the number of + processes goes back to the minimum. 
+ """ + + MAX = 5 + MIN = 1 + IDLE = 1 + pp = pool.ProcessPool(ampChild=WaitingChild, min=MIN, max=MAX, maxIdle=IDLE) + + def _checks(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + + D = "DATA" + d = [pp.doWork(First, data=D) for x in xrange(MAX)] + + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.max) + self.assertEquals(len(pp._finishCallbacks), pp.max) + + [child.callRemote(Second) for child in pp.processes] + return defer.DeferredList(d).addCallback(_realChecks) + + def _realChecks(_): + from twisted.internet import reactor + d = defer.Deferred() + def _cb(): + def __(_): + try: + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + d.callback(None) + except Exception, e: + d.errback(e) + return pp._pruneProcesses().addCallback(__) + # just to be shure we are called after the pruner + pp.looping.stop() # stop the looping, we don't want it to + # this right here + reactor.callLater(IDLE, _cb) + return d + + return pp.start( + ).addCallback(_checks + ).addCallback(lambda _: pp.stop()) + + def test_recycling(self): + """ + Test that after a given number of calls subprocesses are + recycled. 
+ """ + MAX = 1 + MIN = 1 + RECYCLE_AFTER = 1 + pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER) + self.addCleanup(pp.stop) + + def _checks(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + return pp.doWork(Pid + ).addCallback(lambda response: response['pid']) + + def _checks2(pid): + return pp.doWork(Pid + ).addCallback(lambda response: response['pid'] + ).addCallback(self.assertNotEquals, pid) + + + d = pp.start() + d.addCallback(_checks) + d.addCallback(_checks2) + return d + + def test_recyclingWithQueueOverload(self): + """ + Test that we get the correct number of different results when + we overload the pool of calls. + """ + MAX = 5 + MIN = 1 + RECYCLE_AFTER = 10 + CALLS = 60 + pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER) + self.addCleanup(pp.stop) + + def _check(results): + s = set() + for succeed, response in results: + s.add(response['pid']) + + # For the first C{MAX} calls, each is basically guaranteed to go to + # a different child. After that, though, there are no guarantees. + # All the rest might go to a single child, since the child to + # perform a job is selected arbitrarily from the "ready" set. Fair + # distribution of jobs needs to be implemented; right now it's "set + # ordering" distribution of jobs. + self.assertTrue(len(s) > MAX) + + def _work(_): + l = [pp.doWork(Pid) for x in xrange(CALLS)] + d = defer.DeferredList(l) + return d.addCallback(_check) + d = pp.start() + d.addCallback(_work) + return d + + + def test_disableProcessRecycling(self): + """ + Test that by setting 0 to recycleAfter we actually disable process recycling. 
+ """ + MAX = 1 + MIN = 1 + RECYCLE_AFTER = 0 + pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER) + + def _checks(_): + self.assertEquals(pp.started, True) + self.assertEquals(pp.finished, False) + self.assertEquals(len(pp.processes), pp.min) + self.assertEquals(len(pp._finishCallbacks), pp.min) + return pp.doWork(Pid + ).addCallback(lambda response: response['pid']) + + def _checks2(pid): + return pp.doWork(Pid + ).addCallback(lambda response: response['pid'] + ).addCallback(self.assertEquals, pid + ).addCallback(lambda _: pid) + + def finish(reason): + return pp.stop().addCallback(lambda _: reason) + + return pp.start( + ).addCallback(_checks + ).addCallback(_checks2 + ).addCallback(_checks2 + ).addCallback(finish) + + def test_changeChildrenReactor(self): + """ + Test that by passing the correct argument children change their + reactor type. + """ + MAX = 1 + MIN = 1 + FIRST = "select" + SECOND = "poll" + + def checkDefault(): + pp = pool.ProcessPool( + starter=main.ProcessStarter( + childReactor=FIRST, + packages=("twisted", "ampoule")), + ampChild=ReactorChild, min=MIN, max=MAX) + pp.start() + return pp.doWork(Reactor + ).addCallback(self.assertEquals, {'classname': "SelectReactor"} + ).addCallback(lambda _: pp.stop()) + + def checkPool(_): + pp = pool.ProcessPool( + starter=main.ProcessStarter( + childReactor=SECOND, + packages=("twisted", "ampoule")), + ampChild=ReactorChild, min=MIN, max=MAX) + pp.start() + return pp.doWork(Reactor + ).addCallback(self.assertEquals, {'classname': "PollReactor"} + ).addCallback(lambda _: pp.stop()) + + return checkDefault( + ).addCallback(checkPool) + try: + from select import poll + except ImportError: + test_changeChildrenReactor.skip = "This architecture doesn't support select.poll, I can't run this test" + + def test_commandsWithoutResponse(self): + """ + Test that if we send a command without a required answer we + actually don't have any problems. 
+ """ + DATA = "hello" + pp = pool.ProcessPool(ampChild=NoResponseChild, min=1, max=1) + + def _check(_): + return pp.doWork(GetResponse + ).addCallback(self.assertEquals, {"response": DATA}) + + def _work(_): + return pp.doWork(NoResponse, arg=DATA) + + return pp.start( + ).addCallback(_work + ).addCallback(_check + ).addCallback(lambda _: pp.stop()) + + def test_SupplyChildArgs(self): + """Ensure that arguments for the child constructor are passed in.""" + pp = pool.ProcessPool(Writer, ampChildArgs=['body'], min=0) + def _check(result): + return pp.doWork(Write).addCallback( + self.assertEquals, {'response': 'body'}) + + return pp.start( + ).addCallback(_check + ).addCallback(lambda _: pp.stop()) + + def processTimeoutTest(self, timeout): + pp = pool.ProcessPool(WaitingChild, min=1, max=1) + + def _work(_): + d = pp.callRemote(First, data="ciao", _timeout=timeout) + self.assertFailure(d, error.ProcessTerminated) + return d + + return pp.start( + ).addCallback(_work + ).addCallback(lambda _: pp.stop()) + + def test_processTimeout(self): + """ + Test that a call that doesn't finish within the given timeout + time is correctly handled. + """ + return self.processTimeoutTest(1) + + def test_processTimeoutZero(self): + """ + Test that the process is correctly handled when the timeout is zero. 
+ """ + return self.processTimeoutTest(0) + + def test_processDeadline(self): + pp = pool.ProcessPool(WaitingChild, min=1, max=1) + + def _work(_): + d = pp.callRemote(First, data="ciao", _deadline=reactor.seconds()) + self.assertFailure(d, error.ProcessTerminated) + return d + + return pp.start( + ).addCallback(_work + ).addCallback(lambda _: pp.stop()) + + def test_processBeforeDeadline(self): + pp = pool.ProcessPool(PidChild, min=1, max=1) + + def _work(_): + d = pp.callRemote(Pid, _deadline=reactor.seconds() + 10) + d.addCallback(lambda result: self.assertNotEqual(result['pid'], 0)) + return d + + return pp.start( + ).addCallback(_work + ).addCallback(lambda _: pp.stop()) + + def test_processTimeoutSignal(self): + """ + Test that a call that doesn't finish within the given timeout + time is correctly handled. + """ + pp = pool.ProcessPool(WaitingChild, min=1, max=1, + timeout_signal=SIGHUP) + + def _work(_): + d = pp.callRemote(First, data="ciao", _timeout=1) + d.addCallback(lambda d: self.fail()) + text = 'signal %d' % SIGHUP + d.addErrback( + lambda f: self.assertTrue(text in f.value[0], + '"%s" not in "%s"' % (text, f.value[0]))) + return d + + return pp.start( + ).addCallback(_work + ).addCallback(lambda _: pp.stop()) + + def test_processGlobalTimeout(self): + """ + Test that a call that doesn't finish within the given global + timeout time is correctly handled. 
+ """ + pp = pool.ProcessPool(WaitingChild, min=1, max=1, timeout=1) + + def _work(_): + d = pp.callRemote(First, data="ciao") + self.assertFailure(d, error.ProcessTerminated) + return d + + return pp.start( + ).addCallback(_work + ).addCallback(lambda _: pp.stop()) diff --git a/src/utils/ampoule/test/test_proxy.py b/src/utils/ampoule/test/test_proxy.py new file mode 100644 index 0000000000..cea45ec5ac --- /dev/null +++ b/src/utils/ampoule/test/test_proxy.py @@ -0,0 +1,49 @@ +from twisted.internet import defer, reactor +from twisted.internet.protocol import ClientFactory +from twisted.trial import unittest +from twisted.protocols import amp + +from src.utils.ampoule import service, child, pool, main +from src.utils.ampoule.commands import Echo + +class ClientAMP(amp.AMP): + factory = None + def connectionMade(self): + if self.factory is not None: + self.factory.theProto = self + if hasattr(self.factory, 'onMade'): + self.factory.onMade.callback(None) + +class TestAMPProxy(unittest.TestCase): + def setUp(self): + """ + Setup the proxy service and the client connection to the proxy + service in order to run call through them. + + Inspiration comes from twisted.test.test_amp + """ + self.pp = pool.ProcessPool() + self.svc = service.AMPouleService(self.pp, child.AMPChild, 0, "") + self.svc.startService() + self.proxy_port = self.svc.server.getHost().port + self.clientFactory = ClientFactory() + self.clientFactory.protocol = ClientAMP + d = self.clientFactory.onMade = defer.Deferred() + self.clientConn = reactor.connectTCP("127.0.0.1", + self.proxy_port, + self.clientFactory) + self.addCleanup(self.clientConn.disconnect) + self.addCleanup(self.svc.stopService) + def setClient(_): + self.client = self.clientFactory.theProto + return d.addCallback(setClient) + + def test_forwardCall(self): + """ + Test that a call made from a client is correctly forwarded to + the process pool and the result is correctly reported. 
+ """ + DATA = "hello" + return self.client.callRemote(Echo, data=DATA).addCallback( + self.assertEquals, {'response': DATA} + ) diff --git a/src/utils/ampoule/util.py b/src/utils/ampoule/util.py new file mode 100644 index 0000000000..738e5f2edd --- /dev/null +++ b/src/utils/ampoule/util.py @@ -0,0 +1,46 @@ +""" +some utilities +""" +import os +import sys +import __main__ + +from twisted.python.filepath import FilePath +from twisted.python.reflect import namedAny +# from twisted.python.modules import theSystemPath + +def findPackagePath(modulePath): + """ + Try to find the sys.path entry from a modulePath object, simultaneously + computing the module name of the targetted file. + """ + p = modulePath + l = [p.basename().split(".")[0]] + while p.parent() != p: + for extension in ['py', 'pyc', 'pyo', 'pyd', 'dll']: + sib = p.sibling("__init__."+extension) + if sib.exists(): + p = p.parent() + l.insert(0, p.basename()) + break + else: + return p.parent(), '.'.join(l) + + +def mainpoint(function): + """ + Decorator which declares a function to be an object's mainpoint. + """ + if function.__module__ == '__main__': + # OK time to run a function + p = FilePath(__main__.__file__) + p, mn = findPackagePath(p) + pname = p.path + if pname not in map(os.path.abspath, sys.path): + sys.path.insert(0, pname) + # Maybe remove the module's path? + exitcode = namedAny(mn+'.'+function.__name__)(sys.argv) + if exitcode is None: + exitcode = 0 + sys.exit(exitcode) + return function diff --git a/src/utils/idmapper/EVENNIA.txt b/src/utils/idmapper/EVENNIA.txt new file mode 100644 index 0000000000..ae70a38e30 --- /dev/null +++ b/src/utils/idmapper/EVENNIA.txt @@ -0,0 +1,24 @@ + +IDMAPPER +-------- + +https://github.com/dcramer/django-idmapper + +IDmapper (actually Django-idmapper) implements a custom Django model +that is cached between database writes/read (SharedMemoryModel). 
It +not only lowers memory consumption but most importantly allows for +semi-persistance of properties on database model instances (something +not guaranteed for normal Django models). + +Evennia makes a few modifications to the original IDmapper routines +(we try to limit our modifications in order to make it easy to update +it from upstream down the line). + +- We change the caching from a WeakValueDictionary to a normal + dictionary. This is done because we use the models as semi- + persistent storage while the server was running. In some situations + the models would run out of scope and the WeakValueDictionary + then allowed them to be garbage collected. With this change they + are guaranteed to remain (which is good for persistence but + potentially bad for memory consumption). +- We add some caching/reset hooks called from the server side. diff --git a/src/utils/idmapper/base.py b/src/utils/idmapper/base.py index 1c30344019..25c7b7785b 100755 --- a/src/utils/idmapper/base.py +++ b/src/utils/idmapper/base.py @@ -128,6 +128,11 @@ class SharedMemoryModel(Model): cls.__instance_cache__ = {} #WeakValueDictionary() flush_instance_cache = classmethod(flush_instance_cache) + def save(cls, *args, **kwargs): + "overload spot for saving" + super(SharedMemoryModel, cls).save(*args, **kwargs) + + # Use a signal so we make sure to catch cascades. 
def flush_cache(**kwargs): for model in SharedMemoryModel.__subclasses__(): diff --git a/src/utils/utils.py b/src/utils/utils.py index d705a31c43..37e201e801 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -12,8 +12,14 @@ import textwrap import datetime import random from twisted.internet import threads +from django.contrib.contenttypes.models import ContentType from django.conf import settings +try: + import cPickle as pickle +except ImportError: + import pickle + ENCODINGS = settings.ENCODINGS def is_iter(iterable): @@ -415,6 +421,7 @@ def inherits_from(obj, parent): return any(1 for obj_path in obj_paths if obj_path == parent_path) + def format_table(table, extra_space=1): """ Takes a table of collumns: [[val,val,val,...], [val,val,val,...], ...] @@ -449,47 +456,184 @@ def format_table(table, extra_space=1): for icol, col in enumerate(table)]) return ftable -def run_async(async_func, *args, **kwargs): + +_FROM_MODEL_MAP = None +def to_pickle(obj, do_pickle=False): """ - This wrapper will use Twisted's asynchronous features to run a slow - function using a separate reactor thread. In effect this means that - the server will not be blocked while the slow process finish. + Prepares object for being pickled. This will remap database models + into an intermediary format, making them easily retrievable later. + + obj - a python object to prepare for pickling + do_pickle - actually pickle the object as well + + Database + + + """ + # prepare globals + global _DUMPS, _LOADS, _MODEL_MAP + if not _DUMPS: + _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) + if not _LOADS: + _LOADS = lambda data: pickle.loads(to_str(data)) + if not _MODEL_MAP: + _FROM_MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all()) + +_TO_MODEL_MAP = None +def from_pickle(obj, do_pickle=False): + """ + Converts back from a data stream prepared with to_pickle. This will + re-acquire database objects stored in the special format. 
+ + obj - an object or a pickle, as indicated by the do_pickle flag + do_pickle - actually unpickle the input before continuing + """ + # prepare globals + global _DUMPS, _LOADS, _MODEL_MAP + if not _DUMPS: + _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) + if not _LOADS: + _LOADS = lambda data: pickle.loads(to_str(data)) + if not _MODEL_MAP: + _TO_MODEL_MAP = dict((c.natural_key(), c.model_class()) for c in ContentType.objects.all()) + + +_PPOOL = None +_PCMD = None +_DUMPS = None +_LOADS = None +_MODEL_MAP = None +def run_async(to_execute, *args, **kwargs): + """ + Runs a function or executes a code snippet asynchronously. + + Inputs: + to_execute (callable or string) - if a callable, this function + will be executed in a separate thread, using the + *args/**kwargs as input. + If a string, this string must be a source snippet. + This string will executed using the ProcPool is + enabled, if not this will raise a RunTimeError. + *args - if to_execute is a callable, these args will be used + as arguments for that function. If to_execute is a string + *args are not used. + *kwargs - if to_execute is a callable, these kwargs will be used + as keyword arguments in that function. If a string, they + instead are used to define the executable environment + that should be available to execute the code in to_execute. + + There are two special (optional) kwargs. These are available + both if to_execute is a callable or a source string. + 'at_return' -should point to a callable with one argument. + It will be called with the return value from + to_execute. + 'at_return_kwargs' - this dictionary which be used as keyword + arguments to the at_return callback. + 'at_err' - this will be called with a Failure instance if + there is an error in to_execute. + 'at_err_kwargs' - this dictionary will be used as keyword + arguments to the at_err errback. 
+ + run_async will either relay the code to a thread or to a processPool + depending on input and what is available in the system. To activate + Process pooling, settings.PROCPOOL_ENABLE must be set. + + to_execute in string form should handle all imports needed. kwargs + can be used to send objects and properties. Such properties will + be pickled, except Database Objects which will be sent across + on a special format and re-loaded on the other side. + + To get a return value from your code snippet, Use the _return() + function: Every call to this function from your snippet will + append the argument to an internal list of returns. This return value + (or a list) will be the first argument to the at_return callback. Use this function with restrain and only for features/commands that you know has no influence on the cause-and-effect order of your game (commands given after the async function might be executed before - it has finished). Accessing the same property from different threads can - lead to unpredicted behaviour if you are not careful (this is called a + it has finished). Accessing the same property from different threads/processes + can lead to unpredicted behaviour if you are not careful (this is called a "race condition"). Also note that some databases, notably sqlite3, don't support access from multiple threads simultaneously, so if you do heavy database access from - your async_func under sqlite3 you will probably run very slow or even get + your to_execute under sqlite3 you will probably run very slow or even get tracebacks. - arg: - async_func - function that should be run asynchroneously - - reserved keywords: - at_return(r) - if given, this function will be called when async_func returns - value r at the end of a successful execution - at_err(e) - if given, this function is called if async_func fails with an exception e. - use e.trap(ExceptionType1, ExceptionType2) - - all other arguments/keywords will be used as args/kwargs fro async_func. 
- """ - # create deferred object + # handle all global imports. + global _PPOOL, _PCMD, _DUMPS, _LOADS, _MODEL_MAP + if _PPOOL == None: + # Try to load process Pool + from src.server.sessionhandler import SESSIONS as _SESSIONS + try: + _PPOOL = _SESSIONS.server.services.namedServices.get("ProcPool").pool + except AttributeError: + _PPOOL = False + if not _PCMD: + from src.server.procpool import ExecuteCode as _PCMD + if not _DUMPS: + _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) + if not _LOADS: + _LOADS = lambda data: pickle.loads(to_str(data)) + if not _MODEL_MAP: + _MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all()) - deferred = threads.deferToThread(async_func, *args, **kwargs) - if "at_return" in kwargs: - deferred.addCallback(kwargs["at_return"]) - if "at_err" in kwargs: - deferred.addErrback(kwargs["at_err"]) - # always add a logging errback as a last catch + # determine callbacks/errbacks def default_errback(e): from src.utils import logger logger.log_trace(e) + def convert_return(f): + def func(ret): + rval = ret["response"] and _LOADS(ret["response"]) + if f: return f(rval) + else: return rval + return func + + callback = convert_return(kwargs.pop("at_return", None)) + errback = kwargs.pop("at_err", None) + callback_kwargs = kwargs.pop("at_return_kwargs", {}) + errback_kwargs = kwargs.pop("at_err_kwargs", {}) + + if not callable(to_execute) and _PPOOL: + # run source code in process pool + if to_execute == "Echo": + # testing - addCallback set externally + from src.utils.ampoule.commands import Echo as to_execute + deferred = _PPOOL.doWork(to_execute, **{"data":args[0]}) + else: + cmdargs = {"source":to_str(to_execute)} + to_pickle = {"normal":{}, "objs":{}} + for key, val in kwargs.items(): + if hasattr(val, "dbobj"): + val = val.dbobj + natural_key = _MODEL_MAP.get(hasattr(val, "id") and \ + hasattr(val, '__class__') and \ + val.__class__.__name__.lower()) + if natural_key: + # a database object. 
Store natural_key (a tuple) along with the objs id. + to_pickle["objs"][key] = (natural_key, val.id) + else: + to_pickle["normal"][key] = val + if to_pickle["normal"] or to_pickle["objs"]: + cmdargs["environment"] = _DUMPS(to_pickle) + else: + cmdargs["environment"] = "" + # defer to process pool + deferred = _PPOOL.doWork(_PCMD, **cmdargs) + elif callable(to_execute): + # no process pool available, or we gave an explicit function and not code. Use threading. + deferred = threads.deferToThread(to_execute, *args, **kwargs) + else: + # no appropriate input + raise RuntimeError("'%s' could not be handled by run_async" % to_execute) + + # attach callbacks + if callback: + deferred.addCallback(callback, **callback_kwargs) + if errback: + deferred.addCallback(errback, **errback_kwargs) + # always add a logging errback as a last catch deferred.addErrback(default_errback) @@ -548,6 +692,7 @@ def check_evennia_dependencies(): if settings.IRC_ENABLED: try: import twisted.words + twisted.words # set to avoid debug info about not-used import except ImportError: errstring += "\n ERROR: IRC is enabled, but twisted.words is not installed. Please install it." errstring += "\n Linux Debian/Ubuntu users should install package 'python-twisted-words', others"