From 575d7a86fa5e47bff9e413f68718a82829f08b51 Mon Sep 17 00:00:00 2001 From: Griatch Date: Sun, 2 Sep 2012 16:54:07 +0200 Subject: [PATCH] Made run_async also handle function calls through the ProcPool. --- src/commands/default/batchprocess.py | 10 ++++++++-- src/utils/utils.py | 28 ++++++++++++++++++---------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/commands/default/batchprocess.py b/src/commands/default/batchprocess.py index 2e248ed7de..db77ea6c49 100644 --- a/src/commands/default/batchprocess.py +++ b/src/commands/default/batchprocess.py @@ -277,7 +277,10 @@ class CmdBatchCommands(MuxCommand): def callback(r): caller.msg(" {GBatchfile '%s' applied." % python_path) purge_processor(caller) - utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback) + def errback(e): + caller.msg(" {RError from processor: 'e'") + purge_processor(caller) + utils.run_async(_PROCPOOL_BATCHCMD_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback) else: # run in-process (might block) for inum in range(len(commands)): @@ -368,7 +371,10 @@ class CmdBatchCode(MuxCommand): def callback(r): caller.msg(" {GBatchfile '%s' applied." % python_path) purge_processor(caller) - utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback) + def errback(e): + caller.msg(" {RError from processor: 'e'") + purge_processor(caller) + utils.run_async(_PROCPOOL_BATCHCODE_SOURCE, commands=commands, caller=caller, at_return=callback, at_err=errback) else: # un in-process (will block) for inum in range(len(commands)): diff --git a/src/utils/utils.py b/src/utils/utils.py index 1de2e52bc9..ca307f6de5 100644 --- a/src/utils/utils.py +++ b/src/utils/utils.py @@ -420,8 +420,6 @@ def inherits_from(obj, parent): parent_path = "%s.%s" % (parent.__class__.__module__, parent.__class__.__name__) return any(1 for obj_path in obj_paths if obj_path == parent_path) - - def format_table(table, extra_space=1): """ Takes a table of collumns: [[val,val,val,...], [val,val,val,...], ...] @@ -643,17 +641,27 @@ def run_async(to_execute, *args, **kwargs): callback_kwargs = kwargs.pop("at_return_kwargs", {}) errback_kwargs = kwargs.pop("at_err_kwargs", {}) - if isinstance(to_execute, basestring) and _PPOOL: - # run source code in process pool - cmdargs = {"source": to_str(to_execute)} - cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or "" - # defer to process pool - deferred = _PPOOL.doWork(_PCMD, **cmdargs) + if _PPOOL: + # process pool is running + if isinstance(to_execute, basestring): + # run source code in process pool + cmdargs = {"source": to_str(to_execute)} + cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or "" + # defer to process pool + deferred = _PPOOL.doWork(_PCMD, **cmdargs) + elif callable(to_execute): + # execute callable in process + callname = to_execute.__name__ + cmdargs = {"source": "_return(%s(*args,**kwargs))" % callname} + cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs}) + deferred = _PPOOL.doWork(_PCMD, **cmdargs) + else: + raise RuntimeError("'%s' could not be handled by run_async" % to_execute) elif callable(to_execute): - # no process pool available, or we gave an explicit function and not code. Use threading. + # no process pool available, fall back to old deferToThread mechanism. deferred = threads.deferToThread(to_execute, *args, **kwargs) else: - # no appropriate input + # no appropriate input for this server setup raise RuntimeError("'%s' could not be handled by run_async" % to_execute) # attach callbacks