Added procpool functionality for sending arbitrary data over the wire (including

nested database objects). Rules are similar to Attribute saving.
Adding a missing component to the procpool setup.
This commit is contained in:
Griatch 2012-09-02 12:53:11 +02:00
parent 3ebeab5689
commit af99a80b4b
2 changed files with 192 additions and 51 deletions

126
src/server/procpool.py Normal file
View file

@ -0,0 +1,126 @@
"""
ProcPool
This module implements and handles processes running under the AMPoule
pool. The ProcPool can accept data from processes and runs them in a
dynamically changing pool of processes, talking to them over AMP. This
offers full asynchronous operation (Python threading does not work as
well for this).
The ExecuteCode command found here is used by src.utils.utils.run_async()
to launch snippets of code on the process pool. The pool itself is a
service named "Process Pool" and is controlled from src/server/server.py.
It can be customized via settings.PROCPOOL_*
"""
from twisted.protocols import amp
from src.utils.ampoule.child import AMPChild
from src.utils.utils import to_pickle, from_pickle
# handle global setups
_LOGGER = None
# Evennia multiprocess command
class ExecuteCode(amp.Command):
    """
    AMP command: run a snippet of Python source code in a pooled
    subprocess and hand back the result once it is ready.

    Wire fields:
        source      - a compilable Python source-code string
        environment - a pickled dictionary of Python data; each key
                      becomes the name of a variable available to the
                      source code. Database objects are packed on the
                      form ((app, modelname), id) so the receiving
                      side can easily rebuild them.

    A single catch-all error mapping is declared so any exception in
    the child is reported back as 'EXCEPTION'; the response is a
    string (possibly a pickled one).
    """
    arguments = [('source', amp.String()),
                 ('environment', amp.String())]
    response = [('response', amp.String())]
    errors = [(Exception, 'EXCEPTION')]
# Evennia multiprocess child process template
class ProcPoolChild(AMPChild):
"""
This is describing what happens on the subprocess side.
This already supports Echo, Shutdown and Ping.
Methods:
executecode - a remote code execution environment
"""
def executecode(self, source, environment):
"""
Remote code execution
source - Python code snippet
environment - pickled dictionary of environment
variables. They are stored in
two keys "normal" and "objs" where
normal holds a dictionary of
normally pickled python objects
wheras objs points to a dictionary
of database represenations ((app,key),id).
The environment's entries will be made available as
local variables during the execution. Normal eval
results will be returned as-is. For more complex
code snippets (run by exec), the _return function
is available: All data sent to _return(retval) will
be returned from this system whenever the system
finishes. Multiple calls to _return will result in
a list being return. The return value is pickled
and thus allows for returning any pickleable data.
"""
import ev, utils
class Ret(object):
"Helper class for holding returns from exec"
def __init__(self):
self.returns = []
def __call__(self, *args, **kwargs):
self.returns.extend(list(args))
def get_returns(self):
lr = len(self.returns)
if lr == 0:
return ""
elif lr == 1:
return to_pickle(self.returns[0], emptypickle=False) or ""
else:
return to_pickle(self.returns, emptypickle=False) or ""
_return = Ret()
available_vars = {'ev':ev,
'inherits_from':utils.inherits_from,
'_return':_return}
if environment:
# load environment
try:
environment = from_pickle(environment)
except Exception:
global _LOGGER
if not _LOGGER:
from src.utils.logger import logger as _LOGGER
_LOGGER.log_trace("Could not find remote object")
available_vars.update(environment)
try:
ret = eval(source, {}, available_vars)
if ret != None:
return {'response':to_pickle(ret, emptypickle=False) or ""}
except Exception:
# use exec instead
exec source in available_vars
return {'response': _return.get_returns()}
ExecuteCode.responder(executecode)

View file

@ -6,11 +6,10 @@ be of use when designing your own game.
"""
from inspect import ismodule
import os, sys, imp, types, math
import textwrap
import datetime
import random
import textwrap, datetime, random
from inspect import ismodule
from collections import defaultdict
from twisted.internet import threads
from django.contrib.contenttypes.models import ContentType
from django.conf import settings
@ -457,30 +456,57 @@ def format_table(table, extra_space=1):
return ftable
_FROM_MODEL_MAP = None
# strip a typeclass down to its underlying database object, if any
_TO_DBOBJ = lambda o: (hasattr(o, "dbobj") and o.dbobj) or o
# pack a database object as ('__packed_dbobj__', natural_key, dbref)
_TO_PACKED_DBOBJ = lambda natural_key, dbref: ('__packed_dbobj__', natural_key, dbref)
def to_pickle(data, do_pickle=True, emptypickle=True):
    """
    Prepares data for being pickled. This will remap database models
    into an intermediary format, making them easily retrievable later.

    data        - a python structure to prepare for pickling
    do_pickle   - return a pickled object
    emptypickle - allow pickling also a None/empty value (False will
                  be pickled). This has no effect if do_pickle is False.

    Database objects are stored as
    ('__packed_dbobj__', <natural_key_tuple>, <dbref>).
    """
    # prepare lazily-built module-level helpers
    global _DUMPS, _LOADS, _FROM_MODEL_MAP
    if not _DUMPS:
        _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
    if not _LOADS:
        _LOADS = lambda data: pickle.loads(to_str(data))
    if not _FROM_MODEL_MAP:
        # defaultdict(str) so missing model names yield '' (falsy)
        _FROM_MODEL_MAP = defaultdict(str)
        _FROM_MODEL_MAP.update(dict((c.model, c.natural_key()) for c in ContentType.objects.all()))
    def iter_db2id(item):
        "recursively looping over iterable items, finding dbobjs"
        dtype = type(item)
        # NOTE(review): type(item) is never basestring itself (str/unicode
        # are subclasses), so strings take the else branch - they are still
        # returned unchanged there since they have no model entry. Confirm
        # this is intended before "fixing" to isinstance().
        if dtype in (basestring, int, float):
            return item
        elif dtype == tuple:
            return tuple(iter_db2id(val) for val in item)
        elif dtype == dict:
            return dict((key, iter_db2id(val)) for key, val in item.items())
        else:
            item = _TO_DBOBJ(item)
            natural_key = _FROM_MODEL_MAP[hasattr(item, "id") and hasattr(item, '__class__') and item.__class__.__name__.lower()]
            if natural_key:
                return _TO_PACKED_DBOBJ(natural_key, item.id)
            return item
    # do recursive conversion
    data = iter_db2id(data)
    # pickle unless the caller disabled pickling of empty values
    # (False is deliberately still pickled)
    if do_pickle and not (not emptypickle and not data and data != False):
        return _DUMPS(data)
    return data
_TO_MODEL_MAP = None
# detect the special packed-dbobj tuple produced by to_pickle
_IS_PACKED_DBOBJ = lambda o: type(o) == tuple and len(o) == 3 and o[0] == '__packed_dbobj__'
# re-wrap a database object in its typeclass, if it has one
_TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o
def from_pickle(data, do_pickle=True):
    """
    Converts back from a data stream prepared with to_pickle. This will
    re-acquire database objects stored in the special
    ('__packed_dbobj__', <natural_key_tuple>, <dbref>) format.

    data      - the data structure (or pickle string) to convert back
    do_pickle - actually unpickle the input before continuing
    """
    # prepare lazily-built module-level helpers
    global _DUMPS, _LOADS, _TO_MODEL_MAP
    if not _DUMPS:
        _DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
    if not _LOADS:
        _LOADS = lambda data: pickle.loads(to_str(data))
    if not _TO_MODEL_MAP:
        # defaultdict(str) so unknown natural keys yield '' (falsy)
        _TO_MODEL_MAP = defaultdict(str)
        _TO_MODEL_MAP.update(dict((c.natural_key(), c.model_class()) for c in ContentType.objects.all()))
    def iter_id2db(item):
        "Recreate all objects recursively"
        dtype = type(item)
        if dtype in (basestring, int, float):
            return item
        elif _IS_PACKED_DBOBJ(item):
            # packed dbobjs are tuples too, so this must be checked
            # before the generic tuple branch
            return _TO_TYPECLASS(_TO_MODEL_MAP[item[1]].objects.get(id=item[2]))
        elif dtype == tuple:
            return tuple(iter_id2db(val) for val in item)
        elif dtype == dict:
            return dict((key, iter_id2db(val)) for key, val in item.items())
        return item
    if do_pickle:
        data = _LOADS(data)
    # do recursive conversion
    return iter_id2db(data)
_PPOOL = None  # cached handle to the process-pool service (False if unavailable)
_PCMD = None  # cached ExecuteCode command class, imported lazily in run_async
_DUMPS = None  # lazily-built pickle-dump helper (see to_pickle)
_LOADS = None  # lazily-built pickle-load helper (see from_pickle)
_MODEL_MAP = None  # NOTE(review): appears legacy - to_pickle/from_pickle now use _FROM_MODEL_MAP/_TO_MODEL_MAP; confirm before removing
def run_async(to_execute, *args, **kwargs):
"""
Runs a function or executes a code snippet asynchronously.
@ -562,7 +604,7 @@ def run_async(to_execute, *args, **kwargs):
"""
# handle all global imports.
global _PPOOL, _PCMD, _DUMPS, _LOADS, _MODEL_MAP
global _PPOOL, _PCMD
if _PPOOL == None:
# Try to load process Pool
from src.server.sessionhandler import SESSIONS as _SESSIONS
@ -572,12 +614,6 @@ def run_async(to_execute, *args, **kwargs):
_PPOOL = False
if not _PCMD:
from src.server.procpool import ExecuteCode as _PCMD
if not _DUMPS:
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
if not _LOADS:
_LOADS = lambda data: pickle.loads(to_str(data))
if not _MODEL_MAP:
_MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all())
# determine callbacks/errbacks
def default_errback(e):
@ -585,7 +621,7 @@ def run_async(to_execute, *args, **kwargs):
logger.log_trace(e)
def convert_return(f):
def func(ret):
rval = ret["response"] and _LOADS(ret["response"])
rval = ret["response"] and from_pickle(ret["response"])
if f: return f(rval)
else: return rval
return func
@ -595,32 +631,12 @@ def run_async(to_execute, *args, **kwargs):
callback_kwargs = kwargs.pop("at_return_kwargs", {})
errback_kwargs = kwargs.pop("at_err_kwargs", {})
if not callable(to_execute) and _PPOOL:
if isinstance(to_execute, basestring) and _PPOOL:
# run source code in process pool
if to_execute == "Echo":
# testing - addCallback set externally
from src.utils.ampoule.commands import Echo as to_execute
deferred = _PPOOL.doWork(to_execute, **{"data":args[0]})
else:
cmdargs = {"source":to_str(to_execute)}
to_pickle = {"normal":{}, "objs":{}}
for key, val in kwargs.items():
if hasattr(val, "dbobj"):
val = val.dbobj
natural_key = _MODEL_MAP.get(hasattr(val, "id") and \
hasattr(val, '__class__') and \
val.__class__.__name__.lower())
if natural_key:
# a database object. Store natural_key (a tuple) along with the objs id.
to_pickle["objs"][key] = (natural_key, val.id)
else:
to_pickle["normal"][key] = val
if to_pickle["normal"] or to_pickle["objs"]:
cmdargs["environment"] = _DUMPS(to_pickle)
else:
cmdargs["environment"] = ""
# defer to process pool
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
cmdargs = {"source": to_str(to_execute)}
cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or ""
# defer to process pool
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
elif callable(to_execute):
# no process pool available, or we gave an explicit function and not code. Use threading.
deferred = threads.deferToThread(to_execute, *args, **kwargs)
@ -643,7 +659,6 @@ def check_evennia_dependencies():
Returns False if a show-stopping version mismatch is found.
"""
# defining the requirements
python_min = '2.6'
twisted_min = '10.0'