mirror of
https://github.com/evennia/evennia.git
synced 2026-03-27 02:06:32 +01:00
Shifting ProcPool out of src and into a contrib, using the service plugin system.
This commit is contained in:
parent
f677902811
commit
93d95377ce
23 changed files with 363 additions and 390 deletions
|
|
@ -1,23 +0,0 @@
|
|||
Copyright (c) 2008
|
||||
Valentino Volonghi
|
||||
Matthew Lefkowitz
|
||||
Copyright (c) 2009 Canonical Ltd.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
"Software"), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
||||
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
@ -1,20 +0,0 @@
|
|||
|
||||
AMPOULE
|
||||
-------
|
||||
|
||||
https://launchpad.net/ampoule
|
||||
|
||||
AMPoule is a process management system using Twisted spawnProcess
|
||||
functionality. It uses AMP to pipe messages to a process Pool that it
|
||||
manages. The service is called ProcPool in Evennia settings.
|
||||
|
||||
AMPoule's very good, but unfortunately the source is very poorly
|
||||
documented. Hence the source in this directory does not comform to
|
||||
Evennia's normally rigid standards - for now we try to edit it as
|
||||
little as possible so as to make it easy to apply upstream updates
|
||||
down the line.
|
||||
|
||||
Changes made by Evennia are minor - it's mainly limiting spam to the
|
||||
log and an added ability to turn this on/off through settings. Most
|
||||
Evennia related code are found in src/server/procpool.py and
|
||||
src/server/server.py.
|
||||
|
|
@ -1,4 +0,0 @@
|
|||
from pool import deferToAMPProcess, pp
|
||||
from commands import Shutdown, Ping, Echo
|
||||
from child import AMPChild
|
||||
__version__ = "0.2.1"
|
||||
|
|
@ -1,53 +0,0 @@
|
|||
from twisted.python import log
|
||||
from twisted.internet import error
|
||||
from twisted.protocols import amp
|
||||
from src.utils.ampoule.commands import Echo, Shutdown, Ping
|
||||
|
||||
class AMPChild(amp.AMP):
|
||||
def __init__(self):
|
||||
super(AMPChild, self).__init__(self)
|
||||
self.shutdown = False
|
||||
|
||||
def connectionLost(self, reason):
|
||||
amp.AMP.connectionLost(self, reason)
|
||||
from twisted.internet import reactor
|
||||
try:
|
||||
reactor.stop()
|
||||
except error.ReactorNotRunning:
|
||||
# woa, this means that something bad happened,
|
||||
# most probably we received a SIGINT. Now this is only
|
||||
# a problem when you use Ctrl+C to stop the main process
|
||||
# because it would send the SIGINT to child processes too.
|
||||
# In all other cases receiving a SIGINT here would be an
|
||||
# error condition and correctly restarted. maybe we should
|
||||
# use sigprocmask?
|
||||
pass
|
||||
if not self.shutdown:
|
||||
# if the shutdown wasn't explicit we presume that it's an
|
||||
# error condition and thus we return a -1 error returncode.
|
||||
import os
|
||||
os._exit(-1)
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
This method is needed to shutdown the child gently without
|
||||
generating an exception.
|
||||
"""
|
||||
#log.msg("Shutdown message received, goodbye.")
|
||||
self.shutdown = True
|
||||
return {}
|
||||
Shutdown.responder(shutdown)
|
||||
|
||||
def ping(self):
|
||||
"""
|
||||
Ping the child and return an answer
|
||||
"""
|
||||
return {'response': "pong"}
|
||||
Ping.responder(ping)
|
||||
|
||||
def echo(self, data):
|
||||
"""
|
||||
Echo some data through the child.
|
||||
"""
|
||||
return {'response': data}
|
||||
Echo.responder(echo)
|
||||
|
|
@ -1,11 +0,0 @@
|
|||
from twisted.protocols import amp
|
||||
|
||||
class Shutdown(amp.Command):
|
||||
responseType = amp.QuitBox
|
||||
|
||||
class Ping(amp.Command):
|
||||
response = [('response', amp.String())]
|
||||
|
||||
class Echo(amp.Command):
|
||||
arguments = [('data', amp.String())]
|
||||
response = [('response', amp.String())]
|
||||
|
|
@ -1,24 +0,0 @@
|
|||
from zope.interface import Interface
|
||||
|
||||
class IStarter(Interface):
|
||||
def startAMPProcess(ampChild, ampParent=None):
|
||||
"""
|
||||
@param ampChild: The AMP protocol spoken by the created child.
|
||||
@type ampChild: L{twisted.protocols.amp.AMP}
|
||||
|
||||
@param ampParent: The AMP protocol spoken by the parent.
|
||||
@type ampParent: L{twisted.protocols.amp.AMP}
|
||||
"""
|
||||
|
||||
def startPythonProcess(prot, *args):
|
||||
"""
|
||||
@param prot: a L{protocol.ProcessProtocol} subclass
|
||||
@type prot: L{protocol.ProcessProtocol}
|
||||
|
||||
@param args: a tuple of arguments that will be passed to the
|
||||
child process.
|
||||
|
||||
@return: a tuple of the child process and the deferred finished.
|
||||
finished triggers when the subprocess dies for any reason.
|
||||
"""
|
||||
|
||||
|
|
@ -1,301 +0,0 @@
|
|||
import os
|
||||
import sys
|
||||
import imp
|
||||
import itertools
|
||||
|
||||
from zope.interface import implements
|
||||
|
||||
from twisted.internet import reactor, protocol, defer, error
|
||||
from twisted.python import log, util, reflect
|
||||
from twisted.protocols import amp
|
||||
from twisted.python import runtime
|
||||
from twisted.python.compat import set
|
||||
|
||||
from src.utils.ampoule import iampoule
|
||||
|
||||
gen = itertools.count()
|
||||
|
||||
if runtime.platform.isWindows():
|
||||
IS_WINDOWS = True
|
||||
TO_CHILD = 0
|
||||
FROM_CHILD = 1
|
||||
else:
|
||||
IS_WINDOWS = False
|
||||
TO_CHILD = 3
|
||||
FROM_CHILD = 4
|
||||
|
||||
class AMPConnector(protocol.ProcessProtocol):
|
||||
"""
|
||||
A L{ProcessProtocol} subclass that can understand and speak AMP.
|
||||
|
||||
@ivar amp: the children AMP process
|
||||
@type amp: L{amp.AMP}
|
||||
|
||||
@ivar finished: a deferred triggered when the process dies.
|
||||
@type finished: L{defer.Deferred}
|
||||
|
||||
@ivar name: Unique name for the connector, much like a pid.
|
||||
@type name: int
|
||||
"""
|
||||
|
||||
def __init__(self, proto, name=None):
|
||||
"""
|
||||
@param proto: An instance or subclass of L{amp.AMP}
|
||||
@type proto: L{amp.AMP}
|
||||
|
||||
@param name: optional name of the subprocess.
|
||||
@type name: int
|
||||
"""
|
||||
self.finished = defer.Deferred()
|
||||
self.amp = proto
|
||||
self.name = name
|
||||
if name is None:
|
||||
self.name = gen.next()
|
||||
|
||||
def signalProcess(self, signalID):
|
||||
"""
|
||||
Send the signal signalID to the child process
|
||||
|
||||
@param signalID: The signal ID that you want to send to the
|
||||
corresponding child
|
||||
@type signalID: C{str} or C{int}
|
||||
"""
|
||||
return self.transport.signalProcess(signalID)
|
||||
|
||||
def connectionMade(self):
|
||||
#log.msg("Subprocess %s started." % (self.name,))
|
||||
self.amp.makeConnection(self)
|
||||
|
||||
# Transport
|
||||
disconnecting = False
|
||||
|
||||
def write(self, data):
|
||||
if IS_WINDOWS:
|
||||
self.transport.write(data)
|
||||
else:
|
||||
self.transport.writeToChild(TO_CHILD, data)
|
||||
|
||||
def loseConnection(self):
|
||||
self.transport.closeChildFD(TO_CHILD)
|
||||
self.transport.closeChildFD(FROM_CHILD)
|
||||
self.transport.loseConnection()
|
||||
|
||||
def getPeer(self):
|
||||
return ('subprocess %i' % self.name,)
|
||||
|
||||
def getHost(self):
|
||||
return ('Evennia Server',)
|
||||
|
||||
def childDataReceived(self, childFD, data):
|
||||
if childFD == FROM_CHILD:
|
||||
self.amp.dataReceived(data)
|
||||
return
|
||||
self.errReceived(data)
|
||||
|
||||
def errReceived(self, data):
|
||||
for line in data.strip().splitlines():
|
||||
log.msg("FROM %s: %s" % (self.name, line))
|
||||
|
||||
def processEnded(self, status):
|
||||
#log.msg("Process: %s ended" % (self.name,))
|
||||
self.amp.connectionLost(status)
|
||||
if status.check(error.ProcessDone):
|
||||
self.finished.callback('')
|
||||
return
|
||||
self.finished.errback(status)
|
||||
|
||||
BOOTSTRAP = """\
|
||||
import sys
|
||||
|
||||
def main(reactor, ampChildPath):
|
||||
from twisted.application import reactors
|
||||
reactors.installReactor(reactor)
|
||||
|
||||
from twisted.python import log
|
||||
%s
|
||||
|
||||
from twisted.internet import reactor, stdio
|
||||
from twisted.python import reflect, runtime
|
||||
|
||||
ampChild = reflect.namedAny(ampChildPath)
|
||||
ampChildInstance = ampChild(*sys.argv[1:-2])
|
||||
if runtime.platform.isWindows():
|
||||
stdio.StandardIO(ampChildInstance)
|
||||
else:
|
||||
stdio.StandardIO(ampChildInstance, %s, %s)
|
||||
enter = getattr(ampChildInstance, '__enter__', None)
|
||||
if enter is not None:
|
||||
enter()
|
||||
try:
|
||||
reactor.run()
|
||||
except:
|
||||
if enter is not None:
|
||||
info = sys.exc_info()
|
||||
if not ampChildInstance.__exit__(*info):
|
||||
raise
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
if enter is not None:
|
||||
ampChildInstance.__exit__(None, None, None)
|
||||
|
||||
main(sys.argv[-2], sys.argv[-1])
|
||||
""" % ('%s', TO_CHILD, FROM_CHILD)
|
||||
|
||||
# in the first spot above, either insert an empty string or
|
||||
# 'log.startLogging(sys.stderr)'
|
||||
# to start logging
|
||||
|
||||
class ProcessStarter(object):
|
||||
|
||||
implements(iampoule.IStarter)
|
||||
|
||||
connectorFactory = AMPConnector
|
||||
def __init__(self, bootstrap=BOOTSTRAP, args=(), env={},
|
||||
path=None, uid=None, gid=None, usePTY=0,
|
||||
packages=(), childReactor="select"):
|
||||
"""
|
||||
@param bootstrap: Startup code for the child process
|
||||
@type bootstrap: C{str}
|
||||
|
||||
@param args: Arguments that should be supplied to every child
|
||||
created.
|
||||
@type args: C{tuple} of C{str}
|
||||
|
||||
@param env: Environment variables that should be present in the
|
||||
child environment
|
||||
@type env: C{dict}
|
||||
|
||||
@param path: Path in which to run the child
|
||||
@type path: C{str}
|
||||
|
||||
@param uid: if defined, the uid used to run the new process.
|
||||
@type uid: C{int}
|
||||
|
||||
@param gid: if defined, the gid used to run the new process.
|
||||
@type gid: C{int}
|
||||
|
||||
@param usePTY: Should the child processes use PTY processes
|
||||
@type usePTY: 0 or 1
|
||||
|
||||
@param packages: A tuple of packages that should be guaranteed
|
||||
to be importable in the child processes
|
||||
@type packages: C{tuple} of C{str}
|
||||
|
||||
@param childReactor: a string that sets the reactor for child
|
||||
processes
|
||||
@type childReactor: C{str}
|
||||
"""
|
||||
self.bootstrap = bootstrap
|
||||
self.args = args
|
||||
self.env = env
|
||||
self.path = path
|
||||
self.uid = uid
|
||||
self.gid = gid
|
||||
self.usePTY = usePTY
|
||||
self.packages = ("ampoule",) + packages
|
||||
self.packages = packages
|
||||
self.childReactor = childReactor
|
||||
|
||||
def __repr__(self):
|
||||
"""
|
||||
Represent the ProcessStarter with a string.
|
||||
"""
|
||||
return """ProcessStarter(bootstrap=%r,
|
||||
args=%r,
|
||||
env=%r,
|
||||
path=%r,
|
||||
uid=%r,
|
||||
gid=%r,
|
||||
usePTY=%r,
|
||||
packages=%r,
|
||||
childReactor=%r)""" % (self.bootstrap,
|
||||
self.args,
|
||||
self.env,
|
||||
self.path,
|
||||
self.uid,
|
||||
self.gid,
|
||||
self.usePTY,
|
||||
self.packages,
|
||||
self.childReactor)
|
||||
|
||||
def _checkRoundTrip(self, obj):
|
||||
"""
|
||||
Make sure that an object will properly round-trip through 'qual' and
|
||||
'namedAny'.
|
||||
|
||||
Raise a L{RuntimeError} if they aren't.
|
||||
"""
|
||||
tripped = reflect.namedAny(reflect.qual(obj))
|
||||
if tripped is not obj:
|
||||
raise RuntimeError("importing %r is not the same as %r" %
|
||||
(reflect.qual(obj), obj))
|
||||
|
||||
def startAMPProcess(self, ampChild, ampParent=None, ampChildArgs=()):
|
||||
"""
|
||||
@param ampChild: a L{ampoule.child.AMPChild} subclass.
|
||||
@type ampChild: L{ampoule.child.AMPChild}
|
||||
|
||||
@param ampParent: an L{amp.AMP} subclass that implements the parent
|
||||
protocol for this process pool
|
||||
@type ampParent: L{amp.AMP}
|
||||
"""
|
||||
self._checkRoundTrip(ampChild)
|
||||
fullPath = reflect.qual(ampChild)
|
||||
if ampParent is None:
|
||||
ampParent = amp.AMP
|
||||
prot = self.connectorFactory(ampParent())
|
||||
args = ampChildArgs + (self.childReactor, fullPath)
|
||||
return self.startPythonProcess(prot, *args)
|
||||
|
||||
|
||||
def startPythonProcess(self, prot, *args):
|
||||
"""
|
||||
@param prot: a L{protocol.ProcessProtocol} subclass
|
||||
@type prot: L{protocol.ProcessProtocol}
|
||||
|
||||
@param args: a tuple of arguments that will be added after the
|
||||
ones in L{self.args} to start the child process.
|
||||
|
||||
@return: a tuple of the child process and the deferred finished.
|
||||
finished triggers when the subprocess dies for any reason.
|
||||
"""
|
||||
spawnProcess(prot, self.bootstrap, self.args+args, env=self.env,
|
||||
path=self.path, uid=self.uid, gid=self.gid,
|
||||
usePTY=self.usePTY, packages=self.packages)
|
||||
|
||||
# XXX: we could wait for startup here, but ... is there really any
|
||||
# reason to? the pipe should be ready for writing. The subprocess
|
||||
# might not start up properly, but then, a subprocess might shut down
|
||||
# at any point too. So we just return amp and have this piece to be
|
||||
# synchronous.
|
||||
return prot.amp, prot.finished
|
||||
|
||||
def spawnProcess(processProtocol, bootstrap, args=(), env={},
|
||||
path=None, uid=None, gid=None, usePTY=0,
|
||||
packages=()):
|
||||
env = env.copy()
|
||||
|
||||
pythonpath = []
|
||||
for pkg in packages:
|
||||
p = os.path.split(imp.find_module(pkg)[1])[0]
|
||||
if p.startswith(os.path.join(sys.prefix, 'lib')):
|
||||
continue
|
||||
pythonpath.append(p)
|
||||
pythonpath = list(set(pythonpath))
|
||||
pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep))
|
||||
env['PYTHONPATH'] = os.pathsep.join(pythonpath)
|
||||
args = (sys.executable, '-c', bootstrap) + args
|
||||
# childFDs variable is needed because sometimes child processes
|
||||
# misbehave and use stdout to output stuff that should really go
|
||||
# to stderr. Of course child process might even use the wrong FDs
|
||||
# that I'm using here, 3 and 4, so we are going to fix all these
|
||||
# issues when I add support for the configuration object that can
|
||||
# fix this stuff in a more configurable way.
|
||||
if IS_WINDOWS:
|
||||
return reactor.spawnProcess(processProtocol, sys.executable, args,
|
||||
env, path, uid, gid, usePTY)
|
||||
else:
|
||||
return reactor.spawnProcess(processProtocol, sys.executable, args,
|
||||
env, path, uid, gid, usePTY,
|
||||
childFDs={0:"w", 1:"r", 2:"r", 3:"w", 4:"r"})
|
||||
|
|
@ -1,414 +0,0 @@
|
|||
import time
|
||||
import random
|
||||
import heapq
|
||||
import itertools
|
||||
import signal
|
||||
choice = random.choice
|
||||
now = time.time
|
||||
count = itertools.count().next
|
||||
pop = heapq.heappop
|
||||
|
||||
from twisted.internet import defer, task, error
|
||||
from twisted.python import log, failure
|
||||
|
||||
from src.utils.ampoule import commands, main
|
||||
|
||||
try:
|
||||
DIE = signal.SIGKILL
|
||||
except AttributeError:
|
||||
# Windows doesn't have SIGKILL, let's just use SIGTERM then
|
||||
DIE = signal.SIGTERM
|
||||
|
||||
class ProcessPool(object):
|
||||
"""
|
||||
This class generalizes the functionality of a pool of
|
||||
processes to which work can be dispatched.
|
||||
|
||||
@ivar finished: Boolean flag, L{True} when the pool is finished.
|
||||
|
||||
@ivar started: Boolean flag, L{True} when the pool is started.
|
||||
|
||||
@ivar name: Optional name for the process pool
|
||||
|
||||
@ivar min: Minimum number of subprocesses to set up
|
||||
|
||||
@ivar max: Maximum number of subprocesses to set up
|
||||
|
||||
@ivar maxIdle: Maximum number of seconds of indleness in a child
|
||||
|
||||
@ivar starter: A process starter instance that provides
|
||||
L{iampoule.IStarter}.
|
||||
|
||||
@ivar recycleAfter: Maximum number of calls before restarting a
|
||||
subprocess, 0 to not recycle.
|
||||
|
||||
@ivar ampChild: The child AMP protocol subclass with the commands
|
||||
that the child should implement.
|
||||
|
||||
@ivar ampParent: The parent AMP protocol subclass with the commands
|
||||
that the parent should implement.
|
||||
|
||||
@ivar timeout: The general timeout (in seconds) for every child
|
||||
process call.
|
||||
"""
|
||||
|
||||
finished = False
|
||||
started = False
|
||||
name = None
|
||||
|
||||
def __init__(self, ampChild=None, ampParent=None, min=5, max=20,
|
||||
name=None, maxIdle=20, recycleAfter=500, starter=None,
|
||||
timeout=None, timeout_signal=DIE, ampChildArgs=()):
|
||||
self.starter = starter
|
||||
self.ampChildArgs = tuple(ampChildArgs)
|
||||
if starter is None:
|
||||
self.starter = main.ProcessStarter(packages=("twisted", "ampoule"))
|
||||
self.ampParent = ampParent
|
||||
self.ampChild = ampChild
|
||||
if ampChild is None:
|
||||
from src.utils.ampoule.child import AMPChild
|
||||
self.ampChild = AMPChild
|
||||
self.min = min
|
||||
self.max = max
|
||||
self.name = name
|
||||
self.maxIdle = maxIdle
|
||||
self.recycleAfter = recycleAfter
|
||||
self.timeout = timeout
|
||||
self.timeout_signal = timeout_signal
|
||||
self._queue = []
|
||||
|
||||
self.processes = set()
|
||||
self.ready = set()
|
||||
self.busy = set()
|
||||
self._finishCallbacks = {}
|
||||
self._lastUsage = {}
|
||||
self._calls = {}
|
||||
self.looping = task.LoopingCall(self._pruneProcesses)
|
||||
self.looping.start(maxIdle, now=False)
|
||||
|
||||
def start(self, ampChild=None):
|
||||
"""
|
||||
Starts the ProcessPool with a given child protocol.
|
||||
|
||||
@param ampChild: a L{ampoule.child.AMPChild} subclass.
|
||||
@type ampChild: L{ampoule.child.AMPChild} subclass
|
||||
"""
|
||||
if ampChild is not None and not self.started:
|
||||
self.ampChild = ampChild
|
||||
self.finished = False
|
||||
self.started = True
|
||||
return self.adjustPoolSize()
|
||||
|
||||
def _pruneProcesses(self):
|
||||
"""
|
||||
Remove idle processes from the pool.
|
||||
"""
|
||||
n = now()
|
||||
d = []
|
||||
for child, lastUse in self._lastUsage.iteritems():
|
||||
if len(self.processes) > self.min and (n - lastUse) > self.maxIdle:
|
||||
# we are setting lastUse when processing finishes, it
|
||||
# might be processing right now
|
||||
if child not in self.busy:
|
||||
# we need to remove this child from the ready set
|
||||
# and the processes set because otherwise it might
|
||||
# get calls from doWork
|
||||
self.ready.discard(child)
|
||||
self.processes.discard(child)
|
||||
d.append(self.stopAWorker(child))
|
||||
return defer.DeferredList(d)
|
||||
|
||||
def _pruneProcess(self, child):
|
||||
"""
|
||||
Remove every trace of the process from this instance.
|
||||
"""
|
||||
self.processes.discard(child)
|
||||
self.ready.discard(child)
|
||||
self.busy.discard(child)
|
||||
self._lastUsage.pop(child, None)
|
||||
self._calls.pop(child, None)
|
||||
self._finishCallbacks.pop(child, None)
|
||||
|
||||
def _addProcess(self, child, finished):
|
||||
"""
|
||||
Adds the newly created child process to the pool.
|
||||
"""
|
||||
def restart(child, reason):
|
||||
#log.msg("FATAL: Restarting after %s" % (reason,))
|
||||
self._pruneProcess(child)
|
||||
return self.startAWorker()
|
||||
|
||||
def dieGently(data, child):
|
||||
#log.msg("STOPPING: '%s'" % (data,))
|
||||
self._pruneProcess(child)
|
||||
|
||||
self.processes.add(child)
|
||||
self.ready.add(child)
|
||||
finished.addCallback(dieGently, child
|
||||
).addErrback(lambda reason: restart(child, reason))
|
||||
self._finishCallbacks[child] = finished
|
||||
self._lastUsage[child] = now()
|
||||
self._calls[child] = 0
|
||||
self._catchUp()
|
||||
|
||||
def _catchUp(self):
|
||||
"""
|
||||
If there are queued items in the list then run them.
|
||||
"""
|
||||
if self._queue:
|
||||
_, (d, command, kwargs) = pop(self._queue)
|
||||
self._cb_doWork(command, **kwargs).chainDeferred(d)
|
||||
|
||||
def _handleTimeout(self, child):
|
||||
"""
|
||||
One of the children went timeout, we need to deal with it
|
||||
|
||||
@param child: The child process
|
||||
@type child: L{child.AMPChild}
|
||||
"""
|
||||
try:
|
||||
child.transport.signalProcess(self.timeout_signal)
|
||||
except error.ProcessExitedAlready:
|
||||
# don't do anything then... we are too late
|
||||
# or we were too early to call
|
||||
pass
|
||||
|
||||
def startAWorker(self):
|
||||
"""
|
||||
Start a worker and set it up in the system.
|
||||
"""
|
||||
if self.finished:
|
||||
# this is a race condition: basically if we call self.stop()
|
||||
# while a process is being recycled what happens is that the
|
||||
# process will be created anyway. By putting a check for
|
||||
# self.finished here we make sure that in no way we are creating
|
||||
# processes when the pool is stopped.
|
||||
# The race condition comes from the fact that:
|
||||
# stopAWorker() is asynchronous while stop() is synchronous.
|
||||
# so if you call:
|
||||
# pp.stopAWorker(child).addCallback(lambda _: pp.startAWorker())
|
||||
# pp.stop()
|
||||
# You might end up with a dirty reactor due to the stop()
|
||||
# returning before the new process is created.
|
||||
return
|
||||
startAMPProcess = self.starter.startAMPProcess
|
||||
child, finished = startAMPProcess(self.ampChild,
|
||||
ampParent=self.ampParent,
|
||||
ampChildArgs=self.ampChildArgs)
|
||||
return self._addProcess(child, finished)
|
||||
|
||||
def _cb_doWork(self, command, _timeout=None, _deadline=None,
|
||||
**kwargs):
|
||||
"""
|
||||
Go and call the command.
|
||||
|
||||
@param command: The L{amp.Command} to be executed in the child
|
||||
@type command: L{amp.Command}
|
||||
|
||||
@param _d: The deferred for the calling code.
|
||||
@type _d: L{defer.Deferred}
|
||||
|
||||
@param _timeout: The timeout for this call only
|
||||
@type _timeout: C{int}
|
||||
@param _deadline: The deadline for this call only
|
||||
@type _deadline: C{int}
|
||||
"""
|
||||
timeoutCall = None
|
||||
deadlineCall = None
|
||||
|
||||
def _returned(result, child, is_error=False):
|
||||
def cancelCall(call):
|
||||
if call is not None and call.active():
|
||||
call.cancel()
|
||||
cancelCall(timeoutCall)
|
||||
cancelCall(deadlineCall)
|
||||
self.busy.discard(child)
|
||||
if not die:
|
||||
# we are not marked to be removed, so add us back to
|
||||
# the ready set and let's see if there's some catching
|
||||
# up to do
|
||||
self.ready.add(child)
|
||||
self._catchUp()
|
||||
else:
|
||||
# We should die and we do, then we start a new worker
|
||||
# to pick up stuff from the queue otherwise we end up
|
||||
# without workers and the queue will remain there.
|
||||
self.stopAWorker(child).addCallback(lambda _: self.startAWorker())
|
||||
self._lastUsage[child] = now()
|
||||
# we can't do recycling here because it's too late and
|
||||
# the process might have received tons of calls already
|
||||
# which would make it run more calls than what is
|
||||
# configured to do.
|
||||
return result
|
||||
|
||||
die = False
|
||||
child = self.ready.pop()
|
||||
self.busy.add(child)
|
||||
self._calls[child] += 1
|
||||
|
||||
# Let's see if this call goes over the recycling barrier
|
||||
if self.recycleAfter and self._calls[child] >= self.recycleAfter:
|
||||
# it does so mark this child, using a closure, to be
|
||||
# removed at the end of the call.
|
||||
die = True
|
||||
|
||||
# If the command doesn't require a response then callRemote
|
||||
# returns nothing, so we prepare for that too.
|
||||
# We also need to guard against timeout errors for child
|
||||
# and local timeout parameter overrides the global one
|
||||
if _timeout == 0:
|
||||
timeout = _timeout
|
||||
else:
|
||||
timeout = _timeout or self.timeout
|
||||
|
||||
if timeout is not None:
|
||||
from twisted.internet import reactor
|
||||
timeoutCall = reactor.callLater(timeout, self._handleTimeout, child)
|
||||
|
||||
if _deadline is not None:
|
||||
from twisted.internet import reactor
|
||||
delay = max(0, _deadline - reactor.seconds())
|
||||
deadlineCall = reactor.callLater(delay, self._handleTimeout,
|
||||
child)
|
||||
|
||||
return defer.maybeDeferred(child.callRemote, command, **kwargs
|
||||
).addCallback(_returned, child
|
||||
).addErrback(_returned, child, is_error=True)
|
||||
|
||||
def callRemote(self, *args, **kwargs):
|
||||
"""
|
||||
Proxy call to keep the API homogeneous across twisted's RPCs
|
||||
"""
|
||||
return self.doWork(*args, **kwargs)
|
||||
|
||||
def doWork(self, command, **kwargs):
|
||||
"""
|
||||
Sends the command to one child.
|
||||
|
||||
@param command: an L{amp.Command} type object.
|
||||
@type command: L{amp.Command}
|
||||
|
||||
@param kwargs: dictionary containing the arguments for the command.
|
||||
"""
|
||||
if self.ready: # there are unused processes, let's use them
|
||||
return self._cb_doWork(command, **kwargs)
|
||||
else:
|
||||
if len(self.processes) < self.max:
|
||||
# no unused but we can start some new ones
|
||||
# since startAWorker is synchronous we won't have a
|
||||
# race condition here in case of multiple calls to
|
||||
# doWork, so we will end up in the else clause in case
|
||||
# of such calls:
|
||||
# Process pool with min=1, max=1, recycle_after=1
|
||||
# [call(Command) for x in xrange(BIG_NUMBER)]
|
||||
self.startAWorker()
|
||||
return self._cb_doWork(command, **kwargs)
|
||||
else:
|
||||
# No one is free... just queue up and wait for a process
|
||||
# to start and pick up the first item in the queue.
|
||||
d = defer.Deferred()
|
||||
self._queue.append((count(), (d, command, kwargs)))
|
||||
return d
|
||||
|
||||
def stopAWorker(self, child=None):
|
||||
"""
|
||||
Gently stop a child so that it's not restarted anymore
|
||||
|
||||
@param command: an L{ampoule.child.AmpChild} type object.
|
||||
@type command: L{ampoule.child.AmpChild} or None
|
||||
|
||||
"""
|
||||
if child is None:
|
||||
if self.ready:
|
||||
child = self.ready.pop()
|
||||
else:
|
||||
child = choice(list(self.processes))
|
||||
child.callRemote(commands.Shutdown
|
||||
# This is needed for timeout handling, the reason is pretty hard
|
||||
# to explain but I'll try to:
|
||||
# There's another small race condition in the system. If the
|
||||
# child process is shut down by a signal and you try to stop
|
||||
# the process pool immediately afterwards, like tests would do,
|
||||
# the child AMP object would still be in the system and trying
|
||||
# to call the command Shutdown on it would result in the same
|
||||
# errback that we got originally, for this reason we need to
|
||||
# trap it now so that it doesn't raise by not being handled.
|
||||
# Does this even make sense to you?
|
||||
).addErrback(lambda reason: reason.trap(error.ProcessTerminated))
|
||||
return self._finishCallbacks[child]
|
||||
|
||||
def _startSomeWorkers(self):
|
||||
"""
|
||||
Start a bunch of workers until we reach the max number of them.
|
||||
"""
|
||||
if len(self.processes) < self.max:
|
||||
self.startAWorker()
|
||||
|
||||
def adjustPoolSize(self, min=None, max=None):
|
||||
"""
|
||||
Change the pool size to be at least min and less than max,
|
||||
useful when you change the values of max and min in the instance
|
||||
and you want the pool to adapt to them.
|
||||
"""
|
||||
if min is None:
|
||||
min = self.min
|
||||
if max is None:
|
||||
max = self.max
|
||||
|
||||
assert min >= 0, 'minimum is negative'
|
||||
assert min <= max, 'minimum is greater than maximum'
|
||||
|
||||
self.min = min
|
||||
self.max = max
|
||||
|
||||
l = []
|
||||
if self.started:
|
||||
|
||||
for i in xrange(len(self.processes)-self.max):
|
||||
l.append(self.stopAWorker())
|
||||
while len(self.processes) < self.min:
|
||||
self.startAWorker()
|
||||
|
||||
return defer.DeferredList(l)#.addCallback(lambda _: self.dumpStats())
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stops the process protocol.
|
||||
"""
|
||||
self.finished = True
|
||||
l = [self.stopAWorker(process) for process in self.processes]
|
||||
def _cb(_):
|
||||
if self.looping.running:
|
||||
self.looping.stop()
|
||||
|
||||
return defer.DeferredList(l).addCallback(_cb)
|
||||
|
||||
def dumpStats(self):
|
||||
log.msg("ProcessPool stats:")
|
||||
log.msg('\tworkers: %s' % len(self.processes))
|
||||
log.msg('\ttimeout: %s' % (self.timeout))
|
||||
log.msg('\tparent: %r' % (self.ampParent,))
|
||||
log.msg('\tchild: %r' % (self.ampChild,))
|
||||
log.msg('\tmax idle: %r' % (self.maxIdle,))
|
||||
log.msg('\trecycle after: %r' % (self.recycleAfter,))
|
||||
log.msg('\tProcessStarter:')
|
||||
log.msg('\t\t%r' % (self.starter,))
|
||||
|
||||
pp = None
|
||||
|
||||
def deferToAMPProcess(command, **kwargs):
|
||||
"""
|
||||
Helper function that sends a command to the default process pool
|
||||
and returns a deferred that fires when the result of the
|
||||
subprocess computation is ready.
|
||||
|
||||
@param command: an L{amp.Command} subclass
|
||||
@param kwargs: dictionary containing the arguments for the command.
|
||||
|
||||
@return: a L{defer.Deferred} with the data from the subprocess.
|
||||
"""
|
||||
global pp
|
||||
if pp is None:
|
||||
pp = ProcessPool()
|
||||
return pp.start().addCallback(lambda _: pp.doWork(command, **kwargs))
|
||||
return pp.doWork(command, **kwargs)
|
||||
|
|
@ -1,67 +0,0 @@
|
|||
"""
|
||||
This module implements a remote pool to use with AMP.
|
||||
"""
|
||||
from zope.interface import implements
|
||||
|
||||
from twisted.protocols import amp
|
||||
from twisted.internet import utils
|
||||
|
||||
class AMPProxy(amp.AMP):
|
||||
"""
|
||||
A Proxy AMP protocol that forwards calls to a wrapped
|
||||
callRemote-like callable.
|
||||
"""
|
||||
def __init__(self, wrapped, child):
|
||||
"""
|
||||
@param wrapped: A callRemote-like callable that takes an
|
||||
L{amp.Command} as first argument and other
|
||||
optional keyword arguments afterwards.
|
||||
@type wrapped: L{callable}.
|
||||
|
||||
@param child: The protocol class of the process pool children.
|
||||
Used to forward only the methods that are actually
|
||||
understood correctly by them.
|
||||
@type child: L{amp.AMP}
|
||||
"""
|
||||
amp.AMP.__init__(self)
|
||||
self.wrapped = wrapped
|
||||
self.child = child
|
||||
|
||||
localCd = set(self._commandDispatch.keys())
|
||||
childCd = set(self.child._commandDispatch.keys())
|
||||
assert localCd.intersection(childCd) == set(["StartTLS"]), \
|
||||
"Illegal method overriding in Proxy"
|
||||
|
||||
def locateResponder(self, name):
|
||||
"""
|
||||
This is a custom locator to forward calls to the children
|
||||
processes while keeping the ProcessPool a transparent MITM.
|
||||
|
||||
This way of working has a few limitations, the first of which
|
||||
is the fact that children won't be able to take advantage of
|
||||
any dynamic locator except for the default L{CommandLocator}
|
||||
that is based on the _commandDispatch attribute added by the
|
||||
metaclass. This limitation might be lifted in the future.
|
||||
"""
|
||||
if name == "StartTLS":
|
||||
# This is a special case where the proxy takes precedence
|
||||
return amp.AMP.locateResponder(self, "StartTLS")
|
||||
|
||||
# Get the dict of commands from the child AMP implementation.
|
||||
cd = self.child._commandDispatch
|
||||
if name in cd:
|
||||
# If the command is there, then we forward stuff to it.
|
||||
commandClass, _responderFunc = cd[name]
|
||||
# We need to wrap the doWork function because the wrapping
|
||||
# call doesn't pass the command as first argument since it
|
||||
# thinks that we are the actual receivers and callable is
|
||||
# already the responder while it isn't.
|
||||
doWork = lambda **kw: self.wrapped(commandClass, **kw)
|
||||
# Now let's call the right function and wrap the result
|
||||
# dictionary.
|
||||
return self._wrapWithSerialization(doWork, commandClass)
|
||||
# of course if the name of the command is not in the child it
|
||||
# means that it might be in this class, so fallback to the
|
||||
# default behavior of this module.
|
||||
return amp.AMP.locateResponder(self, name)
|
||||
|
||||
|
|
@ -1,69 +0,0 @@
|
|||
import os
|
||||
|
||||
from twisted.application import service
|
||||
from twisted.internet.protocol import ServerFactory
|
||||
|
||||
def makeService(options):
|
||||
"""
|
||||
Create the service for the application
|
||||
"""
|
||||
ms = service.MultiService()
|
||||
|
||||
from src.utils.ampoule.pool import ProcessPool
|
||||
from src.utils.ampoule.main import ProcessStarter
|
||||
name = options['name']
|
||||
ampport = options['ampport']
|
||||
ampinterface = options['ampinterface']
|
||||
child = options['child']
|
||||
parent = options['parent']
|
||||
min = options['min']
|
||||
max = options['max']
|
||||
maxIdle = options['max_idle']
|
||||
recycle = options['recycle']
|
||||
childReactor = options['reactor']
|
||||
timeout = options['timeout']
|
||||
|
||||
starter = ProcessStarter(packages=("twisted", "ampoule"), childReactor=childReactor)
|
||||
pp = ProcessPool(child, parent, min, max, name, maxIdle, recycle, starter, timeout)
|
||||
svc = AMPouleService(pp, child, ampport, ampinterface)
|
||||
svc.setServiceParent(ms)
|
||||
|
||||
return ms
|
||||
|
||||
class AMPouleService(service.Service):
|
||||
def __init__(self, pool, child, port, interface):
|
||||
self.pool = pool
|
||||
self.port = port
|
||||
self.child = child
|
||||
self.interface = interface
|
||||
self.server = None
|
||||
|
||||
def startService(self):
|
||||
"""
|
||||
Before reactor.run() is called we setup the system.
|
||||
"""
|
||||
service.Service.startService(self)
|
||||
from src.utils.ampoule import rpool
|
||||
from twisted.internet import reactor
|
||||
|
||||
try:
|
||||
factory = ServerFactory()
|
||||
factory.protocol = lambda: rpool.AMPProxy(wrapped=self.pool.doWork,
|
||||
child=self.child)
|
||||
self.server = reactor.listenTCP(self.port,
|
||||
factory,
|
||||
interface=self.interface)
|
||||
# this is synchronous when it's the startup, even though
|
||||
# it returns a deferred. But we need to run it after the
|
||||
# first cycle in order to wait for signal handlers to be
|
||||
# installed.
|
||||
reactor.callLater(0, self.pool.start)
|
||||
except:
|
||||
import traceback
|
||||
print traceback.format_exc()
|
||||
|
||||
def stopService(self):
|
||||
service.Service.stopService(self)
|
||||
if self.server is not None:
|
||||
self.server.stopListening()
|
||||
return self.pool.stop()
|
||||
|
|
@ -1,867 +0,0 @@
|
|||
|
||||
from signal import SIGHUP
|
||||
import math
|
||||
import os
|
||||
import os.path
|
||||
from cStringIO import StringIO as sio
|
||||
import tempfile
|
||||
|
||||
from twisted.internet import error, defer, reactor
|
||||
from twisted.python import failure, reflect
|
||||
from twisted.trial import unittest
|
||||
from twisted.protocols import amp
|
||||
from src.utils.ampoule import main, child, commands, pool
|
||||
|
||||
class ShouldntHaveBeenCalled(Exception):
|
||||
pass
|
||||
|
||||
def _raise(_):
|
||||
raise ShouldntHaveBeenCalled(_)
|
||||
|
||||
class _FakeT(object):
|
||||
closeStdinCalled = False
|
||||
def __init__(self, s):
|
||||
self.s = s
|
||||
|
||||
def closeStdin(self):
|
||||
self.closeStdinCalled = True
|
||||
|
||||
def write(self, data):
|
||||
self.s.write(data)
|
||||
|
||||
class FakeAMP(object):
|
||||
connector = None
|
||||
reason = None
|
||||
def __init__(self, s):
|
||||
self.s = s
|
||||
|
||||
def makeConnection(self, connector):
|
||||
if self.connector is not None:
|
||||
raise Exception("makeConnection called twice")
|
||||
self.connector = connector
|
||||
|
||||
def connectionLost(self, reason):
|
||||
if self.reason is not None:
|
||||
raise Exception("connectionLost called twice")
|
||||
self.reason = reason
|
||||
|
||||
def dataReceived(self, data):
|
||||
self.s.write(data)
|
||||
|
||||
class Ping(amp.Command):
|
||||
arguments = [('data', amp.String())]
|
||||
response = [('response', amp.String())]
|
||||
|
||||
class Pong(amp.Command):
|
||||
arguments = [('data', amp.String())]
|
||||
response = [('response', amp.String())]
|
||||
|
||||
class Pid(amp.Command):
|
||||
response = [('pid', amp.Integer())]
|
||||
|
||||
class Reactor(amp.Command):
|
||||
response = [('classname', amp.String())]
|
||||
|
||||
class NoResponse(amp.Command):
|
||||
arguments = [('arg', amp.String())]
|
||||
requiresAnswer = False
|
||||
|
||||
class GetResponse(amp.Command):
|
||||
response = [("response", amp.String())]
|
||||
|
||||
class Child(child.AMPChild):
|
||||
def ping(self, data):
|
||||
return self.callRemote(Pong, data=data)
|
||||
Ping.responder(ping)
|
||||
|
||||
class PidChild(child.AMPChild):
|
||||
def pid(self):
|
||||
import os
|
||||
return {'pid': os.getpid()}
|
||||
Pid.responder(pid)
|
||||
|
||||
class NoResponseChild(child.AMPChild):
|
||||
_set = False
|
||||
def noresponse(self, arg):
|
||||
self._set = arg
|
||||
return {}
|
||||
NoResponse.responder(noresponse)
|
||||
|
||||
def getresponse(self):
|
||||
return {"response": self._set}
|
||||
GetResponse.responder(getresponse)
|
||||
|
||||
class ReactorChild(child.AMPChild):
|
||||
def reactor(self):
|
||||
from twisted.internet import reactor
|
||||
return {'classname': reactor.__class__.__name__}
|
||||
Reactor.responder(reactor)
|
||||
|
||||
class First(amp.Command):
|
||||
arguments = [('data', amp.String())]
|
||||
response = [('response', amp.String())]
|
||||
|
||||
class Second(amp.Command):
|
||||
pass
|
||||
|
||||
class WaitingChild(child.AMPChild):
|
||||
deferred = None
|
||||
def first(self, data):
|
||||
self.deferred = defer.Deferred()
|
||||
return self.deferred.addCallback(lambda _: {'response': data})
|
||||
First.responder(first)
|
||||
def second(self):
|
||||
self.deferred.callback('')
|
||||
return {}
|
||||
Second.responder(second)
|
||||
|
||||
class Die(amp.Command):
|
||||
pass
|
||||
|
||||
class BadChild(child.AMPChild):
|
||||
def die(self):
|
||||
self.shutdown = False
|
||||
self.transport.loseConnection()
|
||||
return {}
|
||||
Die.responder(die)
|
||||
|
||||
|
||||
class Write(amp.Command):
|
||||
response = [("response", amp.String())]
|
||||
pass
|
||||
|
||||
|
||||
class Writer(child.AMPChild):
|
||||
|
||||
def __init__(self, data='hello'):
|
||||
child.AMPChild.__init__(self)
|
||||
self.data = data
|
||||
|
||||
def write(self):
|
||||
return {'response': self.data}
|
||||
Write.responder(write)
|
||||
|
||||
|
||||
class GetCWD(amp.Command):
|
||||
|
||||
response = [("cwd", amp.String())]
|
||||
|
||||
|
||||
class TempDirChild(child.AMPChild):
|
||||
|
||||
def __init__(self, directory=None):
|
||||
child.AMPChild.__init__(self)
|
||||
self.directory = directory
|
||||
|
||||
def __enter__(self):
|
||||
directory = tempfile.mkdtemp()
|
||||
os.chdir(directory)
|
||||
if self.directory is not None:
|
||||
os.mkdir(self.directory)
|
||||
os.chdir(self.directory)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
cwd = os.getcwd()
|
||||
os.chdir('..')
|
||||
os.rmdir(cwd)
|
||||
|
||||
def getcwd(self):
|
||||
return {'cwd': os.getcwd()}
|
||||
GetCWD.responder(getcwd)
|
||||
|
||||
|
||||
class TestAMPConnector(unittest.TestCase):
|
||||
def setUp(self):
|
||||
"""
|
||||
The only reason why this method exists is to let 'trial ampoule'
|
||||
to install the signal handlers (#3178 for reference).
|
||||
"""
|
||||
super(TestAMPConnector, self).setUp()
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(0, d.callback, None)
|
||||
return d
|
||||
|
||||
def _makeConnector(self, s, sa):
|
||||
a = FakeAMP(sa)
|
||||
ac = main.AMPConnector(a)
|
||||
assert ac.name is not None
|
||||
ac.transport = _FakeT(s)
|
||||
return ac
|
||||
|
||||
def test_protocol(self):
|
||||
"""
|
||||
Test that outReceived writes to AMP and that it triggers the
|
||||
finished deferred once the process ended.
|
||||
"""
|
||||
s = sio()
|
||||
sa = sio()
|
||||
ac = self._makeConnector(s, sa)
|
||||
|
||||
for x in xrange(99):
|
||||
ac.childDataReceived(4, str(x))
|
||||
|
||||
ac.processEnded(failure.Failure(error.ProcessDone(0)))
|
||||
return ac.finished.addCallback(
|
||||
lambda _: self.assertEqual(sa.getvalue(), ''.join(str(x) for x in xrange(99)))
|
||||
)
|
||||
|
||||
def test_protocol_failing(self):
|
||||
"""
|
||||
Test that a failure in the process termination is correctly
|
||||
propagated to the finished deferred.
|
||||
"""
|
||||
s = sio()
|
||||
sa = sio()
|
||||
ac = self._makeConnector(s, sa)
|
||||
|
||||
ac.finished.addCallback(_raise)
|
||||
fail = failure.Failure(error.ProcessTerminated())
|
||||
self.assertFailure(ac.finished, error.ProcessTerminated)
|
||||
ac.processEnded(fail)
|
||||
|
||||
def test_startProcess(self):
|
||||
"""
|
||||
Test that startProcess actually starts a subprocess and that
|
||||
it receives data back from the process through AMP.
|
||||
"""
|
||||
s = sio()
|
||||
a = FakeAMP(s)
|
||||
STRING = "ciao"
|
||||
BOOT = """\
|
||||
import sys, os
|
||||
def main(arg):
|
||||
os.write(4, arg)
|
||||
main(sys.argv[1])
|
||||
"""
|
||||
starter = main.ProcessStarter(bootstrap=BOOT,
|
||||
args=(STRING,),
|
||||
packages=("twisted", "ampoule"))
|
||||
|
||||
amp, finished = starter.startPythonProcess(main.AMPConnector(a))
|
||||
def _eb(reason):
|
||||
print reason
|
||||
finished.addErrback(_eb)
|
||||
return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING))
|
||||
|
||||
def test_failing_deferToProcess(self):
|
||||
"""
|
||||
Test failing subprocesses and the way they terminate and preserve
|
||||
failing information.
|
||||
"""
|
||||
s = sio()
|
||||
a = FakeAMP(s)
|
||||
STRING = "ciao"
|
||||
BOOT = """\
|
||||
import sys
|
||||
def main(arg):
|
||||
raise Exception(arg)
|
||||
main(sys.argv[1])
|
||||
"""
|
||||
starter = main.ProcessStarter(bootstrap=BOOT, args=(STRING,), packages=("twisted", "ampoule"))
|
||||
ready, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored")
|
||||
|
||||
self.assertFailure(finished, error.ProcessTerminated)
|
||||
finished.addErrback(lambda reason: self.assertEquals(reason.getMessage(), STRING))
|
||||
return finished
|
||||
|
||||
def test_env_setting(self):
|
||||
"""
|
||||
Test that and environment variable passed to the process starter
|
||||
is correctly passed to the child process.
|
||||
"""
|
||||
s = sio()
|
||||
a = FakeAMP(s)
|
||||
STRING = "ciao"
|
||||
BOOT = """\
|
||||
import sys, os
|
||||
def main():
|
||||
os.write(4, os.getenv("FOOBAR"))
|
||||
main()
|
||||
"""
|
||||
starter = main.ProcessStarter(bootstrap=BOOT,
|
||||
packages=("twisted", "ampoule"),
|
||||
env={"FOOBAR": STRING})
|
||||
amp, finished = starter.startPythonProcess(main.AMPConnector(a), "I'll be ignored")
|
||||
def _eb(reason):
|
||||
print reason
|
||||
finished.addErrback(_eb)
|
||||
return finished.addCallback(lambda _: self.assertEquals(s.getvalue(), STRING))
|
||||
|
||||
def test_startAMPProcess(self):
|
||||
"""
|
||||
Test that you can start an AMP subprocess and that it correctly
|
||||
accepts commands and correctly answers them.
|
||||
"""
|
||||
STRING = "ciao"
|
||||
|
||||
starter = main.ProcessStarter(packages=("twisted", "ampoule"))
|
||||
c, finished = starter.startAMPProcess(child.AMPChild)
|
||||
c.callRemote(commands.Echo, data=STRING
|
||||
).addCallback(lambda response:
|
||||
self.assertEquals(response['response'], STRING)
|
||||
).addCallback(lambda _: c.callRemote(commands.Shutdown))
|
||||
return finished
|
||||
|
||||
def test_BootstrapContext(self):
|
||||
starter = main.ProcessStarter(packages=('twisted', 'ampoule'))
|
||||
c, finished = starter.startAMPProcess(TempDirChild)
|
||||
cwd = []
|
||||
def checkBootstrap(response):
|
||||
cwd.append(response['cwd'])
|
||||
self.assertNotEquals(cwd, os.getcwd())
|
||||
d = c.callRemote(GetCWD)
|
||||
d.addCallback(checkBootstrap)
|
||||
d.addCallback(lambda _: c.callRemote(commands.Shutdown))
|
||||
finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
|
||||
return finished
|
||||
|
||||
def test_BootstrapContextInstance(self):
|
||||
starter = main.ProcessStarter(packages=('twisted', 'ampoule'))
|
||||
c, finished = starter.startAMPProcess(TempDirChild,
|
||||
ampChildArgs=('foo',))
|
||||
cwd = []
|
||||
def checkBootstrap(response):
|
||||
cwd.append(response['cwd'])
|
||||
self.assertTrue(cwd[0].endswith('/foo'))
|
||||
d = c.callRemote(GetCWD)
|
||||
d.addCallback(checkBootstrap)
|
||||
d.addCallback(lambda _: c.callRemote(commands.Shutdown))
|
||||
finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
|
||||
return finished
|
||||
|
||||
def test_startAMPAndParentProtocol(self):
|
||||
"""
|
||||
Test that you can start an AMP subprocess and the children can
|
||||
call methods on their parent.
|
||||
"""
|
||||
DATA = "CIAO"
|
||||
APPEND = "123"
|
||||
|
||||
class Parent(amp.AMP):
|
||||
def pong(self, data):
|
||||
return {'response': DATA+APPEND}
|
||||
Pong.responder(pong)
|
||||
|
||||
starter = main.ProcessStarter(packages=("twisted", "ampoule"))
|
||||
|
||||
subp, finished = starter.startAMPProcess(ampChild=Child, ampParent=Parent)
|
||||
subp.callRemote(Ping, data=DATA
|
||||
).addCallback(lambda response:
|
||||
self.assertEquals(response['response'], DATA+APPEND)
|
||||
).addCallback(lambda _: subp.callRemote(commands.Shutdown))
|
||||
return finished
|
||||
|
||||
def test_roundtripError(self):
|
||||
"""
|
||||
Test that invoking a child using an unreachable class raises
|
||||
a L{RunTimeError} .
|
||||
"""
|
||||
class Child(child.AMPChild):
|
||||
pass
|
||||
|
||||
starter = main.ProcessStarter(packages=("twisted", "ampoule"))
|
||||
|
||||
self.assertRaises(RuntimeError, starter.startAMPProcess, ampChild=Child)
|
||||
|
||||
class TestProcessPool(unittest.TestCase):
|
||||
|
||||
def test_startStopWorker(self):
|
||||
"""
|
||||
Test that starting and stopping a worker keeps the state of
|
||||
the process pool consistent.
|
||||
"""
|
||||
pp = pool.ProcessPool()
|
||||
self.assertEquals(pp.started, False)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(pp.processes, set())
|
||||
self.assertEquals(pp._finishCallbacks, {})
|
||||
|
||||
def _checks():
|
||||
self.assertEquals(pp.started, False)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), 1)
|
||||
self.assertEquals(len(pp._finishCallbacks), 1)
|
||||
return pp.stopAWorker()
|
||||
|
||||
def _closingUp(_):
|
||||
self.assertEquals(pp.started, False)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), 0)
|
||||
self.assertEquals(pp._finishCallbacks, {})
|
||||
pp.startAWorker()
|
||||
return _checks().addCallback(_closingUp).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_startAndStop(self):
|
||||
"""
|
||||
Test that a process pool's start and stop method create the
|
||||
expected number of workers and keep state consistent in the
|
||||
process pool.
|
||||
"""
|
||||
pp = pool.ProcessPool()
|
||||
self.assertEquals(pp.started, False)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(pp.processes, set())
|
||||
self.assertEquals(pp._finishCallbacks, {})
|
||||
|
||||
def _checks(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
return pp.stop()
|
||||
|
||||
def _closingUp(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, True)
|
||||
self.assertEquals(len(pp.processes), 0)
|
||||
self.assertEquals(pp._finishCallbacks, {})
|
||||
return pp.start().addCallback(_checks).addCallback(_closingUp)
|
||||
|
||||
def test_adjustPoolSize(self):
|
||||
"""
|
||||
Test that calls to pool.adjustPoolSize are correctly handled.
|
||||
"""
|
||||
pp = pool.ProcessPool(min=10)
|
||||
self.assertEquals(pp.started, False)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(pp.processes, set())
|
||||
self.assertEquals(pp._finishCallbacks, {})
|
||||
|
||||
def _resize1(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
return pp.adjustPoolSize(min=2, max=3)
|
||||
|
||||
def _resize2(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(pp.max, 3)
|
||||
self.assertEquals(pp.min, 2)
|
||||
self.assertEquals(len(pp.processes), pp.max)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.max)
|
||||
|
||||
def _resize3(_):
|
||||
self.assertRaises(AssertionError, pp.adjustPoolSize, min=-1, max=5)
|
||||
self.assertRaises(AssertionError, pp.adjustPoolSize, min=5, max=1)
|
||||
return pp.stop()
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_resize1
|
||||
).addCallback(_resize2
|
||||
).addCallback(_resize3)
|
||||
|
||||
def test_childRestart(self):
|
||||
"""
|
||||
Test that a failing child process is immediately restarted.
|
||||
"""
|
||||
pp = pool.ProcessPool(ampChild=BadChild, min=1)
|
||||
STRING = "DATA"
|
||||
|
||||
def _checks(_):
|
||||
d = pp._finishCallbacks.values()[0]
|
||||
pp.doWork(Die).addErrback(lambda _: None)
|
||||
return d.addBoth(_checksAgain)
|
||||
|
||||
def _checksAgain(_):
|
||||
return pp.doWork(commands.Echo, data=STRING
|
||||
).addCallback(lambda result: self.assertEquals(result['response'], STRING))
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_checks
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_parentProtocolChange(self):
|
||||
"""
|
||||
Test that the father can use an AMP protocol too.
|
||||
"""
|
||||
DATA = "CIAO"
|
||||
APPEND = "123"
|
||||
|
||||
class Parent(amp.AMP):
|
||||
def pong(self, data):
|
||||
return {'response': DATA+APPEND}
|
||||
Pong.responder(pong)
|
||||
|
||||
pp = pool.ProcessPool(ampChild=Child, ampParent=Parent)
|
||||
def _checks(_):
|
||||
return pp.doWork(Ping, data=DATA
|
||||
).addCallback(lambda response:
|
||||
self.assertEquals(response['response'], DATA+APPEND)
|
||||
)
|
||||
|
||||
return pp.start().addCallback(_checks).addCallback(lambda _: pp.stop())
|
||||
|
||||
|
||||
def test_deferToAMPProcess(self):
|
||||
"""
|
||||
Test that deferToAMPProcess works as expected.
|
||||
"""
|
||||
def cleanupGlobalPool():
|
||||
d = pool.pp.stop()
|
||||
pool.pp = None
|
||||
return d
|
||||
self.addCleanup(cleanupGlobalPool)
|
||||
|
||||
STRING = "CIAOOOO"
|
||||
d = pool.deferToAMPProcess(commands.Echo, data=STRING)
|
||||
d.addCallback(self.assertEquals, {"response": STRING})
|
||||
return d
|
||||
|
||||
def test_checkStateInPool(self):
|
||||
"""
|
||||
Test that busy and ready lists are correctly maintained.
|
||||
"""
|
||||
pp = pool.ProcessPool(ampChild=WaitingChild)
|
||||
|
||||
DATA = "foobar"
|
||||
|
||||
def _checks(_):
|
||||
d = pp.callRemote(First, data=DATA)
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
self.assertEquals(len(pp.ready), pp.min-1)
|
||||
self.assertEquals(len(pp.busy), 1)
|
||||
child = pp.busy.pop()
|
||||
pp.busy.add(child)
|
||||
child.callRemote(Second)
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_checks
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_growingToMax(self):
|
||||
"""
|
||||
Test that the pool grows over time until it reaches max processes.
|
||||
"""
|
||||
MAX = 5
|
||||
pp = pool.ProcessPool(ampChild=WaitingChild, min=1, max=MAX)
|
||||
|
||||
def _checks(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
|
||||
D = "DATA"
|
||||
d = [pp.doWork(First, data=D) for x in xrange(MAX)]
|
||||
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.max)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.max)
|
||||
|
||||
[child.callRemote(Second) for child in pp.processes]
|
||||
return defer.DeferredList(d)
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_checks
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_growingToMaxAndShrinking(self):
|
||||
"""
|
||||
Test that the pool grows but after 'idle' time the number of
|
||||
processes goes back to the minimum.
|
||||
"""
|
||||
|
||||
MAX = 5
|
||||
MIN = 1
|
||||
IDLE = 1
|
||||
pp = pool.ProcessPool(ampChild=WaitingChild, min=MIN, max=MAX, maxIdle=IDLE)
|
||||
|
||||
def _checks(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
|
||||
D = "DATA"
|
||||
d = [pp.doWork(First, data=D) for x in xrange(MAX)]
|
||||
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.max)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.max)
|
||||
|
||||
[child.callRemote(Second) for child in pp.processes]
|
||||
return defer.DeferredList(d).addCallback(_realChecks)
|
||||
|
||||
def _realChecks(_):
|
||||
from twisted.internet import reactor
|
||||
d = defer.Deferred()
|
||||
def _cb():
|
||||
def __(_):
|
||||
try:
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
d.callback(None)
|
||||
except Exception, e:
|
||||
d.errback(e)
|
||||
return pp._pruneProcesses().addCallback(__)
|
||||
# just to be shure we are called after the pruner
|
||||
pp.looping.stop() # stop the looping, we don't want it to
|
||||
# this right here
|
||||
reactor.callLater(IDLE, _cb)
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_checks
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_recycling(self):
|
||||
"""
|
||||
Test that after a given number of calls subprocesses are
|
||||
recycled.
|
||||
"""
|
||||
MAX = 1
|
||||
MIN = 1
|
||||
RECYCLE_AFTER = 1
|
||||
pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
|
||||
self.addCleanup(pp.stop)
|
||||
|
||||
def _checks(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
return pp.doWork(Pid
|
||||
).addCallback(lambda response: response['pid'])
|
||||
|
||||
def _checks2(pid):
|
||||
return pp.doWork(Pid
|
||||
).addCallback(lambda response: response['pid']
|
||||
).addCallback(self.assertNotEquals, pid)
|
||||
|
||||
|
||||
d = pp.start()
|
||||
d.addCallback(_checks)
|
||||
d.addCallback(_checks2)
|
||||
return d
|
||||
|
||||
def test_recyclingWithQueueOverload(self):
|
||||
"""
|
||||
Test that we get the correct number of different results when
|
||||
we overload the pool of calls.
|
||||
"""
|
||||
MAX = 5
|
||||
MIN = 1
|
||||
RECYCLE_AFTER = 10
|
||||
CALLS = 60
|
||||
pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
|
||||
self.addCleanup(pp.stop)
|
||||
|
||||
def _check(results):
|
||||
s = set()
|
||||
for succeed, response in results:
|
||||
s.add(response['pid'])
|
||||
|
||||
# For the first C{MAX} calls, each is basically guaranteed to go to
|
||||
# a different child. After that, though, there are no guarantees.
|
||||
# All the rest might go to a single child, since the child to
|
||||
# perform a job is selected arbitrarily from the "ready" set. Fair
|
||||
# distribution of jobs needs to be implemented; right now it's "set
|
||||
# ordering" distribution of jobs.
|
||||
self.assertTrue(len(s) > MAX)
|
||||
|
||||
def _work(_):
|
||||
l = [pp.doWork(Pid) for x in xrange(CALLS)]
|
||||
d = defer.DeferredList(l)
|
||||
return d.addCallback(_check)
|
||||
d = pp.start()
|
||||
d.addCallback(_work)
|
||||
return d
|
||||
|
||||
|
||||
def test_disableProcessRecycling(self):
|
||||
"""
|
||||
Test that by setting 0 to recycleAfter we actually disable process recycling.
|
||||
"""
|
||||
MAX = 1
|
||||
MIN = 1
|
||||
RECYCLE_AFTER = 0
|
||||
pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
|
||||
|
||||
def _checks(_):
|
||||
self.assertEquals(pp.started, True)
|
||||
self.assertEquals(pp.finished, False)
|
||||
self.assertEquals(len(pp.processes), pp.min)
|
||||
self.assertEquals(len(pp._finishCallbacks), pp.min)
|
||||
return pp.doWork(Pid
|
||||
).addCallback(lambda response: response['pid'])
|
||||
|
||||
def _checks2(pid):
|
||||
return pp.doWork(Pid
|
||||
).addCallback(lambda response: response['pid']
|
||||
).addCallback(self.assertEquals, pid
|
||||
).addCallback(lambda _: pid)
|
||||
|
||||
def finish(reason):
|
||||
return pp.stop().addCallback(lambda _: reason)
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_checks
|
||||
).addCallback(_checks2
|
||||
).addCallback(_checks2
|
||||
).addCallback(finish)
|
||||
|
||||
def test_changeChildrenReactor(self):
|
||||
"""
|
||||
Test that by passing the correct argument children change their
|
||||
reactor type.
|
||||
"""
|
||||
MAX = 1
|
||||
MIN = 1
|
||||
FIRST = "select"
|
||||
SECOND = "poll"
|
||||
|
||||
def checkDefault():
|
||||
pp = pool.ProcessPool(
|
||||
starter=main.ProcessStarter(
|
||||
childReactor=FIRST,
|
||||
packages=("twisted", "ampoule")),
|
||||
ampChild=ReactorChild, min=MIN, max=MAX)
|
||||
pp.start()
|
||||
return pp.doWork(Reactor
|
||||
).addCallback(self.assertEquals, {'classname': "SelectReactor"}
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def checkPool(_):
|
||||
pp = pool.ProcessPool(
|
||||
starter=main.ProcessStarter(
|
||||
childReactor=SECOND,
|
||||
packages=("twisted", "ampoule")),
|
||||
ampChild=ReactorChild, min=MIN, max=MAX)
|
||||
pp.start()
|
||||
return pp.doWork(Reactor
|
||||
).addCallback(self.assertEquals, {'classname': "PollReactor"}
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
return checkDefault(
|
||||
).addCallback(checkPool)
|
||||
try:
|
||||
from select import poll
|
||||
except ImportError:
|
||||
test_changeChildrenReactor.skip = "This architecture doesn't support select.poll, I can't run this test"
|
||||
|
||||
def test_commandsWithoutResponse(self):
|
||||
"""
|
||||
Test that if we send a command without a required answer we
|
||||
actually don't have any problems.
|
||||
"""
|
||||
DATA = "hello"
|
||||
pp = pool.ProcessPool(ampChild=NoResponseChild, min=1, max=1)
|
||||
|
||||
def _check(_):
|
||||
return pp.doWork(GetResponse
|
||||
).addCallback(self.assertEquals, {"response": DATA})
|
||||
|
||||
def _work(_):
|
||||
return pp.doWork(NoResponse, arg=DATA)
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_work
|
||||
).addCallback(_check
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_SupplyChildArgs(self):
|
||||
"""Ensure that arguments for the child constructor are passed in."""
|
||||
pp = pool.ProcessPool(Writer, ampChildArgs=['body'], min=0)
|
||||
def _check(result):
|
||||
return pp.doWork(Write).addCallback(
|
||||
self.assertEquals, {'response': 'body'})
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_check
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def processTimeoutTest(self, timeout):
|
||||
pp = pool.ProcessPool(WaitingChild, min=1, max=1)
|
||||
|
||||
def _work(_):
|
||||
d = pp.callRemote(First, data="ciao", _timeout=timeout)
|
||||
self.assertFailure(d, error.ProcessTerminated)
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_work
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_processTimeout(self):
|
||||
"""
|
||||
Test that a call that doesn't finish within the given timeout
|
||||
time is correctly handled.
|
||||
"""
|
||||
return self.processTimeoutTest(1)
|
||||
|
||||
def test_processTimeoutZero(self):
|
||||
"""
|
||||
Test that the process is correctly handled when the timeout is zero.
|
||||
"""
|
||||
return self.processTimeoutTest(0)
|
||||
|
||||
def test_processDeadline(self):
|
||||
pp = pool.ProcessPool(WaitingChild, min=1, max=1)
|
||||
|
||||
def _work(_):
|
||||
d = pp.callRemote(First, data="ciao", _deadline=reactor.seconds())
|
||||
self.assertFailure(d, error.ProcessTerminated)
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_work
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_processBeforeDeadline(self):
|
||||
pp = pool.ProcessPool(PidChild, min=1, max=1)
|
||||
|
||||
def _work(_):
|
||||
d = pp.callRemote(Pid, _deadline=reactor.seconds() + 10)
|
||||
d.addCallback(lambda result: self.assertNotEqual(result['pid'], 0))
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_work
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_processTimeoutSignal(self):
|
||||
"""
|
||||
Test that a call that doesn't finish within the given timeout
|
||||
time is correctly handled.
|
||||
"""
|
||||
pp = pool.ProcessPool(WaitingChild, min=1, max=1,
|
||||
timeout_signal=SIGHUP)
|
||||
|
||||
def _work(_):
|
||||
d = pp.callRemote(First, data="ciao", _timeout=1)
|
||||
d.addCallback(lambda d: self.fail())
|
||||
text = 'signal %d' % SIGHUP
|
||||
d.addErrback(
|
||||
lambda f: self.assertTrue(text in f.value[0],
|
||||
'"%s" not in "%s"' % (text, f.value[0])))
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_work
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
||||
def test_processGlobalTimeout(self):
|
||||
"""
|
||||
Test that a call that doesn't finish within the given global
|
||||
timeout time is correctly handled.
|
||||
"""
|
||||
pp = pool.ProcessPool(WaitingChild, min=1, max=1, timeout=1)
|
||||
|
||||
def _work(_):
|
||||
d = pp.callRemote(First, data="ciao")
|
||||
self.assertFailure(d, error.ProcessTerminated)
|
||||
return d
|
||||
|
||||
return pp.start(
|
||||
).addCallback(_work
|
||||
).addCallback(lambda _: pp.stop())
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.protocol import ClientFactory
|
||||
from twisted.trial import unittest
|
||||
from twisted.protocols import amp
|
||||
|
||||
from src.utils.ampoule import service, child, pool, main
|
||||
from src.utils.ampoule.commands import Echo
|
||||
|
||||
class ClientAMP(amp.AMP):
|
||||
factory = None
|
||||
def connectionMade(self):
|
||||
if self.factory is not None:
|
||||
self.factory.theProto = self
|
||||
if hasattr(self.factory, 'onMade'):
|
||||
self.factory.onMade.callback(None)
|
||||
|
||||
class TestAMPProxy(unittest.TestCase):
|
||||
def setUp(self):
|
||||
"""
|
||||
Setup the proxy service and the client connection to the proxy
|
||||
service in order to run call through them.
|
||||
|
||||
Inspiration comes from twisted.test.test_amp
|
||||
"""
|
||||
self.pp = pool.ProcessPool()
|
||||
self.svc = service.AMPouleService(self.pp, child.AMPChild, 0, "")
|
||||
self.svc.startService()
|
||||
self.proxy_port = self.svc.server.getHost().port
|
||||
self.clientFactory = ClientFactory()
|
||||
self.clientFactory.protocol = ClientAMP
|
||||
d = self.clientFactory.onMade = defer.Deferred()
|
||||
self.clientConn = reactor.connectTCP("127.0.0.1",
|
||||
self.proxy_port,
|
||||
self.clientFactory)
|
||||
self.addCleanup(self.clientConn.disconnect)
|
||||
self.addCleanup(self.svc.stopService)
|
||||
def setClient(_):
|
||||
self.client = self.clientFactory.theProto
|
||||
return d.addCallback(setClient)
|
||||
|
||||
def test_forwardCall(self):
|
||||
"""
|
||||
Test that a call made from a client is correctly forwarded to
|
||||
the process pool and the result is correctly reported.
|
||||
"""
|
||||
DATA = "hello"
|
||||
return self.client.callRemote(Echo, data=DATA).addCallback(
|
||||
self.assertEquals, {'response': DATA}
|
||||
)
|
||||
|
|
@ -1,46 +0,0 @@
|
|||
"""
|
||||
some utilities
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import __main__
|
||||
|
||||
from twisted.python.filepath import FilePath
|
||||
from twisted.python.reflect import namedAny
|
||||
# from twisted.python.modules import theSystemPath
|
||||
|
||||
def findPackagePath(modulePath):
|
||||
"""
|
||||
Try to find the sys.path entry from a modulePath object, simultaneously
|
||||
computing the module name of the targetted file.
|
||||
"""
|
||||
p = modulePath
|
||||
l = [p.basename().split(".")[0]]
|
||||
while p.parent() != p:
|
||||
for extension in ['py', 'pyc', 'pyo', 'pyd', 'dll']:
|
||||
sib = p.sibling("__init__."+extension)
|
||||
if sib.exists():
|
||||
p = p.parent()
|
||||
l.insert(0, p.basename())
|
||||
break
|
||||
else:
|
||||
return p.parent(), '.'.join(l)
|
||||
|
||||
|
||||
def mainpoint(function):
|
||||
"""
|
||||
Decorator which declares a function to be an object's mainpoint.
|
||||
"""
|
||||
if function.__module__ == '__main__':
|
||||
# OK time to run a function
|
||||
p = FilePath(__main__.__file__)
|
||||
p, mn = findPackagePath(p)
|
||||
pname = p.path
|
||||
if pname not in map(os.path.abspath, sys.path):
|
||||
sys.path.insert(0, pname)
|
||||
# Maybe remove the module's path?
|
||||
exitcode = namedAny(mn+'.'+function.__name__)(sys.argv)
|
||||
if exitcode is None:
|
||||
exitcode = 0
|
||||
sys.exit(exitcode)
|
||||
return function
|
||||
|
|
@ -7,7 +7,6 @@ a higher layer module.
|
|||
"""
|
||||
from traceback import format_exc
|
||||
from twisted.python import log
|
||||
from src.utils import utils
|
||||
|
||||
def log_trace(errmsg=None):
|
||||
"""
|
||||
|
|
@ -22,7 +21,7 @@ def log_trace(errmsg=None):
|
|||
log.msg('[::] %s' % line)
|
||||
if errmsg:
|
||||
try:
|
||||
errmsg = utils.to_str(errmsg)
|
||||
errmsg = str(errmsg)
|
||||
except Exception, e:
|
||||
errmsg = str(e)
|
||||
for line in errmsg.splitlines():
|
||||
|
|
@ -37,7 +36,7 @@ def log_errmsg(errmsg):
|
|||
errormsg: (string) The message to be logged.
|
||||
"""
|
||||
try:
|
||||
errmsg = utils.to_str(errmsg)
|
||||
errmsg = str(errmsg)
|
||||
except Exception, e:
|
||||
errmsg = str(e)
|
||||
for line in errmsg.splitlines():
|
||||
|
|
@ -51,7 +50,7 @@ def log_warnmsg(warnmsg):
|
|||
warnmsg: (string) The message to be logged.
|
||||
"""
|
||||
try:
|
||||
warnmsg = utils.to_str(warnmsg)
|
||||
warnmsg = str(warnmsg)
|
||||
except Exception, e:
|
||||
warnmsg = str(e)
|
||||
for line in warnmsg.splitlines():
|
||||
|
|
@ -65,7 +64,7 @@ def log_infomsg(infomsg):
|
|||
infomsg: (string) The message to be logged.
|
||||
"""
|
||||
try:
|
||||
infomsg = utils.to_str(infomsg)
|
||||
infomsg = str(infomsg)
|
||||
except Exception, e:
|
||||
infomsg = str(e)
|
||||
for line in infomsg.splitlines():
|
||||
|
|
@ -76,7 +75,7 @@ def log_depmsg(depmsg):
|
|||
Prints a deprecation message
|
||||
"""
|
||||
try:
|
||||
depmsg = utils.to_str(depmsg)
|
||||
depmsg = str(depmsg)
|
||||
except Exception, e:
|
||||
depmsg = str(e)
|
||||
for line in depmsg.splitlines():
|
||||
|
|
|
|||
|
|
@ -20,13 +20,10 @@ except ImportError:
|
|||
import pickle
|
||||
|
||||
ENCODINGS = settings.ENCODINGS
|
||||
_LOGGER = None
|
||||
_GA = object.__getattribute__
|
||||
_SA = object.__setattr__
|
||||
_DA = object.__delattr__
|
||||
|
||||
|
||||
|
||||
def is_iter(iterable):
|
||||
"""
|
||||
Checks if an object behaves iterably. However,
|
||||
|
|
@ -465,10 +462,10 @@ def format_table(table, extra_space=1):
|
|||
return ftable
|
||||
|
||||
|
||||
|
||||
_FROM_MODEL_MAP = None
|
||||
_TO_DBOBJ = lambda o: (hasattr(o, "dbobj") and o.dbobj) or o
|
||||
_TO_PACKED_DBOBJ = lambda natural_key, dbref: ('__packed_dbobj__', natural_key, dbref)
|
||||
_DUMPS = None
|
||||
def to_pickle(data, do_pickle=True, emptypickle=True):
|
||||
"""
|
||||
Prepares object for being pickled. This will remap database models
|
||||
|
|
@ -482,11 +479,10 @@ def to_pickle(data, do_pickle=True, emptypickle=True):
|
|||
Database objects are stored as ('__packed_dbobj__', <natural_key_tuple>, <dbref>)
|
||||
"""
|
||||
# prepare globals
|
||||
global _DUMPS, _LOADS, _FROM_MODEL_MAP
|
||||
global _DUMPS, _FROM_MODEL_MAP
|
||||
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
|
||||
if not _DUMPS:
|
||||
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
|
||||
if not _LOADS:
|
||||
_LOADS = lambda data: pickle.loads(to_str(data))
|
||||
if not _FROM_MODEL_MAP:
|
||||
_FROM_MODEL_MAP = defaultdict(str)
|
||||
_FROM_MODEL_MAP.update(dict((c.model, c.natural_key()) for c in ContentType.objects.all()))
|
||||
|
|
@ -511,12 +507,14 @@ def to_pickle(data, do_pickle=True, emptypickle=True):
|
|||
# do recursive conversion
|
||||
data = iter_db2id(data)
|
||||
if do_pickle and not (not emptypickle and not data and data != False):
|
||||
print "_DUMPS2:", _DUMPS
|
||||
return _DUMPS(data)
|
||||
return data
|
||||
|
||||
_TO_MODEL_MAP = None
|
||||
_IS_PACKED_DBOBJ = lambda o: type(o)== tuple and len(o)==3 and o[0]=='__packed_dbobj__'
|
||||
_TO_TYPECLASS = lambda o: (hasattr(o, 'typeclass') and o.typeclass) or o
|
||||
_LOADS = None
|
||||
from django.db import transaction
|
||||
@transaction.autocommit
|
||||
def from_pickle(data, do_pickle=True):
|
||||
|
|
@ -528,9 +526,7 @@ def from_pickle(data, do_pickle=True):
|
|||
do_pickle - actually unpickle the input before continuing
|
||||
"""
|
||||
# prepare globals
|
||||
global _DUMPS, _LOADS, _TO_MODEL_MAP
|
||||
if not _DUMPS:
|
||||
_DUMPS = lambda data: to_str(pickle.dumps(data, pickle.HIGHEST_PROTOCOL))
|
||||
global _LOADS, _TO_MODEL_MAP
|
||||
if not _LOADS:
|
||||
_LOADS = lambda data: pickle.loads(to_str(data))
|
||||
if not _TO_MODEL_MAP:
|
||||
|
|
@ -572,8 +568,8 @@ def clean_object_caches(obj):
|
|||
global _TYPECLASSMODELS, _OBJECTMODELS
|
||||
if not _TYPECLASSMODELS:
|
||||
from src.typeclasses import models as _TYPECLASSMODELS
|
||||
if not _OBJECTMODELS:
|
||||
from src.objects import models as _OBJECTMODELS
|
||||
#if not _OBJECTMODELS:
|
||||
# from src.objects import models as _OBJECTMODELS
|
||||
|
||||
#print "recaching:", obj
|
||||
if not obj:
|
||||
|
|
@ -596,8 +592,6 @@ def clean_object_caches(obj):
|
|||
|
||||
_PPOOL = None
|
||||
_PCMD = None
|
||||
_DUMPS = None
|
||||
_LOADS = None
|
||||
_PROC_ERR = "A process has ended with a probable error condition: process ended by signal 9."
|
||||
def run_async(to_execute, *args, **kwargs):
|
||||
"""
|
||||
|
|
@ -609,23 +603,8 @@ def run_async(to_execute, *args, **kwargs):
|
|||
arguments.
|
||||
The callable will be executed using ProcPool, or in
|
||||
a thread if ProcPool is not available.
|
||||
to_execute (string) - this is only available is ProcPool is
|
||||
running. If a string, to_execute this will be treated as a code
|
||||
snippet to execute asynchronously. *args are then not used
|
||||
and non-reserverd *kwargs are used to define the execution
|
||||
environment made available to the code.
|
||||
|
||||
reserved kwargs:
|
||||
'use_thread' (bool) - this only works with callables (not code).
|
||||
It forces the code to run in a thread instead
|
||||
of using the Process Pool, even if the latter
|
||||
is available. This could be useful if you want
|
||||
to make sure to not get out of sync with the
|
||||
main process (such as accessing in-memory global
|
||||
properties)
|
||||
'proc_timeout' (int) - only used if ProcPool is available. Sets a
|
||||
max time for execution. This alters the value set
|
||||
by settings.PROCPOOL_TIMEOUT
|
||||
'at_return' -should point to a callable with one argument.
|
||||
It will be called with the return value from
|
||||
to_execute.
|
||||
|
|
@ -636,32 +615,20 @@ def run_async(to_execute, *args, **kwargs):
|
|||
'at_err_kwargs' - this dictionary will be used as keyword
|
||||
arguments to the at_err errback.
|
||||
|
||||
*args - if to_execute is a callable, these args will be used
|
||||
*args - these args will be used
|
||||
as arguments for that function. If to_execute is a string
|
||||
*args are not used.
|
||||
*kwargs - if to_execute is a callable, these kwargs will be used
|
||||
*kwargs - these kwargs will be used
|
||||
as keyword arguments in that function. If a string, they
|
||||
instead are used to define the executable environment
|
||||
that should be available to execute the code in to_execute.
|
||||
|
||||
run_async will either relay the code to a thread or to a processPool
|
||||
depending on input and what is available in the system. To activate
|
||||
Process pooling, settings.PROCPOOL_ENABLE must be set.
|
||||
|
||||
to_execute in string form should handle all imports needed. kwargs
|
||||
can be used to send objects and properties. Such properties will
|
||||
be pickled, except Database Objects which will be sent across
|
||||
on a special format and re-loaded on the other side.
|
||||
|
||||
To get a return value from your code snippet, Use the _return()
|
||||
function: Every call to this function from your snippet will
|
||||
append the argument to an internal list of returns. This return value
|
||||
(or a list) will be the first argument to the at_return callback.
|
||||
run_async will relay executed code to a thread.
|
||||
|
||||
Use this function with restrain and only for features/commands
|
||||
that you know has no influence on the cause-and-effect order of your
|
||||
game (commands given after the async function might be executed before
|
||||
it has finished). Accessing the same property from different threads/processes
|
||||
it has finished). Accessing the same property from different threads
|
||||
can lead to unpredicted behaviour if you are not careful (this is called a
|
||||
"race condition").
|
||||
|
||||
|
|
@ -671,72 +638,14 @@ def run_async(to_execute, *args, **kwargs):
|
|||
tracebacks.
|
||||
|
||||
"""
|
||||
# handle all global imports.
|
||||
global _PPOOL, _PCMD
|
||||
if _PPOOL == None:
|
||||
# Try to load process Pool
|
||||
from src.server.sessionhandler import SESSIONS as _SESSIONS
|
||||
try:
|
||||
_PPOOL = _SESSIONS.server.services.namedServices.get("ProcPool").pool
|
||||
except AttributeError:
|
||||
_PPOOL = False
|
||||
if not _PCMD:
|
||||
from src.server.procpool import ExecuteCode as _PCMD
|
||||
|
||||
use_timeout = kwargs.pop("proc_timeout", None)
|
||||
|
||||
# helper converters for callbacks/errbacks
|
||||
def convert_return(f):
|
||||
def func(ret, *args, **kwargs):
|
||||
rval = ret["response"] and from_pickle(ret["response"])
|
||||
reca = ret["recached"] and from_pickle(ret["recached"])
|
||||
# recache all indicated objects
|
||||
[clean_object_caches(obj) for obj in reca]
|
||||
if f: return f(rval, *args, **kwargs)
|
||||
else: return rval
|
||||
return func
|
||||
def convert_err(f):
|
||||
def func(err, *args, **kwargs):
|
||||
err.trap(Exception)
|
||||
err = err.getErrorMessage()
|
||||
if use_timeout and err == _PROC_ERR:
|
||||
err = "Process took longer than %ss and timed out." % use_timeout
|
||||
if f:
|
||||
return f(err, *args, **kwargs)
|
||||
else:
|
||||
global _LOGGER
|
||||
if not _LOGGER:
|
||||
from src.utils import logger as _LOGGER
|
||||
err = "Error reported from subprocess: '%s'" % err
|
||||
_LOGGER.log_errmsg(err)
|
||||
return func
|
||||
|
||||
# handle special reserved input kwargs
|
||||
use_thread = kwargs.pop("use_thread", False)
|
||||
callback = convert_return(kwargs.pop("at_return", None))
|
||||
errback = convert_err(kwargs.pop("at_err", None))
|
||||
callback_kwargs = kwargs.pop("at_return_kwargs", {})
|
||||
errback_kwargs = kwargs.pop("at_err_kwargs", {})
|
||||
|
||||
if _PPOOL and not use_thread:
|
||||
# process pool is running
|
||||
if isinstance(to_execute, basestring):
|
||||
# run source code in process pool
|
||||
cmdargs = {"_timeout":use_timeout}
|
||||
cmdargs["source"] = to_str(to_execute)
|
||||
cmdargs["environment"] = to_pickle(kwargs, emptypickle=False) or ""
|
||||
# defer to process pool
|
||||
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
|
||||
elif callable(to_execute):
|
||||
# execute callable in process
|
||||
callname = to_execute.__name__
|
||||
cmdargs = {"_timeout":use_timeout}
|
||||
cmdargs["source"] = "_return(%s(*args,**kwargs))" % callname
|
||||
cmdargs["environment"] = to_pickle({callname:to_execute, "args":args, "kwargs":kwargs})
|
||||
deferred = _PPOOL.doWork(_PCMD, **cmdargs)
|
||||
else:
|
||||
raise RuntimeError("'%s' could not be handled by run_async" % to_execute)
|
||||
elif callable(to_execute):
|
||||
if callable(to_execute):
|
||||
# no process pool available, fall back to old deferToThread mechanism.
|
||||
deferred = threads.deferToThread(to_execute, *args, **kwargs)
|
||||
else:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue