diff --git a/evennia/server/amp_client.py b/evennia/server/amp_client.py index b490d8393b..70ec00a3a7 100644 --- a/evennia/server/amp_client.py +++ b/evennia/server/amp_client.py @@ -106,6 +106,26 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol): # back with the Server side self.send_AdminServer2Portal(amp.DUMMYSESSION, operation=amp.PSYNC) + def data_to_portal(self, command, sessid, **kwargs): + """ + Send data across the wire to the Portal + + Args: + command (AMP Command): A protocol send command. + sessid (int): A unique Session id. + kwargs (any): Any data to pickle into the command. + + Returns: + deferred (deferred or None): A deferred with an errback. + + Notes: + Data will be sent across the wire pickled as a tuple + (sessid, kwargs). + + """ + return self.callRemote(command, packed_data=amp.dumps((sessid, kwargs))).addErrback( + self.errback, command.key) + def send_MsgServer2Portal(self, session, **kwargs): """ Access method - executed on the Server for sending data @@ -116,7 +136,7 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol): kwargs (any, optiona): Extra data. """ - return self.data_out(amp.MsgServer2Portal, session.sessid, **kwargs) + return self.data_to_portal(amp.MsgServer2Portal, session.sessid, **kwargs) def send_AdminServer2Portal(self, session, operation="", **kwargs): """ @@ -131,7 +151,8 @@ class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol): kwargs (dict, optional): Data going into the adminstrative. """ - return self.data_out(amp.AdminServer2Portal, session.sessid, operation=operation, **kwargs) + return self.data_to_portal(amp.AdminServer2Portal, session.sessid, + operation=operation, **kwargs) # receiving AMP data diff --git a/evennia/server/evennia_launcher.py b/evennia/server/evennia_launcher.py index c2d29ad961..b90c43b1f0 100644 --- a/evennia/server/evennia_launcher.py +++ b/evennia/server/evennia_launcher.py @@ -19,7 +19,7 @@ import shutil import importlib from distutils.version import LooseVersion from argparse import ArgumentParser -from subprocess import Popen, check_output, call, CalledProcessError, STDOUT, PIPE +from subprocess import Popen, check_output, call, CalledProcessError, STDOUT try: import cPickle as pickle @@ -57,9 +57,6 @@ CURRENT_DIR = os.getcwd() GAMEDIR = CURRENT_DIR # Operational setup -AMP_PORT = None -AMP_HOST = None -AMP_INTERFACE = None SERVER_LOGFILE = None PORTAL_LOGFILE = None @@ -81,6 +78,9 @@ ENFORCED_SETTING = False # communication constants +AMP_PORT = None +AMP_HOST = None +AMP_INTERFACE = None AMP_CONNECTION = None SRELOAD = chr(14) # server reloading (have portal start a new server) @@ -461,7 +461,35 @@ class MsgLauncher2Portal(amp.Command): arguments = [('operation', amp.String()), ('arguments', amp.String())] errors = {Exception: 'EXCEPTION'} - response = [('result', amp.String())] + response = [] + + +class AMPLauncherProtocol(amp.AMP): + """ + Defines callbacks to the launcher + + """ + def __init__(self): + self.on_status = [] + + def wait_for_status(self, callback): + """ + Register a waiter for a status return. + + """ + self.on_status.append(callback) + + @MsgStatus.responder + def receive_status_from_portal(self, status): + """ + Get a status signal from portal - fire callbacks + + """ + status = pickle.loads(status) + for callback in self.on_status: + callback(status) + self.on_status = [] + return {"status": ""} def send_instruction(operation, arguments, callback=None, errback=None): @@ -475,39 +503,54 @@ def send_instruction(operation, arguments, callback=None, errback=None): print(ERROR_AMP_UNCONFIGURED) sys.exit() + def _timeout(*args): + print("Client timed out.") + reactor.stop() + + def _callback(result): + if callback: + callback(result) + # prot.transport.loseConnection() + + def _errback(fail): + if errback: + errback(fail) + # prot.transport.loseConnection() + def _on_connect(prot): """ This fires with the protocol when connection is established. We - immediately send off the instruction then shut down. + immediately send off the instruction """ - def _callback(result): - if callback: - callback(result) - prot.transport.loseConnection() - - def _errback(fail): - if errback: - errback(fail) - prot.transport.loseConnection() - - if operation == PSTATUS: - prot.callRemote(MsgStatus, status="").addCallbacks(_callback, _errback) - else: - prot.callRemote( - MsgLauncher2Portal, - operation=operation, - arguments=pickle.dumps(arguments, pickle.HIGHEST_PROTOCOL)).addCallbacks( - _callback, _errback) + global AMP_CONNECTION + AMP_CONNECTION = prot + _send() def _on_connect_fail(fail): "This is called if portal is not reachable." errback(fail) - point = endpoints.TCP4ClientEndpoint(reactor, AMP_HOST, AMP_PORT) - deferred = endpoints.connectProtocol(point, amp.AMP()) - deferred.addCallbacks(_on_connect, _on_connect_fail) - return deferred + def _send(): + if operation == PSTATUS: + return AMP_CONNECTION.callRemote(MsgStatus, status="").addCallbacks(_callback, _errback) + else: + return AMP_CONNECTION.callRemote( + MsgLauncher2Portal, + operation=operation, + arguments=pickle.dumps(arguments, pickle.HIGHEST_PROTOCOL)).addCallbacks( + _callback, _errback) + + if AMP_CONNECTION: + # already connected - send right away + _send() + else: + # we must connect first, send once connected + point = endpoints.TCP4ClientEndpoint(reactor, AMP_HOST, AMP_PORT) + deferred = endpoints.connectProtocol(point, AMPLauncherProtocol()) + deferred.addCallbacks(_on_connect, _on_connect_fail) + if not reactor.running: + reactor.run() def _parse_status(response): @@ -541,7 +584,6 @@ def _get_twistd_cmdline(pprofiler, sprofiler): "--profiler=cprofiler", "--profile={}".format(SPROFILER_LOGFILE)]) - return portal_cmd, server_cmd @@ -565,7 +607,16 @@ def query_status(repeat=False): reactor.stop() send_instruction(PSTATUS, None, _callback, _errback) - reactor.run() + + +def wait_for_status_reply(callback): + """ + Wait for an explicit STATUS signal to be sent back from Evennia. + """ + if AMP_CONNECTION: + AMP_CONNECTION.wait_for_status(callback) + else: + print("No Evennia connection established.") def wait_for_status(portal_running=True, server_running=True, callback=None, errback=None, @@ -587,7 +638,7 @@ def wait_for_status(portal_running=True, server_running=True, callback=None, err def _callback(response): prun, srun, _, _ = _parse_status(response) if ((portal_running is None or prun == portal_running) and - (server_running is None or srun == server_running)): + (server_running is None or srun == server_running)): # the correct state was achieved if callback: callback(prun, srun) @@ -653,7 +704,8 @@ def start_evennia(pprofiler=False, sprofiler=False): reactor.stop() def _portal_started(*args): - send_instruction(SSTART, server_cmd, _server_started) + wait_for_status_reply(_server_started) + send_instruction(SSTART, server_cmd) def _portal_running(response): prun, srun, ppid, spid = _parse_status(response) @@ -675,12 +727,14 @@ def start_evennia(pprofiler=False, sprofiler=False): wait_for_status(True, None, _portal_started) send_instruction(PSTATUS, None, _portal_running, _portal_not_running) - reactor.run() def reload_evennia(sprofiler=False, reset=False): """ - This will instruct the Portal to reboot the Server component. + This will instruct the Portal to reboot the Server component. We + do this manually by telling the server to shutdown (in reload mode) + and wait for the portal to report back, at which point we start the + server again. This way we control the process exactly. """ _, server_cmd = _get_twistd_cmdline(False, sprofiler) @@ -689,23 +743,24 @@ def reload_evennia(sprofiler=False, reset=False): print("... Server re-started.") reactor.stop() - def _server_reloaded(*args): - print("... Server {}.".format("reset" if reset else "reloaded")) + def _server_reloaded(status): + print("{} ... Server {}.".format(status, "reset" if reset else "reloaded")) reactor.stop() - def _server_not_running(*args): + def _server_stopped(status): + wait_for_status_reply(_server_reloaded) send_instruction(SSTART, server_cmd) - wait_for_status(True, True, _server_reloaded) def _portal_running(response): _, srun, _, _ = _parse_status(response) if srun: print("Server {}...".format("resetting" if reset else "reloading")) - send_instruction(SRESET if reset else SRELOAD, server_cmd) - wait_for_status(True, False, _server_not_running) + wait_for_status_reply(_server_stopped) + send_instruction(SRESET if reset else SRELOAD, {}) else: print("Server down. Re-starting ...") - send_instruction(SSTART, server_cmd, _server_restarted) + wait_for_status_reply(_server_restarted) + send_instruction(SSTART, server_cmd) def _portal_not_running(fail): print("Evennia not running. Starting from scratch ...") @@ -713,7 +768,6 @@ def reload_evennia(sprofiler=False, reset=False): # get portal status send_instruction(PSTATUS, None, _portal_running, _portal_not_running) - reactor.run() def stop_evennia(): @@ -735,18 +789,17 @@ def stop_evennia(): if srun: print("Server stopping ...") send_instruction(SSHUTD, {}) - wait_for_status(True, False, _server_stopped) + wait_for_status_reply(_server_stopped) else: print("Server already stopped.\nStopping Portal ...") send_instruction(PSHUTD, {}) - wait_for_status(False, False, _portal_stopped) + wait_for_status(False, None, _portal_stopped) def _portal_not_running(fail): print("Evennia is not running.") reactor.stop() send_instruction(PSTATUS, None, _portal_running, _portal_not_running) - reactor.run() def stop_server_only(): @@ -762,8 +815,8 @@ def stop_server_only(): _, srun, _, _ = _parse_status(response) if srun: print("Server stopping ...") + wait_for_status_reply(_server_stopped) send_instruction(SSHUTD, {}) - wait_for_status(True, False, _server_stopped) else: print("Server is not running.") @@ -771,7 +824,6 @@ def stop_server_only(): print("Evennia is not running.") send_instruction(PSTATUS, None, _portal_running, _portal_not_running) - reactor.run() def evennia_version(): diff --git a/evennia/server/portal/amp.py b/evennia/server/portal/amp.py index 6da13905c2..2c1d28bc1f 100644 --- a/evennia/server/portal/amp.py +++ b/evennia/server/portal/amp.py @@ -155,7 +155,7 @@ class MsgLauncher2Portal(amp.Command): arguments = [('operation', amp.String()), ('arguments', amp.String())] errors = {Exception: 'EXCEPTION'} - response = [('result', amp.String())] + response = [] class MsgPortal2Server(amp.Command): @@ -335,9 +335,9 @@ class AMPMultiConnectionProtocol(amp.AMP): """ return loads(packed_data) - def data_out(self, command, sessid, **kwargs): + def broadcast(self, command, sessid, **kwargs): """ - Send data across the wire. Always use this to send. + Send data across the wire to all connections. Args: command (AMP Command): A protocol send command. @@ -353,9 +353,9 @@ class AMPMultiConnectionProtocol(amp.AMP): """ deferreds = [] for protcl in self.factory.broadcasts: - deferreds.append(protcl.callRemote(command, - packed_data=dumps((sessid, kwargs))).addErrback( - self.errback, command.key)) + deferreds.append(protcl.callRemote(command, **kwargs).addErrback( + self.errback, command.key)) + return DeferredList(deferreds) # generic function send/recvs diff --git a/evennia/server/portal/amp_server.py b/evennia/server/portal/amp_server.py index 024454fc7a..8e49bca263 100644 --- a/evennia/server/portal/amp_server.py +++ b/evennia/server/portal/amp_server.py @@ -49,7 +49,9 @@ class AMPServerFactory(protocol.ServerFactory): self.protocol = AMPServerProtocol self.broadcasts = [] self.server_connection = None + self.launcher_connection = None self.disconnect_callbacks = {} + self.server_connect_callbacks = [] def buildProtocol(self, addr): """ @@ -72,12 +74,18 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): Protocol subclass for the AMP-server run by the Portal. """ - def connectionLost(self, reason): """ Set up a simple callback mechanism to let the amp-server wait for a connection to close. """ + # wipe broadcast and data memory + super(AMPServerProtocol, self).connectionLost(reason) + if self.factory.server_connection == self: + self.factory.server_connection = None + if self.factory.launcher_connection == self: + self.factory.launcher_connection = None + callback, args, kwargs = self.factory.disconnect_callbacks.pop(self, (None, None, None)) if callback: try: @@ -85,6 +93,45 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): except Exception: logger.log_trace() + def get_status(self): + """ + Return status for the Evennia infrastructure. + + Returns: + status (tuple): The portal/server status and pids + (portal_live, server_live, portal_PID, server_PID). + + """ + server_connected = bool(self.factory.server_connection and + self.factory.server_connection.transport.connected) + server_pid = self.factory.portal.server_process_id + portal_pid = os.getpid() + return (True, server_connected, portal_pid, server_pid) + + def data_to_server(self, command, sessid, **kwargs): + """ + Send data across the wire to the Server. + + Args: + command (AMP Command): A protocol send command. + sessid (int): A unique Session id. + + Returns: + deferred (deferred or None): A deferred with an errback. + + Notes: + Data will be sent across the wire pickled as a tuple + (sessid, kwargs). + + """ + if self.factory.server_connection: + return self.factory.server_connection.callRemote( + command, packed_data=amp.dumps((sessid, kwargs))).addErrback( + self.errback, command.key) + else: + # if no server connection is available, broadcast + return self.broadcast(command, sessid, packed_data=amp.dumps((sessid, kwargs))) + def start_server(self, server_twistd_cmd): """ (Re-)Launch the Evennia server. @@ -122,6 +169,17 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): """ self.factory.disconnect_callbacks[self] = (callback, args, kwargs) + def wait_for_server_connect(self, callback, *args, **kwargs): + """ + Add a callback for when the Server is sure to have connected. + + Args: + callback (callable): Will be called with *args, **kwargs + once the Server handshake with Portal is complete. + + """ + self.factory.server_connect_callbacks.append((callback, args, kwargs)) + def stop_server(self, mode='shutdown'): """ Shut down server in one or more modes. @@ -139,6 +197,17 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): # sending amp data + def send_Status2Launcher(self): + """ + Send a status stanza to the launcher. + + """ + if self.factory.launcher_connection: + self.factory.launcher_connection.callRemote( + amp.MsgStatus, + status=amp.dumps(self.get_status())).addErrback( + self.errback, amp.MsgStatus.key) + def send_MsgPortal2Server(self, session, **kwargs): """ Access method called by the Portal and executed on the Portal. @@ -151,7 +220,7 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): deferred (Deferred): Asynchronous return. """ - return self.data_out(amp.MsgPortal2Server, session.sessid, **kwargs) + return self.data_to_server(amp.MsgPortal2Server, session.sessid, **kwargs) def send_AdminPortal2Server(self, session, operation="", **kwargs): """ @@ -166,7 +235,8 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): data (str or dict, optional): Data used in the administrative operation. """ - return self.data_out(amp.AdminPortal2Server, session.sessid, operation=operation, **kwargs) + return self.data_to_server(amp.AdminPortal2Server, session.sessid, + operation=operation, **kwargs) # receive amp data @@ -183,16 +253,7 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): (portal_running, server_running, portal_pid, server_pid). """ - # check if the server is connected - server_connected = (self.factory.server_connection and - self.factory.server_connection.transport.connected) - server_pid = self.factory.portal.server_process_id - portal_pid = os.getpid() - - if server_connected: - return {"status": amp.dumps((True, True, portal_pid, server_pid))} - else: - return {"status": amp.dumps((True, False, portal_pid, server_pid))} + return {"status": amp.dumps(self.get_status())} @amp.MsgLauncher2Portal.responder @amp.catch_traceback @@ -213,58 +274,55 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): launcher. It can obviously only accessed when the Portal is already up and running. """ - def _retval(success, txt): - return {"result": amp.dumps((success, txt))} + self.factory.launcher_connection = self - server_connected = (self.factory.server_connection and - self.factory.server_connection.transport.connected) - server_pid = self.factory.portal.server_process_id + _, server_connected, _, _ = self.get_status() logger.log_msg("AMP SERVER operation == %s received" % (ord(operation))) logger.log_msg("AMP SERVER arguments: %s" % (amp.loads(arguments))) if operation == amp.SSTART: # portal start #15 # first, check if server is already running - if server_connected: - return _retval(False, - "Server already running at PID={spid}".format(spid=server_pid)) - else: - spid = self.start_server(amp.loads(arguments)) - return _retval(True, "Server started with PID {spid}.".format(spid=spid)) + if not server_connected: + self.wait_for_server_connect(self.send_Status2Launcher) + self.start_server(amp.loads(arguments)) elif operation == amp.SRELOAD: # reload server #14 if server_connected: - # don't restart until the server connection goes down + # We let the launcher restart us once they get the signal + self.factory.server_connection.wait_for_disconnect( + self.send_Status2Launcher) self.stop_server(mode='reload') else: - spid = self.start_server(amp.loads(arguments)) - return _retval(True, "Server started with PID {spid}.".format(spid=spid)) + self.wait_for_server_connect(self.send_Status2Launcher) + self.start_server(amp.loads(arguments)) elif operation == amp.SRESET: # reload server #19 if server_connected: + self.factory.server_connection.wait_for_disconnect( + self.send_Status2Launcher) self.stop_server(mode='reset') - return _retval(True, "Server restarted with PID {spid}.".format(spid=spid)) else: - spid = self.start_server(amp.loads(arguments)) - return _retval(True, "Server started with PID {spid}.".format(spid=spid)) + self.wait_for_server_connect(self.send_Status2Launcher) + self.start_server(amp.loads(arguments)) elif operation == amp.SSHUTD: # server-only shutdown #17 if server_connected: + self.factory.server_connection.wait_for_disconnect( + self.send_Status2Launcher) self.stop_server(mode='shutdown') - return _retval(True, "Server stopped.") - else: - return _retval(False, "Server not running") elif operation == amp.PSHUTD: # portal + server shutdown #16 if server_connected: - self.stop_server(mode='shutdown') - return _retval(True, "Server stopped.") - self.factory.portal.shutdown(restart=False) + self.factory.server_connection.wait_for_disconnect( + self.factory.portal.shutdown, restart=False) + else: + self.factory.portal.shutdown(restart=False) else: raise Exception("operation %(op)s not recognized." % {'op': operation}) - # fallback - return {"result": ""} + + return {} @amp.MsgServer2Portal.responder @amp.catch_traceback @@ -295,13 +353,12 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): packed_data (str): Data received, a pickled tuple (sessid, kwargs). """ + self.factory.server_connection = self + sessid, kwargs = self.data_in(packed_data) operation = kwargs.pop("operation") portal_sessionhandler = self.factory.portal.sessions - # store this transport since we know it comes from the Server - self.factory.server_connection = self - if operation == amp.SLOGIN: # server_session_login # a session has authenticated; sync it. session = portal_sessionhandler.get(sessid) @@ -344,11 +401,23 @@ class AMPServerProtocol(amp.AMPMultiConnectionProtocol): sessiondata=sessdata) self.factory.portal.sessions.at_server_connection() + print("Portal PSYNC: %s" % self.factory.server_connection) + if self.factory.server_connection: + # this is an indication the server has successfully connected, so + # we trigger any callbacks (usually to tell the launcher server is up) + for callback, args, kwargs in self.factory.server_connect_callbacks: + try: + callback(*args, **kwargs) + except Exception: + logger.log_trace() + self.factory.server_connect_callbacks = [] + elif operation == amp.SSYNC: # server_session_sync # server wants to save session data to the portal, # maybe because it's about to shut down. portal_sessionhandler.server_session_sync(kwargs.get("sessiondata"), kwargs.get("clean", True)) + # set a flag in case we are about to shut down soon self.factory.server_restart_mode = True