Made run_async also handle function calls through the ProcPool.

This commit is contained in:
Griatch 2012-09-02 16:54:07 +02:00
parent e25ac1e46b
commit 575d7a86fa
2 changed files with 26 additions and 12 deletions

View file

@ -277,7 +277,10 @@ class CmdBatchCommands(MuxCommand):
def callback(r):
caller.msg(" {GBatchfile '%s' applied." % python_path)
purge_processor(caller)
utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback)
def errback(e):
caller.msg(" {RError from processor: 'e'")
purge_processor(caller)
utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback)
else:
# run in-process (might block)
for inum in range(len(commands)):
@ -368,7 +371,10 @@ class CmdBatchCode(MuxCommand):
def callback(r):
caller.msg(" {GBatchfile '%s' applied." % python_path)
purge_processor(caller)
utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback)
def errback(e):
caller.msg(" {RError from processor: 'e'")
purge_processor(caller)
utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback)
else:
# un in-process (will block)
for inum in range(len(commands)):

View file

@ -420,8 +420,6 @@ def inherits_from(obj, parent):
parent_path = "%s.%s" % (parent.__class__.__module__, parent.__class__.__name__)
return any(1 for obj_path in obj_paths if obj_path == parent_path)
def format_table(table, extra_space=1):
"""
Takes a table of collumns: [[val,val,val,...], [val,val,val,...], ...]
@ -643,17 +641,27 @@ def run_async(to_execute, *args, **kwargs):
callback_kwargs = kwargs.pop("at_return_kwargs", {})
errback_kwargs = kwargs.pop("at_err_kwargs", {})
if isinstance(to_execute, basestring) and _PPOOL:
# run source code in process pool
cmdargs = {"source": to_str(to_execute)}
cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or ""
# defer to process pool
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
if _PPOOL:
# process pool is running
if isinstance(to_execute, basestring):
# run source code in process pool
cmdargs = {"source": to_str(to_execute)}
cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or ""
# defer to process pool
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
elif callable(to_execute):
# execute callable in process
callname = to_execute.__name__
cmdargs = {"source": "_return(%s(*args,**kwargs))" % callname}
cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs})
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
else:
raise RuntimeError("'%s' could not be handled by run_async" % to_execute)
elif callable(to_execute):
# no process pool available, or we gave an explicit function and not code. Use threading.
# no process pool available, fall back to old deferToThread mechanism.
deferred = threads.deferToThread(to_execute, *args, **kwargs)
else:
# no appropriate input
# no appropriate input for this server setup
raise RuntimeError("'%s' could not be handled by run_async" % to_execute)
# attach callbacks