From 1a3e0481c7f09c255f92e7f5ca41ee9f562b5899 Mon Sep 17 00:00:00 2001 From: Griatch Date: Mon, 23 Feb 2015 13:47:08 +0100 Subject: [PATCH] Moved contrib/procpool out into a separate repository until it can be looked at. --- evennia/contrib/procpools/README.txt | 40 - evennia/contrib/procpools/__init__.py | 1 - evennia/contrib/procpools/ampoule/COPYING.txt | 23 - evennia/contrib/procpools/ampoule/EVENNIA.txt | 20 - evennia/contrib/procpools/ampoule/__init__.py | 4 - evennia/contrib/procpools/ampoule/child.py | 60 -- evennia/contrib/procpools/ampoule/commands.py | 11 - evennia/contrib/procpools/ampoule/iampoule.py | 24 - evennia/contrib/procpools/ampoule/main.py | 302 ------ evennia/contrib/procpools/ampoule/pool.py | 416 -------- evennia/contrib/procpools/ampoule/rpool.py | 65 -- evennia/contrib/procpools/ampoule/service.py | 69 -- .../procpools/ampoule/test/__init__.py | 0 .../procpools/ampoule/test/test_process.py | 892 ------------------ .../procpools/ampoule/test/test_proxy.py | 52 - evennia/contrib/procpools/ampoule/util.py | 46 - evennia/contrib/procpools/python_procpool.py | 326 ------- .../procpools/python_procpool_plugin.py | 115 --- 18 files changed, 2466 deletions(-) delete mode 100644 evennia/contrib/procpools/README.txt delete mode 100644 evennia/contrib/procpools/__init__.py delete mode 100644 evennia/contrib/procpools/ampoule/COPYING.txt delete mode 100644 evennia/contrib/procpools/ampoule/EVENNIA.txt delete mode 100644 evennia/contrib/procpools/ampoule/__init__.py delete mode 100644 evennia/contrib/procpools/ampoule/child.py delete mode 100644 evennia/contrib/procpools/ampoule/commands.py delete mode 100644 evennia/contrib/procpools/ampoule/iampoule.py delete mode 100644 evennia/contrib/procpools/ampoule/main.py delete mode 100644 evennia/contrib/procpools/ampoule/pool.py delete mode 100644 evennia/contrib/procpools/ampoule/rpool.py delete mode 100644 evennia/contrib/procpools/ampoule/service.py delete mode 100644 evennia/contrib/procpools/ampoule/test/__init__.py delete mode 100644 evennia/contrib/procpools/ampoule/test/test_process.py delete mode 100644 evennia/contrib/procpools/ampoule/test/test_proxy.py delete mode 100644 evennia/contrib/procpools/ampoule/util.py delete mode 100644 evennia/contrib/procpools/python_procpool.py delete mode 100644 evennia/contrib/procpools/python_procpool_plugin.py diff --git a/evennia/contrib/procpools/README.txt b/evennia/contrib/procpools/README.txt deleted file mode 100644 index 78a8d21b9a..0000000000 --- a/evennia/contrib/procpools/README.txt +++ /dev/null @@ -1,40 +0,0 @@ - -ProcPools ---------- - -This contrib defines a process pool subsystem for Evennia. - -A process pool handles a range of separately running processes that -can accept information from the main Evennia process. The pool dynamically -grows and shrinks depending on the need (and will queue requests if there -are no free slots available). - -The main use of this is to launch long-running, possibly blocking code -in a way that will not freeze up the rest of the server. So you could -execute time.sleep(10) on the process pool without anyone else on the -server noticing anything. - -This folder has the following contents: - -ampoule/ - this is a separate library managing the process pool. You - should not need to touch this. - -Python Procpool ---------------- -python_procpool.py - this implements a way to execute arbitrary python - code on the procpool. Import run_async() from this - module in order to use this functionality in-code - (this is a replacement to the in-process run_async - found in evennia.utils.utils). -python_procpool_plugin.py - this is a plugin module for the python - procpool, to start and add it to the server. Adding it - is a single line in your settings file - see the header - of the file for more info. - - - -Adding other Procpools ----------------------- -To add other types of procpools (such as for executing other remote languages - than Python), you can pretty much mimic the layout of python_procpool.py - and python_procpool_plugin.py. diff --git a/evennia/contrib/procpools/__init__.py b/evennia/contrib/procpools/__init__.py deleted file mode 100644 index 40a96afc6f..0000000000 --- a/evennia/contrib/procpools/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# -*- coding: utf-8 -*- diff --git a/evennia/contrib/procpools/ampoule/COPYING.txt b/evennia/contrib/procpools/ampoule/COPYING.txt deleted file mode 100644 index 89d86c8ac4..0000000000 --- a/evennia/contrib/procpools/ampoule/COPYING.txt +++ /dev/null @@ -1,23 +0,0 @@ -Copyright (c) 2008 -Valentino Volonghi -Matthew Lefkowitz -Copyright (c) 2009 Canonical Ltd. - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/evennia/contrib/procpools/ampoule/EVENNIA.txt b/evennia/contrib/procpools/ampoule/EVENNIA.txt deleted file mode 100644 index 1eda8946c6..0000000000 --- a/evennia/contrib/procpools/ampoule/EVENNIA.txt +++ /dev/null @@ -1,20 +0,0 @@ - -AMPOULE -------- - -https://launchpad.net/ampoule - -AMPoule is a process management system using Twisted spawnProcess -functionality. It uses AMP to pipe messages to a process Pool that it -manages. The service is called ProcPool in Evennia settings. - -AMPoule's very good, but unfortunately the source is very poorly -documented. Hence the source in this directory does not comform to -Evennia's normally rigid standards - for now we try to edit it as -little as possible so as to make it easy to apply upstream updates -down the line. - -Changes made by Evennia are minor - it's mainly limiting spam to the -log and an added ability to turn this on/off through settings. Most -Evennia related code are found in evennia/server/procpool.py and -evennia/server/server.py. diff --git a/evennia/contrib/procpools/ampoule/__init__.py b/evennia/contrib/procpools/ampoule/__init__.py deleted file mode 100644 index ed99c5ef2f..0000000000 --- a/evennia/contrib/procpools/ampoule/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from pool import deferToAMPProcess, pp -from commands import Shutdown, Ping, Echo -from child import AMPChild -__version__ = "0.2.1" diff --git a/evennia/contrib/procpools/ampoule/child.py b/evennia/contrib/procpools/ampoule/child.py deleted file mode 100644 index 2dec79a188..0000000000 --- a/evennia/contrib/procpools/ampoule/child.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -This defines the the parent for all subprocess children. - -Inherit from this to define a new type of subprocess. - -""" - -from twisted.internet import error -from twisted.protocols import amp -from evennia.contrib.procpools.ampoule.commands import Echo, Shutdown, Ping - - -class AMPChild(amp.AMP): - def __init__(self): - super(AMPChild, self).__init__(self) - self.shutdown = False - - def connectionLost(self, reason): - amp.AMP.connectionLost(self, reason) - from twisted.internet import reactor - try: - reactor.stop() - except error.ReactorNotRunning: - # woa, this means that something bad happened, - # most probably we received a SIGINT. Now this is only - # a problem when you use Ctrl+C to stop the main process - # because it would send the SIGINT to child processes too. - # In all other cases receiving a SIGINT here would be an - # error condition and correctly restarted. maybe we should - # use sigprocmask? - pass - if not self.shutdown: - # if the shutdown wasn't explicit we presume that it's an - # error condition and thus we return a -1 error returncode. - import os - os._exit(-1) - - def shutdown(self): - """ - This method is needed to shutdown the child gently without - generating an exception. - """ - #log.msg("Shutdown message received, goodbye.") - self.shutdown = True - return {} - Shutdown.responder(shutdown) - - def ping(self): - """ - Ping the child and return an answer - """ - return {'response': "pong"} - Ping.responder(ping) - - def echo(self, data): - """ - Echo some data through the child. - """ - return {'response': data} - Echo.responder(echo) diff --git a/evennia/contrib/procpools/ampoule/commands.py b/evennia/contrib/procpools/ampoule/commands.py deleted file mode 100644 index 2ac61e7d04..0000000000 --- a/evennia/contrib/procpools/ampoule/commands.py +++ /dev/null @@ -1,11 +0,0 @@ -from twisted.protocols import amp - -class Shutdown(amp.Command): - responseType = amp.QuitBox - -class Ping(amp.Command): - response = [('response', amp.String())] - -class Echo(amp.Command): - arguments = [('data', amp.String())] - response = [('response', amp.String())] diff --git a/evennia/contrib/procpools/ampoule/iampoule.py b/evennia/contrib/procpools/ampoule/iampoule.py deleted file mode 100644 index caa3e0142a..0000000000 --- a/evennia/contrib/procpools/ampoule/iampoule.py +++ /dev/null @@ -1,24 +0,0 @@ -from zope.interface import Interface - -class IStarter(Interface): - def startAMPProcess(ampChild, ampParent=None): - """ - @param ampChild: The AMP protocol spoken by the created child. - @type ampChild: L{twisted.protocols.amp.AMP} - - @param ampParent: The AMP protocol spoken by the parent. - @type ampParent: L{twisted.protocols.amp.AMP} - """ - - def startPythonProcess(prot, *args): - """ - @param prot: a L{protocol.ProcessProtocol} subclass - @type prot: L{protocol.ProcessProtocol} - - @param args: a tuple of arguments that will be passed to the - child process. - - @return: a tuple of the child process and the deferred finished. - finished triggers when the subprocess dies for any reason. - """ - diff --git a/evennia/contrib/procpools/ampoule/main.py b/evennia/contrib/procpools/ampoule/main.py deleted file mode 100644 index ead8d15270..0000000000 --- a/evennia/contrib/procpools/ampoule/main.py +++ /dev/null @@ -1,302 +0,0 @@ -import os -import sys -import imp -import itertools - -from zope.interface import implements - -from twisted.internet import reactor, protocol, defer, error -from twisted.python import log, util, reflect -from twisted.protocols import amp -from twisted.python import runtime -from twisted.python.compat import set - -from evennia.contrib.procpools.ampoule import iampoule - -gen = itertools.count() - -if runtime.platform.isWindows(): - IS_WINDOWS = True - TO_CHILD = 0 - FROM_CHILD = 1 -else: - IS_WINDOWS = False - TO_CHILD = 3 - FROM_CHILD = 4 - -class AMPConnector(protocol.ProcessProtocol): - """ - A L{ProcessProtocol} subclass that can understand and speak AMP. - - @ivar amp: the children AMP process - @type amp: L{amp.AMP} - - @ivar finished: a deferred triggered when the process dies. - @type finished: L{defer.Deferred} - - @ivar name: Unique name for the connector, much like a pid. - @type name: int - """ - - def __init__(self, proto, name=None): - """ - @param proto: An instance or subclass of L{amp.AMP} - @type proto: L{amp.AMP} - - @param name: optional name of the subprocess. - @type name: int - """ - self.finished = defer.Deferred() - self.amp = proto - self.name = name - if name is None: - self.name = gen.next() - - def signalProcess(self, signalID): - """ - Send the signal signalID to the child process - - @param signalID: The signal ID that you want to send to the - corresponding child - @type signalID: C{str} or C{int} - """ - return self.transport.signalProcess(signalID) - - def connectionMade(self): - #log.msg("Subprocess %s started." % (self.name,)) - self.amp.makeConnection(self) - - # Transport - disconnecting = False - - def write(self, data): - if IS_WINDOWS: - self.transport.write(data) - else: - self.transport.writeToChild(TO_CHILD, data) - - def loseConnection(self): - self.transport.closeChildFD(TO_CHILD) - self.transport.closeChildFD(FROM_CHILD) - self.transport.loseConnection() - - def getPeer(self): - return ('subprocess %i' % self.name,) - - def getHost(self): - return ('Evennia Server',) - - def childDataReceived(self, childFD, data): - if childFD == FROM_CHILD: - self.amp.dataReceived(data) - return - self.errReceived(data) - - def errReceived(self, data): - for line in data.strip().splitlines(): - log.msg("FROM %s: %s" % (self.name, line)) - - def processEnded(self, status): - #log.msg("Process: %s ended" % (self.name,)) - self.amp.connectionLost(status) - if status.check(error.ProcessDone): - self.finished.callback('') - return - self.finished.errback(status) - -BOOTSTRAP = """\ -import sys - -def main(reactor, ampChildPath): - from twisted.application import reactors - reactors.installReactor(reactor) - - from twisted.python import log - %s - - from twisted.internet import reactor, stdio - from twisted.python import reflect, runtime - - ampChild = reflect.namedAny(ampChildPath) - ampChildInstance = ampChild(*sys.argv[1:-2]) - if runtime.platform.isWindows(): - stdio.StandardIO(ampChildInstance) - else: - stdio.StandardIO(ampChildInstance, %s, %s) - enter = getattr(ampChildInstance, '__enter__', None) - if enter is not None: - enter() - try: - reactor.run() - except: - if enter is not None: - info = sys.exc_info() - if not ampChildInstance.__exit__(*info): - raise - else: - raise - else: - if enter is not None: - ampChildInstance.__exit__(None, None, None) - -main(sys.argv[-2], sys.argv[-1]) -""" % ('%s', TO_CHILD, FROM_CHILD) - -# in the first spot above, either insert an empty string or -# 'log.startLogging(sys.stderr)' -# to start logging - -class ProcessStarter(object): - - implements(iampoule.IStarter) - - connectorFactory = AMPConnector - def __init__(self, bootstrap=BOOTSTRAP, args=(), env={}, - path=None, uid=None, gid=None, usePTY=0, - packages=(), childReactor="select"): - """ - @param bootstrap: Startup code for the child process - @type bootstrap: C{str} - - @param args: Arguments that should be supplied to every child - created. - @type args: C{tuple} of C{str} - - @param env: Environment variables that should be present in the - child environment - @type env: C{dict} - - @param path: Path in which to run the child - @type path: C{str} - - @param uid: if defined, the uid used to run the new process. - @type uid: C{int} - - @param gid: if defined, the gid used to run the new process. - @type gid: C{int} - - @param usePTY: Should the child processes use PTY processes - @type usePTY: 0 or 1 - - @param packages: A tuple of packages that should be guaranteed - to be importable in the child processes - @type packages: C{tuple} of C{str} - - @param childReactor: a string that sets the reactor for child - processes - @type childReactor: C{str} - """ - self.bootstrap = bootstrap - self.args = args - self.env = env - self.path = path - self.uid = uid - self.gid = gid - self.usePTY = usePTY - self.packages = ("evennia.contrib.procpools.ampoule",) + packages - self.packages = packages - self.childReactor = childReactor - - def __repr__(self): - """ - Represent the ProcessStarter with a string. - """ - return """ProcessStarter(bootstrap=%r, - args=%r, - env=%r, - path=%r, - uid=%r, - gid=%r, - usePTY=%r, - packages=%r, - childReactor=%r)""" % (self.bootstrap, - self.args, - self.env, - self.path, - self.uid, - self.gid, - self.usePTY, - self.packages, - self.childReactor) - - def _checkRoundTrip(self, obj): - """ - Make sure that an object will properly round-trip through 'qual' and - 'namedAny'. - - Raise a L{RuntimeError} if they aren't. - """ - tripped = reflect.namedAny(reflect.qual(obj)) - if tripped is not obj: - raise RuntimeError("importing %r is not the same as %r" % - (reflect.qual(obj), obj)) - - def startAMPProcess(self, ampChild, ampParent=None, ampChildArgs=()): - """ - @param ampChild: a L{ampoule.child.AMPChild} subclass. - @type ampChild: L{ampoule.child.AMPChild} - - @param ampParent: an L{amp.AMP} subclass that implements the parent - protocol for this process pool - @type ampParent: L{amp.AMP} - """ - self._checkRoundTrip(ampChild) - fullPath = reflect.qual(ampChild) - if ampParent is None: - ampParent = amp.AMP - prot = self.connectorFactory(ampParent()) - args = ampChildArgs + (self.childReactor, fullPath) - return self.startPythonProcess(prot, *args) - - - def startPythonProcess(self, prot, *args): - """ - @param prot: a L{protocol.ProcessProtocol} subclass - @type prot: L{protocol.ProcessProtocol} - - @param args: a tuple of arguments that will be added after the - ones in L{self.args} to start the child process. - - @return: a tuple of the child process and the deferred finished. - finished triggers when the subprocess dies for any reason. - """ - spawnProcess(prot, self.bootstrap, self.args+args, env=self.env, - path=self.path, uid=self.uid, gid=self.gid, - usePTY=self.usePTY, packages=self.packages) - - # XXX: we could wait for startup here, but ... is there really any - # reason to? the pipe should be ready for writing. The subprocess - # might not start up properly, but then, a subprocess might shut down - # at any point too. So we just return amp and have this piece to be - # synchronous. - return prot.amp, prot.finished - -def spawnProcess(processProtocol, bootstrap, args=(), env={}, - path=None, uid=None, gid=None, usePTY=0, - packages=()): - env = env.copy() - - pythonpath = [] - for pkg in packages: - pkg_path, name = os.path.split(pkg) - p = os.path.split(imp.find_module(name, [pkg_path] if pkg_path else None)[1])[0] - if p.startswith(os.path.join(sys.prefix, 'lib')): - continue - pythonpath.append(p) - pythonpath = list(set(pythonpath)) - pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep)) - env['PYTHONPATH'] = os.pathsep.join(pythonpath) - args = (sys.executable, '-c', bootstrap) + args - # childFDs variable is needed because sometimes child processes - # misbehave and use stdout to output stuff that should really go - # to stderr. Of course child process might even use the wrong FDs - # that I'm using here, 3 and 4, so we are going to fix all these - # issues when I add support for the configuration object that can - # fix this stuff in a more configurable way. - if IS_WINDOWS: - return reactor.spawnProcess(processProtocol, sys.executable, args, - env, path, uid, gid, usePTY) - else: - return reactor.spawnProcess(processProtocol, sys.executable, args, - env, path, uid, gid, usePTY, - childFDs={0:"w", 1:"r", 2:"r", 3:"w", 4:"r"}) diff --git a/evennia/contrib/procpools/ampoule/pool.py b/evennia/contrib/procpools/ampoule/pool.py deleted file mode 100644 index e5b0762506..0000000000 --- a/evennia/contrib/procpools/ampoule/pool.py +++ /dev/null @@ -1,416 +0,0 @@ -import time -import random -import heapq -import itertools -import signal -choice = random.choice -now = time.time -count = itertools.count().next -pop = heapq.heappop - -from twisted.internet import defer, task, error -from twisted.python import log, failure - -from evennia.contrib.procpools.ampoule import commands, main - -try: - DIE = signal.SIGKILL -except AttributeError: - # Windows doesn't have SIGKILL, let's just use SIGTERM then - DIE = signal.SIGTERM - - -class ProcessPool(object): - """ - This class generalizes the functionality of a pool of - processes to which work can be dispatched. - - @ivar finished: Boolean flag, L{True} when the pool is finished. - - @ivar started: Boolean flag, L{True} when the pool is started. - - @ivar name: Optional name for the process pool - - @ivar min: Minimum number of subprocesses to set up - - @ivar max: Maximum number of subprocesses to set up - - @ivar maxIdle: Maximum number of seconds of indleness in a child - - @ivar starter: A process starter instance that provides - L{iampoule.IStarter}. - - @ivar recycleAfter: Maximum number of calls before restarting a - subprocess, 0 to not recycle. - - @ivar ampChild: The child AMP protocol subclass with the commands - that the child should implement. - - @ivar ampParent: The parent AMP protocol subclass with the commands - that the parent should implement. - - @ivar timeout: The general timeout (in seconds) for every child - process call. - """ - - finished = False - started = False - name = None - - def __init__(self, ampChild=None, ampParent=None, min=5, max=20, - name=None, maxIdle=20, recycleAfter=500, starter=None, - timeout=None, timeout_signal=DIE, ampChildArgs=()): - self.starter = starter - self.ampChildArgs = tuple(ampChildArgs) - if starter is None: - self.starter = main.ProcessStarter(packages=("twisted",)) - self.ampParent = ampParent - self.ampChild = ampChild - if ampChild is None: - from evennia.contrib.procpools.ampoule.child import AMPChild - self.ampChild = AMPChild - self.min = min - self.max = max - self.name = name - self.maxIdle = maxIdle - self.recycleAfter = recycleAfter - self.timeout = timeout - self.timeout_signal = timeout_signal - self._queue = [] - - self.processes = set() - self.ready = set() - self.busy = set() - self._finishCallbacks = {} - self._lastUsage = {} - self._calls = {} - self.looping = task.LoopingCall(self._pruneProcesses) - self.looping.start(maxIdle, now=False) - - def start(self, ampChild=None): - """ - Starts the ProcessPool with a given child protocol. - - @param ampChild: a L{ampoule.child.AMPChild} subclass. - @type ampChild: L{ampoule.child.AMPChild} subclass - """ - if ampChild is not None and not self.started: - self.ampChild = ampChild - self.finished = False - self.started = True - return self.adjustPoolSize() - - def _pruneProcesses(self): - """ - Remove idle processes from the pool. - """ - n = now() - d = [] - for child, lastUse in self._lastUsage.iteritems(): - if len(self.processes) > self.min and (n - lastUse) > self.maxIdle: - # we are setting lastUse when processing finishes, it - # might be processing right now - if child not in self.busy: - # we need to remove this child from the ready set - # and the processes set because otherwise it might - # get calls from doWork - self.ready.discard(child) - self.processes.discard(child) - d.append(self.stopAWorker(child)) - return defer.DeferredList(d) - - def _pruneProcess(self, child): - """ - Remove every trace of the process from this instance. - """ - self.processes.discard(child) - self.ready.discard(child) - self.busy.discard(child) - self._lastUsage.pop(child, None) - self._calls.pop(child, None) - self._finishCallbacks.pop(child, None) - - def _addProcess(self, child, finished): - """ - Adds the newly created child process to the pool. - """ - def restart(child, reason): - #log.msg("FATAL: Restarting after %s" % (reason,)) - self._pruneProcess(child) - return self.startAWorker() - - def dieGently(data, child): - #log.msg("STOPPING: '%s'" % (data,)) - self._pruneProcess(child) - - self.processes.add(child) - self.ready.add(child) - finished.addCallback(dieGently, child - ).addErrback(lambda reason: restart(child, reason)) - self._finishCallbacks[child] = finished - self._lastUsage[child] = now() - self._calls[child] = 0 - self._catchUp() - - def _catchUp(self): - """ - If there are queued items in the list then run them. - """ - if self._queue: - _, (d, command, kwargs) = pop(self._queue) - self._cb_doWork(command, **kwargs).chainDeferred(d) - - def _handleTimeout(self, child): - """ - One of the children went timeout, we need to deal with it - - @param child: The child process - @type child: L{child.AMPChild} - """ - try: - child.transport.signalProcess(self.timeout_signal) - except error.ProcessExitedAlready: - # don't do anything then... we are too late - # or we were too early to call - pass - - def startAWorker(self): - """ - Start a worker and set it up in the system. - """ - if self.finished: - # this is a race condition: basically if we call self.stop() - # while a process is being recycled what happens is that the - # process will be created anyway. By putting a check for - # self.finished here we make sure that in no way we are creating - # processes when the pool is stopped. - # The race condition comes from the fact that: - # stopAWorker() is asynchronous while stop() is synchronous. - # so if you call: - # pp.stopAWorker(child).addCallback(lambda _: pp.startAWorker()) - # pp.stop() - # You might end up with a dirty reactor due to the stop() - # returning before the new process is created. - return - startAMPProcess = self.starter.startAMPProcess - child, finished = startAMPProcess(self.ampChild, - ampParent=self.ampParent, - ampChildArgs=self.ampChildArgs) - return self._addProcess(child, finished) - - def _cb_doWork(self, command, _timeout=None, _deadline=None, - **kwargs): - """ - Go and call the command. - - @param command: The L{amp.Command} to be executed in the child - @type command: L{amp.Command} - - @param _d: The deferred for the calling code. - @type _d: L{defer.Deferred} - - @param _timeout: The timeout for this call only - @type _timeout: C{int} - @param _deadline: The deadline for this call only - @type _deadline: C{int} - """ - timeoutCall = None - deadlineCall = None - - def _returned(result, child, is_error=False): - def cancelCall(call): - if call is not None and call.active(): - call.cancel() - cancelCall(timeoutCall) - cancelCall(deadlineCall) - self.busy.discard(child) - if not die: - # we are not marked to be removed, so add us back to - # the ready set and let's see if there's some catching - # up to do - self.ready.add(child) - self._catchUp() - else: - # We should die and we do, then we start a new worker - # to pick up stuff from the queue otherwise we end up - # without workers and the queue will remain there. - self.stopAWorker(child).addCallback(lambda _: self.startAWorker()) - self._lastUsage[child] = now() - # we can't do recycling here because it's too late and - # the process might have received tons of calls already - # which would make it run more calls than what is - # configured to do. - return result - - die = False - child = self.ready.pop() - self.busy.add(child) - self._calls[child] += 1 - - # Let's see if this call goes over the recycling barrier - if self.recycleAfter and self._calls[child] >= self.recycleAfter: - # it does so mark this child, using a closure, to be - # removed at the end of the call. - die = True - - # If the command doesn't require a response then callRemote - # returns nothing, so we prepare for that too. - # We also need to guard against timeout errors for child - # and local timeout parameter overrides the global one - if _timeout == 0: - timeout = _timeout - else: - timeout = _timeout or self.timeout - - if timeout is not None: - from twisted.internet import reactor - timeoutCall = reactor.callLater(timeout, self._handleTimeout, child) - - if _deadline is not None: - from twisted.internet import reactor - delay = max(0, _deadline - reactor.seconds()) - deadlineCall = reactor.callLater(delay, self._handleTimeout, - child) - - return defer.maybeDeferred(child.callRemote, command, **kwargs - ).addCallback(_returned, child - ).addErrback(_returned, child, is_error=True) - - def callRemote(self, *args, **kwargs): - """ - Proxy call to keep the API homogeneous across twisted's RPCs - """ - return self.doWork(*args, **kwargs) - - def doWork(self, command, **kwargs): - """ - Sends the command to one child. - - @param command: an L{amp.Command} type object. - @type command: L{amp.Command} - - @param kwargs: dictionary containing the arguments for the command. - """ - if self.ready: # there are unused processes, let's use them - return self._cb_doWork(command, **kwargs) - else: - if len(self.processes) < self.max: - # no unused but we can start some new ones - # since startAWorker is synchronous we won't have a - # race condition here in case of multiple calls to - # doWork, so we will end up in the else clause in case - # of such calls: - # Process pool with min=1, max=1, recycle_after=1 - # [call(Command) for x in xrange(BIG_NUMBER)] - self.startAWorker() - return self._cb_doWork(command, **kwargs) - else: - # No one is free... just queue up and wait for a process - # to start and pick up the first item in the queue. - d = defer.Deferred() - self._queue.append((count(), (d, command, kwargs))) - return d - - def stopAWorker(self, child=None): - """ - Gently stop a child so that it's not restarted anymore - - @param command: an L{ampoule.child.AmpChild} type object. - @type command: L{ampoule.child.AmpChild} or None - - """ - if child is None: - if self.ready: - child = self.ready.pop() - else: - child = choice(list(self.processes)) - child.callRemote(commands.Shutdown - # This is needed for timeout handling, the reason is pretty hard - # to explain but I'll try to: - # There's another small race condition in the system. If the - # child process is shut down by a signal and you try to stop - # the process pool immediately afterwards, like tests would do, - # the child AMP object would still be in the system and trying - # to call the command Shutdown on it would result in the same - # errback that we got originally, for this reason we need to - # trap it now so that it doesn't raise by not being handled. - # Does this even make sense to you? - ).addErrback(lambda reason: reason.trap(error.ProcessTerminated)) - return self._finishCallbacks[child] - - def _startSomeWorkers(self): - """ - Start a bunch of workers until we reach the max number of them. - """ - if len(self.processes) < self.max: - self.startAWorker() - - def adjustPoolSize(self, min=None, max=None): - """ - Change the pool size to be at least min and less than max, - useful when you change the values of max and min in the instance - and you want the pool to adapt to them. - """ - if min is None: - min = self.min - if max is None: - max = self.max - - assert min >= 0, 'minimum is negative' - assert min <= max, 'minimum is greater than maximum' - - self.min = min - self.max = max - - l = [] - if self.started: - - for i in xrange(len(self.processes)-self.max): - l.append(self.stopAWorker()) - while len(self.processes) < self.min: - self.startAWorker() - - return defer.DeferredList(l)#.addCallback(lambda _: self.dumpStats()) - - def stop(self): - """ - Stops the process protocol. - """ - self.finished = True - l = [self.stopAWorker(process) for process in self.processes] - def _cb(_): - if self.looping.running: - self.looping.stop() - - return defer.DeferredList(l).addCallback(_cb) - - def dumpStats(self): - log.msg("ProcessPool stats:") - log.msg('\tworkers: %s' % len(self.processes)) - log.msg('\ttimeout: %s' % (self.timeout)) - log.msg('\tparent: %r' % (self.ampParent,)) - log.msg('\tchild: %r' % (self.ampChild,)) - log.msg('\tmax idle: %r' % (self.maxIdle,)) - log.msg('\trecycle after: %r' % (self.recycleAfter,)) - log.msg('\tProcessStarter:') - log.msg('\t\t%r' % (self.starter,)) - -pp = None - - -def deferToAMPProcess(command, **kwargs): - """ - Helper function that sends a command to the default process pool - and returns a deferred that fires when the result of the - subprocess computation is ready. - - @param command: an L{amp.Command} subclass - @param kwargs: dictionary containing the arguments for the command. - - @return: a L{defer.Deferred} with the data from the subprocess. - """ - global pp - if pp is None: - pp = ProcessPool() - return pp.start().addCallback(lambda _: pp.doWork(command, **kwargs)) - return pp.doWork(command, **kwargs) diff --git a/evennia/contrib/procpools/ampoule/rpool.py b/evennia/contrib/procpools/ampoule/rpool.py deleted file mode 100644 index 642b988352..0000000000 --- a/evennia/contrib/procpools/ampoule/rpool.py +++ /dev/null @@ -1,65 +0,0 @@ -""" -This module implements a remote pool to use with AMP. -""" - -from twisted.protocols import amp - -class AMPProxy(amp.AMP): - """ - A Proxy AMP protocol that forwards calls to a wrapped - callRemote-like callable. - """ - def __init__(self, wrapped, child): - """ - @param wrapped: A callRemote-like callable that takes an - L{amp.Command} as first argument and other - optional keyword arguments afterwards. - @type wrapped: L{callable}. - - @param child: The protocol class of the process pool children. - Used to forward only the methods that are actually - understood correctly by them. - @type child: L{amp.AMP} - """ - amp.AMP.__init__(self) - self.wrapped = wrapped - self.child = child - - localCd = set(self._commandDispatch.keys()) - childCd = set(self.child._commandDispatch.keys()) - assert localCd.intersection(childCd) == set(["StartTLS"]), \ - "Illegal method overriding in Proxy" - - def locateResponder(self, name): - """ - This is a custom locator to forward calls to the children - processes while keeping the ProcessPool a transparent MITM. - - This way of working has a few limitations, the first of which - is the fact that children won't be able to take advantage of - any dynamic locator except for the default L{CommandLocator} - that is based on the _commandDispatch attribute added by the - metaclass. This limitation might be lifted in the future. - """ - if name == "StartTLS": - # This is a special case where the proxy takes precedence - return amp.AMP.locateResponder(self, "StartTLS") - - # Get the dict of commands from the child AMP implementation. - cd = self.child._commandDispatch - if name in cd: - # If the command is there, then we forward stuff to it. - commandClass, _responderFunc = cd[name] - # We need to wrap the doWork function because the wrapping - # call doesn't pass the command as first argument since it - # thinks that we are the actual receivers and callable is - # already the responder while it isn't. - doWork = lambda **kw: self.wrapped(commandClass, **kw) - # Now let's call the right function and wrap the result - # dictionary. - return self._wrapWithSerialization(doWork, commandClass) - # of course if the name of the command is not in the child it - # means that it might be in this class, so fallback to the - # default behavior of this module. - return amp.AMP.locateResponder(self, name) - diff --git a/evennia/contrib/procpools/ampoule/service.py b/evennia/contrib/procpools/ampoule/service.py deleted file mode 100644 index 166cd36c71..0000000000 --- a/evennia/contrib/procpools/ampoule/service.py +++ /dev/null @@ -1,69 +0,0 @@ -import os - -from twisted.application import service -from twisted.internet.protocol import ServerFactory - -def makeService(options): - """ - Create the service for the application - """ - ms = service.MultiService() - - from evennia.contrib.procpools.ampoule.pool import ProcessPool - from evennia.contrib.procpools.ampoule.main import ProcessStarter - name = options['name'] - ampport = options['ampport'] - ampinterface = options['ampinterface'] - child = options['child'] - parent = options['parent'] - min = options['min'] - max = options['max'] - maxIdle = options['max_idle'] - recycle = options['recycle'] - childReactor = options['reactor'] - timeout = options['timeout'] - - starter = ProcessStarter(packages=("twisted",), childReactor=childReactor) - pp = ProcessPool(child, parent, min, max, name, maxIdle, recycle, starter, timeout) - svc = AMPouleService(pp, child, ampport, ampinterface) - svc.setServiceParent(ms) - - return ms - -class AMPouleService(service.Service): - def __init__(self, pool, child, port, interface): - self.pool = pool - self.port = port - self.child = child - self.interface = interface - self.server = None - - def startService(self): - """ - Before reactor.run() is called we setup the system. - """ - service.Service.startService(self) - from evennia.contrib.procpools.ampoule import rpool - from twisted.internet import reactor - - try: - factory = ServerFactory() - factory.protocol = lambda: rpool.AMPProxy(wrapped=self.pool.doWork, - child=self.child) - self.server = reactor.listenTCP(self.port, - factory, - interface=self.interface) - # this is synchronous when it's the startup, even though - # it returns a deferred. But we need to run it after the - # first cycle in order to wait for signal handlers to be - # installed. - reactor.callLater(0, self.pool.start) - except: - import traceback - print traceback.format_exc() - - def stopService(self): - service.Service.stopService(self) - if self.server is not None: - self.server.stopListening() - return self.pool.stop() diff --git a/evennia/contrib/procpools/ampoule/test/__init__.py b/evennia/contrib/procpools/ampoule/test/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/evennia/contrib/procpools/ampoule/test/test_process.py b/evennia/contrib/procpools/ampoule/test/test_process.py deleted file mode 100644 index 23474fd893..0000000000 --- a/evennia/contrib/procpools/ampoule/test/test_process.py +++ /dev/null @@ -1,892 +0,0 @@ - -from signal import SIGHUP -import os -import os.path -from cStringIO import StringIO as sio -import tempfile - -from twisted.internet import error, defer, reactor -from twisted.python import failure -from twisted.trial import unittest -from twisted.protocols import amp -from evennia.contrib.procpools.ampoule import main, child, commands, pool - - -class ShouldntHaveBeenCalled(Exception): - pass - - -def _raise(_): - raise ShouldntHaveBeenCalled(_) - - -class _FakeT(object): - closeStdinCalled = False - def __init__(self, s): - self.s = s - - def closeStdin(self): - self.closeStdinCalled = True - - def write(self, data): - self.s.write(data) - - -class FakeAMP(object): - connector = None - reason = None - def __init__(self, s): - self.s = s - - def makeConnection(self, connector): - if self.connector is not None: - raise Exception("makeConnection called twice") - self.connector = connector - - def connectionLost(self, reason): - if self.reason is not None: - raise Exception("connectionLost called twice") - self.reason = reason - - def dataReceived(self, data): - self.s.write(data) - - -class Ping(amp.Command): - arguments = [('data', amp.String())] - response = [('response', amp.String())] - - -class Pong(amp.Command): - arguments = [('data', amp.String())] - response = [('response', amp.String())] - - -class Pid(amp.Command): - response = [('pid', amp.Integer())] - - -class Reactor(amp.Command): - response = [('classname', amp.String())] - - -class NoResponse(amp.Command): - arguments = [('arg', amp.String())] - requiresAnswer = False - - -class GetResponse(amp.Command): - response = [("response", amp.String())] - - -class Child(child.AMPChild): - def ping(self, data): - return self.callRemote(Pong, data=data) - Ping.responder(ping) - - -class PidChild(child.AMPChild): - def pid(self): - import os - return {'pid': os.getpid()} - Pid.responder(pid) - - -class NoResponseChild(child.AMPChild): - _set = False - def noresponse(self, arg): - self._set = arg - return {} - NoResponse.responder(noresponse) - - def getresponse(self): - return {"response": self._set} - GetResponse.responder(getresponse) - - -class ReactorChild(child.AMPChild): - def reactor(self): - from twisted.internet import reactor - return {'classname': reactor.__class__.__name__} - Reactor.responder(reactor) - - -class First(amp.Command): - arguments = [('data', amp.String())] - response = [('response', amp.String())] - - -class Second(amp.Command): - pass - - -class WaitingChild(child.AMPChild): - deferred = None - - def first(self, data): - self.deferred = defer.Deferred() - return self.deferred.addCallback(lambda _: {'response': data}) - First.responder(first) - - def second(self): - self.deferred.callback('') - return {} - Second.responder(second) - - -class Die(amp.Command): - pass - - -class BadChild(child.AMPChild): - def die(self): - self.shutdown = False - self.transport.loseConnection() - return {} - Die.responder(die) - - -class Write(amp.Command): - response = [("response", amp.String())] - pass - - -class Writer(child.AMPChild): - - def __init__(self, data='hello'): - child.AMPChild.__init__(self) - self.data = data - - def write(self): - return {'response': self.data} - Write.responder(write) - - -class GetCWD(amp.Command): - - response = [("cwd", amp.String())] - - -class TempDirChild(child.AMPChild): - - def __init__(self, directory=None): - child.AMPChild.__init__(self) - self.directory = directory - - def __enter__(self): - directory = tempfile.mkdtemp() - os.chdir(directory) - if self.directory is not None: - os.mkdir(self.directory) - os.chdir(self.directory) - - def __exit__(self, exc_type, exc_val, exc_tb): - cwd = os.getcwd() - os.chdir('..') - os.rmdir(cwd) - - def getcwd(self): - return {'cwd': os.getcwd()} - GetCWD.responder(getcwd) - - -class TestAMPConnector(unittest.TestCase): - def setUp(self): - """ - The only reason why this method exists is to let 'trial ampoule' - to install the signal handlers (#3178 for reference). - """ - super(TestAMPConnector, self).setUp() - d = defer.Deferred() - reactor.callLater(0, d.callback, None) - return d - - def _makeConnector(self, s, sa): - a = FakeAMP(sa) - ac = main.AMPConnector(a) - assert ac.name is not None - ac.transport = _FakeT(s) - return ac - - def test_protocol(self): - """ - Test that outReceived writes to AMP and that it triggers the - finished deferred once the process ended. - """ - s = sio() - sa = sio() - ac = self._makeConnector(s, sa) - - for x in xrange(99): - ac.childDataReceived(4, str(x)) - - ac.processEnded(failure.Failure(error.ProcessDone(0))) - return ac.finished.addCallback( - lambda _: self.assertEqual(sa.getvalue(), ''.join(str(x) for x in xrange(99))) - ) - - def test_protocol_failing(self): - """ - Test that a failure in the process termination is correctly - propagated to the finished deferred. - """ - s = sio() - sa = sio() - ac = self._makeConnector(s, sa) - - ac.finished.addCallback(_raise) - fail = failure.Failure(error.ProcessTerminated()) - self.assertFailure(ac.finished, error.ProcessTerminated) - ac.processEnded(fail) - - def test_startProcess(self): - """ - Test that startProcess actually starts a subprocess and that - it receives data back from the process through AMP. - """ - s = sio() - a = FakeAMP(s) - STRING = "ciao" - BOOT = """\ -import sys, os -def main(arg): - os.write(4, arg) -main(sys.argv[1]) -""" - starter = main.ProcessStarter(bootstrap=BOOT, - args=(STRING,), - packages=("twisted",)) - - amp, finished = starter.startPythonProcess(main.AMPConnector(a)) - def _eb(reason): - print reason - finished.addErrback(_eb) - return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING)) - - def test_failing_deferToProcess(self): - """ - Test failing subprocesses and the way they terminate and preserve - failing information. - """ - s = sio() - a = FakeAMP(s) - STRING = "ciao" - BOOT = """\ -import sys -def main(arg): - raise Exception(arg) -main(sys.argv[1]) -""" - starter = main.ProcessStarter(bootstrap=BOOT, args=(STRING,), packages=("twisted",)) - ready, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored") - - self.assertFailure(finished, error.ProcessTerminated) - finished.addErrback(lambda reason: self.assertEquals(reason.getMessage(), STRING)) - return finished - - def test_env_setting(self): - """ - Test that and environment variable passed to the process starter - is correctly passed to the child process. - """ - s = sio() - a = FakeAMP(s) - STRING = "ciao" - BOOT = """\ -import sys, os -def main(): - os.write(4, os.getenv("FOOBAR")) -main() -""" - starter = main.ProcessStarter(bootstrap=BOOT, - packages=("twisted",), - env={"FOOBAR": STRING}) - amp, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored") - - def _eb(reason): - print reason - finished.addErrback(_eb) - return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING)) - - def test_startAMPProcess(self): - """ - Test that you can start an AMP subprocess and that it correctly - accepts commands and correctly answers them. - """ - STRING = "ciao" - - starter = main.ProcessStarter(packages=("twisted",)) - c, finished = starter.startAMPProcess(child.AMPChild) - c.callRemote(commands.Echo, data=STRING - ).addCallback(lambda response: - self.assertEquals(response['response'], STRING) - ).addCallback(lambda _: c.callRemote(commands.Shutdown)) - return finished - - def test_BootstrapContext(self): - starter = main.ProcessStarter(packages=('twisted',)) - c, finished = starter.startAMPProcess(TempDirChild) - cwd = [] - - def checkBootstrap(response): - cwd.append(response['cwd']) - self.assertNotEquals(cwd, os.getcwd()) - d = c.callRemote(GetCWD) - d.addCallback(checkBootstrap) - d.addCallback(lambda _: c.callRemote(commands.Shutdown)) - finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0]))) - return finished - - def test_BootstrapContextInstance(self): - starter = main.ProcessStarter(packages=('twisted',)) - c, finished = starter.startAMPProcess(TempDirChild, - ampChildArgs=('foo',)) - cwd = [] - - def checkBootstrap(response): - cwd.append(response['cwd']) - self.assertTrue(cwd[0].endswith('/foo')) - d = c.callRemote(GetCWD) - d.addCallback(checkBootstrap) - d.addCallback(lambda _: c.callRemote(commands.Shutdown)) - finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0]))) - return finished - - def test_startAMPAndParentProtocol(self): - """ - Test that you can start an AMP subprocess and the children can - call methods on their parent. - """ - DATA = "CIAO" - APPEND = "123" - - class Parent(amp.AMP): - def pong(self, data): - return {'response': DATA+APPEND} - Pong.responder(pong) - - starter = main.ProcessStarter(packages=("twisted",)) - - subp, finished = starter.startAMPProcess(ampChild=Child, ampParent=Parent) - subp.callRemote(Ping, data=DATA - ).addCallback(lambda response: - self.assertEquals(response['response'], DATA+APPEND) - ).addCallback(lambda _: subp.callRemote(commands.Shutdown)) - return finished - - def test_roundtripError(self): - """ - Test that invoking a child using an unreachable class raises - a L{RunTimeError} . - """ - class Child(child.AMPChild): - pass - - starter = main.ProcessStarter(packages=("twisted",)) - - self.assertRaises(RuntimeError, starter.startAMPProcess, ampChild=Child) - - -class TestProcessPool(unittest.TestCase): - - def test_startStopWorker(self): - """ - Test that starting and stopping a worker keeps the state of - the process pool consistent. - """ - pp = pool.ProcessPool() - self.assertEquals(pp.started, False) - self.assertEquals(pp.finished, False) - self.assertEquals(pp.processes, set()) - self.assertEquals(pp._finishCallbacks, {}) - - def _checks(): - self.assertEquals(pp.started, False) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), 1) - self.assertEquals(len(pp._finishCallbacks), 1) - return pp.stopAWorker() - - def _closingUp(_): - self.assertEquals(pp.started, False) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), 0) - self.assertEquals(pp._finishCallbacks, {}) - pp.startAWorker() - return _checks().addCallback(_closingUp).addCallback(lambda _: pp.stop()) - - def test_startAndStop(self): - """ - Test that a process pool's start and stop method create the - expected number of workers and keep state consistent in the - process pool. - """ - pp = pool.ProcessPool() - self.assertEquals(pp.started, False) - self.assertEquals(pp.finished, False) - self.assertEquals(pp.processes, set()) - self.assertEquals(pp._finishCallbacks, {}) - - def _checks(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - return pp.stop() - - def _closingUp(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, True) - self.assertEquals(len(pp.processes), 0) - self.assertEquals(pp._finishCallbacks, {}) - return pp.start().addCallback(_checks).addCallback(_closingUp) - - def test_adjustPoolSize(self): - """ - Test that calls to pool.adjustPoolSize are correctly handled. - """ - pp = pool.ProcessPool(min=10) - self.assertEquals(pp.started, False) - self.assertEquals(pp.finished, False) - self.assertEquals(pp.processes, set()) - self.assertEquals(pp._finishCallbacks, {}) - - def _resize1(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - return pp.adjustPoolSize(min=2, max=3) - - def _resize2(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(pp.max, 3) - self.assertEquals(pp.min, 2) - self.assertEquals(len(pp.processes), pp.max) - self.assertEquals(len(pp._finishCallbacks), pp.max) - - def _resize3(_): - self.assertRaises(AssertionError, pp.adjustPoolSize, min=-1, max=5) - self.assertRaises(AssertionError, pp.adjustPoolSize, min=5, max=1) - return pp.stop() - - return pp.start( - ).addCallback(_resize1 - ).addCallback(_resize2 - ).addCallback(_resize3) - - def test_childRestart(self): - """ - Test that a failing child process is immediately restarted. - """ - pp = pool.ProcessPool(ampChild=BadChild, min=1) - STRING = "DATA" - - def _checks(_): - d = pp._finishCallbacks.values()[0] - pp.doWork(Die).addErrback(lambda _: None) - return d.addBoth(_checksAgain) - - def _checksAgain(_): - return pp.doWork(commands.Echo, data=STRING - ).addCallback(lambda result: self.assertEquals(result['response'], STRING)) - - return pp.start( - ).addCallback(_checks - ).addCallback(lambda _: pp.stop()) - - def test_parentProtocolChange(self): - """ - Test that the father can use an AMP protocol too. - """ - DATA = "CIAO" - APPEND = "123" - - class Parent(amp.AMP): - def pong(self, data): - return {'response': DATA+APPEND} - Pong.responder(pong) - - pp = pool.ProcessPool(ampChild=Child, ampParent=Parent) - - def _checks(_): - return pp.doWork(Ping, data=DATA - ).addCallback(lambda response: - self.assertEquals(response['response'], DATA+APPEND) - ) - - return pp.start().addCallback(_checks).addCallback(lambda _: pp.stop()) - - def test_deferToAMPProcess(self): - """ - Test that deferToAMPProcess works as expected. - """ - def cleanupGlobalPool(): - d = pool.pp.stop() - pool.pp = None - return d - self.addCleanup(cleanupGlobalPool) - - STRING = "CIAOOOO" - d = pool.deferToAMPProcess(commands.Echo, data=STRING) - d.addCallback(self.assertEquals, {"response": STRING}) - return d - - def test_checkStateInPool(self): - """ - Test that busy and ready lists are correctly maintained. - """ - pp = pool.ProcessPool(ampChild=WaitingChild) - - DATA = "foobar" - - def _checks(_): - d = pp.callRemote(First, data=DATA) - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - self.assertEquals(len(pp.ready), pp.min-1) - self.assertEquals(len(pp.busy), 1) - child = pp.busy.pop() - pp.busy.add(child) - child.callRemote(Second) - return d - - return pp.start( - ).addCallback(_checks - ).addCallback(lambda _: pp.stop()) - - def test_growingToMax(self): - """ - Test that the pool grows over time until it reaches max processes. - """ - MAX = 5 - pp = pool.ProcessPool(ampChild=WaitingChild, min=1, max=MAX) - - def _checks(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - - D = "DATA" - d = [pp.doWork(First, data=D) for x in xrange(MAX)] - - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.max) - self.assertEquals(len(pp._finishCallbacks), pp.max) - - [child.callRemote(Second) for child in pp.processes] - return defer.DeferredList(d) - - return pp.start( - ).addCallback(_checks - ).addCallback(lambda _: pp.stop()) - - def test_growingToMaxAndShrinking(self): - """ - Test that the pool grows but after 'idle' time the number of - processes goes back to the minimum. - """ - - MAX = 5 - MIN = 1 - IDLE = 1 - pp = pool.ProcessPool(ampChild=WaitingChild, min=MIN, max=MAX, maxIdle=IDLE) - - def _checks(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - - D = "DATA" - d = [pp.doWork(First, data=D) for x in xrange(MAX)] - - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.max) - self.assertEquals(len(pp._finishCallbacks), pp.max) - - [child.callRemote(Second) for child in pp.processes] - return defer.DeferredList(d).addCallback(_realChecks) - - def _realChecks(_): - from twisted.internet import reactor - d = defer.Deferred() - - def _cb(): - def __(_): - try: - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - d.callback(None) - except Exception, e: - d.errback(e) - return pp._pruneProcesses().addCallback(__) - # just to be shure we are called after the pruner - pp.looping.stop() # stop the looping, we don't want it to - # this right here - reactor.callLater(IDLE, _cb) - return d - - return pp.start( - ).addCallback(_checks - ).addCallback(lambda _: pp.stop()) - - def test_recycling(self): - """ - Test that after a given number of calls subprocesses are - recycled. - """ - MAX = 1 - MIN = 1 - RECYCLE_AFTER = 1 - pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER) - self.addCleanup(pp.stop) - - def _checks(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - return pp.doWork(Pid - ).addCallback(lambda response: response['pid']) - - def _checks2(pid): - return pp.doWork(Pid - ).addCallback(lambda response: response['pid'] - ).addCallback(self.assertNotEquals, pid) - - - d = pp.start() - d.addCallback(_checks) - d.addCallback(_checks2) - return d - - def test_recyclingWithQueueOverload(self): - """ - Test that we get the correct number of different results when - we overload the pool of calls. - """ - MAX = 5 - MIN = 1 - RECYCLE_AFTER = 10 - CALLS = 60 - pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER) - self.addCleanup(pp.stop) - - def _check(results): - s = set() - for succeed, response in results: - s.add(response['pid']) - - # For the first C{MAX} calls, each is basically guaranteed to go to - # a different child. After that, though, there are no guarantees. - # All the rest might go to a single child, since the child to - # perform a job is selected arbitrarily from the "ready" set. Fair - # distribution of jobs needs to be implemented; right now it's "set - # ordering" distribution of jobs. - self.assertTrue(len(s) > MAX) - - def _work(_): - l = [pp.doWork(Pid) for x in xrange(CALLS)] - d = defer.DeferredList(l) - return d.addCallback(_check) - d = pp.start() - d.addCallback(_work) - return d - - def test_disableProcessRecycling(self): - """ - Test that by setting 0 to recycleAfter we actually disable process recycling. - """ - MAX = 1 - MIN = 1 - RECYCLE_AFTER = 0 - pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER) - - def _checks(_): - self.assertEquals(pp.started, True) - self.assertEquals(pp.finished, False) - self.assertEquals(len(pp.processes), pp.min) - self.assertEquals(len(pp._finishCallbacks), pp.min) - return pp.doWork(Pid - ).addCallback(lambda response: response['pid']) - - def _checks2(pid): - return pp.doWork(Pid - ).addCallback(lambda response: response['pid'] - ).addCallback(self.assertEquals, pid - ).addCallback(lambda _: pid) - - def finish(reason): - return pp.stop().addCallback(lambda _: reason) - - return pp.start( - ).addCallback(_checks - ).addCallback(_checks2 - ).addCallback(_checks2 - ).addCallback(finish) - - def test_changeChildrenReactor(self): - """ - Test that by passing the correct argument children change their - reactor type. - """ - MAX = 1 - MIN = 1 - FIRST = "select" - SECOND = "poll" - - def checkDefault(): - pp = pool.ProcessPool( - starter=main.ProcessStarter( - childReactor=FIRST, - packages=("twisted",)), - ampChild=ReactorChild, min=MIN, max=MAX) - pp.start() - return pp.doWork(Reactor - ).addCallback(self.assertEquals, {'classname': "SelectReactor"} - ).addCallback(lambda _: pp.stop()) - - def checkPool(_): - pp = pool.ProcessPool( - starter=main.ProcessStarter( - childReactor=SECOND, - packages=("twisted",)), - ampChild=ReactorChild, min=MIN, max=MAX) - pp.start() - return pp.doWork(Reactor - ).addCallback(self.assertEquals, {'classname': "PollReactor"} - ).addCallback(lambda _: pp.stop()) - - return checkDefault( - ).addCallback(checkPool) - try: - from select import poll - except ImportError: - test_changeChildrenReactor.skip = "This architecture doesn't support select.poll, I can't run this test" - - def test_commandsWithoutResponse(self): - """ - Test that if we send a command without a required answer we - actually don't have any problems. - """ - DATA = "hello" - pp = pool.ProcessPool(ampChild=NoResponseChild, min=1, max=1) - - def _check(_): - return pp.doWork(GetResponse - ).addCallback(self.assertEquals, {"response": DATA}) - - def _work(_): - return pp.doWork(NoResponse, arg=DATA) - - return pp.start( - ).addCallback(_work - ).addCallback(_check - ).addCallback(lambda _: pp.stop()) - - def test_SupplyChildArgs(self): - """Ensure that arguments for the child constructor are passed in.""" - pp = pool.ProcessPool(Writer, ampChildArgs=['body'], min=0) - - def _check(result): - return pp.doWork(Write).addCallback( - self.assertEquals, {'response': 'body'}) - - return pp.start( - ).addCallback(_check - ).addCallback(lambda _: pp.stop()) - - def processTimeoutTest(self, timeout): - pp = pool.ProcessPool(WaitingChild, min=1, max=1) - - def _work(_): - d = pp.callRemote(First, data="ciao", _timeout=timeout) - self.assertFailure(d, error.ProcessTerminated) - return d - - return pp.start( - ).addCallback(_work - ).addCallback(lambda _: pp.stop()) - - def test_processTimeout(self): - """ - Test that a call that doesn't finish within the given timeout - time is correctly handled. - """ - return self.processTimeoutTest(1) - - def test_processTimeoutZero(self): - """ - Test that the process is correctly handled when the timeout is zero. - """ - return self.processTimeoutTest(0) - - def test_processDeadline(self): - pp = pool.ProcessPool(WaitingChild, min=1, max=1) - - def _work(_): - d = pp.callRemote(First, data="ciao", _deadline=reactor.seconds()) - self.assertFailure(d, error.ProcessTerminated) - return d - - return pp.start( - ).addCallback(_work - ).addCallback(lambda _: pp.stop()) - - def test_processBeforeDeadline(self): - pp = pool.ProcessPool(PidChild, min=1, max=1) - - def _work(_): - d = pp.callRemote(Pid, _deadline=reactor.seconds() + 10) - d.addCallback(lambda result: self.assertNotEqual(result['pid'], 0)) - return d - - return pp.start( - ).addCallback(_work - ).addCallback(lambda _: pp.stop()) - - def test_processTimeoutSignal(self): - """ - Test that a call that doesn't finish within the given timeout - time is correctly handled. - """ - pp = pool.ProcessPool(WaitingChild, min=1, max=1, - timeout_signal=SIGHUP) - - def _work(_): - d = pp.callRemote(First, data="ciao", _timeout=1) - d.addCallback(lambda d: self.fail()) - text = 'signal %d' % SIGHUP - d.addErrback( - lambda f: self.assertTrue(text in f.value[0], - '"%s" not in "%s"' % (text, f.value[0]))) - return d - - return pp.start( - ).addCallback(_work - ).addCallback(lambda _: pp.stop()) - - def test_processGlobalTimeout(self): - """ - Test that a call that doesn't finish within the given global - timeout time is correctly handled. - """ - pp = pool.ProcessPool(WaitingChild, min=1, max=1, timeout=1) - - def _work(_): - d = pp.callRemote(First, data="ciao") - self.assertFailure(d, error.ProcessTerminated) - return d - - return pp.start( - ).addCallback(_work - ).addCallback(lambda _: pp.stop()) diff --git a/evennia/contrib/procpools/ampoule/test/test_proxy.py b/evennia/contrib/procpools/ampoule/test/test_proxy.py deleted file mode 100644 index 92422576e8..0000000000 --- a/evennia/contrib/procpools/ampoule/test/test_proxy.py +++ /dev/null @@ -1,52 +0,0 @@ -from twisted.internet import defer, reactor -from twisted.internet.protocol import ClientFactory -from twisted.trial import unittest -from twisted.protocols import amp - -from evennia.contrib.procpools.ampoule import service, child, pool, main -from evennia.contrib.procpools.ampoule.commands import Echo - - -class ClientAMP(amp.AMP): - factory = None - - def connectionMade(self): - if self.factory is not None: - self.factory.theProto = self - if hasattr(self.factory, 'onMade'): - self.factory.onMade.callback(None) - - -class TestAMPProxy(unittest.TestCase): - def setUp(self): - """ - Setup the proxy service and the client connection to the proxy - service in order to run call through them. - - Inspiration comes from twisted.test.test_amp - """ - self.pp = pool.ProcessPool() - self.svc = service.AMPouleService(self.pp, child.AMPChild, 0, "") - self.svc.startService() - self.proxy_port = self.svc.server.getHost().port - self.clientFactory = ClientFactory() - self.clientFactory.protocol = ClientAMP - d = self.clientFactory.onMade = defer.Deferred() - self.clientConn = reactor.connectTCP("127.0.0.1", - self.proxy_port, - self.clientFactory) - self.addCleanup(self.clientConn.disconnect) - self.addCleanup(self.svc.stopService) - def setClient(_): - self.client = self.clientFactory.theProto - return d.addCallback(setClient) - - def test_forwardCall(self): - """ - Test that a call made from a client is correctly forwarded to - the process pool and the result is correctly reported. - """ - DATA = "hello" - return self.client.callRemote(Echo, data=DATA).addCallback( - self.assertEquals, {'response': DATA} - ) diff --git a/evennia/contrib/procpools/ampoule/util.py b/evennia/contrib/procpools/ampoule/util.py deleted file mode 100644 index 738e5f2edd..0000000000 --- a/evennia/contrib/procpools/ampoule/util.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -some utilities -""" -import os -import sys -import __main__ - -from twisted.python.filepath import FilePath -from twisted.python.reflect import namedAny -# from twisted.python.modules import theSystemPath - -def findPackagePath(modulePath): - """ - Try to find the sys.path entry from a modulePath object, simultaneously - computing the module name of the targetted file. - """ - p = modulePath - l = [p.basename().split(".")[0]] - while p.parent() != p: - for extension in ['py', 'pyc', 'pyo', 'pyd', 'dll']: - sib = p.sibling("__init__."+extension) - if sib.exists(): - p = p.parent() - l.insert(0, p.basename()) - break - else: - return p.parent(), '.'.join(l) - - -def mainpoint(function): - """ - Decorator which declares a function to be an object's mainpoint. - """ - if function.__module__ == '__main__': - # OK time to run a function - p = FilePath(__main__.__file__) - p, mn = findPackagePath(p) - pname = p.path - if pname not in map(os.path.abspath, sys.path): - sys.path.insert(0, pname) - # Maybe remove the module's path? - exitcode = namedAny(mn+'.'+function.__name__)(sys.argv) - if exitcode is None: - exitcode = 0 - sys.exit(exitcode) - return function diff --git a/evennia/contrib/procpools/python_procpool.py b/evennia/contrib/procpools/python_procpool.py deleted file mode 100644 index c9a5184492..0000000000 --- a/evennia/contrib/procpools/python_procpool.py +++ /dev/null @@ -1,326 +0,0 @@ -""" -Python ProcPool - -Evennia Contribution - Griatch 2012 - -The ProcPool is used to execute code on a separate process. This allows for -true asynchronous operation. Process communication happens over AMP and is -thus fully asynchronous as far as Evennia is concerned. - -The process pool is implemented using a slightly modified version of -the Ampoule package (included). - -The python_process pool is a service activated with the instructions -in python_procpool_plugin.py. - -To use, import run_async from this module and use instead of the -in-process version found in evennia.utils.utils. Note that this is a much -more complex function than the default run_async, so make sure to read -the header carefully. - -To test it works, make sure to activate the process pool, then try the -following as superuser: - -@py from evennia.contrib.procpools.python_procpool import run_async;run_async("_return('Wohoo!')", at_return=self.msg, at_err=self.msg) - -You can also try to import time and do time.sleep(5) before the -_return statement, to test it really is asynchronous. - -""" - -from twisted.protocols import amp -from twisted.internet import threads -from evennia.contrib.procpools.ampoule.child import AMPChild -from evennia.utils.dbserialize import to_pickle, from_pickle, do_pickle, do_unpickle -from evennia.utils.idmapper.base import PROC_MODIFIED_OBJS -from evennia.utils.utils import clean_object_caches, to_str -from evennia.utils import logger - - -# -# Multiprocess command for communication Server<->Client, relaying -# data for remote Python execution -# - -class ExecuteCode(amp.Command): - """ - Executes python code in the python process, - returning result when ready. - - source - a compileable Python source code string - environment - a pickled dictionary of Python - data. Each key will become the name - of a variable available to the source - code. Database objects are stored on - the form ((app, modelname), id) allowing - the receiver to easily rebuild them on - this side. - errors - an all-encompassing error handler - response - a string or a pickled string - - """ - arguments = [('source', amp.String()), - ('environment', amp.String())] - errors = [(Exception, 'EXCEPTION')] - response = [('response', amp.String()), - ('recached', amp.String())] - - -# -# Multiprocess AMP client-side factory, for executing remote Python code -# - -class PythonProcPoolChild(AMPChild): - """ - This is describing what happens on the subprocess side. - - This already supports Echo, Shutdown and Ping. - - Methods: - executecode - a remote code execution environment - - """ - def executecode(self, source, environment): - """ - Remote code execution - - source - Python code snippet - environment - pickled dictionary of environment - variables. They are stored in - two keys "normal" and "objs" where - normal holds a dictionary of - normally pickled python objects - wheras objs points to a dictionary - of database represenations ((app,key),id). - - The environment's entries will be made available as - local variables during the execution. Normal eval - results will be returned as-is. For more complex - code snippets (run by exec), the _return function - is available: All data sent to _return(retval) will - be returned from this system whenever the system - finishes. Multiple calls to _return will result in - a list being return. The return value is pickled - and thus allows for returning any pickleable data. - - """ - - class Ret(object): - "Helper class for holding returns from exec" - def __init__(self): - self.returns = [] - def __call__(self, *args, **kwargs): - self.returns.extend(list(args)) - def get_returns(self): - lr = len(self.returns) - val = lr and (lr == 1 and self.returns[0] or self.returns) or None - if val not in (None, [], ()): - return do_pickle(to_pickle(val)) - else: - return "" - _return = Ret() - - available_vars = {'_return': _return} - if environment: - # load environment - try: - environment = from_pickle(do_unpickle(environment)) - available_vars.update(environment) - except Exception: - logger.log_trace() - # try to execute with eval first - try: - ret = eval(source, {}, available_vars) - if ret not in (None, [], ()): - ret = _return.get_returns() or do_pickle(to_pickle(ret)) - else: - ret = "" - except Exception: - # use exec instead - exec source in available_vars - ret = _return.get_returns() - # get the list of affected objects to recache - objs = PROC_MODIFIED_OBJS.values() - # we need to include the locations too, to update their content caches - objs = objs + list(set([o.location for o in objs - if hasattr(o, "location") and o.location])) - #print "objs:", objs - #print "to_pickle", to_pickle(objs, emptypickle=False, do_pickle=False) - if objs not in (None, [], ()): - to_recache = do_pickle(to_pickle(objs)) - else: - to_recache = "" - # empty the list without loosing memory reference - #PROC_MODIFIED_OBJS[:] = [] - PROC_MODIFIED_OBJS.clear() #TODO - is this not messing anything up? - return {'response': ret, - 'recached': to_recache} - ExecuteCode.responder(executecode) - - -# -# Procpool run_async - Server-side access function for executing -# code in another process -# - -_PPOOL = None -_SESSIONS = None -_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9." - - -def run_async(to_execute, *args, **kwargs): - """ - Runs a function or executes a code snippet asynchronously. - - Inputs: - to_execute (callable) - if this is a callable, it will - be executed with *args and non-reserver *kwargs as - arguments. - The callable will be executed using ProcPool, or in - a thread if ProcPool is not available. - to_execute (string) - this is only available is ProcPool is - running. If a string, to_execute this will be treated as a code - snippet to execute asynchronously. *args are then not used - and non-reserverd *kwargs are used to define the execution - environment made available to the code. - - reserved kwargs: - 'use_thread' (bool) - this only works with callables (not code). - It forces the code to run in a thread instead - of using the Process Pool, even if the latter - is available. This could be useful if you want - to make sure to not get out of sync with the - main process (such as accessing in-memory global - properties) - 'proc_timeout' (int) - only used if ProcPool is available. Sets a - max time for execution. This alters the value set - by settings.PROCPOOL_TIMEOUT - 'at_return' -should point to a callable with one argument. - It will be called with the return value from - to_execute. - 'at_return_kwargs' - this dictionary which be used as keyword - arguments to the at_return callback. - 'at_err' - this will be called with a Failure instance if - there is an error in to_execute. - 'at_err_kwargs' - this dictionary will be used as keyword - arguments to the at_err errback. - 'procpool_name' - the Service name of the procpool to use. - Default is PythonProcPool. - - *args - if to_execute is a callable, these args will be used - as arguments for that function. If to_execute is a string - *args are not used. - *kwargs - if to_execute is a callable, these kwargs will be used - as keyword arguments in that function. If a string, they - instead are used to define the executable environment - that should be available to execute the code in to_execute. - - run_async will either relay the code to a thread or to a processPool - depending on input and what is available in the system. To activate - Process pooling, settings.PROCPOOL_ENABLE must be set. - - to_execute in string form should handle all imports needed. kwargs - can be used to send objects and properties. Such properties will - be pickled, except Database Objects which will be sent across - on a special format and re-loaded on the other side. - - To get a return value from your code snippet, Use the _return() - function: Every call to this function from your snippet will - append the argument to an internal list of returns. This return value - (or a list) will be the first argument to the at_return callback. - - Use this function with restrain and only for features/commands - that you know has no influence on the cause-and-effect order of your - game (commands given after the async function might be executed before - it has finished). Accessing the same property from different - threads/processes can lead to unpredicted behaviour if you are not - careful (this is called a "race condition"). - - Also note that some databases, notably sqlite3, don't support access from - multiple threads simultaneously, so if you do heavy database access from - your to_execute under sqlite3 you will probably run very slow or even get - tracebacks. - - """ - # handle all global imports. - global _PPOOL, _SESSIONS - - # get the procpool name, if set in kwargs - procpool_name = kwargs.get("procpool_name", "PythonProcPool") - - if _PPOOL is None: - # Try to load process Pool - from evennia.server.sessionhandler import SESSIONS as _SESSIONS - try: - _PPOOL = _SESSIONS.server.services.namedServices.get(procpool_name).pool - except AttributeError: - _PPOOL = False - - use_timeout = kwargs.pop("proc_timeout", _PPOOL.timeout) - - # helper converters for callbacks/errbacks - def convert_return(f): - def func(ret, *args, **kwargs): - rval = ret["response"] and from_pickle(do_unpickle(ret["response"])) - reca = ret["recached"] and from_pickle(do_unpickle(ret["recached"])) - # recache all indicated objects - [clean_object_caches(obj) for obj in reca] - if f: - return f(rval, *args, **kwargs) - else: - return rval - return func - def convert_err(f): - def func(err, *args, **kwargs): - err.trap(Exception) - err = err.getErrorMessage() - if use_timeout and err == _PROC_ERR: - err = "Process took longer than %ss and timed out." % use_timeout - if f: - return f(err, *args, **kwargs) - else: - err = "Error reported from subprocess: '%s'" % err - logger.log_errmsg(err) - return func - - # handle special reserved input kwargs - use_thread = kwargs.pop("use_thread", False) - callback = convert_return(kwargs.pop("at_return", None)) - errback = convert_err(kwargs.pop("at_err", None)) - callback_kwargs = kwargs.pop("at_return_kwargs", {}) - errback_kwargs = kwargs.pop("at_err_kwargs", {}) - - if _PPOOL and not use_thread: - # process pool is running - if isinstance(to_execute, basestring): - # run source code in process pool - cmdargs = {"_timeout": use_timeout} - cmdargs["source"] = to_str(to_execute) - if kwargs: - cmdargs["environment"] = do_pickle(to_pickle(kwargs)) - else: - cmdargs["environment"] = "" - # defer to process pool - deferred = _PPOOL.doWork(ExecuteCode, **cmdargs) - elif callable(to_execute): - # execute callable in process - callname = to_execute.__name__ - cmdargs = {"_timeout": use_timeout} - cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname - cmdargs["environment"] = do_pickle(to_pickle({callname: to_execute, - "args": args, - "kwargs": kwargs})) - deferred = _PPOOL.doWork(ExecuteCode, **cmdargs) - else: - raise RuntimeError("'%s' could not be handled by the process pool" % to_execute) - elif callable(to_execute): - # no process pool available, fall back to old deferToThread mechanism. - deferred = threads.deferToThread(to_execute, *args, **kwargs) - else: - # no appropriate input for this server setup - raise RuntimeError("'%s' could not be handled by run_async - no valid input or no process pool." % to_execute) - - # attach callbacks - if callback: - deferred.addCallback(callback, **callback_kwargs) - deferred.addErrback(errback, **errback_kwargs) diff --git a/evennia/contrib/procpools/python_procpool_plugin.py b/evennia/contrib/procpools/python_procpool_plugin.py deleted file mode 100644 index 6c47abb1a8..0000000000 --- a/evennia/contrib/procpools/python_procpool_plugin.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Python ProcPool plugin - -Evennia contribution - Griatch 2012 - -This is a plugin for the Evennia services. It will make the service -and run_async in python_procpool.py available to the system. - -To activate, add the following line to your settings file: - -SERVER_SERVICES_PLUGIN_MODULES.append("contrib.procpools.python_procpool_plugin") - -Next reboot the server and the new service will be available. - -It is not recommended to use this with an SQLite3 database, at least -if you plan to do many out-of-process database writes. SQLite3 does -not work very well with a high frequency of off-process writes due to -file locking clashes. Test what works with your mileage. - -""" -import os -import sys -from django.conf import settings - - -# Process Pool setup - -# convenient flag to turn off process pool without changing settings -PROCPOOL_ENABLED = True -# relay process stdout to log (debug mode, very spammy) -PROCPOOL_DEBUG = False -# max/min size of the process pool. Will expand up to max limit on demand. -PROCPOOL_MIN_NPROC = 5 -PROCPOOL_MAX_NPROC = 20 -# maximum time (seconds) a process may idle before being pruned from -# pool (if pool bigger than minsize) -PROCPOOL_IDLETIME = 20 -# after sending a command, this is the maximum time in seconds the process -# may run without returning. After this time the process will be killed. This -# can be seen as a fallback; the run_async method takes a keyword proc_timeout -# that will override this value on a per-case basis. -PROCPOOL_TIMEOUT = 10 -# only change if the port clashes with something else on the system -PROCPOOL_PORT = 5001 -# 0.0.0.0 means listening to all interfaces -PROCPOOL_INTERFACE = '127.0.0.1' -# user-id and group-id to run the processes as (for OS:es supporting this). -# If you plan to run unsafe code one could experiment with setting this -# to an unprivileged user. -PROCPOOL_UID = None -PROCPOOL_GID = None -# real path to a directory where all processes will be run. If -# not given, processes will be executed in game/. -PROCPOOL_DIRECTORY = None - - -# don't need to change normally -SERVICE_NAME = "PythonProcPool" - - -# plugin hook - -def start_plugin_services(server): - """ - This will be called by the Evennia Server when starting up. - - server - the main Evennia server application - """ - if not PROCPOOL_ENABLED: - return - - # terminal output - print ' amp (Process Pool): %s' % PROCPOOL_PORT - - from evennia.contrib.procpools.ampoule import main as ampoule_main - from evennia.contrib.procpools.ampoule import service as ampoule_service - from evennia.contrib.procpools.ampoule import pool as ampoule_pool - from evennia.contrib.procpools.ampoule.main import BOOTSTRAP as _BOOTSTRAP - from evennia.contrib.procpools.python_procpool import PythonProcPoolChild - - # for some reason absolute paths don't work here, only relative ones. - apackages = ("twisted", - "evennia", - "settings") - aenv = {"DJANGO_SETTINGS_MODULE": "settings", - "DATABASE_NAME": settings.DATABASES.get("default", {}).get("NAME") or settings.DATABASE_NAME} - if PROCPOOL_DEBUG: - _BOOTSTRAP = _BOOTSTRAP % "log.startLogging(sys.stderr)" - else: - _BOOTSTRAP = _BOOTSTRAP % "" - procpool_starter = ampoule_main.ProcessStarter(packages=apackages, - env=aenv, - path=PROCPOOL_DIRECTORY, - uid=PROCPOOL_UID, - gid=PROCPOOL_GID, - bootstrap=_BOOTSTRAP, - childReactor=sys.platform == 'linux2' and "epoll" or "default") - procpool = ampoule_pool.ProcessPool(name=SERVICE_NAME, - min=PROCPOOL_MIN_NPROC, - max=PROCPOOL_MAX_NPROC, - recycleAfter=500, - timeout=PROCPOOL_TIMEOUT, - maxIdle=PROCPOOL_IDLETIME, - ampChild=PythonProcPoolChild, - starter=procpool_starter) - procpool_service = ampoule_service.AMPouleService(procpool, - PythonProcPoolChild, - PROCPOOL_PORT, - PROCPOOL_INTERFACE) - procpool_service.setName(SERVICE_NAME) - # add the new services to the server - server.services.addService(procpool_service) - - -