diff --git a/evennia/server/amp.py b/evennia/server/amp.py index c01eb60014..20c18456d0 100644 --- a/evennia/server/amp.py +++ b/evennia/server/amp.py @@ -25,7 +25,7 @@ try: except ImportError: import pickle from twisted.protocols import amp -from twisted.internet import protocol, reactor +from twisted.internet import protocol, task from twisted.internet.defer import Deferred from evennia.utils.utils import to_str, variable_from_module @@ -41,8 +41,13 @@ SSHUTD = chr(7) # server shutdown SSYNC = chr(8) # server session sync SCONN = chr(9) # server creating new connection (for irc/imc2 bots etc) PCONNSYNC = chr(10) # portal post-syncing a session +AMP_MAXLEN = 65535 # max allowed data length in AMP protocol (cannot be changed) -MAXLEN = 65535 # max allowed data length in AMP protocol +BATCH_RATE = 500 # max commands/sec before switching to batch-sending +BATCH_TIMEOUT = 1.0 # how often to poll to empty batch queue, in seconds + +# buffers +_SENDBATCH = defaultdict(list) _MSGBUFFER = defaultdict(list) def get_restart_mode(restart_file): @@ -140,11 +145,10 @@ class MsgPortal2Server(amp.Command): Message portal -> server """ key = "MsgPortal2Server" - arguments = [('sessid', amp.Integer()), + arguments = [('hashid', amp.Integer()), + ('data', amp.String()), ('ipart', amp.Integer()), - ('nparts', amp.Integer()), - ('msg', amp.String()), - ('data', amp.String())] + ('nparts', amp.Integer())] errors = [(Exception, 'EXCEPTION')] response = [] @@ -154,23 +158,6 @@ class MsgServer2Portal(amp.Command): Message server -> portal """ key = "MsgServer2Portal" - arguments = [('sessid', amp.Integer()), - ('ipart', amp.Integer()), - ('nparts', amp.Integer()), - ('msg', amp.String()), - ('data', amp.String())] - errors = [(Exception, 'EXCEPTION')] - response = [] - - -class BatchServer2Portal(amp.Command): - """ - Batch data server -> portal - - This is used when the amount of outgoing data - is very high, to minimize the throughput. - """ - key = "BatchServer2Portal" arguments = [('hashid', amp.Integer()), ('data', amp.String()), ('ipart', amp.Integer()), @@ -178,6 +165,7 @@ class BatchServer2Portal(amp.Command): errors = [(Exception, 'EXCEPTION')] response = [] + class ServerAdmin(amp.Command): """ Portal -> Server @@ -187,11 +175,10 @@ class ServerAdmin(amp.Command): session connects or resyncs """ key = "ServerAdmin" - arguments = [('sessid', amp.Integer()), + arguments = [('hashid', amp.Integer()), + ('data', amp.String()), ('ipart', amp.Integer()), - ('nparts', amp.Integer()), - ('operation', amp.String()), - ('data', amp.String())] + ('nparts', amp.Integer())] errors = [(Exception, 'EXCEPTION')] response = [] @@ -204,11 +191,10 @@ class PortalAdmin(amp.Command): operations on the portal. """ key = "PortalAdmin" - arguments = [('sessid', amp.Integer()), + arguments = [('hashid', amp.Integer()), + ('data', amp.String()), ('ipart', amp.Integer()), - ('nparts', amp.Integer()), - ('operation', amp.String()), - ('data', amp.String())] + ('nparts', amp.Integer())] errors = [(Exception, 'EXCEPTION')] response = [] @@ -218,7 +204,8 @@ class FunctionCall(amp.Command): Bidirectional Sent when either process needs to call an - arbitrary function in the other. + arbitrary function in the other. This does + not use the batch-send functionality. """ key = "FunctionCall" arguments = [('module', amp.String()), @@ -234,9 +221,6 @@ class FunctionCall(amp.Command): dumps = lambda data: to_str(pickle.dumps(to_str(data), pickle.HIGHEST_PROTOCOL)) loads = lambda data: pickle.loads(to_str(data)) -# multipart message store - - #------------------------------------------------------------ # Core AMP protocol for communication Server <-> Portal @@ -252,15 +236,6 @@ class AMPProtocol(amp.AMP): subclasses that specify the datatypes of the input/output of these methods. """ - def __init__(self, *args, **kwargs): - """ - Initialize the protocol - """ - super(AMPProtocol, self).__init__(*args, **kwargs) - self.outbatch = [] - self.inbatch = [] - self.lastsend = time() - # helper methods def connectionMade(self): @@ -280,12 +255,17 @@ class AMPProtocol(amp.AMP): self.factory.portal.sessions.at_server_connection() if hasattr(self.factory, "server_restart_mode"): del self.factory.server_restart_mode + # should be set both on portal and server + self.min_batch_step = 1.0 / BATCH_RATE + self.lastsend = time() + self.task = task.LoopingCall(self.batch_send, MsgPortal2Server, None) + self.task.start(BATCH_TIMEOUT) # Error handling def errback(self, e, info): "error handler, to avoid dropping connections on server tracebacks." - f = e.trap(Exception) + e.trap(Exception) print "AMP Error for %(info)s: %(e)s" % {'info': info, 'e': e.getErrorMessage()} @@ -293,22 +273,26 @@ class AMPProtocol(amp.AMP): """ This will batch data together to send fewer, large batches. """ + global _SENDBATCH if sessid is not None: - self.outbatch.append((sessid, kwargs)) + _SENDBATCH.append((sessid, kwargs)) + if not _SENDBATCH: + return + now = time() - timeout = 0.0025 - if time() - self.lastsend > timeout: - batch = dumps(self.outbatch) - self.outbatch = [] - to_send = [batch[i:i+MAXLEN] for i in range(0, len(batch), MAXLEN)] + if now - self.lastsend > self.min_batch_step: + batch = dumps(_SENDBATCH) + _SENDBATCH = [] + to_send = [batch[i:i+AMP_MAXLEN] for i in range(0, len(batch), AMP_MAXLEN)] nparts = len(to_send) - hashid=id(batch) + # tag this batch + hashid = "%s-%s" % (id(batch), now) if nparts == 1: deferreds = self.callRemote(command, hashid=hashid, data=batch, ipart=0, - nparts=1).addErrback(self.errback, "BatchServer2Portal") + nparts=1).addErrback(self.errback, command.key) else: deferreds = [] for ipart, part in enumerate(to_send): @@ -317,111 +301,37 @@ class AMPProtocol(amp.AMP): data=part, ipart=ipart, nparts=nparts) - deferred.addErrback(self.errback, "BatchServer2Portal-part") + deferred.addErrback(self.errback, "%s part %i/%i" % (command.key, ipart, part)) deferreds.append(deferred) - self.lastsend = time() + self.lastsend = time() # don't use now here, keep it as up-to-date as possible return deferreds - else: - # make sure to tick - reactor.callLater(0.01, self.batch_send, command, None) def batch_recv(self, hashid, data, ipart, nparts): """ - This will receive and unpack data sent as a batch. + This will receive and unpack data sent as a batch. This both + handles too-long data as well as batch-sending very fast- + arriving commands. """ global _MSGBUFFER if nparts == 1: + # most common case return loads(data) else: if ipart < nparts-1: # not yet complete _MSGBUFFER[hashid].append(data) + return [] else: - # all parts in place - data = _MSGBUFFER.pop(hashid) + data - return loads(data) + # all parts in place - deserialize it + return loads(_MSGBUFFER.pop(hashid) + data) - def safe_send(self, command, sessid, **kwargs): - """ - This helper method splits the sending of a message into - multiple parts with a maxlength of MAXLEN. This is to avoid - repetition in two sending commands. when calling this the - maximum length has already been exceeded. The max-length will - be checked for all kwargs and these will be used as argument - to the command. The command type must have keywords ipart and - nparts to track the parts and put them back together on the - other side. - - Returns a deferred or a list of such - """ - - - to_send = [(key, [string[i:i+MAXLEN] for i in range(0, len(string), MAXLEN)]) - for key, string in kwargs.items()] - nparts_max = max(len(part[1]) for part in to_send) - if nparts_max == 1: - # first try to send directly - return self.callRemote(command, - sessid=sessid, - ipart=0, - nparts=1, - **kwargs).addErrback(self.errback, command.key) - else: - # one or more parts were too long for MAXLEN. - #print "TooLong triggered!" - deferreds = [] - for ipart in range(nparts_max): - part_kwargs = {} - for key, str_part in to_send: - try: - part_kwargs[key] = str_part[ipart] - except IndexError: - # means this kwarg needed fewer splits - part_kwargs[key] = "" - # send this part - #print "amp safe sending:", ipart, nparts_max, str_part - deferreds.append(self.callRemote( - command, - sessid=sessid, - ipart=ipart, - nparts=nparts_max, - **part_kwargs).addErrback(self.errback, command.key)) - return deferreds - - def safe_recv(self, command, sessid, ipart, nparts, **kwargs): - """ - Safely decode potentially split data coming over the wire. No - decoding or parsing is done here, only merging of data split - with safe_send(). - If the data stream is not yet complete, this method will return - None, otherwise it will return a dictionary of the (possibly - merged) properties. - """ - global _MSGBUFFER - if nparts == 1: - # the most common case - return kwargs - else: - # part of a multi-part send - hashid = "%s_%s" % (command.key, sessid) - #print "amp safe receive:", ipart, nparts-1, kwargs - if ipart < nparts-1: - # not yet complete - _MSGBUFFER[hashid].append(kwargs) - return - else: - # all parts in place, put them back together - buf = _MSGBUFFER.pop(hashid) + [kwargs] - recv_kwargs = dict((key, "".join(kw[key] for kw in buf)) for key in kwargs) - return recv_kwargs - # Message definition + helper methods to call/create each message type # Portal -> Server Msg - def amp_msg_portal2server(self, sessid, ipart, nparts, msg, data): + def amp_msg_portal2server(self, hashid, data, ipart, nparts): """ Relays message to server. This method is executed on the Server. @@ -429,13 +339,12 @@ class AMPProtocol(amp.AMP): data comes in multiple chunks; if so (nparts>1) we buffer the data and wait for the remaining parts to arrive before continuing. """ - ret = self.safe_recv(MsgPortal2Server, sessid, ipart, nparts, - text=msg, data=data) - if ret is not None: + batch = self.batch_recv(MsgPortal2Server, hashid, data, ipart, nparts) + for (sessid, kwargs) in batch: #print "msg portal -> server (server side):", sessid, msg, loads(ret["data"]) self.factory.server.sessions.data_in(sessid, - text=ret["text"], - **loads(ret["data"])) + text=kwargs["msg"], + data=kwargs["data"]) return {} MsgPortal2Server.responder(amp_msg_portal2server) @@ -444,23 +353,22 @@ class AMPProtocol(amp.AMP): Access method called by the Portal and executed on the Portal. """ #print "msg portal->server (portal side):", sessid, msg, data - return self.safe_send(MsgPortal2Server, sessid, - msg=msg if msg is not None else "", - data=dumps(data)) + return self.batch_send(MsgPortal2Server, sessid, + msg=msg if msg is not None else "", + data=data) # Server -> Portal message - def amp_msg_server2portal(self, sessid, ipart, nparts, msg, data): + def amp_msg_server2portal(self, hashid, data, ipart, nparts): """ Relays message to Portal. This method is executed on the Portal. """ - ret = self.safe_recv(MsgServer2Portal, sessid, - ipart, nparts, text=msg, data=data) - if ret is not None: + batch = self.batch_recv(hashid, data, ipart, nparts) + for (sessid, kwargs) in batch: #print "msg server->portal (portal side):", sessid, ret["text"], loads(ret["data"]) self.factory.portal.sessions.data_out(sessid, - text=ret["text"], - **loads(ret["data"])) + text=kwargs["msg"], + data=kwargs["data"]) return {} MsgServer2Portal.responder(amp_msg_server2portal) @@ -475,31 +383,27 @@ class AMPProtocol(amp.AMP): text=kwargs["msg"], **kwargs["data"]) return {} - BatchServer2Portal.responder(amp_batch_server2portal) + MsgServer2Portal.responder(amp_batch_server2portal) def call_remote_MsgServer2Portal(self, sessid, msg, data=""): """ Access method called by the Server and executed on the Server. """ #print "msg server->portal (server side):", sessid, msg, data - return self.batch_send(BatchServer2Portal, sessid, msg=msg, data=data) - #return self.safe_send(MsgServer2Portal, sessid, - # msg=msg if msg is not None else "", - # data=dumps(data)) + return self.batch_send(MsgServer2Portal, sessid, msg=msg, data=data) # Server administration from the Portal side - def amp_server_admin(self, sessid, ipart, nparts, operation, data): + def amp_server_admin(self, hashid, data, ipart, nparts): """ This allows the portal to perform admin operations on the server. This is executed on the Server. """ - ret = self.safe_recv(ServerAdmin, sessid, ipart, nparts, - operation=operation, data=data) + batch = self.batch_recv(hashid, data, ipart, nparts) - if ret is not None: - data = loads(ret["data"]) - operation = ret["operation"] + for (sessid, kwargs) in batch: + operation = kwargs["operation"] + data = kwargs["data"] server_sessionhandler = self.factory.server.sessions #print "serveradmin (server side):", sessid, ord(operation), data @@ -532,21 +436,20 @@ class AMPProtocol(amp.AMP): Access method called by the Portal and Executed on the Portal. """ #print "serveradmin (portal side):", sessid, ord(operation), data - data = dumps(data) - return self.safe_send(ServerAdmin, sessid, operation=operation, data=data) + return self.batch_send(ServerAdmin, sessid, operation=operation, data=data) # Portal administraton from the Server side - def amp_portal_admin(self, sessid, ipart, nparts, operation, data): + def amp_portal_admin(self, hashid, data, ipart, nparts): """ This allows the server to perform admin operations on the portal. This is executed on the Portal. """ #print "portaladmin (portal side):", sessid, ord(operation), data - ret = self.safe_recv(PortalAdmin, sessid, ipart, nparts, - operation=operation, data=data) - if ret is not None: - data = loads(data) + batch = self.batch_recv(hashid, data, ipart, nparts) + for (sessid, kwargs) in batch: + operation = kwargs["operation"] + data = kwargs["data"] portal_sessionhandler = self.factory.portal.sessions if operation == SLOGIN: # server_session_login @@ -584,7 +487,7 @@ class AMPProtocol(amp.AMP): """ Access method called by the server side. """ - return self.safe_send(PortalAdmin, sessid, operation=operation, data=dumps(data)) + return self.batch_send(PortalAdmin, sessid, operation=operation, data=dumps(data)) # Extra functions