diff --git a/evennia/server/amp.py b/evennia/server/amp.py index 76f03aca4d..76b99101f0 100644 --- a/evennia/server/amp.py +++ b/evennia/server/amp.py @@ -53,12 +53,6 @@ _SENDBATCH = defaultdict(list) _MSGBUFFER = defaultdict(list) import zlib -#_ZLIB_FLUSH = zlib.Z_SYNC_FLUSH -#_ZLIB_COMP = zlib.compressobj(9) -#_ZLIB_DECOMP = zlib.decompressobj() -#_ZLIB_COMPRESS = lambda data: _ZLIB_COMP.compress(data) + _ZLIB_COMP.flush(_ZLIB_FLUSH) -#_ZLIB_DECOMPRESS = lambda data: _ZLIB_DECOMP.decompress(data) - def get_restart_mode(restart_file): """ @@ -201,15 +195,17 @@ class AmpClientFactory(protocol.ReconnectingClientFactory): class Compressed(amp.String): """ - This is an Argument that both handles too-long sends as well as - uses zlib for compression across the wire. Much of this is - borrowed from ~glyph/+junk/amphacks/mediumbox. + This is a customn AMP command Argument that both handles too-long + sends as well as uses zlib for compression across the wire. The + batch-grouping of too-long sends is borrowed from the "mediumbox" + recipy at twisted-hacks's ~glyph/+junk/amphacks/mediumbox. """ def fromBox(self, name, strings, objects, proto): """ - Converts from box representation to python. + Converts from box representation to python. We + group very long data into batches. """ value = StringIO() value.write(strings.get(name)) @@ -223,7 +219,8 @@ class Compressed(amp.String): def toBox(self, name, strings, objects, proto): """ - Convert from data to box. + Convert from data to box. We handled too-long + batched data and put it together here. """ value = StringIO(objects[name]) strings[name] = value.read(AMP_MAXLEN) @@ -235,13 +232,13 @@ class Compressed(amp.String): def toString(self, inObject): """ - Convert to send on the wire. + Convert to send on the wire, with compression. """ return zlib.compress(inObject, 9) def fromString(self, inString): """ - Convert from the wire to Python. + Convert (decompress) from the wire to Python. """ return zlib.decompress(inString) @@ -254,7 +251,7 @@ class MsgPortal2Server(amp.Command): key = "MsgPortal2Server" arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - response = [('timing', amp.String())] + response = [] class MsgServer2Portal(amp.Command): @@ -265,11 +262,10 @@ class MsgServer2Portal(amp.Command): key = "MsgServer2Portal" arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - #response = [] - response = [('timing', amp.String())] + response = [] -class ServerAdmin(amp.Command): +class AdminPortal2Server(amp.Command): """ Administration Portal -> Server @@ -277,14 +273,13 @@ class ServerAdmin(amp.Command): server, such as when a new session connects or resyncs """ - key = "ServerAdmin" + key = "AdminPortal2Server" arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - #response = [] - response = [('timing', amp.String())] + response = [] -class PortalAdmin(amp.Command): +class AdminServer2Portal(amp.Command): """ Administration Server -> Portal @@ -292,11 +287,10 @@ class PortalAdmin(amp.Command): portal. """ - key = "PortalAdmin" + key = "AdminServer2Portal" arguments = [('data', Compressed())] errors = [(Exception, 'EXCEPTION')] - #response = [()] - response = [('timing', amp.String())] + response = [] class FunctionCall(amp.Command): @@ -353,18 +347,19 @@ class AMPProtocol(amp.AMP): def connectionMade(self): """ - This is called when a connection is established between server - and portal. AMP calls it on both sides, so we need to make - sure to only trigger resync from the portal side. + This is called when an AMP connection is (re-)established + between server and portal. AMP calls it on both sides, so we + need to make sure to only trigger resync from the portal side. """ - # this makes for a factor x10 faster sends! + # this makes for a factor x10 faster sends across the wire self.transport.setTcpNoDelay(True) + if hasattr(self.factory, "portal"): # only the portal has the 'portal' property, so we know we are # on the portal side and can initialize the connection. sessdata = self.factory.portal.sessions.get_all_sync_data() - self.call_remote_ServerAdmin(0, + self.send_AdminPortal2Server(0, PSYNC, data=sessdata) self.factory.portal.sessions.at_server_connection() @@ -386,10 +381,8 @@ class AMPProtocol(amp.AMP): e.trap(Exception) print "AMP Error for %(info)s: %(e)s" % {'info': info, 'e': e.getErrorMessage()} - def callback(self, ret, info): - print "AMP return timing (%s): %f" % (info, time() - float(ret["timing"])) - def batch_send(self, command, sessid, **kwargs): + def send_data(self, command, sessid, **kwargs): """ This will batch data together to send fewer, large batches. @@ -397,107 +390,40 @@ class AMPProtocol(amp.AMP): command (AMP Command): A protocol send command. sessid (int): A unique Session id. - Kwargs: - force_direct (bool): Send direct, without batching data. - Returns: - deferreds (list or None): A list of deferreds firing with - as batch parts get sent (or fails). + deferred (deferred or None): A deferred with an errback. + + Notes: + Data will be sent across the wire pickled as a tuple + (sessid, kwargs). """ - - global _SENDBATCH - - if command: - # always put AMP command in cache - _SENDBATCH[command].append((sessid, kwargs)) - self.send_batch_counter += 1 - force_direct = kwargs.pop("force_direct", False) - now = time() - - if force_direct: - # check the current command rate to determine if we - # can return send mode or not. We add 1 to counter - # to avoid cases when it happens to be 0. - self.send_mode = (((self.send_batch_counter + 1) / - (now - self.send_reset_time)) <= (BATCH_RATE*BATCH_TIMEOUT)) - self.send_batch_counter = 0 - self.send_reset_time = now - if not (self.send_mode and self.send_task): - self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True) - else: - self.send_task = None - elif self.send_mode and self.send_batch_counter > BATCH_RATE: - # we have reached the batch count. How long this took - # defines if we should halt sending or not. - self.send_mode = now - self.send_reset_time >= 1.0 - #print "BATCH_RATE:", BATCH_RATE / (now - self.send_reset_time) - self.send_batch_counter = 0 - self.send_reset_time = now - if not (self.send_mode and self.send_task): - force_direct = True # make sure to empty cache - self.send_task = reactor.callLater(BATCH_TIMEOUT, self.batch_send, None, None, force_direct=True) - - if self.send_mode or force_direct: - sendsize = 0 - sendcount = 0 - for command, cmdlist in _SENDBATCH.items(): - batch = dumps(cmdlist) # batch is a list of (sessid,kwargs) tuples. - # We pack the data in a string-form pickle. - sendsize += sys.getsizeof(batch) - sendcount += 1 - del _SENDBATCH[command] - deferred = self.callRemote(command, - data=batch).addErrback(self.errback, command.key + " " + repr(cmdlist)) - print " sent %i batches: %fkB." % (sendcount, sendsize / 1000.0) - return deferred - - - def batch_recv(self, data): - """ - This will receive and unpack data sent as a batch. This both - handles too-long data as well as batch-sending very fast- - arriving commands. - - Args: - data (str): Data coming over the wire. - Returns: - data (str or list): The received data. - - """ - # most common case - return loads(data) + batch = dumps((sessid, kwargs)) + return self.callRemote(command, + data=batch).addErrback(self.errback, command.key) # Message definition + helper methods to call/create each message type # Portal -> Server Msg - def amp_msg_portal2server(self, data): + @MsgPortal2Server.responder + def server_receive_msgportal2server(self, data): """ - Relays message to server. This method is executed on the - Server. - - Since AMP has a limit of 65355 bytes per message, it's - possible the data comes in multiple chunks; if so (nparts>1) - we buffer the data and wait for the remaining parts to arrive - before continuing. + Receives message arriving to server. This method is executed + on the Server. Args: - data (str): Data to send (often a part of a batch) + data (str): Data to receive (a pickled tuple (sessid,kwargs)) """ - batch = self.batch_recv(data) - for (sessid, kwargs) in batch: - #print "msg portal -> server (server side):", sessid, msg, loads(ret["data"]) - from evennia.server.profiling.timetrace import timetrace - kwargs["msg"] = timetrace(kwargs["msg"], "AMP.amp_msg_portal2server") - self.factory.server.sessions.data_in(sessid, + sessid, kwargs = loads(data) + #print "msg portal -> server (server side):", sessid, msg, loads(ret["data"]) + self.factory.server.sessions.data_in(sessid, text=kwargs["msg"], data=kwargs["data"]) - return {"timing":"%f" % time()} - MsgPortal2Server.responder(amp_msg_portal2server) + return {} - def call_remote_MsgPortal2Server(self, sessid, msg, data=""): + def send_MsgPortal2Server(self, sessid, msg="", data=""): """ Access method called by the Portal and executed on the Portal. @@ -511,17 +437,17 @@ class AMPProtocol(amp.AMP): """ #print "msg portal->server (portal side):", sessid, msg, data - from evennia.server.profiling.timetrace import timetrace - msg = timetrace(msg, "AMP.call_remote_MsgPortal2Server") - return self.batch_send(MsgPortal2Server, sessid, - msg=msg if msg is not None else "", - data=data) + return self.send_data(MsgPortal2Server, sessid, + msg=msg, + data=data) # Server -> Portal message - def amp_msg_server2portal(self, data): + @MsgServer2Portal.responder + def portal_receive_server2portal(self, data): """ - Relays message to Portal. This method is executed on the Portal. + Receives message arriving to Portal from Server. + This method is executed on the Portal. Since AMP has a limit of 65355 bytes per message, it's possible the data comes in multiple chunks; if so (nparts>1) @@ -529,48 +455,20 @@ class AMPProtocol(amp.AMP): before continuing. Args: - data (str): Data to send (often a part of a batch) + data (str): Pickled data (sessid, kwargs) coming over the wire. """ - batch = self.batch_recv(data) - for (sessid, kwargs) in batch: - #print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"]) - from evennia.server.profiling.timetrace import timetrace - kwargs["msg"] = timetrace(kwargs["msg"], "AMP.amp_msg_server2portal") - self.factory.portal.sessions.data_out(sessid, - text=kwargs["msg"], - data=kwargs["data"]) - #return {} - return {"timing":"%f" % time()} - MsgServer2Portal.responder(amp_msg_server2portal) + sessid, kwargs = loads(data) + #print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"]) + self.factory.portal.sessions.data_out(sessid, + text=kwargs["msg"], + data=kwargs["data"]) + return {} - def amp_batch_server2portal(self, data): + + def send_MsgServer2Portal(self, sessid, msg="", data=""): """ - Relays batch data to Portal. This method is executed on the Portal. - - Since AMP has a limit of 65355 bytes per message, it's - possible the data comes in multiple chunks; if so (nparts>1) - we buffer the data and wait for the remaining parts to arrive - before continuing. - - Args: - data (str): Data to send (often a part of a batch) - - """ - batch = self.batch_recv(data) - if batch is not None: - for (sessid, kwargs) in batch: - from evennia.server.profiling.timetrace import timetrace - kwargs["msg"] = timetrace(kwargs["msg"], "AMP.amp_batch_server2portal") - self.factory.portal.sessions.data_out(sessid, - text=kwargs["msg"], - **kwargs["data"]) - #return {} - return {"timing":"%f" % time()} - MsgServer2Portal.responder(amp_batch_server2portal) - - def call_remote_MsgServer2Portal(self, sessid, msg="", data=""): - """ - Send Message - access method called by the Server and executed on the Server. + Access method - executed on the Server for sending data + to Portal. Args: sessid (int): Unique Session id. @@ -578,63 +476,56 @@ class AMPProtocol(amp.AMP): data (str, optional): Extra data. """ - from evennia.server.profiling.timetrace import timetrace - msg = timetrace(msg, "AMP.call_remote_MsgServer2Portal") #print "msg server->portal (server side):", sessid, msg, data - return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data) + return self.send_data(MsgServer2Portal, sessid, msg=msg, data=data) # Server administration from the Portal side - def amp_server_admin(self, data): + @AdminPortal2Server.responder + def server_receive_adminportal2server(self, data): """ - This allows the portal to perform admin - operations on the server. This is executed on the Server. - - Since AMP has a limit of 65355 bytes per message, it's - possible the data comes in multiple chunks; if so (nparts>1) - we buffer the data and wait for the remaining parts to arrive - before continuing. + Receives admin data from the Portal (allows the portal to + perform admin operations on the server). This is executed on + the Server. Args: data (str): Data to send (often a part of a batch) """ #print "serveradmin (server side):", hashid, ipart, nparts - batch = self.batch_recv(data) + sessid, kwargs = loads(data) - for (sessid, kwargs) in batch: - operation = kwargs["operation"] - data = kwargs["data"] - server_sessionhandler = self.factory.server.sessions + operation = kwargs["operation"] + data = kwargs["data"] + server_sessionhandler = self.factory.server.sessions - #print "serveradmin (server side):", sessid, ord(operation), data + #print "serveradmin (server side):", sessid, ord(operation), data - if operation == PCONN: # portal_session_connect - # create a new session and sync it - server_sessionhandler.portal_connect(data) + if operation == PCONN: # portal_session_connect + # create a new session and sync it + server_sessionhandler.portal_connect(data) - elif operation == PCONNSYNC: #portal_session_sync - server_sessionhandler.portal_session_sync(data) + elif operation == PCONNSYNC: #portal_session_sync + server_sessionhandler.portal_session_sync(data) - elif operation == PDISCONN: # portal_session_disconnect - # session closed from portal side - self.factory.server.sessions.portal_disconnect(sessid) + elif operation == PDISCONN: # portal_session_disconnect + # session closed from portal side + self.factory.server.sessions.portal_disconnect(sessid) - elif operation == PSYNC: # portal_session_sync - # force a resync of sessions when portal reconnects to - # server (e.g. after a server reboot) the data kwarg - # contains a dict {sessid: {arg1:val1,...}} - # representing the attributes to sync for each - # session. - server_sessionhandler.portal_sessions_sync(data) - else: - raise Exception("operation %(op)s not recognized." % {'op': operation}) - #return {} - return {"timing":"%f" % time()} - ServerAdmin.responder(amp_server_admin) + elif operation == PSYNC: # portal_session_sync + # force a resync of sessions when portal reconnects to + # server (e.g. after a server reboot) the data kwarg + # contains a dict {sessid: {arg1:val1,...}} + # representing the attributes to sync for each + # session. + server_sessionhandler.portal_sessions_sync(data) + else: + raise Exception("operation %(op)s not recognized." % {'op': operation}) + return {} - def call_remote_ServerAdmin(self, sessid, operation="", data=""): + def send_AdminPortal2Server(self, sessid, operation="", data=""): """ - Administrative access method called by the Portal and Executed + Send Admin instructions from the Portal to the Server. + Executed on the Portal. Args: @@ -645,69 +536,61 @@ class AMPProtocol(amp.AMP): """ #print "serveradmin (portal side):", sessid, ord(operation), data - if hasattr(self.factory, "server_restart_mode"): - return self.batch_send(ServerAdmin, sessid, force_direct=True, operation=operation, data=data) - return self.batch_send(ServerAdmin, sessid, operation=operation, data=data) + return self.send_data(AdminPortal2Server, sessid, operation=operation, data=data) # Portal administraton from the Server side - def amp_portal_admin(self, data): + @AdminServer2Portal.responder + def portal_receive_adminserver2portal(self, data): """ - This allows the server to perform admin - operations on the portal. This is executed on the Portal. - Since AMP has a limit of 65355 bytes per message, it's - possible the data comes in multiple chunks; if so (nparts>1) - we buffer the data and wait for the remaining parts to arrive - before continuing. + Receives and handles admin operations sent to the Portal + This is executed on the Portal. Args: - data (str): Data to send (often a part of a batch) + data (str): Data received, a pickled tuple (sessid, kwargs). """ #print "portaladmin (portal side):", sessid, ord(operation), data - batch = self.batch_recv(data) - for (sessid, kwargs) in batch: - operation = kwargs["operation"] - data = kwargs["data"] - portal_sessionhandler = self.factory.portal.sessions + sessid, kwargs = loads(data) + operation = kwargs["operation"] + data = kwargs["data"] + portal_sessionhandler = self.factory.portal.sessions - if operation == SLOGIN: # server_session_login - # a session has authenticated; sync it. - portal_sessionhandler.server_logged_in(sessid, data) + if operation == SLOGIN: # server_session_login + # a session has authenticated; sync it. + portal_sessionhandler.server_logged_in(sessid, data) - elif operation == SDISCONN: # server_session_disconnect - # the server is ordering to disconnect the session - portal_sessionhandler.server_disconnect(sessid, reason=data) + elif operation == SDISCONN: # server_session_disconnect + # the server is ordering to disconnect the session + portal_sessionhandler.server_disconnect(sessid, reason=data) - elif operation == SDISCONNALL: # server_session_disconnect_all - # server orders all sessions to disconnect - portal_sessionhandler.server_disconnect_all(reason=data) + elif operation == SDISCONNALL: # server_session_disconnect_all + # server orders all sessions to disconnect + portal_sessionhandler.server_disconnect_all(reason=data) - elif operation == SSHUTD: # server_shutdown - # the server orders the portal to shut down - self.factory.portal.shutdown(restart=False) + elif operation == SSHUTD: # server_shutdown + # the server orders the portal to shut down + self.factory.portal.shutdown(restart=False) - elif operation == SSYNC: # server_session_sync - # server wants to save session data to the portal, - # maybe because it's about to shut down. - portal_sessionhandler.server_session_sync(data) - # set a flag in case we are about to shut down soon - self.factory.server_restart_mode = True + elif operation == SSYNC: # server_session_sync + # server wants to save session data to the portal, + # maybe because it's about to shut down. + portal_sessionhandler.server_session_sync(data) + # set a flag in case we are about to shut down soon + self.factory.server_restart_mode = True - elif operation == SCONN: # server_force_connection (for irc/imc2 etc) - portal_sessionhandler.server_connect(**data) + elif operation == SCONN: # server_force_connection (for irc/imc2 etc) + portal_sessionhandler.server_connect(**data) - else: - raise Exception("operation %(op)s not recognized." % {'op': operation}) - #return {} - return {"timing":"%f" % time()} - PortalAdmin.responder(amp_portal_admin) + else: + raise Exception("operation %(op)s not recognized." % {'op': operation}) + return {} - def call_remote_PortalAdmin(self, sessid, operation="", data=""): + def send_AdminServer2Portal(self, sessid, operation="", data=""): """ - Administrative access method called by the Server side and executed - onthe Portal. + Administrative access method called by the Server to send an + instruction to the Portal. Args: sessid (int): Session id. @@ -718,13 +601,12 @@ class AMPProtocol(amp.AMP): operation. """ - if operation == SSYNC: - return self.batch_send(PortalAdmin, sessid, force_direct=True, operation=operation, data=data) - return self.batch_send(PortalAdmin, sessid, operation=operation, data=data) + return self.send_data(AdminServer2Portal, sessid, operation=operation, data=data) # Extra functions - def amp_function_call(self, module, function, args, **kwargs): + @FunctionCall.responder + def receive_functioncall(self, module, function, args, **kwargs): """ This allows Portal- and Server-process to call an arbitrary function in the other process. It is intended for use by @@ -752,9 +634,8 @@ class AMPProtocol(amp.AMP): return result else: return {'result': dumps(result)} - FunctionCall.responder(amp_function_call) - def call_remote_FunctionCall(self, modulepath, functionname, *args, **kwargs): + def send_FunctionCall(self, modulepath, functionname, *args, **kwargs): """ Access method called by either process. This will call an arbitrary function on the other process (On Portal if calling from Server and