diff --git a/contrib/evlang/evlang.py b/contrib/evlang/evlang.py index b1e91d7dfa..aaed327b44 100644 --- a/contrib/evlang/evlang.py +++ b/contrib/evlang/evlang.py @@ -84,9 +84,8 @@ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspa from django.core.management import setup_environ from game import settings setup_environ(settings) -from src.utils.utils import run_async +#from src.utils.utils import run_async as thread_run_async -_PROCPOOL_ENABLED = settings.PROCPOOL_ENABLED _LOGGER = None #------------------------------------------------------------ @@ -808,37 +807,42 @@ def validate_code(codestring): raise LimitedExecCodeException(codestring, checker.errors) return True -def limited_exec(code, context = {}, timeout_secs=2, retobj=None): +def limited_exec(code, context = {}, timeout_secs=2, retobj=None, procpool_async=None): """ Validate source code and make sure it contains no unauthorized expression/statements as configured via 'UNALLOWED_AST_NODES' and 'UNALLOWED_BUILTINS'. By default this means that code is not allowed import modules or access dangerous builtins like 'open' or - 'eval'. If code is considered 'safe' it will be executed via - 'exec' using 'context' as the global environment. More details on - how code is executed can be found in the Python Reference Manual - section 6.14 (ignore the remark on '__builtins__'). The 'context' - enviroment is also validated and is not allowed to contain modules - or builtins. The following exception will be raised on errors: + 'eval'. - if 'context' contains unallowed objects = + code - code to execute. Will be evaluated for safety + context - if code is deemed safe, code will execute with this environment + time_out_secs - only used if procpool_async is given. Sets timeout + for remote code execution + retobj - only used if procpool_async is also given. Defines an Object + (which must define a msg() method), for receiving returns from + the execution. + procpool_async - a run_async function alternative to the one in src.utils.utils. + this must accept the keywords + proc_timeout (will be set to timeout_secs + at_return - a callback + at_err - an errback + If retobj is given, at_return/at_err will be created and + set to msg callbacks and errors to that object. + Tracebacks: LimitedExecContextException - - if code didn't validate and is considered 'unsafe' = LimitedExecCodeException - - if code did not execute within the given timelimit = - LimitedExecTimeoutException """ if validate_context(context) and validate_code(code): # run code only after validation has completed - if _PROCPOOL_ENABLED: + if procpool_async: + # custom run_async if retobj: callback = lambda r: retobj.msg(r) errback = lambda e: retobj.msg(e) - run_async(code, *context, proc_timeout=timeout_secs, at_return=callback, at_err=errback) + procpool_async(code, *context, proc_timeout=timeout_secs, at_return=callback, at_err=errback) else: - run_async(code, *context, proc_timeout=timeout_secs) + procpool_async(code, *context, proc_timeout=timeout_secs) else: # run in-process exec code in context diff --git a/src/utils/ampoule/COPYING.txt b/contrib/procpools/ampoule/COPYING.txt similarity index 100% rename from src/utils/ampoule/COPYING.txt rename to contrib/procpools/ampoule/COPYING.txt diff --git a/src/utils/ampoule/EVENNIA.txt b/contrib/procpools/ampoule/EVENNIA.txt similarity index 100% rename from src/utils/ampoule/EVENNIA.txt rename to contrib/procpools/ampoule/EVENNIA.txt diff --git a/src/utils/ampoule/__init__.py b/contrib/procpools/ampoule/__init__.py similarity index 100% rename from src/utils/ampoule/__init__.py rename to contrib/procpools/ampoule/__init__.py diff --git a/src/utils/ampoule/child.py b/contrib/procpools/ampoule/child.py similarity index 89% rename from src/utils/ampoule/child.py rename to contrib/procpools/ampoule/child.py index c9f2a08da9..d1fade960b 100644 --- a/src/utils/ampoule/child.py +++ b/contrib/procpools/ampoule/child.py @@ -1,7 +1,14 @@ +""" +This defines the the parent for all subprocess children. + +Inherit from this to define a new type of subprocess. + +""" + from twisted.python import log from twisted.internet import error from twisted.protocols import amp -from src.utils.ampoule.commands import Echo, Shutdown, Ping +from contrib.procpools.ampoule.commands import Echo, Shutdown, Ping class AMPChild(amp.AMP): def __init__(self): diff --git a/src/utils/ampoule/commands.py b/contrib/procpools/ampoule/commands.py similarity index 100% rename from src/utils/ampoule/commands.py rename to contrib/procpools/ampoule/commands.py diff --git a/src/utils/ampoule/iampoule.py b/contrib/procpools/ampoule/iampoule.py similarity index 100% rename from src/utils/ampoule/iampoule.py rename to contrib/procpools/ampoule/iampoule.py diff --git a/src/utils/ampoule/main.py b/contrib/procpools/ampoule/main.py similarity index 99% rename from src/utils/ampoule/main.py rename to contrib/procpools/ampoule/main.py index 965e4a5b58..9aec2817a4 100644 --- a/src/utils/ampoule/main.py +++ b/contrib/procpools/ampoule/main.py @@ -11,7 +11,7 @@ from twisted.protocols import amp from twisted.python import runtime from twisted.python.compat import set -from src.utils.ampoule import iampoule +from contrib.procpools.ampoule import iampoule gen = itertools.count() diff --git a/src/utils/ampoule/pool.py b/contrib/procpools/ampoule/pool.py similarity index 99% rename from src/utils/ampoule/pool.py rename to contrib/procpools/ampoule/pool.py index 1c84c35560..d90921dad0 100644 --- a/src/utils/ampoule/pool.py +++ b/contrib/procpools/ampoule/pool.py @@ -11,7 +11,7 @@ pop = heapq.heappop from twisted.internet import defer, task, error from twisted.python import log, failure -from src.utils.ampoule import commands, main +from contrib.procpools.ampoule import commands, main try: DIE = signal.SIGKILL @@ -66,7 +66,7 @@ class ProcessPool(object): self.ampParent = ampParent self.ampChild = ampChild if ampChild is None: - from src.utils.ampoule.child import AMPChild + from contrib.procpools.ampoule.child import AMPChild self.ampChild = AMPChild self.min = min self.max = max diff --git a/src/utils/ampoule/rpool.py b/contrib/procpools/ampoule/rpool.py similarity index 97% rename from src/utils/ampoule/rpool.py rename to contrib/procpools/ampoule/rpool.py index e99e864b9d..642b988352 100644 --- a/src/utils/ampoule/rpool.py +++ b/contrib/procpools/ampoule/rpool.py @@ -1,10 +1,8 @@ """ This module implements a remote pool to use with AMP. """ -from zope.interface import implements from twisted.protocols import amp -from twisted.internet import utils class AMPProxy(amp.AMP): """ diff --git a/src/utils/ampoule/service.py b/contrib/procpools/ampoule/service.py similarity index 92% rename from src/utils/ampoule/service.py rename to contrib/procpools/ampoule/service.py index 7b24d6fb33..bded0ac990 100644 --- a/src/utils/ampoule/service.py +++ b/contrib/procpools/ampoule/service.py @@ -9,8 +9,8 @@ def makeService(options): """ ms = service.MultiService() - from src.utils.ampoule.pool import ProcessPool - from src.utils.ampoule.main import ProcessStarter + from contrib.procpools.ampoule.pool import ProcessPool + from contrib.procpools.ampoule.main import ProcessStarter name = options['name'] ampport = options['ampport'] ampinterface = options['ampinterface'] @@ -43,7 +43,7 @@ class AMPouleService(service.Service): Before reactor.run() is called we setup the system. """ service.Service.startService(self) - from src.utils.ampoule import rpool + from contrib.procpools.ampoule import rpool from twisted.internet import reactor try: diff --git a/src/utils/ampoule/test/__init__.py b/contrib/procpools/ampoule/test/__init__.py similarity index 100% rename from src/utils/ampoule/test/__init__.py rename to contrib/procpools/ampoule/test/__init__.py diff --git a/src/utils/ampoule/test/test_process.py b/contrib/procpools/ampoule/test/test_process.py similarity index 99% rename from src/utils/ampoule/test/test_process.py rename to contrib/procpools/ampoule/test/test_process.py index 3569ed6c3d..b35de99813 100644 --- a/src/utils/ampoule/test/test_process.py +++ b/contrib/procpools/ampoule/test/test_process.py @@ -10,7 +10,7 @@ from twisted.internet import error, defer, reactor from twisted.python import failure, reflect from twisted.trial import unittest from twisted.protocols import amp -from src.utils.ampoule import main, child, commands, pool +from contrib.procpools.ampoule import main, child, commands, pool class ShouldntHaveBeenCalled(Exception): pass diff --git a/src/utils/ampoule/test/test_proxy.py b/contrib/procpools/ampoule/test/test_proxy.py similarity index 93% rename from src/utils/ampoule/test/test_proxy.py rename to contrib/procpools/ampoule/test/test_proxy.py index cea45ec5ac..0ba09b2dbf 100644 --- a/src/utils/ampoule/test/test_proxy.py +++ b/contrib/procpools/ampoule/test/test_proxy.py @@ -3,8 +3,8 @@ from twisted.internet.protocol import ClientFactory from twisted.trial import unittest from twisted.protocols import amp -from src.utils.ampoule import service, child, pool, main -from src.utils.ampoule.commands import Echo +from contrib.procpools.ampoule import service, child, pool, main +from contrib.procpools.ampoule.commands import Echo class ClientAMP(amp.AMP): factory = None diff --git a/src/utils/ampoule/util.py b/contrib/procpools/ampoule/util.py similarity index 100% rename from src/utils/ampoule/util.py rename to contrib/procpools/ampoule/util.py diff --git a/contrib/procpools/python_procpool.py b/contrib/procpools/python_procpool.py new file mode 100644 index 0000000000..da2bc35c6e --- /dev/null +++ b/contrib/procpools/python_procpool.py @@ -0,0 +1,294 @@ +""" +ProcPool + +This module implements and handles processes running under the AMPoule +pool. The ProcPool can accept data from processes and runs them in a +dynamically changing pool of processes, talking to them over AMP. This +offers full asynchronous operation (Python threading does not work as +well for this). + +The ExecuteCode command found here is used by src.utils.utils.run_async() +to launch snippets of code on the process pool. The pool itself is a +service named "Process Pool" and is controlled from src/server/server.py. +It can be customized via settings.PROCPOOL_* + +""" + +from twisted.protocols import amp +from twisted.internet import threads +from contrib.procpools.ampoule.child import AMPChild +from src.utils.utils import to_pickle, from_pickle, clean_object_caches, to_str +from src.utils import logger +from src import PROC_MODIFIED_OBJS + +# +# Multiprocess command for communication Server<->Client, relaying +# data for remote Python execution +# + +class ExecuteCode(amp.Command): + """ + Executes python code in the python process, + returning result when ready. + + source - a compileable Python source code string + environment - a pickled dictionary of Python + data. Each key will become the name + of a variable available to the source + code. Database objects are stored on + the form ((app, modelname), id) allowing + the receiver to easily rebuild them on + this side. + errors - an all-encompassing error handler + response - a string or a pickled string + + """ + arguments = [('source', amp.String()), + ('environment', amp.String())] + errors = [(Exception, 'EXCEPTION')] + response = [('response', amp.String()), + ('recached', amp.String())] + +# +# Multiprocess AMP client-side factory, for executing remote Python code +# + +class PythonProcPoolChild(AMPChild): + """ + This is describing what happens on the subprocess side. + + This already supports Echo, Shutdown and Ping. + + Methods: + executecode - a remote code execution environment + + """ + def executecode(self, source, environment): + """ + Remote code execution + + source - Python code snippet + environment - pickled dictionary of environment + variables. They are stored in + two keys "normal" and "objs" where + normal holds a dictionary of + normally pickled python objects + wheras objs points to a dictionary + of database represenations ((app,key),id). + + The environment's entries will be made available as + local variables during the execution. Normal eval + results will be returned as-is. For more complex + code snippets (run by exec), the _return function + is available: All data sent to _return(retval) will + be returned from this system whenever the system + finishes. Multiple calls to _return will result in + a list being return. The return value is pickled + and thus allows for returning any pickleable data. + + """ + + class Ret(object): + "Helper class for holding returns from exec" + def __init__(self): + self.returns = [] + def __call__(self, *args, **kwargs): + self.returns.extend(list(args)) + def get_returns(self): + lr = len(self.returns) + if lr == 0: + return "" + elif lr == 1: + return to_pickle(self.returns[0], emptypickle=False) or "" + else: + return to_pickle(self.returns, emptypickle=False) or "" + _return = Ret() + + + available_vars = {'_return':_return} + if environment: + # load environment + try: + environment = from_pickle(environment) + available_vars.update(environment) + except Exception, e: + logger.log_trace() + _return(e) + + # try to execute with eval first + try: + ret = eval(source, {}, available_vars) + ret = _return.get_returns() or to_pickle(ret, emptypickle=False) or "" + except Exception: + # use exec instead + exec source in available_vars + ret = _return.get_returns() + # get the list of affected objects to recache + objs = list(set(PROC_MODIFIED_OBJS)) + # we need to include the locations too, to update their content caches + objs = objs + list(set([o.location for o in objs if hasattr(o, "location") and o.location])) + #print "objs:", objs + #print "to_pickle", to_pickle(objs, emptypickle=False, do_pickle=False) + to_recache = to_pickle(objs, emptypickle=False) or "" + # empty the list without loosing memory reference + PROC_MODIFIED_OBJS[:] = [] + print "... executecode done." + return {'response': ret, + 'recached': to_recache} + ExecuteCode.responder(executecode) + + + +_PPOOL = None +_SESSIONS = None +_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9." + +def run_async(to_execute, *args, **kwargs): + """ + Runs a function or executes a code snippet asynchronously. + + Inputs: + to_execute (callable) - if this is a callable, it will + be executed with *args and non-reserver *kwargs as + arguments. + The callable will be executed using ProcPool, or in + a thread if ProcPool is not available. + to_execute (string) - this is only available is ProcPool is + running. If a string, to_execute this will be treated as a code + snippet to execute asynchronously. *args are then not used + and non-reserverd *kwargs are used to define the execution + environment made available to the code. + + reserved kwargs: + 'use_thread' (bool) - this only works with callables (not code). + It forces the code to run in a thread instead + of using the Process Pool, even if the latter + is available. This could be useful if you want + to make sure to not get out of sync with the + main process (such as accessing in-memory global + properties) + 'proc_timeout' (int) - only used if ProcPool is available. Sets a + max time for execution. This alters the value set + by settings.PROCPOOL_TIMEOUT + 'at_return' -should point to a callable with one argument. + It will be called with the return value from + to_execute. + 'at_return_kwargs' - this dictionary which be used as keyword + arguments to the at_return callback. + 'at_err' - this will be called with a Failure instance if + there is an error in to_execute. + 'at_err_kwargs' - this dictionary will be used as keyword + arguments to the at_err errback. + 'procpool_name' - the Service name of the procpool to use. + Default is PythonProcPool. + + *args - if to_execute is a callable, these args will be used + as arguments for that function. If to_execute is a string + *args are not used. + *kwargs - if to_execute is a callable, these kwargs will be used + as keyword arguments in that function. If a string, they + instead are used to define the executable environment + that should be available to execute the code in to_execute. + + run_async will either relay the code to a thread or to a processPool + depending on input and what is available in the system. To activate + Process pooling, settings.PROCPOOL_ENABLE must be set. + + to_execute in string form should handle all imports needed. kwargs + can be used to send objects and properties. Such properties will + be pickled, except Database Objects which will be sent across + on a special format and re-loaded on the other side. + + To get a return value from your code snippet, Use the _return() + function: Every call to this function from your snippet will + append the argument to an internal list of returns. This return value + (or a list) will be the first argument to the at_return callback. + + Use this function with restrain and only for features/commands + that you know has no influence on the cause-and-effect order of your + game (commands given after the async function might be executed before + it has finished). Accessing the same property from different threads/processes + can lead to unpredicted behaviour if you are not careful (this is called a + "race condition"). + + Also note that some databases, notably sqlite3, don't support access from + multiple threads simultaneously, so if you do heavy database access from + your to_execute under sqlite3 you will probably run very slow or even get + tracebacks. + + """ + # handle all global imports. + global _PPOOL, _SESSIONS + + # get the procpool name, if set in kwargs + procpool_name = kwargs.get("procpool_name", "PythonProcPool") + + if _PPOOL == None: + # Try to load process Pool + from src.server.sessionhandler import SESSIONS as _SESSIONS + try: + _PPOOL = _SESSIONS.server.services.namedServices.get(procpool_name).pool + except AttributeError: + _PPOOL = False + + use_timeout = kwargs.pop("proc_timeout", None) + + # helper converters for callbacks/errbacks + def convert_return(f): + def func(ret, *args, **kwargs): + rval = ret["response"] and from_pickle(ret["response"]) + reca = ret["recached"] and from_pickle(ret["recached"]) + # recache all indicated objects + [clean_object_caches(obj) for obj in reca] + if f: return f(rval, *args, **kwargs) + else: return rval + return func + def convert_err(f): + def func(err, *args, **kwargs): + err.trap(Exception) + err = err.getErrorMessage() + if use_timeout and err == _PROC_ERR: + err = "Process took longer than %ss and timed out." % use_timeout + if f: + return f(err, *args, **kwargs) + else: + err = "Error reported from subprocess: '%s'" % err + logger.log_errmsg(err) + return func + + # handle special reserved input kwargs + use_thread = kwargs.pop("use_thread", False) + callback = convert_return(kwargs.pop("at_return", None)) + errback = convert_err(kwargs.pop("at_err", None)) + callback_kwargs = kwargs.pop("at_return_kwargs", {}) + errback_kwargs = kwargs.pop("at_err_kwargs", {}) + + if _PPOOL and not use_thread: + # process pool is running + if isinstance(to_execute, basestring): + # run source code in process pool + cmdargs = {"_timeout":use_timeout} + cmdargs["source"] = to_str(to_execute) + cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or "" + # defer to process pool + deferred = _PPOOL.doWork(ExecuteCode, **cmdargs) + elif callable(to_execute): + # execute callable in process + callname = to_execute.__name__ + cmdargs = {"_timeout":use_timeout} + cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname + cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs}) + deferred = _PPOOL.doWork(ExecuteCode, **cmdargs) + else: + raise RuntimeError("'%s' could not be handled by the process pool" % to_execute) + elif callable(to_execute): + # no process pool available, fall back to old deferToThread mechanism. + deferred = threads.deferToThread(to_execute, *args, **kwargs) + else: + # no appropriate input for this server setup + raise RuntimeError("'%s' could not be handled by run_async - no valid input or no process pool." % to_execute) + + # attach callbacks + if callback: + deferred.addCallback(callback, **callback_kwargs) + deferred.addErrback(errback, **errback_kwargs) diff --git a/src/objects/models.py b/src/objects/models.py index cf54e14fc4..77837519d6 100644 --- a/src/objects/models.py +++ b/src/objects/models.py @@ -276,7 +276,6 @@ class ObjectDB(TypedObject): "Setter. Allows for self.location = location" try: old_loc = _GA(self, "location") - if ObjectDB.objects.dbref(location): # dbref search loc = ObjectDB.objects.dbref_search(location) diff --git a/src/server/portal.py b/src/server/portal.py index 34ce5bd585..8e2731ff03 100644 --- a/src/server/portal.py +++ b/src/server/portal.py @@ -7,7 +7,6 @@ sets up all the networking features. (this is done automatically by game/evennia.py). """ -import time import sys import os if os.name == 'nt': @@ -19,10 +18,10 @@ from twisted.application import internet, service from twisted.internet import protocol, reactor from twisted.web import server, static from django.conf import settings -from src.utils.utils import get_evennia_version, mod_import +from src.utils.utils import get_evennia_version, mod_import, make_iter from src.server.sessionhandler import PORTAL_SESSIONS -PORTAL_SERVICES_PLUGIN_MODULE = mod_import(settings.PORTAL_SERVICES_PLUGIN_MODULE) +PORTAL_SERVICES_PLUGIN_MODULES = [mod_import(module) for module in make_iter(settings.PORTAL_SERVICES_PLUGIN_MODULES)] if os.name == 'nt': # For Windows we need to handle pid files manually. @@ -287,9 +286,9 @@ if WEBSERVER_ENABLED: webserver.setName('EvenniaWebServer%s' % pstring) PORTAL.services.addService(webserver) -if PORTAL_SERVICES_PLUGIN_MODULE: +for plugin_module in PORTAL_SERVICES_PLUGIN_MODULES: # external plugin services to start - PORTAL_SERVICES_PLUGIN_MODULE.start_plugin_services(PORTAL) + plugin_module.start_plugin_services(PORTAL) if os.name == 'nt': diff --git a/src/server/procpool.py b/src/server/procpool.py deleted file mode 100644 index b1c49215a9..0000000000 --- a/src/server/procpool.py +++ /dev/null @@ -1,134 +0,0 @@ -""" -ProcPool - -This module implements and handles processes running under the AMPoule -pool. The ProcPool can accept data from processes and runs them in a -dynamically changing pool of processes, talking to them over AMP. This -offers full asynchronous operation (Python threading does not work as -well for this). - -The ExecuteCode command found here is used by src.utils.utils.run_async() -to launch snippets of code on the process pool. The pool itself is a -service named "Process Pool" and is controlled from src/server/server.py. -It can be customized via settings.PROCPOOL_* - -""" - -from twisted.protocols import amp -from src.utils.ampoule.child import AMPChild -from src.utils.utils import to_pickle, from_pickle -from src import PROC_MODIFIED_OBJS - -# handle global setups -_LOGGER = None - -# Evennia multiprocess command - -class ExecuteCode(amp.Command): - """ - Executes python code in the python process, - returning result when ready. - - source - a compileable Python source code string - environment - a pickled dictionary of Python - data. Each key will become the name - of a variable available to the source - code. Database objects are stored on - the form ((app, modelname), id) allowing - the receiver to easily rebuild them on - this side. - errors - an all-encompassing error handler - response - a string or a pickled string - - """ - arguments = [('source', amp.String()), - ('environment', amp.String())] - errors = [(Exception, 'EXCEPTION')] - response = [('response', amp.String()), - ('recached', amp.String())] - - -# Evennia multiprocess child process template -class ProcPoolChild(AMPChild): - """ - This is describing what happens on the subprocess side. - - This already supports Echo, Shutdown and Ping. - - Methods: - executecode - a remote code execution environment - - """ - def executecode(self, source, environment): - """ - Remote code execution - - source - Python code snippet - environment - pickled dictionary of environment - variables. They are stored in - two keys "normal" and "objs" where - normal holds a dictionary of - normally pickled python objects - wheras objs points to a dictionary - of database represenations ((app,key),id). - - The environment's entries will be made available as - local variables during the execution. Normal eval - results will be returned as-is. For more complex - code snippets (run by exec), the _return function - is available: All data sent to _return(retval) will - be returned from this system whenever the system - finishes. Multiple calls to _return will result in - a list being return. The return value is pickled - and thus allows for returning any pickleable data. - - """ - - class Ret(object): - "Helper class for holding returns from exec" - def __init__(self): - self.returns = [] - def __call__(self, *args, **kwargs): - self.returns.extend(list(args)) - def get_returns(self): - lr = len(self.returns) - if lr == 0: - return "" - elif lr == 1: - return to_pickle(self.returns[0], emptypickle=False) or "" - else: - return to_pickle(self.returns, emptypickle=False) or "" - _return = Ret() - - available_vars = {'_return':_return} - if environment: - # load environment - try: - environment = from_pickle(environment) - except Exception: - global _LOGGER - if not _LOGGER: - from src.utils.logger import logger as _LOGGER - _LOGGER.log_trace("Could not find remote object") - available_vars.update(environment) - # try to execute with eval first - try: - ret = eval(source, {}, available_vars) - ret = _return.get_returns() or to_pickle(ret, emptypickle=False) or "" - except Exception: - # use exec instead - exec source in available_vars - ret = _return.get_returns() - # get the list of affected objects to recache - objs = list(set(PROC_MODIFIED_OBJS)) - # we need to include the locations too, to update their content caches - objs = objs + list(set([o.location for o in objs if hasattr(o, "location") and o.location])) - #print "objs:", objs - #print "to_pickle", to_pickle(objs, emptypickle=False, do_pickle=False) - to_recache = to_pickle(objs, emptypickle=False) or "" - # empty the list without loosing memory reference - PROC_MODIFIED_OBJS[:] = [] - return {'response': ret, - 'recached': to_recache} - ExecuteCode.responder(executecode) - diff --git a/src/server/server.py b/src/server/server.py index ab40682cd0..b2dc7f4c51 100644 --- a/src/server/server.py +++ b/src/server/server.py @@ -26,7 +26,7 @@ from src.scripts.models import ScriptDB from src.server.models import ServerConfig from src.server import initial_setup -from src.utils.utils import get_evennia_version, mod_import +from src.utils.utils import get_evennia_version, mod_import, make_iter from src.comms import channelhandler from src.server.sessionhandler import SESSIONS @@ -43,8 +43,7 @@ SERVER_RESTART = os.path.join(settings.GAME_DIR, 'server.restart') SERVER_STARTSTOP_MODULE = mod_import(settings.AT_SERVER_STARTSTOP_MODULE) # module containing plugin services -SERVER_SERVICES_PLUGIN_MODULE = mod_import(settings.SERVER_SERVICES_PLUGIN_MODULE) - +SERVER_SERVICES_PLUGIN_MODULES = [mod_import(module) for module in make_iter(settings.SERVER_SERVICES_PLUGIN_MODULES)] #------------------------------------------------------------ # Evennia Server settings @@ -57,19 +56,6 @@ AMP_ENABLED = True AMP_HOST = settings.AMP_HOST AMP_PORT = settings.AMP_PORT -PROCPOOL_ENABLED = settings.PROCPOOL_ENABLED -PROCPOOL_DEBUG = settings.PROCPOOL_DEBUG -PROCPOOL_MIN_NPROC = settings.PROCPOOL_MIN_NPROC -PROCPOOL_MAX_NPROC = settings.PROCPOOL_MAX_NPROC -PROCPOOL_TIMEOUT = settings.PROCPOOL_TIMEOUT -PROCPOOL_IDLETIME = settings.PROCPOOL_IDLETIME -PROCPOOL_HOST = settings.PROCPOOL_HOST -PROCPOOL_PORT = settings.PROCPOOL_PORT -PROCPOOL_INTERFACE = settings.PROCPOOL_INTERFACE -PROCPOOL_UID = settings.PROCPOOL_UID -PROCPOOL_GID = settings.PROCPOOL_GID -PROCPOOL_DIRECTORY = settings.PROCPOOL_DIRECTORY - # server-channel mappings IMC2_ENABLED = settings.IMC2_ENABLED IRC_ENABLED = settings.IRC_ENABLED @@ -227,9 +213,6 @@ class Evennia(object): """ print ' %(servername)s Server (%(version)s) started.' % {'servername': SERVERNAME, 'version': VERSION} print ' amp (to Portal): %s' % AMP_PORT - if PROCPOOL_ENABLED: - print ' amp (Process Pool): %s' % PROCPOOL_PORT - def set_restart_mode(self, mode=None): """ @@ -341,50 +324,6 @@ if AMP_ENABLED: amp_service.setName("EvenniaPortal") EVENNIA.services.addService(amp_service) -# The ampoule twisted extension manages asynchronous process pools -# via an AMP port. It can be used to offload expensive operations -# to another process asynchronously. - -if PROCPOOL_ENABLED: - - from src.utils.ampoule import main as ampoule_main - from src.utils.ampoule import service as ampoule_service - from src.utils.ampoule import pool as ampoule_pool - from src.utils.ampoule.main import BOOTSTRAP as _BOOTSTRAP - from src.server.procpool import ProcPoolChild - - # for some reason absolute paths don't work here, only relative ones. - apackages = ("twisted", - os.path.join(os.pardir, "src", "utils", "ampoule"), - os.path.join(os.pardir, "ev"), - os.path.join(os.pardir)) - aenv = {"DJANGO_SETTINGS_MODULE":"settings", - "DATABASE_NAME":settings.DATABASES.get("default", {}).get("NAME") or settings.DATABASE_NAME} - if PROCPOOL_DEBUG: - _BOOTSTRAP = _BOOTSTRAP % "log.startLogging(sys.stderr)" - else: - _BOOTSTRAP = _BOOTSTRAP % "" - - procpool_starter = ampoule_main.ProcessStarter(packages=apackages, - env=aenv, - path=PROCPOOL_DIRECTORY, - uid=PROCPOOL_UID, - gid=PROCPOOL_GID, - bootstrap=_BOOTSTRAP, - childReactor=os.name == 'nt' and "select" or "epoll") - procpool = ampoule_pool.ProcessPool(name="ProcPool", - min=PROCPOOL_MIN_NPROC, - max=PROCPOOL_MAX_NPROC, - recycleAfter=500, - ampChild=ProcPoolChild, - starter=procpool_starter) - procpool_service = ampoule_service.AMPouleService(procpool, - ProcPoolChild, - PROCPOOL_PORT, - PROCPOOL_INTERFACE) - procpool_service.setName("ProcPool") - EVENNIA.services.addService(procpool_service) - if IRC_ENABLED: # IRC channel connections @@ -405,9 +344,9 @@ if RSS_ENABLED: from src.comms import rss rss.connect_all() -if SERVER_SERVICES_PLUGIN_MODULE: +for plugin_module in SERVER_SERVICES_PLUGIN_MODULES: # external plugin protocols - SERVER_SERVICES_PLUGIN_MODULE.start_plugin_services(EVENNIA) + plugin_module.start_plugin_services(EVENNIA) # clear server startup mode ServerConfig.objects.conf("server_starting_mode", delete=True) diff --git a/src/settings_default.py b/src/settings_default.py index c84dd392cd..c6fb5b4554 100644 --- a/src/settings_default.py +++ b/src/settings_default.py @@ -179,14 +179,14 @@ AT_INITIAL_SETUP_HOOK_MODULE = "" # at_server_stop() methods. These methods will be called every time # the server starts, reloads and resets/stops respectively. AT_SERVER_STARTSTOP_MODULE = "" -# Module containing a function start_plugin_services(application). This module +# List of one or more module paths to modules containing a function start_plugin_services(application). This module # will be called with the main Evennia Server application when the Server is initiated. # It will be called last in the startup sequence. -SERVER_SERVICES_PLUGIN_MODULE = "" -# Module containing a function start_plugin_services(application). This module +SERVER_SERVICES_PLUGIN_MODULES = [] +# List of one or more module paths to modules containing a function start_plugin_services(application). This module # will be called with the main Evennia Portal application when the Portal is initiated. # It will be called last in the startup sequence. -PORTAL_SERVICES_PLUGIN_MODULE = "" +PORTAL_SERVICES_PLUGIN_MODULES = [] # Module holding MSSP meta data. This is used by MUD-crawlers to determine # what type of game you are running, how many players you have etc. MSSP_META_MODULE = "" @@ -348,47 +348,6 @@ IMC2_SERVER_PWD = "" RSS_ENABLED=False RSS_UPDATE_INTERVAL = 60*10 # 10 minutes -###################################################################### -# Process Pool setup -###################################################################### - -# Activates the Twisted AMPoule process pool. This creates a pool -# of subprocesses. When using e.g. utils.run_async Evennia will then -# be able to offload long-running processes to the pool. Process pooling -# shows much better parallelism than threading (and also makes use of -# multiple processes). But it may be slower for some -# combinations of database and operating system. Also, creating -# objects from another process will require re-syncing of caches. -# ProcPool is disabled by default on SQlite3 since it cannot handle -# multiple process-writes very well. It should work fine with other supported -# databases. If you plan to change your database, copy the following line -# to your settings file to have it deactivate automatically for sqlite3. -PROCPOOL_ENABLED = False # not DATABASES["default"]["ENGINE"] == 'django.db.backends.sqlite3' -# relay process stdout to log (debug mode, very spammy) -PROCPOOL_DEBUG = False -# max/min size of the process pool. Will expand up to max limit on demand. -PROCPOOL_MIN_NPROC = 5 -PROCPOOL_MAX_NPROC = 20 -# after sending a command, this is the maximum time in seconds the process -# may run without returning. After this time the process will be killed -PROCPOOL_TIMEOUT = None -# maximum time (seconds) a process may idle before being pruned from pool (if pool bigger than minsize) -PROCPOOL_IDLETIME = 20 -# only change if the port clashes with something else on the system -PROCPOOL_HOST = 'localhost' -PROCPOOL_PORT = 5001 -# 0.0.0.0 means listening to all interfaces -PROCPOOL_INTERFACE = '0.0.0.0' -# user-id and group-id to run the processes as (for OS:es supporting this). -# If you plan to run unsafe code one could experiment with setting this -# to an unprivileged user. -PROCPOOL_UID = None -PROCPOOL_GID = None -# real path to a directory where all processes will be run. If -# not given, processes will be executed in game/. -PROCPOOL_DIRECTORY = None - - ###################################################################### # Django web features ###################################################################### diff --git a/src/utils/logger.py b/src/utils/logger.py index 074ddfbe34..9325eb8ccc 100644 --- a/src/utils/logger.py +++ b/src/utils/logger.py @@ -7,7 +7,6 @@ a higher layer module. """ from traceback import format_exc from twisted.python import log -from src.utils import utils def log_trace(errmsg=None): """ @@ -22,7 +21,7 @@ def log_trace(errmsg=None): log.msg('[::] %s' % line) if errmsg: try: - errmsg = utils.to_str(errmsg) + errmsg = str(errmsg) except Exception, e: errmsg = str(e) for line in errmsg.splitlines(): @@ -37,7 +36,7 @@ def log_errmsg(errmsg): errormsg: (string) The message to be logged. """ try: - errmsg = utils.to_str(errmsg) + errmsg = str(errmsg) except Exception, e: errmsg = str(e) for line in errmsg.splitlines(): @@ -51,7 +50,7 @@ def log_warnmsg(warnmsg): warnmsg: (string) The message to be logged. """ try: - warnmsg = utils.to_str(warnmsg) + warnmsg = str(warnmsg) except Exception, e: warnmsg = str(e) for line in warnmsg.splitlines(): @@ -65,7 +64,7 @@ def log_infomsg(infomsg): infomsg: (string) The message to be logged. """ try: - infomsg = utils.to_str(infomsg) + infomsg = str(infomsg) except Exception, e: infomsg = str(e) for line in infomsg.splitlines(): @@ -76,7 +75,7 @@ def log_depmsg(depmsg): Prints a deprecation message """ try: - depmsg = utils.to_str(depmsg) + depmsg = str(depmsg) except Exception, e: depmsg = str(e) for line in depmsg.splitlines(): diff --git a/src/utils/utils.py b/src/utils/utils.py index 6bd1d13c9e..a09f39fac0 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -20,13 +20,10 @@ except ImportError: import pickle ENCODINGS = settings.ENCODINGS -_LOGGER = None _GA = object.__getattribute__ _SA = object.__setattr__ _DA = object.__delattr__ - - def is_iter(iterable): """ Checks if an object behaves iterably. However, @@ -465,10 +462,10 @@ def format_table(table, extra_space=1): return ftable - _FROM_MODEL_MAP = None _TO_DBOBJ = lambda o: (hasattr(o, "dbobj") and o.dbobj) or o _TO_PACKED_DBOBJ = lambda natural_key, dbref: ('__packed_dbobj__', natural_key, dbref) +_DUMPS = None def to_pickle(data, do_pickle=True, emptypickle=True): """ Prepares object for being pickled. This will remap database models @@ -482,11 +479,10 @@ def to_pickle(data, do_pickle=True, emptypickle=True): Database objects are stored as ('__packed_dbobj__', , ) """ # prepare globals - global _DUMPS, _LOADS, _FROM_MODEL_MAP + global _DUMPS, _FROM_MODEL_MAP + _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) if not _DUMPS: _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) - if not _LOADS: - _LOADS = lambda data: pickle.loads(to_str(data)) if not _FROM_MODEL_MAP: _FROM_MODEL_MAP = defaultdict(str) _FROM_MODEL_MAP.update(dict((c.model, c.natural_key()) for c in ContentType.objects.all())) @@ -511,12 +507,14 @@ def to_pickle(data, do_pickle=True, emptypickle=True): # do recursive conversion data = iter_db2id(data) if do_pickle and not (not emptypickle and not data and data != False): + print "_DUMPS2:", _DUMPS return _DUMPS(data) return data _TO_MODEL_MAP = None _IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__' _TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o +_LOADS = None from django.db import transaction @transaction.autocommit def from_pickle(data, do_pickle=True): @@ -528,9 +526,7 @@ def from_pickle(data, do_pickle=True): do_pickle - actually unpickle the input before continuing """ # prepare globals - global _DUMPS, _LOADS, _TO_MODEL_MAP - if not _DUMPS: - _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) + global _LOADS, _TO_MODEL_MAP if not _LOADS: _LOADS = lambda data: pickle.loads(to_str(data)) if not _TO_MODEL_MAP: @@ -572,8 +568,8 @@ def clean_object_caches(obj): global _TYPECLASSMODELS, _OBJECTMODELS if not _TYPECLASSMODELS: from src.typeclasses import models as _TYPECLASSMODELS - if not _OBJECTMODELS: - from src.objects import models as _OBJECTMODELS + #if not _OBJECTMODELS: + # from src.objects import models as _OBJECTMODELS #print "recaching:", obj if not obj: @@ -596,8 +592,6 @@ def clean_object_caches(obj): _PPOOL = None _PCMD = None -_DUMPS = None -_LOADS = None _PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9." def run_async(to_execute, *args, **kwargs): """ @@ -609,23 +603,8 @@ def run_async(to_execute, *args, **kwargs): arguments. The callable will be executed using ProcPool, or in a thread if ProcPool is not available. - to_execute (string) - this is only available is ProcPool is - running. If a string, to_execute this will be treated as a code - snippet to execute asynchronously. *args are then not used - and non-reserverd *kwargs are used to define the execution - environment made available to the code. reserved kwargs: - 'use_thread' (bool) - this only works with callables (not code). - It forces the code to run in a thread instead - of using the Process Pool, even if the latter - is available. This could be useful if you want - to make sure to not get out of sync with the - main process (such as accessing in-memory global - properties) - 'proc_timeout' (int) - only used if ProcPool is available. Sets a - max time for execution. This alters the value set - by settings.PROCPOOL_TIMEOUT 'at_return' -should point to a callable with one argument. It will be called with the return value from to_execute. @@ -636,32 +615,20 @@ def run_async(to_execute, *args, **kwargs): 'at_err_kwargs' - this dictionary will be used as keyword arguments to the at_err errback. - *args - if to_execute is a callable, these args will be used + *args - these args will be used as arguments for that function. If to_execute is a string *args are not used. - *kwargs - if to_execute is a callable, these kwargs will be used + *kwargs - these kwargs will be used as keyword arguments in that function. If a string, they instead are used to define the executable environment that should be available to execute the code in to_execute. - run_async will either relay the code to a thread or to a processPool - depending on input and what is available in the system. To activate - Process pooling, settings.PROCPOOL_ENABLE must be set. - - to_execute in string form should handle all imports needed. kwargs - can be used to send objects and properties. Such properties will - be pickled, except Database Objects which will be sent across - on a special format and re-loaded on the other side. - - To get a return value from your code snippet, Use the _return() - function: Every call to this function from your snippet will - append the argument to an internal list of returns. This return value - (or a list) will be the first argument to the at_return callback. + run_async will relay executed code to a thread. Use this function with restrain and only for features/commands that you know has no influence on the cause-and-effect order of your game (commands given after the async function might be executed before - it has finished). Accessing the same property from different threads/processes + it has finished). Accessing the same property from different threads can lead to unpredicted behaviour if you are not careful (this is called a "race condition"). @@ -671,72 +638,14 @@ def run_async(to_execute, *args, **kwargs): tracebacks. """ - # handle all global imports. - global _PPOOL, _PCMD - if _PPOOL == None: - # Try to load process Pool - from src.server.sessionhandler import SESSIONS as _SESSIONS - try: - _PPOOL = _SESSIONS.server.services.namedServices.get("ProcPool").pool - except AttributeError: - _PPOOL = False - if not _PCMD: - from src.server.procpool import ExecuteCode as _PCMD - - use_timeout = kwargs.pop("proc_timeout", None) - - # helper converters for callbacks/errbacks - def convert_return(f): - def func(ret, *args, **kwargs): - rval = ret["response"] and from_pickle(ret["response"]) - reca = ret["recached"] and from_pickle(ret["recached"]) - # recache all indicated objects - [clean_object_caches(obj) for obj in reca] - if f: return f(rval, *args, **kwargs) - else: return rval - return func - def convert_err(f): - def func(err, *args, **kwargs): - err.trap(Exception) - err = err.getErrorMessage() - if use_timeout and err == _PROC_ERR: - err = "Process took longer than %ss and timed out." % use_timeout - if f: - return f(err, *args, **kwargs) - else: - global _LOGGER - if not _LOGGER: - from src.utils import logger as _LOGGER - err = "Error reported from subprocess: '%s'" % err - _LOGGER.log_errmsg(err) - return func # handle special reserved input kwargs - use_thread = kwargs.pop("use_thread", False) callback = convert_return(kwargs.pop("at_return", None)) errback = convert_err(kwargs.pop("at_err", None)) callback_kwargs = kwargs.pop("at_return_kwargs", {}) errback_kwargs = kwargs.pop("at_err_kwargs", {}) - if _PPOOL and not use_thread: - # process pool is running - if isinstance(to_execute, basestring): - # run source code in process pool - cmdargs = {"_timeout":use_timeout} - cmdargs["source"] = to_str(to_execute) - cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or "" - # defer to process pool - deferred = _PPOOL.doWork(_PCMD, **cmdargs) - elif callable(to_execute): - # execute callable in process - callname = to_execute.__name__ - cmdargs = {"_timeout":use_timeout} - cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname - cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs}) - deferred = _PPOOL.doWork(_PCMD, **cmdargs) - else: - raise RuntimeError("'%s' could not be handled by run_async" % to_execute) - elif callable(to_execute): + if callable(to_execute): # no process pool available, fall back to old deferToThread mechanism. deferred = threads.deferToThread(to_execute, *args, **kwargs) else: