Rework launcher with persistent connection for better server status reporting. Still some errors.

This commit is contained in:
Griatch 2018-01-16 00:11:46 +01:00
parent aa7b57e92f
commit 66c209a314
4 changed files with 238 additions and 96 deletions

View file

@ -106,6 +106,26 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
# back with the Server side
self.send_AdminServer2Portal(amp.DUMMYSESSION, operation=amp.PSYNC)
def data_to_portal(self, command, sessid, **kwargs):
"""
Send data across the wire to the Portal
Args:
command (AMP Command): A protocol send command.
sessid (int): A unique Session id.
kwargs (any): Any data to pickle into the command.
Returns:
deferred (deferred or None): A deferred with an errback.
Notes:
Data will be sent across the wire pickled as a tuple
(sessid, kwargs).
"""
return self.callRemote(command, packed_data=amp.dumps((sessid, kwargs))).addErrback(
self.errback, command.key)
def send_MsgServer2Portal(self, session, **kwargs):
"""
Access method - executed on the Server for sending data
@ -116,7 +136,7 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
kwargs (any, optiona): Extra data.
"""
return self.data_out(amp.MsgServer2Portal, session.sessid, **kwargs)
return self.data_to_portal(amp.MsgServer2Portal, session.sessid, **kwargs)
def send_AdminServer2Portal(self, session, operation="", **kwargs):
"""
@ -131,7 +151,8 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol):
kwargs (dict, optional): Data going into the adminstrative.
"""
return self.data_out(amp.AdminServer2Portal, session.sessid, operation=operation, **kwargs)
return self.data_to_portal(amp.AdminServer2Portal, session.sessid,
operation=operation, **kwargs)
# receiving AMP data

View file

@ -19,7 +19,7 @@ import shutil
import importlib
from distutils.version import LooseVersion
from argparse import ArgumentParser
from subprocess import Popen, check_output, call, CalledProcessError, STDOUT, PIPE
from subprocess import Popen, check_output, call, CalledProcessError, STDOUT
try:
import cPickle as pickle
@ -57,9 +57,6 @@ CURRENT_DIR = os.getcwd()
GAMEDIR = CURRENT_DIR
# Operational setup
AMP_PORT = None
AMP_HOST = None
AMP_INTERFACE = None
SERVER_LOGFILE = None
PORTAL_LOGFILE = None
@ -81,6 +78,9 @@ ENFORCED_SETTING = False
# communication constants
AMP_PORT = None
AMP_HOST = None
AMP_INTERFACE = None
AMP_CONNECTION = None
SRELOAD = chr(14) # server reloading (have portal start a new server)
@ -461,7 +461,35 @@ class MsgLauncher2Portal(amp.Command):
arguments = [('operation', amp.String()),
('arguments', amp.String())]
errors = {Exception: 'EXCEPTION'}
response = [('result', amp.String())]
response = []
class AMPLauncherProtocol(amp.AMP):
"""
Defines callbacks to the launcher
"""
def __init__(self):
self.on_status = []
def wait_for_status(self, callback):
"""
Register a waiter for a status return.
"""
self.on_status.append(callback)
@MsgStatus.responder
def receive_status_from_portal(self, status):
"""
Get a status signal from portal - fire callbacks
"""
status = pickle.loads(status)
for callback in self.on_status:
callback(status)
self.on_status = []
return {"status": ""}
def send_instruction(operation, arguments, callback=None, errback=None):
@ -475,39 +503,54 @@ def send_instruction(operation, arguments, callback=None, errback=None):
print(ERROR_AMP_UNCONFIGURED)
sys.exit()
def _timeout(*args):
print("Client timed out.")
reactor.stop()
def _callback(result):
if callback:
callback(result)
# prot.transport.loseConnection()
def _errback(fail):
if errback:
errback(fail)
# prot.transport.loseConnection()
def _on_connect(prot):
"""
This fires with the protocol when connection is established. We
immediately send off the instruction then shut down.
immediately send off the instruction
"""
def _callback(result):
if callback:
callback(result)
prot.transport.loseConnection()
def _errback(fail):
if errback:
errback(fail)
prot.transport.loseConnection()
if operation == PSTATUS:
prot.callRemote(MsgStatus, status="").addCallbacks(_callback, _errback)
else:
prot.callRemote(
MsgLauncher2Portal,
operation=operation,
arguments=pickle.dumps(arguments, pickle.HIGHEST_PROTOCOL)).addCallbacks(
_callback, _errback)
global AMP_CONNECTION
AMP_CONNECTION = prot
_send()
def _on_connect_fail(fail):
"This is called if portal is not reachable."
errback(fail)
point = endpoints.TCP4ClientEndpoint(reactor, AMP_HOST, AMP_PORT)
deferred = endpoints.connectProtocol(point, amp.AMP())
deferred.addCallbacks(_on_connect, _on_connect_fail)
return deferred
def _send():
if operation == PSTATUS:
return AMP_CONNECTION.callRemote(MsgStatus, status="").addCallbacks(_callback, _errback)
else:
return AMP_CONNECTION.callRemote(
MsgLauncher2Portal,
operation=operation,
arguments=pickle.dumps(arguments, pickle.HIGHEST_PROTOCOL)).addCallbacks(
_callback, _errback)
if AMP_CONNECTION:
# already connected - send right away
_send()
else:
# we must connect first, send once connected
point = endpoints.TCP4ClientEndpoint(reactor, AMP_HOST, AMP_PORT)
deferred = endpoints.connectProtocol(point, AMPLauncherProtocol())
deferred.addCallbacks(_on_connect, _on_connect_fail)
if not reactor.running:
reactor.run()
def _parse_status(response):
@ -541,7 +584,6 @@ def _get_twistd_cmdline(pprofiler, sprofiler):
"--profiler=cprofiler",
"--profile={}".format(SPROFILER_LOGFILE)])
return portal_cmd, server_cmd
@ -565,7 +607,16 @@ def query_status(repeat=False):
reactor.stop()
send_instruction(PSTATUS, None, _callback, _errback)
reactor.run()
def wait_for_status_reply(callback):
"""
Wait for an explicit STATUS signal to be sent back from Evennia.
"""
if AMP_CONNECTION:
AMP_CONNECTION.wait_for_status(callback)
else:
print("No Evennia connection established.")
def wait_for_status(portal_running=True, server_running=True, callback=None, errback=None,
@ -587,7 +638,7 @@ def wait_for_status(portal_running=True, server_running=True, callback=None, err
def _callback(response):
prun, srun, _, _ = _parse_status(response)
if ((portal_running is None or prun == portal_running) and
(server_running is None or srun == server_running)):
(server_running is None or srun == server_running)):
# the correct state was achieved
if callback:
callback(prun, srun)
@ -653,7 +704,8 @@ def start_evennia(pprofiler=False, sprofiler=False):
reactor.stop()
def _portal_started(*args):
send_instruction(SSTART, server_cmd, _server_started)
wait_for_status_reply(_server_started)
send_instruction(SSTART, server_cmd)
def _portal_running(response):
prun, srun, ppid, spid = _parse_status(response)
@ -675,12 +727,14 @@ def start_evennia(pprofiler=False, sprofiler=False):
wait_for_status(True, None, _portal_started)
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
reactor.run()
def reload_evennia(sprofiler=False, reset=False):
"""
This will instruct the Portal to reboot the Server component.
This will instruct the Portal to reboot the Server component. We
do this manually by telling the server to shutdown (in reload mode)
and wait for the portal to report back, at which point we start the
server again. This way we control the process exactly.
"""
_, server_cmd = _get_twistd_cmdline(False, sprofiler)
@ -689,23 +743,24 @@ def reload_evennia(sprofiler=False, reset=False):
print("... Server re-started.")
reactor.stop()
def _server_reloaded(*args):
print("... Server {}.".format("reset" if reset else "reloaded"))
def _server_reloaded(status):
print("{} ... Server {}.".format(status, "reset" if reset else "reloaded"))
reactor.stop()
def _server_not_running(*args):
def _server_stopped(status):
wait_for_status_reply(_server_reloaded)
send_instruction(SSTART, server_cmd)
wait_for_status(True, True, _server_reloaded)
def _portal_running(response):
_, srun, _, _ = _parse_status(response)
if srun:
print("Server {}...".format("resetting" if reset else "reloading"))
send_instruction(SRESET if reset else SRELOAD, server_cmd)
wait_for_status(True, False, _server_not_running)
wait_for_status_reply(_server_stopped)
send_instruction(SRESET if reset else SRELOAD, {})
else:
print("Server down. Re-starting ...")
send_instruction(SSTART, server_cmd, _server_restarted)
wait_for_status_reply(_server_restarted)
send_instruction(SSTART, server_cmd)
def _portal_not_running(fail):
print("Evennia not running. Starting from scratch ...")
@ -713,7 +768,6 @@ def reload_evennia(sprofiler=False, reset=False):
# get portal status
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
reactor.run()
def stop_evennia():
@ -735,18 +789,17 @@ def stop_evennia():
if srun:
print("Server stopping ...")
send_instruction(SSHUTD, {})
wait_for_status(True, False, _server_stopped)
wait_for_status_reply(_server_stopped)
else:
print("Server already stopped.\nStopping Portal ...")
send_instruction(PSHUTD, {})
wait_for_status(False, False, _portal_stopped)
wait_for_status(False, None, _portal_stopped)
def _portal_not_running(fail):
print("Evennia is not running.")
reactor.stop()
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
reactor.run()
def stop_server_only():
@ -762,8 +815,8 @@ def stop_server_only():
_, srun, _, _ = _parse_status(response)
if srun:
print("Server stopping ...")
wait_for_status_reply(_server_stopped)
send_instruction(SSHUTD, {})
wait_for_status(True, False, _server_stopped)
else:
print("Server is not running.")
@ -771,7 +824,6 @@ def stop_server_only():
print("Evennia is not running.")
send_instruction(PSTATUS, None, _portal_running, _portal_not_running)
reactor.run()
def evennia_version():

View file

@ -155,7 +155,7 @@ class MsgLauncher2Portal(amp.Command):
arguments = [('operation', amp.String()),
('arguments', amp.String())]
errors = {Exception: 'EXCEPTION'}
response = [('result', amp.String())]
response = []
class MsgPortal2Server(amp.Command):
@ -335,9 +335,9 @@ class AMPMultiConnectionProtocol(amp.AMP):
"""
return loads(packed_data)
def data_out(self, command, sessid, **kwargs):
def broadcast(self, command, sessid, **kwargs):
"""
Send data across the wire. Always use this to send.
Send data across the wire to all connections.
Args:
command (AMP Command): A protocol send command.
@ -353,9 +353,9 @@ class AMPMultiConnectionProtocol(amp.AMP):
"""
deferreds = []
for protcl in self.factory.broadcasts:
deferreds.append(protcl.callRemote(command,
packed_data=dumps((sessid, kwargs))).addErrback(
self.errback, command.key))
deferreds.append(protcl.callRemote(command, **kwargs).addErrback(
self.errback, command.key))
return DeferredList(deferreds)
# generic function send/recvs

View file

@ -49,7 +49,9 @@ class AMPServerFactory(protocol.ServerFactory):
self.protocol = AMPServerProtocol
self.broadcasts = []
self.server_connection = None
self.launcher_connection = None
self.disconnect_callbacks = {}
self.server_connect_callbacks = []
def buildProtocol(self, addr):
"""
@ -72,12 +74,18 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
Protocol subclass for the AMP-server run by the Portal.
"""
def connectionLost(self, reason):
"""
Set up a simple callback mechanism to let the amp-server wait for a connection to close.
"""
# wipe broadcast and data memory
super(AMPServerProtocol, self).connectionLost(reason)
if self.factory.server_connection == self:
self.factory.server_connection = None
if self.factory.launcher_connection == self:
self.factory.launcher_connection = None
callback, args, kwargs = self.factory.disconnect_callbacks.pop(self, (None, None, None))
if callback:
try:
@ -85,6 +93,45 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
except Exception:
logger.log_trace()
def get_status(self):
"""
Return status for the Evennia infrastructure.
Returns:
status (tuple): The portal/server status and pids
(portal_live, server_live, portal_PID, server_PID).
"""
server_connected = bool(self.factory.server_connection and
self.factory.server_connection.transport.connected)
server_pid = self.factory.portal.server_process_id
portal_pid = os.getpid()
return (True, server_connected, portal_pid, server_pid)
def data_to_server(self, command, sessid, **kwargs):
"""
Send data across the wire to the Server.
Args:
command (AMP Command): A protocol send command.
sessid (int): A unique Session id.
Returns:
deferred (deferred or None): A deferred with an errback.
Notes:
Data will be sent across the wire pickled as a tuple
(sessid, kwargs).
"""
if self.factory.server_connection:
return self.factory.server_connection.callRemote(
command, packed_data=amp.dumps((sessid, kwargs))).addErrback(
self.errback, command.key)
else:
# if no server connection is available, broadcast
return self.broadcast(command, sessid, packed_data=amp.dumps((sessid, kwargs)))
def start_server(self, server_twistd_cmd):
"""
(Re-)Launch the Evennia server.
@ -122,6 +169,17 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
"""
self.factory.disconnect_callbacks[self] = (callback, args, kwargs)
def wait_for_server_connect(self, callback, *args, **kwargs):
"""
Add a callback for when the Server is sure to have connected.
Args:
callback (callable): Will be called with *args, **kwargs
once the Server handshake with Portal is complete.
"""
self.factory.server_connect_callbacks.append((callback, args, kwargs))
def stop_server(self, mode='shutdown'):
"""
Shut down server in one or more modes.
@ -139,6 +197,17 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
# sending amp data
def send_Status2Launcher(self):
"""
Send a status stanza to the launcher.
"""
if self.factory.launcher_connection:
self.factory.launcher_connection.callRemote(
amp.MsgStatus,
status=amp.dumps(self.get_status())).addErrback(
self.errback, amp.MsgStatus.key)
def send_MsgPortal2Server(self, session, **kwargs):
"""
Access method called by the Portal and executed on the Portal.
@ -151,7 +220,7 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
deferred (Deferred): Asynchronous return.
"""
return self.data_out(amp.MsgPortal2Server, session.sessid, **kwargs)
return self.data_to_server(amp.MsgPortal2Server, session.sessid, **kwargs)
def send_AdminPortal2Server(self, session, operation="", **kwargs):
"""
@ -166,7 +235,8 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
data (str or dict, optional): Data used in the administrative operation.
"""
return self.data_out(amp.AdminPortal2Server, session.sessid, operation=operation, **kwargs)
return self.data_to_server(amp.AdminPortal2Server, session.sessid,
operation=operation, **kwargs)
# receive amp data
@ -183,16 +253,7 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
(portal_running, server_running, portal_pid, server_pid).
"""
# check if the server is connected
server_connected = (self.factory.server_connection and
self.factory.server_connection.transport.connected)
server_pid = self.factory.portal.server_process_id
portal_pid = os.getpid()
if server_connected:
return {"status": amp.dumps((True, True, portal_pid, server_pid))}
else:
return {"status": amp.dumps((True, False, portal_pid, server_pid))}
return {"status": amp.dumps(self.get_status())}
@amp.MsgLauncher2Portal.responder
@amp.catch_traceback
@ -213,58 +274,55 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
launcher. It can obviously only accessed when the Portal is already up and running.
"""
def _retval(success, txt):
return {"result": amp.dumps((success, txt))}
self.factory.launcher_connection = self
server_connected = (self.factory.server_connection and
self.factory.server_connection.transport.connected)
server_pid = self.factory.portal.server_process_id
_, server_connected, _, _ = self.get_status()
logger.log_msg("AMP SERVER operation == %s received" % (ord(operation)))
logger.log_msg("AMP SERVER arguments: %s" % (amp.loads(arguments)))
if operation == amp.SSTART: # portal start #15
# first, check if server is already running
if server_connected:
return _retval(False,
"Server already running at PID={spid}".format(spid=server_pid))
else:
spid = self.start_server(amp.loads(arguments))
return _retval(True, "Server started with PID {spid}.".format(spid=spid))
if not server_connected:
self.wait_for_server_connect(self.send_Status2Launcher)
self.start_server(amp.loads(arguments))
elif operation == amp.SRELOAD: # reload server #14
if server_connected:
# don't restart until the server connection goes down
# We let the launcher restart us once they get the signal
self.factory.server_connection.wait_for_disconnect(
self.send_Status2Launcher)
self.stop_server(mode='reload')
else:
spid = self.start_server(amp.loads(arguments))
return _retval(True, "Server started with PID {spid}.".format(spid=spid))
self.wait_for_server_connect(self.send_Status2Launcher)
self.start_server(amp.loads(arguments))
elif operation == amp.SRESET: # reload server #19
if server_connected:
self.factory.server_connection.wait_for_disconnect(
self.send_Status2Launcher)
self.stop_server(mode='reset')
return _retval(True, "Server restarted with PID {spid}.".format(spid=spid))
else:
spid = self.start_server(amp.loads(arguments))
return _retval(True, "Server started with PID {spid}.".format(spid=spid))
self.wait_for_server_connect(self.send_Status2Launcher)
self.start_server(amp.loads(arguments))
elif operation == amp.SSHUTD: # server-only shutdown #17
if server_connected:
self.factory.server_connection.wait_for_disconnect(
self.send_Status2Launcher)
self.stop_server(mode='shutdown')
return _retval(True, "Server stopped.")
else:
return _retval(False, "Server not running")
elif operation == amp.PSHUTD: # portal + server shutdown #16
if server_connected:
self.stop_server(mode='shutdown')
return _retval(True, "Server stopped.")
self.factory.portal.shutdown(restart=False)
self.factory.server_connection.wait_for_disconnect(
self.factory.portal.shutdown, restart=False)
else:
self.factory.portal.shutdown(restart=False)
else:
raise Exception("operation %(op)s not recognized." % {'op': operation})
# fallback
return {"result": ""}
return {}
@amp.MsgServer2Portal.responder
@amp.catch_traceback
@ -295,13 +353,12 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
packed_data (str): Data received, a pickled tuple (sessid, kwargs).
"""
self.factory.server_connection = self
sessid, kwargs = self.data_in(packed_data)
operation = kwargs.pop("operation")
portal_sessionhandler = self.factory.portal.sessions
# store this transport since we know it comes from the Server
self.factory.server_connection = self
if operation == amp.SLOGIN: # server_session_login
# a session has authenticated; sync it.
session = portal_sessionhandler.get(sessid)
@ -344,11 +401,23 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol):
sessiondata=sessdata)
self.factory.portal.sessions.at_server_connection()
print("Portal PSYNC: %s" % self.factory.server_connection)
if self.factory.server_connection:
# this is an indication the server has successfully connected, so
# we trigger any callbacks (usually to tell the launcher server is up)
for callback, args, kwargs in self.factory.server_connect_callbacks:
try:
callback(*args, **kwargs)
except Exception:
logger.log_trace()
self.factory.server_connect_callbacks = []
elif operation == amp.SSYNC: # server_session_sync
# server wants to save session data to the portal,
# maybe because it's about to shut down.
portal_sessionhandler.server_session_sync(kwargs.get("sessiondata"),
kwargs.get("clean", True))
# set a flag in case we are about to shut down soon
self.factory.server_restart_mode = True