Added new process-pool runner based on AMPoule (integrated into Evennia).

This allows e.g. utils.utils.run_async to offload long-running functions
to a completely different subprocess entirely, offering real parallelism.

Implementation is still experimental, notably not all objects can be
transferred safely across the wire; also there is no concept of
updating caches yet - so adding an object from the subprocess side
will not be known in the main thread yet (since caches cannot yet tell
the underlying database has changed).
This commit is contained in:
Griatch 2012-09-02 10:10:22 +02:00
parent dcc7f29a91
commit f5a889e40c
22 changed files with 2322 additions and 60 deletions

View file

@ -12,8 +12,14 @@ import textwrap
import datetime
import random
from twisted.internet import threads
from django.contrib.contenttypes.models import ContentType
from django.conf import settings
try:
import cPickle as pickle
except ImportError:
import pickle
ENCODINGS = settings.ENCODINGS
def is_iter(iterable):
@ -415,6 +421,7 @@ def inherits_from(obj, parent):
return any(1 for obj_path in obj_paths if obj_path == parent_path)
def format_table(table, extra_space=1):
"""
Takes a table of collumns: [[val,val,val,...], [val,val,val,...], ...]
@ -449,47 +456,184 @@ def format_table(table, extra_space=1):
for icol, col in enumerate(table)])
return ftable
def run_async(async_func, *args, **kwargs):
_FROM_MODEL_MAP = None
def to_pickle(obj, do_pickle=False):
"""
This wrapper will use Twisted's asynchronous features to run a slow
function using a separate reactor thread. In effect this means that
the server will not be blocked while the slow process finish.
Prepares object for being pickled. This will remap database models
into an intermediary format, making them easily retrievable later.
obj - a python object to prepare for pickling
do_pickle - actually pickle the object as well
Database
"""
# prepare globals
global _DUMPS, _LOADS, _MODEL_MAP
if not _DUMPS:
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
if not _LOADS:
_LOADS = lambda data: pickle.loads(to_str(data))
if not _MODEL_MAP:
_FROM_MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all())
_TO_MODEL_MAP = None
def from_pickle(obj, do_pickle=False):
"""
Converts back from a data stream prepared with to_pickle. This will
re-acquire database objects stored in the special format.
obj - an object or a pickle, as indicated by the do_pickle flag
do_pickle - actually unpickle the input before continuing
"""
# prepare globals
global _DUMPS, _LOADS, _MODEL_MAP
if not _DUMPS:
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
if not _LOADS:
_LOADS = lambda data: pickle.loads(to_str(data))
if not _MODEL_MAP:
_TO_MODEL_MAP = dict((c.natural_key(), c.model_class()) for c in ContentType.objects.all())
_PPOOL = None
_PCMD = None
_DUMPS = None
_LOADS = None
_MODEL_MAP = None
def run_async(to_execute, *args, **kwargs):
"""
Runs a function or executes a code snippet asynchronously.
Inputs:
to_execute (callable or string) - if a callable, this function
will be executed in a separate thread, using the
*args/**kwargs as input.
If a string, this string must be a source snippet.
This string will executed using the ProcPool is
enabled, if not this will raise a RunTimeError.
*args - if to_execute is a callable, these args will be used
as arguments for that function. If to_execute is a string
*args are not used.
*kwargs - if to_execute is a callable, these kwargs will be used
as keyword arguments in that function. If a string, they
instead are used to define the executable environment
that should be available to execute the code in to_execute.
There are two special (optional) kwargs. These are available
both if to_execute is a callable or a source string.
'at_return' -should point to a callable with one argument.
It will be called with the return value from
to_execute.
'at_return_kwargs' - this dictionary which be used as keyword
arguments to the at_return callback.
'at_err' - this will be called with a Failure instance if
there is an error in to_execute.
'at_err_kwargs' - this dictionary will be used as keyword
arguments to the at_err errback.
run_async will either relay the code to a thread or to a processPool
depending on input and what is available in the system. To activate
Process pooling, settings.PROCPOOL_ENABLE must be set.
to_execute in string form should handle all imports needed. kwargs
can be used to send objects and properties. Such properties will
be pickled, except Database Objects which will be sent across
on a special format and re-loaded on the other side.
To get a return value from your code snippet, Use the _return()
function: Every call to this function from your snippet will
append the argument to an internal list of returns. This return value
(or a list) will be the first argument to the at_return callback.
Use this function with restrain and only for features/commands
that you know has no influence on the cause-and-effect order of your
game (commands given after the async function might be executed before
it has finished). Accessing the same property from different threads can
lead to unpredicted behaviour if you are not careful (this is called a
it has finished). Accessing the same property from different threads/processes
can lead to unpredicted behaviour if you are not careful (this is called a
"race condition").
Also note that some databases, notably sqlite3, don't support access from
multiple threads simultaneously, so if you do heavy database access from
your async_func under sqlite3 you will probably run very slow or even get
your to_execute under sqlite3 you will probably run very slow or even get
tracebacks.
arg:
async_func - function that should be run asynchroneously
reserved keywords:
at_return(r) - if given, this function will be called when async_func returns
value r at the end of a successful execution
at_err(e) - if given, this function is called if async_func fails with an exception e.
use e.trap(ExceptionType1, ExceptionType2)
all other arguments/keywords will be used as args/kwargs fro async_func.
"""
# create deferred object
# handle all global imports.
global _PPOOL, _PCMD, _DUMPS, _LOADS, _MODEL_MAP
if _PPOOL == None:
# Try to load process Pool
from src.server.sessionhandler import SESSIONS as _SESSIONS
try:
_PPOOL = _SESSIONS.server.services.namedServices.get("ProcPool").pool
except AttributeError:
_PPOOL = False
if not _PCMD:
from src.server.procpool import ExecuteCode as _PCMD
if not _DUMPS:
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
if not _LOADS:
_LOADS = lambda data: pickle.loads(to_str(data))
if not _MODEL_MAP:
_MODEL_MAP = dict((c.model, c.natural_key()) for c in ContentType.objects.all())
deferred = threads.deferToThread(async_func, *args, **kwargs)
if "at_return" in kwargs:
deferred.addCallback(kwargs["at_return"])
if "at_err" in kwargs:
deferred.addErrback(kwargs["at_err"])
# always add a logging errback as a last catch
# determine callbacks/errbacks
def default_errback(e):
from src.utils import logger
logger.log_trace(e)
def convert_return(f):
def func(ret):
rval = ret["response"] and _LOADS(ret["response"])
if f: return f(rval)
else: return rval
return func
callback = convert_return(kwargs.pop("at_return", None))
errback = kwargs.pop("at_err", None)
callback_kwargs = kwargs.pop("at_return_kwargs", {})
errback_kwargs = kwargs.pop("at_err_kwargs", {})
if not callable(to_execute) and _PPOOL:
# run source code in process pool
if to_execute == "Echo":
# testing - addCallback set externally
from src.utils.ampoule.commands import Echo as to_execute
deferred = _PPOOL.doWork(to_execute, **{"data":args[0]})
else:
cmdargs = {"source":to_str(to_execute)}
to_pickle = {"normal":{}, "objs":{}}
for key, val in kwargs.items():
if hasattr(val, "dbobj"):
val = val.dbobj
natural_key = _MODEL_MAP.get(hasattr(val, "id") and \
hasattr(val, '__class__') and \
val.__class__.__name__.lower())
if natural_key:
# a database object. Store natural_key (a tuple) along with the objs id.
to_pickle["objs"][key] = (natural_key, val.id)
else:
to_pickle["normal"][key] = val
if to_pickle["normal"] or to_pickle["objs"]:
cmdargs["environment"] = _DUMPS(to_pickle)
else:
cmdargs["environment"] = ""
# defer to process pool
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
elif callable(to_execute):
# no process pool available, or we gave an explicit function and not code. Use threading.
deferred = threads.deferToThread(to_execute, *args, **kwargs)
else:
# no appropriate input
raise RuntimeError("'%s' could not be handled by run_async" % to_execute)
# attach callbacks
if callback:
deferred.addCallback(callback, **callback_kwargs)
if errback:
deferred.addCallback(errback, **errback_kwargs)
# always add a logging errback as a last catch
deferred.addErrback(default_errback)
@ -548,6 +692,7 @@ def check_evennia_dependencies():
if settings.IRC_ENABLED:
try:
import twisted.words
twisted.words # set to avoid debug info about not-used import
except ImportError:
errstring += "\n ERROR: IRC is enabled, but twisted.words is not installed. Please install it."
errstring += "\n Linux Debian/Ubuntu users should install package 'python-twisted-words', others"