From af99a80b4b8bc5854e40eeee9046f4ea9b995523 Mon Sep 17 00:00:00 2001 From: Griatch Date: Sun, 2 Sep 2012 12:53:11 +0200 Subject: [PATCH] Added procpool functionality for sending arbitrary data over the wire (including nested databaseobjects). Rules are similar to Attribute saving. Adding a missing component to the procpool setup. --- src/server/procpool.py | 126 +++++++++++++++++++++++++++++++++++++++++ src/utils/utils.py | 117 +++++++++++++++++++++----------------- 2 files changed, 192 insertions(+), 51 deletions(-) create mode 100644 src/server/procpool.py diff --git a/src/server/procpool.py b/src/server/procpool.py new file mode 100644 index 0000000000..83d9d266f2 --- /dev/null +++ b/src/server/procpool.py @@ -0,0 +1,126 @@ +""" +ProcPool + +This module implements and handles processes running under the AMPoule +pool. The ProcPool can accept data from processes and runs them in a +dynamically changing pool of processes, talking to them over AMP. This +offers full asynchronous operation (Python threading does not work as +well for this). + +The ExecuteCode command found here is used by src.utils.utils.run_async() +to launch snippets of code on the process pool. The pool itself is a +service named "Process Pool" and is controlled from src/server/server.py. +It can be customized via settings.PROCPOOL_* + +""" + +from twisted.protocols import amp +from src.utils.ampoule.child import AMPChild +from src.utils.utils import to_pickle, from_pickle + +# handle global setups +_LOGGER = None + +# Evennia multiprocess command + +class ExecuteCode(amp.Command): + """ + Executes python code in the python process, + returning result when ready. + + source - a compileable Python source code string + environment - a pickled dictionary of Python + data. Each key will become the name + of a variable available to the source + code. Database objects are stored on + the form ((app, modelname), id) allowing + the receiver to easily rebuild them on + this side. + errors - an all-encompassing error handler + response - a string or a pickled string + + """ + arguments = [('source', amp.String()), + ('environment', amp.String())] + errors = [(Exception, 'EXCEPTION')] + response = [('response', amp.String())] + + +# Evennia multiprocess child process template + +class ProcPoolChild(AMPChild): + """ + This is describing what happens on the subprocess side. + + This already supports Echo, Shutdown and Ping. + + Methods: + executecode - a remote code execution environment + + """ + + def executecode(self, source, environment): + """ + Remote code execution + + source - Python code snippet + environment - pickled dictionary of environment + variables. They are stored in + two keys "normal" and "objs" where + normal holds a dictionary of + normally pickled python objects + wheras objs points to a dictionary + of database represenations ((app,key),id). + + The environment's entries will be made available as + local variables during the execution. Normal eval + results will be returned as-is. For more complex + code snippets (run by exec), the _return function + is available: All data sent to _return(retval) will + be returned from this system whenever the system + finishes. Multiple calls to _return will result in + a list being return. The return value is pickled + and thus allows for returning any pickleable data. + + + """ + import ev, utils + class Ret(object): + "Helper class for holding returns from exec" + def __init__(self): + self.returns = [] + def __call__(self, *args, **kwargs): + self.returns.extend(list(args)) + def get_returns(self): + lr = len(self.returns) + if lr == 0: + return "" + elif lr == 1: + return to_pickle(self.returns[0], emptypickle=False) or "" + else: + return to_pickle(self.returns, emptypickle=False) or "" + _return = Ret() + + available_vars = {'ev':ev, + 'inherits_from':utils.inherits_from, + '_return':_return} + if environment: + # load environment + try: + environment = from_pickle(environment) + except Exception: + global _LOGGER + if not _LOGGER: + from src.utils.logger import logger as _LOGGER + _LOGGER.log_trace("Could not find remote object") + available_vars.update(environment) + try: + ret = eval(source, {}, available_vars) + if ret != None: + return {'response':to_pickle(ret, emptypickle=False) or ""} + except Exception: + # use exec instead + exec source in available_vars + return {'response': _return.get_returns()} + ExecuteCode.responder(executecode) + diff --git a/src/utils/utils.py b/src/utils/utils.py index 37e201e801..f4d53a350c 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -6,11 +6,10 @@ be of use when designing your own game. """ -from inspect import ismodule import os, sys, imp, types, math -import textwrap -import datetime -import random +import textwrap, datetime, random +from inspect import ismodule +from collections import defaultdict from twisted.internet import threads from django.contrib.contenttypes.models import ContentType from django.conf import settings @@ -457,30 +456,57 @@ def format_table(table, extra_space=1): return ftable + _FROM_MODEL_MAP = None -def to_pickle(obj, do_pickle=False): +_TO_DBOBJ = lambda o: (hasattr(o, "dbobj") and o.dbobj) or o +_TO_PACKED_DBOBJ = lambda natural_key, dbref: ('__packed_dbobj__', natural_key, dbref) +def to_pickle(data, do_pickle=True, emptypickle=True): """ Prepares object for being pickled. This will remap database models into an intermediary format, making them easily retrievable later. obj - a python object to prepare for pickling - do_pickle - actually pickle the object as well - - Database - + do_pickle - return a pickled object + emptypickle - allow pickling also a None/empty value (False will be pickled) + This has no effect if do_pickle is False + Database objects are stored as ('__packed_dbobj__', , ) """ # prepare globals - global _DUMPS, _LOADS, _MODEL_MAP + global _DUMPS, _LOADS, _FROM_MODEL_MAP if not _DUMPS: _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) if not _LOADS: _LOADS = lambda data: pickle.loads(to_str(data)) - if not _MODEL_MAP: - _FROM_MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all()) + if not _FROM_MODEL_MAP: + _FROM_MODEL_MAP = defaultdict(str) + _FROM_MODEL_MAP.update(dict((c.model, c.natural_key()) for c in ContentType.objects.all())) + + def iter_db2id(item): + "recursively looping over iterable items, finding dbobjs" + dtype = type(item) + if dtype in (basestring, int, float): + return item + elif dtype == tuple: + return tuple(iter_db2id(val) for val in item) + elif dtype == dict: + return dict((key, iter_db2id(val)) for key, val in item.items()) + else: + item = _TO_DBOBJ(item) + natural_key = _FROM_MODEL_MAP[hasattr(item, "id") and hasattr(item, '__class__') and item.__class__.__name__.lower()] + if natural_key: + return _TO_PACKED_DBOBJ(natural_key, item.id) + return item + # do recursive conversion + data = iter_db2id(data) + if do_pickle and not (not emptypickle and not data and data != False): + return _DUMPS(data) + return data _TO_MODEL_MAP = None -def from_pickle(obj, do_pickle=False): +_IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__' +_TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o +def from_pickle(data, do_pickle=True): """ Converts back from a data stream prepared with to_pickle. This will re-acquire database objects stored in the special format. @@ -489,20 +515,36 @@ def from_pickle(obj, do_pickle=False): do_pickle - actually unpickle the input before continuing """ # prepare globals - global _DUMPS, _LOADS, _MODEL_MAP + global _DUMPS, _LOADS, _TO_MODEL_MAP if not _DUMPS: _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) if not _LOADS: _LOADS = lambda data: pickle.loads(to_str(data)) - if not _MODEL_MAP: - _TO_MODEL_MAP = dict((c.natural_key(), c.model_class()) for c in ContentType.objects.all()) + if not _TO_MODEL_MAP: + _TO_MODEL_MAP = defaultdict(str) + _TO_MODEL_MAP.update(dict((c.natural_key(), c.model_class()) for c in ContentType.objects.all())) + def iter_id2db(item): + "Recreate all objects recursively" + dtype = type(item) + if dtype in (basestring, int, float): + return item + elif _IS_PACKED_DBOBJ(item): # this is a tuple and must be done before tuple-check + return _TO_TYPECLASS(_TO_MODEL_MAP[item[1]].objects.get(id=item[2])) + elif dtype == tuple: + return tuple(iter_id2db(val) for val in item) + elif dtype == dict: + return dict((key, iter_id2db(val)) for key, val in item.items()) + return item + if do_pickle: + data = _LOADS(data) + # do recursive conversion + return iter_id2db(data) _PPOOL = None _PCMD = None _DUMPS = None _LOADS = None -_MODEL_MAP = None def run_async(to_execute, *args, **kwargs): """ Runs a function or executes a code snippet asynchronously. @@ -562,7 +604,7 @@ def run_async(to_execute, *args, **kwargs): """ # handle all global imports. - global _PPOOL, _PCMD, _DUMPS, _LOADS, _MODEL_MAP + global _PPOOL, _PCMD if _PPOOL == None: # Try to load process Pool from src.server.sessionhandler import SESSIONS as _SESSIONS @@ -572,12 +614,6 @@ def run_async(to_execute, *args, **kwargs): _PPOOL = False if not _PCMD: from src.server.procpool import ExecuteCode as _PCMD - if not _DUMPS: - _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL)) - if not _LOADS: - _LOADS = lambda data: pickle.loads(to_str(data)) - if not _MODEL_MAP: - _MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all()) # determine callbacks/errbacks def default_errback(e): @@ -585,7 +621,7 @@ def run_async(to_execute, *args, **kwargs): logger.log_trace(e) def convert_return(f): def func(ret): - rval = ret["response"] and _LOADS(ret["response"]) + rval = ret["response"] and from_pickle(ret["response"]) if f: return f(rval) else: return rval return func @@ -595,32 +631,12 @@ def run_async(to_execute, *args, **kwargs): callback_kwargs = kwargs.pop("at_return_kwargs", {}) errback_kwargs = kwargs.pop("at_err_kwargs", {}) - if not callable(to_execute) and _PPOOL: + if isinstance(to_execute, basestring) and _PPOOL: # run source code in process pool - if to_execute == "Echo": - # testing - addCallback set externally - from src.utils.ampoule.commands import Echo as to_execute - deferred = _PPOOL.doWork(to_execute, **{"data":args[0]}) - else: - cmdargs = {"source":to_str(to_execute)} - to_pickle = {"normal":{}, "objs":{}} - for key, val in kwargs.items(): - if hasattr(val, "dbobj"): - val = val.dbobj - natural_key = _MODEL_MAP.get(hasattr(val, "id") and \ - hasattr(val, '__class__') and \ - val.__class__.__name__.lower()) - if natural_key: - # a database object. Store natural_key (a tuple) along with the objs id. - to_pickle["objs"][key] = (natural_key, val.id) - else: - to_pickle["normal"][key] = val - if to_pickle["normal"] or to_pickle["objs"]: - cmdargs["environment"] = _DUMPS(to_pickle) - else: - cmdargs["environment"] = "" - # defer to process pool - deferred = _PPOOL.doWork(_PCMD, **cmdargs) + cmdargs = {"source": to_str(to_execute)} + cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or "" + # defer to process pool + deferred = _PPOOL.doWork(_PCMD, **cmdargs) elif callable(to_execute): # no process pool available, or we gave an explicit function and not code. Use threading. deferred = threads.deferToThread(to_execute, *args, **kwargs) @@ -643,7 +659,6 @@ def check_evennia_dependencies(): Returns False if a show-stopping version mismatch is found. """ - # defining the requirements python_min = '2.6' twisted_min = '10.0'