Moved contrib/procpool out into a separate repository until it can be looked at.

This commit is contained in:
Griatch 2015-02-23 13:47:08 +01:00
parent 65adb0982b
commit 1a3e0481c7
18 changed files with 0 additions and 2466 deletions

View file

@ -1,40 +0,0 @@
ProcPools
---------
This contrib defines a process pool subsystem for Evennia.
A process pool handles a range of separately running processes that
can accept information from the main Evennia process. The pool dynamically
grows and shrinks depending on the need (and will queue requests if there
are no free slots available).
The main use of this is to launch long-running, possibly blocking code
in a way that will not freeze up the rest of the server. So you could
execute time.sleep(10) on the process pool without anyone else on the
server noticing anything.
This folder has the following contents:
ampoule/ - this is a separate library managing the process pool. You
should not need to touch this.
Python Procpool
---------------
python_procpool.py - this implements a way to execute arbitrary python
code on the procpool. Import run_async() from this
module in order to use this functionality in-code
(this is a replacement to the in-process run_async
found in evennia.utils.utils).
python_procpool_plugin.py - this is a plugin module for the python
procpool, to start and add it to the server. Adding it
is a single line in your settings file - see the header
of the file for more info.
Adding other Procpools
----------------------
To add other types of procpools (such as for executing other remote languages
than Python), you can pretty much mimic the layout of python_procpool.py
and python_procpool_plugin.py.

View file

@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View file

@ -1,23 +0,0 @@
Copyright (c) 2008
Valentino Volonghi
Matthew Lefkowitz
Copyright (c) 2009 Canonical Ltd.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View file

@ -1,20 +0,0 @@
AMPOULE
-------
https://launchpad.net/ampoule
AMPoule is a process management system using Twisted spawnProcess
functionality. It uses AMP to pipe messages to a process Pool that it
manages. The service is called ProcPool in Evennia settings.
AMPoule's very good, but unfortunately the source is very poorly
documented. Hence the source in this directory does not conform to
Evennia's normally rigid standards - for now we try to edit it as
little as possible so as to make it easy to apply upstream updates
down the line.
Changes made by Evennia are minor - it's mainly limiting spam to the
log and an added ability to turn this on/off through settings. Most
Evennia-related code is found in evennia/server/procpool.py and
evennia/server/server.py.

View file

@ -1,4 +0,0 @@
from pool import deferToAMPProcess, pp
from commands import Shutdown, Ping, Echo
from child import AMPChild
__version__ = "0.2.1"

View file

@ -1,60 +0,0 @@
"""
This defines the the parent for all subprocess children.
Inherit from this to define a new type of subprocess.
"""
from twisted.internet import error
from twisted.protocols import amp
from evennia.contrib.procpools.ampoule.commands import Echo, Shutdown, Ping
class AMPChild(amp.AMP):
    """
    Base AMP protocol spoken by every subprocess child.

    Subclass this to define a new type of subprocess and add
    amp.Command responders for the work the child should perform.
    The Shutdown, Ping and Echo commands are implemented here so
    every child supports them.
    """
    def __init__(self):
        # FIX: the original called super(...).__init__(self), passing
        # self as the optional boxReceiver argument. AMP already
        # defaults to using itself as the box receiver, so the extra
        # argument was redundant and misleading; call with no args.
        super(AMPChild, self).__init__()
        # Flag set by the Shutdown command. NOTE: this attribute
        # deliberately shadows the shutdown() method below - the
        # Shutdown responder was bound to the function at class
        # creation time, so only attribute reads/writes see this flag.
        self.shutdown = False

    def connectionLost(self, reason):
        """
        Stop the child's reactor when the pipe to the parent is lost
        and exit with an error code unless shutdown was explicit.
        """
        amp.AMP.connectionLost(self, reason)
        from twisted.internet import reactor
        try:
            reactor.stop()
        except error.ReactorNotRunning:
            # woa, this means that something bad happened,
            # most probably we received a SIGINT. Now this is only
            # a problem when you use Ctrl+C to stop the main process
            # because it would send the SIGINT to child processes too.
            # In all other cases receiving a SIGINT here would be an
            # error condition and correctly restarted. maybe we should
            # use sigprocmask?
            pass
        if not self.shutdown:
            # if the shutdown wasn't explicit we presume that it's an
            # error condition and thus we return a -1 error returncode.
            import os
            os._exit(-1)

    def shutdown(self):
        """
        This method is needed to shutdown the child gently without
        generating an exception.
        """
        #log.msg("Shutdown message received, goodbye.")
        self.shutdown = True
        return {}
    Shutdown.responder(shutdown)

    def ping(self):
        """
        Ping the child and return an answer
        """
        return {'response': "pong"}
    Ping.responder(ping)

    def echo(self, data):
        """
        Echo some data through the child.
        """
        return {'response': data}
    Echo.responder(echo)

View file

@ -1,11 +0,0 @@
from twisted.protocols import amp
class Shutdown(amp.Command):
    # Asks a child process to exit gently. QuitBox closes the AMP
    # connection once the (empty) response has been sent.
    responseType = amp.QuitBox

class Ping(amp.Command):
    # Liveness check; the child answers with a "pong" string.
    response = [('response', amp.String())]

class Echo(amp.Command):
    # Round-trips arbitrary byte data through the child unchanged.
    arguments = [('data', amp.String())]
    response = [('response', amp.String())]

View file

@ -1,24 +0,0 @@
from zope.interface import Interface
class IStarter(Interface):
    """
    Interface for process-starter objects used by the process pool
    to spawn new child processes.
    """
    def startAMPProcess(ampChild, ampParent=None):
        """
        Start a child process speaking the given AMP protocol.

        @param ampChild: The AMP protocol spoken by the created child.
        @type ampChild: L{twisted.protocols.amp.AMP}
        @param ampParent: The AMP protocol spoken by the parent.
        @type ampParent: L{twisted.protocols.amp.AMP}
        """
    def startPythonProcess(prot, *args):
        """
        Start a plain Python child process.

        @param prot: a L{protocol.ProcessProtocol} subclass
        @type prot: L{protocol.ProcessProtocol}
        @param args: a tuple of arguments that will be passed to the
                     child process.
        @return: a tuple of the child process and the deferred finished.
                 finished triggers when the subprocess dies for any reason.
        """

View file

@ -1,302 +0,0 @@
import os
import sys
import imp
import itertools
from zope.interface import implements
from twisted.internet import reactor, protocol, defer, error
from twisted.python import log, util, reflect
from twisted.protocols import amp
from twisted.python import runtime
from twisted.python.compat import set
from evennia.contrib.procpools.ampoule import iampoule
# Monotonic counter used to hand out unique child-connector names.
gen = itertools.count()

if runtime.platform.isWindows():
    # Windows cannot pass extra file descriptors to a child process,
    # so AMP traffic must share the child's stdin/stdout.
    IS_WINDOWS = True
    TO_CHILD = 0
    FROM_CHILD = 1
else:
    # On POSIX we use dedicated FDs 3/4 for AMP traffic so the
    # child's stdout/stderr stay free for ordinary output/logging.
    IS_WINDOWS = False
    TO_CHILD = 3
    FROM_CHILD = 4
class AMPConnector(protocol.ProcessProtocol):
    """
    A L{ProcessProtocol} subclass that can understand and speak AMP.

    It doubles as the transport for the wrapped AMP instance,
    forwarding writes to the child over the platform-appropriate FD.

    @ivar amp: the children AMP process
    @type amp: L{amp.AMP}
    @ivar finished: a deferred triggered when the process dies.
    @type finished: L{defer.Deferred}
    @ivar name: Unique name for the connector, much like a pid.
    @type name: int
    """

    def __init__(self, proto, name=None):
        """
        @param proto: An instance or subclass of L{amp.AMP}
        @type proto: L{amp.AMP}
        @param name: optional name of the subprocess.
        @type name: int
        """
        self.finished = defer.Deferred()
        self.amp = proto
        self.name = name
        if name is None:
            # No name given - draw a unique one from the module counter.
            self.name = gen.next()

    def signalProcess(self, signalID):
        """
        Send the signal signalID to the child process

        @param signalID: The signal ID that you want to send to the
                         corresponding child
        @type signalID: C{str} or C{int}
        """
        return self.transport.signalProcess(signalID)

    def connectionMade(self):
        # Hand ourselves to the AMP instance as its transport.
        #log.msg("Subprocess %s started." % (self.name,))
        self.amp.makeConnection(self)

    # Transport
    disconnecting = False

    def write(self, data):
        # On Windows AMP shares stdin; on POSIX it uses FD 3.
        if IS_WINDOWS:
            self.transport.write(data)
        else:
            self.transport.writeToChild(TO_CHILD, data)

    def loseConnection(self):
        # Close both AMP pipes before dropping the process transport.
        self.transport.closeChildFD(TO_CHILD)
        self.transport.closeChildFD(FROM_CHILD)
        self.transport.loseConnection()

    def getPeer(self):
        return ('subprocess %i' % self.name,)

    def getHost(self):
        return ('Evennia Server',)

    def childDataReceived(self, childFD, data):
        # Only the dedicated AMP FD feeds the protocol; anything else
        # (e.g. the child's stderr) is treated as log output.
        if childFD == FROM_CHILD:
            self.amp.dataReceived(data)
            return
        self.errReceived(data)

    def errReceived(self, data):
        # Relay the child's stderr lines into the parent's log.
        for line in data.strip().splitlines():
            log.msg("FROM %s: %s" % (self.name, line))

    def processEnded(self, status):
        # Fire `finished` with success on a clean exit, failure otherwise.
        #log.msg("Process: %s ended" % (self.name,))
        self.amp.connectionLost(status)
        if status.check(error.ProcessDone):
            self.finished.callback('')
            return
        self.finished.errback(status)
BOOTSTRAP = """\
import sys
def main(reactor, ampChildPath):
from twisted.application import reactors
reactors.installReactor(reactor)
from twisted.python import log
%s
from twisted.internet import reactor, stdio
from twisted.python import reflect, runtime
ampChild = reflect.namedAny(ampChildPath)
ampChildInstance = ampChild(*sys.argv[1:-2])
if runtime.platform.isWindows():
stdio.StandardIO(ampChildInstance)
else:
stdio.StandardIO(ampChildInstance, %s, %s)
enter = getattr(ampChildInstance, '__enter__', None)
if enter is not None:
enter()
try:
reactor.run()
except:
if enter is not None:
info = sys.exc_info()
if not ampChildInstance.__exit__(*info):
raise
else:
raise
else:
if enter is not None:
ampChildInstance.__exit__(None, None, None)
main(sys.argv[-2], sys.argv[-1])
""" % ('%s', TO_CHILD, FROM_CHILD)
# in the first spot above, either insert an empty string or
# 'log.startLogging(sys.stderr)'
# to start logging
class ProcessStarter(object):
    """
    Default L{iampoule.IStarter} implementation: builds the bootstrap
    command line, environment and PYTHONPATH for each subprocess and
    spawns it through the reactor.
    """
    implements(iampoule.IStarter)
    connectorFactory = AMPConnector

    def __init__(self, bootstrap=BOOTSTRAP, args=(), env={},
                 path=None, uid=None, gid=None, usePTY=0,
                 packages=(), childReactor="select"):
        """
        @param bootstrap: Startup code for the child process
        @type bootstrap: C{str}
        @param args: Arguments that should be supplied to every child
                     created.
        @type args: C{tuple} of C{str}
        @param env: Environment variables that should be present in the
                    child environment
        @type env: C{dict}
        @param path: Path in which to run the child
        @type path: C{str}
        @param uid: if defined, the uid used to run the new process.
        @type uid: C{int}
        @param gid: if defined, the gid used to run the new process.
        @type gid: C{int}
        @param usePTY: Should the child processes use PTY processes
        @type usePTY: 0 or 1
        @param packages: A tuple of packages that should be guaranteed
                         to be importable in the child processes
        @type packages: C{tuple} of C{str}
        @param childReactor: a string that sets the reactor for child
                             processes
        @type childReactor: C{str}
        """
        self.bootstrap = bootstrap
        self.args = args
        # NOTE: the shared default env dict is safe - spawnProcess()
        # copies it before mutating.
        self.env = env
        self.path = path
        self.uid = uid
        self.gid = gid
        self.usePTY = usePTY
        # BUGFIX: the ampoule package itself must always be importable
        # in the children (the bootstrap resolves ampChild paths from
        # it). The original code immediately overwrote this tuple with
        # the bare `packages` argument, silently discarding that
        # guarantee; the dead reassignment has been removed.
        self.packages = ("evennia.contrib.procpools.ampoule",) + packages
        self.childReactor = childReactor

    def __repr__(self):
        """
        Represent the ProcessStarter with a string.
        """
        return """ProcessStarter(bootstrap=%r,
                         args=%r,
                         env=%r,
                         path=%r,
                         uid=%r,
                         gid=%r,
                         usePTY=%r,
                         packages=%r,
                         childReactor=%r)""" % (self.bootstrap,
                                                self.args,
                                                self.env,
                                                self.path,
                                                self.uid,
                                                self.gid,
                                                self.usePTY,
                                                self.packages,
                                                self.childReactor)

    def _checkRoundTrip(self, obj):
        """
        Make sure that an object will properly round-trip through 'qual'
        and 'namedAny'.

        Raise a L{RuntimeError} if they aren't.
        """
        tripped = reflect.namedAny(reflect.qual(obj))
        if tripped is not obj:
            raise RuntimeError("importing %r is not the same as %r" %
                               (reflect.qual(obj), obj))

    def startAMPProcess(self, ampChild, ampParent=None, ampChildArgs=()):
        """
        Start a child speaking the given AMP protocol.

        @param ampChild: a L{ampoule.child.AMPChild} subclass.
        @type ampChild: L{ampoule.child.AMPChild}
        @param ampParent: an L{amp.AMP} subclass that implements the parent
                          protocol for this process pool
        @type ampParent: L{amp.AMP}
        """
        # The child re-imports its protocol by qualified name, so that
        # name must resolve back to the very same class here.
        self._checkRoundTrip(ampChild)
        fullPath = reflect.qual(ampChild)
        if ampParent is None:
            ampParent = amp.AMP
        prot = self.connectorFactory(ampParent())
        # The bootstrap reads the reactor name and child path from the
        # last two argv entries.
        args = ampChildArgs + (self.childReactor, fullPath)
        return self.startPythonProcess(prot, *args)

    def startPythonProcess(self, prot, *args):
        """
        @param prot: a L{protocol.ProcessProtocol} subclass
        @type prot: L{protocol.ProcessProtocol}
        @param args: a tuple of arguments that will be added after the
                     ones in L{self.args} to start the child process.
        @return: a tuple of the child process and the deferred finished.
                 finished triggers when the subprocess dies for any reason.
        """
        spawnProcess(prot, self.bootstrap, self.args + args, env=self.env,
                     path=self.path, uid=self.uid, gid=self.gid,
                     usePTY=self.usePTY, packages=self.packages)
        # XXX: we could wait for startup here, but ... is there really any
        # reason to? the pipe should be ready for writing. The subprocess
        # might not start up properly, but then, a subprocess might shut down
        # at any point too. So we just return amp and have this piece to be
        # synchronous.
        return prot.amp, prot.finished
def spawnProcess(processProtocol, bootstrap, args=(), env={},
                 path=None, uid=None, gid=None, usePTY=0,
                 packages=()):
    """
    Spawn a Python child running `bootstrap` via `python -c`, with the
    given packages made importable through PYTHONPATH.
    """
    # Copy first so the shared default dict is never mutated in place.
    env = env.copy()
    # Locate each requested package on disk and collect its parent
    # directory, skipping anything already under the stdlib prefix.
    pythonpath = []
    for pkg in packages:
        pkg_path, name = os.path.split(pkg)
        p = os.path.split(imp.find_module(name, [pkg_path] if pkg_path else None)[1])[0]
        if p.startswith(os.path.join(sys.prefix, 'lib')):
            continue
        pythonpath.append(p)
    # Deduplicate, then keep whatever PYTHONPATH the caller already set.
    pythonpath = list(set(pythonpath))
    pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep))
    env['PYTHONPATH'] = os.pathsep.join(pythonpath)
    args = (sys.executable, '-c', bootstrap) + args
    # childFDs variable is needed because sometimes child processes
    # misbehave and use stdout to output stuff that should really go
    # to stderr. Of course child process might even use the wrong FDs
    # that I'm using here, 3 and 4, so we are going to fix all these
    # issues when I add support for the configuration object that can
    # fix this stuff in a more configurable way.
    if IS_WINDOWS:
        return reactor.spawnProcess(processProtocol, sys.executable, args,
                                    env, path, uid, gid, usePTY)
    else:
        return reactor.spawnProcess(processProtocol, sys.executable, args,
                                    env, path, uid, gid, usePTY,
                                    childFDs={0: "w", 1: "r", 2: "r", 3: "w", 4: "r"})

View file

@ -1,416 +0,0 @@
import time
import random
import heapq
import itertools
import signal
# Short local aliases for frequently-used callables.
choice = random.choice
now = time.time
count = itertools.count().next
pop = heapq.heappop
from twisted.internet import defer, task, error
from twisted.python import log, failure
from evennia.contrib.procpools.ampoule import commands, main

# Signal used to kill a child that exceeded its timeout.
try:
    DIE = signal.SIGKILL
except AttributeError:
    # Windows doesn't have SIGKILL, let's just use SIGTERM then
    DIE = signal.SIGTERM
class ProcessPool(object):
    """
    This class generalizes the functionality of a pool of
    processes to which work can be dispatched.

    @ivar finished: Boolean flag, L{True} when the pool is finished.
    @ivar started: Boolean flag, L{True} when the pool is started.
    @ivar name: Optional name for the process pool
    @ivar min: Minimum number of subprocesses to set up
    @ivar max: Maximum number of subprocesses to set up
    @ivar maxIdle: Maximum number of seconds of idleness in a child
    @ivar starter: A process starter instance that provides
                   L{iampoule.IStarter}.
    @ivar recycleAfter: Maximum number of calls before restarting a
                        subprocess, 0 to not recycle.
    @ivar ampChild: The child AMP protocol subclass with the commands
                    that the child should implement.
    @ivar ampParent: The parent AMP protocol subclass with the commands
                     that the parent should implement.
    @ivar timeout: The general timeout (in seconds) for every child
                   process call.
    """
    finished = False
    started = False
    name = None

    def __init__(self, ampChild=None, ampParent=None, min=5, max=20,
                 name=None, maxIdle=20, recycleAfter=500, starter=None,
                 timeout=None, timeout_signal=DIE, ampChildArgs=()):
        self.starter = starter
        self.ampChildArgs = tuple(ampChildArgs)
        if starter is None:
            # Default starter guarantees twisted is importable in children.
            self.starter = main.ProcessStarter(packages=("twisted",))
        self.ampParent = ampParent
        self.ampChild = ampChild
        if ampChild is None:
            # Imported lazily to avoid a circular import at module load.
            from evennia.contrib.procpools.ampoule.child import AMPChild
            self.ampChild = AMPChild
        self.min = min
        self.max = max
        self.name = name
        self.maxIdle = maxIdle
        self.recycleAfter = recycleAfter
        self.timeout = timeout
        self.timeout_signal = timeout_signal
        # Heap-ordered queue of (seq, (deferred, command, kwargs)).
        self._queue = []
        # Bookkeeping: all live children, the idle ones, the working ones.
        self.processes = set()
        self.ready = set()
        self.busy = set()
        self._finishCallbacks = {}
        self._lastUsage = {}
        self._calls = {}
        # Periodically cull children idle longer than maxIdle.
        self.looping = task.LoopingCall(self._pruneProcesses)
        self.looping.start(maxIdle, now=False)

    def start(self, ampChild=None):
        """
        Starts the ProcessPool with a given child protocol.

        @param ampChild: a L{ampoule.child.AMPChild} subclass.
        @type ampChild: L{ampoule.child.AMPChild} subclass
        """
        if ampChild is not None and not self.started:
            self.ampChild = ampChild
        self.finished = False
        self.started = True
        return self.adjustPoolSize()

    def _pruneProcesses(self):
        """
        Remove idle processes from the pool.
        """
        n = now()
        d = []
        for child, lastUse in self._lastUsage.iteritems():
            if len(self.processes) > self.min and (n - lastUse) > self.maxIdle:
                # we are setting lastUse when processing finishes, it
                # might be processing right now
                if child not in self.busy:
                    # we need to remove this child from the ready set
                    # and the processes set because otherwise it might
                    # get calls from doWork
                    self.ready.discard(child)
                    self.processes.discard(child)
                    d.append(self.stopAWorker(child))
        return defer.DeferredList(d)

    def _pruneProcess(self, child):
        """
        Remove every trace of the process from this instance.
        """
        self.processes.discard(child)
        self.ready.discard(child)
        self.busy.discard(child)
        self._lastUsage.pop(child, None)
        self._calls.pop(child, None)
        self._finishCallbacks.pop(child, None)

    def _addProcess(self, child, finished):
        """
        Adds the newly created child process to the pool.
        """
        def restart(child, reason):
            # Child died unexpectedly: forget it and spawn a replacement.
            #log.msg("FATAL: Restarting after %s" % (reason,))
            self._pruneProcess(child)
            return self.startAWorker()

        def dieGently(data, child):
            # Child exited cleanly: just forget it.
            #log.msg("STOPPING: '%s'" % (data,))
            self._pruneProcess(child)

        self.processes.add(child)
        self.ready.add(child)
        finished.addCallback(dieGently, child
            ).addErrback(lambda reason: restart(child, reason))
        self._finishCallbacks[child] = finished
        self._lastUsage[child] = now()
        self._calls[child] = 0
        self._catchUp()

    def _catchUp(self):
        """
        If there are queued items in the list then run them.
        """
        if self._queue:
            _, (d, command, kwargs) = pop(self._queue)
            self._cb_doWork(command, **kwargs).chainDeferred(d)

    def _handleTimeout(self, child):
        """
        One of the children went timeout, we need to deal with it

        @param child: The child process
        @type child: L{child.AMPChild}
        """
        try:
            child.transport.signalProcess(self.timeout_signal)
        except error.ProcessExitedAlready:
            # don't do anything then... we are too late
            # or we were too early to call
            pass

    def startAWorker(self):
        """
        Start a worker and set it up in the system.
        """
        if self.finished:
            # this is a race condition: basically if we call self.stop()
            # while a process is being recycled what happens is that the
            # process will be created anyway. By putting a check for
            # self.finished here we make sure that in no way we are creating
            # processes when the pool is stopped.
            # The race condition comes from the fact that:
            # stopAWorker() is asynchronous while stop() is synchronous.
            # so if you call:
            # pp.stopAWorker(child).addCallback(lambda _: pp.startAWorker())
            # pp.stop()
            # You might end up with a dirty reactor due to the stop()
            # returning before the new process is created.
            return
        startAMPProcess = self.starter.startAMPProcess
        child, finished = startAMPProcess(self.ampChild,
                                          ampParent=self.ampParent,
                                          ampChildArgs=self.ampChildArgs)
        return self._addProcess(child, finished)

    def _cb_doWork(self, command, _timeout=None, _deadline=None,
                   **kwargs):
        """
        Go and call the command.

        @param command: The L{amp.Command} to be executed in the child
        @type command: L{amp.Command}
        @param _timeout: The timeout for this call only
        @type _timeout: C{int}
        @param _deadline: The deadline for this call only
        @type _deadline: C{int}
        """
        timeoutCall = None
        deadlineCall = None

        def _returned(result, child, is_error=False):
            # Runs on both success and failure: cancel pending timers,
            # release or recycle the child, then pass the result through.
            def cancelCall(call):
                if call is not None and call.active():
                    call.cancel()
            cancelCall(timeoutCall)
            cancelCall(deadlineCall)
            self.busy.discard(child)
            if not die:
                # we are not marked to be removed, so add us back to
                # the ready set and let's see if there's some catching
                # up to do
                self.ready.add(child)
                self._catchUp()
            else:
                # We should die and we do, then we start a new worker
                # to pick up stuff from the queue otherwise we end up
                # without workers and the queue will remain there.
                self.stopAWorker(child).addCallback(lambda _: self.startAWorker())
            self._lastUsage[child] = now()
            # we can't do recycling here because it's too late and
            # the process might have received tons of calls already
            # which would make it run more calls than what is
            # configured to do.
            return result

        die = False
        child = self.ready.pop()
        self.busy.add(child)
        self._calls[child] += 1
        # Let's see if this call goes over the recycling barrier
        if self.recycleAfter and self._calls[child] >= self.recycleAfter:
            # it does so mark this child, using a closure, to be
            # removed at the end of the call.
            die = True
        # If the command doesn't require a response then callRemote
        # returns nothing, so we prepare for that too.
        # We also need to guard against timeout errors for child
        # and local timeout parameter overrides the global one
        if _timeout == 0:
            # An explicit 0 means "no timeout" and must not fall back
            # to the pool-wide default.
            timeout = _timeout
        else:
            timeout = _timeout or self.timeout
        if timeout is not None:
            from twisted.internet import reactor
            timeoutCall = reactor.callLater(timeout, self._handleTimeout, child)
        if _deadline is not None:
            from twisted.internet import reactor
            delay = max(0, _deadline - reactor.seconds())
            deadlineCall = reactor.callLater(delay, self._handleTimeout,
                                             child)
        return defer.maybeDeferred(child.callRemote, command, **kwargs
            ).addCallback(_returned, child
            ).addErrback(_returned, child, is_error=True)

    def callRemote(self, *args, **kwargs):
        """
        Proxy call to keep the API homogeneous across twisted's RPCs
        """
        return self.doWork(*args, **kwargs)

    def doWork(self, command, **kwargs):
        """
        Sends the command to one child.

        @param command: an L{amp.Command} type object.
        @type command: L{amp.Command}
        @param kwargs: dictionary containing the arguments for the command.
        """
        if self.ready:  # there are unused processes, let's use them
            return self._cb_doWork(command, **kwargs)
        else:
            if len(self.processes) < self.max:
                # no unused but we can start some new ones
                # since startAWorker is synchronous we won't have a
                # race condition here in case of multiple calls to
                # doWork, so we will end up in the else clause in case
                # of such calls:
                # Process pool with min=1, max=1, recycle_after=1
                # [call(Command) for x in xrange(BIG_NUMBER)]
                self.startAWorker()
                return self._cb_doWork(command, **kwargs)
            else:
                # No one is free... just queue up and wait for a process
                # to start and pick up the first item in the queue.
                d = defer.Deferred()
                self._queue.append((count(), (d, command, kwargs)))
                return d

    def stopAWorker(self, child=None):
        """
        Gently stop a child so that it's not restarted anymore

        @param child: an L{ampoule.child.AmpChild} type object.
        @type child: L{ampoule.child.AmpChild} or None
        """
        if child is None:
            # No specific child requested: prefer an idle one, else
            # pick any live process at random.
            if self.ready:
                child = self.ready.pop()
            else:
                child = choice(list(self.processes))
        child.callRemote(commands.Shutdown
            # This is needed for timeout handling, the reason is pretty hard
            # to explain but I'll try to:
            # There's another small race condition in the system. If the
            # child process is shut down by a signal and you try to stop
            # the process pool immediately afterwards, like tests would do,
            # the child AMP object would still be in the system and trying
            # to call the command Shutdown on it would result in the same
            # errback that we got originally, for this reason we need to
            # trap it now so that it doesn't raise by not being handled.
            # Does this even make sense to you?
            ).addErrback(lambda reason: reason.trap(error.ProcessTerminated))
        return self._finishCallbacks[child]

    def _startSomeWorkers(self):
        """
        Start a bunch of workers until we reach the max number of them.
        """
        if len(self.processes) < self.max:
            self.startAWorker()

    def adjustPoolSize(self, min=None, max=None):
        """
        Change the pool size to be at least min and less than max,
        useful when you change the values of max and min in the instance
        and you want the pool to adapt to them.
        """
        if min is None:
            min = self.min
        if max is None:
            max = self.max
        assert min >= 0, 'minimum is negative'
        assert min <= max, 'minimum is greater than maximum'
        self.min = min
        self.max = max
        l = []
        if self.started:
            # Shed excess workers, then grow up to the new minimum.
            for i in xrange(len(self.processes) - self.max):
                l.append(self.stopAWorker())
            while len(self.processes) < self.min:
                self.startAWorker()
        return defer.DeferredList(l)  #.addCallback(lambda _: self.dumpStats())

    def stop(self):
        """
        Stops the process protocol.
        """
        self.finished = True
        l = [self.stopAWorker(process) for process in self.processes]

        def _cb(_):
            # Only stop the idle-pruning loop once all children are gone.
            if self.looping.running:
                self.looping.stop()
        return defer.DeferredList(l).addCallback(_cb)

    def dumpStats(self):
        # Debug helper: log a snapshot of the pool configuration/state.
        log.msg("ProcessPool stats:")
        log.msg('\tworkers: %s' % len(self.processes))
        log.msg('\ttimeout: %s' % (self.timeout))
        log.msg('\tparent: %r' % (self.ampParent,))
        log.msg('\tchild: %r' % (self.ampChild,))
        log.msg('\tmax idle: %r' % (self.maxIdle,))
        log.msg('\trecycle after: %r' % (self.recycleAfter,))
        log.msg('\tProcessStarter:')
        log.msg('\t\t%r' % (self.starter,))
# Lazily-created module-global default pool used by deferToAMPProcess.
pp = None

def deferToAMPProcess(command, **kwargs):
    """
    Helper function that sends a command to the default process pool
    and returns a deferred that fires when the result of the
    subprocess computation is ready.

    @param command: an L{amp.Command} subclass
    @param kwargs: dictionary containing the arguments for the command.

    @return: a L{defer.Deferred} with the data from the subprocess.
    """
    global pp
    if pp is None:
        # First use: create the default pool and dispatch only after
        # it has finished starting.
        pp = ProcessPool()
        return pp.start().addCallback(lambda _: pp.doWork(command, **kwargs))
    return pp.doWork(command, **kwargs)

View file

@ -1,65 +0,0 @@
"""
This module implements a remote pool to use with AMP.
"""
from twisted.protocols import amp
class AMPProxy(amp.AMP):
    """
    A Proxy AMP protocol that forwards calls to a wrapped
    callRemote-like callable.
    """
    def __init__(self, wrapped, child):
        """
        @param wrapped: A callRemote-like callable that takes an
                        L{amp.Command} as first argument and other
                        optional keyword arguments afterwards.
        @type wrapped: L{callable}.
        @param child: The protocol class of the process pool children.
                      Used to forward only the methods that are actually
                      understood correctly by them.
        @type child: L{amp.AMP}
        """
        amp.AMP.__init__(self)
        self.wrapped = wrapped
        self.child = child
        # Sanity check: the proxy and the child must not both respond
        # to the same command (StartTLS is the one sanctioned overlap).
        localCd = set(self._commandDispatch.keys())
        childCd = set(self.child._commandDispatch.keys())
        assert localCd.intersection(childCd) == set(["StartTLS"]), \
            "Illegal method overriding in Proxy"

    def locateResponder(self, name):
        """
        This is a custom locator to forward calls to the children
        processes while keeping the ProcessPool a transparent MITM.

        This way of working has a few limitations, the first of which
        is the fact that children won't be able to take advantage of
        any dynamic locator except for the default L{CommandLocator}
        that is based on the _commandDispatch attribute added by the
        metaclass. This limitation might be lifted in the future.
        """
        if name == "StartTLS":
            # This is a special case where the proxy takes precedence
            return amp.AMP.locateResponder(self, "StartTLS")
        # Get the dict of commands from the child AMP implementation.
        cd = self.child._commandDispatch
        if name in cd:
            # If the command is there, then we forward stuff to it.
            commandClass, _responderFunc = cd[name]
            # We need to wrap the doWork function because the wrapping
            # call doesn't pass the command as first argument since it
            # thinks that we are the actual receivers and callable is
            # already the responder while it isn't.
            doWork = lambda **kw: self.wrapped(commandClass, **kw)
            # Now let's call the right function and wrap the result
            # dictionary.
            return self._wrapWithSerialization(doWork, commandClass)
        # of course if the name of the command is not in the child it
        # means that it might be in this class, so fallback to the
        # default behavior of this module.
        return amp.AMP.locateResponder(self, name)

View file

@ -1,69 +0,0 @@
import os
from twisted.application import service
from twisted.internet.protocol import ServerFactory
def makeService(options):
    """
    Create the service for the application

    @param options: parsed twistd-style options mapping with keys
        name, ampport, ampinterface, child, parent, min, max,
        max_idle, recycle, reactor and timeout.
    @return: a L{service.MultiService} wrapping the AMPoule service.
    """
    ms = service.MultiService()
    # Imported here to avoid importing twisted machinery at module load.
    from evennia.contrib.procpools.ampoule.pool import ProcessPool
    from evennia.contrib.procpools.ampoule.main import ProcessStarter

    name = options['name']
    ampport = options['ampport']
    ampinterface = options['ampinterface']
    child = options['child']
    parent = options['parent']
    min = options['min']
    max = options['max']
    maxIdle = options['max_idle']
    recycle = options['recycle']
    childReactor = options['reactor']
    timeout = options['timeout']

    starter = ProcessStarter(packages=("twisted",), childReactor=childReactor)
    pp = ProcessPool(child, parent, min, max, name, maxIdle, recycle, starter, timeout)
    svc = AMPouleService(pp, child, ampport, ampinterface)
    svc.setServiceParent(ms)
    return ms
class AMPouleService(service.Service):
def __init__(self, pool, child, port, interface):
self.pool = pool
self.port = port
self.child = child
self.interface = interface
self.server = None
def startService(self):
"""
Before reactor.run() is called we setup the system.
"""
service.Service.startService(self)
from evennia.contrib.procpools.ampoule import rpool
from twisted.internet import reactor
try:
factory = ServerFactory()
factory.protocol = lambda: rpool.AMPProxy(wrapped=self.pool.doWork,
child=self.child)
self.server = reactor.listenTCP(self.port,
factory,
interface=self.interface)
# this is synchronous when it's the startup, even though
# it returns a deferred. But we need to run it after the
# first cycle in order to wait for signal handlers to be
# installed.
reactor.callLater(0, self.pool.start)
except:
import traceback
print traceback.format_exc()
def stopService(self):
service.Service.stopService(self)
if self.server is not None:
self.server.stopListening()
return self.pool.stop()

View file

@ -1,892 +0,0 @@
from signal import SIGHUP
import os
import os.path
from cStringIO import StringIO as sio
import tempfile
from twisted.internet import error, defer, reactor
from twisted.python import failure
from twisted.trial import unittest
from twisted.protocols import amp
from evennia.contrib.procpools.ampoule import main, child, commands, pool
class ShouldntHaveBeenCalled(Exception):
pass
def _raise(_):
raise ShouldntHaveBeenCalled(_)
class _FakeT(object):
closeStdinCalled = False
def __init__(self, s):
self.s = s
def closeStdin(self):
self.closeStdinCalled = True
def write(self, data):
self.s.write(data)
class FakeAMP(object):
    """AMP test double recording its connector, loss reason and data."""

    connector = None
    reason = None

    def __init__(self, s):
        self.s = s

    def makeConnection(self, connector):
        # A real AMP instance may only be connected once; enforce that.
        if self.connector is None:
            self.connector = connector
        else:
            raise Exception("makeConnection called twice")

    def connectionLost(self, reason):
        # Likewise, the connection can only be lost a single time.
        if self.reason is None:
            self.reason = reason
        else:
            raise Exception("connectionLost called twice")

    def dataReceived(self, data):
        self.s.write(data)
class Ping(amp.Command):
    # Parent->child: child is expected to answer via a Pong round-trip.
    arguments = [('data', amp.String())]
    response = [('response', amp.String())]

class Pong(amp.Command):
    # Child->parent: echo of the Ping payload.
    arguments = [('data', amp.String())]
    response = [('response', amp.String())]

class Pid(amp.Command):
    # Ask the child for its OS process id.
    response = [('pid', amp.Integer())]

class Reactor(amp.Command):
    # Ask the child which reactor class it is running.
    response = [('classname', amp.String())]

class NoResponse(amp.Command):
    # Fire-and-forget command: no answer expected.
    arguments = [('arg', amp.String())]
    requiresAnswer = False

class GetResponse(amp.Command):
    # Retrieve the value previously stored by NoResponse.
    response = [("response", amp.String())]
class Child(child.AMPChild):
def ping(self, data):
return self.callRemote(Pong, data=data)
Ping.responder(ping)
class PidChild(child.AMPChild):
def pid(self):
import os
return {'pid': os.getpid()}
Pid.responder(pid)
class NoResponseChild(child.AMPChild):
    """
    Child used to verify commands with requiresAnswer=False:
    NoResponse stores its argument, GetResponse reads it back.
    """
    # last value received through NoResponse (False until set)
    _set = False

    def noresponse(self, arg):
        self._set = arg
        return {}
    NoResponse.responder(noresponse)

    def getresponse(self):
        return {"response": self._set}
    GetResponse.responder(getresponse)
class ReactorChild(child.AMPChild):
    """Child that reports which reactor implementation it is running."""
    def reactor(self):
        # import locally: the reactor is chosen/installed in the child,
        # not in the parent process
        from twisted.internet import reactor
        return {'classname': reactor.__class__.__name__}
    Reactor.responder(reactor)
class First(amp.Command):
    # Test command that blocks in WaitingChild until Second is issued.
    arguments = [('data', amp.String())]
    response = [('response', amp.String())]
class Second(amp.Command):
    # Companion to First: unblocks a pending First call in WaitingChild.
    pass
class WaitingChild(child.AMPChild):
    """
    Child whose First command blocks until Second is called, letting
    tests control exactly when a worker counts as busy.
    """
    # deferred the pending First call waits on; fired by second()
    deferred = None

    def first(self, data):
        self.deferred = defer.Deferred()
        return self.deferred.addCallback(lambda _: {'response': data})
    First.responder(first)

    def second(self):
        self.deferred.callback('')
        return {}
    Second.responder(second)
class Die(amp.Command):
    # Test command that makes BadChild drop its connection (fake crash).
    pass
class BadChild(child.AMPChild):
    """Child that drops its connection on Die, simulating a crash."""
    def die(self):
        # clear the clean-shutdown flag so the pool treats this as a crash
        self.shutdown = False
        self.transport.loseConnection()
        return {}
    Die.responder(die)
class Write(amp.Command):
    """Test command asking a Writer child for its stored data."""
    # the stray "pass" after the class body was removed; the class
    # already has a non-empty body so it was redundant.
    response = [("response", amp.String())]
class Writer(child.AMPChild):
    """Child that reports the constructor argument it was created with.

    Used to verify that ampChildArgs are forwarded to the child class.
    """
    def __init__(self, data='hello'):
        child.AMPChild.__init__(self)
        self.data = data

    def write(self):
        return {'response': self.data}
    Write.responder(write)
class GetCWD(amp.Command):
    # Test command: ask a TempDirChild for its current working directory.
    response = [("cwd", amp.String())]
class TempDirChild(child.AMPChild):
    """
    Child used as a bootstrap context manager: it runs inside a freshly
    created temporary directory (optionally a named subdirectory).
    """
    def __init__(self, directory=None):
        child.AMPChild.__init__(self)
        self.directory = directory

    def __enter__(self):
        # create and enter a scratch directory before serving commands
        directory = tempfile.mkdtemp()
        os.chdir(directory)
        if self.directory is not None:
            os.mkdir(self.directory)
            os.chdir(self.directory)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # NOTE(review): this only removes the innermost directory; when
        # self.directory is set, the enclosing mkdtemp() directory is
        # left behind -- confirm whether that leak is intentional.
        cwd = os.getcwd()
        os.chdir('..')
        os.rmdir(cwd)

    def getcwd(self):
        return {'cwd': os.getcwd()}
    GetCWD.responder(getcwd)
class TestAMPConnector(unittest.TestCase):
    """
    Tests for main.AMPConnector, the process protocol that glues an AMP
    instance to a child process' stdio/fd-4 pipes, and for
    main.ProcessStarter, which boots the children.
    """

    def setUp(self):
        """
        The only reason why this method exists is to let 'trial ampoule'
        install the signal handlers (#3178 for reference).
        """
        super(TestAMPConnector, self).setUp()
        d = defer.Deferred()
        reactor.callLater(0, d.callback, None)
        return d

    def _makeConnector(self, s, sa):
        # build an AMPConnector wired to in-memory streams instead of a
        # real child process transport
        a = FakeAMP(sa)
        ac = main.AMPConnector(a)
        assert ac.name is not None
        ac.transport = _FakeT(s)
        return ac

    def test_protocol(self):
        """
        Test that outReceived writes to AMP and that it triggers the
        finished deferred once the process ended.
        """
        s = sio()
        sa = sio()
        ac = self._makeConnector(s, sa)
        for x in xrange(99):
            # fd 4 is the AMP communication channel
            ac.childDataReceived(4, str(x))
        ac.processEnded(failure.Failure(error.ProcessDone(0)))
        return ac.finished.addCallback(
            lambda _: self.assertEqual(sa.getvalue(), ''.join(str(x) for x in xrange(99)))
        )

    def test_protocol_failing(self):
        """
        Test that a failure in the process termination is correctly
        propagated to the finished deferred.
        """
        s = sio()
        sa = sio()
        ac = self._makeConnector(s, sa)
        # any callback firing would mean the failure was swallowed
        ac.finished.addCallback(_raise)
        fail = failure.Failure(error.ProcessTerminated())
        self.assertFailure(ac.finished, error.ProcessTerminated)
        ac.processEnded(fail)

    def test_startProcess(self):
        """
        Test that startProcess actually starts a subprocess and that
        it receives data back from the process through AMP.
        """
        s = sio()
        a = FakeAMP(s)
        STRING = "ciao"
        BOOT = """\
import sys, os
def main(arg):
    os.write(4, arg)
main(sys.argv[1])
"""
        starter = main.ProcessStarter(bootstrap=BOOT,
                                      args=(STRING,),
                                      packages=("twisted",))
        amp, finished = starter.startPythonProcess(main.AMPConnector(a))

        def _eb(reason):
            print reason
        finished.addErrback(_eb)
        return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING))

    def test_failing_deferToProcess(self):
        """
        Test failing subprocesses and the way they terminate and preserve
        failing information.
        """
        s = sio()
        a = FakeAMP(s)
        STRING = "ciao"
        BOOT = """\
import sys
def main(arg):
    raise Exception(arg)
main(sys.argv[1])
"""
        starter = main.ProcessStarter(bootstrap=BOOT, args=(STRING,), packages=("twisted",))
        ready, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored")
        self.assertFailure(finished, error.ProcessTerminated)
        finished.addErrback(lambda reason: self.assertEquals(reason.getMessage(), STRING))
        return finished

    def test_env_setting(self):
        """
        Test that an environment variable passed to the process starter
        is correctly passed to the child process.
        """
        s = sio()
        a = FakeAMP(s)
        STRING = "ciao"
        BOOT = """\
import sys, os
def main():
    os.write(4, os.getenv("FOOBAR"))
main()
"""
        starter = main.ProcessStarter(bootstrap=BOOT,
                                      packages=("twisted",),
                                      env={"FOOBAR": STRING})
        amp, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored")

        def _eb(reason):
            print reason
        finished.addErrback(_eb)
        return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING))

    def test_startAMPProcess(self):
        """
        Test that you can start an AMP subprocess and that it correctly
        accepts commands and correctly answers them.
        """
        STRING = "ciao"
        starter = main.ProcessStarter(packages=("twisted",))
        c, finished = starter.startAMPProcess(child.AMPChild)
        c.callRemote(commands.Echo, data=STRING
            ).addCallback(lambda response:
                self.assertEquals(response['response'], STRING)
            ).addCallback(lambda _: c.callRemote(commands.Shutdown))
        return finished

    def test_BootstrapContext(self):
        # The child's bootstrap context (TempDirChild.__enter__) must be
        # active while commands run and undone after shutdown.
        starter = main.ProcessStarter(packages=('twisted',))
        c, finished = starter.startAMPProcess(TempDirChild)
        cwd = []

        def checkBootstrap(response):
            cwd.append(response['cwd'])
            self.assertNotEquals(cwd, os.getcwd())
        d = c.callRemote(GetCWD)
        d.addCallback(checkBootstrap)
        d.addCallback(lambda _: c.callRemote(commands.Shutdown))
        finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
        return finished

    def test_BootstrapContextInstance(self):
        # same as above, but verifying constructor args reach the child
        starter = main.ProcessStarter(packages=('twisted',))
        c, finished = starter.startAMPProcess(TempDirChild,
                                              ampChildArgs=('foo',))
        cwd = []

        def checkBootstrap(response):
            cwd.append(response['cwd'])
            self.assertTrue(cwd[0].endswith('/foo'))
        d = c.callRemote(GetCWD)
        d.addCallback(checkBootstrap)
        d.addCallback(lambda _: c.callRemote(commands.Shutdown))
        finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
        return finished

    def test_startAMPAndParentProtocol(self):
        """
        Test that you can start an AMP subprocess and the children can
        call methods on their parent.
        """
        DATA = "CIAO"
        APPEND = "123"

        class Parent(amp.AMP):
            def pong(self, data):
                return {'response': DATA+APPEND}
            Pong.responder(pong)

        starter = main.ProcessStarter(packages=("twisted",))
        subp, finished = starter.startAMPProcess(ampChild=Child, ampParent=Parent)
        subp.callRemote(Ping, data=DATA
            ).addCallback(lambda response:
                self.assertEquals(response['response'], DATA+APPEND)
            ).addCallback(lambda _: subp.callRemote(commands.Shutdown))
        return finished

    def test_roundtripError(self):
        """
        Test that invoking a child using an unreachable class raises
        a L{RunTimeError}.
        """
        # defined locally, so the subprocess cannot import it by name
        class Child(child.AMPChild):
            pass
        starter = main.ProcessStarter(packages=("twisted",))
        self.assertRaises(RuntimeError, starter.startAMPProcess, ampChild=Child)
class TestProcessPool(unittest.TestCase):
    """
    Tests for pool.ProcessPool: worker lifecycle, growing/shrinking,
    recycling, timeouts, deadlines and command dispatch.
    """

    def test_startStopWorker(self):
        """
        Test that starting and stopping a worker keeps the state of
        the process pool consistent.
        """
        pp = pool.ProcessPool()
        self.assertEquals(pp.started, False)
        self.assertEquals(pp.finished, False)
        self.assertEquals(pp.processes, set())
        self.assertEquals(pp._finishCallbacks, {})

        def _checks():
            self.assertEquals(pp.started, False)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), 1)
            self.assertEquals(len(pp._finishCallbacks), 1)
            return pp.stopAWorker()

        def _closingUp(_):
            self.assertEquals(pp.started, False)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), 0)
            self.assertEquals(pp._finishCallbacks, {})

        pp.startAWorker()
        return _checks().addCallback(_closingUp).addCallback(lambda _: pp.stop())

    def test_startAndStop(self):
        """
        Test that a process pool's start and stop method create the
        expected number of workers and keep state consistent in the
        process pool.
        """
        pp = pool.ProcessPool()
        self.assertEquals(pp.started, False)
        self.assertEquals(pp.finished, False)
        self.assertEquals(pp.processes, set())
        self.assertEquals(pp._finishCallbacks, {})

        def _checks(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            return pp.stop()

        def _closingUp(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, True)
            self.assertEquals(len(pp.processes), 0)
            self.assertEquals(pp._finishCallbacks, {})

        return pp.start().addCallback(_checks).addCallback(_closingUp)

    def test_adjustPoolSize(self):
        """
        Test that calls to pool.adjustPoolSize are correctly handled.
        """
        pp = pool.ProcessPool(min=10)
        self.assertEquals(pp.started, False)
        self.assertEquals(pp.finished, False)
        self.assertEquals(pp.processes, set())
        self.assertEquals(pp._finishCallbacks, {})

        def _resize1(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            return pp.adjustPoolSize(min=2, max=3)

        def _resize2(_):
            # shrinking min below the current worker count keeps max workers
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(pp.max, 3)
            self.assertEquals(pp.min, 2)
            self.assertEquals(len(pp.processes), pp.max)
            self.assertEquals(len(pp._finishCallbacks), pp.max)

        def _resize3(_):
            # invalid sizes must be rejected
            self.assertRaises(AssertionError, pp.adjustPoolSize, min=-1, max=5)
            self.assertRaises(AssertionError, pp.adjustPoolSize, min=5, max=1)
            return pp.stop()

        return pp.start(
            ).addCallback(_resize1
            ).addCallback(_resize2
            ).addCallback(_resize3)

    def test_childRestart(self):
        """
        Test that a failing child process is immediately restarted.
        """
        pp = pool.ProcessPool(ampChild=BadChild, min=1)
        STRING = "DATA"

        def _checks(_):
            # grab the deferred that fires when the only child terminates
            d = pp._finishCallbacks.values()[0]
            pp.doWork(Die).addErrback(lambda _: None)
            return d.addBoth(_checksAgain)

        def _checksAgain(_):
            # the pool should have replaced the dead child transparently
            return pp.doWork(commands.Echo, data=STRING
                ).addCallback(lambda result: self.assertEquals(result['response'], STRING))

        return pp.start(
            ).addCallback(_checks
            ).addCallback(lambda _: pp.stop())

    def test_parentProtocolChange(self):
        """
        Test that the parent can use a custom AMP protocol too.
        """
        DATA = "CIAO"
        APPEND = "123"

        class Parent(amp.AMP):
            def pong(self, data):
                return {'response': DATA+APPEND}
            Pong.responder(pong)

        pp = pool.ProcessPool(ampChild=Child, ampParent=Parent)

        def _checks(_):
            return pp.doWork(Ping, data=DATA
                ).addCallback(lambda response:
                    self.assertEquals(response['response'], DATA+APPEND)
                )

        return pp.start().addCallback(_checks).addCallback(lambda _: pp.stop())

    def test_deferToAMPProcess(self):
        """
        Test that deferToAMPProcess works as expected.
        """
        def cleanupGlobalPool():
            # deferToAMPProcess lazily creates a module-global pool
            d = pool.pp.stop()
            pool.pp = None
            return d
        self.addCleanup(cleanupGlobalPool)
        STRING = "CIAOOOO"
        d = pool.deferToAMPProcess(commands.Echo, data=STRING)
        d.addCallback(self.assertEquals, {"response": STRING})
        return d

    def test_checkStateInPool(self):
        """
        Test that busy and ready lists are correctly maintained.
        """
        pp = pool.ProcessPool(ampChild=WaitingChild)
        DATA = "foobar"

        def _checks(_):
            d = pp.callRemote(First, data=DATA)
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            self.assertEquals(len(pp.ready), pp.min-1)
            self.assertEquals(len(pp.busy), 1)
            # peek at the busy child without removing it, then unblock it
            child = pp.busy.pop()
            pp.busy.add(child)
            child.callRemote(Second)
            return d

        return pp.start(
            ).addCallback(_checks
            ).addCallback(lambda _: pp.stop())

    def test_growingToMax(self):
        """
        Test that the pool grows over time until it reaches max processes.
        """
        MAX = 5
        pp = pool.ProcessPool(ampChild=WaitingChild, min=1, max=MAX)

        def _checks(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            D = "DATA"
            # every blocking First call should force a new worker
            d = [pp.doWork(First, data=D) for x in xrange(MAX)]
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.max)
            self.assertEquals(len(pp._finishCallbacks), pp.max)
            [child.callRemote(Second) for child in pp.processes]
            return defer.DeferredList(d)

        return pp.start(
            ).addCallback(_checks
            ).addCallback(lambda _: pp.stop())

    def test_growingToMaxAndShrinking(self):
        """
        Test that the pool grows but after 'idle' time the number of
        processes goes back to the minimum.
        """
        MAX = 5
        MIN = 1
        IDLE = 1
        pp = pool.ProcessPool(ampChild=WaitingChild, min=MIN, max=MAX, maxIdle=IDLE)

        def _checks(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            D = "DATA"
            d = [pp.doWork(First, data=D) for x in xrange(MAX)]
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.max)
            self.assertEquals(len(pp._finishCallbacks), pp.max)
            [child.callRemote(Second) for child in pp.processes]
            return defer.DeferredList(d).addCallback(_realChecks)

        def _realChecks(_):
            from twisted.internet import reactor
            d = defer.Deferred()

            def _cb():
                def __(_):
                    try:
                        self.assertEquals(pp.started, True)
                        self.assertEquals(pp.finished, False)
                        self.assertEquals(len(pp.processes), pp.min)
                        self.assertEquals(len(pp._finishCallbacks), pp.min)
                        d.callback(None)
                    except Exception, e:
                        d.errback(e)
                # prune manually so the check runs right after the pruner
                return pp._pruneProcesses().addCallback(__)
            pp.looping.stop()  # stop the looping call: we don't want it
                               # pruning on its own right here
            reactor.callLater(IDLE, _cb)
            return d

        return pp.start(
            ).addCallback(_checks
            ).addCallback(lambda _: pp.stop())

    def test_recycling(self):
        """
        Test that after a given number of calls subprocesses are
        recycled.
        """
        MAX = 1
        MIN = 1
        RECYCLE_AFTER = 1
        pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
        self.addCleanup(pp.stop)

        def _checks(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            return pp.doWork(Pid
                ).addCallback(lambda response: response['pid'])

        def _checks2(pid):
            # a different pid proves the worker was recycled after one call
            return pp.doWork(Pid
                ).addCallback(lambda response: response['pid']
                ).addCallback(self.assertNotEquals, pid)

        d = pp.start()
        d.addCallback(_checks)
        d.addCallback(_checks2)
        return d

    def test_recyclingWithQueueOverload(self):
        """
        Test that we get the correct number of different results when
        we overload the pool of calls.
        """
        MAX = 5
        MIN = 1
        RECYCLE_AFTER = 10
        CALLS = 60
        pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
        self.addCleanup(pp.stop)

        def _check(results):
            s = set()
            for succeed, response in results:
                s.add(response['pid'])
            # For the first C{MAX} calls, each is basically guaranteed to go to
            # a different child. After that, though, there are no guarantees.
            # All the rest might go to a single child, since the child to
            # perform a job is selected arbitrarily from the "ready" set. Fair
            # distribution of jobs needs to be implemented; right now it's "set
            # ordering" distribution of jobs.
            self.assertTrue(len(s) > MAX)

        def _work(_):
            l = [pp.doWork(Pid) for x in xrange(CALLS)]
            d = defer.DeferredList(l)
            return d.addCallback(_check)

        d = pp.start()
        d.addCallback(_work)
        return d

    def test_disableProcessRecycling(self):
        """
        Test that by setting 0 to recycleAfter we actually disable process recycling.
        """
        MAX = 1
        MIN = 1
        RECYCLE_AFTER = 0
        pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)

        def _checks(_):
            self.assertEquals(pp.started, True)
            self.assertEquals(pp.finished, False)
            self.assertEquals(len(pp.processes), pp.min)
            self.assertEquals(len(pp._finishCallbacks), pp.min)
            return pp.doWork(Pid
                ).addCallback(lambda response: response['pid'])

        def _checks2(pid):
            # same pid each time: no recycling happened
            return pp.doWork(Pid
                ).addCallback(lambda response: response['pid']
                ).addCallback(self.assertEquals, pid
                ).addCallback(lambda _: pid)

        def finish(reason):
            return pp.stop().addCallback(lambda _: reason)

        return pp.start(
            ).addCallback(_checks
            ).addCallback(_checks2
            ).addCallback(_checks2
            ).addCallback(finish)

    def test_changeChildrenReactor(self):
        """
        Test that by passing the correct argument children change their
        reactor type.
        """
        MAX = 1
        MIN = 1
        FIRST = "select"
        SECOND = "poll"

        def checkDefault():
            pp = pool.ProcessPool(
                    starter=main.ProcessStarter(
                        childReactor=FIRST,
                        packages=("twisted",)),
                    ampChild=ReactorChild, min=MIN, max=MAX)
            pp.start()
            return pp.doWork(Reactor
                ).addCallback(self.assertEquals, {'classname': "SelectReactor"}
                ).addCallback(lambda _: pp.stop())

        def checkPool(_):
            pp = pool.ProcessPool(
                    starter=main.ProcessStarter(
                        childReactor=SECOND,
                        packages=("twisted",)),
                    ampChild=ReactorChild, min=MIN, max=MAX)
            pp.start()
            return pp.doWork(Reactor
                ).addCallback(self.assertEquals, {'classname': "PollReactor"}
                ).addCallback(lambda _: pp.stop())

        return checkDefault(
            ).addCallback(checkPool)

    # skip the reactor-switching test on platforms without select.poll
    try:
        from select import poll
    except ImportError:
        test_changeChildrenReactor.skip = "This architecture doesn't support select.poll, I can't run this test"

    def test_commandsWithoutResponse(self):
        """
        Test that if we send a command without a required answer we
        actually don't have any problems.
        """
        DATA = "hello"
        pp = pool.ProcessPool(ampChild=NoResponseChild, min=1, max=1)

        def _check(_):
            return pp.doWork(GetResponse
                ).addCallback(self.assertEquals, {"response": DATA})

        def _work(_):
            return pp.doWork(NoResponse, arg=DATA)

        return pp.start(
            ).addCallback(_work
            ).addCallback(_check
            ).addCallback(lambda _: pp.stop())

    def test_SupplyChildArgs(self):
        """Ensure that arguments for the child constructor are passed in."""
        pp = pool.ProcessPool(Writer, ampChildArgs=['body'], min=0)

        def _check(result):
            return pp.doWork(Write).addCallback(
                self.assertEquals, {'response': 'body'})

        return pp.start(
            ).addCallback(_check
            ).addCallback(lambda _: pp.stop())

    def processTimeoutTest(self, timeout):
        # shared driver for the per-call timeout tests below
        pp = pool.ProcessPool(WaitingChild, min=1, max=1)

        def _work(_):
            d = pp.callRemote(First, data="ciao", _timeout=timeout)
            self.assertFailure(d, error.ProcessTerminated)
            return d

        return pp.start(
            ).addCallback(_work
            ).addCallback(lambda _: pp.stop())

    def test_processTimeout(self):
        """
        Test that a call that doesn't finish within the given timeout
        time is correctly handled.
        """
        return self.processTimeoutTest(1)

    def test_processTimeoutZero(self):
        """
        Test that the process is correctly handled when the timeout is zero.
        """
        return self.processTimeoutTest(0)

    def test_processDeadline(self):
        # a deadline already in the past must terminate the call
        pp = pool.ProcessPool(WaitingChild, min=1, max=1)

        def _work(_):
            d = pp.callRemote(First, data="ciao", _deadline=reactor.seconds())
            self.assertFailure(d, error.ProcessTerminated)
            return d

        return pp.start(
            ).addCallback(_work
            ).addCallback(lambda _: pp.stop())

    def test_processBeforeDeadline(self):
        # a generous deadline must let the call complete normally
        pp = pool.ProcessPool(PidChild, min=1, max=1)

        def _work(_):
            d = pp.callRemote(Pid, _deadline=reactor.seconds() + 10)
            d.addCallback(lambda result: self.assertNotEqual(result['pid'], 0))
            return d

        return pp.start(
            ).addCallback(_work
            ).addCallback(lambda _: pp.stop())

    def test_processTimeoutSignal(self):
        """
        Test that a call that doesn't finish within the given timeout
        time is correctly handled.
        """
        pp = pool.ProcessPool(WaitingChild, min=1, max=1,
                              timeout_signal=SIGHUP)

        def _work(_):
            d = pp.callRemote(First, data="ciao", _timeout=1)
            d.addCallback(lambda d: self.fail())
            text = 'signal %d' % SIGHUP
            d.addErrback(
                lambda f: self.assertTrue(text in f.value[0],
                                          '"%s" not in "%s"' % (text, f.value[0])))
            return d

        return pp.start(
            ).addCallback(_work
            ).addCallback(lambda _: pp.stop())

    def test_processGlobalTimeout(self):
        """
        Test that a call that doesn't finish within the given global
        timeout time is correctly handled.
        """
        pp = pool.ProcessPool(WaitingChild, min=1, max=1, timeout=1)

        def _work(_):
            d = pp.callRemote(First, data="ciao")
            self.assertFailure(d, error.ProcessTerminated)
            return d

        return pp.start(
            ).addCallback(_work
            ).addCallback(lambda _: pp.stop())

View file

@ -1,52 +0,0 @@
from twisted.internet import defer, reactor
from twisted.internet.protocol import ClientFactory
from twisted.trial import unittest
from twisted.protocols import amp
from evennia.contrib.procpools.ampoule import service, child, pool, main
from evennia.contrib.procpools.ampoule.commands import Echo
class ClientAMP(amp.AMP):
    """
    Client protocol used in the proxy tests; hands itself to its factory
    and fires the factory's onMade deferred once connected.
    """
    # set by the factory before connecting
    factory = None

    def connectionMade(self):
        if self.factory is not None:
            self.factory.theProto = self
            if hasattr(self.factory, 'onMade'):
                self.factory.onMade.callback(None)
class TestAMPProxy(unittest.TestCase):
    """Tests for service.AMPouleService acting as a TCP proxy to a pool."""

    def setUp(self):
        """
        Setup the proxy service and the client connection to the proxy
        service in order to run calls through them.

        Inspiration comes from twisted.test.test_amp.
        """
        self.pp = pool.ProcessPool()
        # port 0: let the OS pick a free port, read it back below
        self.svc = service.AMPouleService(self.pp, child.AMPChild, 0, "")
        self.svc.startService()
        self.proxy_port = self.svc.server.getHost().port
        self.clientFactory = ClientFactory()
        self.clientFactory.protocol = ClientAMP
        d = self.clientFactory.onMade = defer.Deferred()
        self.clientConn = reactor.connectTCP("127.0.0.1",
                                             self.proxy_port,
                                             self.clientFactory)
        self.addCleanup(self.clientConn.disconnect)
        self.addCleanup(self.svc.stopService)

        def setClient(_):
            self.client = self.clientFactory.theProto
        return d.addCallback(setClient)

    def test_forwardCall(self):
        """
        Test that a call made from a client is correctly forwarded to
        the process pool and the result is correctly reported.
        """
        DATA = "hello"
        return self.client.callRemote(Echo, data=DATA).addCallback(
            self.assertEquals, {'response': DATA}
        )

View file

@ -1,46 +0,0 @@
"""
some utilities
"""
import os
import sys
import __main__
from twisted.python.filepath import FilePath
from twisted.python.reflect import namedAny
# from twisted.python.modules import theSystemPath
def findPackagePath(modulePath):
    """
    Try to find the sys.path entry from a modulePath object, simultaneously
    computing the dotted module name of the targeted file.

    Walks upwards from the module as long as an __init__.* sibling exists
    (i.e. while still inside a package), collecting package names.

    Returns a (packageParentPath, dottedModuleName) tuple.
    """
    p = modulePath
    l = [p.basename().split(".")[0]]
    while p.parent() != p:
        for extension in ['py', 'pyc', 'pyo', 'pyd', 'dll']:
            sib = p.sibling("__init__."+extension)
            if sib.exists():
                p = p.parent()
                l.insert(0, p.basename())
                break
        else:
            return p.parent(), '.'.join(l)
    # Previously the function fell off the end and implicitly returned
    # None if the walk reached the filesystem root; return the computed
    # result instead so callers can always unpack a tuple.
    return p, '.'.join(l)
def mainpoint(function):
    """
    Decorator which declares a function to be an object's mainpoint.

    If the decorated function's module is being run as a script
    (__main__), the decorator locates the module's package root on disk,
    prepends it to sys.path when missing, re-resolves the function by its
    proper dotted name and calls it with sys.argv, finally exiting the
    process with the function's return value as exit code (None -> 0).
    Otherwise the function is returned unchanged.
    """
    if function.__module__ == '__main__':
        # OK, time to run the function: find the package containing the
        # script currently being executed
        p = FilePath(__main__.__file__)
        p, mn = findPackagePath(p)
        pname = p.path
        if pname not in map(os.path.abspath, sys.path):
            sys.path.insert(0, pname)
            # Maybe remove the module's path?
        # re-import through the dotted name so the function runs under its
        # real module identity rather than __main__
        exitcode = namedAny(mn+'.'+function.__name__)(sys.argv)
        if exitcode is None:
            exitcode = 0
        sys.exit(exitcode)
    return function

View file

@ -1,326 +0,0 @@
"""
Python ProcPool
Evennia Contribution - Griatch 2012
The ProcPool is used to execute code on a separate process. This allows for
true asynchronous operation. Process communication happens over AMP and is
thus fully asynchronous as far as Evennia is concerned.
The process pool is implemented using a slightly modified version of
the Ampoule package (included).
The python_process pool is a service activated with the instructions
in python_procpool_plugin.py.
To use, import run_async from this module and use instead of the
in-process version found in evennia.utils.utils. Note that this is a much
more complex function than the default run_async, so make sure to read
the header carefully.
To test it works, make sure to activate the process pool, then try the
following as superuser:
@py from evennia.contrib.procpools.python_procpool import run_async;run_async("_return('Wohoo!')", at_return=self.msg, at_err=self.msg)
You can also try to import time and do time.sleep(5) before the
_return statement, to test it really is asynchronous.
"""
from twisted.protocols import amp
from twisted.internet import threads
from evennia.contrib.procpools.ampoule.child import AMPChild
from evennia.utils.dbserialize import to_pickle, from_pickle, do_pickle, do_unpickle
from evennia.utils.idmapper.base import PROC_MODIFIED_OBJS
from evennia.utils.utils import clean_object_caches, to_str
from evennia.utils import logger
#
# Multiprocess command for communication Server<->Client, relaying
# data for remote Python execution
#
class ExecuteCode(amp.Command):
    """
    Executes python code in the python process,
    returning the result when ready.

    source - a compileable Python source code string
    environment - a pickled dictionary of Python
                  data. Each key will become the name
                  of a variable available to the source
                  code. Database objects are stored on
                  the form ((app, modelname), id) allowing
                  the receiver to easily rebuild them on
                  this side.
    errors - an all-encompassing error handler
    response - a string or a pickled string
    """
    arguments = [('source', amp.String()),
                 ('environment', amp.String())]
    errors = [(Exception, 'EXCEPTION')]
    response = [('response', amp.String()),
                ('recached', amp.String())]
#
# Multiprocess AMP client-side factory, for executing remote Python code
#
class PythonProcPoolChild(AMPChild):
    """
    This is describing what happens on the subprocess side.

    This already supports Echo, Shutdown and Ping.

    Methods:
     executecode - a remote code execution environment
    """
    def executecode(self, source, environment):
        """
        Remote code execution

        source - Python code snippet
        environment - pickled dictionary of environment
                      variables. They are stored in
                      two keys "normal" and "objs" where
                      normal holds a dictionary of
                      normally pickled python objects
                      whereas objs points to a dictionary
                      of database representations ((app,key),id).

        The environment's entries will be made available as
        local variables during the execution. Normal eval
        results will be returned as-is. For more complex
        code snippets (run by exec), the _return function
        is available: All data sent to _return(retval) will
        be returned from this system whenever the system
        finishes. Multiple calls to _return will result in
        a list being returned. The return value is pickled
        and thus allows for returning any pickleable data.
        """
        class Ret(object):
            "Helper class for holding returns from exec"
            def __init__(self):
                self.returns = []
            def __call__(self, *args, **kwargs):
                self.returns.extend(list(args))
            def get_returns(self):
                # one value -> that value; several -> list; none -> ""
                lr = len(self.returns)
                val = lr and (lr == 1 and self.returns[0] or self.returns) or None
                if val not in (None, [], ()):
                    return do_pickle(to_pickle(val))
                else:
                    return ""
        _return = Ret()

        available_vars = {'_return': _return}
        if environment:
            # load environment
            try:
                environment = from_pickle(do_unpickle(environment))
                available_vars.update(environment)
            except Exception:
                logger.log_trace()

        # try to execute with eval first (works for simple expressions)
        try:
            ret = eval(source, {}, available_vars)
            if ret not in (None, [], ()):
                ret = _return.get_returns() or do_pickle(to_pickle(ret))
            else:
                ret = ""
        except Exception:
            # use exec instead (statements / multi-line snippets)
            exec source in available_vars
            ret = _return.get_returns()

        # get the list of affected objects to recache
        objs = PROC_MODIFIED_OBJS.values()
        # we need to include the locations too, to update their content caches
        objs = objs + list(set([o.location for o in objs
                                if hasattr(o, "location") and o.location]))
        if objs not in (None, [], ()):
            to_recache = do_pickle(to_pickle(objs))
        else:
            to_recache = ""
        # empty the tracker without losing the memory reference
        PROC_MODIFIED_OBJS.clear()  # TODO - is this not messing anything up?
        return {'response': ret,
                'recached': to_recache}
    ExecuteCode.responder(executecode)
#
# Procpool run_async - Server-side access function for executing
# code in another process
#
# Lazily initialized on the first run_async() call:
_PPOOL = None     # the ProcessPool instance, or False if unavailable
_SESSIONS = None  # Evennia's sessionhandler.SESSIONS
# message ampoule produces when a child is killed (e.g. on timeout)
_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9."
def run_async(to_execute, *args, **kwargs):
    """
    Runs a function or executes a code snippet asynchronously.

    Inputs:
    to_execute (callable) - if this is a callable, it will
            be executed with *args and non-reserved **kwargs as
            arguments.
            The callable will be executed using ProcPool, or in
            a thread if ProcPool is not available.
    to_execute (string) - this is only available if ProcPool is
            running. If a string, to_execute will be treated as a code
            snippet to execute asynchronously. *args are then not used
            and non-reserved **kwargs are used to define the execution
            environment made available to the code.

    reserved kwargs:
        'use_thread' (bool) - this only works with callables (not code).
                      It forces the code to run in a thread instead
                      of using the Process Pool, even if the latter
                      is available. This could be useful if you want
                      to make sure to not get out of sync with the
                      main process (such as accessing in-memory global
                      properties)
        'proc_timeout' (int) - only used if ProcPool is available. Sets a
                      max time for execution. This alters the value set
                      by settings.PROCPOOL_TIMEOUT
        'at_return' - should point to a callable with one argument.
                      It will be called with the return value from
                      to_execute.
        'at_return_kwargs' - this dictionary will be used as keyword
                      arguments to the at_return callback.
        'at_err' - this will be called with a Failure instance if
                      there is an error in to_execute.
        'at_err_kwargs' - this dictionary will be used as keyword
                      arguments to the at_err errback.
        'procpool_name' - the Service name of the procpool to use.
                      Default is PythonProcPool.

    *args - if to_execute is a callable, these args will be used
            as arguments for that function. If to_execute is a string
            *args are not used.
    **kwargs - if to_execute is a callable, these kwargs will be used
            as keyword arguments in that function. If a string, they
            instead are used to define the executable environment
            that should be available to execute the code in to_execute.

    run_async will either relay the code to a thread or to a processPool
    depending on input and what is available in the system. To activate
    Process pooling, settings.PROCPOOL_ENABLE must be set.

    to_execute in string form should handle all imports needed. kwargs
    can be used to send objects and properties. Such properties will
    be pickled, except Database Objects which will be sent across
    on a special format and re-loaded on the other side.

    To get a return value from your code snippet, use the _return()
    function: Every call to this function from your snippet will
    append the argument to an internal list of returns. This return value
    (or a list) will be the first argument to the at_return callback.

    Use this function with restraint and only for features/commands
    that you know have no influence on the cause-and-effect order of your
    game (commands given after the async function might be executed before
    it has finished). Accessing the same property from different
    threads/processes can lead to unpredicted behaviour if you are not
    careful (this is called a "race condition").

    Also note that some databases, notably sqlite3, don't support access from
    multiple threads simultaneously, so if you do heavy database access from
    your to_execute under sqlite3 you will probably run very slow or even get
    tracebacks.
    """
    # handle all global imports.
    global _PPOOL, _SESSIONS

    # get the procpool name, if set in kwargs. Pop it so the reserved
    # kwarg does not leak into the execution environment / function
    # kwargs (it was previously read with .get()).
    procpool_name = kwargs.pop("procpool_name", "PythonProcPool")

    if _PPOOL is None:
        # Try to load the process pool; cache False on failure so we
        # don't retry the lookup on every call.
        from evennia.server.sessionhandler import SESSIONS as _SESSIONS
        try:
            _PPOOL = _SESSIONS.server.services.namedServices.get(procpool_name).pool
        except AttributeError:
            _PPOOL = False

    # previously this read _PPOOL.timeout unconditionally, which raised
    # AttributeError whenever the pool was unavailable (_PPOOL == False).
    use_timeout = kwargs.pop("proc_timeout", _PPOOL.timeout if _PPOOL else None)

    # helper converters for callbacks/errbacks
    def convert_return(f):
        # wrap at_return: unpickle the response, recache modified objects,
        # then forward the plain return value to the user callback
        def func(ret, *args, **kwargs):
            rval = ret["response"] and from_pickle(do_unpickle(ret["response"]))
            reca = ret["recached"] and from_pickle(do_unpickle(ret["recached"]))
            # recache all indicated objects
            [clean_object_caches(obj) for obj in reca]
            if f:
                return f(rval, *args, **kwargs)
            else:
                return rval
        return func

    def convert_err(f):
        # wrap at_err: translate the Failure into a readable message,
        # falling back to logging when no errback was supplied
        def func(err, *args, **kwargs):
            err.trap(Exception)
            err = err.getErrorMessage()
            if use_timeout and err == _PROC_ERR:
                err = "Process took longer than %ss and timed out." % use_timeout
            if f:
                return f(err, *args, **kwargs)
            else:
                err = "Error reported from subprocess: '%s'" % err
                logger.log_errmsg(err)
        return func

    # handle special reserved input kwargs
    use_thread = kwargs.pop("use_thread", False)
    callback = convert_return(kwargs.pop("at_return", None))
    errback = convert_err(kwargs.pop("at_err", None))
    callback_kwargs = kwargs.pop("at_return_kwargs", {})
    errback_kwargs = kwargs.pop("at_err_kwargs", {})

    if _PPOOL and not use_thread:
        # process pool is running
        if isinstance(to_execute, basestring):
            # run source code in process pool
            cmdargs = {"_timeout": use_timeout}
            cmdargs["source"] = to_str(to_execute)
            if kwargs:
                cmdargs["environment"] = do_pickle(to_pickle(kwargs))
            else:
                cmdargs["environment"] = ""
            # defer to process pool
            deferred = _PPOOL.doWork(ExecuteCode, **cmdargs)
        elif callable(to_execute):
            # execute callable in process by shipping it (plus its
            # arguments) over as a pickled environment
            callname = to_execute.__name__
            cmdargs = {"_timeout": use_timeout}
            cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname
            cmdargs["environment"] = do_pickle(to_pickle({callname: to_execute,
                                                          "args": args,
                                                          "kwargs": kwargs}))
            deferred = _PPOOL.doWork(ExecuteCode, **cmdargs)
        else:
            raise RuntimeError("'%s' could not be handled by the process pool" % to_execute)
    elif callable(to_execute):
        # no process pool available, fall back to old deferToThread mechanism.
        deferred = threads.deferToThread(to_execute, *args, **kwargs)
    else:
        # no appropriate input for this server setup
        raise RuntimeError("'%s' could not be handled by run_async - no valid input or no process pool." % to_execute)

    # attach callbacks
    if callback:
        deferred.addCallback(callback, **callback_kwargs)
    deferred.addErrback(errback, **errback_kwargs)

View file

@ -1,115 +0,0 @@
"""
Python ProcPool plugin
Evennia contribution - Griatch 2012
This is a plugin for the Evennia services. It will make the service
and run_async in python_procpool.py available to the system.
To activate, add the following line to your settings file:
SERVER_SERVICES_PLUGIN_MODULES.append("contrib.procpools.python_procpool_plugin")
Next reboot the server and the new service will be available.
It is not recommended to use this with an SQLite3 database, at least
if you plan to do many out-of-process database writes. SQLite3 does
not work very well with a high frequency of off-process writes due to
file locking clashes. Your mileage may vary - test what works for you.
"""
import os
import sys
from django.conf import settings
# Process Pool setup
# Convenient flag for turning off the process pool without editing settings.
PROCPOOL_ENABLED = True
# Relay subprocess stdout to the server log (debug mode - very spammy).
PROCPOOL_DEBUG = False
# Min/max size of the process pool. The pool will expand up to the max
# limit on demand.
PROCPOOL_MIN_NPROC = 5
PROCPOOL_MAX_NPROC = 20
# Maximum time (seconds) a process may sit idle before being pruned from
# the pool (only if the pool is bigger than its minimum size).
PROCPOOL_IDLETIME = 20
# After sending a command, this is the maximum time in seconds the process
# may run without returning. After this time the process will be killed.
# This can be seen as a fallback; the run_async method takes a keyword
# proc_timeout that will override this value on a per-case basis.
PROCPOOL_TIMEOUT = 10
# Only change this if the port clashes with something else on the system.
PROCPOOL_PORT = 5001
# Interface to listen on; 0.0.0.0 would mean listening to all interfaces.
PROCPOOL_INTERFACE = '127.0.0.1'
# User-id and group-id to run the processes as (on OSes that support this).
# If you plan to run unsafe code one could experiment with setting this
# to an unprivileged user.
PROCPOOL_UID = None
PROCPOOL_GID = None
# Real path to a directory where all processes will be run. If
# not given, processes will be executed in game/.
PROCPOOL_DIRECTORY = None
# Name of the registered service; normally no need to change this.
SERVICE_NAME = "PythonProcPool"
# plugin hook
def start_plugin_services(server):
    """
    This will be called by the Evennia Server when starting up.

    server - the main Evennia server application

    Builds the AMP process-pool starter, pool and service and registers
    the service with the server. Does nothing if PROCPOOL_ENABLED is
    unset.
    """
    if not PROCPOOL_ENABLED:
        return
    # terminal output (parenthesized print works identically on py2
    # for a single argument, and also parses on py3)
    print(' amp (Process Pool): %s' % PROCPOOL_PORT)
    from evennia.contrib.procpools.ampoule import main as ampoule_main
    from evennia.contrib.procpools.ampoule import service as ampoule_service
    from evennia.contrib.procpools.ampoule import pool as ampoule_pool
    from evennia.contrib.procpools.ampoule.main import BOOTSTRAP as _BOOTSTRAP
    from evennia.contrib.procpools.python_procpool import PythonProcPoolChild
    # for some reason absolute paths don't work here, only relative ones.
    apackages = ("twisted",
                 "evennia",
                 "settings")
    aenv = {"DJANGO_SETTINGS_MODULE": "settings",
            "DATABASE_NAME": settings.DATABASES.get("default", {}).get("NAME") or settings.DATABASE_NAME}
    # optionally make the subprocesses log to stderr (very spammy)
    if PROCPOOL_DEBUG:
        _BOOTSTRAP = _BOOTSTRAP % "log.startLogging(sys.stderr)"
    else:
        _BOOTSTRAP = _BOOTSTRAP % ""
    # use the more efficient epoll reactor on linux. Match by prefix since
    # sys.platform is 'linux2' on python2 but 'linux' on python3.
    child_reactor = "epoll" if sys.platform.startswith("linux") else "default"
    procpool_starter = ampoule_main.ProcessStarter(packages=apackages,
                                                   env=aenv,
                                                   path=PROCPOOL_DIRECTORY,
                                                   uid=PROCPOOL_UID,
                                                   gid=PROCPOOL_GID,
                                                   bootstrap=_BOOTSTRAP,
                                                   childReactor=child_reactor)
    procpool = ampoule_pool.ProcessPool(name=SERVICE_NAME,
                                        min=PROCPOOL_MIN_NPROC,
                                        max=PROCPOOL_MAX_NPROC,
                                        recycleAfter=500,
                                        timeout=PROCPOOL_TIMEOUT,
                                        maxIdle=PROCPOOL_IDLETIME,
                                        ampChild=PythonProcPoolChild,
                                        starter=procpool_starter)
    procpool_service = ampoule_service.AMPouleService(procpool,
                                                      PythonProcPoolChild,
                                                      PROCPOOL_PORT,
                                                      PROCPOOL_INTERFACE)
    procpool_service.setName(SERVICE_NAME)
    # add the new services to the server
    server.services.addService(procpool_service)