Added per-process timeout capability to run_async. Also changed contrib/evlang to use run_async

This commit is contained in:
Griatch 2012-09-03 21:16:54 +02:00
parent 5c6ee44039
commit 81dfeb6788
3 changed files with 30 additions and 13 deletions

View file

@ -806,7 +806,7 @@ def validate_code(codestring):
raise LimitedExecCodeException(codestring, checker.errors)
return True
def limited_exec(code, context = {}, timeout_secs = 2, retobj=None):
def limited_exec(code, context = {}, timeout_secs=2, retobj=None):
"""
Validate source code and make sure it contains no unauthorized
expression/statements as configured via 'UNALLOWED_AST_NODES' and
@ -833,7 +833,7 @@ def limited_exec(code, context = {}, timeout_secs = 2, retobj=None):
errback = lambda e: retobj.msg(e)
# run code only after validation has completed
if validate_context(context) and validate_code(code):
run_async(code, *context, at_return=callback, at_err=errback)
run_async(code, *context, proc_timeout=timeout_secs, at_return=callback, at_err=errback)
else:
# run code only after validation has completed
if validate_context(context) and validate_code(code):

View file

@ -11,6 +11,7 @@ class AMPChild(amp.AMP):
def connectionLost(self, reason):
amp.AMP.connectionLost(self, reason)
from twisted.internet import reactor
print "connectionLost called", reason
try:
reactor.stop()
except error.ReactorNotRunning:
@ -28,6 +29,9 @@ class AMPChild(amp.AMP):
import os
os._exit(-1)
def processEnded(self, *args, **kwargs):
print "processEnded called", args, kwargs
def shutdown(self):
"""
This method is needed to shutdown the child gently without

View file

@ -508,6 +508,7 @@ def to_pickle(data, do_pickle=True, emptypickle=True):
if do_pickle and not (not emptypickle and not data and data != False):
return _DUMPS(data)
return data
_TO_MODEL_MAP = None
_IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__'
_TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o
@ -592,17 +593,22 @@ _PPOOL = None
_PCMD = None
_DUMPS = None
_LOADS = None
_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9."
def run_async(to_execute, *args, **kwargs):
"""
Runs a function or executes a code snippet asynchronously.
Inputs:
to_execute (callable or string) - if a callable, this function
will be executed in a separate thread, using the
*args/**kwargs as input.
If a string, this string must be a source snippet.
This string will executed using the ProcPool is
enabled, if not this will raise a RunTimeError.
to_execute (callable) - if this is a callable, it will
be executed with *args and non-reserver *kwargs as
arguments.
The callable will be executed using ProcPool, or in
a thread if ProcPool is not available.
to_execute (string) - this is only available is ProcPool is
running. If a string, to_execute this will be treated as a code
snippet to execute asynchronously. *args are then not used
and non-reserverd *kwargs are used to define the execution
environment made available to the code.
reserved kwargs:
'use_thread' (bool) - this only works with callables (not code).
@ -612,6 +618,9 @@ def run_async(to_execute, *args, **kwargs):
to make sure to not get out of sync with the
main process (such as accessing in-memory global
properties)
'proc_timeout' (int) - only used if ProcPool is available. Sets a
max time for execution. This alters the value set
by settings.PROCPOOL_TIMEOUT
'at_return' -should point to a callable with one argument.
It will be called with the return value from
to_execute.
@ -669,6 +678,8 @@ def run_async(to_execute, *args, **kwargs):
if not _PCMD:
from src.server.procpool import ExecuteCode as _PCMD
use_timeout = kwargs.pop("proc_timeout", None)
# helper converters for callbacks/errbacks
def convert_return(f):
def func(ret, *args, **kwargs):
@ -683,7 +694,8 @@ def run_async(to_execute, *args, **kwargs):
def func(err, *args, **kwargs):
err.trap(Exception)
err = err.getErrorMessage()
print err
if use_timeout and err == _PROC_ERR:
err = "Process took longer than %ss and timed out." % use_timeout
if f:
return f(err, *args, **kwargs)
else:
@ -694,9 +706,8 @@ def run_async(to_execute, *args, **kwargs):
_LOGGER.log_errmsg(err)
return func
use_thread = kwargs.pop("use_thread", False)
# handle special reserved input kwargs
use_thread = kwargs.pop("use_thread", False)
callback = convert_return(kwargs.pop("at_return", None))
errback = convert_err(kwargs.pop("at_err", None))
callback_kwargs = kwargs.pop("at_return_kwargs", {})
@ -706,14 +717,16 @@ def run_async(to_execute, *args, **kwargs):
# process pool is running
if isinstance(to_execute, basestring):
# run source code in process pool
cmdargs = {"source": to_str(to_execute)}
cmdargs = {"_timeout":use_timeout}
cmdargs["source"] = to_str(to_execute)
cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or ""
# defer to process pool
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
elif callable(to_execute):
# execute callable in process
callname = to_execute.__name__
cmdargs = {"source": "_return(%s(*args,**kwargs))" % callname}
cmdargs = {"_timeout":use_timeout}
cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname
cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs})
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
else: