From 7001090b78dc5a26cf8da85e173891c2f91a2a84 Mon Sep 17 00:00:00 2001 From: Griatch Date: Thu, 11 Jan 2018 23:43:58 +0100 Subject: [PATCH] Refactor amp into three modules, separating clients/server better --- evennia/server/amp.py | 701 ---------------------------- evennia/server/amp_client.py | 185 ++++++++ evennia/server/portal/amp.py | 353 ++++++++++++++ evennia/server/portal/amp_server.py | 168 +++++++ evennia/server/portal/portal.py | 6 +- evennia/server/server.py | 4 +- 6 files changed, 711 insertions(+), 706 deletions(-) delete mode 100644 evennia/server/amp.py create mode 100644 evennia/server/amp_client.py create mode 100644 evennia/server/portal/amp.py create mode 100644 evennia/server/portal/amp_server.py diff --git a/evennia/server/amp.py b/evennia/server/amp.py deleted file mode 100644 index 2774958a5a..0000000000 --- a/evennia/server/amp.py +++ /dev/null @@ -1,701 +0,0 @@ -""" -Contains the protocols, commands, and client factory needed for the Server -and Portal to communicate with each other, letting Portal work as a proxy. -Both sides use this same protocol. - -The separation works like this: - -Portal - (AMP client) handles protocols. It contains a list of connected - sessions in a dictionary for identifying the respective account - connected. If it loses the AMP connection it will automatically - try to reconnect. - -Server - (AMP server) Handles all mud operations. The server holds its own list - of sessions tied to account objects. This is synced against the portal - at startup and when a session connects/disconnects - -""" -from __future__ import print_function - -# imports needed on both server and portal side -import os -import time -from collections import defaultdict, namedtuple -from itertools import count -from cStringIO import StringIO -try: - import cPickle as pickle -except ImportError: - import pickle -from twisted.protocols import amp -from twisted.internet import protocol -from twisted.internet.defer import Deferred, DeferredList -from evennia.utils import logger -from evennia.utils.utils import to_str, variable_from_module -import zlib # Used in Compressed class - -DUMMYSESSION = namedtuple('DummySession', ['sessid'])(0) - -# communication bits -# (chr(9) and chr(10) are \t and \n, so skipping them) - -PCONN = chr(1) # portal session connect -PDISCONN = chr(2) # portal session disconnect -PSYNC = chr(3) # portal session sync -SLOGIN = chr(4) # server session login -SDISCONN = chr(5) # server session disconnect -SDISCONNALL = chr(6) # server session disconnect all -SSHUTD = chr(7) # server shutdown (shutdown portal too) -SSYNC = chr(8) # server session sync -SCONN = chr(11) # server creating new connection (for irc bots and etc) -PCONNSYNC = chr(12) # portal post-syncing a session -PDISCONNALL = chr(13) # portal session disconnect all -SRELOAD = chr(14) # server reloading (have portal start a new server) - -AMP_MAXLEN = amp.MAX_VALUE_LENGTH # max allowed data length in AMP protocol (cannot be changed) - -BATCH_RATE = 250 # max commands/sec before switching to batch-sending -BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds - -# buffers -_SENDBATCH = defaultdict(list) -_MSGBUFFER = defaultdict(list) - - -_HTTP_WARNING = """ -HTTP/1.1 200 OK -Content-Type: text/html - - -This is Evennia's interal AMP port. It handles communication -between Evennia's different processes.

This port should NOT be -publicly visible.

-""".strip() - - -def get_restart_mode(restart_file): - """ - Parse the server/portal restart status - - Args: - restart_file (str): Path to restart.dat file. - - Returns: - restart_mode (bool): If the file indicates the server is in - restart mode or not. - - """ - if os.path.exists(restart_file): - flag = open(restart_file, 'r').read() - return flag == "True" - return False - - -class AmpServerFactory(protocol.ServerFactory): - """ - This factory creates AMP Server instance. This - is meant to sit on the Evennia Portal service. - """ - noisy = False - - def __init__(self, portal): - """ - Initialize the factory. - - Args: - portal (Portal): The Evennia Portal service instance. - protocol (Protocol): The protocol the factory creates - instances of. - - """ - self.portal = portal - self.protocol = AMPProtocol - self.broadcasts = [] - - def buildProtocol(self, addr): - """ - Start a new connection, and store it on the service object. - - Args: - addr (str): Connection address. Not used. - - Returns: - protocol (Protocol): The created protocol. - - """ - self.portal.amp_protocol = AMPProtocol() - self.portal.amp_protocol.factory = self - return self.portal.amp_protocol - - -_AMP_TRANSPORTS = [] - - -class AmpClientFactory(protocol.ReconnectingClientFactory): - """ - This factory creates an instance of an AMP client. This - is intended to be the Evennia 'Server' service. - - """ - # Initial reconnect delay in seconds. - initialDelay = 1 - factor = 1.5 - maxDelay = 1 - noisy = False - - def __init__(self, server): - """ - Initializes the client factory. - - Args: - server (server): server instance. - - """ - self.server = server - self.protocol = AMPProtocol - self.maxDelay = 10 - # not really used unless connecting to multiple servers, but - # avoids having to check for its existence on the protocol - self.broadcasts = [] - - def startedConnecting(self, connector): - """ - Called when starting to try to connect to the MUD server. - - Args: - connector (Connector): Twisted Connector instance representing - this connection. - - """ - pass - - def buildProtocol(self, addr): - """ - Creates an AMPProtocol instance when connecting to the server. - - Args: - addr (str): Connection address. Not used. - - """ - self.resetDelay() - self.server.amp_protocol = AMPProtocol() - self.server.amp_protocol.factory = self - return self.server.amp_protocol - - def clientConnectionLost(self, connector, reason): - """ - Called when the AMP connection to the MUD server is lost. - - Args: - connector (Connector): Twisted Connector instance representing - this connection. - reason (str): Eventual text describing why connection was lost. - - """ - logger.log_info("Server lost connection to the Portal. Reconnecting ...") - protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason) - - def clientConnectionFailed(self, connector, reason): - """ - Called when an AMP connection attempt to the MUD server fails. - - Args: - connector (Connector): Twisted Connector instance representing - this connection. - reason (str): Eventual text describing why connection failed. - - """ - logger.log_info("Attempting to reconnect to Portal ...") - protocol.ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) - - -# AMP Communication Command types - -class Compressed(amp.String): - """ - This is a customn AMP command Argument that both handles too-long - sends as well as uses zlib for compression across the wire. The - batch-grouping of too-long sends is borrowed from the "mediumbox" - recipy at twisted-hacks's ~glyph/+junk/amphacks/mediumbox. - - """ - - def fromBox(self, name, strings, objects, proto): - """ - Converts from box representation to python. We - group very long data into batches. - """ - value = StringIO() - value.write(strings.get(name)) - for counter in count(2): - # count from 2 upwards - chunk = strings.get("%s.%d" % (name, counter)) - if chunk is None: - break - value.write(chunk) - objects[name] = value.getvalue() - - def toBox(self, name, strings, objects, proto): - """ - Convert from data to box. We handled too-long - batched data and put it together here. - """ - value = StringIO(objects[name]) - strings[name] = value.read(AMP_MAXLEN) - for counter in count(2): - chunk = value.read(AMP_MAXLEN) - if not chunk: - break - strings["%s.%d" % (name, counter)] = chunk - - def toString(self, inObject): - """ - Convert to send on the wire, with compression. - """ - return zlib.compress(inObject, 9) - - def fromString(self, inString): - """ - Convert (decompress) from the wire to Python. - """ - return zlib.decompress(inString) - - -class MsgPortal2Server(amp.Command): - """ - Message Portal -> Server - - """ - key = "MsgPortal2Server" - arguments = [('packed_data', Compressed())] - errors = {Exception: 'EXCEPTION'} - response = [] - - -class MsgServer2Portal(amp.Command): - """ - Message Server -> Portal - - """ - key = "MsgServer2Portal" - arguments = [('packed_data', Compressed())] - errors = {Exception: 'EXCEPTION'} - response = [] - - -class AdminPortal2Server(amp.Command): - """ - Administration Portal -> Server - - Sent when the portal needs to perform admin operations on the - server, such as when a new session connects or resyncs - - """ - key = "AdminPortal2Server" - arguments = [('packed_data', Compressed())] - errors = {Exception: 'EXCEPTION'} - response = [] - - -class AdminServer2Portal(amp.Command): - """ - Administration Server -> Portal - - Sent when the server needs to perform admin operations on the - portal. - - """ - key = "AdminServer2Portal" - arguments = [('packed_data', Compressed())] - errors = {Exception: 'EXCEPTION'} - response = [] - - -class FunctionCall(amp.Command): - """ - Bidirectional Server <-> Portal - - Sent when either process needs to call an arbitrary function in - the other. This does not use the batch-send functionality. - - """ - key = "FunctionCall" - arguments = [('module', amp.String()), - ('function', amp.String()), - ('args', amp.String()), - ('kwargs', amp.String())] - errors = {Exception: 'EXCEPTION'} - response = [('result', amp.String())] - - -# Helper functions for pickling. - -def dumps(data): - return to_str(pickle.dumps(to_str(data), pickle.HIGHEST_PROTOCOL)) - - -def loads(data): - return pickle.loads(to_str(data)) - - -def cmdline_input(data): - print("cmdline_input received:\n %s" % data) - - -# ------------------------------------------------------------- -# Core AMP protocol for communication Server <-> Portal -# ------------------------------------------------------------- - -class AMPProtocol(amp.AMP): - """ - This is the protocol that the MUD server and the proxy server - communicate to each other with. AMP is a bi-directional protocol, - so both the proxy and the MUD use the same commands and protocol. - - AMP specifies responder methods here and connect them to - amp.Command subclasses that specify the datatypes of the - input/output of these methods. - - This version of the protocol is a broadcast-version: it can - accept multiple connections and will broadcast to all of them. - IT will also correctly intercept non-AMP messages to avoid them - interrupting the connection. - - """ - - # helper methods - - def __init__(self, *args, **kwargs): - """ - Initialize protocol with some things that need to be in place - already before connecting both on portal and server. - - """ - self.send_batch_counter = 0 - self.send_reset_time = time.time() - self.send_mode = True - self.send_task = None - - def dataReceived(self, data): - """ - Handle non-AMP messages, such as HTTP communication. - """ - if data[0] != b'\0': - self.transport.write(_HTTP_WARNING) - self.transport.loseConnection() - else: - super(AMPProtocol, self).dataReceived(data) - - def connectionMade(self): - """ - This is called when an AMP connection is (re-)established - between server and portal. AMP calls it on both sides, so we - need to make sure to only trigger resync from the portal side. - - """ - self.factory.broadcasts.append(self) - if hasattr(self.factory, "portal"): - # only the portal has the 'portal' property, so we know we are - # on the portal side and can initialize the connection. - sessdata = self.factory.portal.sessions.get_all_sync_data() - self.send_AdminPortal2Server(DUMMYSESSION, - PSYNC, - sessiondata=sessdata) - self.factory.portal.sessions.at_server_connection() - - def connectionLost(self, reason): - """ - We swallow connection errors here. The reason is that during a - normal reload/shutdown there will almost always be cases where - either the portal or server shuts down before a message has - returned its (empty) return, triggering a connectionLost error - that is irrelevant. If a true connection error happens, the - portal will continuously try to reconnect, showing the problem - that way. - """ - self.factory.broadcasts.remove(self) - - # Error handling - - def errback(self, e, info): - """ - Error callback. - Handles errors to avoid dropping connections on server tracebacks. - - Args: - e (Failure): Deferred error instance. - info (str): Error string. - - """ - e.trap(Exception) - logger.log_err("AMP Error for %(info)s: %(e)s" % {'info': info, - 'e': e.getErrorMessage()}) - - def send_data(self, command, sessid, **kwargs): - """ - Send data across the wire. - - Args: - command (AMP Command): A protocol send command. - sessid (int): A unique Session id. - - Returns: - deferred (deferred or None): A deferred with an errback. - - Notes: - Data will be sent across the wire pickled as a tuple - (sessid, kwargs). - - """ - deferreds = [] - for prot in self.factory.broadcasts: - deferreds.append(prot.callRemote(command, - packed_data=dumps((sessid, kwargs)))) - return DeferredList(deferreds, fireOnOneErrback=1).addErrback(self.errback, command.key) - - # Message definition + helper methods to call/create each message type - - # Portal -> Server Msg - - @MsgPortal2Server.responder - def server_receive_msgportal2server(self, packed_data): - """ - Receives message arriving to server. This method is executed - on the Server. - - Args: - packed_data (str): Data to receive (a pickled tuple (sessid,kwargs)) - - """ - sessid, kwargs = loads(packed_data) - session = self.factory.server.sessions.get(sessid, None) - if session: - self.factory.server.sessions.data_in(session, **kwargs) - return {} - - def send_MsgPortal2Server(self, session, **kwargs): - """ - Access method called by the Portal and executed on the Portal. - - Args: - session (session): Session - kwargs (any, optional): Optional data. - - Returns: - deferred (Deferred): Asynchronous return. - - """ - return self.send_data(MsgPortal2Server, session.sessid, **kwargs) - - # Server -> Portal message - - @MsgServer2Portal.responder - def portal_receive_server2portal(self, packed_data): - """ - Receives message arriving to Portal from Server. - This method is executed on the Portal. - - Args: - packed_data (str): Pickled data (sessid, kwargs) coming over the wire. - """ - sessid, kwargs = loads(packed_data) - session = self.factory.portal.sessions.get(sessid, None) - if session: - self.factory.portal.sessions.data_out(session, **kwargs) - return {} - - def send_MsgServer2Portal(self, session, **kwargs): - """ - Access method - executed on the Server for sending data - to Portal. - - Args: - session (Session): Unique Session. - kwargs (any, optiona): Extra data. - - """ - return self.send_data(MsgServer2Portal, session.sessid, **kwargs) - - # Server administration from the Portal side - @AdminPortal2Server.responder - def server_receive_adminportal2server(self, packed_data): - """ - Receives admin data from the Portal (allows the portal to - perform admin operations on the server). This is executed on - the Server. - - Args: - packed_data (str): Incoming, pickled data. - - """ - sessid, kwargs = loads(packed_data) - operation = kwargs.pop("operation", "") - server_sessionhandler = self.factory.server.sessions - - if operation == PCONN: # portal_session_connect - # create a new session and sync it - server_sessionhandler.portal_connect(kwargs.get("sessiondata")) - - elif operation == PCONNSYNC: # portal_session_sync - server_sessionhandler.portal_session_sync(kwargs.get("sessiondata")) - - elif operation == PDISCONN: # portal_session_disconnect - # session closed from portal sid - session = server_sessionhandler.get(sessid) - if session: - server_sessionhandler.portal_disconnect(session) - - elif operation == PDISCONNALL: # portal_disconnect_all - # portal orders all sessions to close - server_sessionhandler.portal_disconnect_all() - - elif operation == PSYNC: # portal_session_sync - # force a resync of sessions when portal reconnects to - # server (e.g. after a server reboot) the data kwarg - # contains a dict {sessid: {arg1:val1,...}} - # representing the attributes to sync for each - # session. - server_sessionhandler.portal_sessions_sync(kwargs.get("sessiondata")) - else: - raise Exception("operation %(op)s not recognized." % {'op': operation}) - return {} - - def send_AdminPortal2Server(self, session, operation="", **kwargs): - """ - Send Admin instructions from the Portal to the Server. - Executed - on the Portal. - - Args: - session (Session): Session. - operation (char, optional): Identifier for the server operation, as defined by the - global variables in `evennia/server/amp.py`. - data (str or dict, optional): Data used in the administrative operation. - - """ - return self.send_data(AdminPortal2Server, session.sessid, operation=operation, **kwargs) - - # Portal administration from the Server side - - @AdminServer2Portal.responder - def portal_receive_adminserver2portal(self, packed_data): - """ - - Receives and handles admin operations sent to the Portal - This is executed on the Portal. - - Args: - packed_data (str): Data received, a pickled tuple (sessid, kwargs). - - """ - sessid, kwargs = loads(packed_data) - operation = kwargs.pop("operation") - portal_sessionhandler = self.factory.portal.sessions - - if operation == SLOGIN: # server_session_login - # a session has authenticated; sync it. - session = portal_sessionhandler.get(sessid) - if session: - portal_sessionhandler.server_logged_in(session, kwargs.get("sessiondata")) - - elif operation == SDISCONN: # server_session_disconnect - # the server is ordering to disconnect the session - session = portal_sessionhandler.get(sessid) - if session: - portal_sessionhandler.server_disconnect(session, reason=kwargs.get("reason")) - - elif operation == SDISCONNALL: # server_session_disconnect_all - # server orders all sessions to disconnect - portal_sessionhandler.server_disconnect_all(reason=kwargs.get("reason")) - - elif operation == SSHUTD: # server_shutdown - # the server orders the portal to shut down - self.factory.portal.shutdown(restart=False) - - elif operation == SRELOAD: # server reload - self.factory.portal.server_reload(**kwargs) - - elif operation == SSYNC: # server_session_sync - # server wants to save session data to the portal, - # maybe because it's about to shut down. - portal_sessionhandler.server_session_sync(kwargs.get("sessiondata"), - kwargs.get("clean", True)) - # set a flag in case we are about to shut down soon - self.factory.server_restart_mode = True - - elif operation == SCONN: # server_force_connection (for irc/etc) - portal_sessionhandler.server_connect(**kwargs) - - else: - raise Exception("operation %(op)s not recognized." % {'op': operation}) - return {} - - def send_AdminServer2Portal(self, session, operation="", **kwargs): - """ - Administrative access method called by the Server to send an - instruction to the Portal. - - Args: - session (Session): Session. - operation (char, optional): Identifier for the server - operation, as defined by the global variables in - `evennia/server/amp.py`. - data (str or dict, optional): Data going into the adminstrative. - - """ - return self.send_data(AdminServer2Portal, session.sessid, operation=operation, **kwargs) - - # Extra functions - - @FunctionCall.responder - def receive_functioncall(self, module, function, func_args, func_kwargs): - """ - This allows Portal- and Server-process to call an arbitrary - function in the other process. It is intended for use by - plugin modules. - - Args: - module (str or module): The module containing the - `function` to call. - function (str): The name of the function to call in - `module`. - func_args (str): Pickled args tuple for use in `function` call. - func_kwargs (str): Pickled kwargs dict for use in `function` call. - - """ - args = loads(func_args) - kwargs = loads(func_kwargs) - - # call the function (don't catch tracebacks here) - result = variable_from_module(module, function)(*args, **kwargs) - - if isinstance(result, Deferred): - # if result is a deferred, attach handler to properly - # wrap the return value - result.addCallback(lambda r: {"result": dumps(r)}) - return result - else: - return {'result': dumps(result)} - - def send_FunctionCall(self, modulepath, functionname, *args, **kwargs): - """ - Access method called by either process. This will call an arbitrary - function on the other process (On Portal if calling from Server and - vice versa). - - Inputs: - modulepath (str) - python path to module holding function to call - functionname (str) - name of function in given module - *args, **kwargs will be used as arguments/keyword args for the - remote function call - Returns: - A deferred that fires with the return value of the remote - function call - - """ - return self.callRemote(FunctionCall, - module=modulepath, - function=functionname, - args=dumps(args), - kwargs=dumps(kwargs)).addCallback( - lambda r: loads(r["result"])).addErrback(self.errback, "FunctionCall") diff --git a/evennia/server/amp_client.py b/evennia/server/amp_client.py new file mode 100644 index 0000000000..9552ad2122 --- /dev/null +++ b/evennia/server/amp_client.py @@ -0,0 +1,185 @@ +""" +The Evennia Server service acts as an AMP-client when talking to the +Portal. This module sets up the Client-side communication. + +""" + +from evennia.server.portal import amp +from twisted.internet import protocol +from evennia.utils import logger + + +class AMPClientFactory(protocol.ReconnectingClientFactory): + """ + This factory creates an instance of an AMP client connection. This handles communication from + the be the Evennia 'Server' service to the 'Portal'. The client will try to auto-reconnect on a + connection error. + + """ + # Initial reconnect delay in seconds. + initialDelay = 1 + factor = 1.5 + maxDelay = 1 + noisy = False + + def __init__(self, server): + """ + Initializes the client factory. + + Args: + server (server): server instance. + + """ + self.server = server + self.protocol = AMPServerClientProtocol + self.maxDelay = 10 + # not really used unless connecting to multiple servers, but + # avoids having to check for its existence on the protocol + self.broadcasts = [] + + def startedConnecting(self, connector): + """ + Called when starting to try to connect to the MUD server. + + Args: + connector (Connector): Twisted Connector instance representing + this connection. + + """ + pass + + def buildProtocol(self, addr): + """ + Creates an AMPProtocol instance when connecting to the server. + + Args: + addr (str): Connection address. Not used. + + """ + self.resetDelay() + self.server.amp_protocol = AMPServerClientProtocol() + self.server.amp_protocol.factory = self + return self.server.amp_protocol + + def clientConnectionLost(self, connector, reason): + """ + Called when the AMP connection to the MUD server is lost. + + Args: + connector (Connector): Twisted Connector instance representing + this connection. + reason (str): Eventual text describing why connection was lost. + + """ + logger.log_info("Server lost connection to the Portal. Reconnecting ...") + protocol.ReconnectingClientFactory.clientConnectionLost(self, connector, reason) + + def clientConnectionFailed(self, connector, reason): + """ + Called when an AMP connection attempt to the MUD server fails. + + Args: + connector (Connector): Twisted Connector instance representing + this connection. + reason (str): Eventual text describing why connection failed. + + """ + logger.log_info("Attempting to reconnect to Portal ...") + protocol.ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) + + +class AMPServerClientProtocol(amp.AMPMultiConnectionProtocol): + """ + This protocol describes the Server service (acting as an AMP-client)'s communication with the + Portal (which acts as the AMP-server) + + """ + # sending AMP data + + def send_MsgServer2Portal(self, session, **kwargs): + """ + Access method - executed on the Server for sending data + to Portal. + + Args: + session (Session): Unique Session. + kwargs (any, optiona): Extra data. + + """ + return self.data_out(amp.MsgServer2Portal, session.sessid, **kwargs) + + def send_AdminServer2Portal(self, session, operation="", **kwargs): + """ + Administrative access method called by the Server to send an + instruction to the Portal. + + Args: + session (Session): Session. + operation (char, optional): Identifier for the server + operation, as defined by the global variables in + `evennia/server/amp.py`. + data (str or dict, optional): Data going into the adminstrative. + + """ + return self.data_out(amp.AdminServer2Portal, session.sessid, operation=operation, **kwargs) + + # receiving AMP data + + @amp.MsgPortal2Server.responder + def server_receive_msgportal2server(self, packed_data): + """ + Receives message arriving to server. This method is executed + on the Server. + + Args: + packed_data (str): Data to receive (a pickled tuple (sessid,kwargs)) + + """ + sessid, kwargs = self.data_in(packed_data) + session = self.factory.server.sessions.get(sessid, None) + if session: + self.factory.server.sessions.data_in(session, **kwargs) + return {} + + @amp.AdminPortal2Server.responder + def server_receive_adminportal2server(self, packed_data): + """ + Receives admin data from the Portal (allows the portal to + perform admin operations on the server). This is executed on + the Server. + + Args: + packed_data (str): Incoming, pickled data. + + """ + sessid, kwargs = self.data_in(packed_data) + operation = kwargs.pop("operation", "") + server_sessionhandler = self.factory.server.sessions + + if operation == amp.PCONN: # portal_session_connect + # create a new session and sync it + server_sessionhandler.portal_connect(kwargs.get("sessiondata")) + + elif operation == amp.PCONNSYNC: # portal_session_sync + server_sessionhandler.portal_session_sync(kwargs.get("sessiondata")) + + elif operation == amp.PDISCONN: # portal_session_disconnect + # session closed from portal sid + session = server_sessionhandler.get(sessid) + if session: + server_sessionhandler.portal_disconnect(session) + + elif operation == amp.PDISCONNALL: # portal_disconnect_all + # portal orders all sessions to close + server_sessionhandler.portal_disconnect_all() + + elif operation == amp.PSYNC: # portal_session_sync + # force a resync of sessions when portal reconnects to + # server (e.g. after a server reboot) the data kwarg + # contains a dict {sessid: {arg1:val1,...}} + # representing the attributes to sync for each + # session. + server_sessionhandler.portal_sessions_sync(kwargs.get("sessiondata")) + else: + raise Exception("operation %(op)s not recognized." % {'op': operation}) + return {} diff --git a/evennia/server/portal/amp.py b/evennia/server/portal/amp.py new file mode 100644 index 0000000000..9968c22a9e --- /dev/null +++ b/evennia/server/portal/amp.py @@ -0,0 +1,353 @@ +""" +The AMP (Asynchronous Message Protocol)-communication commands and constants used by Evennia. + +This module acts as a central place for AMP-servers and -clients to get commands to use. + +""" +from __future__ import print_function +import time +from twisted.protocols import amp +from collections import defaultdict, namedtuple +from cStringIO import StringIO +from itertools import count +import zlib # Used in Compressed class +try: + import cPickle as pickle +except ImportError: + import pickle + +from twisted.internet.defer import DeferredList, Deferred +from evennia.utils import logger +from evennia.utils.utils import to_str, variable_from_module + + +# communication bits +# (chr(9) and chr(10) are \t and \n, so skipping them) + +PCONN = chr(1) # portal session connect +PDISCONN = chr(2) # portal session disconnect +PSYNC = chr(3) # portal session sync +SLOGIN = chr(4) # server session login +SDISCONN = chr(5) # server session disconnect +SDISCONNALL = chr(6) # server session disconnect all +SSHUTD = chr(7) # server shutdown (shutdown portal too) +SSYNC = chr(8) # server session sync +SCONN = chr(11) # server creating new connection (for irc bots and etc) +PCONNSYNC = chr(12) # portal post-syncing a session +PDISCONNALL = chr(13) # portal session disconnect all +SRELOAD = chr(14) # server reloading (have portal start a new server) + +AMP_MAXLEN = amp.MAX_VALUE_LENGTH # max allowed data length in AMP protocol (cannot be changed) +BATCH_RATE = 250 # max commands/sec before switching to batch-sending +BATCH_TIMEOUT = 0.5 # how often to poll to empty batch queue, in seconds + +# buffers +_SENDBATCH = defaultdict(list) +_MSGBUFFER = defaultdict(list) + +# resources + +DUMMYSESSION = namedtuple('DummySession', ['sessid'])(0) + + +_HTTP_WARNING = """ +HTTP/1.1 200 OK +Content-Type: text/html + + +This is Evennia's interal AMP port. It handles communication +between Evennia's different processes.

This port should NOT be +publicly visible.

+""".strip() + + +# Helper functions for pickling. + +def dumps(data): + return to_str(pickle.dumps(to_str(data), pickle.HIGHEST_PROTOCOL)) + + +def loads(data): + return pickle.loads(to_str(data)) + + +# AMP Communication Command types + +class Compressed(amp.String): + """ + This is a custom AMP command Argument that both handles too-long + sends as well as uses zlib for compression across the wire. The + batch-grouping of too-long sends is borrowed from the "mediumbox" + recipy at twisted-hacks's ~glyph/+junk/amphacks/mediumbox. + + """ + + def fromBox(self, name, strings, objects, proto): + """ + Converts from box representation to python. We + group very long data into batches. + """ + value = StringIO() + value.write(strings.get(name)) + for counter in count(2): + # count from 2 upwards + chunk = strings.get("%s.%d" % (name, counter)) + if chunk is None: + break + value.write(chunk) + objects[name] = value.getvalue() + + def toBox(self, name, strings, objects, proto): + """ + Convert from data to box. We handled too-long + batched data and put it together here. + """ + value = StringIO(objects[name]) + strings[name] = value.read(AMP_MAXLEN) + for counter in count(2): + chunk = value.read(AMP_MAXLEN) + if not chunk: + break + strings["%s.%d" % (name, counter)] = chunk + + def toString(self, inObject): + """ + Convert to send on the wire, with compression. + """ + return zlib.compress(inObject, 9) + + def fromString(self, inString): + """ + Convert (decompress) from the wire to Python. + """ + return zlib.decompress(inString) + + +class MsgPortal2Server(amp.Command): + """ + Message Portal -> Server + + """ + key = "MsgPortal2Server" + arguments = [('packed_data', Compressed())] + errors = {Exception: 'EXCEPTION'} + response = [] + + +class MsgServer2Portal(amp.Command): + """ + Message Server -> Portal + + """ + key = "MsgServer2Portal" + arguments = [('packed_data', Compressed())] + errors = {Exception: 'EXCEPTION'} + response = [] + + +class AdminPortal2Server(amp.Command): + """ + Administration Portal -> Server + + Sent when the portal needs to perform admin operations on the + server, such as when a new session connects or resyncs + + """ + key = "AdminPortal2Server" + arguments = [('packed_data', Compressed())] + errors = {Exception: 'EXCEPTION'} + response = [] + + +class AdminServer2Portal(amp.Command): + """ + Administration Server -> Portal + + Sent when the server needs to perform admin operations on the + portal. + + """ + key = "AdminServer2Portal" + arguments = [('packed_data', Compressed())] + errors = {Exception: 'EXCEPTION'} + response = [] + + +class FunctionCall(amp.Command): + """ + Bidirectional Server <-> Portal + + Sent when either process needs to call an arbitrary function in + the other. This does not use the batch-send functionality. + + """ + key = "FunctionCall" + arguments = [('module', amp.String()), + ('function', amp.String()), + ('args', amp.String()), + ('kwargs', amp.String())] + errors = {Exception: 'EXCEPTION'} + response = [('result', amp.String())] + + +# ------------------------------------------------------------- +# Core AMP protocol for communication Server <-> Portal +# ------------------------------------------------------------- + +class AMPMultiConnectionProtocol(amp.AMP): + """ + AMP protocol that safely handle multiple connections to the same + server without dropping old ones - new clients will receive + all server returns (broadcast). Will also correctly handle + erroneous HTTP requests on the port and return a HTTP error response. + + """ + + # helper methods + + def __init__(self, *args, **kwargs): + """ + Initialize protocol with some things that need to be in place + already before connecting both on portal and server. + + """ + self.send_batch_counter = 0 + self.send_reset_time = time.time() + self.send_mode = True + self.send_task = None + + def dataReceived(self, data): + """ + Handle non-AMP messages, such as HTTP communication. + """ + if data[0] != b'\0': + self.transport.write(_HTTP_WARNING) + self.transport.loseConnection() + else: + super(AMPMultiConnectionProtocol, self).dataReceived(data) + + def connectionMade(self): + """ + This is called when an AMP connection is (re-)established + between server and portal. AMP calls it on both sides, so we + need to make sure to only trigger resync from the portal side. + + """ + self.factory.broadcasts.append(self) + + def connectionLost(self, reason): + """ + We swallow connection errors here. The reason is that during a + normal reload/shutdown there will almost always be cases where + either the portal or server shuts down before a message has + returned its (empty) return, triggering a connectionLost error + that is irrelevant. If a true connection error happens, the + portal will continuously try to reconnect, showing the problem + that way. + """ + self.factory.broadcasts.remove(self) + + # Error handling + + def errback(self, e, info): + """ + Error callback. + Handles errors to avoid dropping connections on server tracebacks. + + Args: + e (Failure): Deferred error instance. + info (str): Error string. + + """ + e.trap(Exception) + logger.log_err("AMP Error for %(info)s: %(e)s" % {'info': info, + 'e': e.getErrorMessage()}) + + def data_in(self, packed_data): + """ + Process incoming packed data. + + Args: + packed_data (bytes): Zip-packed data. + Returns: + unpaced_data (any): Unpacked package + + """ + return loads(packed_data) + + def data_out(self, command, sessid, **kwargs): + """ + Send data across the wire. Always use this to send. + + Args: + command (AMP Command): A protocol send command. + sessid (int): A unique Session id. + + Returns: + deferred (deferred or None): A deferred with an errback. + + Notes: + Data will be sent across the wire pickled as a tuple + (sessid, kwargs). + + """ + deferreds = [] + for prot in self.factory.broadcasts: + deferreds.append(prot.callRemote(command, + packed_data=dumps((sessid, kwargs)))) + return DeferredList(deferreds, fireOnOneErrback=1).addErrback(self.errback, command.key) + + # generic function send/recvs + + def send_FunctionCall(self, modulepath, functionname, *args, **kwargs): + """ + Access method called by either process. This will call an arbitrary + function on the other process (On Portal if calling from Server and + vice versa). + + Inputs: + modulepath (str) - python path to module holding function to call + functionname (str) - name of function in given module + *args, **kwargs will be used as arguments/keyword args for the + remote function call + Returns: + A deferred that fires with the return value of the remote + function call + + """ + return self.callRemote(FunctionCall, + module=modulepath, + function=functionname, + args=dumps(args), + kwargs=dumps(kwargs)).addCallback( + lambda r: loads(r["result"])).addErrback(self.errback, "FunctionCall") + + @FunctionCall.responder + def receive_functioncall(self, module, function, func_args, func_kwargs): + """ + This allows Portal- and Server-process to call an arbitrary + function in the other process. It is intended for use by + plugin modules. + + Args: + module (str or module): The module containing the + `function` to call. + function (str): The name of the function to call in + `module`. + func_args (str): Pickled args tuple for use in `function` call. + func_kwargs (str): Pickled kwargs dict for use in `function` call. + + """ + args = loads(func_args) + kwargs = loads(func_kwargs) + + # call the function (don't catch tracebacks here) + result = variable_from_module(module, function)(*args, **kwargs) + + if isinstance(result, Deferred): + # if result is a deferred, attach handler to properly + # wrap the return value + result.addCallback(lambda r: {"result": dumps(r)}) + return result + else: + return {'result': dumps(result)} diff --git a/evennia/server/portal/amp_server.py b/evennia/server/portal/amp_server.py new file mode 100644 index 0000000000..3c5bf175d2 --- /dev/null +++ b/evennia/server/portal/amp_server.py @@ -0,0 +1,168 @@ +""" +The Evennia Portal service acts as an AMP-server, handling AMP +communication to the AMP clients connecting to it (by default +these are the Evennia Server and the evennia launcher). + +""" +from twisted.internet import protocol +from evennia.server.portal import amp + + +class AMPServerFactory(protocol.ServerFactory): + + """ + This factory creates AMP Server connection. This acts as the 'Portal'-side communication to the + 'Server' process. + + """ + noisy = False + + def __init__(self, portal): + """ + Initialize the factory. This is called as the Portal service starts. + + Args: + portal (Portal): The Evennia Portal service instance. + protocol (Protocol): The protocol the factory creates + instances of. + + """ + self.portal = portal + self.protocol = AMPServerProtocol + self.broadcasts = [] + + def buildProtocol(self, addr): + """ + Start a new connection, and store it on the service object. + + Args: + addr (str): Connection address. Not used. + + Returns: + protocol (Protocol): The created protocol. + + """ + self.portal.amp_protocol = AMPServerProtocol() + self.portal.amp_protocol.factory = self + return self.portal.amp_protocol + + +class AMPServerProtocol(amp.AMPMultiConnectionProtocol): + """ + Protocol subclass for the AMP-server run by the Portal. + + """ + def connectionMade(self): + """ + Called when a new connection is established. + + """ + super(AMPServerProtocol, self).connectionMade() + + sessdata = self.factory.portal.sessions.get_all_sync_data() + self.send_AdminPortal2Server(amp.DUMMYSESSION, + amp.PSYNC, + sessiondata=sessdata) + self.factory.portal.sessions.at_server_connection() + + # sending amp data + + def send_MsgPortal2Server(self, session, **kwargs): + """ + Access method called by the Portal and executed on the Portal. + + Args: + session (session): Session + kwargs (any, optional): Optional data. + + Returns: + deferred (Deferred): Asynchronous return. + + """ + return self.data_out(amp.MsgPortal2Server, session.sessid, **kwargs) + + def send_AdminPortal2Server(self, session, operation="", **kwargs): + """ + Send Admin instructions from the Portal to the Server. + Executed + on the Portal. + + Args: + session (Session): Session. + operation (char, optional): Identifier for the server operation, as defined by the + global variables in `evennia/server/amp.py`. + data (str or dict, optional): Data used in the administrative operation. + + """ + return self.data_out(amp.AdminPortal2Server, session.sessid, operation=operation, **kwargs) + + # receive amp data + + @amp.MsgServer2Portal.responder + def portal_receive_server2portal(self, packed_data): + """ + Receives message arriving to Portal from Server. + This method is executed on the Portal. + + Args: + packed_data (str): Pickled data (sessid, kwargs) coming over the wire. + + """ + sessid, kwargs = self.data_in(packed_data) + session = self.factory.portal.sessions.get(sessid, None) + if session: + self.factory.portal.sessions.data_out(session, **kwargs) + return {} + + @amp.AdminServer2Portal.responder + def portal_receive_adminserver2portal(self, packed_data): + """ + + Receives and handles admin operations sent to the Portal + This is executed on the Portal. + + Args: + packed_data (str): Data received, a pickled tuple (sessid, kwargs). + + """ + sessid, kwargs = self.data_in(packed_data) + operation = kwargs.pop("operation") + portal_sessionhandler = self.factory.portal.sessions + + if operation == amp.SLOGIN: # server_session_login + # a session has authenticated; sync it. + session = portal_sessionhandler.get(sessid) + if session: + portal_sessionhandler.server_logged_in(session, kwargs.get("sessiondata")) + + elif operation == amp.SDISCONN: # server_session_disconnect + # the server is ordering to disconnect the session + session = portal_sessionhandler.get(sessid) + if session: + portal_sessionhandler.server_disconnect(session, reason=kwargs.get("reason")) + + elif operation == amp.SDISCONNALL: # server_session_disconnect_all + # server orders all sessions to disconnect + portal_sessionhandler.server_disconnect_all(reason=kwargs.get("reason")) + + elif operation == amp.SSHUTD: # server_shutdown + # the server orders the portal to shut down + self.factory.portal.shutdown(restart=False) + + elif operation == amp.SRELOAD: # server reload + self.factory.portal.server_reload(**kwargs) + + elif operation == amp.SSYNC: # server_session_sync + # server wants to save session data to the portal, + # maybe because it's about to shut down. + portal_sessionhandler.server_session_sync(kwargs.get("sessiondata"), + kwargs.get("clean", True)) + # set a flag in case we are about to shut down soon + self.factory.server_restart_mode = True + + elif operation == amp.SCONN: # server_force_connection (for irc/etc) + portal_sessionhandler.server_connect(**kwargs) + + else: + raise Exception("operation %(op)s not recognized." % {'op': operation}) + return {} diff --git a/evennia/server/portal/portal.py b/evennia/server/portal/portal.py index eda7aeada4..293d1d6db5 100644 --- a/evennia/server/portal/portal.py +++ b/evennia/server/portal/portal.py @@ -187,13 +187,13 @@ if AMP_ENABLED: # the portal and the mud server. Only reason to ever deactivate # it would be during testing and debugging. - from evennia.server import amp + from evennia.server.portal import amp_server print(' amp (to Server): %s (internal)' % AMP_PORT) - factory = amp.AmpServerFactory(PORTAL) + factory = amp_server.AMPServerFactory(PORTAL) amp_service = internet.TCPServer(AMP_PORT, factory, interface=AMP_INTERFACE) - amp_service.setName("PortalAMPService") + amp_service.setName("PortalAMPServer") PORTAL.services.addService(amp_service) diff --git a/evennia/server/server.py b/evennia/server/server.py index 643b3161f7..be282527f6 100644 --- a/evennia/server/server.py +++ b/evennia/server/server.py @@ -548,9 +548,9 @@ if AMP_ENABLED: ifacestr = "-%s" % AMP_INTERFACE print(' amp (to Portal)%s: %s (internal)' % (ifacestr, AMP_PORT)) - from evennia.server import amp + from evennia.server import amp_client - factory = amp.AmpClientFactory(EVENNIA) + factory = amp_client.AMPClientFactory(EVENNIA) amp_service = internet.TCPClient(AMP_HOST, AMP_PORT, factory) amp_service.setName('ServerAMPClient') EVENNIA.services.addService(amp_service)