From 81dfeb6788a155083dd2983cff7408e03cbc3db8 Mon Sep 17 00:00:00 2001 From: Griatch Date: Mon, 3 Sep 2012 21:16:54 +0200 Subject: [PATCH] Added per-process timeout capability to run_async. Also changed contrib/evlang to use run_async --- contrib/evlang/evlang.py | 4 ++-- src/utils/ampoule/child.py | 4 ++++ src/utils/utils.py | 35 ++++++++++++++++++++++++----------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/contrib/evlang/evlang.py b/contrib/evlang/evlang.py index 20eec82b5f..d5d068391e 100644 --- a/contrib/evlang/evlang.py +++ b/contrib/evlang/evlang.py @@ -806,7 +806,7 @@ def validate_code(codestring): raise LimitedExecCodeException(codestring, checker.errors) return True -def limited_exec(code, context = {}, timeout_secs = 2, retobj=None): +def limited_exec(code, context = {}, timeout_secs=2, retobj=None): """ Validate source code and make sure it contains no unauthorized expression/statements as configured via 'UNALLOWED_AST_NODES' and @@ -833,7 +833,7 @@ def limited_exec(code, context = {}, timeout_secs = 2, retobj=None): errback = lambda e: retobj.msg(e) # run code only after validation has completed if validate_context(context) and validate_code(code): - run_async(code, *context, at_return=callback, at_err=errback) + run_async(code, *context, proc_timeout=timeout_secs, at_return=callback, at_err=errback) else: # run code only after validation has completed if validate_context(context) and validate_code(code): diff --git a/src/utils/ampoule/child.py b/src/utils/ampoule/child.py index c9f2a08da9..96c55296e8 100644 --- a/src/utils/ampoule/child.py +++ b/src/utils/ampoule/child.py @@ -11,6 +11,7 @@ class AMPChild(amp.AMP): def connectionLost(self, reason): amp.AMP.connectionLost(self, reason) from twisted.internet import reactor + print "connectionLost called", reason try: reactor.stop() except error.ReactorNotRunning: @@ -28,6 +29,9 @@ class AMPChild(amp.AMP): import os os._exit(-1) + def processEnded(self, *args, **kwargs): + print "processEnded called", args, kwargs + def shutdown(self): """ This method is needed to shutdown the child gently without diff --git a/src/utils/utils.py b/src/utils/utils.py index 929bf8189a..01723f8207 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -508,6 +508,7 @@ def to_pickle(data, do_pickle=True, emptypickle=True): if do_pickle and not (not emptypickle and not data and data != False): return _DUMPS(data) return data + _TO_MODEL_MAP = None _IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__' _TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o @@ -592,17 +593,22 @@ _PPOOL = None _PCMD = None _DUMPS = None _LOADS = None +_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9." def run_async(to_execute, *args, **kwargs): """ Runs a function or executes a code snippet asynchronously. Inputs: - to_execute (callable or string) - if a callable, this function - will be executed in a separate thread, using the - *args/**kwargs as input. - If a string, this string must be a source snippet. - This string will executed using the ProcPool is - enabled, if not this will raise a RunTimeError. + to_execute (callable) - if this is a callable, it will + be executed with *args and non-reserver *kwargs as + arguments. + The callable will be executed using ProcPool, or in + a thread if ProcPool is not available. + to_execute (string) - this is only available is ProcPool is + running. If a string, to_execute this will be treated as a code + snippet to execute asynchronously. *args are then not used + and non-reserverd *kwargs are used to define the execution + environment made available to the code. reserved kwargs: 'use_thread' (bool) - this only works with callables (not code). @@ -612,6 +618,9 @@ def run_async(to_execute, *args, **kwargs): to make sure to not get out of sync with the main process (such as accessing in-memory global properties) + 'proc_timeout' (int) - only used if ProcPool is available. Sets a + max time for execution. This alters the value set + by settings.PROCPOOL_TIMEOUT 'at_return' -should point to a callable with one argument. It will be called with the return value from to_execute. @@ -669,6 +678,8 @@ def run_async(to_execute, *args, **kwargs): if not _PCMD: from src.server.procpool import ExecuteCode as _PCMD + use_timeout = kwargs.pop("proc_timeout", None) + # helper converters for callbacks/errbacks def convert_return(f): def func(ret, *args, **kwargs): @@ -683,7 +694,8 @@ def run_async(to_execute, *args, **kwargs): def func(err, *args, **kwargs): err.trap(Exception) err = err.getErrorMessage() - print err + if use_timeout and err == _PROC_ERR: + err = "Process took longer than %ss and timed out." % use_timeout if f: return f(err, *args, **kwargs) else: @@ -694,9 +706,8 @@ def run_async(to_execute, *args, **kwargs): _LOGGER.log_errmsg(err) return func - use_thread = kwargs.pop("use_thread", False) - # handle special reserved input kwargs + use_thread = kwargs.pop("use_thread", False) callback = convert_return(kwargs.pop("at_return", None)) errback = convert_err(kwargs.pop("at_err", None)) callback_kwargs = kwargs.pop("at_return_kwargs", {}) @@ -706,14 +717,16 @@ def run_async(to_execute, *args, **kwargs): # process pool is running if isinstance(to_execute, basestring): # run source code in process pool - cmdargs = {"source": to_str(to_execute)} + cmdargs = {"_timeout":use_timeout} + cmdargs["source"] = to_str(to_execute) cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or "" # defer to process pool deferred = _PPOOL.doWork(_PCMD, **cmdargs) elif callable(to_execute): # execute callable in process callname = to_execute.__name__ - cmdargs = {"source": "_return(%s(*args,**kwargs))" % callname} + cmdargs = {"_timeout":use_timeout} + cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs}) deferred = _PPOOL.doWork(_PCMD, **cmdargs) else: