From ffcf4b3c2f49dfb61cf4ebec99bdd4a9d0bdb1e0 Mon Sep 17 00:00:00 2001 From: Griatch Date: Mon, 3 Sep 2012 01:11:14 +0200 Subject: [PATCH] Added cache-resyncing to ProcPool. This makes sure to update all affected object caches whenever the subprocess returns (this is potentially not good enough for long-running scripts, will have to ponder that one). Made ProcPool work with MySQL (where it works much better). Tested and fixed many small bugs. --- src/__init__.py | 2 + src/commands/default/batchprocess.py | 4 +- src/objects/models.py | 4 ++ src/server/procpool.py | 27 ++++++++++---- src/typeclasses/models.py | 5 ++- src/utils/idmapper/base.py | 37 +++++++++++++++++- src/utils/utils.py | 56 +++++++++++++++++++++++++++- 7 files changed, 121 insertions(+), 14 deletions(-) diff --git a/src/__init__.py b/src/__init__.py index e69de29bb2..2b758d664e 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -0,0 +1,2 @@ +# experimental central dictionary for models in subprocesses to report they have been changed. +PROC_MODIFIED_OBJS = [] diff --git a/src/commands/default/batchprocess.py b/src/commands/default/batchprocess.py index db77ea6c49..d92edf6f0d 100644 --- a/src/commands/default/batchprocess.py +++ b/src/commands/default/batchprocess.py @@ -278,7 +278,7 @@ class CmdBatchCommands(MuxCommand): caller.msg(" {GBatchfile '%s' applied." % python_path) purge_processor(caller) def errback(e): - caller.msg(" {RError from processor: 'e'") + caller.msg(" {RError from processor: '%s'" % e) purge_processor(caller) utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback) else: @@ -372,7 +372,7 @@ class CmdBatchCode(MuxCommand): caller.msg(" {GBatchfile '%s' applied." 
% python_path) purge_processor(caller) def errback(e): - caller.msg(" {RError from processor: 'e'") + caller.msg(" {RError from processor: '%s'" % e) purge_processor(caller) utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback) else: diff --git a/src/objects/models.py b/src/objects/models.py index f68c4febee..af0659cf79 100644 --- a/src/objects/models.py +++ b/src/objects/models.py @@ -41,6 +41,10 @@ _GA = object.__getattribute__ _SA = object.__setattr__ _DA = object.__delattr__ +def clean_content_cache(obj): + "Clean obj's content cache" + _SA(obj, "_contents_cache", None) + #------------------------------------------------------------ # # ObjAttribute diff --git a/src/server/procpool.py b/src/server/procpool.py index cb49d0feea..8de90a024f 100644 --- a/src/server/procpool.py +++ b/src/server/procpool.py @@ -17,6 +17,7 @@ It can be customized via settings.PROCPOOL_* from twisted.protocols import amp from src.utils.ampoule.child import AMPChild from src.utils.utils import to_pickle, from_pickle +from src import PROC_MODIFIED_OBJS # handle global setups _LOGGER = None @@ -43,11 +44,11 @@ class ExecuteCode(amp.Command): arguments = [('source', amp.String()), ('environment', amp.String())] errors = [(Exception, 'EXCEPTION')] - response = [('response', amp.String())] + response = [('response', amp.String()), + ('recached', amp.String())] # Evennia multiprocess child process template - class ProcPoolChild(AMPChild): """ This is describing what happens on the subprocess side. @@ -58,7 +59,6 @@ class ProcPoolChild(AMPChild): executecode - a remote code execution environment """ - def executecode(self, source, environment): """ Remote code execution @@ -82,8 +82,9 @@ class ProcPoolChild(AMPChild): a list being return. The return value is pickled and thus allows for returning any pickleable data. 
- """ + + import ev, utils class Ret(object): "Helper class for holding returns from exec" @@ -114,14 +115,24 @@ class ProcPoolChild(AMPChild): from src.utils.logger import logger as _LOGGER _LOGGER.log_trace("Could not find remote object") available_vars.update(environment) + # try to execute with eval first try: ret = eval(source, {}, available_vars) - if ret != None: - return {'response':to_pickle(ret, emptypickle=False) or ""} + ret = to_pickle(ret, emptypickle=False) or "" except Exception: # use exec instead exec source in available_vars - - return {'response': _return.get_returns()} + ret = _return.get_returns() + # get the list of affected objects to recache + objs = list(set(PROC_MODIFIED_OBJS)) + # we need to include the locations too, to update their content caches + objs = objs + list(set([o.location for o in objs if hasattr(o, "location") and o.location])) + #print "objs:", objs + #print "to_pickle", to_pickle(objs, emptypickle=False, do_pickle=False) + to_recache = to_pickle(objs, emptypickle=False) or "" + # empty the list without loosing memory reference + PROC_MODIFIED_OBJS[:] = [] + return {'response': ret, + 'recached': to_recache} ExecuteCode.responder(executecode) diff --git a/src/typeclasses/models.py b/src/typeclasses/models.py index ae641f364c..54d8bea501 100644 --- a/src/typeclasses/models.py +++ b/src/typeclasses/models.py @@ -71,13 +71,16 @@ def _set_cache(obj, name, val): _SA(obj, "db_%s" % name, val) _GA(obj, "save")() _SA(obj, "_cached_db_%s" % name, val) - def _del_cache(obj, name): "On-model cache deleter" try: _DA(obj, "_cached_db_%s" % name) except AttributeError: pass +def _clean_cache(obj): + "On-model cache resetter" + [_DA(obj, cname) for cname in obj.__dict__.keys() if cname.startswith("_cached_db_")] + # this cache holds the attributes loaded on objects, one dictionary # of attributes per object. 
diff --git a/src/utils/idmapper/base.py b/src/utils/idmapper/base.py index 25c7b7785b..dc7479751c 100755 --- a/src/utils/idmapper/base.py +++ b/src/utils/idmapper/base.py @@ -7,12 +7,41 @@ leave caching unexpectedly (no use if WeakRefs). Also adds cache_size() for monitoring the size of the cache. """ +import os from django.db.models.base import Model, ModelBase from django.db.models.signals import post_save, pre_delete, \ post_syncdb from manager import SharedMemoryManager +# determine if our current pid is different from the server PID (i.e. +# if we are in a subprocess or not) +from src import PROC_MODIFIED_OBJS +def _get_pids(): + """ + Get the PID (Process ID) by trying to access + a PID file. + """ + from django.conf import settings + server_pidfile = os.path.join(settings.GAME_DIR, 'server.pid') + portal_pidfile = os.path.join(settings.GAME_DIR, 'portal.pid') + server_pid, portal_pid = None, None + if os.path.exists(server_pidfile): + f = open(server_pidfile, 'r') + server_pid = f.read() + f.close() + if os.path.exists(portal_pidfile): + f = open(portal_pidfile, 'r') + portal_pid = f.read() + f.close() + if server_pid and portal_pid: + return int(server_pid), int(portal_pid) + return None, None +_SELF_PID = os.getpid() +_SERVER_PID = None +_PORTAL_PID = None +_IS_SUBPROCESS = False + class SharedMemoryModelBase(ModelBase): # CL: upstream had a __new__ method that skipped ModelBase's __new__ if @@ -130,9 +159,15 @@ class SharedMemoryModel(Model): def save(cls, *args, **kwargs): "overload spot for saving" + global _SERVER_PID, _PORTAL_PID, _IS_SUBPROCESS, _SELF_PID + if not _SERVER_PID and not _PORTAL_PID: + _SERVER_PID, _PORTAL_PID = _get_pids() + _IS_SUBPROCESS = (_SERVER_PID and _PORTAL_PID) and (_SERVER_PID != _SELF_PID) and (_PORTAL_PID != _SELF_PID) + if _IS_SUBPROCESS: + #print "storing in PROC_MODIFIED_OBJS:", cls.db_key, cls.id + PROC_MODIFIED_OBJS.append(cls) super(SharedMemoryModel, cls).save(*args, **kwargs) - # Use a signal so we make sure to 
catch cascades. def flush_cache(**kwargs): for model in SharedMemoryModel.__subclasses__(): diff --git a/src/utils/utils.py b/src/utils/utils.py index 55d4be8e2b..929bf8189a 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -21,6 +21,11 @@ except ImportError: ENCODINGS = settings.ENCODINGS _LOGGER = None +_GA = object.__getattribute__ +_SA = object.__setattr__ +_DA = object.__delattr__ + + def is_iter(iterable): """ @@ -490,6 +495,8 @@ def to_pickle(data, do_pickle=True, emptypickle=True): return tuple(iter_db2id(val) for val in item) elif dtype == dict: return dict((key, iter_db2id(val)) for key, val in item.items()) + elif hasattr(item, '__iter__'): + return [iter_db2id(val) for val in item] else: item = _TO_DBOBJ(item) natural_key = _FROM_MODEL_MAP[hasattr(item, "id") and hasattr(item, '__class__') and item.__class__.__name__.lower()] @@ -501,10 +508,11 @@ def to_pickle(data, do_pickle=True, emptypickle=True): if do_pickle and not (not emptypickle and not data and data != False): return _DUMPS(data) return data - _TO_MODEL_MAP = None _IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__' _TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o +from django.db import transaction +@transaction.autocommit def from_pickle(data, do_pickle=True): """ Converts back from a data stream prepared with to_pickle. 
This will @@ -529,17 +537,57 @@ def from_pickle(data, do_pickle=True): if dtype in (basestring, int, float): return item elif _IS_PACKED_DBOBJ(item): # this is a tuple and must be done before tuple-check - return _TO_TYPECLASS(_TO_MODEL_MAP[item[1]].objects.get(id=item[2])) + #print item[1], item[2] + if item[2]: #TODO Not sure why this could ever be None, but it can + return _TO_TYPECLASS(_TO_MODEL_MAP[item[1]].objects.get(id=item[2])) + return None elif dtype == tuple: return tuple(iter_id2db(val) for val in item) elif dtype == dict: return dict((key, iter_id2db(val)) for key, val in item.items()) + elif hasattr(item, '__iter__'): + return [iter_id2db(val) for val in item] return item if do_pickle: data = _LOADS(data) + # we have to make sure the database is in a safe state + # (this is relevant for multiprocess operation) + transaction.commit() # do recursive conversion return iter_id2db(data) + +_TYPECLASSMODELS = None +_OBJECTMODELS = None +def clean_object_caches(obj): + """ + Clean all object caches on the given object + """ + global _TYPECLASSMODELS, _OBJECTMODELS + if not _TYPECLASSMODELS: + from src.typeclasses import models as _TYPECLASSMODELS + if not _OBJECTMODELS: + from src.objects import models as _OBJECTMODELS + + #print "recaching:", obj + if not obj: + return + obj = hasattr(obj, "dbobj") and obj.dbobj or obj + # contents cache + try: + _SA(obj, "_contents_cache", None) + except AttributeError: + pass + + # on-object property cache + [_DA(obj, cname) for cname in obj.__dict__.keys() if cname.startswith("_cached_db_")] + try: + hashid = _GA(obj, "hashid") + hasid = obj.hashid + _TYPECLASSMODELS._ATTRIBUTE_CACHE[hashid] = {} + except AttributeError: + pass + _PPOOL = None _PCMD = None _DUMPS = None @@ -625,6 +673,9 @@ def run_async(to_execute, *args, **kwargs): def convert_return(f): def func(ret, *args, **kwargs): rval = ret["response"] and from_pickle(ret["response"]) + reca = ret["recached"] and from_pickle(ret["recached"]) + # recache all 
indicated objects + [clean_object_caches(obj) for obj in reca] if f: return f(rval, *args, **kwargs) else: return rval return func @@ -632,6 +683,7 @@ def run_async(to_execute, *args, **kwargs): def func(err, *args, **kwargs): err.trap(Exception) err = err.getErrorMessage() + print err if f: return f(err, *args, **kwargs) else: